Java-线程池

执行异步任务最简单的方式就是通过线程来执行,因为线程本质上是操作系统的资源,应用如果不加限制地占用——最严重的情况下——将会导致系统的宕机。因此,本地线程任务主要依赖线程池来执行,线程池可以看做一种线程资源池,提供了对线程资源的调度功能。当然,提到线程就不得不提并发安全,这又是一个非常复杂的主题,水平有限,无法一一道清。

为什么使用线程池

  1. 降低资源消耗
    通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
    其他类似的“池”还有:内存池、连接池、实例池,是同一模式在不同场景下的应用。
  2. 提高响应速度
    任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性
    线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能
    线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

JDK - ExecutorService

Executor 是 Java5 引入的线程执行器,主要用于负责线程的创建和调度,程序中不用再显示地调用 new Thread().start()(实现业务逻辑和线程创建、调度间的解耦)。
接口 java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。一个 ExecutorService 实例因此特别像一个线程池。事实上,在 java.util.concurrent 包中的 ExecutorService 的实现就是一个线程池的实现。

ExecutorService

execute、submit - 任务提交

  • execute(Runnable)
    接收一个 java.lang.Runnable 对象作为参数,并且以异步的方式执行它。如下是一个使用 ExecutorService 执行 Runnable 的例子:
    1
    2
    3
    4
    5
    6
    7
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(new Runnable() {
    public void run() {
    System.out.println("Asynchronous task");
    }
    });
    executorService.shutdown();
    使用这种方式没有办法获取执行 Runnable 之后的结果,如果你希望获取运行之后的返回值,就必须使用 接收 Callable 参数的 execute() 方法,后者将会在下文中提到。
  • submit(Runnable)
    会返回一个 Future, Future 对象可以用于判断 Runnable 是否结束执行。如下是一个 ExecutorService 的 submit() 方法的例子:
    1
    2
    3
    4
    5
    6
    7
    Future future = executorService.submit(new Runnable() {
    public void run() {
    System.out.println("Asynchronous task");
    }
    });
    //如果任务结束执行则返回 null
    System.out.println("future.get()=" + future.get());
  • submit(Callable)
    接收一个 Callable 参数,Callable 的返回值可以从方法 submit(Callable) 返回的 Future 对象中获取。
    1
    2
    3
    4
    5
    6
    7
    Future future = executorService.submit(new Callable(){
    public Object call() throws Exception {
    System.out.println("Asynchronous Callable");
    return "Callable Result";
    }
    });
    System.out.println("future.get() = " + future.get());

invokeAll、invokeAny - 调用 Callable 任务

  • invokeAll()
    方法 invokeAll() 会调用所有存在于参数集合中的 Callable 对象,并且返回一个包含 Future 对象的集合,你可以通过这个返回的集合来管理每個 Callable 的执行结果。
    需要注意的是,任务有可能因为异常而导致运行结束,所以它可能并不是真的成功运行了。但是我们没有办法通过 Future 对象来了解到这個差异。
  • invokeAny()
    方法 invokeAny() 接收一个包含 Callable 对象的集合作为参数。调用该方法不会返回 Future 对象,而是返回集合中某一个 Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象。
    如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。

shutdown - 终止

当使用 ExecutorService 完毕之后,我们应该关闭它,这样才能保证线程不会继续保持运行状态,ExecutorService 中仍处于执行状态的用户线程会阻止 Java 虚拟机的关闭。

  • shutdown
    ExecutorService 并不会马上关闭,而是不再接收新的任务,一旦所有的线程结束执行当前任务,ExecutorService 才会真的关闭。
  • shutdownNow
    立刻终止,这个方法不推荐使用,如果线程正在执行重要任务,直接中断掉可能破坏数据一致性。

shutdown 虽然可以保证应用能安全退出,但是不能排除某些情况下线程陷入了死循环或遇到一些特别耗时的任务、导致线程池无法退出的情况,这时最好设置一个超时退出的机制,可以使用awaitTermination来实现。

