Tallate

该吃吃该喝喝 啥事别往心里搁

Spring 如何加载注解

很多注解本身只是提供了一个标识,要实现注解所表示的功能,必然还会有一个扫描器扫描这个注解,然后将必须的Bean注入到Spring容器内,而且很多时候会为被注解的对象生成一个动态代理,以实现日志记录、接口幂等、限流等功能。
要自己实现一个注解,关键是如何扫描及如何生成代理并注入到Spring容器这两个步骤,具体的实现可以参考MapperScannerConfigurer,大体逻辑是:

  1. 在Spring容器加载完毕后,再对指定包下的类进行一次扫描;

Spring 三级缓存

Spring 中产生循环依赖的三种情况

  1. 构造器注入循环依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Service
    public class A {
    public A(B b) {
    }
    }
    @Service
    public class B {
    public B(A a) {
    }
    }
    构造器注入构成的循环依赖,此种循环依赖方式是无法解决的,只能抛出 BeanCurrentlyInCreationException 异常表示循环依赖。
    不能解决的原因是:Spring 解决循环依赖的原理是实例化 Bean 后先把引用存到一个 Map 中,之后初始化成员变量时,可以直接从这个 Map 中取。但是构造器注入相当于实例化和初始化是同时进行的,因此无法解决。
  2. singleton 模式 field 属性注入循环依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Service
    public class A {
    @Autowired
    private B b;
    }

    @Service
    public class B {
    @Autowired
    private A a;
    }
  3. prototype 模式 field 属性注入循环依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    @Service
    public class A {
    @Autowired
    private B b;
    }

    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    @Service
    public class B {
    @Autowired
    private A a;
    }

Spring 创建 Bean 的流程

  1. createBeanInstance:实例化,其实也就是调用对象的构造方法实例化对象
  2. populateBean:填充属性,这一步主要是对 bean 的依赖属性进行注入(@Autowired)
  3. initializeBean:回到一些形如 initMethod、InitializingBean 等方法

其中,循环依赖可能发生在第一步和第二步,其中第一步是因为构造方法中可能会需要传入其他 Bean。

Spring 三级缓存如何解决循环依赖

缓存生效时间

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DefaultSingletonBeanRegistry extends SimpleAliasRegistry implements SingletonBeanRegistry {

// 用于存放完全初始化好的 bean,从该缓存中取出的 bean 可以直接使用
/** Cache of singleton objects: bean name --> bean instance */
private final Map<String, Object> singletonObjects = new ConcurrentHashMap<String, Object>(256);

// 提前曝光的单例对象的cache,存放原始的 bean 对象(尚未填充属性),用于解决循环依赖
/** Cache of early singleton objects: bean name --> bean instance */
private final Map<String, Object> earlySingletonObjects = new HashMap<String, Object>(16);

// 单例对象工厂的cache,存放 bean 工厂对象,用于解决循环依赖
/** Cache of singleton factories: bean name --> ObjectFactory */
private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap<String, ObjectFactory<?>>(16);

获取单例 Bean 的过程

org.springframework.beans.factory.support.DefaultSingletonBeanRegistry#getSingleton(java.lang.String)

  1. 先从一级缓存 singletonObjects 中去获取,如果获取到就直接 return。
  2. 如果获取不到或者对象正在创建中(isSingletonCurrentlyInCreation()),那就再从二级缓存 earlySingletonObjects 中获取,如果获取到就直接 return。
  3. 如果还是获取不到,且允许 singletonFactories(allowEarlyReference=true)通过 getObject()获取。就从三级缓存 singletonFactory.getObject()获取,如果获取到了就从 singletonFactories 中移除,并且放进 earlySingletonObjects,其实也就是从三级缓存移动到了二级缓存。

总结Linux中的5种IO模型,其中最常用的是IO多路复用,特别是epoll是各种网络框架的底层IO框架。

阅读全文 »

文件系统操作

1
2
3
4
5
6
7
ls -F filename # 列出目录,使用*等符号标志文件类型,*表示可执行文件,/为目录文件
ls -i filename # 查看文件inode号
mkdir -p dirname # 创建目录,如果有父目录就创建父目录
stat filename # 查看文件详细信息,包括inode号、链接数
mv # 移动/重命名
ln oldfile newfile # 创建硬链接
ln -s sourcefile/sourcedir targetfile/targetdir # 创建软链接

chroot

chroot

mount 和 umount

1
mount | column -t # 查看挂载分区信息

制作目录硬链接。

1
2
3
4
5
6
7
# 假设当前目录下有a和b两个目录
mount -o bind a b # b成为a的硬链接,其实就是将a挂载到了b上
mount # 通过mount可以看出是bind这个参数起作用
/dev/sda2 on /home/hgc/Downloads/b type ext4 (rw,relatime,errors=remount-ro,data=ordered)
# 删除时必须先卸载再删除
umount b
rm -rf b

ln

fdisk

1
fdisk -l # 查看硬盘分区信息

并发(Concurrency)

线程、进程,线程安全,进程同步,可见性,一致性,锁,信号量,并发,并行

线程和进程

从操作系统概念上说,线程是最小的可执行单位,也就是系统调度的最小单位。进程是资源分配的最小单位。线程是依赖进程存在的,共享进程内的资源,如内存,cpu,io 等。在操作系统的发展过程中,为了提高系统的稳定性、吞吐量和安全性,操作系统内核和用户态做了隔离,例如 Linux 有内核线程,用户线程,内核进程,用户进程,从根本上 Linux 是没有线程的,线程对 Linux 系统来说是个特殊的进程。那么用户线程和内核线程是一一对应呢?从宏观上看是一一对应的,在用户态的每一个线程,都会在内核有对应的执行线程,但是由于资源的限制,用户态的线程和内核线程是多对一的关系。用户进程和内核进程也类似。具体怎样对应的,这里就不探讨了。
为了提高操作系统的实时性,操作系统是以时间片轮转来实现任务调度的。理论上时间片内是不可以被中断的,可认为是 cpu 最小的单位执行时间。现代操作系统为了提高用户体验,线程都是抢占式的,而中断一般在时间片用完的时候发生。线程、进程和 CPU 都是多对一的关系,所以存在进程线程切换的问题。
线程内部还是有自己内存空间的,所以有个概念叫线程内存模型。线程内部有自己私有的本地内存,故线程和线程之间的本地内存存在可见性问题。例如全局变量 A 在线程 1 修改后,线程 2 并不一定能拿到 A 的修改值,因为线程 1 会把全局变量 A 拷贝到本地内存,修改后并不会马上同步。在编译的时候,编译器为了优化,(例如利用超线程技术)可能会重排指令的执行顺序,这就会存在一致性了。

线程安全

在线程安全里面经常要讨论的两个问题就是:可见性和一致性。锁是什么东西呢?锁就是一道内存屏障,保证可见性和一致性的一种策略,由操作系统甚至更底层的硬件提供。加锁是消耗资源的,特别是在多核 CPU 上,现在多核 CPU 一般有 3 级缓存,一级缓存通常是单核独占的,而线程的本地内存很可能就保存在 cpu 的缓存里面,然而加锁就意味着保证可见性和一致性,需要中断同步数据,保证别人拿到的是最新修改值。由于用途不同,锁被设计成各种各样的,如互斥锁,读写锁,自旋锁,同步块,数据库的事务等,如果只要保证可见性的,可以不使用锁,在 java 里面可以使用 volatile 修饰全局变量。虽然在 c/c++,都有同样的修饰符,但是是不是一样的意思呢,请参考其他文章。

死锁(deadlock)

定义

多个进程竞争资源造成的互相等待情况。

资源

可重用性资源:可供重复使用多次的资源
不可抢占性资源:一旦系统把某资源分配给该进程后,就不能将它强行收回,只能在进程使用完后自动释放
可消耗资源:又叫临时性资源,它是在进程运行期间,由进程动态的创建和消耗的

死锁产生的原因

  1. 系统资源的竞争
    系统资源的竞争导致系统资源不足,以及资源分配不当,导致死锁。
    主要是竞争可重用不可抢占式的资源和可消耗的资源。
  2. 进程运行推进顺序不合适
    进程在运行过程中,请求和释放资源的顺序不当,会导致死锁。

死锁产生的条件

互斥条件 一个资源每次只能被一个进程使用,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。
请求与保持条件 进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源 已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。
不可剥夺条件 进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能 由获得该资源的进程自己来释放(只能是主动释放)。
循环等待条件 若干进程间形成首尾相接循环等待资源的关系
这四个条件是死锁的必要条件,只要系统发生死锁,这些条件必然成立,而只要上述条件之一不满足,就不会发生死锁。
形象地说,就是有两个酒鬼,一个有开瓶器,一个有酒,这两种资源都只能被一个人占有(互斥),且用完之前不能被另一个人抢去(不可剥夺),他们互相等对方手上的资源(循环等待),但又不肯放开自己手上的资源(请求与保持),因此陷入了死锁。

死锁避免

系统对进程发出每一个系统能够满足的资源申请进行动态检查,并根据检查结果决定是否分配资源,如果分配后系统可能发生死锁,则不予分配,否则予以分配。
书上给出了两种死锁避免策略

  1. 进程启动拒绝
    若对每个资源,能满足现有所有进程再加上新进程的需求,则可以启动这个进程,否则拒绝
  2. 资源分配拒绝(银行家算法)

死锁预防

死锁预防是设法至少破坏产生死锁的四个必要条件之一,严格的防止死锁的出现。

  1. 互斥
    不可能禁止,比如文件只允许互斥的写访问
  2. 占有且等待
    可以要求进程一次性请求所有需要的资源,并且阻塞这个进程直到所有请求都同时满足,这样就不会再请求新资源了。
  3. 不可抢占
  4. 循环等待

持久化(Persistent)

Linux IO 模型

X
应用程序调用内核 IO 函数的过程如下图所示:
X
处于 OS 的安全性等的考虑,进程无法直接操作 I/O 设备,必须通过系统调用来请求内核完成 I/O 动作,而内核会为每个 I/O 设备维护一个 Buffer。

  1. 用户进程发起请求;
  2. 内核接收到请求后,从 I/O 设备中获取数据到 Buffer 中;
  3. 将 Buffer 中的数据拷贝到用户进程的地址空间,该用户进程获取到数据后响应给客户端。

在整个请求过程中,数据输入至 Buffer 需要时间,从 Buffer 复制数据到进程也需要时间,这个等待时间是限制 I/O 效率的罪魁祸首,根据等待方式的不同,I/O 动作可以分为以下五种模式:

  • 阻塞 I/O(Blocking I/O)
  • 非阻塞 I/O(Non-Blocking I/O)
  • I/O 复用(I/O Multiplexing)
  • 信号驱动的 I/O(Signal Driven I/O)
  • 异步 I/O(Asynchronous I/O)

存储器管理

文件系统

IO(pipe)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <syscall.h>

int testPipe(){
int fd[2];

int *read_fd = &fd[0];
int *write_fd = &fd[1];

if(pipe(fd) == -1){
printf("pipe create failed\n");
}

int pid = fork();
if(pid == -1){
printf("fork failed");
return -1;
}
else if(pid == 0){
int count;
char string[] = "hahaha";
close(*read_fd);
count = write(*write_fd, string, sizeof(string));
printf("写了%d个字符\n", count);
return 0;
}
else{
int count;
char buffer[100];
close(*write_fd);
count = read(*read_fd, buffer, sizeof(buffer));
printf("父进程接受到%d字节的数据:%s", count, buffer);
return 0;
}
}

虚拟内存

swap

虚拟化(Virtualization)

驱动管理

参考

Linux 应用

  1. 虚拟内存
    All about Linux swap space
    Swap
    Linux Performance: Why You Should Almost Always Add Swap Space
  2. 演进
    Linux 内核的发展 介绍 2.6.28 和 2.6.29 版本中的新特性
    对 Linux 内核的发展方向的展望 - Linux 4.2
  3. 运维
    老司机告诉你:正规的运维工作是什么的?
  4. 并发
    Linux 原子操作 atomic_cmpxchg()/Atomic_read()/Atomic_set()/Atomic_add()/Atomic_sub()
  5. 隔离
    cgroup - jerry017cn

操作系统

  1. 分时和实时操作系统
    List of open source real-time operating systems
  2. 操作系统概念
    【操作系统】操作系统综述(一)
  3. Operating Systems: Three Easy Pieces

Concurrency

  1. 【操作系统】进程管理(二)
  2. 【操作系统】处理机调度与死锁(三)

Persistent

  1. 【操作系统】存储器管理(四)
  2. 【操作系统】文件管理(六)

Virtualization

  1. 【操作系统】设备管理(五)

Linux 内核

  1. 哈工大操作系统
    操作系统之基础
    操作系统原理与实践
  2. NJU
    在开始愉快的 PA 之旅之前
    NJU-ProjectN
  3. The Linux Information Project
  4. The Linux Kernel Archives
  5. Linux 内核文档(中文版)
  6. Write Your Own Operating System
  7. 6.828: Operating System Engineering

无锁栈

Treiber Stack

无锁列表

CopyOnWriteArrayList

CopyOnWriteArrayList 是一个线程安全的 ArrayList,对其进行的修改操作和元素迭代操作都是在底层创建一个拷贝的数组(快照)上进行的,也就是写时拷贝策略。CopyOnWriteArrayList 适合读多写少的场景,但如果应用在写操作频繁的场景下反而会降低性能。
CopyOnWriteArrayList类图

  • lock:保证写操作时的并发安全;

add(E e)

添加操作拷贝了份快照,在快照上添加元素,最后替代原数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean add(E e) {

// 加独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取array
Object[] elements = getArray();

// 拷贝array到新数组,添加元素到新数组
// 新数组长度是原数组长度+1,可见CopyOnWriteArrayList是无界的
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;

// 使用新数组替换添加前的数组
setArray(newElements);
return true;
} finally {
// 释放独占锁
lock.unlock();
}
}

get(int index)

get 操作获取下标处的元素,实际上 get 可以被分解为以下两个步骤:

  1. 获取 array 的引用;
  2. 通过下标访问 array 指定位置的元素。

整个过程并没有加锁,如果在访问期间有另一个线程删除了某个元素,实际上因为修改操作是发生在原数组的一个快照上的,get 操作仍然获取的是原数组上的元素,因此不会发生类似数组越界的问题。但同时也不可避免这个过程带来的弱一致性,因为元素事实上已经被删除了却仍然可以被访问到。

set(int index, E element)

修改 list 中指定元素的值。

  • 如果指定位置的元素不存在则抛出 IndexOutOfBoundsException 异常;
  • 如果指定位置元素与新值不一致,则创建新数组、在新数组上修改,最后设置新数组到 array(COW)。
  • 即使没有变化,也还是需要重新设置一次 array,这主要是因为 array 本身是 volatile 的,set 方法应当提供 volatile 的语义。

remove(int index)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E remove(int index) {

//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {

//获取数组
Object[] elements = getArray();
int len = elements.length;

//获取指定元素
E oldValue = get(elements, index);
int numMoved = len - index - 1;

//如果要删除的是最后一个元素
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
//分两次拷贝除删除后的元素到新数组
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
//使用新数组代替老的
setArray(newElements);
}
return oldValue;
} finally {
//释放锁
lock.unlock();
}
}

