Tallate

该吃吃该喝喝 啥事别往心里搁

无锁栈

Treiber Stack

无锁列表

CopyOnWriteArrayList

CopyOnWriteArrayList 是一个线程安全的 ArrayList,对其进行的修改操作和元素迭代操作都是在底层创建一个拷贝的数组(快照)上进行的,也就是写时拷贝策略。CopyOnWriteArrayList 适合读多写少的场景,但如果应用在写操作频繁的场景下反而会降低性能。
CopyOnWriteArrayList类图

  • lock:保证写操作时的并发安全;

add(E e)

添加操作拷贝了份快照,在快照上添加元素,最后替代原数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean add(E e) {

// 加独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取array
Object[] elements = getArray();

// 拷贝array到新数组,添加元素到新数组
// 新数组长度是原数组长度+1,可见CopyOnWriteArrayList是无界的
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;

// 使用新数组替换添加前的数组
setArray(newElements);
return true;
} finally {
// 释放独占锁
lock.unlock();
}
}

get(int index)

get 操作获取下标处的元素,实际上 get 可以被分解为以下两个步骤:

  1. 获取 array 的引用;
  2. 通过下标访问 array 指定位置的元素。

整个过程并没有加锁,如果在访问期间有另一个线程删除了某个元素,实际上因为修改操作是发生在原数组的一个快照上的,get 操作仍然获取的是原数组上的元素,因此不会发生类似数组越界的问题。但同时也不可避免这个过程带来的弱一致性,因为元素事实上已经被删除了却仍然可以被访问到。

set(int index, E element)

修改 list 中指定元素的值。

  • 如果指定位置的元素不存在则抛出 IndexOutOfBoundsException 异常;
  • 如果指定位置元素与新值不一致,则创建新数组、在新数组上修改,最后设置新数组到 array(COW)。
  • 即使没有变化,也还是需要重新设置一次 array,这主要是因为 array 本身是 volatile 的,set 方法应当提供 volatile 的语义。

remove(int index)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E remove(int index) {

//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {

//获取数组
Object[] elements = getArray();
int len = elements.length;

//获取指定元素
E oldValue = get(elements, index);
int numMoved = len - index - 1;

//如果要删除的是最后一个元素
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
//分两次拷贝除删除后的元素到新数组
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
//使用新数组代替老的
setArray(newElements);
}
return oldValue;
} finally {
//释放锁
lock.unlock();
}
}

remove(Object o)

remove(Object o, Object[] snapshot, int index)

iterator

CopyOnWriteArrayList 中的 iterator 是弱一致性的,其他线程的修改操作对 iterator 不可见的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}

static final class COWIterator<E> implements ListIterator<E> {
//array的快照版本
private final Object[] snapshot;

//数组下标
private int cursor;

//构造函数
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

//是否遍历结束
public boolean hasNext() {
return cursor < snapshot.length;
}

//获取元素
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
  • 如果在该线程使用返回的迭代器遍历元素的过程中,其它线程没有对 list 进行增删改,那么 snapshot 本身就是 list 的 array,因为它们是引用关系。
  • 如果在遍历期间存在其他线程对 list 的增删改操作,那么 snapshot 会成为原 array 的快照,此时其他线程对 list 进行的增删改是不可见的,因为它们操作的是两个不同的数组。

无锁队列

ConcurrentLinkedQueue

  • 线程安全
  • 无界
  • 非阻塞

数据结构

ConcurrentLinkedQueue类图

  • 底层队列使用单向链表实现。
  • 两个volatile的Node节点(head和tail)分别存放队列的首尾节点,从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
    1
    2
    3
    public ConcurrentLinkedQueue() {
    head = tail = new Node(null);
    }
  • Node节点内部为一个volatile修饰的变量item用来存放节点的值,next用来存放链表的下一个节点,从而链接成一个单向无界链表,如下图所示:
    ConcurrentLinkedQueue的队列结构
  • 入队和出队操作使用CAS来实现线程安全。

入队 - offer

offer操作在队列末尾添加一个元素:

  • 如果传入的是null,抛出NPE,表明ConcurrentLinkedQueue是不允许插入null值的;
  • 其他情况下插入任何元素都会返回true,因为该队列是无界队列;
  • 使用CAS操作实现线程安全
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    // 从尾节点进行插入
    for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;

    // 如果q==null说明p是尾节点,则执行插入
    // (1)
    if (q == null) {
    // 使用CAS设置p节点的next节点
    if (p.casNext(null, newNode)) {
    // cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
    if (p != t)
    casTail(t, newNode); // Failure is OK.
    return true;
    }
    }
    // (2)
    else if (p == q)
    // 多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
    // 重新找新的head,因为新的head后面的节点才是正常的节点。
    p = (t != (t = tail)) ? t : head;
    // (3)
    else
    // 寻找尾节点
    p = (p != t && t != (t = tail)) ? t : q;
    }
    }
    注意这里的循环体从队列尾部添加元素:
  1. 刚开始队列为空,代码(1)通过CAS替换p的下一个节点;
    注意有一个哨兵节点null,刚开始队列的head和tail节点都是指向该哨兵节点,因此队列中至少都会有一个节点;
  2. 如果多个线程同时执行插入,总会有一个线程CAS时插入失败,这时会进入下一次循环
    ConcurrentLinkedQueue插入1
    这时不满足(1)和(2)的条件,在代码(3)处会将q赋值给p
    ConcurrentLinkedQueue插入2
    再到下一次循环时q就会移动到null,这时要么正常插入,要么又被别人通过CAS抢了。
  3. 代码(2)是在执行poll时可能出现的情况:
    ConcurrentLinkedQueue插入3
    此时由于t==tail,所以p被赋值为head,然后继续循环插入元素。

出队 - poll

poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public E poll() {
// goto标记
restartFromHead:

// (1)无限循环
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取当前队头节点
E item = p.item;

// (2)当前节点有值则cas变为null
if (item != null && p.casItem(item, null)) {
//(6)cas成功标志当前节点以及从链表中移除
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// (3)当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// (4)自引用了,则重新找新的队列头节点
else if (p == q)
continue restartFromHead;
// (5)
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
  1. 刚开始队列是空的,内层循环代码(3)判断队列为空就直接返回null了;
    这时updateHead执行时由于h等于p所以没有设置头节点,poll直接返回null。
  2. 如果执行到(3)时已经有其他线程调用了offer方法成功添加一个元素到队列末尾,这时q会指向新增元素的节点
    ConcurrentLinkedQueue出队1
    这时会进入(5),令p也指向新q。
    然后在下一次循环时,进入代码(2),执行p.casItem(item, null)时会通过CAS操作设置头节点的值为null。
    代码(6)处,此时h指向哨兵节点,而p指向队列头节点,这时将p设置为新的头节点(这时p里的值已经被清掉了是一个空节点)。
    此时队列的状态为:
    ConcurrentLinkedQueue插入3
    这就是之前讲队列offer时的一种特殊情况。
  3. 自引用的情况
    假设线程A已经执行到(2)将第一个节点值置为null,这时又有一个线程B开始执行poll操作,如下图所示:
    ConcurrentLinkedQueue出队2
    然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
    ConcurrentLinkedQueue出队3
    然后线程 B 继续执行代码(3)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(4)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
    ConcurrentLinkedQueue出队4

ArrayBlockingQueue

offer是不会阻塞的,如果满了直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果已经超过容量阈值,则直接返回false
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 把它看成一个循环数组,如果超出范围就卷回
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒这个Condition,必须是在加了锁的前提下才能使用
notEmpty.signal();
}

poll同样也不会阻塞,如果空了直接返回null:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 回卷
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒
notFull.signal();
return x;
}

put操作会等待notFull这个条件:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

take操作同理,会等待notEmpty这个条件:

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。

PriorityBlockingQueue

SynchronousQueue

LinkedTransferQueue

队列比较

Disruptor

  • 无锁内存队列

  • 优化 CPU 伪共享

  • RingBuffer
    环形队列,使用定长数组存储,长度是 2^N,可以使用位运算提升性能。
    无锁:无锁设计减少了竞争。
    预热:预先填充好任务/事件,不需要像链表那样每次添加/删除节点时去创建/回收节点,从而可以避免一定的垃圾回收。
    缓存行填充解决了 CPU 伪共享问题。

  • WorkPool
    存储 WorkProcessor 的池子,Disruptor 可以通过 Executor 并发启动每一个 WorkProcessor

  • WorkProcessor
    从 RindBuffer 消费事件/任务,并交由 WorkHandler 处理。

  • WorkHandler
    处理任务的工作者,根据任务类型委托给不同的 EventHandler。

Logback 框架中异步日志打印中 ArrayBlockingQueue 的使用

异步模型是业务线程把要打印的日志任务写入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务写入磁盘,对用户线程来说,耗时只有将数据写入队列中。

并发安全 Map

ConcurrentHashMap

ConcurrentHashMap结构

get

代码:java.util.concurrent.ConcurrentHashMap.get

  1. 计算 key 的散列值,可以使用该散列值定位到散列表中的某个槽。
    如果 key 是自定义类型对象,需要实现重写 hash 方法。
  2. 找到对象
    hash 值是不精确匹配的,hash 值的关键是计算简单而且有一定的区分度,比如取 string 的前 3 位的和作为 hash 值。
    要精确匹配需要使用对象的 equals 方法。
    ConcurrentHashMap 中哈希槽的实现方法有两种:链表和红黑树,链表和红黑树的查找过程就不必细说了。

put

ConcurrentHashMap的put操作
代码:java.util.concurrent.ConcurrentHashMap.put

  1. hash
  2. 找对象
    找对象过程与 get 的区别主要是 put 需要并发控制:
    • 如果槽是空的,则通过 CAS 直接赋值;
    • 如果槽非空,则先用synchronized锁住槽,接下来根据槽的数据结构来插入节点,如果槽是链表,则遍历链表找该 Node 是否已存在,不存在的情况下插入到末尾,如果槽是红黑树,则通过二叉树的遍历找目标 Node,找不到的情况下插入到叶子并重新执行红黑平衡。

rehash

扩容的触发条件与HashMap一致。
扩容流程大致上是:遍历哈希槽,对每个需要迁移的哈希槽进行synchronized加锁。
当扩容开始后,其他线程必须等扩容完成后才能工作,但其他线程也不是就一直阻塞等扩容完成,而是调用helpTransfer方法一起帮助进行扩容,实际上因为扩容的单位是哈希槽,因此多线程并发执行扩容并不会导致明显的冲突增加。

扩容入口:

  1. helpTransfer
    写入操作时协助扩容,即判断hash节点是ForwardingNode则调用helpTransfer将
  2. 全量添加时,需要保证

