One of the primary uses of Lambda expressions is to implement aggregate operations that can often be parallelized across today's multi-core processors. Computation over bulk data structures, where part of the computation is defined by the caller, is an area where Lambda expressions provide much-needed support. Doug Lea's JSR166y work, developed through the Concurrency Interest group, brings exactly this kind of rich set of Collections, supporting parallel operations for bulk computation over data, to the Java platform.

Let us look at an example of aggregate operations using a ParallelArray, which maintains a ForkJoinPool and an array in order to provide parallel aggregate operations. The fork/join framework from the jsr166y package is already included in Java 7 (as java.util.concurrent.ForkJoinPool). The extra166y package, which contains the Collections supporting parallel operations, is not part of the JDK yet, but you should be able to use it today as an add-on library.

import extra166y.Ops;
import extra166y.ParallelArray;
import extra166y.ParallelDoubleArray;

public class AggregateOpsExample {

    ParallelArray<Student> students;

    public AggregateOpsExample() {
        jsr166y.ForkJoinPool fjPool = new jsr166y.ForkJoinPool();
        // Create a "fictional" Student array
        Student s1 = new Student("S1", 2010, 85, 5);
        Student s2 = new Student("S2", 2010, 91, 4);
        Student s3 = new Student("S3", 2010, 95, 3);
        Student[] data = new Student[]{s1, s2, s3};
        // Reuse the array built above instead of allocating a second, identical one
        students = ParallelArray.createFromCopy(data, fjPool);
    }

    public Student getStudent(int index) {
        return students.get(index);
    }

    public double getMaxSeniorGpa() {
        return students.withFilter(isSenior)
                       .withMapping(gpaField)
                       .max();
    }

    public ParallelDoubleArray.SummaryStatistics getSummaryStatistics() {
        return students.withFilter(isSenior)
                       .withMapping(gpaField)
                       .summary();
    }

    // helpers:
    static final class IsSenior implements Ops.Predicate<Student> {
        public boolean op(Student s) {
            return s.credits > 90;
        }
    }

    static final IsSenior isSenior = new IsSenior();

    static final class GpaField implements Ops.ObjectToDouble<Student> {
        public double op(Student s) {
            return s.gpa;
        }
    }

    static final GpaField gpaField = new GpaField();

    public static void main(String[] args) {
        AggregateOpsExample test = new AggregateOpsExample();
        double bestGpa = test.getMaxSeniorGpa();
        System.out.println("Best GPA : " + bestGpa);
        ParallelDoubleArray.SummaryStatistics summary = test.getSummaryStatistics();
        System.out.println("Senior with lowest GPA: " + test.getStudent(summary.indexOfMin()).name);
        System.out.println("Average GPA in class of 2010 : " + summary.average());
    }

    public class Student {
        String name;
        int graduationYear;
        int credits;
        double gpa;

        public Student(String name, int graduationYear, int credits, double gpa) {
            this.name = name;
            this.graduationYear = graduationYear;
            this.credits = credits;
            this.gpa = gpa;
        }
    }
}

The constructor creates a ForkJoinPool whose parallelism defaults to Runtime.getRuntime().availableProcessors(), 2 in my case. The getMaxSeniorGpa() and getSummaryStatistics() methods perform aggregate operations on the Student array. These operations can take advantage of multi-core processors, unlike the traditional sequential approach to filtering and mapping a collection. By introducing Lambda expressions in the client code, you can remove much of the boilerplate in the helpers section (the IsSenior and GpaField classes and their singleton instances).
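To make the idea of a parallel aggregate operation more concrete, here is a minimal sketch of a hand-written fork/join task that finds the highest GPA among seniors. It uses only java.util.concurrent from JDK 7 rather than extra166y, and it is not how ParallelArray implements max(); the class name MaxSeniorGpaTask, the parallel credits/gpas arrays and the tiny THRESHOLD are assumptions made purely for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical sketch of divide-and-conquer filter + map + max.
// Not the extra166y implementation.
class MaxSeniorGpaTask extends RecursiveTask<Double> {
    // Assumed cutoff: tiny so that even the 3-element demo array is split.
    // Real code would use a much larger value.
    static final int THRESHOLD = 2;

    private final int[] credits;
    private final double[] gpas;
    private final int lo;
    private final int hi;

    MaxSeniorGpaTask(int[] credits, double[] gpas, int lo, int hi) {
        this.credits = credits;
        this.gpas = gpas;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Double compute() {
        if (hi - lo <= THRESHOLD) {
            // Small range: filter (credits > 90) and take the max sequentially
            double max = Double.NEGATIVE_INFINITY;
            for (int i = lo; i < hi; i++) {
                if (credits[i] > 90 && gpas[i] > max) {
                    max = gpas[i];
                }
            }
            return max;
        }
        // Large range: split in two, run the left half asynchronously
        int mid = (lo + hi) >>> 1;
        MaxSeniorGpaTask left = new MaxSeniorGpaTask(credits, gpas, lo, mid);
        MaxSeniorGpaTask right = new MaxSeniorGpaTask(credits, gpas, mid, hi);
        left.fork();
        double rightMax = right.compute();
        double leftMax = left.join();
        return Math.max(leftMax, rightMax);
    }

    // Returns Double.NEGATIVE_INFINITY when no student is a senior.
    static double maxSeniorGpa(int[] credits, double[] gpas) {
        ForkJoinPool pool = new ForkJoinPool(); // parallelism = available processors
        try {
            return pool.invoke(new MaxSeniorGpaTask(credits, gpas, 0, credits.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] credits = {85, 91, 95};
        double[] gpas = {5, 4, 3};
        System.out.println("Best GPA : " + maxSeniorGpa(credits, gpas));
    }
}
```

The filter and the mapping are hard-coded inside compute() here. A library such as ParallelArray lets the caller supply them instead, which is exactly the caller-defined part of the computation that Lambda expressions make cheaper to write.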

Let us build upon AggregateOpsExample to time these aggregate operations and erase some of the boilerplate using Lambda expressions.

import extra166y.Ops;
import extra166y.ParallelArray;
import extra166y.ParallelDoubleArray;

public class TimedAggregateOps {

	ParallelArray<Student> students;

	public TimedAggregateOps() {
		jsr166y.ForkJoinPool fjPool = new jsr166y.ForkJoinPool();
		// Create a "fictional" Student array
		Student s1 = new Student("S1", 2010, 85, 5);
		Student s2 = new Student("S2", 2010, 91, 4);
		Student s3 = new Student("S3", 2010, 95, 3);
		Student[] data = new Student[]{s1, s2, s3};
		// Reuse the array built above instead of allocating a second, identical one
		students = ParallelArray.createFromCopy(data, fjPool);
	}

	interface Block {
		public void execute();
	}

	static void time(boolean maxGpa, Block block) throws Exception {
		long startTime = System.nanoTime();
		boolean success = true;
		try {
			block.execute();
		} catch (final Throwable ex) {
			success = false;
			throw ex;
		} finally {
			recordTiming(maxGpa, System.nanoTime() - startTime, success);
		}
	}

	static void recordTiming(boolean maxGpa, long nanoseconds,  boolean succeeded) {
		if (maxGpa) {
			System.out.println("maxGpa took " + nanoseconds + "ns and operation " + (succeeded ? "succeeded." : "failed."));
		} else {
			System.out.println("avgGpa took " + nanoseconds + "ns and operation " + (succeeded ? "succeeded." : "failed."));
		}
	}

	static final Ops.Predicate<Student> isSenior = #{s -> s.credits > 90};
	static final Ops.ObjectToDouble<Student> gpaField = #{s -> s.gpa};

	public void findMaxSeniorGpa() {
		double maxGpa = students.withFilter(isSenior)
				.withMapping(gpaField)
				.max();
		System.out.println("Maximum GPA : " + maxGpa);
	}

	public void findAverageGpa() {
		ParallelDoubleArray.SummaryStatistics summary = students.withFilter(isSenior)
				.withMapping(gpaField)
				.summary();
		double avgGpa = summary.average();
		System.out.println("Average GPA : " + avgGpa);
	}

	void test(boolean maxGpa) throws Exception {
		try {
			time(maxGpa, #{
				if (maxGpa)
					findMaxSeniorGpa();
				else
					findAverageGpa();
			});
		} catch (final Throwable t) {
			throw t;
		}
	}

	public static void main(String[] args) throws Exception {
		TimedAggregateOps tao = new TimedAggregateOps();
		tao.test(true);
		tao.test(false);
	}
}

With Lambda expressions, the client code that consumes this parallel Collections library becomes more concise and sheds much of the boilerplate. The IsSenior and GpaField helper classes and their instances from the earlier example are reduced to the two-line isSenior and gpaField declarations in this example. When these libraries themselves take advantage of Lambda expressions, their implementations should become more powerful and more efficient. For example, many of today's verbose APIs, such as the map, reduce and filter operations defined in the ParallelArray class, would benefit from accepting blocks of code as Lambda expressions. I recommend watching Neal Gafter's Closures Cookbook presentation from JavaOne 2008.

You can run this example using the prototype compiler that supports Lambda expressions. The batch scripts from my original post should be updated to include the JSR166y libraries in the classpath; you can download them from the JSR-166 maintenance website.
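The #{...} syntax above is specific to the prototype compiler. For comparison, here is a rough sketch of the same filter, map and aggregate pipeline written with the lambda syntax and the java.util.stream API that shipped in Java 8. It swaps ParallelArray for parallel streams, so it illustrates the idea rather than the extra166y API; the class name StreamAggregateSketch and the timeNanos helper are hypothetical.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;

// Hypothetical sketch: JDK 8 streams standing in for extra166y.ParallelArray.
class StreamAggregateSketch {

    static final class Student {
        final String name;
        final int graduationYear;
        final int credits;
        final double gpa;

        Student(String name, int graduationYear, int credits, double gpa) {
            this.name = name;
            this.graduationYear = graduationYear;
            this.credits = credits;
            this.gpa = gpa;
        }
    }

    // The two helper classes of the first example shrink to two lambdas
    static final Predicate<Student> IS_SENIOR = s -> s.credits > 90;
    static final ToDoubleFunction<Student> GPA_FIELD = s -> s.gpa;

    // Returns NaN when no student passes the filter
    static double maxSeniorGpa(List<Student> students) {
        return students.parallelStream()
                .filter(IS_SENIOR)
                .mapToDouble(GPA_FIELD)
                .max()
                .orElse(Double.NaN);
    }

    static DoubleSummaryStatistics seniorGpaSummary(List<Student> students) {
        return students.parallelStream()
                .filter(IS_SENIOR)
                .mapToDouble(GPA_FIELD)
                .summaryStatistics();
    }

    // Runs the block and returns the elapsed wall-clock time in nanoseconds
    static long timeNanos(Runnable block) {
        long start = System.nanoTime();
        block.run();
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("S1", 2010, 85, 5),
                new Student("S2", 2010, 91, 4),
                new Student("S3", 2010, 95, 3));

        long nanos = timeNanos(() ->
                System.out.println("Maximum GPA : " + maxSeniorGpa(students)));
        System.out.println("maxGpa took " + nanos + "ns");
        System.out.println("Average GPA : " + seniorGpaSummary(students).getAverage());
    }
}
```

On three elements the parallel version will not be faster than a sequential loop; the timings only become meaningful on large inputs.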

javac.bat

@echo off
C:\jdk1.7.0\bin\java.exe -cp classes.jar;jsr166y.jar;extra166y.jar com.sun.tools.javac.Main %1

java.bat

@echo off
C:\jdk1.7.0\bin\java.exe -cp classes.jar;.;jsr166y.jar;extra166y.jar %1