remove(Object o)

remove(Object o, Object[] snapshot, int index)

iterator

CopyOnWriteArrayList 中的 iterator 是弱一致性的,其他线程的修改操作对 iterator 不可见的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}

static final class COWIterator<E> implements ListIterator<E> {
//array的快照版本
private final Object[] snapshot;

//数组下标
private int cursor;

//构造函数
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

//是否遍历结束
public boolean hasNext() {
return cursor < snapshot.length;
}

//获取元素
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
  • 如果在该线程使用返回的迭代器遍历元素的过程中,其它线程没有对 list 进行增删改,那么 snapshot 本身就是 list 的 array,因为它们是引用关系。
  • 如果在遍历期间存在其他线程对 list 的增删改操作,那么 snapshot 会成为原 array 的快照,此时其他线程对 list 进行的增删改是不可见的,因为它们操作的是两个不同的数组。

无锁队列

ConcurrentLinkedQueue

  • 线程安全
  • 无界
  • 非阻塞

数据结构

ConcurrentLinkedQueue类图

  • 底层队列使用单向链表实现。
  • 两个volatile的Node节点(head和tail)分别存放队列的首尾节点,从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
    1
    2
    3
    public ConcurrentLinkedQueue() {
    head = tail = new Node(null);
    }
  • Node节点内部为一个volatile修饰的变量item用来存放节点的值,next用来存放链表的下一个节点,从而链接成一个单向无界链表,如下图所示:
    ConcurrentLinkedQueue的队列结构
  • 入队和出队操作使用CAS来实现线程安全。

入队 - offer

offer操作在队列末尾添加一个元素:

  • 如果传入的是null,抛出NPE,表明ConcurrentLinkedQueue是不允许插入null值的;
  • 其他情况下插入任何元素都会返回true,因为该队列是无界队列;
  • 使用CAS操作实现线程安全
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    // 从尾节点进行插入
    for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;

    // 如果q==null说明p是尾节点,则执行插入
    // (1)
    if (q == null) {
    // 使用CAS设置p节点的next节点
    if (p.casNext(null, newNode)) {
    // cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
    if (p != t)
    casTail(t, newNode); // Failure is OK.
    return true;
    }
    }
    // (2)
    else if (p == q)
    // 多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
    // 重新找新的head,因为新的head后面的节点才是正常的节点。
    p = (t != (t = tail)) ? t : head;
    // (3)
    else
    // 寻找尾节点
    p = (p != t && t != (t = tail)) ? t : q;
    }
    }
    注意这里的循环体从队列尾部添加元素:
  1. 刚开始队列为空,代码(1)通过CAS替换p的下一个节点;
    注意有一个哨兵节点null,刚开始队列的head和tail节点都是指向该哨兵节点,因此队列中至少都会有一个节点;
  2. 如果多个线程同时执行插入,总会有一个线程CAS时插入失败,这时会进入下一次循环
    ConcurrentLinkedQueue插入1
    这时不满足(1)和(2)的条件,在代码(3)处会将q赋值给p
    ConcurrentLinkedQueue插入2
    再到下一次循环时q就会移动到null,这时要么正常插入,要么又被别人通过CAS抢了。
  3. 代码(2)是在执行poll时可能出现的情况:
    ConcurrentLinkedQueue插入3
    此时由于t==tail,所以p被赋值为head,然后继续循环插入元素。

出队 - poll

poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public E poll() {
// goto标记
restartFromHead:

// (1)无限循环
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取当前队头节点
E item = p.item;

// (2)当前节点有值则cas变为null
if (item != null && p.casItem(item, null)) {
//(6)cas成功标志当前节点以及从链表中移除
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// (3)当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// (4)自引用了,则重新找新的队列头节点
else if (p == q)
continue restartFromHead;
// (5)
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
  1. 刚开始队列是空的,内层循环代码(3)判断队列为空就直接返回null了;
    这时updateHead执行时由于h等于p所以没有设置头节点,poll直接返回null。
  2. 如果执行到(3)时已经有其他线程调用了offer方法成功添加一个元素到队列末尾,这时q会指向新增元素的节点
    ConcurrentLinkedQueue出队1
    这时会进入(5),令p也指向新q。
    然后在下一次循环时,进入代码(2),执行p.casItem(item, null)时会通过CAS操作设置头节点的值为null。
    代码(6)处,此时h指向哨兵节点,而p指向队列头节点,这时将p设置为新的头节点(这时p里的值已经被清掉了是一个空节点)。
    此时队列的状态为:
    ConcurrentLinkedQueue插入3
    这就是之前讲队列offer时的一种特殊情况。
  3. 自引用的情况
    假设线程A已经执行到(2)将第一个节点值置为null,这时又有一个线程B开始执行poll操作,如下图所示:
    ConcurrentLinkedQueue出队2
    然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
    ConcurrentLinkedQueue出队3
    然后线程 B 继续执行代码(3)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(4)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
    ConcurrentLinkedQueue出队4

ArrayBlockingQueue

offer是不会阻塞的,如果满了直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果已经超过容量阈值,则直接返回false
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 把它看成一个循环数组,如果超出范围就卷回
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒这个Condition,必须是在加了锁的前提下才能使用
notEmpty.signal();
}

poll同样也不会阻塞,如果空了直接返回null:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 回卷
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒
notFull.signal();
return x;
}

put操作会等待notFull这个条件:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

take操作同理,会等待notEmpty这个条件:

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。

PriorityBlockingQueue

SynchronousQueue

LinkedTransferQueue

队列比较

Disruptor

  • 无锁内存队列

  • 优化 CPU 伪共享

  • RingBuffer
    环形队列,使用定长数组存储,长度是 2^N,可以使用位运算提升性能。
    无锁:无锁设计减少了竞争。
    预热:预先填充好任务/事件,不需要像链表那样每次添加/删除节点时去创建/回收节点,从而可以避免一定的垃圾回收。
    缓存行填充解决了 CPU 伪共享问题。

  • WorkPool
    存储 WorkProcessor 的池子,Disruptor 可以通过 Executor 并发启动每一个 WorkProcessor

  • WorkProcessor
    从 RindBuffer 消费事件/任务,并交由 WorkHandler 处理。

  • WorkHandler
    处理任务的工作者,根据任务类型委托给不同的 EventHandler。

Logback 框架中异步日志打印中 ArrayBlockingQueue 的使用

异步模型是业务线程把要打印的日志任务写入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务写入磁盘,对用户线程来说,耗时只有将数据写入队列中。

并发安全 Map

ConcurrentHashMap

ConcurrentHashMap结构

get

代码:java.util.concurrent.ConcurrentHashMap.get

  1. 计算 key 的散列值,可以使用该散列值定位到散列表中的某个槽。
    如果 key 是自定义类型对象,需要实现重写 hash 方法。
  2. 找到对象
    hash 值是不精确匹配的,hash 值的关键是计算简单而且有一定的区分度,比如取 string 的前 3 位的和作为 hash 值。
    要精确匹配需要使用对象的 equals 方法。
    ConcurrentHashMap 中哈希槽的实现方法有两种:链表和红黑树,链表和红黑树的查找过程就不必细说了。

put

ConcurrentHashMap的put操作
代码:java.util.concurrent.ConcurrentHashMap.put

  1. hash
  2. 找对象
    找对象过程与 get 的区别主要是 put 需要并发控制:
    • 如果槽是空的,则通过 CAS 直接赋值;
    • 如果槽非空,则先用synchronized锁住槽,接下来根据槽的数据结构来插入节点,如果槽是链表,则遍历链表找该 Node 是否已存在,不存在的情况下插入到末尾,如果槽是红黑树,则通过二叉树的遍历找目标 Node,找不到的情况下插入到叶子并重新执行红黑平衡。

rehash

扩容的触发条件与HashMap一致。
扩容流程大致上是:遍历哈希槽,对每个需要迁移的哈希槽进行synchronized加锁。
当扩容开始后,其他线程必须等扩容完成后才能工作,但其他线程也不是就一直阻塞等扩容完成,而是调用helpTransfer方法一起帮助进行扩容,实际上因为扩容的单位是哈希槽,因此多线程并发执行扩容并不会导致明显的冲突增加。

扩容入口:

  1. helpTransfer
    写入操作时协助扩容,即判断hash节点是ForwardingNode则调用helpTransfer将
  2. 全量添加时,需要保证

