11 maja 2013

Java synchronizers - Semaphore

It has been a while since I wrote post about CountDownLatch. Over two months actually but it doesn’t mean I forget that I promised to write about the other synchronizers. So to not speak idly today I'm going to write about Semaphore class.

Ok, so what is Semaphore ? Javadoc in this case is very descriptive and clear so lets look into documentation. We can find there that: "A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly". Nothing to add so lets look on some example of usage.

Lets say you have a pretty big number of tasks that need to be executed asynchronously. Each of those tasks at some point of execution need to communicate with remote server in order to pass result of computation. It could look like this:

import java.io.IOException;
import java.util.concurrent.Semaphore;

public class SenderWithConnectionLimit {

 private final String serverAdress;
 
 private Semaphore semaphore;
 
 public SenderWithConnectionLimit(String adress, int bound) {

  this.serverAdress = adress;
  this.semaphore = new Semaphore(bound);
 }
 
 public void sendMessage(Message message) throws IOException, 
  InterruptedException {
  
  semaphore.acquire();
  try {
   send(message);
  } finally {   
   semaphore.release();
  }
 }

 // more methods
} 

And sender interface:

public interface Sender {

 public void sendMessage(Message message) throws IOException,
   InterruptedException;
}

Now lets say that our server to which we will send results is heavily overloaded. For that reason we don’t want to open another hundreds of connection. We want to limit number of connections for example to maximum 4. How to do that ? Well, we can provide implementation of Sender with synchronized method sendMessage, instantiate only four instances of this class and split them across all AbstractTasks. Should work but it looks more like a workaround rather than solution. What's more if we add requirement that if AbstractTask cannot send message in 5 min it should give up ? With synchronize keyword we cannot do that... So lets try attack this problem with a Semaphore class.

My suggestion is to use a stateless Sender implementation so we can use one instance across all AbstractTask objects. In order to guarantee that we will have no more then 4 connections we will use Semaphore in sendMessage method in the following way:

public class SenderWithConnectionLimit implements Sender {

 private final String serverAdress;
 
 private Semaphore semaphore;
 
 public SenderWithConnectionLimit(String adress, int bound) {

  this.serverAdress = adress;
  this.semaphore = new Semaphore(bound);
 }
 
 @Override
 public void sendMessage(Message message) throws IOException,
  InterruptedException {
  
  semaphore.acquire();
  try {
   send(message);
  } finally {   
   semaphore.release();
  }
 }
}

What we have got here is an implementation of Sender interface that meets our requirements. We are passing integer value to constructor which will specify how many concurrent connections to server we allow. We use this value to create Semaphore object. In sendMessage method we use the previously created semaphore. Before sending anything we are calling acquire method and after the sending is finished we call release method. Each call to acquire decreases value of semaphore counter and each call to release increases it. When counter reaches zero all threads that will call the acquire will wait until one of the sending message threads is finished and calls the release.

Some additional information about Semaphore class:
  • In order to satisfy mentioned above requirement about avoiding sending message after timeout we should use Semaphore method tryAcquire which takes timeout. It returns boolean value - true if a permit was acquired and false if the waiting time elapsed before a permit was acquired.
  • If you set semaphore counter to 1 it will behave similar to Lock object (difference is that semaphores have no notion of ownership)
  • There is an overloaded constructor of Semaphore which takes also boolean value. The value is fairness parameter. When set to false, this class makes no guarantees about the order in which threads acquire permits.
 
I guess that’s all. There is still one post to do - about CyclicBarrier - and I hope I will write it sooner than after 3 months since today.

Brak komentarzy:

Prześlij komentarz