扩容代码:
java.util.concurrent.ConcurrentHashMap.transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 创建一个新的两倍大小的新nextTab,将老tab中的元素迁移过去
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 标记节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果tab当前位置为null,则设置fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 已经是fwd节点,则遍历下一个位置
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// tab当前位置已有节点,则加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 表示链表节点,如果是树节点则fh=-2
if (fh >= 0) {
// 头节点的hash值
int runBit = fh & n;
Node<K,V> lastRun = f;
// 链表的下一节点p
for (Node<K,V> p = f.next; p != null; p = p.next) {
// p节点的hash值
int b = p.hash & n;
// 避免成环?
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将ln和hn转移到nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
原tab置为fwd,表示已经被转移了
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

size

size操作返回的是一个不精确的值,因为进行统计的过程中,很有可能会有其他线程正在进行插入和删除操作。

1.8之前的size:

  1. 遍历segments数组,将每个segment的count加起来作为总数,将modCount加起来作为修改总数;
    modCount会在每次segment被修改时+1(只增不减),用于比较。
  2. 再做一遍遍历,将这次的modCount总数和上一次的比较,如果一致则计数准确直接返回,否则重试;
  3. 如果重试了2次都不行,则第三次会对segment加锁再统计。

1.8之后,没有了分段锁,size不会每次都遍历segments统计,而是在更新时修改总数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

从源码中可以看到,ConcurrentHashMap#size的结果就是:
baseCount + sum(counterCells)
其中:

  • baseCount:计数,总数发生变化时通过CAS修改
  • counterCells:如果baseCount CAS修改失败,作为兜底,类似LongAdder的思路。

put操作的末尾会调用addCount()更新baseCount的值,如果CAS修改失败了,则使用counterCells,如果CAS修改 counterCells失败了,则使用fullAddCount方法继续死循环操作,直到成功。

QA

  1. JUC 并发包中并发组件 CopyOnWriteArrayList 的实现原理,CopyOnWriteArrayList 是如何通过写时拷贝实现并发安全的 List?

  2. 什么是弱一致性?

  3. 说一下 ConcurrentHashMap。

  4. ConcurrentHashMap 怎么实现并发安全?
    相对 Hashtable 来说 ConcurrentHashMap 的锁粒度是更小的,Hashtable 中使用 synchronized 实现的一种方法级的悲观锁,相当于把整个散列表锁住了,不利于系统整体吞吐量的提升。
    JDK1.7 中它使用的是一种分段锁来保证并发安全,是一种粒度较小的锁,写操作每次只锁住一个哈希槽,
    JDK1.8 之后改为通过实现一种基于 CAS 的乐观锁来保证并发安全,当然,和 HashMap 一样,每个哈希槽在增长到一定程度后会自动转换为红黑树。

参考

  1. 【目录】JUC 集合框架目录
  2. 并发框架 Disruptor 译文

并发安全容器(Queue)

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层数据结构使用单向链表实现,入队和出队操作使用 CAS 来实现线程安全。
2.1 ConcurrentLinkedQueue 类图结构
先简单介绍下 ConcurrentLinkedQueue 的类图结构如下图:
image.png
如上类图 ConcurrentLinkedQueue 内部的队列是使用单向链表方式实现,其中两个 volatile 类型的 Node 节点分别用来存放队列的首尾节点。从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
public ConcurrentLinkedQueue() {
head = tail = new Node(null);
}
Node 节点内部则维护一个 volatile 修饰的变量 item 用来存放节点的值,next 用来存放链表的下一个节点,从而链接为一个单向无界链表。
首先一个图来概况该队列构成,读者可以读完本节后在回头体会这个图:image.png
2.2 ConcurrentLinkedQueue 原理介绍
本节主要介绍 ConcurrentLinkedQueue 的几个主要的方法的实现原理
2.2.1 offer 操作
offer 操作是在队列末尾添加一个元素,如果传递的参数是 null 则抛出 NPE 异常,否者由于 ConcurrentLinkedQueue 是无界队列该方法一直会返回 true。另外由于使用 CAS 无阻塞算法,该方法不会阻塞调用线程,下面具体看看实现原理。
public boolean offer(E e) {
//(1)e为null则抛出空指针异常
checkNotNull(e);

//(2)构造Node节点
final Node newNode = new Node(e);

//(3)从尾节点进行插入
for (Node<E> t = tail, p = t;;) {

    Node<E> q = p.next;

    //(4)如果q==null说明p是尾节点,则执行插入
    if (q == null) {

        //(5)使用CAS设置p节点的next节点
        if (p.casNext(null, newNode)) {
            //(6)cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
            if (p != t)
                casTail(t, newNode);  // Failure is OK.
            return true;
        }
    }
    else if (p == q)//(7)
        //多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
        //重新找新的head,因为新的head后面的节点才是正常的节点。
        p = (t != (t = tail)) ? t : head;
    else
        //(8) 寻找尾节点
        p = (p != t && t != (t = tail)) ? t : q;
}

}
上节类图结构时候谈到构造队列时候参构造函数创建了一个 item 为 null 的哨兵节点,并且 head 和 tail 都是指向这个节点,下面通过图形结合来讲解下 offer 操作的代码实现。
首先看下当一个线程调用 offer(item)时候情况:首先代码(1)对传参判断空检查,如果为 null 则抛出 NPE 异常,然后代码(2)则使用 item 作为构造函数参数创建了一个新的节点,代码(3)从队列尾部节点开始循环,意图是从队列尾部添加元素。
image.png
上图是执行代码(4)时候队列的情况,这时候节点 p,t,head,tail 同时指向了 item 为 null 的哨兵节点,由于哨兵节点的 next 节点为 null, 所以这里 q 指向也是 null。
代码(4)发现q==null则执行代码(5)通过 CAS 原子操作判断 p 节点的 next 节点是否为 null,如果为 null 则使用节点 newNode 替换 p 的 next 节点,然后执行代码(6)由于p==t所以没有设置尾部节点,然后退出 offer 方法,这时候队列的状态图如下:
image.png
上面讲解的是一个线程调用 offer 方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5),假设线程 A 调用 offer(item1), 线程 B 调用 offer(item2), 线程 A 和 B 同时执行到 p.casNext(null, newNode)。而 CAS 的比较并设置操作是原子性的,假设线程 A 先执行了比较设置操作则发现当前 p 的 next 节点确实是 null 则会原子性更新 next 节点为 newNode,这时候线程 B 也会判断 p 的 next 节点是否为 null,结果发现不是 null(因为线程 A 已经设置了 p 的 next 为 newNode)则会跳到步骤(3),然后执行到步骤(4)时候队列分布图为:
image.png
根据这个状态图可知线程 B 会去执行代码(8),然后 q 赋值给了 p,这时候队列状态图为:
image.png
然后线程 B 再次跳转到代码(3)执行,当执行到代码(4)时候队列状态图为:
image.png
由于这时候 q==null, 所以线程 B 会执行步骤(5),通过 CAS 操作判断当前 p 的 next 节点是否是 null,不是则再次循环后尝试,是则使用 newNode 替换,假设 CAS 成功了,那么执行步骤(6)由于 p!=t 所以设置 tail 节点为 newNode,然后退出 offer 方法。这时候队列分布图为:
image.png
分析到现在,offer 代码的执行路径现在就差步骤(7)还没走过,其实这个要在执行 poll 操作后才会出现,这里先看下执行 poll 操作后可能会存在的的一种情况如下图:
image.png
下面分析下当队列处于这种状态时候调用 offer 添加元素代码执行到步骤(4)时候的状态图:
image.png
由于 q 节点不为空并且p==q所以执行步骤(7),由于t==tail所以 p 被赋值为了 head,然后进入循环,循环后执行到代码(4)时候队列状态为:
image.png
由于q==null, 所以执行步骤(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p 的 next 节点被设置为新增节点,然后执行步骤(6),由于p!=t所以设置新节点为队列为节点,现在队列状态如下:
image.png
这里自引用的节点会被垃圾回收掉。
总结:可见 offer 操作里面关键步骤是代码(5)通过原子 CAS 操作来进行控制同时只有一个线程可以追加元素到队列末尾,进行 cas 竞争失败的线程则会通过循环一次次尝试进行 cas 操作,直到 cas 成功才会返回,也就是通过使用无限循环里面不断进行 CAS 尝试方式来替代阻塞算法挂起调用线程,相比阻塞算法这是使用 CPU 资源换取阻塞所带来的开销。
2.2.2 poll 操作
poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null,下面看看实现原理。
public E poll() {
//(1) goto标记
restartFromHead:

//(2)无限循环
for (;;) {
    for (Node<E> h = head, p = h, q;;) {

        //(3)保存当前节点值
        E item = p.item;

        //(4)当前节点有值则cas变为null
        if (item != null && p.casItem(item, null)) {
            //(5)cas成功标志当前节点以及从链表中移除
            if (p != h) 
                updateHead(h, ((q = p.next) != null) ? q : p);
            return item;
        }
        //(6)当前队列为空则返回null
        else if ((q = p.next) == null) {
            updateHead(h, p);
            return null;
        }
        //(7)自引用了,则重新找新的队列头节点
        else if (p == q)
            continue restartFromHead;
        else//(8)
            p = q;
    }
}

}
final void updateHead(Node h, Node p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
同理本节也通过图形结合的方式来讲解代码执行逻辑:
poll 操作是从队头获取元素,所以代码(2)内层循环是从 head 节点开始迭代,代码(3)获取当前队头的节点,当队列一开始为空时候队列状态为:
image.png
由于 head 节点指向的为 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null 如下图:
image.png
所以执行 updateHead 方法,由于 h 等于 p 所以没有设置头结点,poll 方法直接返回 null。
假设执行到代码(6)时候已经有其它线程调用了 offer 方法成功添加一个元素到队列,这时候 q 指向的是新增元素的节点,这时候队列状态为:
image.png
所以代码(6)判断结果为 false,然后会转向代码(7)执行,而此时 p 不等于 q,所以转向代码(8)执行,执行结果是 p 指向了节点 q,此时队列状态为:
image.png
然后程序转向代码(3)执行,p 现在指向的元素值不为 null,则执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时没有其它线程进行 poll 操作,CAS 成功则执行代码(5)由于此时 p!=h 所以设置头结点为 p,poll 然后返回被从队列移除的节点值 item。此时队列状态为:
image.png
这个状态就是讲解 offer 操作时候,offer 代码的执行路径(7)执行的前提状态。
假如现在一个线程调用了 poll 操作,则在执行代码(4) 时候队列状态为:
image.png
可知这时候执行代码(6)返回 null.
现在 poll 的代码还有个分支(7)没有执行过,那么什么时候会执行那?下面来看看,假设线程 A 执行 poll 操作时候当前队列状态为:
image.png
那么执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null。
假设 CAS 设置成功则标示该节点从队列中移除了,此时队列状态为:
image.png
然后由于 p!=h, 所以会执行 updateHead 方法,假如线程 A 执行 updateHead 前另外一个线程 B 开始 poll 操作这时候线程 B 的 p 指向 head 节点,但是还没有执行到代码(6)这时候队列状态为:
image.png
然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
image.png
然后线程 B 继续执行代码(6)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(7)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
image.png
总结:poll 方法移除一个元素时候只是简单的使用 CAS 操作把当前节点的 item 值设置 null,然后通过重新设置头结点让该元素从队列里面摘除,被摘除的节点就成了孤立节点,这个节点会被在垃圾回收的时候会回收掉。另外执行分支中如果发现头节点被修改了要跳到外层循环重新获取新的头节点。
2.2.3 peek 操作
peek 操作是获取队列头部一个元素(只不获取不移除),如果队列为空则返回 null,下面看看实现原理。
public E peek() {
//(1)
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
//(2)
E item = p.item;
//(3)
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
//(4)
else if (p == q)
continue restartFromHead;
else
//(5)
p = q;
}
}
}
代码结构与 poll 操作类似,不同在于步骤(3)的使用只是少了 castItem 操作,其实这很正常,因为 peek 只是获取队列头元素值并不清空其值,根据前面我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次 peek 时候代码(3)中会发现 item==null, 然后会执行 q = p.next, 这时候 q 节点指向的才是队列里面第一个真正的元素或者如果队列为 null 则 q 指向 null。
当队列为空时候这时候队列状态为:
image.png
这时候执行 updateHead 由于 h 节点等于 p 节点所以不进行任何操作,然后 peek 操作会返回 null。
当队列至少有一个元素时候(这里假设只有一个)这时候队列状态为:
image.png
这时候执行代码(5)这时候 p 指向了 q 节点,然后执行代码(3)这时候队列状态为:
image.png
执行代码(3)发现 item 不为 null,则执行 updateHead 方法,由于 h!=p, 所以设置头结点,设置后队列状态为:
image.png
也就是剔除了哨兵节点。
总结:peek 操作代码与 poll 操作类似只是前者只获取队列头元素但是并不从队列里面删除,而后者获取后需要从队列里面删除,另外在第一次调用 peek 操作时候,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者 null。
2.2.4 size 操作
获取当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁所以从调用 size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。
public int size() {
int count = 0;
for (Node p = first(); p != null; p = succ(p))
if (p.item != null)
// 最大返回Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
}

//获取第一个队列元素(哨兵元素不算),没有则为null
Node first() {
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

//获取当前节点的next元素,如果是自引入节点则返回真正头节点
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
2.2.5 remove 操作
如果队列里面存在该元素则删除给元素,如果存在多个则删除第一个,并返回 true,否者返回 false
public boolean remove(Object o) {

//查找元素为空,直接返回false
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
    E item = p.item;

    //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其它元素是否有匹配的。
    if (item != null &&
        o.equals(item) &&
        p.casItem(item, null)) {

        //获取next元素
        Node<E> next = succ(p);

        //如果有前驱节点,并且next不为空则链接前驱节点到next,
        if (pred != null && next != null)
            pred.casNext(p, next);
        return true;
    }
    pred = p;
}
return false;

}
注:ConcurrentLinkedQueue 底层使用单向链表数据结构来保存队列元素,每个元素被包装为了一个 Node 节点,队列是靠头尾节点来维护的,创建队列时候头尾节点指向一个 item 为 null 的哨兵节点,第一次 peek 或者 first 时候会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以获取 size 的时候有可能进行了 offer,poll 或者 remove 操作,导致获取的元素个数不精确,所以在并发情况下 size 函数不是很有用。

LinkedBlockingQueue

前面介绍了使用 CAS 算法实现的非阻塞队列 ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现的阻塞队列 LinkedBlockingQueue 的实现
3.1 LinkedBlockingQueue 类图结构
同理首先看下 LinkedBlockingQueue 的类图结构
image.png
如上类图可知 LinkedBlockingQueue 也是使用单向链表实现,也有两个 Node 分别用来存放首尾节点,并且里面有个初始值为 0 的原子变量 count 用来记录队列元素个数。另外里面有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原子性,其中 takeLock 用来控制同时只有一个线程可以从队列获取元素,其它线程必须等待,putLock 控制同时只能有一个线程可以获取锁去添加元素,其它线程必须等待。另外 notEmpty 和 notFull 是信号量,内部分别有一个条件队列用来存放进队和出队时候被阻塞的线程,其实这个是个生产者 - 消费者模型。如下是独占锁创建代码:
/** 执行take, poll等操作时候需要获取该锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 当队列为空时候执行出队操作(比如take)的线程会被放入这个条件队列进行等待 */
private final Condition notEmpty = takeLock.newCondition();

/** 执行put, offer等操作时候需要获取该锁*/
private final ReentrantLock putLock = new ReentrantLock();

/**当队列满时候执行进队操作(比如put)的线程会被放入这个条件队列进行等待 */
private final Condition notFull = putLock.newCondition();

/** 当前队列元素个数 */
private final AtomicInteger count = new AtomicInteger(0);
如下是 LinkedBlockingQueue 无参构造函数代码:
public static final int MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾节点,指向哨兵节点
last = head = new Node(null);
}
从代码可知默认队列容量为 0x7fffffff; 用户也可以自己指定容量,所以一定程度上 LinkedBlockingQueue 可以说是有界阻塞队列。
首先使用一个图来概况该队列,读者在读完本节后在回头体会下:
image.png
3.2 LinkedBlockingQueue 原理介绍
3.2.1 offer 操作
向队列尾部插入一个元素,如果队列有空闲容量则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false,如果 e 元素为 null 则抛出 NullPointerException 异常,另外该方法是非阻塞的。
public boolean offer(E e) {

    //(1)空元素抛空指针异常
    if (e == null) throw new NullPointerException();

    //(2) 如果当前队列满了则丢弃将要放入的元素,然后返回false
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;

    //(3) 构造新节点,获取putLock独占锁
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //(4)如果队列不满则进队列,并递增元素计数
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            //(5)
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        //(6)释放锁
        putLock.unlock();
    }
    //(7)
    if (c == 0)
        signalNotEmpty();
    //(8)
    return c >= 0;
}

private void enqueue(Node node) {
last = last.next = node;
}
步骤(2)判断如果当前队列已满则丢弃当前元素并返回 false
步骤(3)获取到 putLock 锁,当前线程获取到该锁后,则其它调用 put 和 offer 的线程将会被阻塞(阻塞的线程被放到 putLock 锁的 AQS 阻塞队列)。
步骤(4)这里有重新判断了下当前队列是否满了,这是因为在执行代码(2)和获取到 putLock 锁期间可能其它线程通过 put 或者 offer 方法向队列里面添加了新元素。重新判断队列确实不满则新元素入队,并递增计数器。
步骤(5)判断如果新元素入队后队列还有空闲空间,则唤醒 notFull 的条件队列里面因为调用了 notFull 的 await 操作(比如执行 put 方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲所以这里可以提前唤醒一个入队线程。
代码(6) 则释放获取的 putLock 锁,这里要注意锁的释放一定要在 finally 里面做,因为即使 try 块抛异常了,finally 也是会被执行到的。另外释放锁后其它因为调用 put 和 offer 而被阻塞的线程将会有一个获取到该锁。
代码(7)c==0 说明在执行代码(6)释放锁时候队列里面至少有一个元素,队列里面有元素则执行 signalNotEmpty,下面看看 signalNotEmpty 的代码:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
可知作用是激活 notEmpty 的条件队列中因为调用 notEmpty 的 await 方法(比如调用 take 方法并且队列为空的时候)而被阻塞的一个线程,这里也说明了调用条件变量的方法前要首先获取对应的锁。
综上可知 offer 方法中通过使用 putLock 锁保证了在队尾新增元素的原子性和队列元素个数的比较和递增操作的原子性。
3.2.2 put 操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回 true,如果队列已满则阻塞当前线程直到队列有空闲插入成功后返回 true,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。
put 操作的代码结构与 offer 操作类似,代码如下:
public void put(E e) throws InterruptedException {
//(1)空元素抛空指针异常
if (e == null) throw new NullPointerException();
//(2) 构建新节点,并获取独占锁putLock
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//(3)如果队列满则等待
while (count.get() == capacity) {
notFull.await();
}
//(4)进队列并递增计数
enqueue(node);
c = count.getAndIncrement();
//(5)
if (c + 1 < capacity)
notFull.signal();
} finally {
//(6)
putLock.unlock();
}
//(7)
if (c == 0)
signalNotEmpty();
}
代码(2)中使用 putLock.lockInterruptibly() 获取独占锁,相比 offer 方法中这个获取独占锁方法意味着可以被中断,具体说是当前线程在获取锁的过程中,如果被其它线程设置了中断标志则当前线程会抛出 InterruptedException 异常,所以 put 操作在获取锁过程中是可被中断的。
代码(3)如果当前队列已满,则调用 notFull 的 await() 把当前线程放入 notFull 的条件队列,当前线程被阻塞挂起并释放获取到的 putLock 锁,由于 putLock 锁被释放了,所以现在其它线程就有机会获取到 putLock 锁了。
另外考虑下代码(3)判断队列是否为空为何使用 while 循环而不是 if 语句那?其实是考虑到当前线程被虚假唤醒的问题,也就是其它线程没有调用 notFull 的 singal 方法时候 notFull.await() 在某种情况下会自动返回。如果使用 if 语句那么虚假唤醒后会执行代码(4)元素入队,并且递增计数器,而这时候队列已经是满了的,导致队列元素个数大于了队列设置的容量,导致程序出错。而使用 while 循环假如 notFull.await() 被虚假唤醒了,那么循环在检查一下当前队列是否是满的,如果是则再次进行等待。
3.2.3 poll 操作
从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。
public E poll() {
//(1)队列为空则返回null
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
//(2)获取独占锁
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//(3)队列不空则出队并递减计数
if (count.get() > 0) {//3.1
x = dequeue();//3.2
c = count.getAndDecrement();//3.3
//(4)
if (c > 1)
notEmpty.signal();
}
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)返回
return x;
}
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
代码 (1) 如果当前队列为空,则直接返回 null
代码(2)获取独占锁 takeLock,当前线程获取该锁后,其它线程在调用 poll 或者 take 方法会被阻塞挂起
代码 (3) 如果当前队列不为空则进行出队操作,然后递减计数器。
代码(4)如果 c>1 则说明当前线程移除掉队列里面的一个元素后队列不为空(c 是删除元素前队列元素个数),那么这时候就可以激活因为调用 poll 或者 take 方法而被阻塞到 notEmpty 的条件队列里面的一个线程。
代码(6)说明当前线程移除队头元素前当前队列是满的,移除队头元素后队列当前至少有一个空闲位置,那么这时候就可以调用 signalNotFull 激活因为调用 put 或者 offer 而被阻塞放到 notFull 的条件队列里的一个线程,signalNotFull 的代码如下:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll 代码逻辑比较简单,值得注意的是获取元素时候只操作了队列的头节点。
3.2.4 peek 操作
获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。
public E peek() {
//(1)
if (count.get() == 0)
return null;
//(2)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node first = head.next;
//(3)
if (first == null)
return null;
else
//(4)
return first.item;
} finally {
//(5)
takeLock.unlock();
}
}
peek 操作代码也比较简单,这里需要注意的是代码(3)这里还是需要判断下 first 是否为 null 的,不能直接执行代码(4)。正常情况下执行到代码(2)说明队列不为空,但是代码(1)和(2)不是原子性操作,也就是在执行点(1)判断队列不空后,在代码(2)获取到锁前有可能其它线程执行了 poll 或者 take 操作导致队列变为了空,然后当前线程获取锁后,直接执行 first.item 会抛出空指针异常。
3.2.5 take 操作
获取当前队列头部元素并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//(1)获取锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//(2)当前队列为空则阻塞挂起
while (count.get() == 0) {
notEmpty.await();
}
//(3)出队并递减计数
x = dequeue();
c = count.getAndDecrement();
//(4)
if (c > 1)
notEmpty.signal();
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)
return x;
}
代码(1)当前线程获取到独占锁,其它调用 take 或者 poll 的线程将会被阻塞挂起。
代码(2)如果队列为空则阻塞挂起当前线程,并把当前线程放入 notEmpty 的条件队列。
代码(3)进行出队操作并递减计数。
代码(4)如果 c>1 说明当前队列不为空,则唤醒 notEmpty 的条件队列的条件队列里面的一个因为调用 take 或者 poll 而被阻塞的线程。
代码(5)释放锁。
代码(6)如果 c == capacity 则说明当前队列至少有一个空闲位置,则激活条件变量 notFull 的条件队列里面的一个因为调用 put 或者 offer 而被阻塞的线程。
3.2.6 remove 操作
删除队列里面指定元素,有则删除返回 true,没有则返回 false
public boolean remove(Object o) {
if (o == null) return false;

//(1)双重加锁
fullyLock();
try {

    //(2)遍历队列找则删除返回true
    for (Node<E> trail = head, p = trail.next;
         p != null;
         trail = p, p = p.next) {
         //(3)
        if (o.equals(p.item)) {
            unlink(p, trail);
            return true;
        }
    }
    //(4)找不到返回false
    return false;
} finally {
    //(5)解锁
    fullyUnlock();
}

}
代码(1)通过 fullyLock 获取双重锁,当前线程获取后,其它线程进行入队或者出队的操作时候就会被阻塞挂起。
void fullyLock() {
putLock.lock();
takeLock.lock();
}
代码(2)遍历队列寻找要删除的元素,找不到则直接返回 false,找到则执行 unlink 操作,unlik 操作代码如下:
void unlink(Node p, Node trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
如果当前队列满,删除后,也不忘记唤醒等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
可知删除元素后,如果发现当前队列有空闲空间,则唤醒 notFull 的条件队列中一个因为调 用 put 或者 offer 方法而被阻塞的线程。
代码(5)调用 fullyUnlock 方法使用与加锁顺序相反的顺序释放双重锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
总结下,由于 remove 方法在删除指定元素前加了两把锁,所以在遍历队列查找指定元素过程中是线程安全的,并且此时其它调用入队出队操作的线程全部会被阻塞,另外获取多个资源锁与释放的顺序是相反的。
3.2.7 size 操作
int size() : 获取当前队列元素个数。
public int size() {
return count.get();
}
由于在操作出队入队时候操作 Count 的时候是加了锁的,所以相比 ConcurrentLinkedQueue 的 size 方法比较准确。这里考虑下为何 ConcurrentLinkedQueue 中需要遍历链表来获取 size 而不适用一个原子变量那?这是因为使用原子变量保存队列元素个数需要保证入队出队操作和操作原子变量是原子性操作,而 ConcurrentLinkedQueue 是使用 CAS 无锁算法的,所以无法做到这个。
注:LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。

ArrayBlockingQueue

上节介绍了有界链表方式的阻塞队列 LinkedBlockingQueue,本节来研究下有界使用数组方式实现的阻塞队列 ArrayBlockingQueue 的原理
4.1 ArrayBlockingQueue 类图结构
同理为了能从全局一览 ArrayBlockingQueue 的内部构造,先看下类图:
image.png
如图 ArrayBlockingQueue 内部有个数组 items 用来存放队列元素,putindex 变量标示入队元素下标,takeIndex 是出队下标,count 统计队列元素个数,从定义可知并没有使用 volatile 修饰,这是因为访问这些变量使用都是在锁块内,而加锁已经保证了锁块内变量的内存可见性了。
另外有个独占锁 lock 用来保证出入队操作原子性,这保证了同时只有一个线程可以进行入队出队操作,另外 notEmpty,notFull 条件变量用来进行出入队的同步。
另外由于 ArrayBlockingQueue 是有界队列,所以构造函数必须传入队列大小参数,构造函数代码如下:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

可知默认情况下使用的是 ReentrantLock 提供的非非公平独占锁进行出入队操作的加锁。
首先一个图概况该队列,读者可以读完本节后在回头体会下:
image.png
4.2 ArrayBlockingQueue 原理介绍
本节主要讲解下面几个主要函数的原理。
4.2.1 offer 操作
向队列尾部插入一个元素,如果队列有空闲容量则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false,如果 e 元素为 null 则抛出 NullPointerException 异常,另外该方法是不阻塞的。
public boolean offer(E e) {
//(1)e为null,则抛出NullPointerException异常
checkNotNull(e);
//(2)获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(3)如果队列满则返回false
if (count == items.length)
return false;
else {
//(4)否者插入元素
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
代码(2)获取独占锁,当前线程获取该锁后,其它入队和出队操作的线程都会被阻塞挂起后放入 lock 锁的 AQS 阻塞队列。
代码(3)如果队列满则直接返回 false,否者调用 enqueue 方法后返回 true,enqueue 的代码如下:
private void enqueue(E x) {
//(6)元素入队
final Object[] items = this.items;
items[putIndex] = x;
//(7)计算下一个元素应该存放的下标
if (++putIndex == items.length)
putIndex = 0;
count++;
//(8)
notEmpty.signal();
}
如上代码首先把当前元素放入 items 数组,然后计算下一个元素应该存放的下标,然后递增元素个数计数器,最后激活 notEmpty 的条件队列中因为调用 poll 或者 take 操作而被阻塞的的一个线程。这里由于在操作共享变量比如 count 前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在 CPU 缓存或者寄存器里面的值。
代码(5)释放锁,释放锁后会把修改的共享变量值比如 Count 的值刷新回主内存中,这样其它线程通过加锁在次读取这些共享变量后就可以看到最新的值。
4.2.2 put 操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回 true,如果队列已满则阻塞当前线程直到队列有空闲插入成功后返回 true,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。
public void put(E e) throws InterruptedException {
//(1)
checkNotNull(e);
final ReentrantLock lock = this.lock;

//(2)获取锁(可被中断)
lock.lockInterruptibly();
try {

    //(3)如果队列满,则把当前线程放入notFull管理的条件队列
    while (count == items.length)
        notFull.await();

    //(4)插入元素
    enqueue(e);
} finally {
    //(5)
    lock.unlock();
}

}
代码(2)在获取锁的过程中当前线程被其它线程中断了,则当前线程会抛出 InterruptedException 异常而退出。
代码(3)判断如果当前队列满了,则把当前线程阻塞挂起后放入到 notFull 的条件队列,注意这里也是使用了 while 而不是 if。
代码(4)如果队列不满则插入当前元素,此处不再累述。
4.2.3 poll 操作
从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。
public E poll() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)当前队列为空则返回null,否者调用dequeue()获取
return (count == 0) ? null : dequeue();
} finally {
//(3)释放锁
lock.unlock();
}
}
代码(1)获取独占锁
代码(2)如果队列为空则返回 null,否者调用 dequeue() 方法,dequeue 代码如下:
private E dequeue() {
final Object[] items = this.items;

//(4)获取元素值
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//(5)数组中值值为null;
items[takeIndex] = null;

//(6)队头指针计算,队列元素个数减一

if (++takeIndex == items.length)
takeIndex = 0;
count–;

//(7)发送信号激活notFull条件队列里面的一个线程
notFull.signal();
return x;

}
可知首先获取当前队头元素保存到局部变量,然后重置队头元素为 null,并重新设置队头下标,元素计数器递减,最后发送信号激活 notFull 的条件队列里面一个因为调用 put 或者 offer 而被阻塞的线程。
4.2.4 take 操作
获取当前队列头部元素并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {

    //(2)队列为空,则等待,直到队列有元素
    while (count == 0)
        notEmpty.await();
    //(3)获取队头元素
    return dequeue();
} finally {
    //(4) 释放锁
    lock.unlock();
}

}
take 操作的代码也比较简单与 poll 相比只是步骤(2)如果队列为空则把当前线程挂起后放入到 notEmpty 的条件队列,等其它线程调用 notEmpty.signal() 方法后在返回,需要注意的是这里也是使用 while 循环进行检测并等待而不是使用 if。
4.2.5 peek 操作
获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。
public E peek() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)
return itemAt(takeIndex);
} finally {
//(3)
lock.unlock();
}
}

