Monday, 17 September 2012

Basic Actor Model using TPL Dataflow in C#



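The Actor Model is a popular way to get reliable concurrency into your application through isolated mutability: each actor owns its state, and the rest of the program interacts with it only by sending it messages.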

Microsoft does not provide an Actor class, but it does provide TPL Dataflow, a part of the Task Parallel Library. Dataflow is most useful for building asynchronous pipelines, but it can also be used to create basic agents.
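As a minimal illustration of a basic agent, the sketch below keeps a mutable running total that only its own ActionBlock ever touches. It is not from the original source: the class name SummingAgent is hypothetical, and the code assumes the System.Threading.Tasks.Dataflow NuGet package. It relies on ActionBlock processing one message at a time by default, which is what keeps the state isolated.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical agent: its mutable state is private and only changed by its mailbox.
public sealed class SummingAgent
{
    private long _total; // isolated mutable state
    private readonly ActionBlock<int> _mailbox;

    public SummingAgent()
    {
        // ActionBlock defaults to MaxDegreeOfParallelism = 1, so messages are
        // handled one at a time and _total is never updated concurrently.
        _mailbox = new ActionBlock<int>(value => _total += value);
    }

    // Queue a message; the caller does not wait for it to be processed.
    public bool Post(int value) => _mailbox.Post(value);

    // Stop accepting messages, wait for the mailbox to drain, then read the state.
    public async Task<long> CompleteAsync()
    {
        _mailbox.Complete();
        await _mailbox.Completion;
        return _total;
    }
}

public static class Program
{
    public static void Main()
    {
        var agent = new SummingAgent();
        for (int i = 1; i <= 100; i++)
        {
            agent.Post(i);
        }
        Console.WriteLine(agent.CompleteAsync().Result); // 5050
    }
}
```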

Here we will see how to create a basic, non-transactional Actor and use it to count primes, just as in the Java Akka example.

using System;
using System.Diagnostics;

private static void Main(string[] args)
{
    // Expect two integers: the upper bound and the number of actors (at least 1).
    if (args.Length < 2
        || !int.TryParse(args[0], out int primesBound)
        || !int.TryParse(args[1], out int concurrency)
        || primesBound < 0 || concurrency < 1)
    {
        Console.WriteLine("Usage: number numberOfParts");
        return;
    }
    Stopwatch stopwatch = Stopwatch.StartNew();
    int count = CountPrimes(primesBound, concurrency);
    stopwatch.Stop();
    Console.WriteLine("Found {0} primes under {1}", count, primesBound);
    Console.WriteLine("It took {0} seconds", stopwatch.ElapsedMilliseconds / 1.0e3f);
    Console.ReadKey();
}

Here we simply take the upper bound of the range in which we want to find primes and the level of concurrency, as in our Java example.

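using System;
using System.Collections.Generic;
using System.Linq;

public static int CountPrimes(int bound, int concurrency)
{
    int chunkSize = bound / concurrency;
    var actors = new List<PrimesActor<Tuple<int, int>, int>>();
    for (int i = 0; i < concurrency; i++)
    {
        int lower = i * chunkSize;
        // The last actor also takes the remainder when bound is not an exact multiple of concurrency.
        int upper = (i == concurrency - 1) ? bound : (i + 1) * chunkSize;
        // PrimesComputation.CountPrimesInRange is part of the full source and is not shown in this post.
        var actor = PrimesActor<Tuple<int, int>, int>.MakeActor(PrimesComputation.CountPrimesInRange);
        actors.Add(actor);
        actor.Post(Tuple.Create(lower, upper));
    }
    // Receive blocks until each actor has produced its partial count.
    return actors.Sum(a => a.Receive());
}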

We create as many actors as our desired concurrency level, add each one to a list, and post each actor a message containing its sub-range. Finally, we sum the results from all the actors using LINQ.
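The post never shows PrimesComputation.CountPrimesInRange. A hypothetical stand-in might look like the following. It assumes a half-open range [lower, upper), so that neighbouring actors, which share a boundary value (actor i's upper bound is actor i+1's lower bound), never count the same number twice, and it uses plain trial division. The real implementation in the source may differ.

```csharp
using System;

// Hypothetical stand-in for the PrimesComputation class, which is not shown in the post.
public static class PrimesComputation
{
    // Counts primes n with lower <= n < upper (half-open range, by assumption).
    public static int CountPrimesInRange(int lower, int upper)
    {
        int count = 0;
        for (int n = Math.Max(lower, 2); n < upper; n++)
        {
            if (IsPrime(n)) count++;
        }
        return count;
    }

    // Trial division by 2, then by odd numbers up to sqrt(n).
    private static bool IsPrime(int n)
    {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (int d = 3; (long)d * d <= n; d += 2)
        {
            if (n % d == 0) return false;
        }
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        // Primes below 100, split into two adjacent chunks the way CountPrimes does.
        int total = PrimesComputation.CountPrimesInRange(0, 50)
                  + PrimesComputation.CountPrimesInRange(50, 100);
        Console.WriteLine(total); // 25
    }
}
```

Because the method matches Func&lt;int, int, int&gt;, it can be passed straight to MakeActor as a method group, as CountPrimes does.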

This is a very basic example meant to show the fundamentals of TPL Dataflow. In a real implementation we would most likely want fewer actors, each doing more work, and we would probably use other Dataflow blocks to perform the sum. If we wanted a more complete Actor model, however, we may find TPL Dataflow inadequate compared to Akka or Stact.
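One way to act on that suggestion, sketched under our own assumptions rather than taken from the original source: a single TransformBlock with a bounded MaxDegreeOfParallelism counts many small ranges, and a linked ActionBlock performs the sum. The names DataflowPrimes, workers and chunkSize are hypothetical, chunkSize is assumed to be positive, and the prime test is a trial-division stand-in for PrimesComputation.CountPrimesInRange.

```csharp
using System;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Sketch: a bounded pool of workers in one TransformBlock, a single-threaded sum in an ActionBlock.
public static class DataflowPrimes
{
    public static int CountPrimes(int bound, int workers, int chunkSize)
    {
        // Counts primes in each [lower, upper) range, up to `workers` ranges at once.
        var counter = new TransformBlock<Tuple<int, int>, int>(
            range => CountPrimesInRange(range.Item1, range.Item2),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = workers });

        // ActionBlock handles one message at a time by default, so only
        // one thread updates `total` at any moment.
        int total = 0;
        var summer = new ActionBlock<int>(n => total += n);

        // PropagateCompletion makes counter.Complete() flow on to summer.
        counter.LinkTo(summer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int lower = 0; lower < bound; lower += chunkSize)
        {
            counter.Post(Tuple.Create(lower, Math.Min(lower + chunkSize, bound)));
        }

        counter.Complete();
        summer.Completion.Wait(); // every partial count has now been added
        return total;
    }

    // Hypothetical stand-in: half-open range, plain trial division.
    private static int CountPrimesInRange(int lower, int upper)
    {
        int count = 0;
        for (int n = Math.Max(lower, 2); n < upper; n++)
        {
            bool isPrime = true;
            for (int d = 2; (long)d * d <= n; d++)
            {
                if (n % d == 0) { isPrime = false; break; }
            }
            if (isPrime) count++;
        }
        return count;
    }
}

public static class Program
{
    public static void Main()
    {
        // 100,000 split into 100 ranges, processed by ProcessorCount workers.
        Console.WriteLine(DataflowPrimes.CountPrimes(100000, Environment.ProcessorCount, 1000)); // 9592
    }
}
```

The number of concurrent workers is now independent of the number of ranges, so the work can be split finely without creating a block per chunk. That typically helps balance the load, because with trial division the higher ranges take longer to test.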

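using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

internal class PrimesActor<I, O> where I : Tuple<O, O>
{
    private readonly BufferBlock<I> _incomingMessages = new BufferBlock<I>();
    private readonly BufferBlock<O> _outgoingMessages = new BufferBlock<O>();
    private readonly Func<O, O, O> _processFunc;

    private PrimesActor(Func<O, O, O> processFunc)
    {
        _processFunc = processFunc;
    }

    public static PrimesActor<I, O> MakeActor(Func<O, O, O> processFunc)
    {
        var actor = new PrimesActor<I, O>(processFunc);
        // Start the message loop; it keeps running in the background.
        _ = actor.ProcessMessagesAsync();
        return actor;
    }

    // async Task rather than async void, with failures routed to the output queue.
    private async Task ProcessMessagesAsync()
    {
        try
        {
            while (true)
            {
                I message = await _incomingMessages.ReceiveAsync();
                O count = _processFunc(message.Item1, message.Item2);
                _outgoingMessages.Post(count);
            }
        }
        catch (Exception ex)
        {
            // Fault the output queue so a caller blocked in Receive() gets an exception instead of waiting forever.
            ((IDataflowBlock)_outgoingMessages).Fault(ex);
        }
    }

    internal void Post(I message)
    {
        _incomingMessages.Post(message);
    }

    internal O Receive()
    {
        return _outgoingMessages.Receive();
    }
}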

Here we can see the definition of our Actor class. It simply pulls messages off the input queue, executes a Func delegate and puts the result on the output queue.
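Since PrimesActor is essentially an input queue, a delegate and an output queue, the same shape is available ready-made as a TransformBlock. The sketch below is a hypothetical wrapper (FuncActor is our name, not a TPL Dataflow type) that keeps the Post/Receive surface of PrimesActor while letting the block own both buffers and the processing loop.

```csharp
using System;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical wrapper: "input queue -> Func -> output queue" as a single TransformBlock.
public sealed class FuncActor<TIn, TOut>
{
    private readonly TransformBlock<TIn, TOut> _block;

    public FuncActor(Func<TIn, TOut> process)
    {
        // Default options process one message at a time and keep results in order.
        _block = new TransformBlock<TIn, TOut>(process);
    }

    public bool Post(TIn message) => _block.Post(message);

    // Blocks until a result is available, like PrimesActor.Receive.
    public TOut Receive() => _block.Receive();

    // Stop accepting messages; the block finishes once its queues are drained.
    public void Complete() => _block.Complete();
}

public static class Program
{
    public static void Main()
    {
        var adder = new FuncActor<Tuple<int, int>, int>(t => t.Item1 + t.Item2);
        adder.Post(Tuple.Create(2, 3));
        adder.Post(Tuple.Create(10, 20));
        Console.WriteLine(adder.Receive()); // 5
        Console.WriteLine(adder.Receive()); // 30
        adder.Complete();
    }
}
```

A practical advantage over the hand-written while(true) loop is that the block can be completed, so the actor can be shut down cleanly once its work is done.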

 Found 664579 primes under 10000000  
 It took 2.368 seconds  

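Again, splitting the range across several actors gives a reasonable degree of concurrency, and the result matches the known count of 664,579 primes below ten million.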

Get the source code here.