ThreadPoolExecutor

ThreadPoolExecutor 是 ExecutorService 的实现类,也是最常使用的线程池实现类。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

corePoolSize

线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程,即当前线程总数小于 corePoolSize 的情况下新建的是核心线程,如果超过 corePoolSize,则新建的是非核心线程。
这里需要注意的是:在刚刚创建 ThreadPoolExecutor 的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了 prestartCoreThread/prestartAllCoreThreads 事先启动核心线程。再考虑到 keepAliveTime 和 allowCoreThreadTimeOut 超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是 corePoolSize。

maximumPoolSize

线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于 maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是 largestPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么说是曾经呢?因为线程池创建之后,可以调用 setMaximumPoolSize()改变运行的最大线程的数目。

poolSize

线程池中当前线程的数量,当该值为 0 的时候,意味着没有任何线程,线程池会终止;同一时刻,poolSize 不会超过 maximumPoolSize。

keepAliveTime

如果一个线程处在空闲状态的时间超过了该属性值,就会因为超时而退出。举个例子,如果线程池的核心大小 corePoolSize=5,而当前大小 poolSize =8,那么超出核心大小的线程,会按照 keepAliveTime 的值判断是否会超时退出。如果线程池的核心大小 corePoolSize=5,而当前大小 poolSize =5,那么线程池中所有线程都是核心线程,这个时候线程是否会退出,取决于 allowCoreThreadTimeOut。
在压力很大的情况下,线程池中的所有线程都在处理新提交的任务或者是在排队的任务,这个时候线程池处在忙碌状态。如果压力很小,那么可能很多线程池都处在空闲状态,这个时候为了节省系统资源,回收这些没有用的空闲线程,就必须提供一些超时机制,这也是线程池大小调节策略的一部分。通过 corePoolSize 和 maximumPoolSize,控制如何新增线程;通过 allowCoreThreadTimeOut 和 keepAliveTime,控制如何销毁线程。

TimeUnit unit

keepAliveTime 的单位,TimeUnit 是一个 枚举类型,其包括:

1
2
3
4
5
6
NANOSECONDS : 1微毫秒 = 1微秒 / 1000 
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天

BlockingQueue workQueue

该线程池中的任务队列,维护着等待执行的 Runnable 对象。
当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
常用的 workQueue 类型:

  • SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了 maximumPoolSize 而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize 一般指定成 Integer.MAX_VALUE,即无限大
  • LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效,因为总线程数永远不会超过 corePoolSize
  • ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到 corePoolSize 的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程 (非核心线程) 执行任务,又如果总线程数到了 maximumPoolSize,并且队列也满了,则发生错误
  • DelayQueue:队列内元素必须实现 Delayed 接口,这就意味着你传进去的任务必须先实现 Delayed 接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务

ThreadFactory threadFactory

创建线程的方式,这是一个接口,你 new 他的时候需要实现他的 Thread newThread(Runnable r)方法,一般用不上。
比如 AsyncTask ,它是对线程池的封装,新建线程池的 threadFactory 参数源码如下,非常简单:

1
2
3
4
5
6
7
new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);

public Thread new Thread(Runnable r) {
return new Thread(r,"AsyncTask#"+mCount.getAndIncrement());
}
}

RejectedExecutionHandler handler

抛出异常专用,比如上面提到的两个错误发生了,就会由这个 handler 抛出异常,不指定的话,这个参数会取默认值 ThreadPoolExecutor.AbortPolicy,所有类型如下:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

执行策略

线程池执行流程

  1. 如果线程池的当前 poolSize < corePoolSize,那么就新增加一个线程处理新提交的任务,即使此时线程池中存在空闲线程;
  2. 如果当前 poolSize 已经达到 corePoolSize,就将新提交的任务提交到 workQueue 排队,等候处理 workQueue.offer(command)
  3. 如果 workQueue 容量已达上限,并且当前 poolSize < maximumPoolSize,那么就新增线程来处理任务;
  4. 如果 workQueue 已满,并且当前 poolSize >= maximumPoolSize,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务,新提交任务由 RejectedExecutionHandler 处理。

