Thursday, 20 September 2012

Programming for Multicore with Java 7

With the era of ever-increasing single-core clock speeds almost at an end, multicore processors are set to become commonplace, and concurrent programming is going to become important to utilize those cores.

In this article we will look at how you can program for the multicore era in Java.

Sequential

First we will code the algorithm sequentially, and then see how various approaches can be used to get a speed-up.
public class PrimesSequential extends PrimesComputation {  
    public Boolean[] computePrimes(int upto) {  
        Boolean[] results = new Boolean[upto];
        // test each number below 'upto' in turn
        for (int x = 0; x < results.length; x++)
            results[x] = isPrime(x);
        return results;  
    }  
}  
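
The isPrime() helper comes from the PrimesComputation base class, which is not shown in the post; as a minimal sketch (an assumed trial-division implementation, not the original code) it might look like this:

public class PrimesComputation {
    // Assumed implementation: plain trial division, standing in for
    // the real isPrime, which the post never shows.
    protected static boolean isPrime(int n) {
        if (n < 2)
            return false;               // 0 and 1 are not prime
        for (int d = 2; (long) d * d <= n; d++)
            if (n % d == 0)
                return false;           // found a divisor
        return true;
    }
}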

Threads

Here we can see the basic Java threads implementation: we create one thread per processor and (roughly) partition the work across the threads. Then we wait for all threads to complete by calling join() on each of them from the main thread.
public Boolean[] computePrimes(int upto) throws InterruptedException {  
    int processorCount = Runtime.getRuntime().availableProcessors();  
    PrimesComputationThread[] threads = new PrimesComputationThread[processorCount];  
    Boolean[] results = new Boolean[upto];  
    for (int i = 0; i < processorCount; i++) {  
        threads[i] = new PrimesComputationThread(i * upto / processorCount,  
                (i + 1) * upto / processorCount, results);  
        threads[i].start();  
    }  
    for (int i = 0; i < processorCount; i++) {  
        threads[i].join();  
    }  
    return results;  
}  
static class PrimesComputationThread extends Thread {
    private final int from, to;
    private final Boolean[] results;
    PrimesComputationThread(int from, int to, Boolean[] results) {
        this.from = from; this.to = to; this.results = results;
    }
    public void run() {
        // each thread fills its own disjoint [from, to) slice of results
        for (int x = from; x < to; x++)
            results[x] = isPrime(x);
    }
}

ThreadPool

Here you can see a similar approach to our threads version, but this time using a thread pool. Thread pools give us better control over a collection of threads, allowing us to reuse threads over time and amortize thread creation overheads.
public Boolean[] computePrimes(int upto) throws InterruptedException {  
    int processorCount = Runtime.getRuntime().availableProcessors();  
    ExecutorService threadPool = Executors.newFixedThreadPool(processorCount);  
    int noTasks = 100;  // many more tasks than threads, for better load balancing
    Boolean[] results = new Boolean[upto];  
    for (int i = 0; i < noTasks; i++) {  
        threadPool.execute(new PrimesComputationTask(i * results.length  
                / noTasks, (i + 1) * results.length / noTasks, results));  
    }  
    threadPool.shutdown();  
    threadPool.awaitTermination(100, TimeUnit.SECONDS);  
    return results;  
}  
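
PrimesComputationTask is not shown in the post; presumably it is a plain Runnable over a half-open range, along the lines of this sketch (the field and constructor names are assumptions):

static class PrimesComputationTask implements Runnable {
    // Assumed shape of the task submitted to the pool above: each task
    // fills in its own [from, to) slice of the shared results array.
    private final int from, to;
    private final Boolean[] results;
    PrimesComputationTask(int from, int to, Boolean[] results) {
        this.from = from;
        this.to = to;
        this.results = results;
    }
    public void run() {
        for (int x = from; x < to; x++)
            results[x] = isPrime(x);
    }
}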

ForkJoin

One problem with our partitioning scheme is that we do not divide the work up evenly: testing some numbers takes longer than others and, worse, we cannot easily predict how hard a given number will be. There is, however, a well-known solution, a scheduling technique called work stealing: idle threads steal work from busy threads, helping to spread the load. Java 7's fork/join framework implements work stealing under the hood.
Another advantage of the fork/join pattern is that it maps well to certain types of real-world divide-and-conquer algorithms.
public class PrimesForkJoin extends PrimesComputation {
    public Boolean[] computePrimes(int upto) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        Boolean[] results = new Boolean[upto];
        pool.invoke(new PrimesTask(0, results.length, results));
        return results;
    }
    static class PrimesTask extends RecursiveAction {
        private final int from, to;
        private final Boolean[] results;
        PrimesTask(int from, int to, Boolean[] results) {
            this.from = from; this.to = to; this.results = results;
        }
        public void compute() {
            if (to - from < 1000) {
                // small enough: compute directly
                for (int x = from; x < to; x++)
                    results[x] = isPrime(x);
            } else {
                // split in half; fork the left, compute the right here
                PrimesTask left = new PrimesTask(from, (to + from) / 2, results);
                PrimesTask right = new PrimesTask((to + from) / 2, to, results);
                left.fork();
                right.compute();
                left.join();
            }
        }
    }
}
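
A small design note on the compute() method above: forking the left half and computing the right half directly on the current thread (rather than forking both halves) keeps the current worker busy and halves the number of tasks queued up for stealing.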

ParallelArray

Java 8 is expected to add support for parallel arrays; you can get a glimpse of what this might look like today by using the jsr166y library jars. The idea is to eventually allow simple loop parallelism like C#'s Parallel.For.

The code is functional in nature: replaceWithMappedIndex() replaces each element in the array with the result of applying the given mapping. In this case our mapping is simply the primality status of the number.

public Boolean[] computePrimes(int upto) {
    // assumes a static import of ParallelArray.defaultExecutor()
    ParallelArray<Boolean> p = ParallelArray.create(upto, Boolean.class,
        defaultExecutor());
    p.replaceWithMappedIndex(new IntToObject<Boolean>() {  
        @Override  
        public Boolean op(int x) {  
            return isPrime(x);  
        }  
    });              
    return p.getArray();  
}  

 Sequential version  
 #   primes: 476648  
 Time: 15916ms  

 Threads version  
 #   primes: 476648  
 Time: 4818ms  
 Speed-up: 3.30  

 Pool version  
 #   primes: 476648  
 Time: 3450ms  
 Speed-up: 4.61  

 Fork-join version  
 #   primes: 476648  
 Time: 3380ms  
 Speed-up: 4.71  

 Parallel version  
 #   primes: 476648  
 Time: 4888ms  
 Speed-up: 3.26  

Here you can see the relative speed-up of the different approaches, and how there is a trade-off between implementation complexity and speed-up. In this case the sweet spot seems to be the Pool version, which is not too complicated but still gives good performance.

Parallel arrays should ideally provide the simplest implementation once there is proper language support, but it seems likely they will not quite match a hand-crafted approach.

Correctly working with shared mutable state

Next we will look at the basic sorts of issues that arise in multithreaded applications with shared mutable state.

We are going to collect the primes as they are found into a livePrimes array, using a noLivePrimes integer field as the count of (and next free slot for) live primes.

public class PrimesLive extends PrimesComputation implements
        LiveResults<Integer[]> {
    Integer[] livePrimes;
    Integer noLivePrimes;
    public Boolean[] computePrimes(int upto) throws InterruptedException {
        // Compute in parallel ...
    }
    class PrimesComputationTask implements Runnable {
        ...
        public void run() {
            for (int x = from; x < to; x++) {
                a[x] = isPrime(x);
                if (a[x])
                    // BROKEN: unsynchronized update of shared state
                    livePrimes[noLivePrimes++] = x;
            }
        }
    }
}

There is a problem with this code: the new functionality has introduced shared mutable state, and with it a data race.
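
To see why, note that noLivePrimes++ is not a single operation but three, and two threads can interleave those steps:

// what livePrimes[noLivePrimes++] = x really does to the counter:
int tmp = noLivePrimes;    // threads A and B can both read, say, 5
tmp = tmp + 1;             // both compute 6
noLivePrimes = tmp;        // both write 6: an increment is lost and
                           // both threads write into the same slot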

A first attempt with synchronization...

The standard solution is to use locks to serialise access to the shared state; in Java we normally do this with the synchronized keyword.

         public void run() {
             for (int x = from; x < to; x++) {
                 a[x] = isPrime(x);
                 if (a[x]) {
                     int tmp;
                     synchronized (livePrimes) {
                         // claim a unique slot while holding the lock
                         tmp = noLivePrimes;
                         noLivePrimes = tmp + 1;
                     }
                     // no other thread can have claimed this slot
                     livePrimes[tmp] = x;
                 }
             }
         }

Contended locks, however, hurt performance: we lose parallelism as threads have to wait for access to the shared state.

 Live version  
 #   primes: 476648  
 # live primes: 471033  
 Time: 4147ms  
 Speed-up: 3.84  

 Live version with synchronization  
 #   primes: 476648  
 # live primes: 476648  
 Time: 3879ms  
 Speed-up: 4.10  

Here we can see two things: the unsynchronized version loses updates (it counts only 471033 of the 476648 live primes), and the synchronized version, while correct, gives up some of the speed-up achieved by the lock-free Pool version (4.10 versus 4.61).

Atomics to the rescue...

AtomicInteger noLivePrimes;  
class PrimesComputationTask implements Runnable {  
    ...  
    public void run() {  
        for (int x = from; x < to; x++) {  
            a[x] = isPrime(x);  
            if (a[x]) {  
                // atomically claim the next free slot, no lock needed
                int tmp = noLivePrimes.getAndIncrement();
                livePrimes[tmp] = x;  
            }  
        }  
    }  
}

There is a very simple solution in this case: since the shared state is a single counter, we can use an AtomicInteger to guard it.

Atomics are actually a form of lock-free programming built on special compare-and-swap (CAS) instructions supported by the hardware. This works similarly to optimistic locking or STM: the processor proceeds on the assumption of no conflict and retries if a conflict occurs.
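
Conceptually, getAndIncrement() behaves like a retry loop around compareAndSet() (a simplified sketch of the idea, not the actual JDK source):

// inside AtomicInteger, conceptually:
public final int getAndIncrement() {
    for (;;) {
        int current = get();                 // optimistic read
        int next = current + 1;
        if (compareAndSet(current, next))    // CAS succeeds only if no other
            return current;                  // thread changed the value;
    }                                        // otherwise loop and retry
}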

 Live version with atomic
 #   primes: 476648  
 # live primes: 476648  
 Time: 3686ms  
 Speed-up: 4.32  

Here you can see we got our performance back.

You can find the source code here.

Based on the I2PC Summer School on Multicore Programming held at the University of Illinois at Urbana-Champaign.
