Java Fork / Join Task – Did You Write It Right?

When we need to perform a lot of small tasks, the experienced Java developers will use the thread pool to efficiently perform these small tasks. However, there is a task, for example, to sort more than 10 million elements of the array, the task itself can be concurrent implementation.

But how to dismantle into a small task in the task of the process of the dynamic split? In general, large tasks can be split into small tasks, small tasks can also continue to split into smaller tasks, and finally the results of the summary of the merger, to get the final result, this model is Java Fork / Join model.

Java7 introduced the Fork / Join framework, we RecursiveTask this class can be easily achieved Fork / Join mode.

Java Fork - Join Task - Did You Write It Right

For example, RecursiveTask for a large array of parallel summation can be written as follows:

class SumTask extends RecursiveTask<Long> {

    static final int THRESHOLD = 100;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
    this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(String.format("compute %d~%d = %d", start, end, sum));
            return sum;
        }
        int middle = (end + start) / 2;
        System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
        SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;
        System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
        return result;
    }
}

The key to writing this Java Fork / Join task is to determine whether the task is small enough to perform the task’s compute () method, and if it is small enough to calculate and return the result directly (note the simulated 1-second delay) To split their own tasks into two, respectively, the calculation of two sub-tasks, and then return to the results of the two sub-tasks and.

Finally, write a main () method to test:

public static void main(String[] args) throws Exception {
    //Create an array of random numbers:
    long[] array = new long[400];
    fillRandom(array);
    // fork/join task:
    ForkJoinPool fjp = new ForkJoinPool(4); // Maximum number of concurrency is 4
    ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
    long startTime = System.currentTimeMillis();
    Long result = fjp.invoke(task);
    long endTime = System.currentTimeMillis();
    System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}

The key code is fjp.invoke (task) to commit a Java Fork / Join task concurrently, and then get the result of the asynchronous execution.

We set the task of the minimum threshold is 100, when submitting a 400-size task, the implementation of the 4-core CPU, will be divided into two, and then divided into four, the minimum sub-task execution time is 1 second, Is the implementation of four sub-tasks, the final task of the final execution time is about 1 second.

Novice in the preparation of Java Fork / Join task, often with a search engine to find an example, and then follow the example to write the following code:

protected Long compute() {
    if (Dose the task small enough?) {
        return computeDirect();
    }
    // The task is too big, divided into two:
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // Call fork ():
    subtask1.fork();
    subtask2.fork();
    // Consolidation results:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}

Unfortunately, this is wrong way! This does not correctly understand the task execution logic of the Fork / Join model.

The size of the worker thread pool used by the JDK to execute the Fork / Join task is equal to the number of CPU cores. On a 4-core CPU, you can execute up to four subtasks at the same time. Summarize the array of 400 elements, the execution time should be 1 second. However, replaced by the above code, the execution time is two seconds.

This is because the thread that executes the compute () method itself is also a Worker thread. When the fork () is called on two subtasks, the Worker thread assigns the task to the other two workers, but it stops waiting for it Lives! This wastes a Worker thread in the Fork / Join thread pool, resulting in at least seven threads in order to execute concurrently with four threads.

For example, suppose a hotel has 400 rooms, a total of four cleaners, each worker can clean 100 rooms a day, so that four workers full load work, 400 rooms all clean up just need 1 day.

Java Fork / Join mode of work like this: First, the workers were assigned a 400 room task, he looked at the task too much of a person can not, so the first 400 rooms split into two 200, and then called B, put one of the 200 points to B.

// Call fork ():
subtask1.fork();
subtask2.fork();

Such as a 400 divided into two 200, this is equivalent to a written to a 200 to B, to another 200 points to C, and then, a became a supervisor, do not work, so B and Bing dry his direct Reporting. B and C in the process of splitting 200 into two 100, they have become supervisors, so that only four workers would have been living, and now need seven workers to complete within 1 day, of which three are not Working.

In fact, we look at the JDK invokeAll () method of the source code can be found, invokeAll N tasks, which N-1 task will use fork () to other threads to perform, but it will leave a task to perform their own, So that the full use of the thread pool, to ensure that there is no idle do not work thread.