立的小任务,然后两两求和合并。
其实显然易见负责整个fork/join初始化工作的就是ForkJoinPool!初始化代码就是那一行 ForkJoinPool forkJoinPool = new ForkJoinPool(),点进去查看源码。
ForkJoinPool forkJoinPool = new ForkJoinPool();
//最终调用到这段代码
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism), //并行度,当前机器的cpu核数
checkFactory(factory), //工作线程创建工厂
handler, //异常处理handler
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //任务队列出队模式 异步:先进先出,同步:后进先出
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
看完初始化的代码我们可以知道原来创建ForkJoinPool创建workerThread的工作都是统一由一个叫ForkJoinWorkerThreadFactory的工厂去创建,创建出来的线程都有一个统一的前辍名称"ForkJoinPool-" + nextPoolId() + "-worker-".队列出队模式是LIFO(后进先出),那这样后面的入队的任务是会被先处理的。所以上面提到对代码做了一些修改就是先处理rightTask,再处理leftTask。这其实是对代码的一种优化!
//执行子任务
leftTask.fork();
rightTask.fork();
Integer rightResult = rightTask.join();
Integer leftResult = leftTask.join();
4、任务的提交逻辑?
fork/join其实大部分逻辑处理操作都集中在提交任务和处理任务这两块,了解任务的提交基本上后面就很容易理解了。
fork/join提交任务主要分为两种:
第一种:第一次提交到forkJoinPool
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);
第二种:任务切分之后的提交
leftTask.fork();
rightTask.fork();
提交到forkJoinPool :
代码调用路径 submit(ForkJoinTask<T> task) -> externalPush(ForkJoinTask<?> task) -> externalSubmit(ForkJoinTask<?> task)
下面贴上externalSubmit的详细代码,着重留意注释的部分。
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) { //采用循环入队的方式
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
else if ((rs & STARTED) == 0 || // initialize 初始化操作
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots //config就是cpu的核数
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //算出workQueues的大小n,n一定是2的次方数
workQueues = new WorkQueue[n]; //初始化队列,然后跳到最外面的循环继续把任务入队~
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
else if ((q = ws[k = r & m & SQMASK]) != null) { //选中了一个一个非空队列
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //利用cas操作加锁成功!
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //计算出任务在队列中的位置
U.putOrderedObject(a, j, task); //把任务放在队列中
U.