扩容代码:
java.util.concurrent.ConcurrentHashMap.transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 创建一个新的两倍大小的新nextTab,将老tab中的元素迁移过去
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 标记节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果tab当前位置为null,则设置fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 已经是fwd节点,则遍历下一个位置
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// tab当前位置已有节点,则加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 表示链表节点,如果是树节点则fh=-2
if (fh >= 0) {
// 头节点的hash值
int runBit = fh & n;
Node<K,V> lastRun = f;
// 链表的下一节点p
for (Node<K,V> p = f.next; p != null; p = p.next) {
// p节点的hash值
int b = p.hash & n;
// 避免成环?
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将ln和hn转移到nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
原tab置为fwd,表示已经被转移了
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

size

size操作返回的是一个不精确的值,因为进行统计的过程中,很有可能会有其他线程正在进行插入和删除操作。

1.8之前的size:

  1. 遍历segments数组,将每个segment的count加起来作为总数,将modCount加起来作为修改总数;
    modCount会在每次segment被修改时+1(只增不减),用于比较。
  2. 再做一遍遍历,将这次的modCount总数和上一次的比较,如果一致则计数准确直接返回,否则重试;
  3. 如果重试了2次都不行,则第三次会对segment加锁再统计。

1.8之后,没有了分段锁,size不会每次都遍历segments统计,而是在更新时修改总数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

从源码中可以看到,ConcurrentHashMap#size的结果就是:
baseCount + sum(counterCells)
其中:

  • baseCount:计数,总数发生变化时通过CAS修改
  • counterCells:如果baseCount CAS修改失败,作为兜底,类似LongAdder的思路。

put操作的末尾会调用addCount()更新baseCount的值,如果CAS修改失败了,则使用counterCells,如果CAS修改 counterCells失败了,则使用fullAddCount方法继续死循环操作,直到成功。

QA

  1. JUC 并发包中并发组件 CopyOnWriteArrayList 的实现原理,CopyOnWriteArrayList 是如何通过写时拷贝实现并发安全的 List?

  2. 什么是弱一致性?

  3. 说一下 ConcurrentHashMap。

  4. ConcurrentHashMap 怎么实现并发安全?
    相对 Hashtable 来说 ConcurrentHashMap 的锁粒度是更小的,Hashtable 中使用 synchronized 实现的一种方法级的悲观锁,相当于把整个散列表锁住了,不利于系统整体吞吐量的提升。
    JDK1.7 中它使用的是一种分段锁来保证并发安全,是一种粒度较小的锁,写操作每次只锁住一个哈希槽,
    JDK1.8 之后改为通过实现一种基于 CAS 的乐观锁来保证并发安全,当然,和 HashMap 一样,每个哈希槽在增长到一定程度后会自动转换为红黑树。

参考

  1. 【目录】JUC 集合框架目录
  2. 并发框架 Disruptor 译文

并发安全容器(Queue)

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层数据结构使用单向链表实现,入队和出队操作使用 CAS 来实现线程安全。
2.1 ConcurrentLinkedQueue 类图结构
先简单介绍下 ConcurrentLinkedQueue 的类图结构如下图:
image.png
如上类图 ConcurrentLinkedQueue 内部的队列是使用单向链表方式实现,其中两个 volatile 类型的 Node 节点分别用来存放队列的首尾节点。从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
public ConcurrentLinkedQueue() {
head = tail = new Node(null);
}
Node 节点内部则维护一个 volatile 修饰的变量 item 用来存放节点的值,next 用来存放链表的下一个节点,从而链接为一个单向无界链表。
首先一个图来概况该队列构成,读者可以读完本节后在回头体会这个图:image.png
2.2 ConcurrentLinkedQueue 原理介绍
本节主要介绍 ConcurrentLinkedQueue 的几个主要的方法的实现原理
2.2.1 offer 操作
offer 操作是在队列末尾添加一个元素,如果传递的参数是 null 则抛出 NPE 异常,否者由于 ConcurrentLinkedQueue 是无界队列该方法一直会返回 true。另外由于使用 CAS 无阻塞算法,该方法不会阻塞调用线程,下面具体看看实现原理。
public boolean offer(E e) {
//(1)e为null则抛出空指针异常
checkNotNull(e);

//(2)构造Node节点
final Node newNode = new Node(e);

//(3)从尾节点进行插入
for (Node<E> t = tail, p = t;;) {

    Node<E> q = p.next;

    //(4)如果q==null说明p是尾节点,则执行插入
    if (q == null) {

        //(5)使用CAS设置p节点的next节点
        if (p.casNext(null, newNode)) {
            //(6)cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
            if (p != t)
                casTail(t, newNode);  // Failure is OK.
            return true;
        }
    }
    else if (p == q)//(7)
        //多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
        //重新找新的head,因为新的head后面的节点才是正常的节点。
        p = (t != (t = tail)) ? t : head;
    else
        //(8) 寻找尾节点
        p = (p != t && t != (t = tail)) ? t : q;
}

}
上节类图结构时候谈到构造队列时候参构造函数创建了一个 item 为 null 的哨兵节点,并且 head 和 tail 都是指向这个节点,下面通过图形结合来讲解下 offer 操作的代码实现。
首先看下当一个线程调用 offer(item)时候情况:首先代码(1)对传参判断空检查,如果为 null 则抛出 NPE 异常,然后代码(2)则使用 item 作为构造函数参数创建了一个新的节点,代码(3)从队列尾部节点开始循环,意图是从队列尾部添加元素。
image.png
上图是执行代码(4)时候队列的情况,这时候节点 p,t,head,tail 同时指向了 item 为 null 的哨兵节点,由于哨兵节点的 next 节点为 null, 所以这里 q 指向也是 null。
代码(4)发现q==null则执行代码(5)通过 CAS 原子操作判断 p 节点的 next 节点是否为 null,如果为 null 则使用节点 newNode 替换 p 的 next 节点,然后执行代码(6)由于p==t所以没有设置尾部节点,然后退出 offer 方法,这时候队列的状态图如下:
image.png
上面讲解的是一个线程调用 offer 方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5),假设线程 A 调用 offer(item1), 线程 B 调用 offer(item2), 线程 A 和 B 同时执行到 p.casNext(null, newNode)。而 CAS 的比较并设置操作是原子性的,假设线程 A 先执行了比较设置操作则发现当前 p 的 next 节点确实是 null 则会原子性更新 next 节点为 newNode,这时候线程 B 也会判断 p 的 next 节点是否为 null,结果发现不是 null(因为线程 A 已经设置了 p 的 next 为 newNode)则会跳到步骤(3),然后执行到步骤(4)时候队列分布图为:
image.png
根据这个状态图可知线程 B 会去执行代码(8),然后 q 赋值给了 p,这时候队列状态图为:
image.png
然后线程 B 再次跳转到代码(3)执行,当执行到代码(4)时候队列状态图为:
image.png
由于这时候 q==null, 所以线程 B 会执行步骤(5),通过 CAS 操作判断当前 p 的 next 节点是否是 null,不是则再次循环后尝试,是则使用 newNode 替换,假设 CAS 成功了,那么执行步骤(6)由于 p!=t 所以设置 tail 节点为 newNode,然后退出 offer 方法。这时候队列分布图为:
image.png
分析到现在,offer 代码的执行路径现在就差步骤(7)还没走过,其实这个要在执行 poll 操作后才会出现,这里先看下执行 poll 操作后可能会存在的的一种情况如下图:
image.png
下面分析下当队列处于这种状态时候调用 offer 添加元素代码执行到步骤(4)时候的状态图:
image.png
由于 q 节点不为空并且p==q所以执行步骤(7),由于t==tail所以 p 被赋值为了 head,然后进入循环,循环后执行到代码(4)时候队列状态为:
image.png
由于q==null, 所以执行步骤(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p 的 next 节点被设置为新增节点,然后执行步骤(6),由于p!=t所以设置新节点为队列为节点,现在队列状态如下:
image.png
这里自引用的节点会被垃圾回收掉。
总结:可见 offer 操作里面关键步骤是代码(5)通过原子 CAS 操作来进行控制同时只有一个线程可以追加元素到队列末尾,进行 cas 竞争失败的线程则会通过循环一次次尝试进行 cas 操作,直到 cas 成功才会返回,也就是通过使用无限循环里面不断进行 CAS 尝试方式来替代阻塞算法挂起调用线程,相比阻塞算法这是使用 CPU 资源换取阻塞所带来的开销。
2.2.2 poll 操作
poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null,下面看看实现原理。
public E poll() {
//(1) goto标记
restartFromHead:

//(2)无限循环
for (;;) {
    for (Node<E> h = head, p = h, q;;) {

        //(3)保存当前节点值
        E item = p.item;

        //(4)当前节点有值则cas变为null
        if (item != null && p.casItem(item, null)) {
            //(5)cas成功标志当前节点以及从链表中移除
            if (p != h) 
                updateHead(h, ((q = p.next) != null) ? q : p);
            return item;
        }
        //(6)当前队列为空则返回null
        else if ((q = p.next) == null) {
            updateHead(h, p);
            return null;
        }
        //(7)自引用了,则重新找新的队列头节点
        else if (p == q)
            continue restartFromHead;
        else//(8)
            p = q;
    }
}

}
final void updateHead(Node h, Node p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
同理本节也通过图形结合的方式来讲解代码执行逻辑:
poll 操作是从队头获取元素,所以代码(2)内层循环是从 head 节点开始迭代,代码(3)获取当前队头的节点,当队列一开始为空时候队列状态为:
image.png
由于 head 节点指向的为 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null 如下图:
image.png
所以执行 updateHead 方法,由于 h 等于 p 所以没有设置头结点,poll 方法直接返回 null。
假设执行到代码(6)时候已经有其它线程调用了 offer 方法成功添加一个元素到队列,这时候 q 指向的是新增元素的节点,这时候队列状态为:
image.png
所以代码(6)判断结果为 false,然后会转向代码(7)执行,而此时 p 不等于 q,所以转向代码(8)执行,执行结果是 p 指向了节点 q,此时队列状态为:
image.png
然后程序转向代码(3)执行,p 现在指向的元素值不为 null,则执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时没有其它线程进行 poll 操作,CAS 成功则执行代码(5)由于此时 p!=h 所以设置头结点为 p,poll 然后返回被从队列移除的节点值 item。此时队列状态为:
image.png
这个状态就是讲解 offer 操作时候,offer 代码的执行路径(7)执行的前提状态。
假如现在一个线程调用了 poll 操作,则在执行代码(4) 时候队列状态为:
image.png
可知这时候执行代码(6)返回 null.
现在 poll 的代码还有个分支(7)没有执行过,那么什么时候会执行那?下面来看看,假设线程 A 执行 poll 操作时候当前队列状态为:
image.png
那么执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null。
假设 CAS 设置成功则标示该节点从队列中移除了,此时队列状态为:
image.png
然后由于 p!=h, 所以会执行 updateHead 方法,假如线程 A 执行 updateHead 前另外一个线程 B 开始 poll 操作这时候线程 B 的 p 指向 head 节点,但是还没有执行到代码(6)这时候队列状态为:
image.png
然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
image.png
然后线程 B 继续执行代码(6)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(7)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
image.png
总结:poll 方法移除一个元素时候只是简单的使用 CAS 操作把当前节点的 item 值设置 null,然后通过重新设置头结点让该元素从队列里面摘除,被摘除的节点就成了孤立节点,这个节点会被在垃圾回收的时候会回收掉。另外执行分支中如果发现头节点被修改了要跳到外层循环重新获取新的头节点。
2.2.3 peek 操作
peek 操作是获取队列头部一个元素(只不获取不移除),如果队列为空则返回 null,下面看看实现原理。
public E peek() {
//(1)
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
//(2)
E item = p.item;
//(3)
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
//(4)
else if (p == q)
continue restartFromHead;
else
//(5)
p = q;
}
}
}
代码结构与 poll 操作类似,不同在于步骤(3)的使用只是少了 castItem 操作,其实这很正常,因为 peek 只是获取队列头元素值并不清空其值,根据前面我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次 peek 时候代码(3)中会发现 item==null, 然后会执行 q = p.next, 这时候 q 节点指向的才是队列里面第一个真正的元素或者如果队列为 null 则 q 指向 null。
当队列为空时候这时候队列状态为:
image.png
这时候执行 updateHead 由于 h 节点等于 p 节点所以不进行任何操作,然后 peek 操作会返回 null。
当队列至少有一个元素时候(这里假设只有一个)这时候队列状态为:
image.png
这时候执行代码(5)这时候 p 指向了 q 节点,然后执行代码(3)这时候队列状态为:
image.png
执行代码(3)发现 item 不为 null,则执行 updateHead 方法,由于 h!=p, 所以设置头结点,设置后队列状态为:
image.png
也就是剔除了哨兵节点。
总结:peek 操作代码与 poll 操作类似只是前者只获取队列头元素但是并不从队列里面删除,而后者获取后需要从队列里面删除,另外在第一次调用 peek 操作时候,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者 null。
2.2.4 size 操作
获取当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁所以从调用 size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。
public int size() {
int count = 0;
for (Node p = first(); p != null; p = succ(p))
if (p.item != null)
// 最大返回Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
}

//获取第一个队列元素(哨兵元素不算),没有则为null
Node first() {
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

//获取当前节点的next元素,如果是自引入节点则返回真正头节点
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
2.2.5 remove 操作
如果队列里面存在该元素则删除给元素,如果存在多个则删除第一个,并返回 true,否者返回 false
public boolean remove(Object o) {

//查找元素为空,直接返回false
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
    E item = p.item;

    //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其它元素是否有匹配的。
    if (item != null &&
        o.equals(item) &&
        p.casItem(item, null)) {

        //获取next元素
        Node<E> next = succ(p);

        //如果有前驱节点,并且next不为空则链接前驱节点到next,
        if (pred != null && next != null)
            pred.casNext(p, next);
        return true;
    }
    pred = p;
}
return false;

}
注:ConcurrentLinkedQueue 底层使用单向链表数据结构来保存队列元素,每个元素被包装为了一个 Node 节点,队列是靠头尾节点来维护的,创建队列时候头尾节点指向一个 item 为 null 的哨兵节点,第一次 peek 或者 first 时候会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以获取 size 的时候有可能进行了 offer,poll 或者 remove 操作,导致获取的元素个数不精确,所以在并发情况下 size 函数不是很有用。

LinkedBlockingQueue

前面介绍了使用 CAS 算法实现的非阻塞队列 ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现的阻塞队列 LinkedBlockingQueue 的实现
3.1 LinkedBlockingQueue 类图结构
同理首先看下 LinkedBlockingQueue 的类图结构
image.png
如上类图可知 LinkedBlockingQueue 也是使用单向链表实现,也有两个 Node 分别用来存放首尾节点,并且里面有个初始值为 0 的原子变量 count 用来记录队列元素个数。另外里面有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原子性,其中 takeLock 用来控制同时只有一个线程可以从队列获取元素,其它线程必须等待,putLock 控制同时只能有一个线程可以获取锁去添加元素,其它线程必须等待。另外 notEmpty 和 notFull 是信号量,内部分别有一个条件队列用来存放进队和出队时候被阻塞的线程,其实这个是个生产者 - 消费者模型。如下是独占锁创建代码:
/** 执行take, poll等操作时候需要获取该锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 当队列为空时候执行出队操作(比如take)的线程会被放入这个条件队列进行等待 */
private final Condition notEmpty = takeLock.newCondition();

/** 执行put, offer等操作时候需要获取该锁*/
private final ReentrantLock putLock = new ReentrantLock();

/**当队列满时候执行进队操作(比如put)的线程会被放入这个条件队列进行等待 */
private final Condition notFull = putLock.newCondition();

/** 当前队列元素个数 */
private final AtomicInteger count = new AtomicInteger(0);
如下是 LinkedBlockingQueue 无参构造函数代码:
public static final int MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾节点,指向哨兵节点
last = head = new Node(null);
}
从代码可知默认队列容量为 0x7fffffff; 用户也可以自己指定容量,所以一定程度上 LinkedBlockingQueue 可以说是有界阻塞队列。
首先使用一个图来概况该队列,读者在读完本节后在回头体会下:
image.png
3.2 LinkedBlockingQueue 原理介绍
3.2.1 offer 操作
向队列尾部插入一个元素,如果队列有空闲容量则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false,如果 e 元素为 null 则抛出 NullPointerException 异常,另外该方法是非阻塞的。
public boolean offer(E e) {

    //(1)空元素抛空指针异常
    if (e == null) throw new NullPointerException();

    //(2) 如果当前队列满了则丢弃将要放入的元素,然后返回false
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;

    //(3) 构造新节点,获取putLock独占锁
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //(4)如果队列不满则进队列,并递增元素计数
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            //(5)
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        //(6)释放锁
        putLock.unlock();
    }
    //(7)
    if (c == 0)
        signalNotEmpty();
    //(8)
    return c >= 0;
}

