并发和并发安全容器
无锁栈
Treiber Stack
无锁列表
CopyOnWriteArrayList
CopyOnWriteArrayList 是一个线程安全的 ArrayList,对其进行的修改操作和元素迭代操作都是在底层创建一个拷贝的数组(快照)上进行的,也就是写时拷贝策略。CopyOnWriteArrayList 适合读多写少的场景,但如果应用在写操作频繁的场景下反而会降低性能。
- lock:保证写操作时的并发安全;
add(E e)
添加操作拷贝了份快照,在快照上添加元素,最后替代原数组
1 | public boolean add(E e) { |
get(int index)
get 操作获取下标处的元素,实际上 get 可以被分解为以下两个步骤:
- 获取 array 的引用;
- 通过下标访问 array 指定位置的元素。
整个过程并没有加锁,如果在访问期间有另一个线程删除了某个元素,实际上因为修改操作是发生在原数组的一个快照上的,get 操作仍然获取的是原数组上的元素,因此不会发生类似数组越界的问题。但同时也不可避免这个过程带来的弱一致性,因为元素事实上已经被删除了却仍然可以被访问到。
set(int index, E element)
修改 list 中指定元素的值。
- 如果指定位置的元素不存在则抛出 IndexOutOfBoundsException 异常;
- 如果指定位置元素与新值不一致,则创建新数组、在新数组上修改,最后设置新数组到 array(COW)。
- 即使没有变化,也还是需要重新设置一次 array,这主要是因为 array 本身是 volatile 的,set 方法应当提供 volatile 的语义。
remove(int index)
1 | public E remove(int index) { |
remove(Object o)
remove(Object o, Object[] snapshot, int index)
iterator
CopyOnWriteArrayList 中的 iterator 是弱一致性的,其他线程的修改操作对 iterator 不可见的。
1 | public Iterator<E> iterator() { |
- 如果在该线程使用返回的迭代器遍历元素的过程中,其它线程没有对 list 进行增删改,那么 snapshot 本身就是 list 的 array,因为它们是引用关系。
- 如果在遍历期间存在其他线程对 list 的增删改操作,那么 snapshot 会成为原 array 的快照,此时其他线程对 list 进行的增删改是不可见的,因为它们操作的是两个不同的数组。
无锁队列
ConcurrentLinkedQueue
- 线程安全
- 无界
- 非阻塞
数据结构
- 底层队列使用单向链表实现。
- 两个volatile的Node节点(head和tail)分别存放队列的首尾节点,从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
1
2
3public ConcurrentLinkedQueue() {
head = tail = new Node(null);
} - Node节点内部为一个volatile修饰的变量item用来存放节点的值,next用来存放链表的下一个节点,从而链接成一个单向无界链表,如下图所示:
- 入队和出队操作使用CAS来实现线程安全。
入队 - offer
offer操作在队列末尾添加一个元素:
- 如果传入的是null,抛出NPE,表明ConcurrentLinkedQueue是不允许插入null值的;
- 其他情况下插入任何元素都会返回true,因为该队列是无界队列;
- 使用CAS操作实现线程安全注意这里的循环体从队列尾部添加元素:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 从尾节点进行插入
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 如果q==null说明p是尾节点,则执行插入
// (1)
if (q == null) {
// 使用CAS设置p节点的next节点
if (p.casNext(null, newNode)) {
// cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
if (p != t)
casTail(t, newNode); // Failure is OK.
return true;
}
}
// (2)
else if (p == q)
// 多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
// 重新找新的head,因为新的head后面的节点才是正常的节点。
p = (t != (t = tail)) ? t : head;
// (3)
else
// 寻找尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
- 刚开始队列为空,代码(1)通过CAS替换p的下一个节点;
注意有一个哨兵节点null,刚开始队列的head和tail节点都是指向该哨兵节点,因此队列中至少都会有一个节点; - 如果多个线程同时执行插入,总会有一个线程CAS时插入失败,这时会进入下一次循环
这时不满足(1)和(2)的条件,在代码(3)处会将q赋值给p
再到下一次循环时q就会移动到null,这时要么正常插入,要么又被别人通过CAS抢了。 - 代码(2)是在执行poll时可能出现的情况:
此时由于t==tail,所以p被赋值为head,然后继续循环插入元素。
出队 - poll
poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null。
1 | public E poll() { |
- 刚开始队列是空的,内层循环代码(3)判断队列为空就直接返回null了;
这时updateHead执行时由于h等于p所以没有设置头节点,poll直接返回null。 - 如果执行到(3)时已经有其他线程调用了offer方法成功添加一个元素到队列末尾,这时q会指向新增元素的节点
这时会进入(5),令p也指向新q。
然后在下一次循环时,进入代码(2),执行p.casItem(item, null)
时会通过CAS操作设置头节点的值为null。
代码(6)处,此时h指向哨兵节点,而p指向队列头节点,这时将p设置为新的头节点(这时p里的值已经被清掉了是一个空节点)。
此时队列的状态为:
这就是之前讲队列offer时的一种特殊情况。 - 自引用的情况
假设线程A已经执行到(2)将第一个节点值置为null,这时又有一个线程B开始执行poll操作,如下图所示:
然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
然后线程 B 继续执行代码(3)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(4)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
ArrayBlockingQueue
offer是不会阻塞的,如果满了直接返回:
1 | public boolean offer(E e) { |
poll同样也不会阻塞,如果空了直接返回null:
1 | public E poll() { |
put操作会等待notFull这个条件:
1 | public void put(E e) throws InterruptedException { |
take操作同理,会等待notEmpty这个条件:
1 | public E take() throws InterruptedException { |
LinkedBlockingQueue
LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。
PriorityBlockingQueue
SynchronousQueue
LinkedTransferQueue
队列比较
Disruptor
无锁内存队列
优化 CPU 伪共享
RingBuffer
环形队列,使用定长数组存储,长度是 2^N,可以使用位运算提升性能。
无锁:无锁设计减少了竞争。
预热:预先填充好任务/事件,不需要像链表那样每次添加/删除节点时去创建/回收节点,从而可以避免一定的垃圾回收。
缓存行填充解决了 CPU 伪共享问题。WorkPool
存储 WorkProcessor 的池子,Disruptor 可以通过 Executor 并发启动每一个 WorkProcessorWorkProcessor
从 RindBuffer 消费事件/任务,并交由 WorkHandler 处理。WorkHandler
处理任务的工作者,根据任务类型委托给不同的 EventHandler。
Logback 框架中异步日志打印中 ArrayBlockingQueue 的使用
异步模型是业务线程把要打印的日志任务写入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务写入磁盘,对用户线程来说,耗时只有将数据写入队列中。
并发安全 Map
ConcurrentHashMap
get
代码:java.util.concurrent.ConcurrentHashMap.get
- 计算 key 的散列值,可以使用该散列值定位到散列表中的某个槽。
如果 key 是自定义类型对象,需要实现重写 hash 方法。 - 找到对象
hash 值是不精确匹配的,hash 值的关键是计算简单而且有一定的区分度,比如取 string 的前 3 位的和作为 hash 值。
要精确匹配需要使用对象的 equals 方法。
ConcurrentHashMap 中哈希槽的实现方法有两种:链表和红黑树,链表和红黑树的查找过程就不必细说了。
put
代码:java.util.concurrent.ConcurrentHashMap.put
- hash
- 找对象
找对象过程与 get 的区别主要是 put 需要并发控制:- 如果槽是空的,则通过 CAS 直接赋值;
- 如果槽非空,则先用
synchronized
锁住槽,接下来根据槽的数据结构来插入节点,如果槽是链表,则遍历链表找该 Node 是否已存在,不存在的情况下插入到末尾,如果槽是红黑树,则通过二叉树的遍历找目标 Node,找不到的情况下插入到叶子并重新执行红黑平衡。
rehash
扩容的触发条件与HashMap一致。
扩容流程大致上是:遍历哈希槽,对每个需要迁移的哈希槽进行synchronized
加锁。
当扩容开始后,其他线程必须等扩容完成后才能工作,但其他线程也不是就一直阻塞等扩容完成,而是调用helpTransfer
方法一起帮助进行扩容,实际上因为扩容的单位是哈希槽,因此多线程并发执行扩容并不会导致明显的冲突增加。
扩容入口:
- helpTransfer
写入操作时协助扩容,即判断hash节点是ForwardingNode则调用helpTransfer将 - 全量添加时,需要保证
扩容代码:java.util.concurrent.ConcurrentHashMap.transfer
1 | /** |
size
size操作返回的是一个不精确的值,因为进行统计的过程中,很有可能会有其他线程正在进行插入和删除操作。
1.8之前的size:
- 遍历segments数组,将每个segment的count加起来作为总数,将modCount加起来作为修改总数;
modCount会在每次segment被修改时+1(只增不减),用于比较。 - 再做一遍遍历,将这次的modCount总数和上一次的比较,如果一致则计数准确直接返回,否则重试;
- 如果重试了2次都不行,则第三次会对segment加锁再统计。
1.8之后,没有了分段锁,size不会每次都遍历segments统计,而是在更新时修改总数:
1 | public int size() { |
从源码中可以看到,ConcurrentHashMap#size
的结果就是:baseCount + sum(counterCells)
其中:
- baseCount:计数,总数发生变化时通过CAS修改
- counterCells:如果baseCount CAS修改失败,作为兜底,类似LongAdder的思路。
put操作的末尾会调用addCount()更新baseCount的值,如果CAS修改失败了,则使用counterCells,如果CAS修改 counterCells失败了,则使用fullAddCount方法继续死循环操作,直到成功。
QA
JUC 并发包中并发组件 CopyOnWriteArrayList 的实现原理,CopyOnWriteArrayList 是如何通过写时拷贝实现并发安全的 List?
什么是弱一致性?
说一下 ConcurrentHashMap。
ConcurrentHashMap 怎么实现并发安全?
相对 Hashtable 来说 ConcurrentHashMap 的锁粒度是更小的,Hashtable 中使用 synchronized 实现的一种方法级的悲观锁,相当于把整个散列表锁住了,不利于系统整体吞吐量的提升。
JDK1.7 中它使用的是一种分段锁来保证并发安全,是一种粒度较小的锁,写操作每次只锁住一个哈希槽,
JDK1.8 之后改为通过实现一种基于 CAS 的乐观锁来保证并发安全,当然,和 HashMap 一样,每个哈希槽在增长到一定程度后会自动转换为红黑树。