@SuppressWarnings(“unchecked”)
final E itemAt(int i) {
return (E) items[i];
}
peek 的实现更简单,首先获取独占锁,然后从数组 items 中获取当前队头下标的值并返回,在返回前释放了获取的锁。
4.2.6 size 操作
获取当前队列元素个数。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
size 操作是简单的,获取锁后直接返回 count,并在返回前释放锁。也许你会疑问这里有没有修改 Count 的值,只是简单的获取下,为何要加锁那?其实如果 count 声明为 volatile 这里就不需要加锁了,因为 volatile 类型变量保证了内存的可见性,而 ArrayBlockingQueue 的设计中 count 并没有声明为 volatile,是因为 count 的操作都是在获取锁后进行的,而获取锁的语义之一是获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性。
注:ArrayBlockingQueue 通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加 synchronized 的意味。ArrayBlockingQueue 的 size 操作的结果是精确的,因为计算前加了全局锁。

PriorityBlockingQueue

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素,内部是平衡二叉树堆的实现。
5.1 PriorityBlockingQueue 类图结构
下面首先通过类图来从全局了解下 PriorityBlockingQueue 的结构
image.png
如图 PriorityBlockingQueue 内部有个数组 queue 用来存放队列元素,size 用来存放队列元素个数,allocationSpinLock 是个自旋锁,用 CAS 操作来保证同时只有一个线程可以扩容队列,状态为 0 或者 1,其中 0 表示当前没有在进行扩容,1 标示当前正在扩容。
如下构造函数,默认队列容量为 11,默认比较器为 null,也就是使用元素的 compareTo 方法进行比较来确定元素的优先级,这意味着队列元素必须实现了 Comparable 接口;
private static final int DEFAULT_INITIAL_CAPACITY = 11;

public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

首先通过一个图来对该队列进行概况,读者读完本机后,可以回头在体会下:
image.png
5.2 原理介绍
5.2.1 offer 操作
offer 操作作用是在队列插入一个元素,由于是无界队列,所以一直返回 true,如下是 offer 函数的代码:
public boolean offer(E e) {

if (e == null)
    throw new NullPointerException();

//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();

int n, cap;
Object[] array;

//如果当前元素个数>=队列容量,则扩容(1)
while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap);

try {
    Comparator<? super E> cmp = comparator;

    //默认比较器为null (2)
    if (cmp == null)
        siftUpComparable(n, e, array);
    else
        //自定义比较器 (3)
        siftUpUsingComparator(n, e, array, cmp);

    //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程(9)
    size = n + 1;
    notEmpty.signal();//激活调用take()方法被阻塞的线程
} finally {
    //释放独占锁
    lock.unlock();
}
return true;

}
如上代码,主流程比较简单,下面主要看看如何进行扩容的和内部如何建堆的,首先看下扩容逻辑:
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //释放获取的锁
Object[] newArray = null;

//cas成功则扩容(4)
if (allocationSpinLock == 0 &&
    UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                             0, 1)) {
    try {
        //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
        int newCap = oldCap + ((oldCap < 64) ?
                               (oldCap + 2) : // grow faster if small
                               (oldCap >> 1));
        if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
            int minCap = oldCap + 1;
            if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                throw new OutOfMemoryError();
            newCap = MAX_ARRAY_SIZE;
        }
        if (newCap > oldCap && queue == array)
            newArray = new Object[newCap];
    } finally {
        allocationSpinLock = 0;
    }
}

//第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
if (newArray == null) // back off if another thread is allocating
    Thread.yield();
lock.lock();//(6)
if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
}

}
tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用 cas 控制只有一个线程可以扩容成功。其实这里不先释放锁,也是可行的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容时候还占用锁那么其它线程在这个时候是不能进行出队和入队操作的,这大大降低了并发性。所以为了提高性能,使用 CAS 控制只有一个线程可以进行扩容,并且在扩容前释放了锁,让其它线程可以进行入队出队操作。
spinlock 锁使用 CAS 控制只有一个线程可以进行扩容,CAS 失败的线程会调用 Thread.yield() 让出 cpu,目的意在让扩容线程扩容后优先调用 lock.lock 重新获取锁,但是这得不到一定的保证。有可能 yield 的线程在扩容线程扩容完成前已经退出,并执行代码(6)获取到了锁,这时候获取到的锁的线程发现 newArray 为 null 就会执行代码(1)。如果当前数组扩容还没完毕,当前线程会再次调用 tryGrow 方法,然后释放锁,这又给扩容线程获取锁提供了机会,如果这时候扩容线程还没扩容完毕,则当前线程释放锁后有调用 yield 方法出让 CPU。可知当扩容线程进行扩容期间,其他线程是原地自旋通过代码(1)检查当前扩容是否完毕,等扩容完毕后才退出代码(1)的循环。
当扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取了该锁,并且 allocationSpinLock 被修饰为了 volatile。
当扩容线程扩容完毕后会执行代码 (6) 获取锁,获取锁后复制当前 queue 里面的元素到新数组。
然后看下具体建堆算法:
private static void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;

//队列元素个数>0则判断插入位置,否者直接入队(7)
while (k > 0) {
    int parent = (k - 1) >>> 1;
    Object e = array[parent];
    if (key.compareTo((T) e) >= 0)
        break;
    array[k] = e;
    k = parent;
}
array[k] = key;(8)

}
下面用图来解释上面算法过程,假设队列初始化容量为 2, 创建的优先级队列的泛型参数为 Integer。
首先调用队列的 offer(2) 方法,希望插入元素 2 到队列,插入前队列状态如下图:
image.png
首先执行代码(1),从上图变量值可知判断值为 false,所以紧接着执行代码(2),由于 k=n=size=0 所以代码(7)判断结果为 false,所以会执行代码(8)直接把元素 2 入队,最后执行代码(9)设置 size 的值加 1,这时候队列的状态如下图:
image.png
然后调用队列的 offer(4) 时候,首先执行代码(1),从上图变量值可知判断为 false,所以执行代码(2),由于 k=1, 所以进入 while 循环,由于 parent=0;e=2;key=4; 默认元素比较器是使用元素的 compareTo 方法,可知 key>e 所以执行 break 退出 siftUpComparable 中的循环; 然后把元素存到数组下标为 1 的地方,最后执行代码(9)设置 size 的值加 1,这时候队列状态为:
image.png
然后调用队列的 offer(6) 时候,首先执行代码(1),从上图变量值知道这时候判断值为 true, 所以调用 tryGrow 进行数组扩容, 由于 2<64 所以 newCap=2 + (2+2)=6; 然后创建新数组并拷贝,然后调用 siftUpComparable 方法,由于 k=2>0 进入 while 循环,由于 parent=0;e=2;key=6;key>e 所以 break 后退出 while 循环; 并把元素 6 放入数组下标为 2 的地方,最后设置 size 的值加 1,现在队列状态:
image.png
然后调用队列的 offer(1) 时候,首先执行代码(1),从上图变量值知道这次判断值为 false,所以执行代码(2),由于k=3, 所以进入 while 循环,由于parent=0;e=4;key=1; key<e,所以把元素 4 复制到数组下标为 3 的地方,然后 k=0 退出 while 循环;然后把元素 1 存放到下标为 0 地方,现在状态:
image.png
这时候二叉树堆的树形图如下:
image.png
可知堆的根元素是 1,也就是这是一个最小堆,那么当调用这个优先级队列的 poll 方法时候,会一次返回堆里面值最小的元素。
5.2.2 poll 操作
poll 操作作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。poll 函数代码如下:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//获取独占锁
try {
return dequeue();
} finally {
lock.unlock();//释放独占锁
}
}
如上代码可知在进行出队操作过程中要先加锁,这意味着,当当前线程进行出队操作时候,其它线程不能再进行入队和出队操作,但是从前面介绍 offer 函数时候知道这时候可以有其它线程进行扩容,下面主要看下具体执行出队操作的 dequeue 方法的代码:
private E dequeue() {

//队列为空,则返回null
int n = size - 1;
if (n < 0)
    return null;
else {

    //获取队头元素(1)
    Object[] array = queue;
    E result = (E) array[0];

    //获取队尾元素,并值null(2)
    E x = (E) array[n];
    array[n] = null;

    Comparator<? super E> cmp = comparator;
    if (cmp == null)//(3)
        siftDownComparable(0, x, array, n);
    else
        siftDownUsingComparator(0, x, array, n, cmp);
    size = n;//(4)
    return result;
}

}
如上代码,如果队列为空则直接返回 null,否者执行代码(1)获取数组第一个元素作为返回值存放到变量 Result,这里需要注意下数组里面第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。 然后代码(2)获取队列尾部元素存放到变量 x, 并且置空尾部节点,然后执行代码(3)插入变量 x 到数组下标为 0 的位置后,重新调成堆为最大或者最小堆,然后返回。这里重要的是看如何去掉堆的根节点后,使用剩下的节点重新调整为一个最大或者最小堆,下面我们看下 siftDownComparable 的代码实现:
private static void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];(5)
int right = child + 1;(6)
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
c = array[child = right];
if (key.compareTo((T) c) <= 0)(8)
break;
array[k] = c;
k = child;
}
array[k] = key;(9)
}
}
同理下面我们结合图来模拟上面调整堆的算法过程,接着上节队列的状态继续讲解,上节队列元素序列为 1,2,6,4:
第一次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =4;n=3;result=1;x=4; 这时候队列状态
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第二次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =3;n=2;result=2;x=6; 这时候队列状态:
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第三次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =2;n=1;result=4;x=6; 这时候队列状态:
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第四次直接返回元素 6.
下面重点说说 siftDownComparable 这个调整堆的算法: 首先说下堆调整的思路,由于队列数组第 0 个元素为树根,出队时候要被移除,这时候数组就不在是最小堆了,所以需要调整堆,具体是要从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会看自己作为根节点的树的左右子树里面那个是最小值,这是一个递归,直到树叶节点结束递归,如果还不明白,没关系,下面结合图来说明下,假如当前队列内容如下:
image.png
其对应的二叉堆树为:
image.png
这时候如果调用了 poll(); 那么 result=2;x=11;队列末尾的元素设置为 null 后,剩下的元素调整堆的步骤如下图:
image.png
如上图(1)树根的 leftChildVal = 4;rightChildVal = 6; 4<6; 所以 c=4; 然后 11>4 也就是 key>c;所以使用元素 4 覆盖树根节点的值,现在堆对应的树如图(2)。
然后树根的左子树树根的左右孩子节点中 leftChildVal = 8;rightChildVal = 10; 8<10; 所以 c=8; 然后发现 11>8 也就是 key>c;所以元素 8 作为树根左子树的根节点,现在树的形状如图(3), 这时候判断 k<half 为 false 就会退出循环,然后把 x=11 设置到数组下标为 3 的地方,这时候堆树如图(4),至此调整堆完毕,siftDownComparable 返回 result=2,poll 方法也返回了。
5.2.3 put 操作
put 操作内部调用的 offer, 由于是无界队列,所以不需要阻塞
public void put(E e) {
offer(e); // never need to block
}
5.2.4 take 操作
take 操作作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞,如下代码:
public E take() throws InterruptedException {
//获取锁,可被中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {

    //如果队列为空,则阻塞,把当前线程放入notEmpty的条件队列
    while ( (result = dequeue()) == null)
        notEmpty.await();//阻塞当前线程
} finally {
    lock.unlock();//释放锁
}
return result;

}
如上代码,首先通过 lock.lockInterruptibly() 获取独占锁,这个方式获取的锁是对中断进行响应的。然后调用 dequeue 方法返回堆树根节点元素,如果队列为空,则返回 false,然后当前线程调用 notEmpty.await() 阻塞挂起当前线程,直到有线程调用了 offer()方法(offer 方法内在添加元素成功后调用了 notEmpty.signal 方法会激活一个阻塞在 notEmpty 的条件队列里面的一个线程)。另外这里使用 while 而不是 if 是为了避免虚假唤醒。
5.2.5 size 操作
获取队列元个数,如下代码,在返回 size 前加了锁,保证在调用 size() 方法时候不会有其它线程进行入队和出队操作,另外由于 size 变量没有被修饰为 volatie,这里加锁也保证了多线程下 size 变量的内存可见性。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
注:PriorityBlockingQueue 队列内部使用二叉树堆维护元素优先级,内部使用数组作为元素存储的数据结构,这个数组是可扩容的,当当前元素个数 >= 最大容量时候会通过算法扩容,出队时候始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素,默认元素优先级比较规则是使用元素的 compareTo 方法来做,用户可以自定义优先级的比较规则。