private void enqueue(Node node) {
last = last.next = node;
}
步骤(2)判断如果当前队列已满则丢弃当前元素并返回 false
步骤(3)获取到 putLock 锁,当前线程获取到该锁后,则其它调用 put 和 offer 的线程将会被阻塞(阻塞的线程被放到 putLock 锁的 AQS 阻塞队列)。
步骤(4)这里有重新判断了下当前队列是否满了,这是因为在执行代码(2)和获取到 putLock 锁期间可能其它线程通过 put 或者 offer 方法向队列里面添加了新元素。重新判断队列确实不满则新元素入队,并递增计数器。
步骤(5)判断如果新元素入队后队列还有空闲空间,则唤醒 notFull 的条件队列里面因为调用了 notFull 的 await 操作(比如执行 put 方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲所以这里可以提前唤醒一个入队线程。
代码(6) 则释放获取的 putLock 锁,这里要注意锁的释放一定要在 finally 里面做,因为即使 try 块抛异常了,finally 也是会被执行到的。另外释放锁后其它因为调用 put 和 offer 而被阻塞的线程将会有一个获取到该锁。
代码(7)c==0 说明在执行代码(6)释放锁时候队列里面至少有一个元素,队列里面有元素则执行 signalNotEmpty,下面看看 signalNotEmpty 的代码:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
可知作用是激活 notEmpty 的条件队列中因为调用 notEmpty 的 await 方法(比如调用 take 方法并且队列为空的时候)而被阻塞的一个线程,这里也说明了调用条件变量的方法前要首先获取对应的锁。
综上可知 offer 方法中通过使用 putLock 锁保证了在队尾新增元素的原子性和队列元素个数的比较和递增操作的原子性。
3.2.2 put 操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回 true,如果队列已满则阻塞当前线程直到队列有空闲插入成功后返回 true,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。
put 操作的代码结构与 offer 操作类似,代码如下:
public void put(E e) throws InterruptedException {
//(1)空元素抛空指针异常
if (e == null) throw new NullPointerException();
//(2) 构建新节点,并获取独占锁putLock
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//(3)如果队列满则等待
while (count.get() == capacity) {
notFull.await();
}
//(4)进队列并递增计数
enqueue(node);
c = count.getAndIncrement();
//(5)
if (c + 1 < capacity)
notFull.signal();
} finally {
//(6)
putLock.unlock();
}
//(7)
if (c == 0)
signalNotEmpty();
}
代码(2)中使用 putLock.lockInterruptibly() 获取独占锁,相比 offer 方法中这个获取独占锁方法意味着可以被中断,具体说是当前线程在获取锁的过程中,如果被其它线程设置了中断标志则当前线程会抛出 InterruptedException 异常,所以 put 操作在获取锁过程中是可被中断的。
代码(3)如果当前队列已满,则调用 notFull 的 await() 把当前线程放入 notFull 的条件队列,当前线程被阻塞挂起并释放获取到的 putLock 锁,由于 putLock 锁被释放了,所以现在其它线程就有机会获取到 putLock 锁了。
另外考虑下代码(3)判断队列是否为空为何使用 while 循环而不是 if 语句那?其实是考虑到当前线程被虚假唤醒的问题,也就是其它线程没有调用 notFull 的 singal 方法时候 notFull.await() 在某种情况下会自动返回。如果使用 if 语句那么虚假唤醒后会执行代码(4)元素入队,并且递增计数器,而这时候队列已经是满了的,导致队列元素个数大于了队列设置的容量,导致程序出错。而使用 while 循环假如 notFull.await() 被虚假唤醒了,那么循环在检查一下当前队列是否是满的,如果是则再次进行等待。
3.2.3 poll 操作
从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。
public E poll() {
//(1)队列为空则返回null
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
//(2)获取独占锁
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//(3)队列不空则出队并递减计数
if (count.get() > 0) {//3.1
x = dequeue();//3.2
c = count.getAndDecrement();//3.3
//(4)
if (c > 1)
notEmpty.signal();
}
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)返回
return x;
}
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
代码 (1) 如果当前队列为空,则直接返回 null
代码(2)获取独占锁 takeLock,当前线程获取该锁后,其它线程在调用 poll 或者 take 方法会被阻塞挂起
代码 (3) 如果当前队列不为空则进行出队操作,然后递减计数器。
代码(4)如果 c>1 则说明当前线程移除掉队列里面的一个元素后队列不为空(c 是删除元素前队列元素个数),那么这时候就可以激活因为调用 poll 或者 take 方法而被阻塞到 notEmpty 的条件队列里面的一个线程。
代码(6)说明当前线程移除队头元素前当前队列是满的,移除队头元素后队列当前至少有一个空闲位置,那么这时候就可以调用 signalNotFull 激活因为调用 put 或者 offer 而被阻塞放到 notFull 的条件队列里的一个线程,signalNotFull 的代码如下:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll 代码逻辑比较简单,值得注意的是获取元素时候只操作了队列的头节点。
3.2.4 peek 操作
获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。
public E peek() {
//(1)
if (count.get() == 0)
return null;
//(2)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node first = head.next;
//(3)
if (first == null)
return null;
else
//(4)
return first.item;
} finally {
//(5)
takeLock.unlock();
}
}
peek 操作代码也比较简单,这里需要注意的是代码(3)这里还是需要判断下 first 是否为 null 的,不能直接执行代码(4)。正常情况下执行到代码(2)说明队列不为空,但是代码(1)和(2)不是原子性操作,也就是在执行点(1)判断队列不空后,在代码(2)获取到锁前有可能其它线程执行了 poll 或者 take 操作导致队列变为了空,然后当前线程获取锁后,直接执行 first.item 会抛出空指针异常。
3.2.5 take 操作
获取当前队列头部元素并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//(1)获取锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//(2)当前队列为空则阻塞挂起
while (count.get() == 0) {
notEmpty.await();
}
//(3)出队并递减计数
x = dequeue();
c = count.getAndDecrement();
//(4)
if (c > 1)
notEmpty.signal();
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)
return x;
}
代码(1)当前线程获取到独占锁,其它调用 take 或者 poll 的线程将会被阻塞挂起。
代码(2)如果队列为空则阻塞挂起当前线程,并把当前线程放入 notEmpty 的条件队列。
代码(3)进行出队操作并递减计数。
代码(4)如果 c>1 说明当前队列不为空,则唤醒 notEmpty 的条件队列的条件队列里面的一个因为调用 take 或者 poll 而被阻塞的线程。
代码(5)释放锁。
代码(6)如果 c == capacity 则说明当前队列至少有一个空闲位置,则激活条件变量 notFull 的条件队列里面的一个因为调用 put 或者 offer 而被阻塞的线程。
3.2.6 remove 操作
删除队列里面指定元素,有则删除返回 true,没有则返回 false
public boolean remove(Object o) {
if (o == null) return false;

//(1)双重加锁
fullyLock();
try {

    //(2)遍历队列找则删除返回true
    for (Node<E> trail = head, p = trail.next;
         p != null;
         trail = p, p = p.next) {
         //(3)
        if (o.equals(p.item)) {
            unlink(p, trail);
            return true;
        }
    }
    //(4)找不到返回false
    return false;
} finally {
    //(5)解锁
    fullyUnlock();
}

}
代码(1)通过 fullyLock 获取双重锁,当前线程获取后,其它线程进行入队或者出队的操作时候就会被阻塞挂起。
void fullyLock() {
putLock.lock();
takeLock.lock();
}
代码(2)遍历队列寻找要删除的元素,找不到则直接返回 false,找到则执行 unlink 操作,unlik 操作代码如下:
void unlink(Node p, Node trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
如果当前队列满,删除后,也不忘记唤醒等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
可知删除元素后,如果发现当前队列有空闲空间,则唤醒 notFull 的条件队列中一个因为调 用 put 或者 offer 方法而被阻塞的线程。
代码(5)调用 fullyUnlock 方法使用与加锁顺序相反的顺序释放双重锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
总结下,由于 remove 方法在删除指定元素前加了两把锁,所以在遍历队列查找指定元素过程中是线程安全的,并且此时其它调用入队出队操作的线程全部会被阻塞,另外获取多个资源锁与释放的顺序是相反的。
3.2.7 size 操作
int size() : 获取当前队列元素个数。
public int size() {
return count.get();
}
由于在操作出队入队时候操作 Count 的时候是加了锁的,所以相比 ConcurrentLinkedQueue 的 size 方法比较准确。这里考虑下为何 ConcurrentLinkedQueue 中需要遍历链表来获取 size 而不适用一个原子变量那?这是因为使用原子变量保存队列元素个数需要保证入队出队操作和操作原子变量是原子性操作,而 ConcurrentLinkedQueue 是使用 CAS 无锁算法的,所以无法做到这个。
注:LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。

ArrayBlockingQueue

上节介绍了有界链表方式的阻塞队列 LinkedBlockingQueue,本节来研究下有界使用数组方式实现的阻塞队列 ArrayBlockingQueue 的原理
4.1 ArrayBlockingQueue 类图结构
同理为了能从全局一览 ArrayBlockingQueue 的内部构造,先看下类图:
image.png
如图 ArrayBlockingQueue 内部有个数组 items 用来存放队列元素,putindex 变量标示入队元素下标,takeIndex 是出队下标,count 统计队列元素个数,从定义可知并没有使用 volatile 修饰,这是因为访问这些变量使用都是在锁块内,而加锁已经保证了锁块内变量的内存可见性了。
另外有个独占锁 lock 用来保证出入队操作原子性,这保证了同时只有一个线程可以进行入队出队操作,另外 notEmpty,notFull 条件变量用来进行出入队的同步。
另外由于 ArrayBlockingQueue 是有界队列,所以构造函数必须传入队列大小参数,构造函数代码如下:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

可知默认情况下使用的是 ReentrantLock 提供的非非公平独占锁进行出入队操作的加锁。
首先一个图概况该队列,读者可以读完本节后在回头体会下:
image.png
4.2 ArrayBlockingQueue 原理介绍
本节主要讲解下面几个主要函数的原理。
4.2.1 offer 操作
向队列尾部插入一个元素,如果队列有空闲容量则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false,如果 e 元素为 null 则抛出 NullPointerException 异常,另外该方法是不阻塞的。
public boolean offer(E e) {
//(1)e为null,则抛出NullPointerException异常
checkNotNull(e);
//(2)获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(3)如果队列满则返回false
if (count == items.length)
return false;
else {
//(4)否者插入元素
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
代码(2)获取独占锁,当前线程获取该锁后,其它入队和出队操作的线程都会被阻塞挂起后放入 lock 锁的 AQS 阻塞队列。
代码(3)如果队列满则直接返回 false,否者调用 enqueue 方法后返回 true,enqueue 的代码如下:
private void enqueue(E x) {
//(6)元素入队
final Object[] items = this.items;
items[putIndex] = x;
//(7)计算下一个元素应该存放的下标
if (++putIndex == items.length)
putIndex = 0;
count++;
//(8)
notEmpty.signal();
}
如上代码首先把当前元素放入 items 数组,然后计算下一个元素应该存放的下标,然后递增元素个数计数器,最后激活 notEmpty 的条件队列中因为调用 poll 或者 take 操作而被阻塞的的一个线程。这里由于在操作共享变量比如 count 前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在 CPU 缓存或者寄存器里面的值。
代码(5)释放锁,释放锁后会把修改的共享变量值比如 Count 的值刷新回主内存中,这样其它线程通过加锁在次读取这些共享变量后就可以看到最新的值。
4.2.2 put 操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回 true,如果队列已满则阻塞当前线程直到队列有空闲插入成功后返回 true,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。
public void put(E e) throws InterruptedException {
//(1)
checkNotNull(e);
final ReentrantLock lock = this.lock;

//(2)获取锁(可被中断)
lock.lockInterruptibly();
try {

    //(3)如果队列满,则把当前线程放入notFull管理的条件队列
    while (count == items.length)
        notFull.await();

    //(4)插入元素
    enqueue(e);
} finally {
    //(5)
    lock.unlock();
}

}
代码(2)在获取锁的过程中当前线程被其它线程中断了,则当前线程会抛出 InterruptedException 异常而退出。
代码(3)判断如果当前队列满了,则把当前线程阻塞挂起后放入到 notFull 的条件队列,注意这里也是使用了 while 而不是 if。
代码(4)如果队列不满则插入当前元素,此处不再累述。
4.2.3 poll 操作
从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。
public E poll() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)当前队列为空则返回null,否者调用dequeue()获取
return (count == 0) ? null : dequeue();
} finally {
//(3)释放锁
lock.unlock();
}
}
代码(1)获取独占锁
代码(2)如果队列为空则返回 null,否者调用 dequeue() 方法,dequeue 代码如下:
private E dequeue() {
final Object[] items = this.items;

//(4)获取元素值
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//(5)数组中值值为null;
items[takeIndex] = null;

//(6)队头指针计算,队列元素个数减一

if (++takeIndex == items.length)
takeIndex = 0;
count–;

//(7)发送信号激活notFull条件队列里面的一个线程
notFull.signal();
return x;

}
可知首先获取当前队头元素保存到局部变量,然后重置队头元素为 null,并重新设置队头下标,元素计数器递减,最后发送信号激活 notFull 的条件队列里面一个因为调用 put 或者 offer 而被阻塞的线程。
4.2.4 take 操作
获取当前队列头部元素并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {

    //(2)队列为空,则等待,直到队列有元素
    while (count == 0)
        notEmpty.await();
    //(3)获取队头元素
    return dequeue();
} finally {
    //(4) 释放锁
    lock.unlock();
}

}
take 操作的代码也比较简单与 poll 相比只是步骤(2)如果队列为空则把当前线程挂起后放入到 notEmpty 的条件队列,等其它线程调用 notEmpty.signal() 方法后在返回,需要注意的是这里也是使用 while 循环进行检测并等待而不是使用 if。
4.2.5 peek 操作
获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。
public E peek() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)
return itemAt(takeIndex);
} finally {
//(3)
lock.unlock();
}
}

@SuppressWarnings(“unchecked”)
final E itemAt(int i) {
return (E) items[i];
}
peek 的实现更简单,首先获取独占锁,然后从数组 items 中获取当前队头下标的值并返回,在返回前释放了获取的锁。
4.2.6 size 操作
获取当前队列元素个数。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
size 操作是简单的,获取锁后直接返回 count,并在返回前释放锁。也许你会疑问这里有没有修改 Count 的值,只是简单的获取下,为何要加锁那?其实如果 count 声明为 volatile 这里就不需要加锁了,因为 volatile 类型变量保证了内存的可见性,而 ArrayBlockingQueue 的设计中 count 并没有声明为 volatile,是因为 count 的操作都是在获取锁后进行的,而获取锁的语义之一是获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性。
注:ArrayBlockingQueue 通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加 synchronized 的意味。ArrayBlockingQueue 的 size 操作的结果是精确的,因为计算前加了全局锁。

PriorityBlockingQueue

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素,内部是平衡二叉树堆的实现。
5.1 PriorityBlockingQueue 类图结构
下面首先通过类图来从全局了解下 PriorityBlockingQueue 的结构
image.png
如图 PriorityBlockingQueue 内部有个数组 queue 用来存放队列元素,size 用来存放队列元素个数,allocationSpinLock 是个自旋锁,用 CAS 操作来保证同时只有一个线程可以扩容队列,状态为 0 或者 1,其中 0 表示当前没有在进行扩容,1 标示当前正在扩容。
如下构造函数,默认队列容量为 11,默认比较器为 null,也就是使用元素的 compareTo 方法进行比较来确定元素的优先级,这意味着队列元素必须实现了 Comparable 接口;
private static final int DEFAULT_INITIAL_CAPACITY = 11;

public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

首先通过一个图来对该队列进行概况,读者读完本机后,可以回头在体会下:
image.png
5.2 原理介绍
5.2.1 offer 操作
offer 操作作用是在队列插入一个元素,由于是无界队列,所以一直返回 true,如下是 offer 函数的代码:
public boolean offer(E e) {

if (e == null)
    throw new NullPointerException();

//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();

int n, cap;
Object[] array;

//如果当前元素个数>=队列容量,则扩容(1)
while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap);

try {
    Comparator<? super E> cmp = comparator;

    //默认比较器为null (2)
    if (cmp == null)
        siftUpComparable(n, e, array);
    else
        //自定义比较器 (3)
        siftUpUsingComparator(n, e, array, cmp);

    //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程(9)
    size = n + 1;
    notEmpty.signal();//激活调用take()方法被阻塞的线程
} finally {
    //释放独占锁
    lock.unlock();
}
return true;

}
如上代码,主流程比较简单,下面主要看看如何进行扩容的和内部如何建堆的,首先看下扩容逻辑:
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //释放获取的锁
Object[] newArray = null;

//cas成功则扩容(4)
if (allocationSpinLock == 0 &&
    UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                             0, 1)) {
    try {
        //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
        int newCap = oldCap + ((oldCap < 64) ?
                               (oldCap + 2) : // grow faster if small
                               (oldCap >> 1));
        if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
            int minCap = oldCap + 1;
            if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                throw new OutOfMemoryError();
            newCap = MAX_ARRAY_SIZE;
        }
        if (newCap > oldCap && queue == array)
            newArray = new Object[newCap];
    } finally {
        allocationSpinLock = 0;
    }
}

//第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
if (newArray == null) // back off if another thread is allocating
    Thread.yield();
lock.lock();//(6)
if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
}

}
tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用 cas 控制只有一个线程可以扩容成功。其实这里不先释放锁,也是可行的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容时候还占用锁那么其它线程在这个时候是不能进行出队和入队操作的,这大大降低了并发性。所以为了提高性能,使用 CAS 控制只有一个线程可以进行扩容,并且在扩容前释放了锁,让其它线程可以进行入队出队操作。
spinlock 锁使用 CAS 控制只有一个线程可以进行扩容,CAS 失败的线程会调用 Thread.yield() 让出 cpu,目的意在让扩容线程扩容后优先调用 lock.lock 重新获取锁,但是这得不到一定的保证。有可能 yield 的线程在扩容线程扩容完成前已经退出,并执行代码(6)获取到了锁,这时候获取到的锁的线程发现 newArray 为 null 就会执行代码(1)。如果当前数组扩容还没完毕,当前线程会再次调用 tryGrow 方法,然后释放锁,这又给扩容线程获取锁提供了机会,如果这时候扩容线程还没扩容完毕,则当前线程释放锁后有调用 yield 方法出让 CPU。可知当扩容线程进行扩容期间,其他线程是原地自旋通过代码(1)检查当前扩容是否完毕,等扩容完毕后才退出代码(1)的循环。
当扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取了该锁,并且 allocationSpinLock 被修饰为了 volatile。
当扩容线程扩容完毕后会执行代码 (6) 获取锁,获取锁后复制当前 queue 里面的元素到新数组。
然后看下具体建堆算法:
private static void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;

