并发和线程任务编排

Future - 好莱坞原则的应用

Future

FutureTask

并行化

刚开始接触并发编程,容易写出下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
long startTime = System.currentTimeMillis();
ExecutorService threadPool = Executors.newCachedThreadPool();
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Future<?> future = threadPool.submit(() -> {
try {
// 假设这里是一个一直阻塞着的调用
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
futures.add(future);
}
futures.forEach(future -> {
try {
future.get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println((System.currentTimeMillis() - startTime));

此时开发很有可能会误以为结果是 1s 左右,毕竟 3 个FutureTask同时执行、每个只等待 1s,结果确实应该是 1s,但是这里并没有实现并行化,因为future.get是轮询调用的,第一个执行完毕后,第二个仍然需要等待 1s,因此结果是 3s。
虽然每个 FutureTask 是同时开始执行的,但是future.get并不是同时开始等待的,如果想要达到并行执行的效果,一定是在上一个执行完毕的时候,下一个就已经执行完毕了,此时就可以直接获取结果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
long startTime = System.currentTimeMillis();
ExecutorService threadPool = Executors.newCachedThreadPool();
ExecutorService innerThreadPool = Executors.newCachedThreadPool();
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Future<?> future = threadPool.submit(() -> {
// 假设这里是一个一直阻塞着的调用
Future<?> innerFuture = innerThreadPool.submit(() -> {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
innerFuture.get(800, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
futures.add(future);
}
futures.forEach(future -> {
try {
future.get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println((System.currentTimeMillis() - startTime));

阻塞还是轮询

阻塞虽然看起来很廉价,似乎线程阻塞后就不占用资源了,实际上很多阻塞都是通过轮询标志位实现的,比如 FutureTask 内部实现了这样的一种状态机:
FutureTask状态机
每次状态的变更都是通过CASUNSAFE)实现线程安全的,比如set方法:

1
2
3
4
5
6
7
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

Guava - ListenableFuture

ListenableFuture 是在 JDK1.8 之前出现的,现在 CompletableFuture 一般是更好的选择。

CompletionService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
long start = System.currentTimeMillis();
CompletionService<String> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(3));
cs.submit(() -> {
Thread.sleep(1000);
return "x";
});
cs.submit(() -> {
Thread.sleep(1000);
return "x";
});
for (int i = 0; i < 2; i++) {
String s = cs.take().get();
System.out.println(s);
}
System.out.println(System.currentTimeMillis() - start);
  • 将线程任务提交到线程池执行,并将 Future 提交到阻塞队列供客户端获取,阻塞队列默认是LinkedBlockingQueue
  • CompletionService 内部只有在任务完成的时候才会把 Future 丢到队列中,因此如果任务特别耗时,会导致take调用一直不返回,注意下面源码中的done方法:
    1
    2
    3
    4
    5
    6
    7
    8
    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
    super(task, null);
    this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
    }

CompletableFuture

CompletableFuture是对Future的加强,
CompletableFuture实现了 Future 接口,这意味着它本身也提供了通过阻塞或轮询获取结果的方式,相对 Future 来说,它的相对优势在于任务的编排,相对 CompletionService 来说,CompletableFuture又有接口更灵活的优势。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
long start = System.currentTimeMillis();
List<String> items = Lists.newArrayList("a", "b");
List<CompletableFuture<String>> futures = items.stream().map(
item -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return item + "x";
}))
.collect(Collectors.toList());
List<String> strs = futures.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
strs.forEach(System.out::println);
System.out.println(System.currentTimeMillis() - start);

注意上面的CompletableFuture.supplyAsyncCompletableFuture.join

  1. 提交异步执行任务 CompletableFuture.supplyAsync,有2个方法,其中有个需要用户指定Executor,如果没有指定则使用JDK内置的线程池ForkJoinPool.commonPool()
    1. 提交任务到CompletableFuture和直接提交到ExecutorService的区别是线程任务提交前会用AsyncSupply封装
  2. 线程池执行任务
    1. 任务经过AsyncSupply封装,会调用重写的AsyncSupply.exec()
    2. 执行完毕后,回调CompletableFuture.internalComplete
  3. 任务结果聚合CompletableFuture::join
    1. 如果结果已经计算出来,则直接取result
    2. 否则CompletableFuture.waitingGet
      创建等待队列q = new WaitNode(interruptible, 0L, 0L);
      排队queued = UNSAFE.compareAndSwapObject(this, WAITERS, q.next = waiters, q);
      ForkJoinPool.managedBlock(q);阻塞,会调WaitNode.block()来判断是否还阻塞着
    3. 回头看CompletableFuture.internalComplete里的代码,会调LockSupport.unpark(t);唤醒等待的线程

参考

  1. [译]20 个使用 Java CompletableFuture 的例子
  2. Java CompletableFuture 详解