线程池状态

线程池状态

  1. RUNNING
    线程池正处于运行中状态,可接收新任务,也可执行已添加的任务。
    线程池刚创建时就处于这个状态,此时线程池中没有任何任务。
  2. SHUTDOWN
    线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  3. STOP
    线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  4. TIDYING
    当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。
    当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()函数来实现。
  5. TERMINATED
    线程池彻底终止,就变成TERMINATED状态。

线程回收策略

  • 当线程池中超过 corePoolSize 线程,空闲时间达到 keepAliveTime 时,关闭空闲线程
  • 当设置allowCoreThreadTimeOut(true)时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将关闭

排队策略

J.U.C 提供的 ThreadPoolExecutor 只支持任务在内存中排队,通过 BlockingQueue 暂存还没有来得及执行的任务。
常用的有 SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue 三种阻塞队列。
一般如果线程池任务队列采用LinkedBlockingQueue队列的话,那么不会拒绝任何任务(因为队列大小没有限制),这种情况下,ThreadPoolExecutor 最多仅会按照最小线程数来创建线程,也就是说线程池大小被忽略了。
如果线程池任务队列采用ArrayBlockingQueue队列的话,那么 ThreadPoolExecutor 将会采取一个非常负责的算法,比如假定线程池的最小线程数为 4,最大为 8 所用的 ArrayBlockingQueue 最大为 10。随着任务到达并被放到队列中,线程池中最多运行 4 个线程(即最小线程数)。即使队列完全填满,也就是说有 10 个处于等待状态的任务,ThreadPoolExecutor 也只会利用 4 个线程。如果队列已满,而又有新任务进来,此时才会启动一个新线程,这里不会因为队列已满而拒接该任务,相反会启动一个新线程。新线程会运行队列中的第一个任务,为新来的任务腾出空间。
这个算法背后的理念是:该池大部分时间仅使用核心线程(4 个),即使有适量的任务在队列中等待运行。这时线程池就可以用作节流阀。如果挤压的请求变得非常多,这时该池就会尝试运行更多的线程来清理;这时第二个节流阀—最大线程数就起作用了。

饱和策略

AbortPolicy: 直接抛异常
CallerRunsPolicy: 用调用者的线程来运行任务
DiscardOldestPolicy: 丢弃线程队列里最近的一个任务,执行新提交的任务
DiscardPolicy: 直接将新任务丢弃

如果使用 Executors 的工厂方法创建的线程池,那么饱和策略都是采用默认的 AbortPolicy,所以如果我们想当线程池已满的情况,使用调用者的线程来运行任务,就要自己创建线程池,指定想要的饱和策略,而不是使用 Executors 了。

ThreadPoolExecutor 原理

线程池内部维护两个状态值:

  • 运行状态(runState)
    线程池状态机
  • 线程数量(workerCount)

这两个状态保存在变量ctl里,其高3位保存runState,低29位保存workerCount

1
2
3
4
5
6
7
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 当前线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }

