Sunday, 16 September 2012

Actor Based Concurrency in Java

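Often the best way to implement concurrency in your program is with isolated mutability, where each piece of mutable state is owned by exactly one thread of control and never shared. A popular technique to achieve isolated mutability is the Actor model. This is not a new idea: the Actor model dates from the 1970s, the same era in which Smalltalk popularised message passing between objects.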

The core idea is message passing. An Actor is essentially an object with an internal message queue (its mailbox) and a single logical thread of control that reads from that queue. Other objects can post messages to the actor, and these are processed sequentially, one at a time. The concurrency arises from having multiple actors working asynchronously.
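To make the mailbox idea concrete, here is a minimal sketch of an actor built from plain JDK classes. It is not how Akka implements actors (Akka schedules many actors over a shared pool of threads), and the class name MiniActor and its methods are made up for this illustration. It only shows the shape of the idea: one queue, one consumer, messages handled in arrival order.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical minimal actor: one mailbox, one thread that drains it in order.
final class MiniActor<M> {
    private static final Object STOP = new Object();  // sentinel that ends the loop
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<Object>();
    private final Thread worker;

    MiniActor(final Consumer<M> behaviour) {
        worker = new Thread(() -> drain(behaviour));
        worker.start();
    }

    // Runs on the worker thread only, so the behaviour never runs concurrently with itself.
    @SuppressWarnings("unchecked")
    private void drain(final Consumer<M> behaviour) {
        try {
            for (Object next = mailbox.take(); next != STOP; next = mailbox.take()) {
                behaviour.accept((M) next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Posting a message never blocks the caller; it is simply queued.
    void tell(final M message) {
        mailbox.add(message);
    }

    // Queues the sentinel behind any pending messages and waits for the worker to finish.
    void stop() {
        mailbox.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because only the worker thread ever touches the state behind the behaviour, that state needs no locks. This is the isolated mutability described above.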

This example demonstrates how to implement basic actor support using Java and a Scala library called Akka.

We will create a program that takes a number range, say 0-10, and computes the number of primes in that range. For example, there are 4 primes in the range 0-10 (2, 3, 5 and 7).

Here you can see our main Java entry point :-
// Relies on static imports of System.out, Integer.parseInt and MessageFormat.format.
public static void main(final String[] args) {
    if(args.length < 2) {
        out.println("Usage: number numberOfParts");
        return;
    }
    final int primesBound = parseInt(args[0]);  // upper bound of the range
    final int concurrency = parseInt(args[1]);  // number of chunks, one actor each
    final long start = System.nanoTime();
    final int count = countPrimes(primesBound, concurrency);
    final long end = System.nanoTime();
    out.println(format("Found {0} primes under {1}", count, primesBound));
    out.println(format("It took {0} seconds", (end - start)/1.0e9f));
}

The program takes two arguments: the upper bound of the number range, and a concurrency level that determines how many partitions the range is split into.

First, to define our class as an Actor we must subclass one of Akka's Actor base classes. Here we use UntypedActor, which delivers every message as a plain Object and leaves the responsibility for checking and casting it to us.
public class PrimesActor extends UntypedActor {  

Next we must implement the message processing by implementing onReceive.
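@SuppressWarnings("unchecked")
public void onReceive(final Object message) {
    if (!(message instanceof List)) {
        unhandled(message);  // not a bounds message, let Akka report it
        return;
    }
    final List<Integer> bounds = (List<Integer>) message;  // [lower, upper]
    final int count = PrimesComputation.countPrimesInRange(bounds.get(0), bounds.get(1));
    getSender().tell(count, getSelf());  // reply to whoever sent the bounds
}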

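Here you can see we kick off our main task of prime computation by calling countPrimesInRange(). We then send the result back to the sender by calling tell(). The caller posts its request with ask(), which returns a Future, and this reply is what completes that Future.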
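The PrimesComputation class is not shown in this post. As a rough idea of what countPrimesInRange() could look like, here is a minimal sketch using trial division. The class name PrimesComputationSketch is made up, and the range convention (lower bound inclusive, upper bound exclusive) is an assumption chosen so that adjacent chunks never count the same number twice; the real class may behave differently.

```java
// Hypothetical stand-in for PrimesComputation, which the post does not show.
final class PrimesComputationSketch {
    private PrimesComputationSketch() {}

    // Trial division by odd numbers up to the square root of n.
    static boolean isPrime(final int n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (int d = 3; (long) d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Counts the primes p with lower <= p < upper (ASSUMED half-open convention).
    static int countPrimesInRange(final int lower, final int upper) {
        int count = 0;
        for (int n = lower; n < upper; n++) {
            if (isPrime(n)) count++;
        }
        return count;
    }
}
```

With the half-open convention, the counts for 0-5 and 5-10 add up to the count for 0-10, which is the property the partitioning relies on.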

Now it gets interesting, let's see how we use Akka to drive our actors :-
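public static int countPrimes(final int bound, final int concurrency) {
    final int chunks = bound / concurrency;  // size of each chunk
    final List<Future<?>> futures = new ArrayList<Future<?>>();
    final ActorSystem system = ActorSystem.create();
    final FiniteDuration duration = Duration.create(3, SECONDS);
    final Timeout timeout = new Timeout(duration);
    for(int i=0; i < concurrency; i++) {
        final int lower = i * chunks;
        // The last chunk runs up to bound so a remainder from the division is not lost.
        final int upper = (i == concurrency - 1) ? bound : (i+1) * chunks;
        // Note: adjacent chunks share a boundary value. If countPrimesInRange()
        // treats both bounds as inclusive, a prime on a boundary is counted twice.
        final List<Integer> bounds = Collections.unmodifiableList(
            Arrays.asList(lower, upper));
        final ActorRef primeFinder = system.actorOf(new Props(PrimesActor.class));
        futures.add(ask(primeFinder, bounds, timeout));
    }
    int count = 0;
    for(Future<?> f : futures) {
        try {
            // Block until this actor has replied, or give up after one second.
            count += (Integer)(Await.result(f, Duration.create(1, SECONDS)));
        } catch (Exception e) {
            // A failed or timed-out chunk is reported but leaves the total too low.
            e.printStackTrace();
        }
    }
    system.shutdown();
    return count;
}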

First we must initialise the Actor system by calling ActorSystem.create().

We then break the problem down into 'chunks' based on our concurrency level.
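The chunking step can be sketched on its own, without Akka. A plain `bound / concurrency` drops any remainder (for a bound of 10 and 3 parts it yields chunks of 3, which only reach 9), so this sketch hands the remainder out one number at a time to the first chunks. The class name ChunkingSketch and the half-open [lower, upper) convention are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits [0, bound) into `parts` contiguous half-open chunks.
final class ChunkingSketch {
    private ChunkingSketch() {}

    // Each element is {lower, upper}, meaning lower <= n < upper.
    static List<int[]> split(final int bound, final int parts) {
        if (bound < 0 || parts < 1) {
            throw new IllegalArgumentException("bound must be >= 0 and parts >= 1");
        }
        final List<int[]> chunks = new ArrayList<int[]>();
        final int size = bound / parts;
        final int remainder = bound % parts;
        int lower = 0;
        for (int i = 0; i < parts; i++) {
            // The first `remainder` chunks are one element larger than the rest.
            final int upper = lower + size + (i < remainder ? 1 : 0);
            chunks.add(new int[] {lower, upper});
            lower = upper;  // the next chunk starts exactly where this one ends
        }
        return chunks;
    }
}
```

For example, split(10, 3) gives the chunks 0-4, 4-7 and 7-10, which together cover the whole range with no gaps and no overlap.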

Messages in an Actor based system MUST be immutable, so we pass our message as an unmodifiable List. We create an actor for each chunk, post the message to it using ask(), and store the returned Future in a list.
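An alternative to an unmodifiable List is a small dedicated message class whose fields are all final. The class below is only a sketch of that idea, the name Bounds is made up and it is not part of the original project. It has the side benefit that the receiving actor could check for this one type instead of casting a raw List.

```java
// Hypothetical immutable message: its state is fixed at construction time.
final class Bounds {
    private final int lower;
    private final int upper;

    Bounds(final int lower, final int upper) {
        if (lower > upper) {
            throw new IllegalArgumentException("lower must not exceed upper");
        }
        this.lower = lower;
        this.upper = upper;
    }

    // No setters: once built, a Bounds can be shared between threads safely.
    int lower() { return lower; }

    int upper() { return upper; }

    @Override
    public String toString() {
        return "Bounds[" + lower + ", " + upper + ")";
    }
}
```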

Finally we iterate through our futures, waiting for each one to finish. Await.result() is a blocking operation: it suspends the calling thread until the future completes or the given timeout expires. We 'Join' these results into a total count and return the count. The Actor system must be shut down to terminate any remaining actors and free resources.
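The same submit-then-join pattern can be tried with nothing but the JDK. The sketch below uses an ExecutorService and java.util.concurrent.Future in place of actors and Akka futures, so it is an analogy rather than the Akka API; the class name JoinSketch and the fixed pool size of 4 are arbitrary choices for this illustration. Future.get() with a timeout plays the role of Await.result().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical JDK-only analogue of the "ask every actor, then join" step.
final class JoinSketch {
    private JoinSketch() {}

    static int sumAll(final List<Callable<Integer>> tasks, final long timeoutSeconds) {
        final ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Submit everything first so the tasks can run concurrently.
            final List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (Callable<Integer> task : tasks) {
                futures.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                try {
                    // Blocks the calling thread until the result arrives or the timeout expires.
                    total += f.get(timeoutSeconds, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                } catch (ExecutionException | TimeoutException e) {
                    throw new IllegalStateException("a chunk failed or timed out", e);
                }
            }
            return total;
        } finally {
            pool.shutdownNow();  // always release the threads, like system.shutdown()
        }
    }
}
```

Unlike the loop in countPrimes(), this sketch fails loudly when a chunk times out instead of printing a stack trace and returning a total that is too low.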

Here are some sample runs :-
 java PrimesActor 10 10  
 Found 8 primes under 10  
 It took 1.242 seconds

 java PrimesActor 10000000 100  
 Found 664,579 primes under 10,000,000  
 It took 6.597 seconds  

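Spread over 100 actors, counting the primes below ten million takes about 6.6 seconds. Note that the first run reports 8 primes under 10, where the correct answer is 4 (2, 3, 5 and 7). With a chunk size of 1, adjacent ranges such as 1-2 and 2-3 share a boundary value, so each prime appears to be counted twice, which suggests countPrimesInRange() treats both bounds as inclusive. The second run is unaffected because its chunk boundaries are multiples of 100,000, none of which is prime.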

Download source here (Eclipse project).
