并发和并发安全容器

无锁栈

Treiber Stack

无锁列表

CopyOnWriteArrayList

CopyOnWriteArrayList 是一个线程安全的 ArrayList,对其进行的修改操作和元素迭代操作都是在底层创建一个拷贝的数组(快照)上进行的,也就是写时拷贝策略。CopyOnWriteArrayList 适合读多写少的场景,但如果应用在写操作频繁的场景下反而会降低性能。
CopyOnWriteArrayList类图

  • lock:保证写操作时的并发安全;

add(E e)

添加操作拷贝了份快照,在快照上添加元素,最后替代原数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean add(E e) {

// 加独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取array
Object[] elements = getArray();

// 拷贝array到新数组,添加元素到新数组
// 新数组长度是原数组长度+1,可见CopyOnWriteArrayList是无界的
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;

// 使用新数组替换添加前的数组
setArray(newElements);
return true;
} finally {
// 释放独占锁
lock.unlock();
}
}

get(int index)

get 操作获取下标处的元素,实际上 get 可以被分解为以下两个步骤:

  1. 获取 array 的引用;
  2. 通过下标访问 array 指定位置的元素。

整个过程并没有加锁,如果在访问期间有另一个线程删除了某个元素,实际上因为修改操作是发生在原数组的一个快照上的,get 操作仍然获取的是原数组上的元素,因此不会发生类似数组越界的问题。但同时也不可避免这个过程带来的弱一致性,因为元素事实上已经被删除了却仍然可以被访问到。

set(int index, E element)

修改 list 中指定元素的值。

  • 如果指定位置的元素不存在则抛出 IndexOutOfBoundsException 异常;
  • 如果指定位置元素与新值不一致,则创建新数组、在新数组上修改,最后设置新数组到 array(COW)。
  • 即使没有变化,也还是需要重新设置一次 array,这主要是因为 array 本身是 volatile 的,set 方法应当提供 volatile 的语义。

remove(int index)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E remove(int index) {

//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {

//获取数组
Object[] elements = getArray();
int len = elements.length;

//获取指定元素
E oldValue = get(elements, index);
int numMoved = len - index - 1;

//如果要删除的是最后一个元素
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
//分两次拷贝除删除后的元素到新数组
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
//使用新数组代替老的
setArray(newElements);
}
return oldValue;
} finally {
//释放锁
lock.unlock();
}
}

remove(Object o)

remove(Object o, Object[] snapshot, int index)

iterator

CopyOnWriteArrayList 中的 iterator 是弱一致性的,其他线程的修改操作对 iterator 不可见的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}

static final class COWIterator<E> implements ListIterator<E> {
//array的快照版本
private final Object[] snapshot;

//数组下标
private int cursor;

//构造函数
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

//是否遍历结束
public boolean hasNext() {
return cursor < snapshot.length;
}

//获取元素
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
  • 如果在该线程使用返回的迭代器遍历元素的过程中,其它线程没有对 list 进行增删改,那么 snapshot 本身就是 list 的 array,因为它们是引用关系。
  • 如果在遍历期间存在其他线程对 list 的增删改操作,那么 snapshot 会成为原 array 的快照,此时其他线程对 list 进行的增删改是不可见的,因为它们操作的是两个不同的数组。

无锁队列

ConcurrentLinkedQueue

  • 线程安全
  • 无界
  • 非阻塞

数据结构

ConcurrentLinkedQueue类图

  • 底层队列使用单向链表实现。
  • 两个volatile的Node节点(head和tail)分别存放队列的首尾节点,从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
    1
    2
    3
    public ConcurrentLinkedQueue() {
    head = tail = new Node(null);
    }
  • Node节点内部为一个volatile修饰的变量item用来存放节点的值,next用来存放链表的下一个节点,从而链接成一个单向无界链表,如下图所示:
    ConcurrentLinkedQueue的队列结构
  • 入队和出队操作使用CAS来实现线程安全。

入队 - offer

offer操作在队列末尾添加一个元素:

  • 如果传入的是null,抛出NPE,表明ConcurrentLinkedQueue是不允许插入null值的;
  • 其他情况下插入任何元素都会返回true,因为该队列是无界队列;
  • 使用CAS操作实现线程安全
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    // 从尾节点进行插入
    for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;