提交任务的入口是execute:
ThreadPoolExecutor工作原理

  • 用户线程调用 submit、execute 提交线程任务 addWorker,如果当前线程池活跃任务数 < corePoolSize,则直接 addWorker,否则添加到 workQueue
  • addWorker 成功则执行线程任务 runWorker
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    /*
    * Proceed in 3 steps:
    *
    * 1. If fewer than corePoolSize threads are running, try to
    * start a new thread with the given command as its first
    * task. The call to addWorker atomically checks runState and
    * workerCount, and so prevents false alarms that would add
    * threads when it shouldn't, by returning false.
    *
    * 2. If a task can be successfully queued, then we still need
    * to double-check whether we should have added a thread
    * (because existing ones died since last checking) or that
    * the pool shut down since entry into this method. So we
    * recheck state and if necessary roll back the enqueuing if
    * stopped, or start a new thread if there are none.
    *
    * 3. If we cannot queue task, then we try to add a new
    * thread. If it fails, we know we are shut down or saturated
    * and so reject the task.
    */
    int c = ctl.get();
    // 第一步,使用corePoolSize个线程资源
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    // corePoolSize个线程正在被使用,此时将command压入队列
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    // 继续创建线程,这时不再使用corePoolSize,而是继续创建线程直到达到maxPoolSize
    else if (!addWorker(command, false))
    reject(command);
    }
  • ThreadPoolExecutor 并不是把任务分给线程去执行,而是由线程不断地从阻塞队列获取任务(ThreadPoolExecutor.runWorker -> ThreadPoolExecutor.getTask

由上面的任务分配代码可知,任务的执行有两种可能情况:

  • 新任务直接由新创建的线程执行,这种情况仅出现在线程初始创建的时候;
  • 当任务没有被立刻执行,它会被添加到一个阻塞队列中,工作线程会从中获取任务执行。

前者已经在上面代码中,而后者是一个异步的过程,其执行流程见getTask,Worker线程调用该方法获取任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 获取任务
while (task != null || (task = getTask()) != null) {
// 加写锁、检查线程池是不是停掉了
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();

// 执行任务
task.run();
}
}
}

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
// 获取运行状态runState
int c = ctl.get();
int rs = runStateOf(c);

// 检查线程池是否已停止运行,若已停止则直接返回null
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

// 当前工作线程数量
int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 若当前线程数过多则直接返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
  • getTask多个地方判断了当前线程池的运行状况,在已停止、线程数量过多的情况下会直接返回null,返回null(接收不到任务)的情况下线程会开始被回收。

ScheduledThreadPoolExecutor 原理

  • 使用DelayedWorkQueue保存ScheduledFutureTaskDelayedWorkQueue内部使用优先队列实现,优先级是调度的间隔时间;
  • 用户线程只负责提交任务到队列、必要时启动新的线程,并不负责把任务提交给任何工作线程来执行,那么如何实现到了调度时间才执行目标任务的特性?其实是在上一步工作线程从队列获取任务时,会根据队首任务的延迟时间来进行等待。当向队列中投放任务时,会唤醒正在等待的线程,同样的,为了防止虚假唤醒,源码中在被唤醒后会计算剩余睡眠时间。(ScheduledThreadPoolExecutor.DelayedWorkQueue.poll(long timeout, TimeUnit unit)ScheduledThreadPoolExecutor.DelayedWorkQueue.take)

ScheduledThreadPoolExecutor中的delay是如何生效的?

  1. 提交任务
    1. 业务方提交任务时指定delay时间:ScheduledThreadPoolExecutor.scheduleWithFixedDelay
    2. 新建任务 ScheduledThreadPoolExecutor.ScheduledFutureTask
    3. 将任务插入到队列末尾 ScheduledThreadPoolExecutor.delayedExecute,这里的队列是一个DelayedWorkQueue
    4. 初始化线程 ThreadPoolExecutor.ensurePrestart
  2. 执行任务
    1. 运行WorkerThreadPoolExecutor.runWorker
    2. 线程Worker会轮询中队列中获取任务 ThreadPoolExecutor.getTask,获取任务时DelayedWorkQueue.take会延迟delay的时间

为什么使用 ThreadPoolExecutor 而不是 Executors

Executors 提供了一些常用的线程池类别,它本质上是创建了一些具有特定功能的 ThreadPoolExecutor 实例,建议直接创建 ThreadPoolExecutor 实例而不是使用 Executors 创建,因为这样会促使我们明确线程池的运行规则,避免资源耗尽。

  1. FixedThreadPool 和 SingleThreadPool
    因为阻塞队列的实现为LinkedBlockingQueue,因此允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而发生内存溢出现象(OOM)
  2. CachedThreadPool 和 ScheduledThreadPool
    因为maximumPoolSize=Integer.MAX_VALUE,允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致内存溢出现象(OOM)

Executors