队列对比

上面介绍的各种队列中只有 ConcurrentLinkedQueue 是使用 UNSAFE 类提供的 CAS 非阻塞算法实现的,其他几个队列内部都是使用锁来保证线程安全的。使用 CAS 算法的效率较好,那么是不是所有场景都用 ConcurrentLinkedQueue 那?
其实不然,因为 ConcurrentLinkedQueue 还是无界队列,无界队列使用不当可能造成 OOM。所以当使用 ConcurrentLinkedQueue 的时候在添加元素前应该先判断当前队列元素个数是否已经达到了设定的阈值,如果达到就做一定的处理措施,比如直接丢弃等。这里需要注意判断当前队列元素个数与阈值这个操作不是原子性的,最终会导致队列元素个数比设置的阈值大。
ConcurrentLinkedQueue 在 Tomcat 的的 NioEndPoint 中得到了应用,通过使用 ConcurrentLinkedQueue 将同步转换为异步,可以让 tomcat 同时接受更多请求,模型如下图:
image.png
tomcat 的 NioEndPoint 模式中 acceptor 线程负责接受用户请求,接受后把请求放入到 poll 线程对应的队列,poll 线程从队列里面获取任务后委托给 worker 线程具体处理。
LinkedBlockingQueue 和 ArrayBlockingQueue 都是有界阻塞队列,不同在于一个底层数据结构是链表,一个是数组;另外前者入队出队使用单独的锁,而后者出入队使用同一个锁,所以前者的并发度比后者高。另外创建前者时候可以不指定队列大小,默认队列元素个数为 Integer.MAX_VALUE,而后者必须要指定数组大小。所以使用 LinkedBlockingQueue 时候要记得指定队列大小。
比如比较有名的 LogBack 日志系统的异步日志打印实现中就是用了 ArrayBlockingQueue 作为缓冲队列,如下图,业务检查调用异步 log 进行写入日志时候,实际是把日志放入了 ArrayBlockingQueue 队列就返回了,而具体真正写入日志到磁盘是一个日志线程从队列里面获取任务来做的,这其实是一个多生产单消费模型:
image.png
PriorityBlockingQueue 是无界阻塞队列,是一个队列元素有优先级的队列,前面的队列模式都是 FIFO 先进先出,而 PriorityBlockingQueue 而是优先级最高的元素先出队,而不管谁先进入队列的,所以 PriorityBlockingQueue 经常会用在一些任务具有优先级的场景。还比如上面说的 logback 异步日志模型,如果把日志等级分了优先级,比如 error>warn>info,那么上述模型中队列就可以使用 PriorityBlockingQueue,日志线程会先从队列里面首先获取 error 级别的日志,但是需要注意的是如果业务线程一直向队列里面写入 error 级别日志,那么可能先写入到队列的 warn 和 info 级别的日志将很久甚至永远没机会写入到磁盘。还有一点要注意 PriorityBlockingQueue 是无界限队列,要注意判断队列元素个数不要超过设置的阈值。

线程和状态机

线程和线程任务

线程任务区别于线程,可以理解为线程需要执行的逻辑,类似 Thread 中要执行的 Runnable。

阅读全文 »

参考

ZooKeeper

  1. Zookeeper error: Cannot open channel to X at election address

Elasticsearch

  1. Install Elasticsearch with Docker
  2. 记一次 Docker 下安装 Logstash+Elasticsearch+Kibana 经历
  3. Kibana on Docker cannot connect to Elasticsearch
  4. Elasticsearch 集群部署

Docker MySQL

  1. Docker MySQL

Docker MySQL 双机热备实现

  1. Mysql Master/Slave Replication With Docker
  2. Mysql 双机热备实现
  3. 2.5.6 Deploying MySQL on Linux with Docker
  4. 让 docker 中的 mysql 启动时自动执行 sql
  5. MySQL 主从复制资料汇总

Redis

  1. docker redis
  2. redis/4.0/Dockerfile
  3. 使用Docker Compose部署基于Sentinel的高可用Redis集群

RabbitMQ

  1. docker rabbitmq
  2. bijukunjummen/docker-rabbitmq-cluster
  3. RabbitMQ集群 Docker一键部署
  4. Docker中实现RabbitMQ集群

Nginx

  1. docker nginx
  2. Docker Swarm Load Balancing with NGINX and NGINX Plus

Visualizer

  1. docker-swarm-visualizer

ZooKeeper

  1. 使用 Docker 一步搞定 ZooKeeper 集群的搭建

Docker 是什么

  • Docker 是开源应用容器引擎,轻量级容器技术。
  • 基于 Go 语言,并遵循 Apache2.0 协议开源。
  • Docker 可以让开发者打包他们的应用、依赖包及配置文件打包到一个轻量级、可移植的容器中,然后发布到任何流行的 Linux 系统上,也可以实现虚拟化。
  • 容器完全使用沙箱技术,相互之间不会有任何接口。
  • 类似于虚拟机技术(vmware、vitural),但 docker 直接运行在操作系统(Linux)上,而不是运行在虚拟机中,速度快,性能开销极低。
    Docker 支持将软件编译成一个镜像,然后在镜像中对各种软件做好配置,将镜像发布出去(Docker Hub),其他使用者可以直接使用这个镜像。 运行中的这个镜像称为容器,容器启动是非常快速的。类似 windows 里面的 ghost 操 作系统,安装好后什么都有了。
    docker容器可以理解为在沙盒中运行的进程。这个沙盒包含了该进程运行所必须的资源,包括文件系统、系统类库、shell 环境等等。但这个沙盒默认是不会运行任何程序的。你需要在沙盒中运行一个进程来启动某一个容器。这个进程是该容器的唯一进程,所以当该进程结束的时候,容器也会完全的停止。

常见应用场景

  • Web 应用的自动化打包和发布。
  • 自动化测试和持续集成、发布。
  • 在服务型环境中部署和调整数据库或其他的后台应用。
  • 从头编译或者扩展现有的 OpenShift 或 Cloud Foundry 平台来搭建自己的 PaaS 环境。

Docker 特点

Docker 是一个基于容器的应用开发、部署和运行平台,它为开发者和系统管理员们提供了一种新式的应用部署方式,具有灵活(最复杂的应用都能容器化)、轻量(容器共享一个服务器内核)、可替换的(可以在容器运行过程中更新服务器)、可移植的(本地、云上皆可)、可伸缩的(可以轻松地进行复制)、可栈化(指的是可以将多个服务部署在一起,比如用 docker-compose)的特性。
Docker is a platform for developers and sysadmins to develop, deploy, and run applications with containers. The use of Linux containers to deploy applications is called containerization. Containers are not new, but their use for easily deploying applications is.
Containerization is increasingly popular because containers are:

  • Flexible: Even the most complex applications can be containerized.
  • Lightweight: Containers leverage and share the host kernel.
  • Interchangeable: You can deploy updates and upgrades on-the-fly.
  • Portable: You can build locally, deploy to the cloud, and run anywhere.
  • Scalable: You can increase and automatically distribute container replicas.
  • Stackable: You can stack services vertically and on-the-fly.

Docker 优势

容器技术相比虚拟机,主要优势在于性能上,其性能优势可以说达到了一个量级的差距。根据 Boden Russell 在 OpenStack 上做的一次基准测试报告,一个 KVM 实例的平均内存消耗有 292MB,而一个 docker 实例的平均内存消耗在 49MB 左右。在 CPU overhead 和启动时间方面,docker 基本都比 KVM 有一个量级的优势。
目前,一个 AWS 上的 micro 实例,每小时的按需使用成本大约在一美分多一些。如果用 docker 来提供实例,那么每小时的按需使用成本很可能会做到 0.1 美分。这一点对于云经济至关重要。正如经济学家 William Stanley Jevons 的理论所呈现的,随着商品的价格越便宜,人们使用它们的场景和频率会越来越多。

  1. container 是一种部署单元,用户可以自由决定部署的范围(dev、test、production),即组织容器的方式,换句话说,容器可以简化工作流和软件的开发、部署生命周期;
  2. 可以从传统的虚拟机环境平滑过渡到裸机生产环境内;
    保证了线上线下环境的一致性。我们在线下的开发环境使用 Docker 构建好 weaapp 的镜像后,可以直接在线上使用一个镜像,保证了线上线下环境的一致性,再也不会有在线下开发环境中运行正常,而部署到线上各种错误了。
  3. 实现了模块化,提高了复用性。
    我们可以将数据库和 Tomcat 运行在不同的容器中,从某种角度来说,这也降低了模块之间的耦合性,便于拓展。比如我们要把 MySQL 替换为 oracle,只需要再构建一个 oracle 镜像并启动与 Tomcat 连接即可,非常方便。对于我们构建的镜像,在其他 app 中直接拿来用就可以了,不必重复劳动。
  4. 提高整体效率;
    极大的简化了 webapp 的部署流程。在不使用 Docker 时,我们部署 app 时,要先搭建好 app 运行所需环境,这个过程做过的人都知道多么枯燥繁琐,一不小心还出错。而有了 Docker,我们只需要直接构建一个我们 webapp 的镜像然后将其运行即可,无论在多少台服务器中部署,都是如此。再比如,使用 Docker 之前要搭建一个 WordPress 对于新手来说是有些困难的,而有了 Docker,只需要从 DockerHub 上 pull 一个 WordPress 镜像并启动就可以了,非常非常方便。
  5. 实现了虚拟化,提高硬件利用率,有了 Docker,我们可以在一台服务器上运行很多 webapp,充分利用闲置资源。
    这时候,服务器的操作系统就类似于货轮,而一个个 Docker 容器就相当于货轮上的一个个集装箱。现在大热的云服务市场,不少就用了 Docker。举个例子来说,现在我们有一台操作系统为 Ubuntu14.04 的服务器,我们构建不同版本的 ubuntu 镜像并启动,并且为不同的用户分配不同的容器。这样,用一台服务器可以虚拟出 n 个运行着不同操作系统的虚拟服务器,而对于用户来说,这些是透明的––用户则认为自己拥有一台完整的服务器。据我推测,阿里云的服务器就是这么干的。这充分利用了闲置的硬件资源。
  6. Fast
    • 传统方式慢,传统情况下,应用服务器扩容缩容步骤繁多流程冗长,从服务器申请、初始化、应用部署、测试、加入退出集群、服务器下线。比如,业务遇到突发的流量高峰时,无法进行快速的扩容,当准备好的时候可能流量高峰已经过去了。
    • 传统不稳定,代码上线发布历经多个环境,在某个环境中测试时修复了 bug,代码等无法及时同步各环境中,提升了服务上线的风险。
    • Runtime performance at near bare metal speeds (typically 97+ percent or bare metal – a few ticks shaven off for bean counters).
    • Management operations (boot, stop, start, reboot, etc.) in seconds or milliseconds.
  7. Agile
    • VM-like agility – it’s still “virtualization”.
    • Seamlessly move between virtual and bare metal environments permitting new development workflows which reduce costs (e.g. develop on VMs and move to bare metal in the “click of a button” for production).
  8. Flexible
    • Containerize a “system” (OS less the kernel).
    • Containerize “application(s)”.
  9. Lightweight
    • Just enough Operating System (JeOS); include only what you need reducing image and container bloat.
    • Minimal per container penalty which equates to greater density and hence greater returns on existing assets – imagine packing 100s or 1000s of containers on a single host node.
  10. Inexpensive
    • Open source – free – lower TCO.
    • Supported with out-of-the-box modern Linux kernels.
  11. Ecosystem
    • Growing in popularity – just checkout the google trends for docker or LXC.
    • Vibrant community and numerous 3rd party applications (1000s of prebuilt images on docker index and 100s of open source apps on github or other public sources).
  12. Cloudy
    • Various Cloud management frameworks provide support for creating and managing Linux Containers – including OpenStack my personal favorite.

Docker 劣势

既然容器技术有如此大的优势,为什么基于容器的云现在还没有成为主流?我认为主要还是安全性的问题。虚拟机可以利用来自硬件的信任机制来提升安全性,这些机制在 Intel Virtualization Technology Evolution 的演示中有详细的介绍。即使如此,虚拟机仍然被视为相对不安全,比如前一段时间 Xen(半虚拟化,在硬件层和 OS 层之间的虚拟层)爆出一个漏洞,导致 AWS 不得不大量升级自己的主机。

  1. Docker Hub(镜像管理中心)不稳定
    第一个就是很重要的 Docker Hub 的访问问题。我们知道国内访问一些海外的网站有时候会有稳定性的问题。Docker Hub 在我们的实践中就经常出现访问不了的问题。但这种访问的问题并不是持续的,而是时有时无。由于大量的成熟 Docker 映像(image)都需要从 Docker Hub 下载,很多脚本在执行到这一步时,结果很难预料。一种方案是修改缺省的 Docker Hub 地址,改为采用国内的一些镜像(mirror)。但是在没有官方认证的成熟稳定的镜像网站时,Docker 映像的更新不容易得到保证;另一种方案是自行搭建自己的 Docker Hub。但是一来这样就失去了强大的社区贡献的映像资源,二来要花费很多精力来保持更新和同步。容器技术带来的简单化,又因为映像管理而复杂化,得不偿失。
  2. 运维难度大
    第二个就是容器技术的资源管理和运维。因为容器技术本身更适于解决大规模应用场景,所以通常都是集群基础上的部署、运维,但是目前对这一系列任务的自动化处理尚无统一的或者标准的框架。如果要让 Docker 真正在实际环境中发挥最大的效能并且易于维护,就需要有很成熟稳定的资源编排(orchestration)、资源调度(scheduling)和部署(deployment)的支持,但是这方面暂时还没有很明显的最佳解决方案,所以大多数人都在摸索和搭建自己的解决方案。我们在微软开放技术内部也是在一些开源技术的基础之上,自行开发了容器在微软公有云 Azure 上的资源管理调度和部署运维的系统,传统上的开发运维和持续集成,持续部署的技术,比如 Chef,Puppet,Jenkins 等,都可以很容易的与容器技术一起工作。

镜像和容器(Images and containers)

A container is launched by running an image. An image is an executable package that includes everything needed to run an application–the code, a runtime, libraries, environment variables, and configuration files.
A container is a runtime instance of an image–what the image becomes in memory when executed (that is, an image with state, or a user process). You can see a list of your running containers with the command, docker ps, just as you would in Linux.
一个镜像是:

  • 一个只读模板,可以用来创建容器,一个镜像可以创建多个容器
  • Docker 提供了一个很简单的机制来创建和更新现有的镜像,甚至可以直接从其他人那里获取做好的镜像直接使用
    可以理解为 Java 中的类
    一个容器是:
  • 容器是从镜像创建的运行实例,也就是镜像启动后的一个实例称为容器,是独立运行的一个或一组应用。
  • docker 利用容器来运行应用,他可以被启动、开始、停止、删除,每个容器都是相互隔离的、保证安全的平台。
  • 可以把容器看做是一个简易版的 Linux(包括 root 用户权限、进程空间、用户空间和网络空间等)和运行在其中的应用程序。
  • 可以理解为 Java 中通过类创建的实例。

服务端和客户端

Docker系统有两个程序:docker服务端和docker客户端。其中docker服务端是一个服务进程,管理着所有的容器。docker客户端则扮演着docker服务端的远程控制器,可以用来控制docker的服务端进程。大部分情况下,docker服务端和客户端运行在一台机器上。
Docker开放的API与Docker的守护进程进行通信。

docker 仓库(Resoisitory)

  • 仓库是集中存放镜像文件的场所,类似 git 代码仓库等。
  • 仓库(Respository)和仓库注册服务器(Registry)是有区别的。仓库注册服务器一般存放多个仓库,每个仓库又有多个镜像,每个镜像又有不同的标签(tag)。
  • 仓库分为公开仓库(public)和私有仓库(private)两种形式。
  • 最大的公开仓库是 Docker Hub,国内的公开仓库有阿里云等。
  • 可以在本地网络创建一个私有仓库。
  • 当创建好自己的镜像后,可以通过 push 命令把它上传到公开或私有仓库。
  • 仓库的概念类似 Git,仓库注册服务器可以理解为 GitHub 这种托管服务。

Containers & virtual machines

传统的部署云服务的方式是通过虚拟机完成的,虚拟机会在宿主机上运行一个完整的操作系统、通过hypervisor来间接使用宿主机的硬件资源,实际上这远远超出了应用运行所必须的资源。而容器正相反,它在操作系统中作为进程运行,与所有其他容器共享同一内核、占用相同容量的内存空间,相对来说,会更加轻量。
A container runs natively on Linux and shares the kernel of the host machine with other containers. It runs a discrete process, taking no more memory than any other executable, making it lightweight.
By contrast, a virtual machine (VM) runs a full-blown “guest” operating system with virtualaccess to host resources through a hypervisor. In general, VMs provide an environment with more resources than most applications need.
下图是Docker(容器)和传统虚拟机之间运行架构的示意图。
Container stack example Virtual machine stack example
In reality virtualization and Docker can and are used together in modern dev-ops. Most VPS providers are running bare-metal full virtualization technologies like Xen and Docker usually runs on top of a virtualized Ubuntu instance.

Docker 与 LXC(Linux Container)