    // 如果q==null说明p是尾节点,则执行插入
    // (1)
    if (q == null) {
    // 使用CAS设置p节点的next节点
    if (p.casNext(null, newNode)) {
    // cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
    if (p != t)
    casTail(t, newNode); // Failure is OK.
    return true;
    }
    }
    // (2)
    else if (p == q)
    // 多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
    // 重新找新的head,因为新的head后面的节点才是正常的节点。
    p = (t != (t = tail)) ? t : head;
    // (3)
    else
    // 寻找尾节点
    p = (p != t && t != (t = tail)) ? t : q;
    }
    }
    注意这里的循环体从队列尾部添加元素:
  1. 刚开始队列为空,代码(1)通过CAS替换p的下一个节点;
    注意有一个哨兵节点null,刚开始队列的head和tail节点都是指向该哨兵节点,因此队列中至少都会有一个节点;
  2. 如果多个线程同时执行插入,总会有一个线程CAS时插入失败,这时会进入下一次循环
    ConcurrentLinkedQueue插入1
    这时不满足(1)和(2)的条件,在代码(3)处会将q赋值给p
    ConcurrentLinkedQueue插入2
    再到下一次循环时q就会移动到null,这时要么正常插入,要么又被别人通过CAS抢了。
  3. 代码(2)是在执行poll时可能出现的情况:
    ConcurrentLinkedQueue插入3
    此时由于t==tail,所以p被赋值为head,然后继续循环插入元素。

出队 - poll

poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public E poll() {
// goto标记
restartFromHead:

// (1)无限循环
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取当前队头节点
E item = p.item;

// (2)当前节点有值则cas变为null
if (item != null && p.casItem(item, null)) {
//(6)cas成功标志当前节点以及从链表中移除
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// (3)当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// (4)自引用了,则重新找新的队列头节点
else if (p == q)
continue restartFromHead;
// (5)
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
  1. 刚开始队列是空的,内层循环代码(3)判断队列为空就直接返回null了;
    这时updateHead执行时由于h等于p所以没有设置头节点,poll直接返回null。
  2. 如果执行到(3)时已经有其他线程调用了offer方法成功添加一个元素到队列末尾,这时q会指向新增元素的节点
    ConcurrentLinkedQueue出队1
    这时会进入(5),令p也指向新q。
    然后在下一次循环时,进入代码(2),执行p.casItem(item, null)时会通过CAS操作设置头节点的值为null。
    代码(6)处,此时h指向哨兵节点,而p指向队列头节点,这时将p设置为新的头节点(这时p里的值已经被清掉了是一个空节点)。
    此时队列的状态为:
    ConcurrentLinkedQueue插入3
    这就是之前讲队列offer时的一种特殊情况。
  3. 自引用的情况
    假设线程A已经执行到(2)将第一个节点值置为null,这时又有一个线程B开始执行poll操作,如下图所示:
    ConcurrentLinkedQueue出队2
    然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
    ConcurrentLinkedQueue出队3
    然后线程 B 继续执行代码(3)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(4)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
    ConcurrentLinkedQueue出队4

ArrayBlockingQueue

offer是不会阻塞的,如果满了直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果已经超过容量阈值,则直接返回false
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 把它看成一个循环数组,如果超出范围就卷回
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒这个Condition,必须是在加了锁的前提下才能使用
notEmpty.signal();
}

poll同样也不会阻塞,如果空了直接返回null:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 回卷
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒
notFull.signal();
return x;
}

put操作会等待notFull这个条件:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

take操作同理,会等待notEmpty这个条件:

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。

PriorityBlockingQueue

SynchronousQueue

LinkedTransferQueue

队列比较

Disruptor

  • 无锁内存队列

  • 优化 CPU 伪共享

  • RingBuffer
    环形队列,使用定长数组存储,长度是 2^N,可以使用位运算提升性能。
    无锁:无锁设计减少了竞争。
    预热:预先填充好任务/事件,不需要像链表那样每次添加/删除节点时去创建/回收节点,从而可以避免一定的垃圾回收。
    缓存行填充解决了 CPU 伪共享问题。

  • WorkPool
    存储 WorkProcessor 的池子,Disruptor 可以通过 Executor 并发启动每一个 WorkProcessor

  • WorkProcessor
    从 RindBuffer 消费事件/任务,并交由 WorkHandler 处理。

  • WorkHandler
    处理任务的工作者,根据任务类型委托给不同的 EventHandler。

Logback 框架中异步日志打印中 ArrayBlockingQueue 的使用

异步模型是业务线程把要打印的日志任务写入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务写入磁盘,对用户线程来说,耗时只有将数据写入队列中。

并发安全 Map

ConcurrentHashMap

ConcurrentHashMap结构

get

