并发和同步策略

通过操作 Thread、Object 的 API 和内置锁我们可以解决大部分的线程协调问题,但这绝不是最优雅的方式,接下来我们就来见识一下 JUC 中的各种高效线程同步工具。

锁的原理

线程同步和锁

线程同步不是必须的,但是不同步的情况下很有可能会导致数据不一致或其他不可预料的问题。
线程同步是让线程之间互相协调的机制,协调的前提是互相知道对方的存在,以计算机的角度来看,就是选定一块多线程均可访问的存储空间,或者令它们以某种机制互相通信。按存储区域的类别来分,有以下几种同步机制:

  • 内存
    共享内存(进程内存)、锁(Lock)、互斥量(Mutex)、原子变量(Atomic)、阻塞队列(BlockingQueue)等。
  • 磁盘
    共享文件、消息队列等。
  • 网络
    Socket 等。

像 Java 中 Thread 的 wait、notify 的实现基本也可以使用内存共享来解释,只不过它们是 native 方法,实现由平台说了算;volatile 关键字不能算是同步机制,因为它实际上无法保证线程安全性。

乐观锁和悲观锁

乐观锁和悲观锁是在数据库中使用的名词。

  • 悲观锁
    悲观锁指对数据被外界修改持保守态度,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制,数据库中实现是对数据记录操作前给记录加排它锁。如果获取锁失败,则说明数据正在被其它线程修改,则等待或者抛出异常。如果加锁成功,则获取记录,对其修改,然后事务提交后释放排它锁。
    使用悲观锁的一个常用的例子: select * from 表 where .. for update;
  • 乐观锁
    乐观锁是相对悲观锁来说的,它认为数据一般情况下不会造成冲突,所以在访问记录前不会加排它锁,而是在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测。具体说是根据 update 返回的行数让用户决定如何去做。
    例如:update 表 set comment='***',status='operator',version=version+1 where version = 1 and id = 1;
    乐观锁并不会使用数据库提供的锁机制,一般在表添加 version 字段或者使用业务状态来做。乐观锁直到提交的时候才去锁定,所以不会产生任何锁和死锁。

死锁

死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的互相等待的现象,在无外力作用的情况下,这些线程会一直相互等待而无法继续运行下去。

死锁产生的条件

死锁的产生必须满足以下 4 个必要条件:

  • 互斥条件:指线程对已经获取到的资源进行排它性使用,即该资源同时只由一个线程占用。如果此时还有其它进行请求获取该资源,则请求者只能等待,直至占有资源的线程用毕释放。
  • 请求并持有条件:指一个线程已经持有了至少一个资源,但又提出了新的资源请求,而新资源已被其其它线程占有,所以当前线程会被阻塞,但阻塞的同时并不释放自己已经获取的资源。
  • 不可剥夺条件:指线程获取到的资源在自己使用完之前不能被其它线程抢占,只有在自己使用完毕后由自己释放。
  • 环路等待条件:指在发生死锁时,必然存在一个线程——资源的环形链,即线程集合{T0,T1,T2,···,Tn}中的 T0 正在等待一个 T1 占用的资源;T1 正在等待 T2 占用的资源,……Tn 正在等待已被 T0 占用的资源。

举个形象的例子,两辆车在窄道上相向而行,他们占着道不肯退开,又希望对方能让开,最终形成了死锁。
例 1 - 死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
线程A:
synchronized(resourceA) {
sleep(1s)
synchronized(resourceB) {
// do sth...
}
}

线程B:
synchronized(resourceB) {
sleep(1s)
synchronized(resourceA) {
// do sth...
}
}

首先资源 resourceA 和 resourceB 都是互斥资源,当线程 A 调用 synchronized(resourceA) 获取到 resourceA 上的监视器锁后释放前,线程 B 在调用 synchronized(resourceA) 尝试获取该资源会被阻塞,只有线程 A 主动释放该锁,线程 B 才能获得,这满足了资源互斥条件
线程 A 首先通过 synchronized(resourceA) 获取到 resourceA 上的监视器锁资源,然后通过 synchronized(resourceB) 等待获取到 resourceB 上的监视器锁资源,这就满足了持有并等待
线程 A 在获取 resourceA 上的监视器锁资源后,不会被线程 B 掠夺走,只有线程 A 自己主动释放 resourceA 的资源时候,才会放弃对该资源的持有权,这满足了资源的不可剥夺条件
线程 A 持有 objectA 资源并等待获取 objectB 资源,而线程 B 持有 objectB 资源并等待 objectA 资源,这满足了循环等待条件
所以线程 A 和 B 就形成了死锁状态。

死锁避免

要想避免死锁,需要破坏构造死锁必要条件的至少一个即可,但是目前只有持有并等待和循环等待是可以被破坏的。
其实,实现死锁避免最简单的方案就是控制资源申请的有序性。

阻塞锁

阻塞锁,可以说是让线程进入阻塞状态进行等待,当获得相应的信号(唤醒,时间) 时,才可以进入线程的准备就绪状态,准备就绪状态的所有线程,通过竞争,进入运行状态。
JAVA 中,能够进入\退出、阻塞状态或包含阻塞锁的方法有:synchronized 关键字(其中的重量锁), ReentrantLock, Object.wait() / notify(), LockSupport.park() / unpart()

阻塞锁的优势在于,阻塞的线程不会占用 cpu 时间,不会导致 CPU 占用率过高,但进入时间以及恢复时间都要比自旋锁略慢。
在竞争激烈的情况下阻塞锁的性能要明显高于自旋锁。
理想的情况:在线程竞争不激烈的情况下使用自旋锁,竞争激烈的情况下使用阻塞锁。

独占锁和共享锁

根据锁只能被单个线程持有还是能被多个线程共同持有,锁分为独占锁和共享锁。
独占锁保证任何时候都只有一个线程能得到锁,ReentrantLock 就是以独占方式实现的。共享锁则同时有多个线程可以持有,例如 ReadWriteLock 读写锁,它允许一个资源可以被多线程同时进行读操作。
独占锁是一种悲观锁,每次访问资源都先加上互斥锁,这限制了并发性,因为读操作并不会影响数据一致性,而独占锁只允许同时一个线程读取数据,其它线程必须等待当前线程释放锁才能进行读取。
共享锁则是一种乐观锁,它放宽了加锁的条件,允许多个线程同时进行读操作。

公平性

锁有公平和非公平之分:

  • 公平锁:每个线程按调用 lock 的顺序来获取锁,不可插队;
  • 非公平锁:每个线程获取锁的顺序不确定,存在插队的问题。

平时使用的 ReentrantLock 默认的 lock()就是非公平锁,它会先尝试用 CAS 获取一次锁,若获取不到才进入一个队列等待锁的释放。
Semaphore 中,tryAcquire 是非公平的而 acquire 是公平的。
在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销。

可重入性

当一个线程要获取一个被其它线程持有的独占锁时候,该线程会被阻塞,那么当一个线程再次获取它自己已经获取的锁时是否会被阻塞呢?如果不被阻塞,那么我们说该锁是可重入的,也就是只要该线程获取了该锁,那么可以无限制次数(高级篇我们会知道严格来说是有限次数)进入被该锁锁住的代码。

可重入锁原理

可重入锁的原理是在锁内部维护了一个线程标示,用来标示该锁目前被那个线程占用,然后关联一个计数器。一开始计数器值为 0,说明该锁没有被任何线程占用,当一个线程获取了该锁,计数器会变成 1,其它线程在获取该锁时候发现锁的所有者不是自己就会被阻塞挂起。
但是当获取该锁的线程再次获取锁时候发现锁拥有者是自己,就会把计数器值+1, 当释放锁后计数器会-1,当计数器为 0 时候,锁里面的线程标示重置为 null,这时候阻塞的线程会获取被唤醒来竞争获取该锁。

synchronized(监视器锁)是可重入的

1
2
3
4
5
6
Object lock = new Object();
synchronized (lock) {
synchronized (lock) {
System.out.println("reentranted!");
}
}

ReentrantLock 是可重入的

废话,名字里 Reentrant 就是可重入的意思。

1
2
3
4
5
6
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.lock();
System.out.println("reentranted!");
lock.unlock();
lock.unlock();

ReentrantReadWriteLock

加锁分三种情况:

  • 读锁本身定义就支持重入;
  • 写锁支持重入
    1
    2
    3
    4
    5
    6
    ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    rwlock.writeLock().lock();
    rwlock.writeLock().lock();
    System.out.println("reentranted!");
    rwlock.writeLock().unlock();
    rwlock.writeLock().unlock();
  • 读锁和写锁同时使用会死锁
    1
    2
    3
    4
    5
    6
    ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    rwlock.readLock().lock();
    rwlock.writeLock().lock();
    System.out.println("reentranted!");
    rwlock.writeLock().unlock();
    rwlock.readLock().unlock();

抢占

TODO

SpinLock(自旋锁)

自旋锁是采用让当前线程不停地的在循环体内执行实现的,当循环的条件被其他线程改变时才能进入临界区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class SpinLock {
private AtomicReference<Thread> sign = new AtomicReference<>();
public void lock(){
final Thread current = Thread.currentThread();
// 当sign的值为null时才设置为current
while(!sign.compareAndSet(null, current)){
}
}
public void unlock (){
Thread current = Thread.currentThread();
sign.compareAndSet(current, null);
}
}
public class LockTest {
SpinLock spinLock = new SpinLock();
@Test
public void test() {
Runnable runnable = () -> {
spinLock.lock();
Random random = new Random();
System.out.println(Thread.currentThread().getName());
spinLock.unlock();
};
new Thread(runnable).start();
new Thread(runnable).start();
}
}

由于自旋锁只是将当前线程不停地执行循环体,不进行线程状态的改变,所以响应速度更快。
但当线程数不停增加时,性能下降明显,因为每个线程都需要执行,占用 CPU 时间。
如果线程竞争不激烈,并且保持锁的时间短,适合使用自旋锁。
该例子为非公平锁,获得锁的先后顺序不会按照进入 lock 的先后顺序进行。

锁算法 - CLHLock(Craig, Landin, and Hagersten locks)

使用链表协调每个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class CLHLock {

public static class CLHNode {
// 刚开始当前线程是锁住的
private volatile Thread isLocked;
// boolean方案、private volatile boolean isLocked = true;
}

@SuppressWarnings("unused")
private volatile CLHNode tail;
private static final ThreadLocal<CLHNode> LOCAL = new ThreadLocal<>();
private static final AtomicReferenceFieldUpdater<CLHLock, CLHNode> UPDATER =
AtomicReferenceFieldUpdater.newUpdater(CLHLock.class, CLHNode.class, "tail");

public void lock() {
CLHNode node = new CLHNode();
LOCAL.set(node);
// 原子更新当前CLHLock对应的值,并返回旧值
// getAndSet使自己成为队列的尾部,同时获取其前驱节点的引用
// 如果是第一个调用lock的,直接返回,如果不是,则等待上一个unlock
CLHNode preNode = UPDATER.getAndSet(this, node);
if (preNode != null) {
// boolean方案:while(preNode.isLocked) {}
preNode.isLocked = Thread.currentThread();
LockSupport.park(this);
preNode = null;
LOCAL.set(node);
}
}

public void unlock() {
CLHNode node = LOCAL.get();
if (!UPDATER.compareAndSet(this, node, null)) {
// boolean方案:node.isLocked = false;
System.out.println("unlock\t" + node.isLocked.getName());
LockSupport.unpark(node.isLocked);
}
node = null;
}
}

在上述代码中注明了另一种 boolean 实现方案,boolean 方案看起来会直观一点。

JUC ReentrantLock 默认内部使用的锁 即是 CLH 锁(有很多改进的地方,将自旋锁换成了阻塞锁等等)。
CLH 队列锁的优点是空间复杂度低(如果有 n 个线程,L 个锁,每个线程每次只获取一个锁,那么需要的存储空间是 O(L+n),n 个线程有 n 个 Node,L 个锁有 L 个 tail),CLH 的一种变体被应用在了 JAVA 并发框架中(AbstractQueuedSynchronizer.Node)。
CLH 在 SMP 系统结构下是非常有效的,但在 NUMA 系统结构下,每个线程有自己的内存,而 CLHLock 会不停地查询前驱变量,如果前趋结点的内存位置比较远,自旋判断前趋结点的 locked 域,性能将大打折扣,一种解决 NUMA 系统结构的思路是 MCS 队列锁。

锁算法 - MCSLock

