Future - 好莱坞原则的应用
Future
FutureTask
并行化
刚开始接触并发编程,容易写出下面的代码:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 
 | long startTime = System.currentTimeMillis();ExecutorService threadPool = Executors.newCachedThreadPool();
 List<Future> futures = new ArrayList<>();
 for (int i = 0; i < 3; i++) {
 Future<?> future = threadPool.submit(() -> {
 try {
 // 假设这里是一个一直阻塞着的调用
 Thread.sleep(1000000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 });
 futures.add(future);
 }
 futures.forEach(future -> {
 try {
 future.get(1000, TimeUnit.MILLISECONDS);
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 System.out.println((System.currentTimeMillis() - startTime));
 
 | 
此时开发很有可能会误以为结果是 1s 左右,毕竟 3 个FutureTask同时执行、每个只等待 1s,结果确实应该是 1s,但是这里并没有实现并行化,因为future.get是轮询调用的,第一个执行完毕后,第二个仍然需要等待 1s,因此结果是 3s。
虽然每个 FutureTask 是同时开始执行的,但是future.get并不是同时开始等待的,如果想要达到并行执行的效果,一定是在上一个执行完毕的时候,下一个就已经执行完毕了,此时就可以直接获取结果了。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 
 | long startTime = System.currentTimeMillis();ExecutorService threadPool = Executors.newCachedThreadPool();
 ExecutorService innerThreadPool = Executors.newCachedThreadPool();
 List<Future> futures = new ArrayList<>();
 for (int i = 0; i < 3; i++) {
 Future<?> future = threadPool.submit(() -> {
 // 假设这里是一个一直阻塞着的调用
 Future<?> innerFuture = innerThreadPool.submit(() -> {
 try {
 Thread.sleep(1000000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 });
 try {
 innerFuture.get(800, TimeUnit.MILLISECONDS);
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 futures.add(future);
 }
 futures.forEach(future -> {
 try {
 future.get(1000, TimeUnit.MILLISECONDS);
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 System.out.println((System.currentTimeMillis() - startTime));
 
 | 
阻塞还是轮询
阻塞虽然看起来很廉价,似乎线程阻塞后就不占用资源了,实际上很多阻塞都是通过轮询标志位实现的,比如 FutureTask 内部实现了这样的一种状态机:

每次状态的变更都是通过CAS(UNSAFE)实现线程安全的,比如set方法:
| 12
 3
 4
 5
 6
 7
 
 | protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 outcome = v;
 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
 finishCompletion();
 }
 }
 
 | 
Guava - ListenableFuture
ListenableFuture 是在 JDK1.8 之前出现的,现在 CompletableFuture 一般是更好的选择。
CompletionService
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | long start = System.currentTimeMillis();CompletionService<String> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(3));
 cs.submit(() -> {
 Thread.sleep(1000);
 return "x";
 });
 cs.submit(() -> {
 Thread.sleep(1000);
 return "x";
 });
 for (int i = 0; i < 2; i++) {
 String s = cs.take().get();
 System.out.println(s);
 }
 System.out.println(System.currentTimeMillis() - start);
 
 | 
- 将线程任务提交到线程池执行,并将 Future 提交到阻塞队列供客户端获取,阻塞队列默认是LinkedBlockingQueue;
- CompletionService 内部只有在任务完成的时候才会把 Future 丢到队列中,因此如果任务特别耗时,会导致take调用一直不返回,注意下面源码中的done方法:| 12
 3
 4
 5
 6
 7
 8
 
 | private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {
 super(task, null);
 this.task = task;
 }
 protected void done() { completionQueue.add(task); }
 private final Future<V> task;
 }
 
 |  
 
CompletableFuture
CompletableFuture是对Future的加强,
CompletableFuture实现了 Future 接口,这意味着它本身也提供了通过阻塞或轮询获取结果的方式,相对 Future 来说,它的相对优势在于任务的编排,相对 CompletionService 来说,CompletableFuture又有接口更灵活的优势。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | long start = System.currentTimeMillis();List<String> items = Lists.newArrayList("a", "b");
 List<CompletableFuture<String>> futures = items.stream().map(
 item -> CompletableFuture.supplyAsync(() -> {
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 return item + "x";
 }))
 .collect(Collectors.toList());
 List<String> strs = futures.stream().map(CompletableFuture::join)
 .collect(Collectors.toList());
 strs.forEach(System.out::println);
 System.out.println(System.currentTimeMillis() - start);
 
 | 
注意上面的CompletableFuture.supplyAsync和CompletableFuture.join
- 提交异步执行任务 CompletableFuture.supplyAsync,有2个方法,其中有个需要用户指定Executor,如果没有指定则使用JDK内置的线程池ForkJoinPool.commonPool()
- 提交任务到CompletableFuture和直接提交到ExecutorService的区别是线程任务提交前会用AsyncSupply封装
 
- 线程池执行任务
- 任务经过AsyncSupply封装,会调用重写的AsyncSupply.exec()
- 执行完毕后,回调CompletableFuture.internalComplete
 
- 任务结果聚合CompletableFuture::join
- 如果结果已经计算出来,则直接取result
- 否则CompletableFuture.waitingGet
 创建等待队列q = new WaitNode(interruptible, 0L, 0L);
 排队queued = UNSAFE.compareAndSwapObject(this, WAITERS, q.next = waiters, q);
 调ForkJoinPool.managedBlock(q);阻塞,会调WaitNode.block()来判断是否还阻塞着
- 回头看CompletableFuture.internalComplete里的代码,会调LockSupport.unpark(t);唤醒等待的线程
 
参考
- [译]20 个使用 Java CompletableFuture 的例子
- Java CompletableFuture 详解