Future - 好莱坞原则的应用
Future
FutureTask
并行化
刚开始接触并发编程,容易写出下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| long startTime = System.currentTimeMillis(); ExecutorService threadPool = Executors.newCachedThreadPool(); List<Future> futures = new ArrayList<>(); for (int i = 0; i < 3; i++) { Future<?> future = threadPool.submit(() -> { try { // 假设这里是一个一直阻塞着的调用 Thread.sleep(1000000); } catch (InterruptedException e) { e.printStackTrace(); } }); futures.add(future); } futures.forEach(future -> { try { future.get(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } }); System.out.println((System.currentTimeMillis() - startTime));
|
此时开发很有可能会误以为结果是 1s 左右,毕竟 3 个FutureTask
同时执行、每个只等待 1s,结果确实应该是 1s,但是这里并没有实现并行化,因为future.get
是轮询调用的,第一个执行完毕后,第二个仍然需要等待 1s,因此结果是 3s。
虽然每个 FutureTask 是同时开始执行的,但是future.get
并不是同时开始等待的,如果想要达到并行执行的效果,一定是在上一个执行完毕的时候,下一个就已经执行完毕了,此时就可以直接获取结果了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| long startTime = System.currentTimeMillis(); ExecutorService threadPool = Executors.newCachedThreadPool(); ExecutorService innerThreadPool = Executors.newCachedThreadPool(); List<Future> futures = new ArrayList<>(); for (int i = 0; i < 3; i++) { Future<?> future = threadPool.submit(() -> { // 假设这里是一个一直阻塞着的调用 Future<?> innerFuture = innerThreadPool.submit(() -> { try { Thread.sleep(1000000); } catch (InterruptedException e) { e.printStackTrace(); } }); try { innerFuture.get(800, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } }); futures.add(future); } futures.forEach(future -> { try { future.get(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } }); System.out.println((System.currentTimeMillis() - startTime));
|
阻塞还是轮询
阻塞虽然看起来很廉价,似乎线程阻塞后就不占用资源了,实际上很多阻塞都是通过轮询标志位实现的,比如 FutureTask 内部实现了这样的一种状态机:
每次状态的变更都是通过CAS
(UNSAFE
)实现线程安全的,比如set
方法:
1 2 3 4 5 6 7
| protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
|
Guava - ListenableFuture
ListenableFuture 是在 JDK1.8 之前出现的,现在 CompletableFuture 一般是更好的选择。
CompletionService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| long start = System.currentTimeMillis(); CompletionService<String> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(3)); cs.submit(() -> { Thread.sleep(1000); return "x"; }); cs.submit(() -> { Thread.sleep(1000); return "x"; }); for (int i = 0; i < 2; i++) { String s = cs.take().get(); System.out.println(s); } System.out.println(System.currentTimeMillis() - start);
|
- 将线程任务提交到线程池执行,并将 Future 提交到阻塞队列供客户端获取,阻塞队列默认是
LinkedBlockingQueue
;
- CompletionService 内部只有在任务完成的时候才会把 Future 丢到队列中,因此如果任务特别耗时,会导致
take
调用一直不返回,注意下面源码中的done
方法: 1 2 3 4 5 6 7 8
| private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
|
CompletableFuture
CompletableFuture
是对Future的加强,
CompletableFuture
实现了 Future 接口,这意味着它本身也提供了通过阻塞或轮询获取结果的方式,相对 Future 来说,它的相对优势在于任务的编排,相对 CompletionService 来说,CompletableFuture
又有接口更灵活的优势。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| long start = System.currentTimeMillis(); List<String> items = Lists.newArrayList("a", "b"); List<CompletableFuture<String>> futures = items.stream().map( item -> CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return item + "x"; })) .collect(Collectors.toList()); List<String> strs = futures.stream().map(CompletableFuture::join) .collect(Collectors.toList()); strs.forEach(System.out::println); System.out.println(System.currentTimeMillis() - start);
|
注意上面的CompletableFuture.supplyAsync
和CompletableFuture.join
- 提交异步执行任务
CompletableFuture.supplyAsync
,有2个方法,其中有个需要用户指定Executor,如果没有指定则使用JDK内置的线程池ForkJoinPool.commonPool()
- 提交任务到CompletableFuture和直接提交到ExecutorService的区别是线程任务提交前会用
AsyncSupply
封装
- 线程池执行任务
- 任务经过AsyncSupply封装,会调用重写的
AsyncSupply.exec()
- 执行完毕后,回调
CompletableFuture.internalComplete
- 任务结果聚合
CompletableFuture::join
- 如果结果已经计算出来,则直接取result
- 否则
CompletableFuture.waitingGet
创建等待队列q = new WaitNode(interruptible, 0L, 0L);
排队queued = UNSAFE.compareAndSwapObject(this, WAITERS, q.next = waiters, q);
调ForkJoinPool.managedBlock(q);
阻塞,会调WaitNode.block()
来判断是否还阻塞着
- 回头看
CompletableFuture.internalComplete
里的代码,会调LockSupport.unpark(t);
唤醒等待的线程
参考
- [译]20 个使用 Java CompletableFuture 的例子
- Java CompletableFuture 详解