LXC利用Linux上相关技术实现容器,Docker则在如下的几个方面进行了改进:
移植性:通过抽象容器配置,容器可以实现一个平台移植到另一个平台;
镜像系统:基于AUFS的镜像系统为容器的分发带来了很多的便利,同时共同的镜像层只需要存储一份,实现高效率的存储;
版本管理:类似于GIT的版本管理理念,用户可以更方面的创建、管理镜像文件;
仓库系统:仓库系统大大降低了镜像的分发和管理的成本;
周边工具:各种现有的工具(配置管理、云平台)对Docker的支持,以及基于Docker的Pass、CI等系统,让Docker的应用更加方便和多样化。

Docker与Vagrant

两者的定位完全不同
Vagrant类似于Boot2Docker(一款运行Docker的最小内核),是一套虚拟机的管理环境,Vagrant可以在多种系统上和虚拟机软件中运行,可以在Windows。Mac等非Linux平台上为Docker支持,自身具有较好的包装性和移植性。
原生Docker自身只能运行在Linux平台上,但启动和运行的性能都比虚拟机要快,往往更适合快速开发和部署应用的场景。
Docker不是虚拟机,而是进程隔离,对于资源的消耗很少,单一开发环境下Vagrant是虚拟机上的封装,虚拟机本身会消耗资源。因此对于开发环境来讲,使用Docker是更好的选择。

开始使用Docker

查看操作系统版本

Docker是基于LXC(Linux Container)的,因此最好在Linux环境下使用。
Docker要求内核版本高于3.10(如果是Ubuntu则需要高于12.04的发行版)可以使用下面命令查看操作系统版本:

1
uname -r

安装(Ubuntu)

最好上官网下载安装最新版docker-ce,命令行下的太旧了:

1
2
sudo apt-get install docker.io
docker version

或者按照官网上的步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 安装插件,可以使用HTTPS来下载仓库软件
sudo apt-get update
sudo apt-get install \
apt-transport-https \
ca-certificates \
curl \
software-properties-common
# 获取GPG公钥
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
# 验证公钥是正确的
sudo apt-key fingerprint 0EBFCD88
# 设置stable仓库
sudo apt-get install software-properties-common python-software-properties # 如果缺少了add-apt-repository命令需要安装一下
sudo add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
# 安装
sudo apt-get update
sudo apt-get install docker-ce
# 安装某个特定版本
#apt-cache madison docker-ce # 列出可用版本
#sudo apt-get install docker-ce=<VERSION>
# 运行hello-world
docker run hello-world

安装(CentOS)

1
2
3
4
5
yum update
yum install docker
systemctl start docker
systemtctl enable docker # 设定为开机启动
systemtctl stop docker

安装(Mac)

1

下载镜像

1
2
3
4
# 搜索可用的镜像,或者上http://index.docker.io/查找
docker search tutorial
# 下载镜像,在docker的镜像索引网站上面,镜像都是按照用户名/镜像名的方式来存储的。有一组比较特殊的镜像,比如ubuntu这类基础镜像,经过官方的验证,值得信任,可以直接用镜像名来检索到。
docker pull learn/tutorial

运行容器

1
2
3
4
# docker run命令有两个参数,一个是镜像名,一个是要在镜像中运行的命令。
docker run learn/tutorial echo "hello word"
# 在容器中安装一个软件,在执行apt-get 命令的时候,要带上-y参数。如果不指定-y参数的话,apt-get命令会进入交互模式,需要用户输入命令来进行确认,但在docker环境中是无法响应这种交互的
docker run learn/tutorial apt-get install -y ping

保存对容器的修改

当你对某一个容器做了修改之后(通过在容器中运行某一个命令),可以把对容器的修改保存下来,这样下次可以从保存后的最新状态运行该容器。docker中保存状态的过程称之为committing,它保存的新旧状态之间的区别,从而产生一个新的版本。

1
2
3
4
5
6
7
8
9
10
11
12
# 获得查看正在运行中的、安装完ping命令之后容器的id
docker ps -l
docker container ls --all
# 查看更详细的信息
docker inspect
# 将镜像保存为learn/ping,无需拷贝完整的id,通常来讲最开始的三至四个字母即可区分,比如94b82c71517f可以简写为94b
docker commit [CONTAINER ID] learn/ping
# 查看刚保存的镜像
docker images
# 在新的镜像中运行ping www.baidu.com,旧的镜像中没有安装所以不能运行、会返回奇怪的信息
docker run learn/tutorial ping www.baidu.com
docker run learn/tutorial ping www.baidu.com

发布镜像

1
2
3
4
# 列出本地的所有镜像
docker images
# 将某一个镜像发布到官网
docker push

设置用户

  1. 使用root用户运行
    通常我们使用Docker的时候都是使用root用户身份运行的,官方说法如下:
    1
    2
    The docker daemon binds to a Unix socket instead of a TCP port. By default that Unix socket is owned by the user root and other users can access it with sudo. For this reason, docker daemon always runs as the root user. 
    To avoid having to use sudo when you use the docker command, create a Unix group called docker and add users to it. When the docker daemon starts, it makes the ownership of the Unix socket read/writable by the docker group.
  2. 使用普通用户运行
    1
    2
    3
    sudo groupadd docker # docker组可能已经存在了
    sudo gpasswd -a ${USER} docker # 将当前用户加入docker组
    sudo systemctl restart docker # 重新启动docker服务(下面是CentOS7的命令)
    然后当前用户注销再重新登录就可以正常使用docker命令了:
    1
    docker ps

登录

要登录容器进行操作,一种办法是在运行容器的时候开放22端口到外部,然后使用ssh来连接:

1
2
docker create -it --name=容器别名 -p 20022:22 ics-image
ssh -p 20022 root@localhost

另一种办法是在运行中的容器内执行/bin/bash:

1
docker exec -it 容器名 /bin/bash

Docker Daemon

Docker迁移

将一台宿主机上的Docker环境迁移到另一台宿主机上是比较方便的,只需停止Docker服务,将整个docker存储文件复制到另外一台宿主机上,然后调整另外一台宿主机的配置即可。

Docker Hub

仓库(Repository)、注册服务器(Registry)、注册索引(Index)

仓库是存放一组关联镜像的集合,比如同一个应用的不同版本的镜像;
注册服务器是存放实际的镜像的地方;
注册索引则负责维护用户的账号,权限,搜索,标签等管理。注册服务器利用注册索引来实现认证等管理。

QA

Docker Hub

  1. docker pull老超时
    试试国内的加速:https://www.daocloud.io/mirror#accelerator-doc
    或连接VPN后试试
  2. pull或push时出现一直Waiting的情况
    网上没有找到答案,开了VPN也没啥用,最后把环境变量改回来(eval $(docker-machine env -u))就好了。
  3. 从非官方仓库(如:dl.dockerpool.com)下载镜像的时候,有时候会提示“Error:Invaild registry endpoint https://dl.docker.com:5000/v1/…”?
    Docker 自1.3.0版本往后以来,加强了对镜像安全性的验证,需要手动添加对非官方仓库的信任。
    DOCKER_OPTS=”–insecure-registry dl.dockerpool.com:5000”
    重启docker服务

参考

  1. docker-library / official-images
  2. Best practices for writing Dockerfiles
  3. 十分钟带你理解 Kubernetes 核心概念
  4. Install Docker https://docs.docker.com/install/
  5. Get Docker CE for Ubuntu
  6. Get Started
  7. 入门教程 中文
  8. Docker Hub
  9. Docker Cloud
  10. 命令行参考文档
  11. 安装后配置
  12. daemon配置

原理

  1. Docker 核心技术与实现原理
  2. Docker vs Virtualization
  3. How is Docker different from a virtual machine?
  4. KVM and Docker LXC Benchmarking with OpenStack
  5. OK, I give up. Is Docker now Moby? And what is LinuxKit?
  6. Docker 切出 Moby 背后的真实原因分析
  7. docker/libcontainer
  8. docker最新代码源码编译
  9. 如何编译docker 1.2.0版本的源码

Docker容器命令

运行中的镜像我们称之为容器,有点类似程序和进程的概念。

运行容器

1
docker run --name container-name -d image-name

运行一个容器,使用 docker run 命令即可。 另,docker run -参数 含义:

  • – name:为容器起一个名称;
  • -d:detached,执行完这句命令后,控制台将不会阻塞,可以继续输入命令操作,不会阻塞,也就是启动守护式容器,如果执行 docker run –name mycentos -it centos 会进入启动容器的命令控制台,也就是启动交互式容器;
  • -i:以交互方式运行容器,通常与 -t搭配使用;
  • -t:为容器重新分配一个伪输入终端,通常与 -i 搭配使用;
  • -P:随机端口映射;
  • -p:指定端口映射,后面会有端口映射详细讲解;
  • image-name:要运行的镜像名称;
    如果以守护式方式启动 centos 容器,执行如下命令:docker run –name mycentos -d centos,会正常返回 container-id,但是通过 docker ps 查看,却发现没有在运行,通过 docker ps -a 发现,原来已经停止了,这是为什么呢?
    1
    2
    # 接下来我们创建一个守护态的Docker容器
    sudo docker run -itd ubuntu:14.04 /bin/bash

查看容器

1
2
# 查看运行中的容器列表
docker ps

输出内容中:

  • CONTAINER ID:启动时生成的 ID;
  • IMAGE:该容器使用的镜像;
  • COMMAND:容器启动时执行的命令;
  • CREATED:容器创建时间;
  • STATUS:当前容器状态;
  • PORTS:当前容器所使用的默认端口号;
  • NAMES:启动时给容器设置的名称。

另外,docker ps -参数,有:

  • -a:查看所有容器,包括已停止运行的;
  • -q:静默模式,只显示容器编号;
  • -l:显示最近创建的容器;
  • -n 3:显示最近创建的 num(此处为 3)个容器;
  • –no-trunc:不截断输出,显示完整信息。
1
2
# 可用通过如下命令查看容器中正在运行进程:
docker top container-id/container-top
1
2
# 可用通过如下命令查看容器内部细节,返回为 json:
docker insepct container-id

进入容器

docker attach

1
sudo docker attach 44fc0f0582d9

但在,使用该命令有一个问题。当多个窗口同时使用该命令进入该容器时,所有的窗口都会同步显示。如果有一个窗口阻塞了,那么其他窗口也无法再进行操作。
另外,官方文档中说 attach 后可以通过 CTRL-C 来 detach,但实际上经过我的测试,如果 container 当前在运行 bash,CTRL-C 自然是当前行的输入,没有退出;如果 container 当前正在前台运行进程,如输出 nginx 的 access.log日志,CTRL-C 不仅会导致退出容器,而且还 stop 了。这不是我们想要的,detach 的意思按理应该是脱离容器终端,但容器依然运行。好在 attach 是可以带上 –sig-proxy=false 来确保 CTRL-D 或 CTRL-C 不会关闭容器。

1
docker attach --sig-proxy=false 7f237caad43b

因为这些原因,所以docker attach命令不太适合于生产环境,平时自己开发应用时可以使用该命令。

ssh

在生产环境中排除了使用docker attach命令进入容器之后,相信大家第一个想到的就是ssh。在镜像(或容器)中安装SSH Server,这样就能保证多人进入容器且相互之间不受干扰了,相信大家在当前的生产环境中(没有使用Docker的情况)也是这样做的。但是使用了Docker容器之后不建议使用ssh进入到Docker容器内。
但是不建议,原因在

nsenter

在上面两种方式都不适合的情况下,还有一种比较方便的方法,即使用nsenter进入Docker容器。
关于什么是nsenter请参考如下文章:
https://github.com/jpetazzo/nsenter
在了解了什么是nsenter之后,系统默认将我们需要的nsenter安装到主机中
如果没有安装的话,按下面步骤安装即可(注意是主机而非容器或镜像)
具体的安装命令如下:

1
2
3
4
5
6
$ wget https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz  
$ tar -xzvf util-linux-2.24.tar.gz
$ cd util-linux-2.24/
$ ./configure --without-ncurses
$ make nsenter
$ sudo cp nsenter /usr/local/bin

安装好nsenter之后可以查看一下该命令的使用。
nsenter可以访问另一个进程的名称空间。所以为了连接到某个容器我们还需要获取该容器的第一个进程的PID。可以使用docker inspect命令来拿到该PID。
docker inspect命令使用如下:

1
$ sudo docker inspect --help   

inspect命令可以分层级显示一个镜像或容器的信息。比如我们当前有一个正在运行的容器

可以使用docker inspect来查看该容器的详细信息。

1
$ sudo docker inspect 44fc0f0582d9  

由其该信息非常多,此处只截取了其中一部分进行展示。如果要显示该容器第一个进行的PID可以使用如下方式

1
$ sudo docker inspect -f {{.State.Pid}} 44fc0f0582d9  

在拿到该进程PID之后我们就可以使用nsenter命令访问该容器了。

1
2
$ sudo nsenter --target 3326 --mount --uts --ipc --net --pid  
$ sudo nsenter --target 3326 --mount --uts --ipc --net --pid

其中的3326即刚才拿到的进程的PID
当然,如果你认为每次都输入那么多参数太麻烦的话,网上也有许多做好的脚本供大家使用。
地址如下:
http://yeasy.gitbooks.io/docker_practice/content/container/enter.html
http://www.tuicool.com/articles/eYnUBrR

exec

除了上面几种做法之外,docker在1.3.X版本之后还提供了一个新的命令exec用于进入容器,这种方式相对更简单一些。

1
sudo docker exec -it 775c7c9ee1e1 /bin/bash

exec和attach的区别:

  • attach:直接进入容器启动命令的终端,不会启动新的进程;
  • exec:在容器中打开新的终端,并且可以启动新的进程,可在宿主机中直接执行操作容器的命令,比如docker exec -it 7f237caad43b ls /tmp列出容器 /tmp 目录下的文件

容器和宿主机互相拷贝文件

宿主机拷贝文件到容器:

1
2
3
4
docker cp 文件 container-id:目标文件/文件夹
eg.
docker cp /tmp/suzhuji.txt 7f237caad43b:/tmp
将宿主机tem文件夹下suzhujia.txt文件拷贝到容器7f237caad43b中tmp目录中

从容器拷贝文件到宿主机:

1
2
3
4
docker cp container-id:目标文件/文件夹 宿主机目标文件/文件夹
eg.
docker cp 7f237caad43b:/tmp/yum.log /tmp
将容器7f237caad43b中tmp目录下yum.log拷贝到宿主机/tmp目录下

停止、启动、重启容器

1
2
3
4
5
6
7
8
9
10
# 通过以下命令来停止运行中的容器,停止后需要使用docker start命令来重新启动
docker stop container-name/container-id
# 强制停止容器(类似强制关机):
docker kill container-name/container-id
# 停止所有正在运行的容器
docker kill $(docker ps -q)
# 通过以下命令启动容器:
docker start container-name/container-id
# 通过以下命令启动容器:
docker restart container-name/container-id

容器退出(或者显示调用docker stop)后会进入终止(exited)状态,此时可以通过 docker ps -a 查看,其中数据不会丢失,还可以通过docker start 来启动,只有删除容器才会清除数据。

删除容器

1
2
3
4
5
6
# 删除单个容器:
docker rm container-id
# 删除多个容器:
docker rm container-id container-id
# 删除所有容器:
docker rm $(docker ps -a -q )

另,docker rm -参数含义:

  • -f:强制删除,如果在运行中,先停止,再删除

查看容器日志

1
2
# 查看当前容器日志,可通过如下命令:
docker logs container-id/container-name

另,docker logs -参数含义:

  • -t:加入时间戳;
  • -f:跟随最新的日志打印;
  • –tail:显示最后多少条。

端口映射

容器中可以运行一些网络应用,要让外部也可以访问这些应用,可以通过 -P(大写) 或 -p (小写) 参数来指定端口映射。启动容器的时候如果不指定对应参数,在容器外部是无法通过网络来访问容器内的网络应用和服务的。
Docker 的端口映射通过 -p 或 -P 参数实现,-p 和 -P 区别为:

  • -P : 随机映射一个49000~49900的端口到内部容器开放的网络端口
  • -p : 可以指定要映射的IP和端口,但是在一个指定端口上只可以绑定一个容器
1
2
3
# 把主机端口 8888 请求映射到 Docker 容器内部端口 8080
docker run --name tomcat1 -d tomcat
docker run --name tomcat2 -d -p 8888:8080 tomcat

使用docker ps查看端口映射情况,访问localhost:8080和localhost:8888,发现前者无法访问,后者可以。

端口映射格式

  1. ip:hostport:containerport #指定ip、指定主机port、指定容器port
    1
    2
    docker run -d -p 127.0.0.1:5000:5000 training/webapp python app.py
    指定映射使用一个特定地址,比如 localhost地址 127.0.0.1
  2. ip::containerport #指定ip、未指定主机port、指定容器port
    1
    2
    3
    4
    docker run -d -p 127.0.0.1::5000 training/webapp python app.py
    绑定 localhost 的任意端口到容器的 5000 端口,本地主机会自动分配一个端口
    还可以使用 udp 标记来指定 udp 端口
    docker run -d -p 127.0.0.1:5000:5000/udp training/webapp python app.py
  3. hostport:container #未指定ip port、指定主机port、指定容器port
    1
    2
    docker run -d -p 5000:5000 training/webapp python app.py
    将本地的 5000 端口映射到容器的 5000 端口,默认会绑定本地所有接口上的所有地址

查看端口映射

1
2
# 可以通过如下命令查看容器映射了哪些端口及协议:
docker port container-id

如果返回空表示没有进行端口映射。

QA

  1. docker ps(等其他指令)没有响应

我的情况比较简单,是因为云主机内存不够了,所以没有响应。但是如果是某个容器出问题了就需要另外想办法了:docker之 docker ps无响应

  1. 连接进入docker

https://stackoverflow.com/questions/30172605/how-to-get-into-a-docker-container

  1. 查看Docker日志

https://docs.docker.com/config/daemon/#read-the-logs