Executors 提供了一些工厂方法,用于创建一些适用于特定场景的线程池。

newCachedThreadPool

返回 ThreadPoolExecutor 实例,corePoolSize 为 0;maximumPoolSize 为 Integer.MAX_VALUE;keepAliveTime 为 60L;unit 为 TimeUnit.SECONDS;workQueue 为 SynchronousQueue(同步队列)

1
2
3
4
5
6
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

当有新任务到来,则插入到 SynchronousQueue 中,由于 SynchronousQueue 是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;
若池中线程空闲时间超过指定大小,则该线程会被销毁。

  • 优势:1.线程数无限制 2.有空闲线程则复用空闲线程,若无空闲线程则新建线程 3.一定程序减少频繁创建/销毁线程,减少系统开销
  • 适用:执行很多短期异步的小程序或者负载较轻的服务器,比如 MQ 消息的发送。

newFixedThreadPool

返回 ThreadPoolExecutor 实例,接收参数为所设定线程数量 nThread,corePoolSize 为 nThread,maximumPoolSize 为 nThread;keepAliveTime 为 0L(不限时);unit 为:TimeUnit.MILLISECONDS;WorkQueue 为:new LinkedBlockingQueue() 无界阻塞队列

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
   return new ThreadPoolExecutor(nThreads, nThreads,
                             0L, TimeUnit.MILLISECONDS,
                             new LinkedBlockingQueue<Runnable>());
}

创建可容纳固定数量线程的池子,每隔线程的存活时间是无限的,当池子满了就不在添加线程了;
如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)。

  • 优势:1.可控制线程最大并发数(同时执行的线程数) 2.超出的线程会在队列中等待
  • 适用:执行长期的任务,性能好很多

newSingleThreadExecutor

FinalizableDelegatedExecutorService 包装的 ThreadPoolExecutor 实例,corePoolSize 为 1;maximumPoolSize 为 1;keepAliveTime 为 0L;unit 为:TimeUnit.MILLISECONDS;workQueue 为:new LinkedBlockingQueue() 无解阻塞队列

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}

创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)

  • 优势:1.有且仅有一个工作线程执行任务 2.所有任务按照指定顺序执行,即遵循队列的入队出队规则
  • 适用:一个任务一个任务执行的场景

newScheduledThreadPool

创建 ScheduledThreadPoolExecutor 实例,corePoolSize 为传递来的参数,maximumPoolSize 为 Integer.MAX_VALUE;keepAliveTime 为 0;unit 为:TimeUnit.NANOSECONDS;workQueue 为:new DelayedWorkQueue() 一个按超时时间升序排序的队列

1
2
3
4
5
6
7
8
9
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor()
public ScheduledThreadPoolExecutor(int corePoolSize) {
   super(corePoolSize, Integer.MAX_VALUE,
         DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
         new DelayedWorkQueue());
}

创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入 DelayedWorkQueue 队列中,这是一种按照超时时间排序的队列结构
适用:定时及周期性执行任务的场景

线程池预热

在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,(除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程)。

参数调优

参数的设置跟系统的负载有直接的关系,下面为系统负载的相关参数:

  • tasks:每秒需要处理的最大任务数量
  • tasktime:处理第个任务所需要的时间
  • responsetime:系统允许任务最大的响应时间,比如每个任务的响应时间不得超过 2 秒。