代码:java.util.concurrent.ConcurrentHashMap.get

  1. 计算 key 的散列值,可以使用该散列值定位到散列表中的某个槽。
    如果 key 是自定义类型对象,需要实现重写 hash 方法。
  2. 找到对象
    hash 值是不精确匹配的,hash 值的关键是计算简单而且有一定的区分度,比如取 string 的前 3 位的和作为 hash 值。
    要精确匹配需要使用对象的 equals 方法。
    ConcurrentHashMap 中哈希槽的实现方法有两种:链表和红黑树,链表和红黑树的查找过程就不必细说了。

put

ConcurrentHashMap的put操作
代码:java.util.concurrent.ConcurrentHashMap.put

  1. hash
  2. 找对象
    找对象过程与 get 的区别主要是 put 需要并发控制:
    • 如果槽是空的,则通过 CAS 直接赋值;
    • 如果槽非空,则先用synchronized锁住槽,接下来根据槽的数据结构来插入节点,如果槽是链表,则遍历链表找该 Node 是否已存在,不存在的情况下插入到末尾,如果槽是红黑树,则通过二叉树的遍历找目标 Node,找不到的情况下插入到叶子并重新执行红黑平衡。

rehash

扩容的触发条件与HashMap一致。
扩容流程大致上是:遍历哈希槽,对每个需要迁移的哈希槽进行synchronized加锁。
当扩容开始后,其他线程必须等扩容完成后才能工作,但其他线程也不是就一直阻塞等扩容完成,而是调用helpTransfer方法一起帮助进行扩容,实际上因为扩容的单位是哈希槽,因此多线程并发执行扩容并不会导致明显的冲突增加。

扩容入口:

  1. helpTransfer
    写入操作时协助扩容,即判断hash节点是ForwardingNode则调用helpTransfer将
  2. 全量添加时,需要保证

扩容代码:
java.util.concurrent.ConcurrentHashMap.transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 创建一个新的两倍大小的新nextTab,将老tab中的元素迁移过去
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 标记节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果tab当前位置为null,则设置fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 已经是fwd节点,则遍历下一个位置
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// tab当前位置已有节点,则加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 表示链表节点,如果是树节点则fh=-2
if (fh >= 0) {
// 头节点的hash值
int runBit = fh & n;
Node<K,V> lastRun = f;
// 链表的下一节点p
for (Node<K,V> p = f.next; p != null; p = p.next) {
// p节点的hash值
int b = p.hash & n;
// 避免成环?
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将ln和hn转移到nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
原tab置为fwd,表示已经被转移了
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

size

size操作返回的是一个不精确的值,因为进行统计的过程中,很有可能会有其他线程正在进行插入和删除操作。

1.8之前的size:

  1. 遍历segments数组,将每个segment的count加起来作为总数,将modCount加起来作为修改总数;
    modCount会在每次segment被修改时+1(只增不减),用于比较。
  2. 再做一遍遍历,将这次的modCount总数和上一次的比较,如果一致则计数准确直接返回,否则重试;
  3. 如果重试了2次都不行,则第三次会对segment加锁再统计。

1.8之后,没有了分段锁,size不会每次都遍历segments统计,而是在更新时修改总数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

从源码中可以看到,ConcurrentHashMap#size的结果就是:
baseCount + sum(counterCells)
其中:

  • baseCount:计数,总数发生变化时通过CAS修改
  • counterCells:如果baseCount CAS修改失败,作为兜底,类似LongAdder的思路。

put操作的末尾会调用addCount()更新baseCount的值,如果CAS修改失败了,则使用counterCells,如果CAS修改 counterCells失败了,则使用fullAddCount方法继续死循环操作,直到成功。

QA

  1. JUC 并发包中并发组件 CopyOnWriteArrayList 的实现原理,CopyOnWriteArrayList 是如何通过写时拷贝实现并发安全的 List?

  2. 什么是弱一致性?

  3. 说一下 ConcurrentHashMap。

  4. ConcurrentHashMap 怎么实现并发安全?
    相对 Hashtable 来说 ConcurrentHashMap 的锁粒度是更小的,Hashtable 中使用 synchronized 实现的一种方法级的悲观锁,相当于把整个散列表锁住了,不利于系统整体吞吐量的提升。
    JDK1.7 中它使用的是一种分段锁来保证并发安全,是一种粒度较小的锁,写操作每次只锁住一个哈希槽,
    JDK1.8 之后改为通过实现一种基于 CAS 的乐观锁来保证并发安全,当然,和 HashMap 一样,每个哈希槽在增长到一定程度后会自动转换为红黑树。

参考

  1. 【目录】JUC 集合框架目录
  2. 并发框架 Disruptor 译文