//队列元素个数>0则判断插入位置,否者直接入队(7)
while (k > 0) {
    int parent = (k - 1) >>> 1;
    Object e = array[parent];
    if (key.compareTo((T) e) >= 0)
        break;
    array[k] = e;
    k = parent;
}
array[k] = key;(8)

}
下面用图来解释上面算法过程,假设队列初始化容量为 2, 创建的优先级队列的泛型参数为 Integer。
首先调用队列的 offer(2) 方法,希望插入元素 2 到队列,插入前队列状态如下图:
image.png
首先执行代码(1),从上图变量值可知判断值为 false,所以紧接着执行代码(2),由于 k=n=size=0 所以代码(7)判断结果为 false,所以会执行代码(8)直接把元素 2 入队,最后执行代码(9)设置 size 的值加 1,这时候队列的状态如下图:
image.png
然后调用队列的 offer(4) 时候,首先执行代码(1),从上图变量值可知判断为 false,所以执行代码(2),由于 k=1, 所以进入 while 循环,由于 parent=0;e=2;key=4; 默认元素比较器是使用元素的 compareTo 方法,可知 key>e 所以执行 break 退出 siftUpComparable 中的循环; 然后把元素存到数组下标为 1 的地方,最后执行代码(9)设置 size 的值加 1,这时候队列状态为:
image.png
然后调用队列的 offer(6) 时候,首先执行代码(1),从上图变量值知道这时候判断值为 true, 所以调用 tryGrow 进行数组扩容, 由于 2<64 所以 newCap=2 + (2+2)=6; 然后创建新数组并拷贝,然后调用 siftUpComparable 方法,由于 k=2>0 进入 while 循环,由于 parent=0;e=2;key=6;key>e 所以 break 后退出 while 循环; 并把元素 6 放入数组下标为 2 的地方,最后设置 size 的值加 1,现在队列状态:
image.png
然后调用队列的 offer(1) 时候,首先执行代码(1),从上图变量值知道这次判断值为 false,所以执行代码(2),由于k=3, 所以进入 while 循环,由于parent=0;e=4;key=1; key<e,所以把元素 4 复制到数组下标为 3 的地方,然后 k=0 退出 while 循环;然后把元素 1 存放到下标为 0 地方,现在状态:
image.png
这时候二叉树堆的树形图如下:
image.png
可知堆的根元素是 1,也就是这是一个最小堆,那么当调用这个优先级队列的 poll 方法时候,会一次返回堆里面值最小的元素。
5.2.2 poll 操作
poll 操作作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。poll 函数代码如下:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//获取独占锁
try {
return dequeue();
} finally {
lock.unlock();//释放独占锁
}
}
如上代码可知在进行出队操作过程中要先加锁,这意味着,当当前线程进行出队操作时候,其它线程不能再进行入队和出队操作,但是从前面介绍 offer 函数时候知道这时候可以有其它线程进行扩容,下面主要看下具体执行出队操作的 dequeue 方法的代码:
private E dequeue() {

//队列为空,则返回null
int n = size - 1;
if (n < 0)
    return null;
else {

    //获取队头元素(1)
    Object[] array = queue;
    E result = (E) array[0];

    //获取队尾元素,并值null(2)
    E x = (E) array[n];
    array[n] = null;

    Comparator<? super E> cmp = comparator;
    if (cmp == null)//(3)
        siftDownComparable(0, x, array, n);
    else
        siftDownUsingComparator(0, x, array, n, cmp);
    size = n;//(4)
    return result;
}

}
如上代码,如果队列为空则直接返回 null,否者执行代码(1)获取数组第一个元素作为返回值存放到变量 Result,这里需要注意下数组里面第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。 然后代码(2)获取队列尾部元素存放到变量 x, 并且置空尾部节点,然后执行代码(3)插入变量 x 到数组下标为 0 的位置后,重新调成堆为最大或者最小堆,然后返回。这里重要的是看如何去掉堆的根节点后,使用剩下的节点重新调整为一个最大或者最小堆,下面我们看下 siftDownComparable 的代码实现:
private static void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];(5)
int right = child + 1;(6)
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
c = array[child = right];
if (key.compareTo((T) c) <= 0)(8)
break;
array[k] = c;
k = child;
}
array[k] = key;(9)
}
}
同理下面我们结合图来模拟上面调整堆的算法过程,接着上节队列的状态继续讲解,上节队列元素序列为 1,2,6,4:
第一次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =4;n=3;result=1;x=4; 这时候队列状态
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第二次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =3;n=2;result=2;x=6; 这时候队列状态:
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第三次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =2;n=1;result=4;x=6; 这时候队列状态:
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第四次直接返回元素 6.
下面重点说说 siftDownComparable 这个调整堆的算法: 首先说下堆调整的思路,由于队列数组第 0 个元素为树根,出队时候要被移除,这时候数组就不在是最小堆了,所以需要调整堆,具体是要从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会看自己作为根节点的树的左右子树里面那个是最小值,这是一个递归,直到树叶节点结束递归,如果还不明白,没关系,下面结合图来说明下,假如当前队列内容如下:
image.png
其对应的二叉堆树为:
image.png
这时候如果调用了 poll(); 那么 result=2;x=11;队列末尾的元素设置为 null 后,剩下的元素调整堆的步骤如下图:
image.png
如上图(1)树根的 leftChildVal = 4;rightChildVal = 6; 4<6; 所以 c=4; 然后 11>4 也就是 key>c;所以使用元素 4 覆盖树根节点的值,现在堆对应的树如图(2)。
然后树根的左子树树根的左右孩子节点中 leftChildVal = 8;rightChildVal = 10; 8<10; 所以 c=8; 然后发现 11>8 也就是 key>c;所以元素 8 作为树根左子树的根节点,现在树的形状如图(3), 这时候判断 k<half 为 false 就会退出循环,然后把 x=11 设置到数组下标为 3 的地方,这时候堆树如图(4),至此调整堆完毕,siftDownComparable 返回 result=2,poll 方法也返回了。
5.2.3 put 操作
put 操作内部调用的 offer, 由于是无界队列,所以不需要阻塞
public void put(E e) {
offer(e); // never need to block
}
5.2.4 take 操作
take 操作作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞,如下代码:
public E take() throws InterruptedException {
//获取锁,可被中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {

    //如果队列为空,则阻塞,把当前线程放入notEmpty的条件队列
    while ( (result = dequeue()) == null)
        notEmpty.await();//阻塞当前线程
} finally {
    lock.unlock();//释放锁
}
return result;

}
如上代码,首先通过 lock.lockInterruptibly() 获取独占锁,这个方式获取的锁是对中断进行响应的。然后调用 dequeue 方法返回堆树根节点元素,如果队列为空,则返回 false,然后当前线程调用 notEmpty.await() 阻塞挂起当前线程,直到有线程调用了 offer()方法(offer 方法内在添加元素成功后调用了 notEmpty.signal 方法会激活一个阻塞在 notEmpty 的条件队列里面的一个线程)。另外这里使用 while 而不是 if 是为了避免虚假唤醒。
5.2.5 size 操作
获取队列元个数,如下代码,在返回 size 前加了锁,保证在调用 size() 方法时候不会有其它线程进行入队和出队操作,另外由于 size 变量没有被修饰为 volatie,这里加锁也保证了多线程下 size 变量的内存可见性。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
注:PriorityBlockingQueue 队列内部使用二叉树堆维护元素优先级,内部使用数组作为元素存储的数据结构,这个数组是可扩容的,当当前元素个数 >= 最大容量时候会通过算法扩容,出队时候始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素,默认元素优先级比较规则是使用元素的 compareTo 方法来做,用户可以自定义优先级的比较规则。

队列对比

上面介绍的各种队列中只有 ConcurrentLinkedQueue 是使用 UNSAFE 类提供的 CAS 非阻塞算法实现的,其他几个队列内部都是使用锁来保证线程安全的。使用 CAS 算法的效率较好,那么是不是所有场景都用 ConcurrentLinkedQueue 那?
其实不然,因为 ConcurrentLinkedQueue 还是无界队列,无界队列使用不当可能造成 OOM。所以当使用 ConcurrentLinkedQueue 的时候在添加元素前应该先判断当前队列元素个数是否已经达到了设定的阈值,如果达到就做一定的处理措施,比如直接丢弃等。这里需要注意判断当前队列元素个数与阈值这个操作不是原子性的,最终会导致队列元素个数比设置的阈值大。
ConcurrentLinkedQueue 在 Tomcat 的的 NioEndPoint 中得到了应用,通过使用 ConcurrentLinkedQueue 将同步转换为异步,可以让 tomcat 同时接受更多请求,模型如下图:
image.png
tomcat 的 NioEndPoint 模式中 acceptor 线程负责接受用户请求,接受后把请求放入到 poll 线程对应的队列,poll 线程从队列里面获取任务后委托给 worker 线程具体处理。
LinkedBlockingQueue 和 ArrayBlockingQueue 都是有界阻塞队列,不同在于一个底层数据结构是链表,一个是数组;另外前者入队出队使用单独的锁,而后者出入队使用同一个锁,所以前者的并发度比后者高。另外创建前者时候可以不指定队列大小,默认队列元素个数为 Integer.MAX_VALUE,而后者必须要指定数组大小。所以使用 LinkedBlockingQueue 时候要记得指定队列大小。
比如比较有名的 LogBack 日志系统的异步日志打印实现中就是用了 ArrayBlockingQueue 作为缓冲队列,如下图,业务检查调用异步 log 进行写入日志时候,实际是把日志放入了 ArrayBlockingQueue 队列就返回了,而具体真正写入日志到磁盘是一个日志线程从队列里面获取任务来做的,这其实是一个多生产单消费模型:
image.png
PriorityBlockingQueue 是无界阻塞队列,是一个队列元素有优先级的队列,前面的队列模式都是 FIFO 先进先出,而 PriorityBlockingQueue 而是优先级最高的元素先出队,而不管谁先进入队列的,所以 PriorityBlockingQueue 经常会用在一些任务具有优先级的场景。还比如上面说的 logback 异步日志模型,如果把日志等级分了优先级,比如 error>warn>info,那么上述模型中队列就可以使用 PriorityBlockingQueue,日志线程会先从队列里面首先获取 error 级别的日志,但是需要注意的是如果业务线程一直向队列里面写入 error 级别日志,那么可能先写入到队列的 warn 和 info 级别的日志将很久甚至永远没机会写入到磁盘。还有一点要注意 PriorityBlockingQueue 是无界限队列,要注意判断队列元素个数不要超过设置的阈值。

线程和状态机

线程和线程任务

线程任务区别于线程,可以理解为线程需要执行的逻辑,类似 Thread 中要执行的 Runnable。

阅读全文 »

基于注册中心目录服务,使服务消费方能动态的查找服务提供方,使地址透明,不再需要写死服务提供方地址,注册中心基于接口名查询服务提供者的 IP 地址,使服务提供方可以平滑增加或减少机器。

角色分类

以功能角度来说服务可以分成以下几种:

  • 服务提供者;
  • 服务消费者;
  • 服务提供者兼消费者。

注册中心分类

可以分成以下几种注册中心:

  • Simple 注册中心 点对点直连
  • Multicast 注册中心 多播
  • Zookeeper 注册中心
  • Redis 注册中心

配置

服务提供者(provider)配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- 应用名称,可显示依赖关系 -->
<dubbo:application name="dubbo-order-server" />

<!-- 注册中心是ZooKeeper,也可以选择Redis做注册中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"
client="zkclient" />

<!-- 通过dubbo协议在注册中心(127.0.0.1表示本机)的20880端口暴露服务 -->
<dubbo:protocol name="dubbo" host="127.0.0.1" port="20880" />

<!-- 提供服务用地的是service标签,将该接口暴露到dubbo中 -->
<dubbo:service interface="com.dubbo.service.OrderService"
ref="orderService" />

<!-- Spring容器加载具体的实现类-->
<bean id="orderService" class="dubbo.service.impl.OrderServiceImpl" />

<dubbo:monitor protocol="registry" />

服务消费者(consumer)配置:

1
2
3
4
5
6
7
8
9
10
11
<!-- 应用名称,可显示依赖关系 -->
<dubbo:application name="dubbo-user-consumer" />

<!-- zookeeper作为注册中心 ,也可以选择Redis做注册中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"
client="zkclient" />

<dubbo:protocol host="127.0.0.1" />

<!-- 调用服务使用reference标签,从注册中心中查找服务 -->
<dubbo:reference id="orderService" interface="com.dubbo.service.OrderService" />

查看服务注册/暴露结果

Dubbo服务注册信息
Dubbo 在 ZooKeeper 中以树形结构维护服务注册信息:

  • 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址;
  • 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址;
  • 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

ZooKeeper 启动的时候会把配置信息加载进内存并持久化到数据库,然后启动定时器脏数据检查定时器 DirtyCheckTask,分别检查消费者和提供者的地址列表缓存、消费者和提供者地址列表的数据库数据,清理不存活的消费者和提供者数据,对于缓存中的存在的消费者和提供者而数据库不存在,提供者重新注册和消费者重新订阅。

Dubbo 提供了一些异常情况下的兜底方案:

  • 当提供者出现断电等异常停机时,注册中心能自动删除提供者信息
  • 当注册中心重启时,能自动恢复注册数据,以及订阅请求
  • 当会话过期时,能自动恢复注册数据,以及订阅请求
  • 当设置 <dubbo:registry check=”false” /> 时,记录失败注册和订阅请求,后台定时重试

在了解 ZooKeeper 基础上,还可以增加一些配置来修改注册细节:
可通过 <dubbo:registry username="admin" password="1234" /> 设置 ZooKeeper 登录信息
可通过 <dubbo:registry group="dubbo" /> 设置 ZooKeeper 的根节点,不设置将使用无根树
支持 * 号通配符 <dubbo:reference group="*" version="*" /> ,可订阅服务的所有分组和所有版本的提供者

在 Provider 启动完毕后,可以登录到 ZooKeeper 上查看注册的结果:

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 11] ls /
[dubbo, zookeeper]
[zk: localhost:2181(CONNECTED) 12] ls /dubbo
[com.alibaba.dubbo.monitor.MonitorService, com.tallate.UserServiceBo]
[zk: localhost:2181(CONNECTED) 13] ls /dubbo/com.tallate.UserServiceBo
[configurators, consumers, providers, routers]
[zk: localhost:2181(CONNECTED) 14] ls /dubbo/com.tallate.UserServiceBo/providers
[dubbo%3A%2F%2F192.168.96.194%3A20880%2Fcom.tallate.UserServiceBo%3Fanyhost%3Dtrue%26application%3DdubboProvider%26dubbo%3D2.0.2%26generic%3Dfalse%26group%3Ddubbo%26interface%3Dcom.tallate.UserServiceBo%26methods%3DsayHello%2CtestPojo%2CsayHello2%26pid%3D28129%26revision%3D1.0.0%26side%3Dprovider%26timeout%3D3000%26timestamp%3D1575202776615%26version%3D1.0.0]

服务自动发现流程

服务自动发现功能完成下面这个流程,我们接下来分点概述:

  1. 服务提供者在启动时,向注册中心注册自己提供的服务。
  2. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  3. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  4. 服务消费者,从提供者地址列表中,基于软负载均衡算法(基于软件的负载均衡,与 F5 相对),选一台提供者进行调用,如果调用失败,再选另一台调用。

注册和注销服务(Provider 执行流程)

服务的注册与注销,是对服务提供方角色而言,大致流程如下所示:
注册和注销服务

  1. 在接口提供者初始化时,每个接口都会创建一个 Invoker 和 Exporter,Exporter 持有 Invoker 实例,通过 Invocation 中的信息就可找到对应的 Exporter 和 Invoker
  2. 同 Consumer 的过程类似,调用 Invoker 前会调用 Invoker-Filter。
  3. 调用 Invoker.invoke() 时,通过反射调用最终的服务实现执行相关逻辑。

ServiceBean 负责了服务的暴露:

  • 继承自 ServiceConfig,export 方法实现了服务暴露的逻辑;
  • 实现了 Spring 中的 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware

启动时,ServiceBean 主要负责以下任务:

  • 生成 DubboExporter 对象并缓存起来
  • 添加过滤器和监听器支持
  • 在 zk 上注册相关信息,暴露服务,方便被感知到
  • 监听端口,等待通信的到来