https://takacsmark.com/docker-logs/

docker logs <Container-ID>docker service logs <Service-ID> 命令能查看某一容器或服务的日志,但是怎么查看Docker守护进程的执行日志?

docker logs 和 docker service logs 命令会显示类似于终端中交互式运行命令的输出。UNIX 和 Linux 命令在运行时通常会打开三个 I/O 流,分别称为 STDIN,STDOUT 和 STDERR。STDIN 是命令的输入流,可能包括来自键盘的输入或来自另一个命令的输入。STDOUT 通常是命令的正常输出,而 STDERR 通常用于输出错误消息。默认情况下,docker logs 显示命令的 STDOUT 和 STDERR。

https://stackoverflow.com/questions/30969435/where-is-the-docker-daemon-log

比如Ubuntu:sudo journalctl -fu docker.service

查看已经退出的容器的日志

  1. 运行容器直接退出

使用docker-logs命令(或者使用docker ps -a查看正在运行中的容器)发现容器启动后直接退出,原因是Docker容器后台运行,就必须有一个前台进程,进程结束,容器就会退出,如果不是那些一直挂起的命令(eg. top, tail等),就会自动退出。

正常情况下,启动服务只需启动相应的service即可,例如:service nginx start && service php5-fpm start,但是这样做的话nginx和fpm均以后台进程模式运行,就会导致docker前台没有正在运行的应用,因此容器会立即自杀,因为已经没有事情能做了。

* 将要运行的程序以前台进程的形式运行,如果容器需要同时启动多个进程,那么只需要将其中的一个挂起到前台即可,比如对上面所说的 web 容器,只需要将启动指令修改为`service php5-fpm start && nginx -g "daemon off;"`

* 对于不知道怎么在前台运行的程序,只需要在启动的命令后面添加类似tail、top这种可以前台运行的程序,以上面所说的web容器为例,可以写成`service nginx start && service php5-fpm start && tail -f /var/log/nginx/error.log`,又如,在启动centos/ubuntu容器的时候,可以做一个死循环,持续输出任意内容,这样容器就不会认为没事可做而自杀了:`docker run -d centos /bin/sh -c "while true; do echo hello world; sleep 1; done"`。

* 可以加上-it选项,-i 开启了input(输入)功能 -t开启了一个连接容器里边的terminal(终端)(是不是相当于在容器内开启了一个bash进程?),如果希望在后台运行,可以再加上-d选项。

一般来说我们不推荐在同一个容器内运行多个应用程序,这会影响容器的效率,因此最后一种方法是最好的。

  1. 日志中出现 not support swap limit capabilities

https://segmentfault.com/q/1010000002888521

  1. 开启服务失败(elasticsearch)

在查看日志(journalctl -fu docker.service)的时候发现是内存不够用,在docker-compose.yml中去掉resources配置项就可以了。

如果没有设置限制,也有可能是物理机本身内存不够了,实在不行可以创建swap来凑空间。

  1. 使用远程主机作为docker-machine时“connection refused”

调用docker-machine env <docker-machine名>出现连接失败的情况

查看了一些资料但并没有什么卵用,/etc/default/docker这个文件根本没有被读进去。

据说是因为docker的daemon默认监听fd,而远程连接时用的是tcp。

根据这里的说明修改配置文件,但是发现无法重启docker服务,查看daemon日志发现-H的配置项重复了,而/etc/systemd/system/下并没有发现配置文件,在这里了解到原来配置文件还有一个位置/lib/systemd/system/,将相关内容(”-H fd://“)去掉就行了。

但是这样配置完毕后仍然不能在ECS上创建Docker Machine,最后采用了generic驱动成功了,具体配置见我的另一篇[App & Cloud]。

就优先级来说,/etc/docker/daemon.json > /etc/systemd/system/ > /lib/systemd/system/,优先级高的会覆盖低的。

  1. 对阿里云美西节点 docker-machine create 时 tls: DialWithDialer timed out

刚开始我也试着regenerate-certs了,但没什么卵用

在stackoverflow上查不到相关的讨论

官网相关页面上也没有。

github上对这个问题的讨论,也有了解决,在代码里把超时时间改大了,但还是写死的,我在连接美国服务器时仍然超时。

有的人提议提议将超时时间作为一个命令行参数提供出来,但是下面一个傻逼一直坚持说“suggest retrying”。

最后还是放弃了在这个节点上部署docker。

PS:在这里看到开发者的想法:用户是愚蠢的,不能把危险的选项公布出来,所以TLS任何方法不能关闭!WTF!!!随便搜索了一遍releases列表,没有发现对TLS的更改,所以我彻底放弃了。

  1. 如何让Docker启动时执行自定义命令/脚本?

如果官方提供的Dockerfile已经内置了这个功能是最好的(比如MySQL官方Dockerfile),如果没有的话,就需要自定义Dockerfile了(How to run bash command after startup?),ENTRYPOINT和CMD命令都可以用于执行命令,CMD会被接到ENTRYPOINT后面,主要用于提供一些可变的参数,docker run后面的参数其实就是CMD。

在docker-compose.yml文件中不适合自定义执行命令,command命令可以定义启动时命令,但是只能定义一条,我尝试command: a.sh && b.sh,结果也只是执行了前面的指令,所以就不要白费力气了。

  1. docker 中怎么修改应用的配置?

    • 官方镜像已有提供该功能

    • 使用volumes将宿主机文件夹挂载到容器内

    • 自定义镜像

  2. 怎么查看容器的启动命令

    • 在宿主机上docker inspect

    • 在容器内部ps -fe,其中1号进程就是启动命令(有可能出现ps命令找不到的情况)

    • 查看这个镜像的Dockerfile,其中的ENTRYPOINT和CMD指定了容器的运行时执行命令

  3. 如何临时退出一个正在交互的容器的终端,而不终止它?

按Ctrl+p,后按Ctrl+q,如果按Ctrl+c会使容器内的应用进程终止,进而会使容器终止。

  1. 使用docker port 命令映射容器的端口时,系统报错Error: No public port ‘80’ published for …,是什么意思?

创建镜像时Dockerfile要指定正确的EXPOSE的端口,容器启动时指定PublishAllport=true

  1. 可以在一个容器中同时运行多个应用进程吗?
    一般不推荐在同一个容器内运行多个应用进程,如果有类似需求,可以通过额外的进程管理机制,比如supervisord来管理所运行的进程

  2. 如何控制容器占用系统资源(CPU,内存)的份额?
    在使用docker create命令创建容器或使用docker run 创建并运行容器的时候,可以使用-c|–cpu-shares[=0]参数来调整同期使用CPU的权重,使用-m|–memory参数来调整容器使用内存的大小。

参考

容器

  1. 命令
  2. Docker 分配宿主机网段 IP
  3. docker容器网络通信原理分析
  4. Service 之间如何通信?- 每天5分钟玩转 Docker 容器技术(101)

容器编排

Swarm基本使用

  1. Play with Docker classroom: Service Discovery under Docker Swarm Mode

  2. 运维之我的docker-swarm集群中删除节点和服务

  3. docker swarm 搭建及跨主机网络互连案例分析

  4. Docker Swarm 入门一篇文章就够了

Swarm运维

  1. Docker 引擎的 Swarm 模式:入门教程

  2. 生产环境中使用Docker Swarm的一些建议

  3. 云计算之路-阿里云上:重启 manager 节点引发 docker swarm 集群宕机

Swarm原理

  1. Consul入门

  2. Service Discovery with Docker and Consul: part 1

  3. Docker Reference Architecture: Universal Control Plane 2.0 Service Discovery and Load Balancing

  4. 浮动IP(FLOAT IP)

持续部署

  1. Docker持续部署图文详解

服务发现

  1. docker swarm获取客户端IP

OpenStack(开源云计算平台)

  1. KVM and Docker LXC Benchmarking with OpenStack

  2. OpenStack

  3. OpenStack-Docker-wiki

客户端

  1. spotify/docker-client
  2. how to copy file from file system into running container
  3. 单元测试 DefaultDockerClientTest.java

基于注册中心目录服务,使服务消费方能动态的查找服务提供方,使地址透明,不再需要写死服务提供方地址,注册中心基于接口名查询服务提供者的 IP 地址,使服务提供方可以平滑增加或减少机器。

角色分类

以功能角度来说服务可以分成以下几种:

  • 服务提供者;
  • 服务消费者;
  • 服务提供者兼消费者。

注册中心分类

可以分成以下几种注册中心:

  • Simple 注册中心 点对点直连
  • Multicast 注册中心 多播
  • Zookeeper 注册中心
  • Redis 注册中心

配置

服务提供者(provider)配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- 应用名称,可显示依赖关系 -->
<dubbo:application name="dubbo-order-server" />

<!-- 注册中心是ZooKeeper,也可以选择Redis做注册中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"
client="zkclient" />

<!-- 通过dubbo协议在注册中心(127.0.0.1表示本机)的20880端口暴露服务 -->
<dubbo:protocol name="dubbo" host="127.0.0.1" port="20880" />

<!-- 提供服务用地的是service标签,将该接口暴露到dubbo中 -->
<dubbo:service interface="com.dubbo.service.OrderService"
ref="orderService" />

<!-- Spring容器加载具体的实现类-->
<bean id="orderService" class="dubbo.service.impl.OrderServiceImpl" />

<dubbo:monitor protocol="registry" />

服务消费者(consumer)配置:

1
2
3
4
5
6
7
8
9
10
11
<!-- 应用名称,可显示依赖关系 -->
<dubbo:application name="dubbo-user-consumer" />

<!-- zookeeper作为注册中心 ,也可以选择Redis做注册中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"
client="zkclient" />

<dubbo:protocol host="127.0.0.1" />

<!-- 调用服务使用reference标签,从注册中心中查找服务 -->
<dubbo:reference id="orderService" interface="com.dubbo.service.OrderService" />

查看服务注册/暴露结果

Dubbo服务注册信息
Dubbo 在 ZooKeeper 中以树形结构维护服务注册信息:

  • 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址;
  • 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址;
  • 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

ZooKeeper 启动的时候会把配置信息加载进内存并持久化到数据库,然后启动定时器脏数据检查定时器 DirtyCheckTask,分别检查消费者和提供者的地址列表缓存、消费者和提供者地址列表的数据库数据,清理不存活的消费者和提供者数据,对于缓存中的存在的消费者和提供者而数据库不存在,提供者重新注册和消费者重新订阅。

Dubbo 提供了一些异常情况下的兜底方案:

  • 当提供者出现断电等异常停机时,注册中心能自动删除提供者信息
  • 当注册中心重启时,能自动恢复注册数据,以及订阅请求
  • 当会话过期时,能自动恢复注册数据,以及订阅请求
  • 当设置 <dubbo:registry check=”false” /> 时,记录失败注册和订阅请求,后台定时重试

在了解 ZooKeeper 基础上,还可以增加一些配置来修改注册细节:
可通过 <dubbo:registry username="admin" password="1234" /> 设置 ZooKeeper 登录信息
可通过 <dubbo:registry group="dubbo" /> 设置 ZooKeeper 的根节点,不设置将使用无根树
支持 * 号通配符 <dubbo:reference group="*" version="*" /> ,可订阅服务的所有分组和所有版本的提供者

在 Provider 启动完毕后,可以登录到 ZooKeeper 上查看注册的结果:

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 11] ls /
[dubbo, zookeeper]
[zk: localhost:2181(CONNECTED) 12] ls /dubbo
[com.alibaba.dubbo.monitor.MonitorService, com.tallate.UserServiceBo]
[zk: localhost:2181(CONNECTED) 13] ls /dubbo/com.tallate.UserServiceBo
[configurators, consumers, providers, routers]
[zk: localhost:2181(CONNECTED) 14] ls /dubbo/com.tallate.UserServiceBo/providers
[dubbo%3A%2F%2F192.168.96.194%3A20880%2Fcom.tallate.UserServiceBo%3Fanyhost%3Dtrue%26application%3DdubboProvider%26dubbo%3D2.0.2%26generic%3Dfalse%26group%3Ddubbo%26interface%3Dcom.tallate.UserServiceBo%26methods%3DsayHello%2CtestPojo%2CsayHello2%26pid%3D28129%26revision%3D1.0.0%26side%3Dprovider%26timeout%3D3000%26timestamp%3D1575202776615%26version%3D1.0.0]

服务自动发现流程

服务自动发现功能完成下面这个流程,我们接下来分点概述:

  1. 服务提供者在启动时,向注册中心注册自己提供的服务。
  2. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  3. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  4. 服务消费者,从提供者地址列表中,基于软负载均衡算法(基于软件的负载均衡,与 F5 相对),选一台提供者进行调用,如果调用失败,再选另一台调用。

注册和注销服务(Provider 执行流程)

服务的注册与注销,是对服务提供方角色而言,大致流程如下所示:
注册和注销服务

  1. 在接口提供者初始化时,每个接口都会创建一个 Invoker 和 Exporter,Exporter 持有 Invoker 实例,通过 Invocation 中的信息就可找到对应的 Exporter 和 Invoker
  2. 同 Consumer 的过程类似,调用 Invoker 前会调用 Invoker-Filter。
  3. 调用 Invoker.invoke() 时,通过反射调用最终的服务实现执行相关逻辑。

ServiceBean 负责了服务的暴露:

  • 继承自 ServiceConfig,export 方法实现了服务暴露的逻辑;
  • 实现了 Spring 中的 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware

启动时,ServiceBean 主要负责以下任务:

  • 生成 DubboExporter 对象并缓存起来
  • 添加过滤器和监听器支持
  • 在 zk 上注册相关信息,暴露服务,方便被感知到
  • 监听端口,等待通信的到来

Dubbo服务导出

  1. 前置工作,主要用于检查参数和组装 URL;
    ServiceBean#onApplicationEvent: 接收 Spring 上下文刷新事件后执行服务导出操作
    -> ServiceBean#export: 导出服务
    -> ProviderConfig.getExport、getDelay 获取配置,如果 export 为 false 则无法提供给其他服务调用、一般只提供给本地调试时使用,如果需要 delay 则将任务交给一个 ScheduledExecutorService 延迟执行,否则调用 doExport 暴露服务
    -> ServiceConfig.doExport 一堆配置检查
  2. 导出服务,包含导出服务到本地(JVM)和导出服务到远程两个过程;
    ServiceConfig.doExportUrls
    导出服务,Dubbo 中所有服务都通过 URL 导出,支持多协议多注册中心导出服务(遍历 ProtocolConfig 集合导出每个服务)
    AbstractInterfaceConfig#loadRegistries
    加载注册中心链接
    ServiceConfig#doExportUrlsFor1Protocol
    组装 URL,将服务注册到注册中心
    JavassistProxyFactory#getInvoker
    获取 Invoker 实例,用于接收请求
    ServiceConfig#exportLocal、DubboProtocol#export
    根据配置信息导出服务到本地或远程,远程默认取Dubbo协议
    DubboProtocol#openServer
    开始监听请求
  3. 向注册中心注册服务,用于服务发现
    Dubbo 服务注册本质是在 zk 指定目录下创建临时节点,路径是{group}/{Interface}/providers/{url}
    RegistryProtocol#register
    -> RegistryFactory#getRegistry
    -> AbstractRegistry#register

因为Dubbo一般使用ZooKeeper作为注册中心,所以完全可以利用ZooKeeper的临时节点自动删除机制来实现服务器下线自动踢出的机制。

服务订阅和取消(Consumer 执行流程)

为了满足应用系统的需求,服务消费方的可能需要从服务注册中心订阅指定的有服务提供方发布的服务,在得到通知可以使用服务时,就可以直接调用服务。反过来,如果不需要某一个服务了,可以取消该服务。
服务订阅和取消

有两种服务引入方式:

  1. 饿汉式:Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,可通过配置 <dubbo:reference> 的 init 属性开启。
  2. 懒汉式:ReferenceBean 对应的服务被注入到其他类中时引用

服务提供的方式有三种:

  1. 引用本地 (JVM) 服务;
  2. 通过直连方式引用远程服务;
  3. 通过注册中心引用远程服务。

不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。

获取客户端Proxy:

  1. 在 Consumer 初始化的时候,会生成一个代理注册到容器中,该代理回调中持有一个 Invoker 实例,消费调用服务接口时它的 invoke() 方法会被调用。
    spring.ReferenceBean#getObject
    ReferenceConfig#createProxy
    创建代理实例,根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用,不是本地引用的情况下默认采用Dubbo协议。
    Protocol#refer
    -> DubboProtocol#getClients 获取客户端实例,实例类型为 ExchangeClient,ExchangeClient 不具备通信能力,它需要依赖更底层的客户端实例
    -> DubboProtocol#getSharedClient 默认获取共享客户端
    -> DubboProtocol#initClient 创建客户端实例,默认为 Netty
    -> Exchangers#connect(URL url, ExchangeHandler handler)
  2. 使用 Cluster 合并 Invoker
    org.apache.dubbo.rpc.cluster.Cluster#join
    如果配置了多个 URL,则使用 Cluster 合并多个 Invoker
  3. 创建动态代理
    -> ProxyFactory#getProxy(Invoker invoker)
    常用的动态代理技术有 javassist、cglib、jdk,其中 dubbo 使用的是 javassist。

    根据早期 Dubbo 作者梁飞(http://javatar.iteye.com/blog/814426)的说法,使用 javassist 是为了性能。

Consumer端服务调用过程

Dubbo组件

调用代理类的方法

请求实际调用的是InvokerInvocationHandler.invoke

Registry & Directory

Registry 将注册信息保存到本地的Directory

启动服务时需要给一个Dubbo接口创建代理,这时需要将注册URL转换为Invoker对象:
org.apache.dubbo.registry.integration.RegistryProtocol#refer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}

引用一个服务时,会注册一个zkListener,监听注册服务的命名空间的变更情况。
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
那么服务是怎么注册的呢?其实就是上边Provider注册服务的过程。
监听到注册中心的变更后,更新本地的Invoker列表,同时删除不可用的。
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference

...

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

...

try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

Invoker使用Directory

为了服务高可用同一个服务一般会有多个应用服务器提供,要先挑选一个提供者提供服务。在服务接口消费者初始化时,接口方法和提供者 Invoker 对应关系保存在 Directory。 中,通过调用的方法名称(或方法名称+第一个参数)获取该方法对应的提供者 Invoker 列表,如注册中心设置了路由规则,对这些 Invoker 根据路由规则进行过滤。
启动时订阅某个服务:
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 订阅providers、configurators、routers这几个namespace
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

// 使用Cluster组合Invoker
Invoker invoker = cluster.join(directory);
return invoker;
}