下面结合线程池原理来解释 ThreadPoolExecutor 参数应该怎么设置:

  • corePoolSize
    每个任务需要 tasktime 秒处理,则每个线程每秒可处理 1 / tasktime 个任务。
    系统每秒有 tasks 个任务需要处理,则需要的线程数为:tasks / ( 1 / tasktime ),即 tasks * tasktime 个线程数。
    假设系统每秒任务数为 100 ~ 1000,每个任务耗时 0.1 秒,则需要 100 * 0.1 至 1000 * 0.1,即 10 ~ 100 个线程。
    那么 corePoolSize 应该设置为大于 10,具体数字最好根据 8020 原则,即 80 % 情况下系统每秒任务数,若系统 80%的情况下第秒任务数小于 200,最多时为 1000,则 corePoolSize 可设置为 20。
  • queueCapacity
    任务队列的长度要根据核心线程数,以及系统对任务响应时间的要求有关。 队列长度可以设置为 ( corePoolSize / tasktime ) * responsetime : ( 20 / 0.1 ) * 2 = 400,即队列长度可设置为 400。
    队列长度设置过大,会导致任务响应时间过长,切忌以下写法:
    1
    LinkedBlockingQueue queue = new LinkedBlockingQueue();
    这实际上是将队列长度设置为 Integer.MAX_VALUE,将会导致线程数量永远为 corePoolSize,再也不会增加,当任务数量陡增时,任务响应时间也将随之陡增。
  • maxPoolSize
    当系统负载达到最大值时,核心线程数已无法按时处理完所有任务,这时就需要增加线程。
    每秒 200 个任务需要 20 个线程,那么当每秒达到 1000 个任务时,则需要 ( 1000 - queueCapacity ) * ( 20 / 200 ),即 60 个线程,可将 maxPoolSize 设置为 60。
    若结合 CPU 的情况,比如,当线程数量达到 50 时,CPU 达到 100%,则将 maxPoolSize 设置为 60 也不合适,此时若系统负载长时间维持在每秒 1000 个 任务,则超出线程池处理能力,应设法降低每个任务的处理时间 ( tasktime )。
  • keepAliveTime:
    线程数量只增加不减少也不行。当负载降低时,可减少线程数量,如果一个线程空闲时间达到 keepAliveTiime,该线程就退出。 默认情况下线程池最少会保持 corePoolSize 个线程。
  • allowCoreThreadTimeout:
    默认情况下核心线程不会退出,可通过将该参数设置为 true,让核心线程也退出。

Spring - ThreadPoolTaskExecutor

Spring 的 TaskExecutor 接口等同于 Java.util.concurrent.Executor 接口。 实际上,它存在的主要原因是为了在使用线程池的时候,将对 Java 5 的依赖抽象出来,在需要时给其他 Spring 组件提供一个线程池的抽象。 这个接口只有一个方法 execute(Runnable task),它根据线程池的语义和配置,来接受一个执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<bean id="taskExecutor" name="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">  
<!-- 核心线程数 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="10" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="200" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="20" />
<!-- 线程池所使用的缓冲队列 -->
<property name="queueCapacity" value="100" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
1
2
@Resource(name = "taskExecutor")
private ThreadPoolTaskExecutor taskExecutor;

提交线程任务主要有以下两个 API:

  • void taskExecutor.execute(…)
  • Future taskExecutor.submit(…)