Dubbo服务导出

  1. 前置工作,主要用于检查参数和组装 URL;
    ServiceBean#onApplicationEvent: 接收 Spring 上下文刷新事件后执行服务导出操作
    -> ServiceBean#export: 导出服务
    -> ProviderConfig.getExport、getDelay 获取配置,如果 export 为 false 则无法提供给其他服务调用、一般只提供给本地调试时使用,如果需要 delay 则将任务交给一个 ScheduledExecutorService 延迟执行,否则调用 doExport 暴露服务
    -> ServiceConfig.doExport 一堆配置检查
  2. 导出服务,包含导出服务到本地(JVM)和导出服务到远程两个过程;
    ServiceConfig.doExportUrls
    导出服务,Dubbo 中所有服务都通过 URL 导出,支持多协议多注册中心导出服务(遍历 ProtocolConfig 集合导出每个服务)
    AbstractInterfaceConfig#loadRegistries
    加载注册中心链接
    ServiceConfig#doExportUrlsFor1Protocol
    组装 URL,将服务注册到注册中心
    JavassistProxyFactory#getInvoker
    获取 Invoker 实例,用于接收请求
    ServiceConfig#exportLocal、DubboProtocol#export
    根据配置信息导出服务到本地或远程,远程默认取Dubbo协议
    DubboProtocol#openServer
    开始监听请求
  3. 向注册中心注册服务,用于服务发现
    Dubbo 服务注册本质是在 zk 指定目录下创建临时节点,路径是{group}/{Interface}/providers/{url}
    RegistryProtocol#register
    -> RegistryFactory#getRegistry
    -> AbstractRegistry#register

因为Dubbo一般使用ZooKeeper作为注册中心,所以完全可以利用ZooKeeper的临时节点自动删除机制来实现服务器下线自动踢出的机制。

服务订阅和取消(Consumer 执行流程)

为了满足应用系统的需求,服务消费方的可能需要从服务注册中心订阅指定的有服务提供方发布的服务,在得到通知可以使用服务时,就可以直接调用服务。反过来,如果不需要某一个服务了,可以取消该服务。
服务订阅和取消

有两种服务引入方式:

  1. 饿汉式:Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,可通过配置 <dubbo:reference> 的 init 属性开启。
  2. 懒汉式:ReferenceBean 对应的服务被注入到其他类中时引用

服务提供的方式有三种:

  1. 引用本地 (JVM) 服务;
  2. 通过直连方式引用远程服务;
  3. 通过注册中心引用远程服务。

不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。

获取客户端Proxy:

  1. 在 Consumer 初始化的时候,会生成一个代理注册到容器中,该代理回调中持有一个 Invoker 实例,消费调用服务接口时它的 invoke() 方法会被调用。
    spring.ReferenceBean#getObject
    ReferenceConfig#createProxy
    创建代理实例,根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用,不是本地引用的情况下默认采用Dubbo协议。
    Protocol#refer
    -> DubboProtocol#getClients 获取客户端实例,实例类型为 ExchangeClient,ExchangeClient 不具备通信能力,它需要依赖更底层的客户端实例
    -> DubboProtocol#getSharedClient 默认获取共享客户端
    -> DubboProtocol#initClient 创建客户端实例,默认为 Netty
    -> Exchangers#connect(URL url, ExchangeHandler handler)
  2. 使用 Cluster 合并 Invoker
    org.apache.dubbo.rpc.cluster.Cluster#join
    如果配置了多个 URL,则使用 Cluster 合并多个 Invoker
  3. 创建动态代理
    -> ProxyFactory#getProxy(Invoker invoker)
    常用的动态代理技术有 javassist、cglib、jdk,其中 dubbo 使用的是 javassist。

    根据早期 Dubbo 作者梁飞(http://javatar.iteye.com/blog/814426)的说法,使用 javassist 是为了性能。

Consumer端服务调用过程

Dubbo组件

调用代理类的方法

请求实际调用的是InvokerInvocationHandler.invoke

Registry & Directory

Registry 将注册信息保存到本地的Directory

启动服务时需要给一个Dubbo接口创建代理,这时需要将注册URL转换为Invoker对象:
org.apache.dubbo.registry.integration.RegistryProtocol#refer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}

引用一个服务时,会注册一个zkListener,监听注册服务的命名空间的变更情况。
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
那么服务是怎么注册的呢?其实就是上边Provider注册服务的过程。
监听到注册中心的变更后,更新本地的Invoker列表,同时删除不可用的。
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference

...

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

...

try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

Invoker使用Directory

为了服务高可用同一个服务一般会有多个应用服务器提供,要先挑选一个提供者提供服务。在服务接口消费者初始化时,接口方法和提供者 Invoker 对应关系保存在 Directory。 中,通过调用的方法名称(或方法名称+第一个参数)获取该方法对应的提供者 Invoker 列表,如注册中心设置了路由规则,对这些 Invoker 根据路由规则进行过滤。
启动时订阅某个服务:
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 订阅providers、configurators、routers这几个namespace
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

// 使用Cluster组合Invoker
Invoker invoker = cluster.join(directory);
return invoker;
}

添加监听器:
org.apache.dubbo.registry.integration.RegistryDirectory#subscribe

1
2
3
4
5
6
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}

Consumer端监听服务变更事件,刷新Invoker列表:
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker

Registry的几种实现

  • ZooKeeperRegistry
  • RedisRegistry
    注册信息的存储,是在启动时调用的:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Override
    public void doRegister(URL url) {
    // key = dubbo/com.package.to.InterfaceName/providers
    String key = toCategoryPath(url);
    // url的全名
    String value = url.toFullString();
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    // 使用hash结构,可以providers一个key下面存多个url
    jedis.hset(key, value, expire);
    jedis.publish(key, Constants.REGISTER);
    success = true;
    if (! replicate) {
    break; //  如果服务器端已同步数据,只需写入单台机器
    }
    } finally {
    jedisPool.returnResource(jedis);
    }
    } catch (Throwable t) {
    exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
    }
    if (exception != null) {
    if (success) {
    logger.warn(exception.getMessage(), exception);
    } else {
    throw exception;
    }
    }
    }
    注册信息的主动删除,进程关闭时:
    1

Directory的几种实现

  • RegistryDirectory
    保存注册中心的服务注册信息,包括routers、configurators、provider。
  • StaticDirectory
    Invoker列表是固定的。

Cluster

封装了服务降级和容错机制,比如,如果调用失败则执行其他(FailoverClusterInvoker)、仍然调用失败则降级执行 mock(MockClusterInvoker)。
调用的第一层是MockClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;

String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
// 没有设置mock属性或设置为false,则直接调就完了
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
}
// 配成force了,直接调mock方法
else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
}
// fail-mock的方式
else {
try {
result = this.invoker.invoke(invocation);

//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}

} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}

if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}

实际invoke调用的是父类AbstractClusterInvoker的invoke方法,这个方法的主要功能是提供负载均衡:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

// 找到所有可调用的服务器
List<Invoker<T>> invokers = list(invocation);
// 发送时要经过负载均衡
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}

上面的doInvoke是一个模板方法,由子类实现,默认子类是FailoverClusterInvoker,可以看到,它先通过负载均衡策略得到一个Invoker,再调用该Invoker,Invoker的默认实现是DubboInvoker,表示使用的是Dubbo协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Result doInvoke(List<Invoker<T>> invokers,
final List<Invoker<T>> invoked,
Holder<RpcException> lastException,
final Set<String> providers,
final Invocation invocation,
final LoadBalance loadbalance,
final int totalRetries,
int retries,
Holder<Invoker<T>> lastInvoked) throws RpcException {
if (retries < totalRetries) {
checkWheatherDestoried();
invokers = list(invocation);
checkInvokers(invokers, invocation);
}

// 负载均衡
final Invoker<T> invoker = select(loadbalance, invocation, invokers, invoked);
invoked.add(invoker);
lastInvoked.value = invoker;
RpcContext.getContext().setInvokers((List) invoked);

try {
return invoker.invoke(invocation);
} catch (RpcException e) {
//业务异常不重试
if (e.isBiz()) {
throw e;
}
lastException.value = e;
} catch (Throwable e) {
lastException.value = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}

if (--retries == 0) {
throw populateException(invokers, lastException.value, providers, invocation, totalRetries);
}

return doInvoke(invokers, invoked, lastException, providers, invocation, loadbalance, totalRetries, retries, lastInvoked);
}

Cluster的实现

  • MockClusterInvoker
    调用失败降级到mock接口;
  • BroadcastClusterInvoker
    每个Invoker都调一次,忽略了LoadBalance;
  • AvailableClusterInvoker
    把处于可用状态的Invoker都调一遍。
  • FailoverClusterInvoker
    一个Invoker失败就换个Invoker重试几次。
  • FailbackClusterInvoker
    如果调用失败就放到一个线程池中延迟5秒再发,一般用于发消息。
  • FailfastClusterInvoker
    失败立刻报错
  • FailsafeClusterInvoker
    失败就忽略,一般是用于记日志这种失败了影响也不大的场景。
  • ForkingClusterInvoker
    一次性选n个Invoker,并行调用,只要有一个调用成功就返回,线程间通过LinkedBlockingQueue通信。

LoadBalance

Cluster 层包含多个 Invoker,LoadBalance 负责从中选出一个来调用,有多种 LoadBalance 策略,比如随机选一个(RandomLoadBalance)、轮询(RoundRobinLoadBalance)、一致性hash(ConsistentHashLoadBalance)。
实例化LoadBalance:com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
使用LoadBalance选择一个Invoker:com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#select

LoadBalance的多种实现

  • RandomLoadBalance
    计算权重,然后根据每个Invoker的权重调一个。
  • LeastActiveLoadBalance
    找最近最不活跃的Invoker调用,如果这样的Invoker有多个,则按权重来随机选一个。
  • RoundRobinLoadBalance
    轮询
  • ConsistentHashLoadBalance
    一致性哈希,启动时会将Invoker排列在一个圆环上:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
    this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    this.identityHashCode = identityHashCode;
    URL url = invokers.get(0).getUrl();

    String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
    argumentIndex = new int[index.length];
    for (int i = 0; i < index.length; i++) {
    argumentIndex[i] = Integer.parseInt(index[i]);
    }

    int replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
    for (Invoker<T> invoker : invokers) {
    String address = invoker.getUrl().getAddress();
    // 多复制几个,更均匀,避免所有请求都被hash到同一个Invoker
    for (int i = 0; i < replicaNumber / 4; i++) {
    byte[] digest = md5(address + i);
    for (int h = 0; h < 4; h++) {
    long m = hash(digest, h);
    // 放入圆环上
    virtualInvokers.put(m, invoker);
    }
    }
    }
    }
    将Invoker保存到virtualInvokers上,但是virtualInvokers本身是一个HashMap,如果新来的请求不能精确hash到其中的某个Invoker怎么办?是通过tailMap找到的下一个Invoker:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Invoker<T> selectForKey(long hash) {
    Invoker<T> invoker;
    Long key = hash;

    if (!virtualInvokers.containsKey(key)) {
    SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
    if (tailMap.isEmpty()) {
    key = virtualInvokers.firstKey();
    } else {
    key = tailMap.firstKey();
    }
    }
    invoker = virtualInvokers.get(key);
    return invoker;
    }

Filter & Invoker 层

不过,在实际网络调用之前,Dubbo还提供Filter功能,Cluster会先激活Filter链然后最终调到DubboInvoker.invoke(RpcInvocation)

  1. ConsumerContextFilter可以将请求对象Invocation添加到上下文RpcContext中,其实就是存储到一个ThreadLocal变量中。
  2. FutureFilter在调用完毕后唤醒调用者线程。
  3. 或许还会有一些自定义的Filter,比如增加线程的TraceId、打印一些调用日志之类的,Filter结束后才最终调用到DubboInvoker

DubboInvoker封装了同步和异步调用,Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。
DubboInvoker是通过Netty发送消息的,消息本身如何发送的就不多说了。

Exchange

封装了网络客户端的发送逻辑,包括:

  • HeaderExchangeChannel
    对 Request 的序列化
  • ReferenceCountExchangeClient
    无引用时自动关闭客户端
  • HeaderExchangeClient
    心跳检测

数据编码 & 发送

DubboCodec
NettyChannel#send

Provider端接受调用的过程

  1. 接收请求
    NettyClient
    请求被接收后,通过 Netty 调用链向下传递执行
    NettyHandler#messageReceived
    NettyChannel
  2. 解码
    ExchangeCodec
  3. 线程派发
    Dispatcher
    IO 线程接收请求后分发给事件处理线程执行,具体的派发逻辑在ChannelHandler中实现,比如AllChannelHandler
  4. 请求分发
    ChannelEventRunnable
    根据请求类型将请求分发给不同的ChannelHandler处理。

Provider 端响应

Consumer 端接收响应

  1. 发送完请求后阻塞
    HeaderExchangeHandler
    用户线程在发送完请求后,会调用 DefaultFutureget 方法等待响应对象的到来,这时每个DefaultFuture都会关联一个调用编号,用于在接收到响应时能对应上请求的DefaultFuture
    当响应对象到来后,IO 线程根据调用编号可以找到DefaultFuture,之后会将响应对象保存到DefaultFuture,并唤醒用户线程。

为什么使用 Dubbo

选型时一般需要考虑:

  1. 业务特点及可预见的后续的发展。
  2. 可用性要求。
  3. 团队的成熟度。一个成熟的团队可以很好地 Hold 住复杂的开源框架,甚至做定制化开发。

在选择使用 Dubbo 之后,又需要考虑很多细节,比如:

  1. Dubbo 底层走什么协议?如何对对象进行序列化,用了哪些序列化方式?如何处理异步转同步?
  2. 高并发高可用性。Dubbo 依赖了 ZooKeeper,但是万一 ZooKeeper 宕机了怎么办?
    如果 ZooKeeper 假死,客户端对服务端的调用是否会全部下线?如果是该如何避免?
    如何监控 Dubbo 的调用,并做到优雅的客户端无感发布?

最佳实践

  1. 模块化
    推荐将服务接口、实体、异常等都放到 API 包内,它们都是 API 的一部分。
  2. 粗粒度
    暴露的 Dubbo 接口的粒度应尽可能得粗,代表一个完整的功能,而不是其中的某一步,否则就不得不面对分布式事务问题了,而 Dubbo 当前并没有提供分布式事务支持。
  3. 版本
    某露服务接口的配置最好增加版本,当有不兼容的升级(比如接口定义要加个参数)时,版本可以方便地实现平滑发布,而又不用引入多余的代码。
    版本只需要两位即可,比如"1.0",因为升级并不是频繁的操作,因为不兼容的升级不会那么频繁。
    升级时,先将一半的 provider 升级到新版本,然后将所有 consumer 升级,最后将其余的 provider 升级。
  4. 兼容性
    向后兼容:接口加方法、对象加字段;
    不兼容:删除方法、删除字段、枚举类型加字段。
    不兼容的情况下,可以通过升级版本来实现平滑发布。
  5. 枚举类型
    枚举是类型安全的,但是作为 Dubbo 接口的参数 / 返回值却不合适,因为 provider 会将枚举转换为字符串传输,接收方会尝试寻找该字符串所属的枚举 field,找不到就会直接报错。
  6. 序列化
    传值没必要使用接口抽象,因为序列化需要接口实现类的元信息(包括 getter、setter),无法隐藏实现。
    参数和返回值必须 byValue 而不是 byReference,因为 Dubbo 不支持远程对象,provider 引用的对象 consumer 就找不到了。
  7. 异常
    最好直接抛异常而不是返回异常码,因为异常可以携带更多信息、语法上也更加友好。
    provider 不要将 DAO 层的异常抛给 consumer 端,consumer 端不应该关注 provider 对服务是如何实现的。

