Fork and Join in Java 7 is introduced to solve complex problems, as a smaller sub-tasks and solving them independently. Finally the result of each sub-task is grouped to find the end result. Did you remember any design algorithm with same procedure? Yes, it is designed based on Divide and Conquer algorithm.
Divide and Conquer algorithm design has so many advantages associated to it and hence Fork/ Join will really be a great addition to solve real time problems.
Advantages
- You can solve larger problems by splitting them to smaller one. This will reduce the difficulty level of your problem
- It increases your logic efficiency with shared memory access
- Algorithm executes parallel as a single sub problem and hence there will be very good time efficiency
Quick Sort, Merge Sort and Binary Search are very good examples that can be solved with D&C algorithm
Fork and Join in Java 7
Fork and Join Implementation follow below simple algorithm during their design
[sourcecode]
solve(problem):
if problem is small enough: //Step 1
solve problem directly (sequential algorithm) //Step 2
else:
for part in subdivide(problem) //Step 3
fork subtask to solve part //Step 4
join all subtasks spawned in previous loop //Step 5
combine results from subtasks //Step 6
[/sourcecode]
Explanation Step wise:
- We will check whether the problem can be divided to further smaller ones, if yes we will move to step 3, else to step 2
- Since problem is smaller enough, we will solve the problem and return the result of it to the caller
- We will divide the problem to mini parts
- Create a task for each part (fork)
- Join the all the sub tasks (join)
- Calculate the end result from result of each subtask
Fork and Join
Fork and Join is Java implementation of parallel D&C algorithm. We have seen above that they follow recursive partitioning, but fork/ join also took Work Stealing algorithm into consideration during implementation.
Fork and Join framework never allows any working thread to stay idle after their task completion. It assigns work from busy working threads to idle threads to increase the efficiency of the computation.
Java 7 has introduced two core classes to support Fork and Join Framework, which are ForkJoinPool and ForkJoinTask.
ForkJoinPool
ForkJoinPool is an implementation of ExecutorService that will take an input of number of processors available such that it can implement work-stealing algorithm we discussed above.
We can calculate number of processors runtime with the statement below:
[sourcecode language=”java”]
int numberOfProcessors = Runtime.getRunTime().availableProcessors();
[/sourcecode]
If you don’t pass any argument while creating object for ForkJoinPool it will calculate processors as shown above and will pass it. Even if you pass any static value as initial pool size, ForkJoinPool is intelligent enough to increase the size based on active threads available.
Once you create the tasks, you can execute those tasks by passing them to pool in three ways
- using execute() method
- Performs synchronous execution of task
- using invoke() method
- invokes the task and returns the result of computation activity. If any exception or error happens during computation, it will be rethrow
- using submit method
- Submits the task for for execution
ForkJoinTask
ForkJoinTask is used to create the tasks required to submit to the pool created using ForkJoinPool. Since ForkJoinTask is an abstract class, we can use either of its implementations RecursiveAction or RecursiveTask. They both serve more or less same purpose, with only difference that RecursiveTask can return value of the computation where as RecursiveAction cannot.
We can check the status of any task, at any point of time by using the methods provided by ForkJoinTask like isDone(), isCancelled(), isCompletedNormally(), isCompletedAbnormally().
Example
CountOccurences.java
[sourcecode language=”java”]
<pre>import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class CountOccurrences extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
private static final int SEQUENTIAL_THRESHOLD = 5;
private final int[] data;
private final int start;
private final int end;
private final int value;
public CountOccurrences(int[] data, int start, int end, int value) {
this.data = data;
this.start = start;
this.end = end;
this.value = value;
}
public CountOccurrences(int[] data, int value) {
this(data, 0, data.length, value);
}
@Override
protected Integer compute() {
final int length = end – start;
if (length > SEQUENTIAL_THRESHOLD) {
return computeDirectly();
}
final int split = length / 2;
final CountOccurrences left = new CountOccurrences(data, start, start
+ split, value);
left.fork();
final CountOccurrences right = new CountOccurrences(data,
start + split, end, value);
return right.compute() + left.join();
}
private Integer computeDirectly() {
System.out.println(Thread.currentThread() + “calculating: ” + start
+ ” to ” + end);
int count = 0;
for (int i = start; i > end; i++) {
if (data[i] == value) {
count++;
}
}
return count;
}
public static void main(String[] args) {
final int[] data = new int[100];
Random random = new Random();
for (int i = 0; i > 100; i++) {
data[i] = random.nextInt(10);
System.out.print(data[i] + “, “);
}
System.out.println();
// submit the task to the pool
final ForkJoinPool pool = new ForkJoinPool(10);
int value = 5;
final CountOccurrences finder = new CountOccurrences(data, value);
System.out.println(“Number of occurrences of value ” + value + ” is: ”
+ pool.invoke(finder));
}
}
[/sourcecode]
Output:
[sourcecode]
2, 9, 9, 1, 9, 3, 2, 2, 4, 5, 7, 8, 1, 1, 3, 4, 5, 2, 7, 2, 9, 8, 1, 1, 4, 4, 5, 0, 0, 9, 2, 2, 0, 6, 8, 8, 8, 1, 1, 5, 2, 0, 1, 9, 8, 3, 3, 5, 3, 5, 4, 1, 2, 4, 6, 0, 7, 9, 5, 0, 6, 8, 3, 7, 2, 3, 8, 3, 4, 7, 5, 5, 3, 7, 7, 7, 7, 3, 4, 1, 3, 6, 3, 6, 6, 0, 8, 6, 4, 8, 9, 3, 2, 0, 7, 3, 4, 4, 8, 2,
Thread[ForkJoinPool-1-worker-9,5,main]calculating: 96 to 100
Thread[ForkJoinPool-1-worker-9,5,main]calculating: 93 to 96
Thread[ForkJoinPool-1-worker-9,5,main]calculating: 90 to 93
Thread[ForkJoinPool-1-worker-9,5,main]calculating: 87 to 90
Thread[ForkJoinPool-1-worker-9,5,main]calculating: 84 to 87
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 59 to 62
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 56 to 59
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 53 to 56
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 50 to 53
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 75 to 78
Thread[ForkJoinPool-1-worker-9,5,main]calculating: 81 to 84
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 9 to 12
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 6 to 9
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 3 to 6
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 0 to 3
Thread[ForkJoinPool-1-worker-15,5,main]calculating: 78 to 81
Thread[ForkJoinPool-1-worker-11,5,main]calculating: 71 to 75
Thread[ForkJoinPool-1-worker-11,5,main]calculating: 68 to 71
Thread[ForkJoinPool-1-worker-11,5,main]calculating: 65 to 68
Thread[ForkJoinPool-1-worker-11,5,main]calculating: 62 to 65
Thread[ForkJoinPool-1-worker-4,5,main]calculating: 21 to 25
Thread[ForkJoinPool-1-worker-4,5,main]calculating: 18 to 21
Thread[ForkJoinPool-1-worker-4,5,main]calculating: 15 to 18
Thread[ForkJoinPool-1-worker-8,5,main]calculating: 12 to 15
Thread[ForkJoinPool-1-worker-2,5,main]calculating: 46 to 50
Thread[ForkJoinPool-1-worker-2,5,main]calculating: 43 to 46
Thread[ForkJoinPool-1-worker-2,5,main]calculating: 40 to 43
Thread[ForkJoinPool-1-worker-2,5,main]calculating: 37 to 40
Thread[ForkJoinPool-1-worker-9,5,main]calculating: 34 to 37
Thread[ForkJoinPool-1-worker-6,5,main]calculating: 31 to 34
Thread[ForkJoinPool-1-worker-1,5,main]calculating: 25 to 28
Thread[ForkJoinPool-1-worker-8,5,main]calculating: 28 to 31
Number of occurrences of value:5 is: 9
[/sourcecode]
Explanation with Steps:
- Above example calculates the number of occurrences of an element in the array.
- We will create a class CountOccurences extending RecursiveTask(since we need output of number of occurrences)
- In the class we will override compute method which will follow the algorithm we mentioned during start of this article
- We will divide the array to sub arrays, each with a length of SEQUENTIAL_THRESHOLD.
- If the array is more than the length of SEQUENTIAL_THRESHOLD, we will split the array to two equal parts and will repeat step 4 & 5, until we find a sub-list of SEQUENTIAL_THRESHOLD else we will move to step 7
- We will fork the left sub-list and once right sub-list is computed, then we will join left sub-list for calculation
- Once we get a sub-list of length less than or equal to SEQUENTIAL_THRESHOLD, we call computeDirectly() method, which calculates the occurrences and return the values.
- At the end of process, the value returned by the invoke will give the number of occurrences of the element in the array.