添加监听器:
org.apache.dubbo.registry.integration.RegistryDirectory#subscribe

1
2
3
4
5
6
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}

Consumer端监听服务变更事件,刷新Invoker列表:
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker

Registry的几种实现

  • ZooKeeperRegistry
  • RedisRegistry
    注册信息的存储,是在启动时调用的:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Override
    public void doRegister(URL url) {
    // key = dubbo/com.package.to.InterfaceName/providers
    String key = toCategoryPath(url);
    // url的全名
    String value = url.toFullString();
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    // 使用hash结构,可以providers一个key下面存多个url
    jedis.hset(key, value, expire);
    jedis.publish(key, Constants.REGISTER);
    success = true;
    if (! replicate) {
    break; //  如果服务器端已同步数据,只需写入单台机器
    }
    } finally {
    jedisPool.returnResource(jedis);
    }
    } catch (Throwable t) {
    exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
    }
    if (exception != null) {
    if (success) {
    logger.warn(exception.getMessage(), exception);
    } else {
    throw exception;
    }
    }
    }
    注册信息的主动删除,进程关闭时:
    1

Directory的几种实现

  • RegistryDirectory
    保存注册中心的服务注册信息,包括routers、configurators、provider。
  • StaticDirectory
    Invoker列表是固定的。

Cluster

封装了服务降级和容错机制,比如,如果调用失败则执行其他(FailoverClusterInvoker)、仍然调用失败则降级执行 mock(MockClusterInvoker)。
调用的第一层是MockClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;

String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
// 没有设置mock属性或设置为false,则直接调就完了
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
}
// 配成force了,直接调mock方法
else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
}
// fail-mock的方式
else {
try {
result = this.invoker.invoke(invocation);

//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}

} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}

if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}

实际invoke调用的是父类AbstractClusterInvoker的invoke方法,这个方法的主要功能是提供负载均衡:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

// 找到所有可调用的服务器
List<Invoker<T>> invokers = list(invocation);
// 发送时要经过负载均衡
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}

上面的doInvoke是一个模板方法,由子类实现,默认子类是FailoverClusterInvoker,可以看到,它先通过负载均衡策略得到一个Invoker,再调用该Invoker,Invoker的默认实现是DubboInvoker,表示使用的是Dubbo协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Result doInvoke(List<Invoker<T>> invokers,
final List<Invoker<T>> invoked,
Holder<RpcException> lastException,
final Set<String> providers,
final Invocation invocation,
final LoadBalance loadbalance,
final int totalRetries,
int retries,
Holder<Invoker<T>> lastInvoked) throws RpcException {
if (retries < totalRetries) {
checkWheatherDestoried();
invokers = list(invocation);
checkInvokers(invokers, invocation);
}

// 负载均衡
final Invoker<T> invoker = select(loadbalance, invocation, invokers, invoked);
invoked.add(invoker);
lastInvoked.value = invoker;
RpcContext.getContext().setInvokers((List) invoked);

try {
return invoker.invoke(invocation);
} catch (RpcException e) {
//业务异常不重试
if (e.isBiz()) {
throw e;
}
lastException.value = e;
} catch (Throwable e) {
lastException.value = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}

if (--retries == 0) {
throw populateException(invokers, lastException.value, providers, invocation, totalRetries);
}

return doInvoke(invokers, invoked, lastException, providers, invocation, loadbalance, totalRetries, retries, lastInvoked);
}

Cluster的实现

  • MockClusterInvoker
    调用失败降级到mock接口;
  • BroadcastClusterInvoker
    每个Invoker都调一次,忽略了LoadBalance;
  • AvailableClusterInvoker
    把处于可用状态的Invoker都调一遍。
  • FailoverClusterInvoker
    一个Invoker失败就换个Invoker重试几次。
  • FailbackClusterInvoker
    如果调用失败就放到一个线程池中延迟5秒再发,一般用于发消息。
  • FailfastClusterInvoker
    失败立刻报错
  • FailsafeClusterInvoker
    失败就忽略,一般是用于记日志这种失败了影响也不大的场景。
  • ForkingClusterInvoker
    一次性选n个Invoker,并行调用,只要有一个调用成功就返回,线程间通过LinkedBlockingQueue通信。

LoadBalance

Cluster 层包含多个 Invoker,LoadBalance 负责从中选出一个来调用,有多种 LoadBalance 策略,比如随机选一个(RandomLoadBalance)、轮询(RoundRobinLoadBalance)、一致性hash(ConsistentHashLoadBalance)。
实例化LoadBalance:com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
使用LoadBalance选择一个Invoker:com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#select

LoadBalance的多种实现

  • RandomLoadBalance
    计算权重,然后根据每个Invoker的权重调一个。
  • LeastActiveLoadBalance
    找最近最不活跃的Invoker调用,如果这样的Invoker有多个,则按权重来随机选一个。
  • RoundRobinLoadBalance
    轮询
  • ConsistentHashLoadBalance
    一致性哈希,启动时会将Invoker排列在一个圆环上:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
    this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    this.identityHashCode = identityHashCode;
    URL url = invokers.get(0).getUrl();

    String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
    argumentIndex = new int[index.length];
    for (int i = 0; i < index.length; i++) {
    argumentIndex[i] = Integer.parseInt(index[i]);
    }

    int replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
    for (Invoker<T> invoker : invokers) {
    String address = invoker.getUrl().getAddress();
    // 多复制几个,更均匀,避免所有请求都被hash到同一个Invoker
    for (int i = 0; i < replicaNumber / 4; i++) {
    byte[] digest = md5(address + i);
    for (int h = 0; h < 4; h++) {
    long m = hash(digest, h);
    // 放入圆环上
    virtualInvokers.put(m, invoker);
    }
    }
    }
    }
    将Invoker保存到virtualInvokers上,但是virtualInvokers本身是一个HashMap,如果新来的请求不能精确hash到其中的某个Invoker怎么办?是通过tailMap找到的下一个Invoker:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Invoker<T> selectForKey(long hash) {
    Invoker<T> invoker;
    Long key = hash;

    if (!virtualInvokers.containsKey(key)) {
    SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
    if (tailMap.isEmpty()) {
    key = virtualInvokers.firstKey();
    } else {
    key = tailMap.firstKey();
    }
    }
    invoker = virtualInvokers.get(key);
    return invoker;
    }

Filter & Invoker 层

不过,在实际网络调用之前,Dubbo还提供Filter功能,Cluster会先激活Filter链然后最终调到DubboInvoker.invoke(RpcInvocation)

  1. ConsumerContextFilter可以将请求对象Invocation添加到上下文RpcContext中,其实就是存储到一个ThreadLocal变量中。
  2. FutureFilter在调用完毕后唤醒调用者线程。
  3. 或许还会有一些自定义的Filter,比如增加线程的TraceId、打印一些调用日志之类的,Filter结束后才最终调用到DubboInvoker

DubboInvoker封装了同步和异步调用,Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。
DubboInvoker是通过Netty发送消息的,消息本身如何发送的就不多说了。

Exchange

封装了网络客户端的发送逻辑,包括:

  • HeaderExchangeChannel
    对 Request 的序列化
  • ReferenceCountExchangeClient
    无引用时自动关闭客户端
  • HeaderExchangeClient
    心跳检测

数据编码 & 发送

DubboCodec
NettyChannel#send

Provider端接受调用的过程

  1. 接收请求
    NettyClient
    请求被接收后,通过 Netty 调用链向下传递执行
    NettyHandler#messageReceived
    NettyChannel
  2. 解码
    ExchangeCodec
  3. 线程派发
    Dispatcher
    IO 线程接收请求后分发给事件处理线程执行,具体的派发逻辑在ChannelHandler中实现,比如AllChannelHandler
  4. 请求分发
    ChannelEventRunnable
    根据请求类型将请求分发给不同的ChannelHandler处理。

Provider 端响应

Consumer 端接收响应

  1. 发送完请求后阻塞
    HeaderExchangeHandler
    用户线程在发送完请求后,会调用 DefaultFutureget 方法等待响应对象的到来,这时每个DefaultFuture都会关联一个调用编号,用于在接收到响应时能对应上请求的DefaultFuture
    当响应对象到来后,IO 线程根据调用编号可以找到DefaultFuture,之后会将响应对象保存到DefaultFuture,并唤醒用户线程。

预备知识

进程和clone系统调用

进程(Process)是运行中的程序实例,又可以称为任务(Task),进程在Linux内核中使用一个PCB来表示,主要包含运行状态、信号、进程号、父进程号、运行时间累计值、正在使用的文件(文件描述符表)、本任务的局部描述符及任务状态段信息。
不同于fork,clone系统调用允许子进程共享部分父进程的上下文,如内存空间、文件描述符表、信号等。

1
2
3
4
5
6
7
/* Prototype for the glibc wrapper function */
#define _GNU_SOURCE
#include <sched.h>
int clone(int (*fn)(void *), void *child_stack,
int flags, void *arg, ...
/* pid_t *ptid, void *newtls, pid_t *ctid */ );
/* For the prototype of the raw system call, see NOTES */

clone常用于实现多线程,因为子进程和父进程可以共享内存。
不同于fork创建的子进程会从调用的位置开始执行,clone创建的子进程会执行实参传入的fn(arg),并将实参中的arg传入。
fn(arg)返回后子进程页会终止,返回值即为子进程的exit code,当然子进程在遇到显式的exit调用或终止信号也会立刻退出。
子进程与父进程共享内存,它们不能(也不应该)使用同一个栈,因此必须使用child_stack参数指定子进程使用的栈所在的内存空间。栈是从上向下生长的,因此最好指定最顶层的一个地址。
flags的低位包含子进程退出了发送给父进程的信号,If this signal is specified as anything other than SIGCHLD, then the parent process must specify the __WALL or __WCLONE options when waiting for the child with wait(2). If no signal is specified, then the parent process is not signaled when the child terminates.
flags 还可以指定子进程和父进程间可以共享的内容,具体内容见man clone

虚拟网络设备和veth pair

Linux container 中用到一个叫做veth的东西,这是一种新的设备,专门为 container 所建。veth 从名字上来看是 Virtual ETHernet 的缩写,它的作用很简单,就是要把从一个 network namespace 发出的数据包转发到另一个 namespace。veth 设备是成对的,一个是 container 之中,另一个在 container 之外,即在真实机器上能看到的。

veth设备实现原理

VETH设备总是成对出现,送到一端请求发送的数据总是从另一端以请求接受的形式出现。创建并配置正确后,向其一端输入数据,VETH会改变数据的方向并将其送入内核网络子系统,完成数据的注入,而在另一端则能读到此数据。(Namespace,其中往veth设备上任意一端上RX到的数据,都会在另一端上以TX的方式发送出去)veth工作在L2数据链路层,veth-pair设备在转发数据包过程中并不串改数据包内容。
这里写图片描述
显然,仅有veth-pair设备,容器是无法访问网络的。因为容器发出的数据包,实质上直接进入了veth1设备的协议栈里。如果容器需要访问网络,需要使用bridge等技术,将veth1接收到的数据包通过某种方式转发出去。

VETH: Typically used when you are trying to connect two entities which would want to “get hold of” (for lack of better phrase) an interface to forward/receive frames. These entities could be containers/bridges/ovs-switch etc. Say you want to connect a docker/lxc container to OVS. You can create a veth pair and push the first interface to the docker/lxc (say, as a phys interface) and push the other interface to OVS. You cannot do this with TAP.

veth设备特点

  • veth和其它的网络设备都一样,一端连接的是内核协议栈
  • veth设备是成对出现的,另一端两个设备彼此相连
  • 一个设备收到协议栈的数据发送请求后,会将数据发送到另一个设备上去

常用命令

1
2
# 创建veth
ip link add name veth0 type veth0 peer name veth1

其他细节见Linux篇《虚拟化-网络设备》

资源隔离、namespace和cgroup

namespace的主要作用是对两个系统内的标识符的命名进行隔离,namespace有以下几种。
Linux-namespace种类
这几个flag 可以在调用clone进行进程创建的时候作为参数传入,从而实现namespace的隔离,从这个角度来说,container主要是进程角度的隔离,而不是传统的虚拟机(一些虚拟机实现同样是基于对操作系统层的虚拟化,所以应该和container是类似的,比如普通vmware),因为container底层用的是同一个内核来调度。

cgroup 是linux 内核的另外一个控制和隔离进程的特性,他分为cpu ,memory,net,io等几个子系统,从而实现对进程cpu,内存,磁盘,网络等资源使用的控制。

虚拟文件系统(VFS)和chroot

将根目录设置成另外一个目录

自制容器

docker 只是一个工具,container 技术的核心还是linux 内核的cgroup + chroot + namespace 技术。

制作image

制作自己容器,需要一个image ,可以从网上下一个,也可以自己制作,制作很简单,新装一个操作系统,安装一些需要用到的软件包,然后用tar 制作 / 目录下的压缩包,去掉一些虚拟文件系统的文件,本文用的是自己制作的centos 6.5 的image。

容器实现过程

容器实现过程可以归纳为

  1. 用clone系统调用创建子进程,传入namespace的那几个参数,实现namespace的隔离;
  2. 父进程中创建veth pair ,一个veth在自己的namespace,将另一个设置为子进程的namespace,实现container和宿主机的网络通信;
  3. 父进程创建cgroup memory和cpuset子系统,将子进程attach到cgroup子系统上,实现container 的资源限制和隔离;
  4. 子进程在自己的namespace里,设置主机名,mount proc虚拟文件系统,设置veth ip,chroot到centos 6镜像的位置, 最终将进程镜像替换成/bin/bash;
  5. 父进程调用waitpid 等待子进程退出。

容器实现

见下面的《源码》部分

实验

在解压好镜像后,该镜像根目录下可能还没有必须的命令文件及其依赖的动态链接库,主要是由于用chroot改变根目录后,原来路径下的一些文件在当前镜像内都找不到了,一般安装完操作系统后这些文件应该都安装好了,如果需要可以通过手动移动来解决

1
2
ldd /bin/bash
cp --parents /lib/x86_64-linux-gnu/libtinfo.so.5 ./

编译执行容器代码。

1
2
gcc lxc_demo.c -o lxc_demo -lcgroup
sudo ./lxc_demo