开始使用 Dubbo

ZooKeeper

ZooKeeper 在 Dubbo 中可以作为注册中心使用。
下载 ZooKeeper,修改配置,配置文件位于{ZOOKEEPER_HOME}/conf/zoo.cfg:

1
2
3
4
5
dataDir = /tmp/zk/data
clientPort = 2181
tickTime = 2000
initLimit = 5
syncLimit = 2
  • dataDir:数据保存的目录
  • clientPort:监听的端口
  • tickTime:心跳检查间隔
  • initLimit:Follower 启动从 Leader 同步数据时能忍受多少个心跳的时间间隔
  • syncLimit:Leader 同步到 Follower 后,如果超过 syncLimit 个 tickTime 的时间过去,还没有收到 Follower 的响应,那么就认为该 Follower 已下线。

后台启动:

1
./bin/zkServer.sh start-foreground

SDK

SDK 是一个被 provider 和 consumer 同时依赖的 jar 包,它的作用包括:

  • 提供实体类的定义;
    1
    2
    3
    public class Person {
    ...
    }
  • 提供接口的定义;
    1
    2
    3
    public interface UserServiceBo {
    String sayHello(String name);
    }

在设计 SDK 时包含一些注意要点,比如:

  • 不要使用枚举,用字符串常量来替代,因为 Dubbo 反序列化时如果碰到不存在的枚举就会抛出异常,这个问题编译期无法发现,可能造成线上故障;
  • 升级时不要随意修改接口定义,provider 和 consumer 接口定义不同会导致运行时故障,最佳实践是提升dubbo:referencedubbo:service的版本号,或者直接增加一个接口。

Provider

  1. 声明依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dubbo</artifactId>
    <version>2.6.6</version>
    </dependency>
    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.35.Final</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
    </dependency>
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
    </dependency>
  2. Dubbo 配置文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://code.alibabatech.com/schema/dubbo
    http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 提供方应用信息,用于计算依赖关系 -->
    <dubbo:application name="dubboProvider"/>

    <!-- 使用zookeeper注册中心暴露服务地址 -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <!-- 用dubbo协议在20880端口暴露服务 -->
    <dubbo:protocol name="dubbo" port="20880"/>
    <!-- 启用monitor模块 -->
    <dubbo:monitor protocol="registry"/>

    <bean id="userService" class="com.tallate.provider.UserServiceImpl"/>

    <!-- 声明需要暴露的服务接口 -->
    <dubbo:service interface="com.tallate.UserServiceBo" ref="userService"
    group="dubbo" version="1.0.0" timeout="3000"/>

    </beans>
  3. 接口的实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class UserServiceImpl implements UserServiceBo {

    @Override
    public String sayHello(String name) {
    //让当前当前线程休眠2s
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    return name;
    }
    }
  4. 启动
    原生 Spring 的启动方式:
    1
    2
    3
    4
    5
    public static void main(String[] arg) throws InterruptedException {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:provider.xml");
    //挂起当前线程,如果没有改行代码,服务提供者进程会消亡,服务消费者就发现不了提供者了
    Thread.currentThread().join();
    }

    如果需要以 SpringBoot 或 Docker 方式启动可以参考官方的示例

Consumer

  1. 声明依赖
    同 Provider
  2. Dubbo 配置文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://code.alibabatech.com/schema/dubbo
    http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
    <dubbo:application name="dubboConsumer" />

    <!-- 使用multicast广播注册中心暴露发现服务地址 -->
    <dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181" />
    <!-- 启动monitor-->
    <dubbo:monitor protocol="registry" />
    <!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
    <dubbo:reference id="userService" interface="com.tallate.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000"/>

    </beans>

    这里出现了一些以 dubbo 作为前缀的标签,它们是由 Dubbo 的扩展 DubboNamespaceHandler 来处理的,DubboBeanDefinitionParser 在解析完后会得到对应 BeanDefinition,然后生成对象放到 BeanFactory 中。

  3. 启动
    1
    2
    3
    4
    5
    6
    7
    8
    public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
    new String[]{"classpath:consumer.xml"});

    final UserServiceBo demoService = (UserServiceBo) context.getBean("userService");

    System.out.println(demoService.sayHello("Hello World"));
    }

调用 Dubbo 原生 API 启动

  1. Provider
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    // 等价于<bean id="userService" class="com.test.UserServiceImpl" />
    UserServiceBo userService = new UserServiceImpl();
    // 等价于<dubbo:application name="dubboProvider" />
    ApplicationConfig application = new ApplicationConfig();
    application.setName("dubboProvider");

    // 等价于<dubbo:registry address="zookeeper://127.0.0.1:2181" />
    RegistryConfig registry = new RegistryConfig();
    registry.setAddress("127.0.0.1:2181");
    registry.setProtocol("zookeeper");

    // 等价于<dubbo:protocol name="dubbo" port="20880" />
    ProtocolConfig protocol = new ProtocolConfig();
    protocol.setName("dubbo");
    protocol.setPort(20880);

    // 等价于<dubbo:monitor protocol="registry" />
    MonitorConfig monitorConfig = new MonitorConfig();
    monitorConfig.setProtocol("registry");

    // 等价于<dubbo:service interface="com.test.UserServiceBo" ref="userService"
    // group="dubbo" version="1.0.0" timeout="3000"/>
    // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏
    ServiceConfig<UserServiceBo> service = new ServiceConfig<>();
    service.setApplication(application);
    service.setMonitor(monitorConfig);
    // 多个注册中心可以用setRegistries()
    service.setRegistry(registry);
    // 多个协议可以用setProtocols()
    service.setProtocol(protocol);
    service.setInterface(UserServiceBo.class);
    service.setRef(userService);
    service.setVersion("1.0.0");
    service.setGroup("dubbo");
    service.setTimeout(3000);
    service.export();

    // 挂起当前线程
    Thread.currentThread().join();
  2. Consumer
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 等价于<dubbo:application name="dubboConsumer" />
    ApplicationConfig application = new ApplicationConfig();
    application.setName("dubboConsumer");

    // 等价于<dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181" />
    RegistryConfig registry = new RegistryConfig();
    registry.setAddress("127.0.0.1:2181");
    registry.setProtocol("zookeeper");

    // 等价于 <dubbo:monitor protocol="registry" />
    MonitorConfig monitorConfig = new MonitorConfig();
    monitorConfig.setProtocol("registry");

    //等价于<dubbo:reference id="userService" interface="com.test.UserServiceBo"
    //group="dubbo" version="1.0.0" timeout="3000" />
    // 此实例很重,封装了与注册中心的连接以及与提供者的连接,最好放缓存,否则可能造成内存和连接泄漏
    ReferenceConfig<UserServiceBo> reference = new ReferenceConfig<>();
    reference.setApplication(application);
    // 多个注册中心可以用setRegistries()
    reference.setRegistry(registry);
    reference.setInterface(UserServiceBo.class);
    reference.setVersion("1.0.0");
    reference.setGroup("dubbo");
    reference.setTimeout(3000);
    reference.setInjvm(false);
    reference.setMonitor(monitorConfig);

    UserServiceBo userService = reference.get();
    System.out.println(userService.sayHello("哈哈哈"));
    Thread.currentThread().join();

泛化调用

正常情况下我们使用 Dubbo 时会将实体类和接口定义放到一个 SDK 包内,其实也可以不加入这个包、直接将要传的参数放到一个 Map 对象内,称为泛化调用,但是这种方式没有什么实践价值,在此就不赘述了。

Dubbo 架构

Dubbo 是一个分布式服务框架,是阿里巴巴 SOA 服务化治理方案的核心框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,以及 SOA 服务治理方案。简而言之,Dubbo 是个远程服务调用的分布式框架(告别 Web Service 模式中的 WSdl,以服务提供者与消费者的方式在 dubbo 上注册)。

Apache Dubbo 是一款高性能、轻量级的开源 Java RPC 框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。
Dubbo 的架构基本上可以概括为 RPC+服务发现,或者可以称之为弹性 RPC 框架。

CP+三大中心

Dubbo架构

图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不用 Client 和 Server 的原因是 Dubbo 在很多场景下都使用 Provider、Consumer、Registry、Monitor 划分逻辑拓普节点,保持统一概念。

Provider: 暴露服务的服务提供方,启动时会注册自己提供的服务到注册中心。
Consumer: 调用远程服务的服务消费方,启动时会去注册中心订阅自己需要的服务,服务注册中心异步提供 Provider 的地址列表,Consumer 根据路由规则和预设的负载均衡算法选择一个 Provider 的 IP 进行调用,调用是直连的,失败后会调用另外一个。
Registry: 服务注册与发现的注册中心。
Monitor: 统计服务的调用次调和调用时间的监控中心,Provider 和 Consumer 在内存中累计调用次数和耗时,并定时每分钟发送一次统计数据到监控中心。
Container: 服务运行容器。

大数据量传输时适合用短连接,小数据量高并发适合用长连接。从上图中可以得知,Provider 和 Consumer 均通过长连接与注册中心通信,当消费方调用服务时,会创建一个连接,然后同时会创建一个心跳发送的定时线程池,每一分钟发送一次心跳包到注册中心,通过 ping-pong 来检查连接的存活性,同时还会启动断线重连定时线程池,每两秒钟检查一次连接状态,如果断开就重连,而当注册中心断开连接后,会回调通知 Consumer 销毁连接,同理,Provider 也是通过长连接与注册中心通信。

元数据中心

2.7 之后提供的一个新组件,容易和注册中心混淆,元数据和注册中心中的注册信息之间的区别如下:

  • 元数据(Metadata)指的是服务分组、服务版本、服务名、方法列表、方法参数列表、超时时间等
  • 注册信息指服务分组、服务版本、服务名、地址列表等。

元数据中心和注册中心包含了一些公共数据,另外,元数据中心还会存储方法列表即参数列表,注册中心存储了服务地址,其他的一些区别如下所示:

  • | 元数据 | 注册信息
  • | - | -
    职责 | 描述服务,定义服务的基本属性 | 存储地址列表
    变化频繁度 | 基本不变 | 随着服务上下线而不断变更
    数据量 | 大 | 小
    数据交互/存储模型 | 消费者/提供者上报,控制台查询 | PubSub 模型,提供者上报,消费者订阅
    主要使用场景 | 服务测试、服务 | MOCK 服务调用
    可用性要求 | 元数据中心可用性要求不高,不影响主流程 | 注册中心可用性要求高,影响到服务调用的主流程

Dubbo 层次化结构

Dubbo框架
Dubbo 的架构是分层的,使用这种方式可以使各个层之间解耦合(或者最大限度地松耦合)。从服务模型的角度来看,Dubbo 采用的是一种非常简单的模型,要么是提供方提供服务,要么是消费方消费服务。

Dubbo扩展

  • 服务接口层(Service):该层是与实际业务逻辑相关的,根据服务提供方和服务消费方的业务设计对应的接口和实现。

RPC 是 Dubbo 的核心:

  • 配置层(Config)
    对外配置接口,以 ServiceConfigReferenceConfig 为中心,可以直接 new 配置类,也可以通过 Spring 解析配置生成配置类。
  • 服务代理层(Proxy)
    服务接口透明代理。Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 ProxyInvoker 转成接口,或将接口实现转成 Invoker,也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
    Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker,也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
  • 服务注册层(Registry)
    封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactoryRegistryRegistryService。可能没有服务注册中心,此时服务提供方直接暴露服务。
  • 集群层(Cluster)
    封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 ClusterDirectoryRouterLoadBalance。将多个服务提供方组合为一个服务提供方,实现对服务消费方来透明,只需要与一个服务提供方进行交互。
  • 监控层(Monitor)
    RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactoryMonitorMonitorService
  • 远程调用层(Protocol)
    封装 RPC 调用,扩展接口为 ProtocolInvokerExporter。Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。

Registry 和 Monitor 实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起。
Cluster 是外围概念,Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有一个提供者时,是不需要 Cluster 的。

Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina、Netty、Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。

  • 交换层(Exchange):封装请求响应模式,同步转异步,以 Request 和 Response 为中心,扩展接口为ExchangerExchangeChannelExchangeClientExchangeServer
  • 网络传输层(Transport):抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为ChannelTransporterClientServerCodec
  • 数据序列化层(Serialize):可复用的一些工具,扩展接口为SerializationObjectInputObjectOutputThreadPool

Dubbo包结构

  • dubbo-common 公共逻辑模块,包括 Util 类和通用模型。
  • dubbo-remoting 远程通讯模块,相当于 Dubbo 协议的实现,如果 RPC 用 RMI 协议则不需要使用此包。
  • dubbo-rpc 远程调用模块,抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
  • dubbo-cluster 集群模块,将多个服务提供方伪装为一个提供方,包括:负载均衡、容错、路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
  • dubbo-registry 注册中心模块,基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
  • dubbo-monitor 监控模块,统计服务调用次数,调用时间的,调用链跟踪的服务。
  • dubbo-config 配置模块,是 Dubbo 对外的 API,用户通过 Config 使用 Dubbo,隐藏 Dubbo 所有细节。
  • dubbo-container 容器模块,是一个 Standalone 的容器,以简单的 Main 加载 Spring 启动,因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要用 Web 容器去加载服务。

集群 - Cluster

提供基于接口方法的透明远程过程调用,包括多协议支持,以及软负载均衡,失败容错,地址路由,动态配置等集群支持。

服务目录(Directory)

服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。
服务目录与注册中心之间的区别:

  • 注册中心存储服务提供者信息,在 Dubbo 中通过 ZooKeeper 实现;
  • 服务目录是 Invoker 的集合,且这个集合中的元素会随注册中心的变化而进行动态调整。

服务目录会在客户端启动时初始化完成,并订阅注册中心的更新:
com.alibaba.dubbo.registry.support.FailbackRegistry#FailbackRegistry
com.alibaba.dubbo.registry.support.FailbackRegistry#subscribe

Directory 继承结构

Directory 接口包含了一个获取配置信息的方法 getUrl,实现该接口的类可以向外提供配置信息。Directory 有多个实现。

  • StaticDirectory
    获取一次 Invoker 列表后就不变了。
  • RegistryDirectory
    实现了 NotifyListener 接口,当注册中心服务配置发生变化后,RegistryDirectory 可收到与当前服务相关的变化,然后根据配置变更信息刷新 Invoker 列表。
    刷新 Invoker 列表代码:com.alibaba.dubbo.registry.integration.RegistryDirectory#refreshInvoker

路由(Router)

服务目录中包含多个 Invoker,需要通过路由规则来选择调用哪个,Dubbo 提供了 3 种路由实现:条件路由 ConditionRouter脚本路由 ScriptRouter标签路由 TagRouter

条件路由(ConditionRouter)

容错方案

集群容错
Dubbo 提供多种集群的容错方案,默认情况下为 Failover。
com.alibaba.dubbo.rpc.cluster.Cluster