MCSLock 则是对本地变量的节点进行循环,不会出现 CLHLock 的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MCSLock {

public static class MCSNode {

volatile MCSNode next;
volatile boolean isLocked = true;
}

private static final ThreadLocal<MCSNode> NODE = new ThreadLocal<MCSNode>();
@SuppressWarnings("unused")
private volatile MCSNode queue;
private static final AtomicReferenceFieldUpdater<MCSLock, MCSNode> UPDATER = AtomicReferenceFieldUpdater.newUpdater(MCSLock.class,
MCSNode.class, "queue");

public void lock() {
MCSNode currentNode = new MCSNode();
NODE.set(currentNode);
MCSNode preNode = UPDATER.getAndSet(this, currentNode);
if (preNode != null) {
preNode.next = currentNode;
while (currentNode.isLocked) {

}
}
}

public void unlock() {
MCSNode currentNode = NODE.get();
if (currentNode.next == null) {
if (!UPDATER.compareAndSet(this, currentNode, null)) {
while (currentNode.next == null) {
}
}
} else {
currentNode.next.isLocked = false;
currentNode.next = null;
}
}
}

CLH 的队列是隐式的队列,没有真实的后继结点属性。
MCS 的队列是显式的队列,有真实的后继结点属性。

锁算法 - TicketLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class TicketLock {

private AtomicInteger serviceNum = new AtomicInteger();
private AtomicInteger ticketNum = new AtomicInteger();
private static final ThreadLocal<Integer> LOCAL =
new ThreadLocal<Integer>();

public void lock() {
// 得到门票
int myticket = ticketNum.getAndIncrement();
LOCAL.set(myticket);
// 如果当前叫票和自己的票号不同
while (myticket != serviceNum.get()) {
}
}

public void unlock() {
int myticket = LOCAL.get();
// compareAndSet内部会循环判断当前值是否为myticket,然后才执行赋值操作
serviceNum.compareAndSet(myticket, myticket + 1);
}

@Test
public void test() throws InterruptedException {
// TicketLock保存在主线程内,子线程每次lock拿到一张票,
// 此时别的线程unlock,直到票数增长到自己的票号
TicketLock ticketLock = new TicketLock();
Runnable runnable = () -> {
ticketLock.lock();
System.out.println(Thread.currentThread().getName());
ticketLock.unlock();
};
for (int i = 0; i < 100; i++) {
// 线程的执行顺序和每个线程调用lock的顺序相同
new Thread(runnable).start();
}
}
}

TicketLock,主要解决的是访问顺序的问题,主要的问题是在多核 cpu 上。
缺点是每次都要查询一个 serviceNum 服务号,影响性能(必须要到主内存读取,并阻止其他 cpu 修改)。

内置锁 - synchronized

内置锁在Java中被抽象为监视器锁(monitor)。在 JDK 1.6 之前,监视器锁可以认为直接对应底层操作系统中的互斥量(mutex)
在JDK1.6之前,内置锁的成本较高,包括系统调用引起的内核态与用户态切换、线程阻塞造成的线程切换等,因此被称为重量级锁,但是 JDK1.6 内置锁引入了升级策略。

自旋锁

优势:减少了线程阻塞时的线程上下文切换开销,锁的粒度越小这种性能提升会越明显,因为锁阻塞造成线程切换的时间与锁持有的时间相当。
缺点:自旋锁会额外占用CPU,如果是计算密集型任务,这一优化通常会得不偿失,线程多而处理器少的时候尤其如此。
如果锁粒度较大且竞争激烈,会导致长时间的自旋等待,白白浪费自旋占用的 CPU 时间。

使用-XX:-UseSpinning 参数关闭自旋锁优化;-XX:PreBlockSpin 参数修改默认的自旋次数。

自适应自旋锁

普通自旋锁的自旋时间是固定的,但有时可以做些调整,比如一个锁之前被成功加锁成功过那么这次也很有可能被自旋获取成功。
自适应意味着自旋的时间不再固定,而是由前一次在同一个锁上的自旋时间及锁持有者的状态来决定的:

  • 如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也很有可能再次成功,进而它将允许自旋等待持续相对更长的时间,比如100个循环。
  • 相反的,如果对于某个锁,自旋很少成功获得过,那在以后要获取这个锁时将可能减少自旋时间甚至省略自旋过程,以避免浪费处理器资源。

优势:解决了锁竞争时间不确定的问题,自适应自旋假定不同线程持有同一个锁对象的时间基本相当,即竞争程度趋于稳定,因此可以根据上一次自旋的时间与结果调整下一次自旋的时间。
缺点:自适应自旋没能彻底解决锁竞争时间不确定的问题,如果默认的自旋次数设置不合理,那么自适应的过程将很难收敛到合适的值。

轻量级锁

自旋锁的目标是降低线程切换的成本,如果锁竞争激烈,则不得不依赖重量级锁,但是还有一种情况锁竞争并不激烈,只是锁被持有的时间较长,导致其他线程自旋获取锁的时间较长,此时并没有实际的锁竞争,轻量级锁的目标就是在这种情况下,减少使用重量级锁产生的性能消耗,包括系统调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。
轻量级锁是相对于重量级锁而言的。使用轻量级锁时,不需要申请互斥量,仅仅将对象头Mark Word中的部分字节通过 CAS 更新成指向线程栈中的 Lock Record,如果更新成功,则轻量级锁获取成功,记录锁状态为轻量级锁;否则,说明已经有线程获得了轻量级锁,目前发生了锁竞争(不适合继续使用轻量级锁),接下来膨胀为重量级锁。
优势:在不存在锁竞争的场景下,减少使用重量级锁产生的性能消耗。
缺点:如果锁竞争激烈,那么轻量级锁很快会膨胀为重量级锁,这种情况下维持轻量级锁的开销就浪费了。

偏向锁

在没有实际竞争的情况下,还能够针对部分场景继续优化:自始至终都只有一个线程尝试获取该锁。轻量级锁每次申请、释放锁都至少需要一次 CAS,但偏向锁只有初始化时需要一次 CAS,在 Mark Word 中 CAS 记录 owner,如果记录成功,则偏向锁获取成功,记录锁状态为偏向锁,以后当前线程等于owner就可以零成本的直接获得锁;否则,说明有其他线程竞争,膨胀为轻量级锁。
优势:减少了轻量级锁执行CAS的开销。
缺陷:如果明显存在其他线程申请同一个锁,那么偏向锁将很快膨胀为轻量级锁。

如果需要,可以使用参数-XX:-UseBiasedLocking禁止偏向锁优化(默认打开)。

重量级锁

重量级锁可以作为降级方案,在自旋几次后使用重量级锁阻塞,既减少了线程切换,又不会因为长时间自旋导致 CPU 被打满。

锁粗化

大部分情况下我们都倾向于将同步块的作用范围限制得尽可能小——只在共享数据的实际作用域中才进行同步,这样是为了使得需要同步的操作数尽可能少,当发生锁竞争时等待锁的线程能尽可能快地拿到锁。
但是如果一系列的操作都对同一个对象反复加锁和释放,甚至这样的情况是发生在循环体中的,那么即使没有线程竞争,频繁地进行加锁也会导致不必要的性能损耗。虚拟机如果探测到有这样一串零碎的操作都对同一个对象加锁,就会将加锁同步的范围粗化到整个操作序列的外面。

锁分配和膨胀策略

总而言之,内置锁具有一下三种加锁策略:

  • 偏向锁:无实际竞争,且将来只有第一个申请锁的线程会使用锁。
  • 轻量级锁:无实际竞争,多个线程交替使用锁;允许短时间的锁竞争。
  • 重量级锁:有实际竞争,且锁竞争时间长。

如果锁竞争时间短,可以使用自旋锁进一步优化轻量级锁、重量级锁的性能,减少线程切换。
如果锁竞争程度逐渐提高(缓慢),那么从偏向锁逐步膨胀到重量锁,能够提高系统的整体性能。
但是内置锁不存在反向的“收缩”过程,这基于JVM的另一个假定,“一旦破坏了上一级锁的假定,就认为该假定以后也必不成立”。

内置锁的实现基于对象头,对象头主要包含两部分数据:Mark Word(标记字段)Class Pointer(类型指针)

  • Class Pointer是对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象属于哪个类的实例;
  • Mark Word用于存储对象自身的运行时数据,它是实现轻量级锁和偏向锁的关键。
    MarkWord存储结构

内置锁的分配策略是膨胀式的,偏向锁、轻量级锁、重量级锁,下图来源网上,对内置锁的加锁流程描述比较全:
锁分配和膨胀过程
上图来自参考[1],由图可知锁膨胀的具体历程:

  1. 从偏向锁到轻量级锁
    判断锁状态,刚开始是偏向锁(值为 01),这时就可以通过 CAS 设置线程 ID 到对象头。
    刚开始只有第一个申请偏向锁的线程能够申请成功,并在对象头处设置 owner 属性,后续的线程尝试通过 CAS 操作设置 owner 都是必然失败的,因此不能获取成功。
    线程申请偏向锁并执行结束后,会重新将对象头中的线程 ID 清除,之后其他线程就又可以重新获取偏向锁了。
    当尝试 CAS 设置线程 ID 失败时会开始执行锁升级,升级到轻量级锁。
  2. 从轻量级锁到重量级锁
    锁升级时可能此时正有另一个线程正持有偏向锁,这时我们要记对应线程为锁的持有者:先在持有线程的栈中分配锁记录,然后设置对象头中的锁记录指针指向该锁记录。
    锁竞争就是CAS设置对象头中的锁记录指针指向当前线程的锁记录。
    锁释放就是清除对象头中的锁记录指针,之后唤醒其他正在等待的线程。
    当锁竞争经过一定自旋次数后仍然没有获取成功时,将锁升级为重量级锁。

synchronized锁膨胀的简化版过程

可重入锁 - ReentrantLock

ReentrantLock 是可重入的独占锁,同时获取锁的线程中只有一个能获取成功,其余的线程会被阻塞后放入该锁的 AQS 阻塞队列里。
ReentrantLock类图
ReentrantLock执行流程

ReentrantLock vs synchronized

  • 粒度和灵活度:ReentrantLock 优于 synchronized;
  • 性能:ReentrantLock 的原理是自旋锁,通过循环 CAS 操作来加锁。随着 JDK 后来版本的优化,synchronized 同样引入了偏向锁和轻量级锁(自旋锁),两者性能相差不多。

    这种优化减少了线程在用户态和内核态之间的切换。

  • ReentrantLock 可以指定是公平锁还是非公平锁。而 synchronized 只能是非公平锁。
  • ReenTrantLock 提供了一个 Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像 synchronized 要么随机唤醒一个线程要么唤醒全部线程。
  • ReenTrantLock 提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly()来实现这个机制。

公平性

ReentrantLock 支持两种获取锁的方式,一种是公平模型,一种是非公平模型。
默认情况下,ReentrantLock 是非公平的,可以通过构造方法来构建公平的锁。

1
2
3
4
5
6
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

NonfairSyncFairSync都是AQS的实现,分别实现了公平和非公平策略。
在这里 AQS 的状态值 state 代表线程获取该锁的可重入次数,默认情况下 state 的值为 0 标示当前锁没有被任何线程持有,当一个线程第一次获取该锁时候会使用尝试使用 CAS 设置 state 的值为 1,如果 CAS 成功则当前线程获取了该锁,然后记录该锁的持有者为当前线程,在该线程没有释放锁第二次获取改锁后状态值被为 2,这就是可重入次数,在该线程释放该锁的时候,会尝试使用 CAS 让状态值减一,如果减一后状态值为 0 则当前线程释放该锁。

获取非公平锁

ReentrantLock.lock:默认获取非公平锁
-> ReentrantLock.NonfairSync.lock CAS 设置锁状态(state),设置锁的持有者为当前线程,然后调用 AQS acquire

1
2
3
4
5
6
7
8
final void lock() {
//(1)CAS设置状态值
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//(2)调用AQS的acquire方法
acquire(1);
}

-> AbstractQueuedSynchronizer.acquire:tryAcquire 尝试获取锁,获取失败则 acquireQueued 添加到 AQS 队列