任务处理的逻辑和 JDK ExecutorService 类似:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于corePoolSize,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列
如果此时线程池中的数量大于corePoolSize,缓冲队列 workQueue 满,并且线程池中的数量小于 maxPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列 workQueue 满,并且线程池中的数量等于 maxPoolSize,那么通过 handler 所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程 corePoolSize、任务队列 workQueue、最大线程 maximumPoolSize,如果三者都满了,使用 handler 处理被拒绝的任务。
当线程池中的线程数量大于corePoolSize 时,如果某线程空闲时间超过 keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

  • SimpleAsyncTaskExecutor 类
    这个实现不重用任何线程,或者说它每次调用都启动一个新线程。但是,它还是支持对并发总数设限,当超过线程并发总数限制时,阻塞新的调用,直到有位置被释放。如果你需要真正的池,请继续往下看。
  • SyncTaskExecutor 类
    这个实现不会异步执行。相反,每次调用都在发起调用的线程中执行。它的主要用处是在不需要多线程的时候,比如简单的 test case。
  • ConcurrentTaskExecutor 类
    这个实现是对 Java 5 java.util.concurrent.Executor 类的包装。有另一个备选, ThreadPoolTaskExecutor 类,它暴露了 Executor 的配置参数作为 bean 属性。很少需要使用 ConcurrentTaskExecutor, 但是如果 ThreadPoolTaskExecutor 不敷所需,ConcurrentTaskExecutor 是另外一个备选。
  • SimpleThreadPoolTaskExecutor 类
    这个实现实际上是 Quartz 的 SimpleThreadPool 类的子类,它会监听 Spring 的生命周期回调。当你有线程池,需要在 Quartz 和非 Quartz 组件中共用时,这是它的典型用处。
  • ThreadPoolTaskExecutor 类
    它不支持任何对 java.util.concurrent 包的替换或者下行移植。Doug Lea 和 Dawid Kurzyniec 对 java.util.concurrent 的实现都采用了不同的包结构,导致它们无法正确运行。
    这个实现只能在 Java 5 环境中使用,但是却是这个环境中最常用的。它暴露的 bean properties 可以用来配置一个 java.util.concurrent.ThreadPoolExecutor,把它包装到一个 TaskExecutor 中。如果你需要更加先进的类,比如 ScheduledThreadPoolExecutor,我们建议你使用 ConcurrentTaskExecutor 来替代。
  • TimerTaskExecutor 类
    这个实现使用一个 TimerTask 作为其背后的实现。它和 SyncTaskExecutor 的不同在于,方法调用是在一个独立的线程中进行的,虽然在那个线程中是同步的。
  • WorkManagerTaskExecutor 类
    CommonJ 是 BEA 和 IBM 联合开发的一套规范。这些规范并非 java ee 的标准,但它是 BEA 和 IBM 的应用服务器实现的共同标准
    这个实现使用了 CommonJ WorkManager 作为其底层实现,是在 Spring context 中配置 CommonJ WorkManager 应用的最重要的类。和 SimpleThreadPoolTaskExecutor 类似,这个类实现了 WorkManager 接口,因此可以直接作为 WorkManager 使用。

线程池优化 - 用最少的资源干最多的活

线程任务特性分析

任务的性质:CPU 密集型任务、IO 密集型任务、混合型任务。
任务的优先级:高、中、低。
任务的执行时间:长、中、短。
任务的依赖性:是否依赖其他系统资源,如数据库连接等。

线程池容量评估

评估线程池要开多大一般会借助于一个公式:
TN = CN * CU * (1 + w/c)
TN = 线程池中总线程数
CN = CPU核心数
CU = CPU使用率
w = 等待时间
c = 计算时间

《java 并发编程实践》和《Java 虚拟机并发编程》都有描述。

对线程池大小的计算需要区分所需执行的任务特点:

  • IO 密集型
    若任务对其他系统资源有依赖,如某个任务依赖数据库的连接返回的结果,这时候等待的时间越长,则 CPU 空闲的时间越长,那么线程数量应设置得越大,才能更好的利用 CPU。
    一般情况下,如果存在 IO,那么肯定 w/c>1(阻塞耗时一般都是计算耗时的很多倍),但是需要考虑系统内存有限(每开启一个线程都需要内存空间),这里需要上服务器测试具体多少个线程数适合(CPU 占比、线程数、总耗时、内存消耗)。如果不想去测试,保守点取 1 即:
    TN = 2CN
  • 计算密集型
    假设没有等待或者近似为没有等待 w=0,则 w/c=0,有:
    TN = CN
    对于计算密集型的任务,在拥有 N 个处理器的系统上,当线程池的大小为 N+1 时,通常能实现最优的效率,这样,即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费,但也同时会多一个 CPU 上下文切换。
  • 混合型
    如果可以拆分,拆分成 IO 密集型和 CPU 密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。

总而言之:

  1. 高并发、任务执行时间短的业务
    线程池线程数可以设置为CPU 核数+1,减少线程上下文的切换
  2. 并发不高、任务执行时间长的业务
    要区分开看:
    • 假如是业务时间长集中在 IO 操作上,也就是 IO 密集型的任务,因为 IO 操作并不占用 CPU,所以不要让所有的 CPU 闲下来,可以适当加大线程池中的线程数目,让 CPU 处理更多的业务
    • 假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,和(1)一样吧,线程池中的线程数设置得少一些,减少线程上下文的切换
  3. 并发高、业务执行时间长
    解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,然后再考虑如何设置线程池属性。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件对任务进行拆分和解耦。