Failover

失败自动切换,当出现失败,重试其它服务器 (该配置为默认配置)。通常用于读操作,但重试会带来更长时间的延迟。

1
2
3
4
5
6
<!--配置集群容错模式为失败自动切换 -->
<dubbo:reference cluster="failover" />
<!-- 调用queryOrder方法如果失败共调3次,重试2次,如果成功则只调1次 -->
<dubbo:reference>
<dubbo:method name="queryOrder" retries="2" />
</dubbo:reference>

通常用于幂等操作,多次调用副作用相同,譬如只读请求,Failover 使用得较多,推荐使用,但重试会带来更长延迟,应用于消费者和提供者的服务调用。

Failfast

快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录和修改数据,Failfast 使用得较多,但如果有机器正在重启,可能会出现调用失败,应用于消费者和提供者的服务调用。

Failsafe

失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作,Failsafe 使用得不多,但调用信息会丢失,应用于发送统计信息到监控中心。

Failback

失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作,使用得很少,不可靠,重启会丢失,应用于注册服务到注册中心。

Forking

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,使用得很少,但需要浪费更多服务资源。

Broadcast

广播调用所有提供者,逐个调用,任意一台报错则报错 。通常用于通知所有提供者更新缓存或日志等本地资源信息,速度慢,任意一台报错则报错,使用得很少。

负载均衡

Random LoadBalance

随机调用(默认配置),按权重设置随机概率,在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重,使用较多,推荐使用,但重试时,可能出现瞬间压力不均。

1
2
3
4
<!-- 服务端方法基本负载均衡设置 -->
<dubbo:service interface="com.service.dubbo.queryOrder">
<dubbo:method name="queryOrder" loadbalance="roundrobin" />
</dubbo:service>

RoundRobin LoadBalance

轮循调用,按公约后的权重设置轮循比率,存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上,极端情况可能产生雪崩。

LeastActive LoadBalance

最少活跃数调用,相同活跃数的随机,活跃数指调用前后计数差,使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差(与时间有关)会越大,但不支持权重。

ConsistentHash LoadBalance

一致性 Hash,相同参数的请求总是发到同一提供者,当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。缺省只对第一个参数 Hash,如果要修改,请配置:

1
<dubbo:parameter key="hash.arguments" value="0,1" />

缺省用 160 份虚拟节点,如果要修改,请配置:

1
<dubbo:parameter key="hash.nodes" value="320" />

由于是通过哈希算法分摊调用,有可能出现调用不均匀的情况

远程通信 - Transport

提供对多种基于长连接的 NIO 框架抽象封装,包括多种线程模型,序列化,以及“请求-响应”模式的信息交换方式。
Dubbo 支持如下网络通信框架:

  • Mina
  • Netty
  • Grizzly

序列化 - Serialize

反射

通过缓存加载的 Class、setAccessible(false)去掉安全校验等来提高反射效率,或者使用反射包ReflectASM

序列化

对性能敏感,对开发体验要求不高的内部系统 thrift 或 protobuf
对开发体验敏感,性能有要求的内外部系统 hessian2
对序列化后的数据要求有良好的可读性 jackson/gson/xml
对兼容性和性能要求较高的系统 protobuf 或 kryo ,它们的性能相差不多,但是 protobuf 有个缺点就是要传输的每一个类的结构都要生成对应的 proto 文件。

Filter

ProtocolFilterWrapper#export:如果当前 protocol 不是 registry,则调用 buildInvokerChain
-> ProtocolFilterWrapper#buildInvokerChain
-> ExtensionLoader#getActivateExtension(URL url, String key, String group):获取系统自动激活的 Filter 和用户自定义的 Filter,最后合并返回

更多功能

限流

限流最好配置在 Provider 端,因为 Consumer 可能有很多个服务器实例,如果他们同时发起对同一 Provider 实例的请求可能会超出机器的处理能力上限。

1
2
3
4
5
6
7
8
<!-- 限制接口OrderService里的每个方法,服务提供者端的执行线程不超过10个 -->
<dubbo:service interface="com.bubbo.service.OrderService" executes="10" />
<!-- 限制接口OrderService里的queryOrderList方法,服务提供者端的执行线程不超过10个 -->
<dubbo:service interface="com.bubbo.service.OrderService">
<dubbo:method name="queryOrderList" executes="10" />
</dubbo:service>
<!--限制使用dubbo协议时在服务提供者端启用的连接数不超过1000个-->
<dubbo:provider protocol="dubbo" accepts="1000"/>

上述配置限制的是线程数,即并发连接数,Consumer 和 Provider 默认通过一条共享的 TCP 长连接通信,连接成功的情况下请求线程交由 IO 线程池异步读写数据,数据被反序列化后交由业务线程池处理具体业务,也就是对应的 Impl 实现类的具体方法。

服务隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!--当同一个接口有多个实现时,可以通过group来隔离  -->
<!--服务提供者 -->
<dubbo:service group="ImplA" interface="com.bubbo.service.OrderService"/>
<dubbo:service group="ImplB" interface="com.bubbo.service.OrderService"/>
<!--服务调用者 -->
<dubbo:reference id="MethodA" group="ImplA" interface="com.bubbo.service.OrderService"/>
<dubbo:reference id="MethodB" group="ImplB" interface="com.bubbo.service.OrderService"/>

<!--当一个接口出现升级,新旧实现同时存在时,可以通过版本号来隔离,通常版本号隔离也用于联调阶段,不同版本号的服务无法调用,版本号相同的服务才能调用 -->
<!--服务提供者 -->
<dubbo:service interface="com.bubbo.service.OrderService" version="new2.0.0"/>
<dubbo:service interface="com.bubbo.service.OrderService" version="old1.0.0"/>
<!--服务调用者 -->
<dubbo:reference id="NewMethodA" interface="com.bubbo.service.OrderService" version="new2.0.0"/>
<dubbo:reference id="OldMethodB" interface="com.bubbo.service.OrderService" version="old1.0.0"/>

通过版本号,也可以实现消费者和提供者服务端直接连接,因为发起调用默认使用随机调用端负载均衡模式,当有多台提供者的时候,会随机选取,通常联调阶段都会调用指定服务进行联调,直连一般用在调试,开发阶段,只需要消费者和提供者 version 相同即可。

灰度发布

有三台服务器 A、B、C 要上线,现在三台服务器都是旧版本代码,那首先从 Ngnix 负载均衡列表里移除 A 服务器的配置,切断对 A 的访问,然后在 A 服务器不受新的代码,重新把 A 配置进 Ngnix 负载均衡列表。如果在线使用没有问题,则继续升级 B、C 服务器,否则回滚,恢复旧版本代码,这是针对三端(PC 端,微信端,移动端)跟网关系统的。
如果是针对子系统,譬如用户系统、订单系统等,可以通过分组 group 来实现子系统的灰度发布。服务提供者有两组,One、Two,将新版本代码 group 改为 Two,旧版本 group 还是 One,将新版本的消费者 group 改为 Two,这时请求定位到新的消费者再调用新的提供者,而且旧的消费者还是请求旧的提供者,如果线上没有问题,那就把提供者 group 为 One 的组改为 Two,并部署新代码,旧的消费者也改成 Two 并部署新代码如果有问题,那消费端和提供端都回滚到旧版本。

异步调用

Dubbo 默认情况下是同步调用的,就是调用后立刻返回,但如果消费端调用服务端创建文件并转化成 PDF 格式的文件这种在 IO 密集操作时,消费端同步调用需要等待对方转换结束才返回,很消耗性能,这时选择异步调用和回调调用更合适。

1
2
3
4
5
6
7
8
9
10
11
<!--
async="true" 异步调用,调用后不用等待,继续往下执行
onreturn ="CallBack.onreturn" 返回后调用自定义的类CallBack类的onreturn方法
onthrow="CallBack.onthrow" 调用后,提供者抛出异常后,返回调用自定义的类CallBack类的onthrow方法
-->
<!--服务调用者 -->
<dubbo:reference id="tranfromPDF" interface="com.bubbo.service.OrderService" >
<dubbo:method name="tranPDF" async="true"
onreturn ="CallBack.onreturn"
onthrow="CallBack.onthrow"/>
</dubbo:reference>

可以在 onthrow 事件里实现服务降级的方法,譬如遇到网络抖动,调用超时返回时可在 onthrow 里 return null。

  • 调用方
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Test
    public void testQueryOrder() {
    // 此时调用会立即拿到null值
    List<Order> list = this.orderService.queryOrderList();
    // 拿到Future的引用,在提供方返回结果后,结果值会被设置进Future
    Future<String> orderFuture = RpcContext.getContext().getFuture();
    try {
    // 该方法是阻塞方法,在拿到值之前一直等待,直到拿到值才会被唤醒,该方法会抛出异常,可以捕获
    String returnValue = orderFuture.get();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }
  • 回调方
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 回调接口
    interface ICallBack {
    // 第一个参数是返回值,第二个参数是原参数
    public void onreturn(String returnValue, String initParameter);

    // 第一个参数是异常,第二个参数是原参数
    public void onthrow(Throwable ex, String initParameter);
    }

    // 实现类
    class CallBackImpl implements ICallBack {
    public void onreturn(String returnValue, String initParameter) {
    // do something
    };

    public void onthrow(Throwable ex, String initParameter) {
    // do something
    };
    }

异步调用
调用方有一个用户线程池用于处理调用请求(比如 Tomcat 里那个线程池),请求被转发到 IO 线程池,由 IO 线程来发起对提供方的调用,此时 IO 线程会新建一个 Future 对象进 RpcContext,用户线程可以继续继续自己的业务逻辑,然后在需要的时候调用 Future 的 get 方法阻塞等待,而服务端只需要将结果返回给 IO 线程,由 IO 线程调用 notify 方法唤醒阻塞等待中的用户线程。

服务降级

服务降级用于在服务高峰期将次要服务降级,仅保留关键服务,从而降低系统负载、提升可用性。比如,订单列表正常情况下展示所有订单,但是如果是在网站开展秒杀之类的大促活动时,就可以降级展示当月的订单而不是所有,再其次,如果服务器宕机了,也最好展示兜底页而不是 504。

1
<dubbo:service interface="com.bubbo.service.OrderService" mock="com.dubbo.service.MonthOderMock"/>

热点缓存

1
2
3
4
5
<!--服务调用者 -->
<dubbo:reference id="queryCatalog" interface="com.bubbo.service.CatalogService">
<dubbo:method name="queryCatalog" cache="lru" />
</dubbo:reference>
<dubbo:monitor protocol="registry" />

如果查询的对象改变很少但又数据量很大的时候,如首页目录,可以避免每次都频繁调用服务端,可以设置本地缓存,加快热点数据的访问,Dubbo 的缓存类型 LRU 缓存,最近最少使用的数据会被清除,使用频繁的数据被保留,Thredlocal 缓存,当前线程的缓存,假如当前线程有多次请求,每次请求都需要相同的用户信息,那就适用,避免每次都去查询用户基本信息。

源码分析

环境配置比较简单,就是 zk->provider->consumer,在此不再赘述。

失败重试

Dubbo 中的失败重试机制比较丰富,基本考虑到常用的场景
http://dubbo.apache.org/zh-cn/docs/user/demos/fault-tolerent-strategy.html
FailoverClusterInvoker、FailfastClusterInvoker 等,以 FailoverClusterInvoker 为例:
FailoverClusterInvoker.doInvoke 重试几次,把失败的添加到 invoked 列表里
-> AbstractClusterInvoker.select 选一个可用的调用,如果是已经被选过或因为其他条件不可用则 reselect

负载均衡

http://dubbo.apache.org/zh-cn/docs/user/demos/loadbalance.html

幂等

Dubbo 没有提供幂等性检查功能,需要自定义。

限流

Dubbo 中的限流比较简单,采用的是计数器算法,单位时间内超出阈值的流量会被直接丢弃,而且只支持 PORVIDER 端的限流,而且为了让它生效还要搞复杂的 SPI 配置。
https://www.jianshu.com/p/7112a8d3d869
入口:TpsLimitFilter.invoke
-> TPSLimiter.isAllowable 为每个 Service 创建一个计数器 StatItem(粒度是整个 Service 有没有太大了)

降级

Dubbo 里的降级比较水,即调用出错就改成调用 Mock 接口,没有 Hystrix 中那么复杂的逻辑:
http://dubbo.apache.org/zh-cn/docs/user/demos/service-downgrade.html
https://www.cnblogs.com/java-zhao/p/8320519.html
入口:ReferenceConfig.createProxy 创建代理
-> ProxyFactory.getProxy
-> InvokerInvocationHandler.invoke
-> MockClusterInvoker.invoke 如果配置中有 fail 开头,则在远程调用失败后调用 doMockInvoke,大概逻辑是实例化一个 XxxServiceMock 服务然后调用

优雅停机

https://www.jianshu.com/p/6e4d1ecb0815

QA

说一下你们怎么用 Dubbo 的(考对 Dubbo 的应用能力)

说一下 Dubbo 的工作原理

Dubbo架构
描述 Registry、Consumer、Provider 之间的关系。

Dubbo 负载均衡策略和集群容错策略都有哪些

负载均衡策略和集群容错策略见上面的《集群》小节。

Dubbo 的动态代理策略

javassist,类似 CGLIB,通过继承目标类以生成代理类。

说一下服务注册(导出)过程

分本地暴露和远程暴露两种

说一下服务消费(引入)过程

服务的运行过程中,如果 ZooKeeper 挂掉了,这时还能正常请求吗?

说一下 Dubbo 协议

Dubbo 有几种容错机制

dubbo 有几种服务降级机制

dubbo 有几种服务降级机制

参考

  1. apache/incubator-dubbo
  2. Dubbo 文档
  3. Dubbo 实例 Demos
    中文版
  4. 设计 RPC 接口时,你有考虑过这些吗?
  5. 解密 Dubbo:自己动手编写 RPC 框架

启动过程

  1. 研究优雅停机时的一点思考
    kill -9kill -15的区别,SpringBoot 的停机机制。
  2. 一文聊透 Dubbo 优雅停机
  3. 一文聊透 Dubbo 优雅上线
  4. Spring-boot+Dubbo 应用启停源码分析
  5. 服务导出
  6. 服务引入

SPI

  1. Dubbo SPI
  2. 自适应拓展机制

协议

  1. 【RPC 专栏】深入理解 RPC 之协议篇
  2. Dubbo 在跨语言和协议穿透性方向的探索:支持 HTTP/2 gRPC
  3. 一文详细解读 Dubbo 中的 http 协议
  4. 聊聊 TCP 长连接和心跳那些事
  5. Dubbo 中的 URL 统一模型
  6. 研究网卡地址注册时的一点思考
  7. RFC 5234 - Augmented BNF for Syntax Specifications: ABNF
  8. 服务端经典的 C10k 问题(译)

心跳机制

  1. 一种心跳,两种设计
  2. 聊聊 TCP 长连接和心跳那些事

序列化

  1. 【RPC 专栏】深入理解 RPC 之序列化篇–总结篇
  2. 【RPC 专栏】深入理解 RPC 之序列化篇 —— Kryo
  3. 如何提高使用 Java 反射的效率?
  4. Java 序列化框架性能比较
0%