接下来分别在宿主机和容器内执行命令,可以得出一些结论:

  1. 容器和宿主机镜像可能不同,但是内核相同;
    1
    2
    3
    4
    5
    6
    7
    # (宿主机内)
    hgc@hgc-X555LD:~$ uname -r
    4.13.0-45-generic

    # (容器内)
    bash-4.4# uname -r
    4.13.0-45-generic
  2. 根目录下的文件不同;
    1
    ls /
  3. hostname,宿主机内为用户设定的主机名,容器内为mydocker,说明UTS namespace隔离成功;
    1
    hostname
  4. 网络方面有回环网络卡lo和veth1,veth1 169.254.1.2 能ping 通veth0的地址(宿主机上的veth)169.254.2.1,如果在外面加iptables 做nat 转换的话,container里面还可以和外面通信。我们看不到外面宿主机的eth0 和 eth1,说明container 的network namespace 隔离成功。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    # (宿主机内)
    $ ip addr
    ......
    19: veth0@if18: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether 46:c0:eb:a8:34:8d brd ff:ff:ff:ff:ff:ff link-netnsid 0
    inet 169.254.1.1/30 scope global veth0
    valid_lft forever preferred_lft forever
    inet6 fe80::44c0:ebff:fea8:348d/64 scope link
    valid_lft forever preferred_lft forever

    # (容器内)网络方面有回环网络卡lo和veth1
    bash-4.4# ip addr
    1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
    valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
    valid_lft forever preferred_lft forever
    18: veth1@if19: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether ce:04:78:b6:5a:2d brd ff:ff:ff:ff:ff:ff link-netnsid 0
    inet 169.254.1.2/30 scope global veth1
    valid_lft forever preferred_lft forever
    inet6 fe80::cc04:78ff:feb6:5a2d/64 scope link
    valid_lft forever preferred_lft forever
    # 尝试ping宿主机上的veth0设备
    bash-4.4# ping 169.254.1.1
    PING 169.254.1.1 (169.254.1.1) 56(84) bytes of data.
    64 bytes from 169.254.1.1: icmp_seq=1 ttl=64 time=0.101 ms
    ......
  5. 目前container 里面只有/bin/bash , 且进程号为 1,不是我们常见的init进程,或者systemd 。因为/bin/bash 为该namespace 下的第一个进程,说明我们的pid namespace隔离成功。
    1
    2
    3
    4
    5
    # 查看进程
    bash-4.4# ps -aux
    USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
    0 1 0.0 0.0 9924 3264 ? S 01:02 0:00 /bin/bash
    0 8 0.0 0.0 30524 2820 ? R+ 01:02 0:00 ps -aux
  6. mount 显示挂载的文件系统,和宿主机的不一样,说明mount namespace隔离成功。
    1
    2
    3
    4
    # 查看挂载情况(这里有点问题,原文里还有rootfs、sysfs等)
    bash-4.4# mount
    proc on /proc type proc (rw,relatime)
    ......
  7. 就cgroup的隔离情况来说,在cgroup文件系统内,memory的限制是我们设置的512M,cpu使用的是0-1号。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    hgc@hgc-X555LD:~/tools/virtualmachines/ubuntu-16.04$ cd /sys/fs/cgroup/memory/mydocker_1530580167/
    hgc@hgc-X555LD:/sys/fs/cgroup/memory/mydocker_1530580167$ cat memory.limit_in_bytes
    536870912
    hgc@hgc-X555LD:/sys/fs/cgroup/memory/mydocker_1530580167$ cd /sys/fs/cgroup/cpuset/mydocker_1530580167/
    hgc@hgc-X555LD:/sys/fs/cgroup/cpuset/mydocker_1530580167$ cat cpuset.cpus
    0-1
    hgc@hgc-X555LD:/sys/fs/cgroup/cpuset/mydocker_1530580167$ cat cpuset.mems
    0
    hgc@hgc-X555LD:/sys/fs/cgroup/cpuset/mydocker_1530580167$ top
    从下面执行情况可以看出,只有0号和1号cpu idle为0 ,其他的都接近100%,说明cgroup隔离效果是很好的。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    # (宿主机内)执行top命令并点击1查看CPU负载情况
    hgc@hgc-X555LD:/sys/fs/cgroup/cpuset/mydocker_1530580167$ top
    top - 10:02:22 up 13:19, 1 user, load average: 0.74, 0.68, 0.72
    Tasks: 348 total, 1 running, 346 sleeping, 0 stopped, 1 zombi
    %Cpu0 : 5.3 us, 2.3 sy, 0.0 ni, 91.7 id, 0.0 wa, 0.0 hi, 0.7
    %Cpu1 : 8.3 us, 12.6 sy, 0.0 ni, 78.8 id, 0.0 wa, 0.0 hi, 0.3
    %Cpu2 : 4.0 us, 2.0 sy, 0.0 ni, 93.9 id, 0.0 wa, 0.0 hi, 0.0
    %Cpu3 : 3.0 us, 2.4 sy, 0.0 ni, 94.6 id, 0.0 wa, 0.0 hi, 0.0
    %Cpu4 : 7.1 us, 5.7 sy, 0.0 ni, 87.2 id, 0.0 wa, 0.0 hi, 0.0
    %Cpu5 : 4.4 us, 1.4 sy, 0.0 ni, 93.9 id, 0.3 wa, 0.0 hi, 0.0
    %Cpu6 : 3.3 us, 2.7 sy, 0.0 ni, 93.7 id, 0.0 wa, 0.0 hi, 0.3
    %Cpu7 : 3.7 us, 1.3 sy, 0.0 ni, 94.6 id, 0.0 wa, 0.0 hi, 0.3
    ......

    # (容器内)执行多个死循环任务,强行使CPU忙碌
    bash-4.4# while true; do echo > /dev/null; done &
    [1] 10
    bash-4.4# while true; do echo > /dev/null; done &
    [2] 11
    bash-4.4# while true; do echo > /dev/null; done &
    [3] 12
    bash-4.4# while true; do echo > /dev/null; done &
    [4] 13
    bash-4.4# while true; do echo > /dev/null; done &
    [5] 14
    bash-4.4# while true; do echo > /dev/null; done &
    [6] 15
    bash-4.4# while true; do echo > /dev/null; done &
    [7] 16

    # (宿主机内)同样是执行top命令并点击1,可以看到头两个CPU的负载情况有了明显变化
    hgc@hgc-X555LD:/sys/fs/cgroup/cpuset/mydocker_1530580167$ top
    top - 10:05:05 up 13:22, 1 user, load average: 4.25, 1.68, 1.06
    Tasks: 355 total, 1 running, 353 sleeping, 0 stopped, 1 zombi
    %Cpu0 : 9.9 us, 34.8 sy, 0.0 ni, 53.2 id, 2.0 wa, 0.0 hi, 0.0
    %Cpu1 : 14.4 us, 43.6 sy, 0.0 ni, 40.2 id, 1.7 wa, 0.0 hi, 0.0
    %Cpu2 : 2.2 us, 0.6 sy, 0.0 ni, 92.0 id, 0.0 wa, 0.0 hi, 5.1
    %Cpu3 : 2.6 us, 1.3 sy, 0.0 ni, 95.0 id, 0.0 wa, 0.0 hi, 1.0
    %Cpu4 : 6.4 us, 10.0 sy, 0.0 ni, 83.6 id, 0.0 wa, 0.0 hi, 0.0
    %Cpu5 : 2.3 us, 1.2 sy, 0.0 ni, 95.3 id, 0.0 wa, 0.0 hi, 1.2
    %Cpu6 : 1.7 us, 0.7 sy, 0.0 ni, 97.6 id, 0.0 wa, 0.0 hi, 0.0
    %Cpu7 : 2.0 us, 2.7 sy, 0.0 ni, 95.3 id, 0.0 wa, 0.0 hi, 0.0
    ......

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#define _GNU_SOURCE

#include <sys/types.h>
#include <sys/wait.h>
#include <stdio.h>
#include <sched.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/mount.h>
// 必须先安装:sudo apt-get install libcgroup-dev
#include <libcgroup.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <fcntl.h>

#define STACK_SIZE (1024 * 1024)
#define MEMORY_LIMIT (512*1024*1024)

//const char *rootfs = "/data1/centos6/rootfs/"; //centos6 镜像位置
//const char *rootfs = "/home/hgc/tools/virtualmachines/ubuntu-16.04/"; // 镜像位置
//const char *hostname = "mydocker"; //container 主机名
static char child_stack[STACK_SIZE];
//char *const child_args[] = {
// "/bin/bash",
// NULL
//};
int pipe_fd[2]; //父子进程同步

int child_main(void *args) {
char c;
// TODO:子进程里无法使用全局变量rootfs和hostname
// 镜像位置
const char *rootfs = "/home/hgc/tools/virtualmachines/ubuntu-16.04/";
// container 主机名
const char *hostname = "mydocker";
// 在子进程(容器)内执行的任务
// TODO:因为chroot改变了根目录的位置,所以在保证目标命令存在容器内的基础上(这里是/bin/bash),
// TODO:必须保证该命令依赖的资源同样存在于容器内(动态链接库,使用ldd查看)
char *const child_args[] = {
"/bin/bash",
0
};

printf("In child process(container)\n");
chroot(rootfs); //用chroot 切换根目录
if (errno != 0) {
perror("chroot()");
exit(1);
}
// TODO:这里不能使用sizeof
//clone 调用中的 CLONE_NEWUTS起隔离主机名和域名的作用
sethostname(hostname, strlen(hostname));
if (errno != 0) {
perror("sethostname()!");
exit(1);
}
//挂载proc子系统,CLONE_NEWNS 起隔离文件系统作用
// 需要在rootfs目录下创建proc目录
mount("proc", "/proc", "proc", 0, NULL);
if (errno != 0) {
perror("Mount(proc)");
exit(1);
}
//切换的根目录
chdir("/");
close(pipe_fd[1]);
read(pipe_fd[0], &c, 1);
//设置veth1 网络
system("ip link set lo up");
system("ip link set veth1 up");
system("ip addr add 169.254.1.2/30 dev veth1");
//将子进程的镜像替换成bash
printf("[%s]\n", child_args[0]);
if (execv(child_args[0], child_args) == -1) {
perror("execv(path, argv)");
}
return 1;
}

struct cgroup *cgroup_control(pid_t pid) {
struct cgroup *cgroup = NULL;
int ret;
ret = cgroup_init();
char *cgname = malloc(19 * sizeof(char));
if (ret) {
printf("error occurs while init cgroup.\n");
return NULL;
}
time_t now_time = time(NULL);
sprintf(cgname, "mydocker_%d", (int) now_time);
printf("%s\n", cgname);
cgroup = cgroup_new_cgroup(cgname);
if (!cgroup) {
ret = ECGFAIL;
printf("Error new cgroup%s\n", cgroup_strerror(ret));
goto out;
}
//添加cgroup memory 和 cpuset子系统
struct cgroup_controller *cgc = cgroup_add_controller(cgroup, "memory");
struct cgroup_controller *cgc_cpuset = cgroup_add_controller(cgroup, "cpuset");
if (!cgc || !cgc_cpuset) {
ret = ECGINVAL;
printf("Error add controller %s\n", cgroup_strerror(ret));
goto out;
}
// 内存限制 512M
if (cgroup_add_value_uint64(cgc, "memory.limit_in_bytes", MEMORY_LIMIT)) {
printf("Error limit memory.\n");
goto out;
}
//限制只能使用0和1号cpu
if (cgroup_add_value_string(cgc_cpuset, "cpuset.cpus", "0-1")) {
printf("Error limit cpuset cpus.\n");
goto out;
}
//限制只能使用0和1块内存
// TODO:使用0-1作为参数会报错“Invalid argument”
if (cgroup_add_value_string(cgc_cpuset, "cpuset.mems", "0")) {
printf("Error limit cpuset mems.\n");
goto out;
}
ret = cgroup_create_cgroup(cgroup, 0);
if (ret) {
printf("Error create cgroup%s\n", cgroup_strerror(ret));
goto out;
}
ret = cgroup_attach_task_pid(cgroup, pid);
if (ret) {
printf("Error attach_task_pid %s\n", cgroup_strerror(ret));
goto out;
}
return cgroup;
out:
if (cgroup) {
cgroup_delete_cgroup(cgroup, 0);
cgroup_free(&cgroup);
}
return NULL;
}

int main() {
char *cmd;
printf("main process: \n");
pipe(pipe_fd);
if (errno != 0) {
perror("pipe()");
exit(1);
}
// 调用clone创建子进程,传入namespace的几个flag参数,实现namespace的隔离
// 子进程执行child_main函数,其堆栈空间使用child_stack参数指定
// clone与线程的实现息息相关:http://www.xuebuyuan.com/1422353.html
int child_pid = clone(child_main, child_stack + STACK_SIZE, \
CLONE_NEWNET | CLONE_NEWNS | CLONE_NEWPID | CLONE_NEWIPC | CLONE_NEWUTS | SIGCHLD, NULL);
struct cgroup *cg = cgroup_control(child_pid);
// 添加veth pair,设置veth1 在子进程的的namespace,veth0 在父进程的namespace,为了实现container和宿主机之间的网络通信
// linl3 实现起来太繁琐,借用命令行工具ip 实现
system("ip link add veth0 type veth peer name veth1");
asprintf(&cmd, "ip link set veth1 netns %d", child_pid); // asprintf根据字符串长度申请足够的内存空间,但在之后必须手动释放
system(cmd);
system("ip link set veth0 up");
system("ip addr add 169.254.1.1/30 dev veth0");
free(cmd);
//等执行以上命令,通知子进程,子进程设置自己的网络
close(pipe_fd[1]);
waitpid(child_pid, NULL, 0);
if (cg) {
cgroup_delete_cgroup(cg, 0); //删除cgroup 子系统
}
printf("child process exited.\n");
return 0;
}

参考

LXC

  1. Linux Containers
  2. 理解 chroot

虚拟网络设备

  1. Linux-虚拟网络设备-veth pair

clone

  1. linux的Clone()函数详解

Linux namespace

  1. Linux Namespaces机制
  2. 介绍 Linux 的命名空间

接口扩展策略注解 @SPI

Dubbo中的很多扩展接口,如 Protocol、Transporter、Filter 等,都是通过 JDK 的 SPI 机制实现的,也就是说这些功能都可被用户自定义的扩展所替换,接口扩展点由注解@SPI定义。
JDK 中 SPI(Service Provider Interface)的设计与策略模式如出一辙,开发者可以替换掉 Dubbo 原扩展接口的默认实现,完成自定义需求,即可以自定义实现策略。
Dubbo 在 JDK 现有 SPI 实现的基础上做了如下改进:

  1. JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
  2. 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK 标准的 ScriptEngine,通过 getName();获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因。
  3. 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

那么 Dubbo 的 SPI 机制是怎么实现的呢?以协议扩展为例,Dubbo 中协议被抽象为 Protocol 接口。

读取扩展点

ServiceConfig#protocol

1
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()

Dubbo 使用 ExtensionLoader 实现扩展点加载。

  • ExtensionLoader#getExtensionLoader()
    获取 ExtensionLoader 实现,保证每种扩展点一个单例。
  • ExtensionLoader#getAdaptiveExtension()
    根据不同的 SPI 扩展点,即不同的 interface,生成不同的 Adaptive 实例的代码。
    -> getAdaptiveExtensionClass()
    -> getExtensionClasses()
    -> loadExtensionClasses()
    加载所有的扩展点实现,直到扩展点方法执行时才决定调用是一个扩展点实现,即从众多的实现策略中决定具体使用哪一个策略。
    ExtensionLoader 会依次从META-INF/dubbo/internal(Dubbo 内部实现)、META-INF/dubbo/(开发者自定义策略)、META-INF/services/这几个目录下读取扩展点实现,目录下的同名文件配置了对应扩展点的实现策略,调用 loadFile 来加载对应的扩展策略。
    -> loadFile(Map<String, Class<?>> extensionClasses, String dir)

生成 Adaptive 实例

  • ExtensionLoader#loadFile
    -> String fileName = dir + type.getName()
    拼接文件路径
    -> ClassLoader classLoader = findClassLoader()
    拿到 ExtensionLoader 的类加载器。
    -> Class<?> clazz = Class.forName(line, true, classLoader);
    文件每行是一个实现类的全路径名,通过反射加载并拿到具体类型。
    -> extensionClasses.put(n, clazz)
    添加到 map 里返回。
  • ExtensionLoader#cachedClasses
    -> cachedClasses.set(classes)
    添加到缓存。
  • ExtensionLoader#createAdaptiveExtensionClass
    -> ExtensionLoader#createAdaptiveExtensionClassCode
    生成 Adaptive 类。
    -> compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension()
    拿到编译接口扩展点的一个具体实现,dubbo 内部支持 jdk 和 javassist,默认是 javassist。
    -> compiler.compile(code, classLoader)
    编译代码,生成 Adaptive 实例类。

上面提到 Compiler 也是一个扩展点,同样也依赖这个流程来实例化,在运行时生成 Adaptive 实例的时候,需要生成 Compiler 接口的 Adaptive 实例,即运行生成 Adaptive 实例的时候需要先有一个 Compiler 接口的 Adaptive 实例,那这样岂不是陷入了死循环,这里就要提到显示指定 Adaptive 实例的情况。@Adaptive注解支持类级别和方法级别:

1
2
1、类级别:只能拥有一个,注解打在接口实现类上,显示的注册一个Adaptive实例,在编译期就存在,如`AdaptiveCompiler`,解决了上面的死循环问题,由`AdaptiveCompiler`依据dubbo配置决定使用哪个编译类;
2、方法级别:在运行期动态的生成Adaptive实例。

通过 URL 动态选择协议

ExtensionLoader#createAdaptiveExtensionClassCode
生成的 Protocol 的 Adaptive 实例类,依据 URL 中 protocol key-value 的值,选择对应的 Protocol 策略来暴露和引用服务。
扩展点方法调用会有 URL 参数(或是参数有 URL 成员),这样依赖的扩展点可以从 URL 拿到配置信息,所有的扩展点自己定好配置的 Key 后,配置信息从 URL 上从最外层传入,URL 在配置传递上即是一条总线。
以 dubbo+zookeeper 为例,暴露和引用远程服务都是注册在 zookeeper 上的,服务注册在 zookeeper 上本质其实是一个 URL,远程服务调用的过程中依据 URL 的 key-value 来动态决定执行 Protocol、Filter 等接口扩展点的执行策略。
下面是 Provider 端暴露 HelloService 服务时在 zookeeper 上注册的 URL,在 zookeeper 上的路径为/dubbo/com.dubbo.test.service.HelloService/providers,URL 表示了采用 dubbo 协议,接口为 com.dubbo.test.service.HelloService,方法为 say,要执行的 Filter 为 whiteFilter 等。

1
2
[zk: localhost:2181(CONNECTED) 1] Is /dubbo/com.dubbo.test.service.HelloService/providers
[dubbo://127.0.0.1:2O881/com.dubbo.test.service.HelloService?anyhost=true&application=dubbo-test-service&dubbo=2.4.10&group=test-prod&interface=com.dubbo.test.service.HelloService&methods=say&pid=21242&revision=l.0&service.filter=whiteFilter&side=providerxtamp=1495436105078&version=l.0]

缓存

  • volatile Class<~> cachedAdaptiveClass
    这个是缓存 AdaptiveClass,如果一个扩展类的类上面带有 @Adaptive 注解,那么这个类就会被缓存在这个地方,每一种类型的扩展类只有一个 AdaptiveClass,如果发现有多个,则会报错。另外,当通过 getAdaptiveExtensionClass 来获取自适应扩展类时,如果当前还没有 AdaptiveClass,则会自动创建一个(动态生成 Java 代码,再编译,典型的比如 Protocol$Adaptive 就是这么生成的)
  • Set<~> cachedWrapperClasses
    这个是缓存包装类的,Dubbo 判断一个扩展类是否是包装类比较简单,通过构造函数来判断,如果这个扩展类有一个构造函数,其中参数是当前扩展类的类型,那么就是包装类,举个例子,ProtocolFilterWrapper 就是 protocol 扩展类的包装类,因为有这个构造函数:public ProtocolFilterWrapper(Protocol protocol)
  • Map<~> cachedActivates
    这个是缓存激活的扩展类,当然,@Activate 注解还可以规定激活的条件和时机
  • Holder<~> cachedClasses
    这个是缓存 Adaptive 和 Wrapper 扩展类之外的普通扩展类

扩展类被加载后会根据一定的规则放入以上 4 个缓存中,比如带有 @Adaptive 注解的会被放入 cachedAdaptiveClass。

0%