ForkJoinPool

Fork/Join框架

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。

使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务了。

其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

下面的UML类图显示了ForkJoinPool、ForkJoinTask之间的关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
* @Author: Usher
* @Description:
* ForkJoinPool
*/
public class TestForkJoinPool {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
Instant instant = Instant.now();
ForkJoinTask<Long> task = new ForkAndJoin(0L,10000000L);
Long sum = pool.invoke(task);
Instant end = Instant.now();
System.out.println(sum + " 时间:" + Duration.between(instant,end).toMillis());
}
//JDK1.8 Stream新特性
@Test
public void test(){
Instant instant = Instant.now();
Long sum = LongStream.rangeClosed(0L,10000000L)
.parallel()
.reduce(0L,Long ::sum);
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗费时间:" + Duration.between(instant,end).toMillis());
}
}

class ForkAndJoin extends RecursiveTask<Long> {

private long start;
private long end;

private static final long THURSHOLD = 0L;

public ForkAndJoin(long start, long end){
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long length = end - start;
if (length <= THURSHOLD){
long sum = 0L;
for(long i = start;i <= end;i++){
sum += i;
}
return sum;
}else {
long middle = (start + end) / 2;
ForkAndJoin left = new ForkAndJoin(start, middle);
left.fork();//拆分,压入线程队列
ForkAndJoin right = new ForkAndJoin(middle + 1,end);
right.fork();

return left.join() + right.join();
}
}
}
1
2
3
4
5
6
7
8
9
10
//输出Test
---- IntelliJ IDEA coverage runner ----
sampling ...
include patterns:
main\.java\.com\.usher\.concurrency\.ForkAndJoin\..*
exclude patterns:50000005000000
耗费时间:101

//输出ForkJoinPool
50000005000000 时间:1220