当然具体合理设置线程池大小,需要结合系统实际情况,在大量的尝试下比较才能得出,以上只是前人总结的规律。

ForkJoinPool

  • 控制并行度:理想情况下每个 CPU 线程执行一个线程任务,极大减少了线程上下文切换耗费的时间,并行度在构造时确定,默认是 CPU 的可用线程数(Runtime.getRuntime().availableProcessors())。
  • Work-stealing:get 或 join 的时候会触发(ForkJoinTask),从存放 Task 的双端队列末尾“窃取”任务交给空闲的线程来执行,避免线程资源的浪费。
    ForkJoinPool工作原理

线程池隔离

默认情况下,CompletableFuture这样的并发工具类使用一个默认的线程池来执行任务,如果进程的多个地方都在使用CompletableFuture,那么它们无疑是会竞争这个线程池的。

对象池 - commons-pools2

池化技术主要分为两种:对象池和线程池。

  • 对象池提供了对象的回收机制,这些对象一般是非常”重”的,每次需要时都实例化一次非常浪费资源,常见的实现如数据库连接池、客户端实例等。
  • 线程池相对对象池,增加了销毁、状态机、预热等机制,线程的调度规则也复杂很多。
    commons-pool2类图

创建对象流程

GenericObjectPool为例:

  • 获取对象:org.apache.commons.pool2.impl.GenericObjectPool#borrowObject(long)
  • 返还对象:org.apache.commons.pool2.impl.GenericObjectPool#returnObject

获取对象基本流程

org.apache.commons.pool2.impl.GenericObjectPool#borrowObject(long borrowMaxWaitMillis

  1. 从一个双端队列(LinkedBlockingDeque)的头部弹出(unlink)一个对象;
  2. 如果队列为空,则尝试创建(org.apache.commons.pool2.impl.GenericObjectPool#create);
  3. 使用PooledObjectFactory激活(activateObject)该对象(一般是做一些初始化);

返还对象

org.apache.commons.pool2.impl.GenericObjectPool#returnObject

  1. 反初始化(passivateObject)对象;
  2. 将对象插入队列。

线程安全性

线程安全主要体现在队列上,LinkedBlockingDeque 采用InterruptibleReentrantLock来同步写线程。

QA

  1. 当设置 allowCoreThreadTimeOut = true 时,long keepAliveTime 会转而作用于核心线程,那么同时我还想设置非核心线程的闲置超时时常要怎样做呢?
    allowCoreThreadTimeOut 的值不管是 false 还是 true,都会对非核心线程起作用的。只不过在为 true 时,对核心线程也会起作用
  2. 线程的创建 / 销毁伴随着系统开销 具体是指什么?
    分两个部分说
    第一点:我们会在服务器上创建线程的大小,现在默认 1M。这个是一个很大的内存。
    第二点:需要给它分配内存、列入调度,同时在线程切换的时候还要执行内存换页,CPU 的缓存被清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性。 所以多线程在某些情况下并不是 100% 能提高效率,反而会比单线程慢。当然池化后,可以降低一部分创建,销毁的开销, 但是调度的资源是一定会被浪费的。

参考

  1. 【目录】JUC 锁框架目录
  2. 【目录】JUC 集合框架目录

线程

  1. liuzhengyang/hsdis
  2. 面试官问:为什么 Java 线程没有 Running 状态?我懵了
  3. 上下文切换详解

线程任务

  1. 多线程并发读写是否一定要加锁?

线程池

  1. 手动造一个线程池(Java)
  2. ExecutorService – Waiting for Threads to Finish
  3. 如何合理地估算线程池大小?
    提供了一个Dark Magic方法来计算线程池大小。