1
2
3
4
5
6
7
public final void acquire(int arg) {
// 调用ReentrantLock重写的tryAcquire方法
if (!tryAcquire(arg) &&
// tryAcquire返回false会把当前线程放入AQS阻塞队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

-> ReentrantLock.NonfairSync.tryAcquire:tryAcquire需要子类重写,在ReentrantLock中其主要功能是修改state

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前AQS状态值为0,说明锁是空闲的
if (c == 0) {
// CAS尝试将state修改为1
if (compareAndSetState(0, acquires)) {
// 设置当前锁的持有者为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程已是该锁持有者
else if (current == getExclusiveOwnerThread()) {
// 状态值增加1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这里的非公平策略在获取锁的时候并没有考虑AQS队列中是否已经存在等待中的线程。

获取公平锁

ReentrantLock.lock(true):显式指定采用公平策略。
-> ReentrantLock.FairSync.lock
-> AbstractQueuedSynchronizer.acquire
-> ReentrantLock.FairSync.tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平策略:队列中没有线程已在等待
if (!hasQueuedPredecessors() &&
// CAS修改state,即加锁
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是该锁的持有者
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

/**
* 这个方法用于保证公平性
* 但是仍不能排除竞争的情况,比如这个方法返回false后立刻就有另一个线程入队了
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h==t说明当前队列为空
return h != t &&
// s==null说明有一个元素将作为AQS的首节点入队,或者s.thread != Thread.currentThread() 说明队列里已经有线程在等待
((s = h.next) == null || s.thread != Thread.currentThread());
}

lockInterruptibly 对中断的响应

ReentrantLock 提供了可中断的加锁 API,比如 lockInterruptibly 能够在当前线程被其他线程调用 interrupt 方法中断后直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 检查中断标志位,已中断则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取资源
if (!tryAcquire(arg))
// AQS的可被中断方法
doAcquireInterruptibly(arg);
}

tryLock 尝试获取锁

tryLock 和 lock 接口的主要区别是 tryLock 是非阻塞的,它尝试获取锁失败后,并不会将线程放入队列中等待。
具体代码和 lock 差别不大,这里不再赘述。
tryLock 还有一个可以设置最长等待时间的接口,每次尝试获取锁失败后,会通过LockSupport.parkNanos来挂起指定时间,当条件变量释放或锁被释放时会调用LockSupport.unpark来尝试唤醒。

unlock 释放锁

尝试释放锁,如果当前线程持有该锁,调用该方法会让该线程对该线程持有的 AQS 状态值减一,如果减去 1 后当前状态值为 0 则当前线程会释放对该锁的持有,否则仅仅减一。如果当前线程没有持有该锁调用了该方法则会抛出 IllegalMonitorStateException 异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void unlock() {
sync.release(1);
}

protected final boolean tryRelease(int releases) {
// 如果不是锁持有者调用unlock则抛出异常。
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果当前可重入次数为0,则清空锁持有线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置可重入次数为原始值-1
setState(c);
return free;
}

读写锁 - ReentrantReadWriteLock

ReentrantReadWriteLock类图

  • firstReader 用来记录第一个获取到读锁的线程;
  • firstReaderHoldCount 则记录第一个获取到读锁的线程获取读锁的可重入数;
  • cachedHoldCounter 用来记录最后一个获取读锁的线程获取读锁的可重入次数。
    1
    2
    3
    4
    5
    static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
    }
  • readHolds 是 ThreadLocal 变量,用来存放除去第一个获取读锁线程外的其它线程获取读锁的可重入数, 可知 ThreadLocalHoldCounter 继承了 ThreadLocal,里面 initialValue 方法返回一个 HoldCounter 对象。
    1
    2
    3
    4
    5
    6
    static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
    return new HoldCounter();
    }
    }

AQS 只维护了一个 state 状态变量,ReentrantReadWriteLock 利用其高 16 位表示读状态也就是获取该读锁的线程个数,低 16 位表示获取到写锁的线程的可重入次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static final int SHARED_SHIFT   = 16;

// 共享锁(读锁)状态单位值65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 共享锁线程最大个数65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

// 排它锁(写锁)掩码 二进制 15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 返回读锁线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回写锁可重入个数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

writeLock.lock 获取写锁

写锁是个独占锁,同时只有一个线程可以获取该锁。 如果当前没有线程获取到读锁和写锁则当前线程可以获取到写锁然后返回。如果当前已经有线程取到读锁和写锁则当前线程则当前请求写锁线程会被阻塞挂起。
另外写锁是可重入锁,如果当前线程已经获取了该锁,再次获取的只是简单的把可重入次数加1后直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void lock() {
sync.acquire(1);
}
/**
* AbstractQueuedSynchronizer.acquire
*/
public final void acquire(int arg) {
// sync重写的tryAcquire方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* ReentrantReadWriteLock.Sync.tryAcquire
*/
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
// AQS的state变量
int c = getState();
// 写锁已被获取的次数
int w = exclusiveCount(c);
// c!=0说明读锁或者写锁已经被某线程获取
if (c != 0) {
// w=0说明已经有线程获取了读锁或者w!=0并且当前线程不是写锁拥有者,则返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 说明某线程获取了写锁,判断可重入个数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");

// 设置可重入数量
setState(c + acquires);
return true;
}

// 此时没有线程获取到读锁和写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

对于非公平锁来说,获取写锁时不需要阻塞,writerShouldBlock的实现为:

1
2
3
final boolean writerShouldBlock() {
return false; // writers can always barge
}

公平锁中的实现为:

1
2
3
4
final boolean writerShouldBlock() {
// 当前线程节点是否有前驱节点
return hasQueuedPredecessors();
}

writeLock.lockInterruptibly

可中断的,如果其他线程调用了当前线程的interrupt方法,则当前线程抛出InterruptedException异常。

writeLock.tryLock

尝试获取写锁:

  • 如果当前线程已经持有了该写锁则简单增加 AQS 的状态值后直接返回 true;
  • 如果当前没有其它线程持有写锁或者读锁,则当前线程获取写锁会成功;
  • 如果当前已经其它线程持有写锁或者读锁则获取失败,当前线程并不会被阻塞。

writeLock.tryLock(long timeout, TimeUnit unit)

  • 尝试获取一次写锁,如果获取失败则等待一段时间后再尝试获取一次,如果失败则返回false;
  • 可以响应中断,被interrupt中断后抛出InterruptedException异常。

writeLock.unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void unlock() {
sync.releaseShared(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
// 激活队列里下一个线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
// 是否是写锁调用的unlock(exclusively表示互斥的)
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 写锁负责state的低16位,表示可重入值,这里没有考虑高16位,因为写锁时候读锁状态值肯定为0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 如果写锁可重入值为0则释放锁,否则只是简单更新状态值
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

ReadLock.lock

读锁是共享锁,多个线程可以同时获取读锁,因此为了计算每个线程的重入次数,不能单纯使用state计数,而是另外用一个ThreadLocal类型的readHolds来记录每个线程获取读锁的次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public void lock() {
sync.acquireShared(1);
}

public final void acquireShared(int arg) {
// 调用ReentrantReadWriteLock中的sync的tryAcquireShared方法
if (tryAcquireShared(arg) < 0)
// 调用AQS的doAcquireShared方法,把当前线程放入阻塞队列
doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {

// 获取当前状态值
Thread current = Thread.currentThread();
int c = getState();

// 判断是否写锁被占用
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;

// 获取读锁计数
int r = sharedCount(c);
// 尝试获取锁,readerShouldBlock返回true表示有线程正在获取写锁,多个读线程只有一个会成功,不成功的进入下面fullTryAcquireShared进行重试
if (!readerShouldBlock() &&
// 当前获取读锁线程是否达到了最大值
r < MAX_COUNT &&
// 设置state的高16位值增加1
compareAndSetState(c, c + SHARED_UNIT)) {
// 第一个线程获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 当前线程是第一个获取读锁的线程,并统计该线程获取读锁的可重入数
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 记录最后一个获取读锁的线程并同时cachedHoldCounter记录其获取读锁的可重入数(表示最后一个获取到读锁的线程的可重入数)
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// readHolds记录当前线程获取读锁的可重入数
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 类似tryAcquireShared,但是是自旋获取
return fullTryAcquireShared(current);
}

final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}

// 如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

ReadLock.lockInterruptibly

可响应中断,被调用interrupt后会抛出InterruptedException

ReadLock.tryLock

  • 尝试获取读锁,如果当前没有其它线程持有写锁,则当前线程获取写锁会成功,然后返回 true
  • 如果当前已经其它线程持有写锁则该方法直接返回 false,当前线程并不会被阻塞。
  • 如果其它获取当前线程已经持有了该读锁则简单增加 AQS 的状态值高 16 位后直接返回 true。

ReadLock.tryLock(long timeout, TimeUnit unit)

与 tryLock()不同在于多了超时时间的参数,如果尝试获取读锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到读锁则返回 false。另外该方法对中断响应, 也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException

ReadLock.unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public void unlock() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果当前线程是第一个获取读锁线程
if (firstReader == current) {
// 如果可重入次数为1
if (firstReaderHoldCount == 1)
firstReader = null;
else
// 否则可重入次数减去1
firstReaderHoldCount--;
} else {
// 如果当前线程不是最后一个获取读锁线程,则从threadlocal里面获取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
// 如果可重入次数<=1则清除threadlocal
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 可重入次数减去一
--rh.count;
}

// 循环直到自己的读计数-1,cas更新成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))

return nextc == 0;
}
}

Condition - 条件变量

ReentrantReadWriteLock 的 WriteLock 类似 ReentrantLock,提供了条件变量的支持,但是 ReadLock 并不支持,如果调用ReentrantReadWriteLock.ReadLock.newCondition获取状态变量会抛出UnsupportedOperationException异常。

StampedLock - 签锁

StampedLock 包含了三种并发控制模式:

  • writeLock(写锁),是个排它锁或者叫独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其它请求读锁和写锁的线程必须等待,类似于 ReentrantReadWriteLock 的写锁(不同在于这里的写锁是不可重入锁);当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个 stamp 票据变量用来表示该锁的版本,当释放该锁时候需要调用 unlockWrite 方法并传递获取锁时候的 stamp 参数。并且提供了非阻塞的 tryWriteLock 方法。
  • readLock(悲观读锁),是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁; 如果已经有线程持有写锁,其它线程请求获取该读锁会被阻塞,这类似 ReentrantReadWriteLock 的读锁(不同在于这里的读锁是不可重入锁)。这里说的悲观是指在具体操作数据前悲观的认为其它线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑, 请求该锁成功后会返回一个 stamp 票据变量用来表示该锁的版本,当释放该锁时候需要 unlockRead 并传递参数 stamp。并且提供了非阻塞的 tryReadLock。
  • tryOptimisticRead(乐观读锁),是相对于悲观锁来说的,在操作数据前并没有通过 CAS 设置锁的状态,仅仅是通过位运算测试;如果当前没有线程持有写锁,则简单的返回一个非 0 的 stamp 版本信息,获取该 stamp 后在具体操作数据前还需要调用 validate 验证下该 stamp 是否已经不可用,也就是看当调用 tryOptimisticRead 返回 stamp 后,到当前时间间是否有其它线程持有了写锁,如果是那么 validate 会返回 0,否则就可以使用该 stamp 版本的锁对数据进行操作。由于 tryOptimisticRead 并没有使用 CAS 设置锁状态,所以不需要显示的释放该锁。该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及 CAS 操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其它写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。

提供了锁的升级策略,long tryConvertToWriteLock(long stamp)期望把 stamp 标示的锁升级为写锁,这个函数会在下面几种情况下返回一个有效的 stamp(也就是晋升写锁成功):

  • 当前锁已经是写锁模式了。
  • 当前锁处于读锁模式,并且没有其他线程是读锁模式
  • 当前处于乐观读模式,并且当前写锁可用。

stamp 标识了一个锁:

  • 会在每次调用加锁方法时返回;
  • 其中 try 系列获取锁的函数,当获取锁失败后会返回为 0 的 stamp 值;
  • 当调用释放锁和转换锁的方法时候需要传入获取锁时候返回的 stamp 值

不可重入

StampedLock 的读写锁都是不可重入锁,所以同一线程不应重复获取同一个锁,以避免产生死锁。

公平性

StampedLock 没有使用AQS,而是自己内部维护了一个双向阻塞队列。

StampedLock vs ReentrantReadWriteLock

StampedLock 与 ReentrantReadWriteLock 的主要区别是乐观读锁,因为 StampedLock 的乐观读写只是检测 stamp 状态、不需要通过 CAS 修改锁的状态,所以在多线程多读的情况下能提供更好的性能。

乐观读锁的使用

乐观读锁的使用容易出错,需要保证按照如下的顺序获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 非阻塞获取版本信息
long stamp = lock.tryOptimisticRead();
// 拷贝变量到线程本地堆栈
copyVaraibale2ThreadMemory();
// 校验
if(!lock.validate(stamp)){
// 获取读锁
long stamp = lock.readLock();
try {
// 拷贝变量到线程本地堆栈
copyVaraibale2ThreadMemory();
} finally {
// 释放悲观锁
lock.unlock(stamp);
}

}
// 使用线程本地堆栈里面的数据进行操作
useThreadMemoryVarables();

实例 - 乐观读锁、读锁、写锁及升级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class Point {

/**
加锁的语义保证了内存可见性,因此成员变量不必声明为volatile
private double x, y;

private final StampedLock sl = new StampedLock();

/**
* 获取写锁
*/
void move(double deltaX, double deltaY) {
// 写锁(排他锁)
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

/**
* 尝试获取乐观读锁
*/
double distanceFromOrigin() {
// 尝试获取乐观读锁
long stamp = sl.tryOptimisticRead();
// 将全部变量拷贝到方法栈内
double currentX = x, currentY = y;
// 检查stamp,确认没有被其他写线程占用写锁,因为并发读写可能会导致上边获取到的x、y不一致
if (!sl.validate(stamp)) {
// 如果被抢占则获取一个悲观读锁,如果此时其他线程持有写锁则当前线程会阻塞直到写锁被释放
stamp = sl.readLock();
try {
// 将全部变量拷贝到方法栈内
currentX = x;
currentY = y;
} finally {
// 释放共享读锁
sl.unlockRead(stamp);
}
}
// 返回计算结果,使用的是方法栈中的变量值,成员变量x、y可能已经被其他线程修改
return Math.sqrt(currentX * currentX + currentY * currentY);
}

/**
* 尝试获取悲观读锁,并尝试转换为写锁
*/
void moveIfAtOrigin(double newX, double newY) {
// 这里可以使用乐观读锁替换
long stamp = sl.readLock();
try {
// 如果当前点在原点则移动
while (x == 0.0 && y == 0.0) {
// 尝试将获取的读锁升级为写锁
long ws = sl.tryConvertToWriteLock(stamp);
// 升级成功,则更新票据,并设置坐标值,然后退出循环
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
// 读锁升级写锁失败,说明已被其他线程升级成功,则释放读锁,显示获取独占写锁,然后循环重试
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
// 释放锁
sl.unlock(stamp);
}
}
}

线程调度 和 LockSupport

挂起和唤醒是线程调度中和锁的实现最密切的操作,juc 中通过一个 LockSupport 来抽象这两种操作,它是创建锁和其它同步类的基础。

  • LockSupport 内部使用 Unsafe 类实现;
  • LockSupport 类与每个使用它的线程都会关联一个许可证,park实际上就是消耗许可的过程,unpark则是释放许可的过程。

相对Object中的wait/notify来说,LockSupport有以下优点:

  1. 使用wait/notify前必须先使用synchronized获取锁,LockSupport.park没有这个限制;
  2. 使用notify只能随机唤醒一个线程,LockSupport可以指定唤醒某个线程。

void park()

调用线程会被禁止参与线程的调度,也就是会被阻塞挂起

1
2
3
System.out.println("begin park");
LockSupport.park();
System.out.println("end park");
  • 可重入:如果调用 park() 的线程已经拿到了与 LockSupport 关联的permit(许可证),则调用 LockSupport.park() 会马上返回。
  • 在其它线程调用 unpark(Thread thread)方法并且当前线程作为参数时候,调用 park 方法被阻塞的线程会返回;
  • 其他线程调用 interrupt()设置中断标识后可以返回,不会抛出 InterruptedException 异常;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    Thread thread = new Thread(() -> {
    System.out.println("child thread begin park!");
    // 调用park方法,挂起自己
    LockSupport.park();
    System.out.println("child thread unpark!");
    });
    thread.start();
    Thread.sleep(1000);

  • 由于线程的虚假唤醒也会导致阻塞的线程返回,park 方法返回的时候不会告知调用者返回的原因,所以调用 park()最好也用循环条件判断方式。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class ParkTest {
    public static void main(String[] args) throws InterruptedException {
    Thread child = new Thread(() -> {
    System.out.println("child thread park");
    while (!Thread.currentThread().isInterrupted()) {
    System.out.println("child thread parking");
    LockSupport.park();
    }
    System.out.println("child thread unpark");
    });
    child.start();
    Thread.sleep(1000);
    System.out.println("main thread interrupt");
    child.interrupt();
    }
    }

    输出:
    child thread park
    child thread parking
    main thread interrupt
    child thread unpark
    调用子线程的 interrupt 方法后,子线程中的 park 调用会返回,然后检查线程的中断标识,当然此时已中断因此 isInterrupted 为 true,于是退出循环、结束线程任务。这个例子中,如果 main 线程单纯只是调用 unpark(child)是无法让子线程退出的。

void unpark(Thread thread)

  • 如果 thread 之前调用了 park() 被挂起,则调用 unpark 后,该线程会被唤醒;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Thread thread = new Thread(() -> {
    System.out.println("child thread begin park!");
    // 调用 park 方法,挂起自己
    LockSupport.park();
    System.out.println("child thread unpark!");
    });
    thread.start();
    Thread.sleep(1000);
    System.out.println("main thread begin unpark!");
    //调用 unpark 让 thread 线程持有许可证,然后 park 方法会返回
    LockSupport.unpark(thread);
  • 如果 thread 之前没有调用 park,则调用 unPark 方法后,在调用 park() 方法,会立刻返回,且让该线程持有一个,可供下次调用 park 时获取。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static void main( String[] args ) {
    System.out.println( "begin park!" );
    //使当前线程获取到许可证
    LockSupport.unpark(Thread.currentThread());
    //再次调用 park
    LockSupport.park();
    System.out.println( "end park!" );
    }
    }

void parkNanos(long nanos)

和 park()差不多,只是在等待 nanos 时间后若还是没有拿到 permit 则 parkNanos 会直接返回。

void park(Object blocker)

blocker 可以记录线程调用 park 后被阻塞挂起的原因,这个对象是保存到当前线程 Thread 对象里的(Thread.parkBlocker),可以通过 LockSupport#getBlocker(Thread)获取。

1
2
3
4
5
6
7
8
9
10
public static void park(Object blocker) {
// 获取调用线程
Thread t = Thread.currentThread();
// 设置该线程的 blocker 变量
setBlocker(t, blocker);
// 挂起线程
UNSAFE.park(false, 0L);
// 线程被激活后清除 blocker 变量,因为一般都是线程阻塞时候才分析原因
setBlocker(t, null);
}

普通park()挂起后,查看线程堆栈如下:

1
2
3
4
5
"main" #1 prio=5 os_prio=31 tid=0x00007fc0f0800800 nid=0x1303 waiting on condition [0x000070000ee42000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at com.tallate.localcache.ParkTest.main(ParkTest.java:9)

而使用 park(this)挂起后,查看线程堆栈如下:

1
2
3
4
5
6
"main" #1 prio=5 os_prio=31 tid=0x00007ff1f6011000 nid=0x2603 waiting on condition [0x000070000ecba000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007956f1ef0> (a java.lang.Class)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at com.tallate.localcache.ParkTest.main(ParkTest.java:9)

可见,线程堆栈还提供了 blocker 对象的地址,这个 blocker 可以通过getBlocker(Thread)获取到,一些诊断工具可以通过这个对象来获取阻塞的具体原因。

void parkNanos(Object blocker, long nanos)

相对 park(Object blocker)多了个超时时间

void parkUntil(Object blocker, long deadline)

最多等待到某个时间就直接返回,deadline 的计算是从 1970 年开始的,比如如果要等待 1 秒钟,可以这么写:

1
2
3
System.out.println(System.currentTimeMillis());
LockSupport.parkUntil(System.currentTimeMillis() - new Date(0).getTime() + 1000);
System.out.println(System.currentTimeMillis());

内部是通过 UNSAFE.park 实现的:

1
2
// isAbsolute=true,time=deadline;表示到 deadline 时间时候后返回
UNSAFE.park(true, deadline);

使用 LockSupport 实现一个互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class FIFOLock {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
// 刚开始将当前线程放入队列中排队获取锁
waiters.add(current);

// 只有队首的线程可以获取锁,如果当前线程不在队首则继续等待
// 使用CAS操作加锁
while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
// 挂起,等待其他线程调用unpark来唤醒当前线程
LockSupport.park(this);
// 注意中断也可以令park返回,如果被其他线程中断了,在中断标识还保留的情况下park是无法正常挂起的,所以这里需要先重置中断标识
if (Thread.interrupted()) {
wasInterrupted = true;
}
}

waiters.remove();
// 其实意思是其它线程中断了该线程,虽然我对中断信号不感兴趣,但是不代表其它线程对该标志不感兴趣,所以需要恢复
if (wasInterrupted) {
current.interrupt();
}
}

public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}

AbstractQueuedSynchronizer(AQS) - JUC 锁实现的基础

AQS 本质上是一个双向队列,不仅仅是锁,AQS 其实是 JUC 中大部分并发安全组件的实现基础。
AQS主要包含了两部分功能:

  • 基于CAS的同步控制功能;
  • 基于LockSupport的通知功能。

AQS 结构

AQS类图
AQS大致结构
AQS 是一个 FIFO 的双向队列,内部通过节点 headtail 记录队首和队尾元素,队列元素类型为 Node

  • 其中 Node 中 thread 变量用来存放进入 AQS 队列里面的线程,在ReentrantLock里即锁的持有者;
  • Node 节点内部 SHARED 用来标记该线程是获取共享资源时候被阻塞挂起后放入 AQS 队列,EXCLUSIVE 标示线程是获取独占资源时候被挂起后放入 AQS 队列;
  • waitStatus 记录当前线程等待状态,分别为 CANCELLED(线程被取消了),SIGNAL(线程需要被唤醒),CONDITION(线程在条件队列里面等待),PROPAGATE(释放共享资源时候需要通知其它节点);
  • prev 记录当前节点的前驱节点,next 记录当前节点后继节点。

state

AQS 中维持了一个单一的状态信息 state, 可以通过 getState, setState, compareAndSetState 函数修改其值,在不同的同步工具类中,这个state可能代表了不同的含义,需要自定义tryAcquire、tryRelease如何修改state的值来达到自己需要的目的:

  • 对于 ReentrantLock 的实现来说,state 可以用来表示当前线程获取锁的可重入次数,定义当 status 为 0 的时候标示锁空闲,为 1 的时候标示锁已经被占用。
    在重写 tryAcquire 时候,内部需要使用 CAS 算法看当前 status 是否为 0,如果为 0 则使用 CAS 设置为 1,并设置当前线程的持有者为当前线程,并返回 true, 如果 CAS 失败则 返回 false;
    实现 tryRelease 时候,内部需要使用 CAS 算法把当前 status 值从 1 修改为 0,并设置当前锁的持有者为 null,然后返回 true, 如果 cas 失败则返回 false。
  • 对应读写锁 ReentrantReadWriteLock 来说 state 的高 16 位表示读状态也就是获取该读锁的次数,低 16 位表示获取到写锁的线程的可重入次数;
    读写锁里面的读锁重写 tryAcquireShared 时候,首先看写锁是否被其它线程持有,如果是则直接返回 false,否则使用 cas 递增 status 的高 16 位,在 ReentrantReadWriteLock 中 status 的高 16 为获取读锁的次数。
    读锁在重写 tryReleaseShared 时候,内部需要使用 CAS 算法把当前 status 值的高 16 位减一,然后返回 true, 如果 cas 失败则返回 false。

    锁除了需要重写tryAcquiretryRelease之外,还需要重写isHeldExclusively以用来判断锁是被当前线程独占还是被共享。

  • 对于 Semaphore 来说 state 用来表示当前可用信号的个数;
  • 对于 FutuerTask 来说,state 用来表示任务状态(例如还没开始,运行,完成,取消);

    1.8 版本中 FutuerTask 取消了对 AQS 的依赖,改为通过 CAS 跟踪状态、Treiber stack 管理等待中的线程,是因为通过 AQS 的实现存在,这篇文章的最后提到了这个问题:线程阻塞(三),FutureTask,官方有一个Bug Report:JDK-8016247 : ThreadPoolExecutor may interrupt the wrong task

  • CountDownlatchCyclicBarrier 来说 state 用来表示计数器当前的值。

对于 AQS 来说线程同步的关键是对状态值 state 进行操作,根据 state 是否属于一个线程,操作 state 的方式分为独占模式共享模式

state和独占模式

1
2
3
4
// 独占模式
void acquire(int arg)
void acquireInterruptibly(int arg)
boolean release(int arg)

对于独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是那个线程获取到了,其它线程尝试操作 state 获取资源时候发现当前该资源不是自己持有的,就会获取失败后被阻塞。
比如独占锁 ReentrantLock 的实现,当一个线程获取了 ReentrantLock 的锁后,AQS 内部会首先使用 CAS 操作把 state 状态值从 0 变为 1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时候发现当前线程就是锁的持有者则会把状态值从 1 变为 2 也就是设置可重入次数,当另外一个线程获取锁的时候发现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。

  1. 获取。当一个线程调用 acquire(int arg) 方法获取独占资源时候,会首先使用 tryAcquire 尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回。失败则将当前线程封装为类型为 Node.EXCLUSIVE 的 Node 节点后插入到 AQS 阻塞队列尾部,并调用 LockSupport.park(this) 挂起当前线程。
    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  2. 释放。当一个线程调用 release(int arg) 时候会尝试使用 tryRelease 操作释放资源,这里是设置状态变量 state 的值,然后调用 LockSupport.unpark(thread) 激活 AQS 队列里面最早被阻塞的线程 (thread)。被激活的线程则使用 tryAcquire 尝试看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final boolean release(int arg) {
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

state和共享模式

1
2
3
4
// 共享模式
void acquireShared(int arg)
void acquireSharedInterruptibly(int arg)
boolean releaseShared(int arg)

共享模式下资源是与具体线程不相关的,多个线程去请求资源时候是通过 CAS 方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次获取时候如果当前资源还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可,共享模式下并不需要记录那个线程获取了资源。
比如 Semaphore 信号量,当一个线程通过 acquire() 方法获取一个信号量时候,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋 CAS 获取信号量。

  1. 获取。当线程调用 acquireShared(int arg) 获取共享资源时候,会首先使用 tryAcquireShared 尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回。失败则将当前线程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列尾部,并使用 LockSupport.park(this) 挂起当前线程。
    1
    2
    3
    4
    public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
    }
  2. 释放。当一个线程调用 releaseShared(int arg) 时候会尝试使用, tryReleaseShared 操作释放资源,这里是设置状态变量 state 的值,然后使用 LockSupport.unpark(thread)激活 AQS 队列里面最早被阻塞的线程 (thread)。被激活的线程则使用 tryReleaseShared 尝试看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
    1
    2
    3
    4
    5
    6
    7
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
    }
    return false;
    }

中断

1
2
3
4
void acquire(int arg)
void acquireInterruptibly(int arg)
void acquireShared(int arg)
void acquireSharedInterruptibly(int arg)

不带Interruptibly后缀的方法不会响应中断,也就是说在获取资源的时候或者获取资源失败被挂起的时候,被别的线程中断(interrupt),那么该线程就会抛出InterruptedException异常。

队列

AQS的队列基础是一个双向链表。

  • 入队:当一个线程获取锁失败后该线程会被转换为 Node 节点,然后就会使用 enq(final Node node) 方法插入该节点到 AQS 的阻塞队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private Node enq(final Node node) {
    for (;;) {
    // 指向队尾节点
    Node t = tail;
    // 队列为空,则初始化设置哨兵节点
    if (t == null) { // Must initialize
    // 通过CAS设置一个哨兵节点为头节点
    if (compareAndSetHead(new Node()))
    // 设置成功,让队尾节点也指向哨兵节点
    tail = head;
    } else {
    // 设置node的前驱节点为队尾节点
    node.prev = t;
    // 通过CAS设置node节点为队尾节点
    if (compareAndSetTail(t, node)) {
    // 设置原来的队尾节点的后继节点为node
    t.next = node;
    return t;
    }
    }
    }
    }

ConditionObject - 条件变量

AQS 有个内部类 ConditionObject 是用来结合锁实现线程同步,ConditionObject 可以直接访问 AQS 对象内部的变量,比如 state 状态值和 AQS 队列。
类似于共享变量的notify和wait需要先通过synchronized获取内置锁,条件变量的signal和await也需要配合AQS实现的锁来使用,如果在没有获取锁的情况下调用条件变量的await或signal方法就会抛出IllegalMonitorStateException异常。下面以ReentrantLock为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try {
System.out.println("begin wait");
condition.await();
System.out.println("end wait");

} catch (Exception e) {
e.printStackTrace();

} finally {
lock.unlock();
}
1
2
3
4
5
6
7
8
9
10
11
lock.lock();
try {
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
} catch (Exception e) {
e.printStackTrace();

} finally {
lock.unlock();
}

ConditionObject 是条件变量,每个条件变量对应着一个**条件队列 (单向链表队列)**,用来存放调用条件变量的 await() 方法后被阻塞的线程,注意这个条件队列和AQS队列不是一回事,这个条件队列的头尾元素分别为 firstWaiterlastWaiter

await

当线程调用了条件变量的 await() 方法时候(事先必须先调用了锁的 lock() 方法获取锁),内部会构造一个类型为 Node.CONDITION 的 node 节点,然后插入该节点到条件队列末尾,然后当前线程会释放获取的锁(也就是会操作锁对应的 status 变量的值),并被阻塞挂起。
这时候如果有其它线程调用了 lock.lock() 尝试获取锁时候,就会有一个线程获取到锁,如果获取到锁的线程又调用了条件变量的 await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,阻塞到 await() 方法处。

当多个线程同时调用 lock.lock() 获取锁的时候,同时只有一个线程获取到了该锁,其他线程会被转换为 Node 节点插入到 lock 锁对应的 AQS 阻塞队列里面,并做自旋 CAS 尝试获取锁,这个和ReentrantLock的具体实现有关。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建新的node,并插入到Condition的等待队列末尾
Node node = addConditionWaiter();
// 释放在AQS中设置的state,相当于释放当前线程获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前线程是否在AQS队列中
while (!isOnSyncQueue(node)) {
// 调用park方法阻塞挂起当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 如果在AQS队列中,则排队等待
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
...
// 创建一个Node.CONDITION类型的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 向条件队列尾部插入节点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

从源码中可知await:

  1. 当前线程获取锁后调用了该锁创建的条件变量,那么线程会释放获取到的锁;
  2. 当前线程会被转换为Node节点后插入到对应条件变量的条件队列
  3. 之后AQS阻塞队列中其他线程就有机会获取到该锁了。

signal

当另外一个线程调用了条件变量的 signal 方法时候(事先必须先调用了锁的 lock() 方法获取锁),内部会把条件队列里面队头的一个线程节点从条件队列里面移除后放入到 AQS 的阻塞队列里面,然后激活这个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 通知等待最久的线程
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 将节点从条件队列转移到同步队列
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

Condition和wait/notify之间的区别

为什么已经有了Object的wait方法和notify方法还需要Condition呢?因为Condition更灵活。
synchronized方式对应的wait、notify不能有多个谓词条件,而Condition则可以有多个谓词条件。
比如生产者生产一个元素后需要唤醒一个消费者线程,notify只能随机唤醒一个线程,如果当前有一个生产者正在通过synchronized上的wait等待,则可能会唤醒这个生产者线程;
如果是Condition方式,我们可以定义多个谓词条件,比如notEmpty、notFull,可以令所有消费者在notEmpty上等待,令所有生产者在notFull上等待,当队列满的时候所有的生产者线程阻塞,添加元素之后则通过notEmpty唤醒某个消费者线程,这样就不用担心会唤醒消费者线程。

实例 - Condition 的使用

1
2
3
4
5
6
Lock l = new ReentrantLock();
Condition c = l.newCondition();
c.await(); // 信号量-1
c.signal(); // 信号量+1
c.signalAll(); // 所有信号量+1
System.out.println();

实例 - 多线程下的缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class BoundedBuffer {

final Lock lock = new ReentrantLock();//锁对象
final Condition notFull = lock.newCondition();//写线程条件
final Condition notEmpty = lock.newCondition();//读线程条件

final Object[] items = new Object[100]; //缓存队列
int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)//如果队列满了
{
notFull.await();//阻塞写线程
}
items[putptr] = x;//赋值
if (++putptr == items.length) {
putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
}
++count;//个数++
notEmpty.signal();//唤醒读线程
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)//如果队列为空
{
notEmpty.await();//阻塞读线程
}
Object x = items[takeptr];//取值
if (++takeptr == items.length) {
takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
}
--count;//个数--
notFull.signal();//唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}

实际使用中,Condition 必须放在 Lock 的 lock 和 unlock 方法之间,主要用于模拟线程间的协调:

如果不放在 lock 内,Condition 的 await 会释放 lock 的 state,如果还有别的地方使用到了该 lock,相当于直接被释放掉了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final Lock lock=new ReentrantLock();
final Condition condition=lock.newCondition();

public static void main(String[]args){
ConTest test=new ConTest();
Producer producer=test.new Producer();
Consumer consumer=test.new Consumer();

consumer.start();
producer.start();
}

class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
try {
lock.lock();
System.out.println("我在等一个新信号" + this.currentThread().getName());
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("拿到一个信号" + this.currentThread().getName());
lock.unlock();
}

}
}

class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
try {
lock.lock();
System.out.println("我拿到锁" + this.currentThread().getName());
condition.signalAll();
System.out.println("我发出了一个信号:" + this.currentThread().getName());
} finally {
lock.unlock();
}
}
}

使用Condition协调两个线程交替执行

问题:第一个线程输出奇数,第二个线程输出偶数,要求交替输出有序的结果:1、2、3、4、5...
交替执行这种需求需要两个线程互相通知对方执行,使用基础的wait/notify方法无法实现,需要使用Condition、LockSupport这种可以直接指定通知某个线程的API。
第一次写时我就犯了错,下面的代码执行到一半会发生死锁,输出到一半就不再继续输出了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 会发生死锁
* 原因:
* signal是瞬时的,线程2通知线程1的时候,线程1可能还没运行到await的地方
*/
public class OneByOneDeadLockTest {

private static final Lock LOCK = new ReentrantLock();
private static final Condition ODD = LOCK.newCondition();
private static final Condition EVEN = LOCK.newCondition();

static class OddPrinter extends Thread {

@Override
public void run() {
int num = 1;
while (true) {
LOCK.lock();
try {
System.out.println("await1");
ODD.await();
System.out.println(num);
num += 2;
System.out.println("signal1");
EVEN.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
}
}

static class EvenPrinter extends Thread {

@Override
public void run() {
int num = 2;
while (true) {
LOCK.lock();
try {
System.out.println("await2");
EVEN.await();
System.out.println(num);
num += 2;
System.out.println("signal2");
ODD.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
}
}

public static void main(String[] args) {
OddPrinter oddPrinter = new OddPrinter();
EvenPrinter evenPrinter = new EvenPrinter();
oddPrinter.start();
evenPrinter.start();
LOCK.lock();
try {
ODD.signal();
} finally {
LOCK.unlock();
}
}
}

经过优化得到下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* 生产者消费者交替执行
*/
public class OneByOneTest {

private static final Lock LOCK = new ReentrantLock();
private static final Condition ODD = LOCK.newCondition();
private static final Condition EVEN = LOCK.newCondition();
private static volatile int CUR = 1;

static class OddPrinter extends Thread {

@Override
public void run() {
int num = 1;
while (true) {
LOCK.lock();
try {
System.out.println("await1");
while(CUR != 1) {
ODD.await();
}
System.out.println(num);
num += 2;
System.out.println("signal1");
CUR = 2;
EVEN.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
}
}

static class EvenPrinter extends Thread {

@Override
public void run() {
int num = 2;
while (true) {
LOCK.lock();
try {
System.out.println("await2");
while(CUR != 2) {
EVEN.await();
}
System.out.println(num);
num += 2;
System.out.println("signal2");
CUR = 1;
ODD.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
}
}

public static void main(String[] args) {
OddPrinter oddPrinter = new OddPrinter();
EvenPrinter evenPrinter = new EvenPrinter();
oddPrinter.start();
evenPrinter.start();
}
}

同步工具类

JDK1.5 之前 Java 中如果需要令多个线程互相协作,必须使用wait/notify/join方法簇,这些 API 容易出错(比如虚假唤醒)而且可扩展性差,所以后来 JDK 提供了更多的工具类来处理线程协作的场景。

闭锁 - CountDownLatch

日常开发中经常会遇到需要在主线程中开启多线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后在进行汇总的场景,在 CountDownLatch 出现之前一般都是使用线程的 join() 方法来实现,但是 join 不够灵活,不能够满足不同场景的需要。

join 相对 CountDownLatch 来说有以下两点劣势:
调用一个线程的 join 后,该线程会一直被阻塞直到该线程运行完毕,而 CountDownLatch 则可以在子线程运行完毕或运行过程中递减计数器,从而让 await 返回。
另外,使用线程池来管理线程时候一般都是直接添加一个 Runable 实例到线程池,这时就不方便再调用线程的 join 方法了。

不要这么做

CountDownLatch本身做的是线程间协调的工作,但是如果出现问题,就会有线程一直阻塞着执行不下去的风险,使用时需要尤为注意:

  • finally中countDown,避免抛出异常后无法继续执行;
  • await最好设置超时时间,避免由于意外情况无法释放。

类图

CountDownLatch类图

  • CountDownLatch 是公平的,其实现基础同样是 AQS。

初始化

构造方法中初始化 Sync 时把计数器的值赋值给了 AQS 的状态值 state,也就是这里 AQS 的状态值来表示计数器值。

void await()

挂起当前线程,直到下面情况之一发生时才返回:

  • 当所有线程都调用了 CountDownLatch 对象的 countDown() 方法后,也就是计时器值为 0 的时候;
  • 其它线程调用了当前线程的 interrupt() 方法中断了当前线程,当前线程会抛出 InterruptedException 异常后返回

CountDownLatch.await()
-> AbstractQueuedSynchronizer.acquireSharedInterruptibly:获取共享资源,且可中断
> 为什么是acquireSharedInterruptibly而不是获取独占资源的acquireInterruptibly?因为这里状态值需要的并不是非 0 即 1 的效果,而是和初始化时候指定的计数器值有关系,比如你初始化时候计数器值为 8,那么 state 的值应该就有 0 到 8 的状态,而不是只有 0 和 1 的情况下的独占效果。plainplain
1. 先调用Thread.interrupted()判断一下线程的中断状态,若被中断则抛出异常
1. 尝试看当前是否计数值为0,为0则直接返回,否则进入AQS的队列等待
-> AbstractQueuedSynchronizer.doAcquireSharedInterruptibly

boolean await(long timeout, TimeUnit unit)

比起 await 多了个返回条件:

  • 超时后返回 false。

void countDown()

递减计数器的值,递减后如果计数器为 0 则会唤醒所有调用 await 方法而被阻塞的线程。

CountDownLatch.countDown
-> AbstractQueuedSynchronizer.releaseShared
-> CountDownLatch.Sync.tryReleaseShared:循环 CAS 设置状态值,返回值表示当前线程是否为第一个将状态值设置为 0 的,当前状态值已经为 0 则返回 false,或者当前线程成功完成 cas 使计数值(状态值 state)减一并更新到 state 并返回,如果当前状态值为 0 返回 true
-> AbstractQueuedSynchronizer.doReleaseShared

long getCount()

获取当前计数器的值,也就是 AQS 的 state 的值,一般在 debug 测试时候使用。

实例 - 模拟并发请求

如果要用 Java 模拟并发请求,最基本的方案就是创建多个线程然后一一启动,但是如此一来请求就会带上先后顺序了,一种解决办法是通过 CountDownLatch 来同步多个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
start.await();

try {
System.out.println(i);
} finally {
end.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
start.countDown();
end.await();

栅栏 - CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
ExecutorService threadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
threadPool.submit(() -> {
try {
cyclicBarrier.await(1000, TimeUnit.MILLISECONDS);
System.out.println("x");
} catch (Exception e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
threadPool.awaitTermination(3000, TimeUnit.MILLISECONDS);

不要这么做

对CyclicBarrier的使用注意和CountDownLatch有点类似,都是需要避免永久等待的情况:

  • 发令枪不要报错否则全部阻塞;
  • await最好设置一个等待时间,否则可能出现某个线程出错导致所有线程都执行不下去的情况。

CyclicBarrier结构

CyclicBarrier类图

  • parties:记录线程个数,用来表示需要多少线程先调用 await 后,所有线程才会冲破屏障继续往下运行;
  • count:count 一开始等于 parties,每当线程调用 await 方法后就递减 1,当为 0 时候就表示所有线程都到了屏障点;

    用两个变量而不是一个的原因是CyclicBarrier可以复用,parties始终用来记录总线程个数,当count计数器变为0后,会使用parties赋值给count,达到复用的目的。

  • lock:lock保证了await方法的线程安全性,更新计数器count的时候可以保证原子性;
  • trip:条件变量,支持线程间使用notify、wait操作进行同步。
  • generation:generation 内部就一个变量 broken 用来记录当前屏障是否被打破,另外注意这里 broken 并没有被声明为 volatile,是因为锁内使用变量不需要。
    1
    2
    3
    private static class Generation {
    boolean broken = false;
    }

await

当线程调用await方法时线程会被阻塞,直到满足以下条件之一:

  • parties 个线程都调用了 await 函数,也就是线程都到了屏障点;
  • 其它线程调用了当前线程的 interrupt方法中断了当前线程,则当前线程会抛出 InterruptedException 异常返回;
  • 当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时候,会抛出 BrokenBarrierException 异常。
1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

await(long timeout, TimeUnit unit)

除了上边await的三个条件之外:

  • 到达了设置的超时时间后也会直接返回false;
1
2
3
4
5
6
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

dowait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)plain
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// 如果index==0说明所有线程都到到了屏障点,则执行初始化时候传递的任务
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 执行任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 激活其它调用await而被阻塞的线程,并重置CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 轮询等待
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 没有设置爱超时时间
if (!timed)
trip.await();
// 设置了超时时间
else if (nanos > 0L)
// 当前线程被放入条件变量trip的阻塞队列,当前线程会被挂起并释放lock
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

private void nextGeneration() {
// 唤醒条件队列里面阻塞线程
trip.signalAll();
// 重置CyclicBarrier
count = parties;
generation = new Generation();
}

Semaphore

Semaphore 信号量也是 Java 中一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的。
Semaphore类图

公平和非公平

类似ReentrantLock,Semaphore也提供了公平和非公平两套API:

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new
NonfairSync(permits);
}

其中,参数permits为信号量个数,这里最终会赋值给AQS的state。

非公平 acquire

当前线程调用该方法时候目的是希望获取一个信号量资源,如果当前信号量计数个数大于 0,并且当前线程获取到了一个信号量则该方法直接返回,当前信号量的计数会减少 1. 否则会被放入 AQS 的阻塞队列,当前线程被挂起,直到其它线程调用了 release 方法释放了信号量,并且当前线程通过竞争获取到了该信号量。
可中断的:当前线程被其它线程调用了 interrupt() 方法中断后,当前线程会抛出 InterruptedException 异常然后返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void acquire() throws InterruptedException {
// 传递参数为 1,说明要获取 1 个信号量资源
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 否则调用 sync 子类方法尝试获取,这里根据构造函数确定使用公平策略,tryAcquireShared 返回小于 0 说明剩余信号量已不够本次请求
if (tryAcquireShared(arg) < 0)
// 获取失败,放入阻塞队列,然后再次尝试如果失败则调用 park 方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取当前信号量值
int available = getState();
// 计算当前剩余值
int remaining = available - acquires;
// 如果当前剩余小于0或者CAS设置成功则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

可见,非公平版本的实现的核心是nonfairTryAcquireShared中的循环。

公平版 acquire

1
2
3
4
5
6
7
8
9
10
11
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

公平版本的核心是hasQueuedPredecessors判断AQS中是否有前驱节点。

acquire(int permits)

acquireUninterruptibly

acquireUninterruptibly(int permits)

release

该方法作用的把当前 semaphore 对象的信号量值增加 1,如果当前有线程因为调用 aquire 方法被阻塞放入了 AQS 的阻塞队列,则会根据公平策略选择一个线程进行激活,激活的线程会尝试获取刚增加的信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void release() {
// 默认释放 1 个
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {

// 尝试释放资源
if (tryReleaseShared(arg)) {

// 资源释放成功则调用park唤醒AQS队列里面最先挂起的线程
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {

// 获取当前信号量值
int current = getState();

// 当前信号量值增加releases,这里为增加1
int next = current + releases;
if (next < current) // 移除处理
throw new Error("Maximum permit count exceeded");

// 使用cas保证更新信号量值的原子性
if (compareAndSetState(current, next))
return true;
}
}

release(int permits)

Phase

TODO
相对 CountDownLatch 和 CyclicBarrier 来说,Phase 支持多阶段协同;

  • Phase 表示当前阶段,最多可以有(2^32 - 1)个阶段;
  • Parties 表示参与此次过程的参与者(不一定是线程)数量,最多 65535 个。

隔离可变性

引起线程安全问题的原因无非如下两点:

  1. 多个线程在临界区的竞争。
  2. 对象本身是可变的。

其中,如何解决竞争问题我们讨论过了,可变性的管理有两种策略:

  1. 杜绝可变性,即控制变量为不可变的,需要控制对象所有字段为 final,并且还要保证不会有一些乱来的反射代码去强行写入这些字段;
  2. 隔离可变性,令变量永远只由一个线程来操作,最常见的实现方式是 ThreadLocal。

ThreadLocal(线程本地变量)

不同于锁,ThreadLocal 的思路不是同步,而是规避了同步必要,因为每个线程都只操作属于自己的变量。

为什么要使用 ThreadLocal

  1. 在同一线程中方法与方法,类与类之间的共享内容传输(也是官方创建 ThreadLocal 的本意)。
  2. 利用 ThreadLocal 规避线程安全问题,这种方式已经大量应用,如 Spring mvc 在 Controller 中注入全局 HttpSession 对象。

ThreadLocal 原理

ThreadLocal类图
注意 ThreadLocal 中的 set、setInitialValue、set 方法,及 Thread 中的 ThreadLocalMap(不是 HashMap)类型的 threadLocals 变量。
每个线程维持一个 ThreadLocalMap 用于保存当前线程的所有 ThreadLocal 变量,ThreadLocalMap 保存的键值对,每个线程在第一次调用 set()或 get()时会初始化之。
key 为什么是 ThreadLocal?线程执行时可能创建多个 ThreadLocal,我们可以重定义它的 hashCode 来定位到散列表的不同槽位(实际上 ThreadLocal 并没有重定义 hashCode 方法,而是使用成员变量 threadLocalHashCode 来表示,大概是想用自定义的素数来得到更小的碰撞概率吧)。
这样每个线程相当于都有自己的一个局部变量表,不同线程的表又是完全隔离的。

ThreadLocal的内存泄露问题

ThreadLocal使用ThreadLocalMap来保存值,ThreadLocalMap中每个key都是弱引用,弱引用本身会在内存不足时被回收,但是value并不是弱引用,因此value会一直留在原地导致泄漏,除非该位置被重新hash。

InheritableThreadLocal

为什么要使用 InheritableThreadLocal

若父线程(比如 main 线程)中设置了一个 ThreadLocal 值,子线程中是无法获得的,因为它们处于不同的线程内。InheritableThreadLocal 就是用于解决子线程中获取不到父线程中设置的 ThreadLocal 变量的值的问题的。
例 1 - 使用 InheritableThreadLocal:

1
2
3
4
5
6
7
8
9
10
public class InheritableThreadLocalTest {
private InheritableThreadLocal<String> itl = new InheritableThreadLocal<>();
@Test
public void test() {
itl.set("hello");
new Thread(() -> {
System.out.println(itl.get());
}).start();
}
}

一些应用场景如:

  1. 存放用户登录信息的 threadlocal 变量,很有可能子线程中也需要使用用户登录信息
  2. 一些中间件需要用统一的追踪 ID 把整个调用链路记录下来的情景。

原理

InheritableThreadLocal 继承了 ThreadLocal 并对以下三个方法进行了重写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
实例化每个线程的 ThreadLocalMap 时,会根据这个方法来获取父线程中的值,在初始化子线程时保存了父线程线程隔离变量的一份拷贝。
具体的,看 ThreadLocalMap 的构造方法 ThreadLocalMap(parentMap)中对 childValue()的使用
调用链是这样的:Thread.init() -> ThreadLocal.createInheritedMap(parentMap) -> ThreadLocalMap(parentMap)
*/
protected T childValue (T parentValue){
return parentValue;
}
/*
ThreadLocal 中的 CRUD 操作都会调用 getMap(Thread.currentThread())来获取实现,如果是对 InheritableThreadLocal 调用的就会获取 inheritableThreadLocals
具体的,看 ThreadLocal 中的 get()、set()、remove()等 CRUD 方法
*/
ThreadLocalMap getMap (Thread t){
return t.inheritableThreadLocals;
}
void createMap (Thread t, T firstValue){
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}

ThreadLocalRandom

ThreadLocalRandom 类是 JDK7 在 JUC 包下新增的随机数生成器,它解决了 Random 类在多线程下的不足。

Random 类及其局限性

Random 中对 seed(随机数种子)的更新是通过 CAS 操作完成的,随机数即种子值的末尾几位,如下代码所示。

1
2
3
4
5
6
7
8
9
protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
oldseed = seed.get();
nextseed = (oldseed * multiplier + addend) & mask;
} while (!seed.compareAndSet(oldseed, nextseed));
return (int)(nextseed >>> (48 - bits));
}

为什么要使用 CAS?实际上 seed 在多线程环境下是可能会产生线程安全问题的,多个线程在根据同一个老种子计算新种子时候,第一个线程的新种子计算出来后,第二个线程要丢弃自己老的种子,要使用第一个线程的新种子来计算自己的新种子,依次类推,只有保证了这个,才能保证多线程下产生的随机数是随机的。也就是说需要进行同步,CAS 相对 synchronized 等基于锁的方案来说更高效。
多线程下使用单个 Random 实例生成随机数时候,多个线程同时计算新的种子时候会竞争同一个原子变量的更新操作,由于原子变量的更新是 CAS 操作,同时只有一个线程会成功,所以会造成大量线程进行自旋重试,这是会降低并发性能的,所以 ThreadLocalRandom 应运而生。

原理

ThreadLocalRandom类图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 初始化 UNSAFE
private static final sun.misc.Unsafe UNSAFE;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
//获取 unsafe 实例
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
// ThreadLocalRandom 中没有种子,而是使用 Thread 中的 threadLocalRandomSeed 变量来代替
// 获取 Thread 类里面 threadLocalRandomSeed 变量在 Thread 实例里面偏移量
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
//获取 Thread 类里面 threadLocalRandomProbe 变量在 Thread 实例里面偏移量
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
//获取 Thread 类里面 threadLocalRandomProbe 变量在 Thread 实例里面偏移量,这个值在后面讲解的 LongAdder 里面会用到
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception e) {
throw new Error(e);
}
}

// 单例变量
static final ThreadLocalRandom instance = new ThreadLocalRandom();

final long nextSeed() {
Thread t; long r; // read and update per-thread seed
U.putLong(t = Thread.currentThread(), SEED,
r = U.getLong(t, SEED) + GAMMA);
return r;
}

public int nextInt() {
return mix32(nextSeed());
}

static final void localInit() {
// seeder 和 probeGenerator 是两个原子性变量,在初始化调用线程的种子和探针变量时候用到,每个线程只会使用一次
// 根据 probeGenerator 计算当前线程中 threadLocalRandomProbe 的初始化值
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
// 然后根据 seeder 计算当前线程的初始化种子
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
// 设置到当前线程
U.putLong(t, SEED, seed);
U.putInt(t, PROBE, probe);
}

// 通过 UNSAFE 运算获取 ThreadLocalRandom 实例,调用 localInit()初始化 Thread 中的 threadLocalRandomSeed 和 threadLocalRandomProbe 变量
public static ThreadLocalRandom current() {
// 延迟初始化:当前线程中的 threadLocalRandomProbe 值为 0 的情况,说明是第一次调用 current 方法,需要调用 localInit 初始化
// 初始化逻辑:计算当前线程的初始化种子变量
if (U.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}

// 计算当前线程的下一个随机数
public int nextInt(int bound) {
if (bound <= 0)
throw new IllegalArgumentException(BadBound);
int r = mix32(nextSeed());
int m = bound - 1;
if ((bound & m) == 0) // power of two
r &= m;
else { // reject over-represented candidates
for (int u = r >>> 1;
u + m - (r = u % bound) < 0;
u = mix32(nextSeed()) >>> 1)
;
}
return r;
}

final long nextSeed() {
Thread t; long r;
// 使用 UNSAFE 的 putLong 方法把新种子放入当前线程的 threadLocalRandomSeed 变量
UNSAFE.putLong(t = Thread.currentThread(), SEED,
// 获取当前线程中 threadLocalRandomSeed 变量的值,然后在种子的基础上累加 GAMMA 值作为新种子
r = UNSAFE.getLong(t, SEED) + GAMMA);
return r;
}

原子性操作

原子性

多个线程执行一个操作时,其中任何一个线程要么完全执行完此操作,要么没有执行此操作的任何步骤,那么这个操作就是原子的。
要程序的最终结果等同于它在严格的顺序化环境下的结果,那么指令的执行顺序就可能与代码的顺序不一致,比如多线程是在一个CPU上执行的,多个线程的指令需要进行重排,再按顺序执行。
如果动作B要看到动作A的执行结果(无论A/B是否在同一个线程里面执行),那么A/B就需要满足 happens-before 关系。

实现原子性的几种方式

  1. 互斥锁:如synchronized、Lock、Condition、ReadWriteLock等。
  2. 线程隔离变量:如ThreadLocal等。
  3. 线性化:将线程任务丢到一个单线程的线程池中调度执行(比如Executors.newSingleThreadExecutor)。
  4. CAS:包括AtomicInteger等现成的工具类,或者可以使用Unsafe自行实现,另外,JDK8还提供了高并发下性能更优的LongAdder

原子表达式

在设计计数器时候一般都是先读取当前值,然后+1,然后更新,这个过程是读 -> 改 -> 写的过程,如果不能保证这个过程是原子性,那么就会出现线程安全问题。如下代码是线程不安全的,因为不能保证 ++value 是原子性操作。

1
2
3
4
5
6
7
8
9
public class ThreadNotSafeCount {
private Long value;
public Long getCount() {
return value;
}
public void inc() {
++value;
}
}

通过使用 Javap -c 查看汇编代码如下:

1
2
3
4
5
6
7
8
9
public void inc();
Code:
0: aload_0
1: dup
2: getfield #2 // Field value:J
5: lconst_1
6: ladd
7: putfield #2 // Field value:J
10: return

可知简单的 ++value有 2,5,6,7 组成,其中2是获取当前 value 的值并放入栈顶,5是把常量1放入栈顶,6是把当前栈顶中2个值相加并把结果放入栈顶,7则是把栈顶结果赋值会 value 变量,可知 Java 中简单的一句 ++value 转换为汇编后就不具有原子性了。

CAS(Compare and Swap)

在 Java 中锁在并发处理中占据了一席之地,但是使用锁不好的地方是当一个线程没有获取到锁后会被阻塞挂起,这会导致线程上下文的切换和重新调度的开销。
Java 中提供了非阻塞的 volatile 关键字来解决共享变量的可见性问题,这在一定程度上弥补了锁所在带来的开销,但是 volatile 只能保证共享变量的可见性问题,但是还是不能解决例如读 -> 改 -> 写等的原子性问题。
CAS 即 Compare And Swap,是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较-更新操作的原子性。在set之前先比较该值有没有变化,只有在没变的情况下才对其赋值。

ABA问题

ABA 问题:变量a经过A->B->A的修改变回了原来的值,此时CAS会认为a的值没有发生变化,但它确实发生了变化。ABA的解决办法很多,基本上是另外使用一个变量来标志这个a是否发生了变化。
在Java中已经有现成的工具类解决了ABA问题:AtomicMarkableReferenceAtomicStampedReference

  • AtomicMarkableReference:通过引入一个 boolean变量来反映中间有没有变过;
  • AtomicStampedReference:通过引入一个 int 来累加来反映中间有没有变过。

AtomicXxx

1
2
3
4
5
6
7
8
int addAndGet(int delta):以原子方式将给定值与当前值相加。 实际上就是等于线程安全版本的 i =i+delta 操作。
boolean compareAndSet(int expect, int update):如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。 如果成功就返回 true,否则返回 false,并且不修改原值。
int decrementAndGet():以原子方式将当前值减 1。 相当于线程安全版本的–i 操作。
int getAndAdd(int delta):以原子方式将给定值与当前值相加。 相当于线程安全版本的 t=i;i+=delta;return t;操作。
int getAndDecrement():以原子方式将当前值减 1。 相当于线程安全版本的 i–操作。
int getAndIncrement():以原子方式将当前值加 1。 相当于线程安全版本的 i++操作。
int getAndSet(int newValue):以原子方式设置为给定值,并返回旧值。 相当于线程安全版本的 t=i;i=newValue;return t;操作。
int incrementAndGet():以原子方式将当前值加 1。 相当于线程安全版本的++i 操作。

JUC中的原子性工具类包括 AtomicInteger,AtomicLong,AtomicBoolean,其内部使用 Unsafe 来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class AtomicLong extends Number implements java.io.Serializable {
// 获取 Unsafe 实例
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 存放变量value的偏移量plain
private static final long valueOffset;

// 判断JVM是否支持Long类型无锁CAS
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
private static native boolean VMSupportsCS8();

static {
try {
// 获取value在AtomicLong中偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

// 实际变量值,volatile保证了多线程下的内存可见性
private volatile long value;

public AtomicLong(long initialValue) {
value = initialValue;
}
....
}

getAndIncrement

在JDK7中的实现逻辑为:

1
2
3
4
5
6
7
8
public final long getAndIncrement() {
while (true) {
long current = get();
long next = current + 1;
if (compareAndSet(current, next))
return current;
}
}

而 JDK 8 将CAS操作抽取到了Unsafe中,getAndIncrement的实现被修改为:

1
2
3
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

Unsafe

compareAndSwapLong(Object obj,long valueOffset,long expect, long update)

compareAndSwap 的意思也就是比较并交换,四个操作数分别为:对象内存位置,对象中的变量的偏移量,变量预期值 expect,新的值 update。
操作含义是如果对象 obj 中内存偏移量为 valueOffset 位置的变量值为 expect 则使用新的值 update 替换旧的值 expect。这个是处理器提供的一个原子性指令。
JDK 的 rt.jar 包中的 Unsafe 类提供了硬件级别的原子操作,Unsafe 里面的方法都是 native 方法,通过使用 JNI 的方式来访问本地 C++ 实现库。

objectFieldOffset(Field field)

返回指定的变量在所属类的内存偏移地址,偏移地址仅仅在该 Unsafe 函数中访问指定字段时候使用。

1
2
3
4
5
6
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

其他一些方法如:

1
2
3
4
5
6
7
8
int arrayBaseOffset(Class arrayClass) 方法 获取数组中第一个元素的地址
int arrayIndexScale(Class arrayClass) 方法 获取数组中单个元素占用的字节数
boolean compareAndSwapLong(Object obj, long offset, long expect, long update) 方法 比较对象 obj 中偏移量为 offset 的变量的值是不是和 expect 相等,相等则使用 update 值更新,然后返回 true,否者返回 false
public native long getLongVolatile(Object obj, long offset) 方法 获取对象 obj 中偏移量为 offset 的变量对应的 volatile 内存语义的值。
void putLongVolatile(Object obj, long offset, long value) 方法 设置 obj 对象中内存偏移为 offset 的 long 型变量的值为 value,支持 volatile 内存语义。
void putOrderedLong(Object obj, long offset, long value) 方法 设置 obj 对象中 offset 偏移地址对应的 long 型 field 的值为 value。这是有延迟的 putLongVolatile 方法,并不保证值修改对其它线程立刻可见。变量只有使用 volatile 修饰并且期望被意外修改的时候使用才有用。
void park(boolean isAbsolute, long time) 阻塞当前线程,其中参数 isAbsolute 等于 false 时候,time 等于 0 表示一直阻塞,time 大于 0 表示等待指定的 time 后阻塞线程会被唤醒,这个 time 是个相对值,是个增量值,也就是相对当前时间累加 time 后当前线程就会被唤醒。 如果 isAbsolute 等于 true,并且 time 大于 0 表示阻塞后到指定的时间点后会被唤醒,这里 time 是个绝对的时间,是某一个时间点换算为 ms 后的值。另外当其它线程调用了当前阻塞线程的 interrupt 方法中断了当前线程时候,当前线程也会返回,当其它线程调用了 unpark 方法并且把当前线程作为参数时候当前线程也会返回。
void unpark(Object thread) 唤醒调用 park 后阻塞的线程,参数为需要唤醒的线程。

JDK8中新增了一些方法:

1
2
long getAndSetLong(Object obj, long offset, long update) 方法 获取对象 obj 中偏移量为 offset 的变量 volatile 语义的值,并设置变量 volatile 语义的值为 update。
long getAndAddLong(Object obj, long offset, long addValue) 方法 获取对象 obj 中偏移量为 offset 的变量 volatile 语义的值,并设置变量值为原始值 +addValue。

使用Unsafe

Unsafe不能直接使用,因为Unsafe是rt.jar包内的,由Bootstrap类加载器加载,而我们自定义的类是由AppClassLoader加载的,getUnsafe中包含对类的加载方式的鉴权:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final Unsafe theUnsafe = new Unsafe();

public static Unsafe getUnsafe() {
// 调用者的类
Class localClass = Reflection.getCallerClass();

// 类加载器鉴权plain
if (!VM.isSystemDomainLoader(localClass.getClassLoader())) {
throw new SecurityException("Unsafe");
}
return theUnsafe;
}

// 判断 paramClassLoader 是不是 BootStrap 类加载器
public static boolean isSystemDomainLoader(ClassLoader paramClassLoader) {
return paramClassLoader == null;
}

如果没有这里的鉴权,我们就可以随意使用Unsafe了,而Unsafe可以直接操作内存,非常不安全,因此需要做这个限制。
如果一定要使用Unsafe,可以采用反射的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class UnsafeTest {

static final Unsafe unsafe;

static final long stateOffset;

private volatile long state = 0;

static {
try {
// 反射获取 Unsafe 的成员变量 theUnsafe
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);

//获取 state 在 TestUnSafe 中的偏移量
stateOffset = unsafe.objectFieldOffset(UnsafeTest.class.getDeclaredField("state"));
} catch (Exception ex) {
System.out.println(ex.getLocalizedMessage());
throw new Error(ex);
}
}

public static void main(String[] args) {
UnsafeTest test = new UnsafeTest();
Boolean sucess = unsafe.compareAndSwapInt(test, stateOffset, 0, 1);
System.out.println(sucess);
}
}

LongAdder

高并发下AtomicLong的性能并不能让人接受,因为大量线程会同时尝试获取-修改的流程,而只有一个线程可以修改成功,其他线程进入下一次轮询尝试修改(自旋),这大大降低了CPU的利用率。
LongAdder采用分治的方式来提升效率:

  • 将对同一个变量的竞争划分为对多个Cell的竞争;
  • 而且多个线程如果争夺同一个Cell失败,不会自旋CAS重试,而是尝试获取其他原子变量的锁;
  • 最后获取当前值时候是把所有变量的值累加后在加上 base 返回的。

LongAdder类图

  • 分治
    LongAdder 维护了一个延迟初始化的原子性更新数组和一个基值变量base。数组的大小保持是 2 的 N 次方大小,数组表的下标使用每个线程的 hashcode 值的掩码表示,数组里面的变量实体是 Cell 类型。
    LongAdder 继承自 Striped64 类,Striped64 内部维护着三个变量,LongAdder 的真实值其实是 base 的值与 Cell 数组里面所有 Cell元素值的累加,base 是个基础值默认是 0,cellsBusy 用来实现自旋锁,当创建 Cell 元素或者扩容 Cell 数组时候用来进行线程间的同步。
  • 伪共享:Cell改进了AtomicLong,让原子性数组元素相邻存放,可以经常共享缓存行,以提高性能。
  • 惰性加载:由于 Cells 占用内存相对比较大,所以并不会在应用启动时立刻创建,而是在需要时候在创建,也就是惰性加载,当一开始没有空间时候,所有的更新都是操作base变量。

Cell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

sum

内部操作是累加所有 Cell 内部的 value 的值后累加 base。因为统计所有Cell时可能正好有部分Cell正被修改,或者数组进行了扩容,所以sum的值并不是精确的。

reset

重置操作:

  • 把base置为0;
  • 如果 Cell 数组有元素,则元素值重置为 0。

sumThenReset

sum 的改造版本,在计算 sum 累加对应的 cell 值后,把当前 cell 的值重置为 0,base 重置为 0。当多线程调用该方法时候会有问题,比如考虑第一个调用线程会清空 Cell 的值,后一个线程调用时候累加时候累加的都是 0 值。

longValue

同sum。

add(long x)

累加增量 x 到原子变量,这个过程是原子性的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 尝试设置 base+=x,如果设置成功则直接返回
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 如果多个线程同时执行 casBase 可能会失败,此时尝试设置 cell+=x
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || // 判断 cells 数组长度是否为 0
(a = as[getProbe() & m]) == null || // 获取当前线程对应的 cell
!(uncontended = a.cas(v = a.value, v + x))) // 设置 cell+=x
// 如果 cells 长度为 0、cell 为 null 或 CAS 设置 cell 的值失败,则进行数组扩充和初始化
longAccumulate(x, null, uncontended);
}
}
// CAS 设置 base 的值为 val
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

LongAccumulator

LongAdder 类是 LongAccumulator 的一个特例:

  • LongAccumulator 相比于 LongAdder 可以提供累加器初始非 0 值,后者只能默认为0;
  • LongAccumulator可以指定累加规则,比如不是累加而是相乘,只需要在构造 LongAccumulator 时传入自定义的双目运算器;

比如按下面这样初始化的LongAccumulator可以起到和LongAdder一样的作用:

1
2
3
4
5
6
7
LongAccumulator accumulator = new LongAccumulator(new LongBinaryOperator() {

@Override
public long applyAsLong(long left, long right) {
return left + right;
}
}, 0);

QA

  1. Java中内置锁synchronized的原理?
    synchronized怎么使用的(修饰代码块、静态方法、方法)。
    synchronized涉及到的内存结构:
    *
    *
    synchronized

  2. 什么是 Java 中原子性操作?
    原子性操作保证了原子性,即操作要么全部发生要么全部不发生,中间不会被线程调度机制打断,也不会发生任何上下文切换,在Java中是通过CAS实现的。

  3. 什么是 Java 中的 CAS 操作,AtomicLong 的实现原理?

  4. 乐观锁相对悲观锁的优势?加锁的开销是什么?
    乐观锁没有加锁的开销。
    加锁开销主要在于用户态和内核态之间的切换:申请锁时,从用户态进入内核态,申请成功后从内核态返回用户态,没有申请到时阻塞在内核态;使用完资源后释放锁,从用户态进入内核态,唤醒其他正在阻塞等待锁的进程,返回用户态。
    竞争锁的线程变多,会间接导致线程上下文切换变得频繁,这个开销主要包括CPU寄存器保存和加载。

  5. CAS有什么问题?如何解决ABA问题?
    CAS操作存在ABA问题,简而言之,就是一个值为A的变量被改成B后又被改回来了。解决办法是给该变量加版本号,每次修改的同时增加该版本号,并且需要保证写入和修改版本号这两个操作是原子的,AtomicStampedReference提供了这种特性,其中保存的值类型是Pair<reference, stamp>,通过UNSAFE.compareAndSwapObject来修改这个值。

  6. 原子变量在性能上有什么问题?
    原子变量的原理是使用CAS操作自旋判断值是否与期望的一致,如果并发线程特别多就会出现空轮询,导致浪费大量CPU资源,解决办法是采用LongAdderLongAdder的原理是分治,将由原来对一个变量的自旋操作改成了对多个变量的自旋操作,如果其中一个获取失败则转而获取另一个,可以减少冲突,增加加锁成功的几率,打个比方,就像原来只有一台服务器提供web服务,随着并发量的增长,需要增加几台服务器来均衡负载。

  7. 什么是可重入锁、乐观锁、悲观锁、公平锁、非公平锁、独占锁、共享锁?

  8. 抽象同步队列 AQS (AbstractQueuedSynchronizer) 概述,AQS 是实现同步的基础组件,并发包中锁的实现底层就是使用 AQS 实现,虽然大多数开发者可能从来不会直接用到 AQS,但是知道其原理对于架构设计还是很有帮助的。
    AQS主要结构包含一个state和一个双向队列:

    • state在不同的并发工具中有不同的含义,比如,ReentrantLock中,state记录了锁的重入次数,而在ReentrantReadWriteLock中,state的高16位记录了读线程的数量,而低16位记录了写线程的重入次数;
    • 双向队列中的Node保存的是线程,一般用于实现公平锁。
      AQS的主要操作包含获取和释放资源:
    • 线程获取资源——比如ReentrantLock中的lock,会设置状态变量state的值,如果没有获取成功则会调用LockSupport.park(thread)挂起,如果是公平锁,还需要先将当前线程插入到一个双向队列中。
    • 线程释放资源——比如ReentrantLock中的unlock,就是设置状态变量 state 的值,然后调用 LockSupport.unpark(thread) 激活 AQS 队列里面最早被阻塞的线程 (thread)。被激活的线程则尝试看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
  9. AQS中的state变量为什么是volatile的?
    因为AQS需要通过CAS操作state从而保证原子性,CAS操作需要变量具有可见性。

  10. AQS中用到的LockSupport是什么?
    LockSupport是一个同步工具类,主要用于替代Object中的wait和notify方法,相对wait/notify来说更易用:

    • wait/notify必须在加锁状态下才能使用,而LockSupport没有这个要求;
    • LockSupport可以指定某个线程唤醒,而notify只能随机唤醒一个线程;
    • notify后wait会死锁,而LockSupport.unpark后park不会死锁(因为LockSupport控制的是许可证)
  11. 独占锁 ReentrantLock 原理探究,ReentrantLock 是可重入的独占锁或者叫做排它锁,同时只能有一个线程可以获取该锁,其实现分为公平与非公平的独占锁。

  12. ReentrantLock怎么实现公平和非公平锁?
    非公平锁每次获取时直接通过CAS操作设置state,而公平锁在设置失败后会先将当前线程插入一个双向队列中排队,当检查state=0时还需要判断队列中没有其他线程才可以获取锁成功。

  13. 读写锁 ReentrantReadWriteLock 原理,ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,本文来介绍读写分离锁的实现。

    • 读锁,加锁前会额外检查是否写锁被人占用,加锁成功后,如果当前线程是第一次加该读锁,则state高16位+1,如果是第二次加读锁则不会修改state而是修改一个ThreadLocal变量+1;
    • 读锁,释放锁后,判断state归零则唤醒AQS队列中下一个线程;
    • 写锁,加锁前检查state是否为0(为0表示读锁和写锁都没有被占用);
    • 写锁,释放锁后,唤醒AQS队列中下一个线程。
  14. ReentrantReadWriteLock是可重入的,但是同一个线程又加读锁又加写锁为什么会死锁?
    这个与可重入无关,因为读锁和写锁分别使用的是state的高16位和低16位,计算可重入时并不会互相影响。

  15. Condition的实现原理?
    Condition相当于一个0-1信号量,主要用于模拟线程间的协调。可以通过lock.newCondition()来创建,使用时需要先使用lock加锁,condition.await会释放原lock里的state,然后其他线程lock就可以进入临界区了。

  16. JUC 中倒数计数器 CountDownLatch 的使用与原理分析,当需要等待多个线程执行完毕后在做一件事情时候 CountDownLatch 是比调用线程的 join 方法更好的选择,CountDownLatch 与 线程的 join 方法区别是什么?
    join相对CountDownLatch来说不够灵活:

    • 调用一个线程的 join 后,该线程会一直被阻塞直到该线程运行完毕,而 CountDownLatch 则可以在子线程运行完毕或运行过程中递减计数器,从而让 await 返回。
    • 另外,使用线程池来管理线程时候一般都是直接添加一个 Runnable 实例到线程池,这时就不方便再调用线程的 join 方法了。
      CountDownLatch的原理简而言之,就是把AQS的state当做一个计数器,countDown时-1,await就是一直在等待state变0。
  17. JUC 中 回环屏障 CyclicBarrier 的使用与分析,它也可以实现像 CountDownLatch 一样让一组线程全部到达一个状态后再全部同时执行,但是 CyclicBarrier 可以被复用。那么 CyclicBarrier 内部的实现与 CountDownLatch 有何不同那?

  18. ThreadLocal 的实现原理,ThreadLocal 作为变量的线程隔离方式,其内部是如何做的?

  19. InheritableThreadLocal 的实现原理,InheritableThreadLocal 是如何弥补 ThreadLocal 不支持继承的特性?

  20. 经常使用的随机数生成器 Random 类的原理是什么?及其局限性是什么?ThreadLocalRandom 是如何利用 ThreadLocal 的原理来解决 Random 的局限性?

  21. ThreadLocal 的一个使用场景,Spring 框架中 Scope 作用域 Bean 的实现原理。

参考

内置锁

  1. 浅谈偏向锁、轻量级锁、重量级锁
  2. 死磕Synchronized底层实现