Tallate

该吃吃该喝喝 啥事别往心里搁

Spring 如何加载注解

很多注解本身只是提供了一个标识,要实现注解所表示的功能,必然还会有一个扫描器扫描这个注解,然后将必须的Bean注入到Spring容器内,而且很多时候会为被注解的对象生成一个动态代理,以实现日志记录、接口幂等、限流等功能。
要自己实现一个注解,关键是如何扫描及如何生成代理并注入到Spring容器这两个步骤,具体的实现可以参考MapperScannerConfigurer,大体逻辑是:

  1. 在Spring容器加载完毕后,再对指定包下的类进行一次扫描;

Spring 三级缓存

Spring 中产生循环依赖的三种情况

  1. 构造器注入循环依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Service
    public class A {
    public A(B b) {
    }
    }
    @Service
    public class B {
    public B(A a) {
    }
    }
    构造器注入构成的循环依赖,此种循环依赖方式是无法解决的,只能抛出 BeanCurrentlyInCreationException 异常表示循环依赖。
    不能解决的原因是:Spring 解决循环依赖的原理是实例化 Bean 后先把引用存到一个 Map 中,之后初始化成员变量时,可以直接从这个 Map 中取。但是构造器注入相当于实例化和初始化是同时进行的,因此无法解决。
  2. singleton 模式 field 属性注入循环依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Service
    public class A {
    @Autowired
    private B b;
    }

    @Service
    public class B {
    @Autowired
    private A a;
    }
  3. prototype 模式 field 属性注入循环依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    @Service
    public class A {
    @Autowired
    private B b;
    }

    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    @Service
    public class B {
    @Autowired
    private A a;
    }

Spring 创建 Bean 的流程

  1. createBeanInstance:实例化,其实也就是调用对象的构造方法实例化对象
  2. populateBean:填充属性,这一步主要是对 bean 的依赖属性进行注入(@Autowired)
  3. initializeBean:回到一些形如 initMethod、InitializingBean 等方法

其中,循环依赖可能发生在第一步和第二步,其中第一步是因为构造方法中可能会需要传入其他 Bean。

Spring 三级缓存如何解决循环依赖

缓存生效时间

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DefaultSingletonBeanRegistry extends SimpleAliasRegistry implements SingletonBeanRegistry {

// 用于存放完全初始化好的 bean,从该缓存中取出的 bean 可以直接使用
/** Cache of singleton objects: bean name --> bean instance */
private final Map<String, Object> singletonObjects = new ConcurrentHashMap<String, Object>(256);

// 提前曝光的单例对象的cache,存放原始的 bean 对象(尚未填充属性),用于解决循环依赖
/** Cache of early singleton objects: bean name --> bean instance */
private final Map<String, Object> earlySingletonObjects = new HashMap<String, Object>(16);

// 单例对象工厂的cache,存放 bean 工厂对象,用于解决循环依赖
/** Cache of singleton factories: bean name --> ObjectFactory */
private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap<String, ObjectFactory<?>>(16);

获取单例 Bean 的过程

org.springframework.beans.factory.support.DefaultSingletonBeanRegistry#getSingleton(java.lang.String)

  1. 先从一级缓存 singletonObjects 中去获取,如果获取到就直接 return。
  2. 如果获取不到或者对象正在创建中(isSingletonCurrentlyInCreation()),那就再从二级缓存 earlySingletonObjects 中获取,如果获取到就直接 return。
  3. 如果还是获取不到,且允许 singletonFactories(allowEarlyReference=true)通过 getObject()获取。就从三级缓存 singletonFactory.getObject()获取,如果获取到了就从 singletonFactories 中移除,并且放进 earlySingletonObjects,其实也就是从三级缓存移动到了二级缓存。

总结Linux中的5种IO模型,其中最常用的是IO多路复用,特别是epoll是各种网络框架的底层IO框架。

阅读全文 »

文件系统操作

1
2
3
4
5
6
7
ls -F filename # 列出目录,使用*等符号标志文件类型,*表示可执行文件,/为目录文件
ls -i filename # 查看文件inode号
mkdir -p dirname # 创建目录,如果有父目录就创建父目录
stat filename # 查看文件详细信息,包括inode号、链接数
mv # 移动/重命名
ln oldfile newfile # 创建硬链接
ln -s sourcefile/sourcedir targetfile/targetdir # 创建软链接

chroot

chroot

mount 和 umount

1
mount | column -t # 查看挂载分区信息

制作目录硬链接。

1
2
3
4
5
6
7
# 假设当前目录下有a和b两个目录
mount -o bind a b # b成为a的硬链接,其实就是将a挂载到了b上
mount # 通过mount可以看出是bind这个参数起作用
/dev/sda2 on /home/hgc/Downloads/b type ext4 (rw,relatime,errors=remount-ro,data=ordered)
# 删除时必须先卸载再删除
umount b
rm -rf b

ln

fdisk

1
fdisk -l # 查看硬盘分区信息

并发(Concurrency)

线程、进程,线程安全,进程同步,可见性,一致性,锁,信号量,并发,并行

线程和进程

从操作系统概念上说,线程是最小的可执行单位,也就是系统调度的最小单位。进程是资源分配的最小单位。线程是依赖进程存在的,共享进程内的资源,如内存,cpu,io 等。在操作系统的发展过程中,为了提高系统的稳定性、吞吐量和安全性,操作系统内核和用户态做了隔离,例如 Linux 有内核线程,用户线程,内核进程,用户进程,从根本上 Linux 是没有线程的,线程对 Linux 系统来说是个特殊的进程。那么用户线程和内核线程是一一对应呢?从宏观上看是一一对应的,在用户态的每一个线程,都会在内核有对应的执行线程,但是由于资源的限制,用户态的线程和内核线程是多对一的关系。用户进程和内核进程也类似。具体怎样对应的,这里就不探讨了。
为了提高操作系统的实时性,操作系统是以时间片轮转来实现任务调度的。理论上时间片内是不可以被中断的,可认为是 cpu 最小的单位执行时间。现代操作系统为了提高用户体验,线程都是抢占式的,而中断一般在时间片用完的时候发生。线程、进程和 CPU 都是多对一的关系,所以存在进程线程切换的问题。
线程内部还是有自己内存空间的,所以有个概念叫线程内存模型。线程内部有自己私有的本地内存,故线程和线程之间的本地内存存在可见性问题。例如全局变量 A 在线程 1 修改后,线程 2 并不一定能拿到 A 的修改值,因为线程 1 会把全局变量 A 拷贝到本地内存,修改后并不会马上同步。在编译的时候,编译器为了优化,(例如利用超线程技术)可能会重排指令的执行顺序,这就会存在一致性了。

线程安全

在线程安全里面经常要讨论的两个问题就是:可见性和一致性。锁是什么东西呢?锁就是一道内存屏障,保证可见性和一致性的一种策略,由操作系统甚至更底层的硬件提供。加锁是消耗资源的,特别是在多核 CPU 上,现在多核 CPU 一般有 3 级缓存,一级缓存通常是单核独占的,而线程的本地内存很可能就保存在 cpu 的缓存里面,然而加锁就意味着保证可见性和一致性,需要中断同步数据,保证别人拿到的是最新修改值。由于用途不同,锁被设计成各种各样的,如互斥锁,读写锁,自旋锁,同步块,数据库的事务等,如果只要保证可见性的,可以不使用锁,在 java 里面可以使用 volatile 修饰全局变量。虽然在 c/c++,都有同样的修饰符,但是是不是一样的意思呢,请参考其他文章。

死锁(deadlock)

定义

多个进程竞争资源造成的互相等待情况。

资源

可重用性资源:可供重复使用多次的资源
不可抢占性资源:一旦系统把某资源分配给该进程后,就不能将它强行收回,只能在进程使用完后自动释放
可消耗资源:又叫临时性资源,它是在进程运行期间,由进程动态的创建和消耗的

死锁产生的原因

  1. 系统资源的竞争
    系统资源的竞争导致系统资源不足,以及资源分配不当,导致死锁。
    主要是竞争可重用不可抢占式的资源和可消耗的资源。
  2. 进程运行推进顺序不合适
    进程在运行过程中,请求和释放资源的顺序不当,会导致死锁。

死锁产生的条件

互斥条件 一个资源每次只能被一个进程使用,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。
请求与保持条件 进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源 已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。
不可剥夺条件 进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能 由获得该资源的进程自己来释放(只能是主动释放)。
循环等待条件 若干进程间形成首尾相接循环等待资源的关系
这四个条件是死锁的必要条件,只要系统发生死锁,这些条件必然成立,而只要上述条件之一不满足,就不会发生死锁。
形象地说,就是有两个酒鬼,一个有开瓶器,一个有酒,这两种资源都只能被一个人占有(互斥),且用完之前不能被另一个人抢去(不可剥夺),他们互相等对方手上的资源(循环等待),但又不肯放开自己手上的资源(请求与保持),因此陷入了死锁。

死锁避免

系统对进程发出每一个系统能够满足的资源申请进行动态检查,并根据检查结果决定是否分配资源,如果分配后系统可能发生死锁,则不予分配,否则予以分配。
书上给出了两种死锁避免策略

  1. 进程启动拒绝
    若对每个资源,能满足现有所有进程再加上新进程的需求,则可以启动这个进程,否则拒绝
  2. 资源分配拒绝(银行家算法)

死锁预防

死锁预防是设法至少破坏产生死锁的四个必要条件之一,严格的防止死锁的出现。

  1. 互斥
    不可能禁止,比如文件只允许互斥的写访问
  2. 占有且等待
    可以要求进程一次性请求所有需要的资源,并且阻塞这个进程直到所有请求都同时满足,这样就不会再请求新资源了。
  3. 不可抢占
  4. 循环等待

持久化(Persistent)

Linux IO 模型

X
应用程序调用内核 IO 函数的过程如下图所示:
X
处于 OS 的安全性等的考虑,进程无法直接操作 I/O 设备,必须通过系统调用来请求内核完成 I/O 动作,而内核会为每个 I/O 设备维护一个 Buffer。

  1. 用户进程发起请求;
  2. 内核接收到请求后,从 I/O 设备中获取数据到 Buffer 中;
  3. 将 Buffer 中的数据拷贝到用户进程的地址空间,该用户进程获取到数据后响应给客户端。

在整个请求过程中,数据输入至 Buffer 需要时间,从 Buffer 复制数据到进程也需要时间,这个等待时间是限制 I/O 效率的罪魁祸首,根据等待方式的不同,I/O 动作可以分为以下五种模式:

  • 阻塞 I/O(Blocking I/O)
  • 非阻塞 I/O(Non-Blocking I/O)
  • I/O 复用(I/O Multiplexing)
  • 信号驱动的 I/O(Signal Driven I/O)
  • 异步 I/O(Asynchronous I/O)

存储器管理

文件系统

IO(pipe)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <syscall.h>

int testPipe(){
int fd[2];

int *read_fd = &fd[0];
int *write_fd = &fd[1];

if(pipe(fd) == -1){
printf("pipe create failed\n");
}

int pid = fork();
if(pid == -1){
printf("fork failed");
return -1;
}
else if(pid == 0){
int count;
char string[] = "hahaha";
close(*read_fd);
count = write(*write_fd, string, sizeof(string));
printf("写了%d个字符\n", count);
return 0;
}
else{
int count;
char buffer[100];
close(*write_fd);
count = read(*read_fd, buffer, sizeof(buffer));
printf("父进程接受到%d字节的数据:%s", count, buffer);
return 0;
}
}

虚拟内存

swap

虚拟化(Virtualization)

驱动管理

参考

Linux 应用

  1. 虚拟内存
    All about Linux swap space
    Swap
    Linux Performance: Why You Should Almost Always Add Swap Space
  2. 演进
    Linux 内核的发展 介绍 2.6.28 和 2.6.29 版本中的新特性
    对 Linux 内核的发展方向的展望 - Linux 4.2
  3. 运维
    老司机告诉你:正规的运维工作是什么的?
  4. 并发
    Linux 原子操作 atomic_cmpxchg()/Atomic_read()/Atomic_set()/Atomic_add()/Atomic_sub()
  5. 隔离
    cgroup - jerry017cn

操作系统

  1. 分时和实时操作系统
    List of open source real-time operating systems
  2. 操作系统概念
    【操作系统】操作系统综述(一)
  3. Operating Systems: Three Easy Pieces

Concurrency

  1. 【操作系统】进程管理(二)
  2. 【操作系统】处理机调度与死锁(三)

Persistent

  1. 【操作系统】存储器管理(四)
  2. 【操作系统】文件管理(六)

Virtualization

  1. 【操作系统】设备管理(五)

Linux 内核

  1. 哈工大操作系统
    操作系统之基础
    操作系统原理与实践
  2. NJU
    在开始愉快的 PA 之旅之前
    NJU-ProjectN
  3. The Linux Information Project
  4. The Linux Kernel Archives
  5. Linux 内核文档(中文版)
  6. Write Your Own Operating System
  7. 6.828: Operating System Engineering

并发安全容器(Queue)

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层数据结构使用单向链表实现,入队和出队操作使用 CAS 来实现线程安全。
2.1 ConcurrentLinkedQueue 类图结构
先简单介绍下 ConcurrentLinkedQueue 的类图结构如下图:
image.png
如上类图 ConcurrentLinkedQueue 内部的队列是使用单向链表方式实现,其中两个 volatile 类型的 Node 节点分别用来存放队列的首尾节点。从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
public ConcurrentLinkedQueue() {
head = tail = new Node(null);
}
Node 节点内部则维护一个 volatile 修饰的变量 item 用来存放节点的值,next 用来存放链表的下一个节点,从而链接为一个单向无界链表。
首先一个图来概况该队列构成,读者可以读完本节后在回头体会这个图:image.png
2.2 ConcurrentLinkedQueue 原理介绍
本节主要介绍 ConcurrentLinkedQueue 的几个主要的方法的实现原理
2.2.1 offer 操作
offer 操作是在队列末尾添加一个元素,如果传递的参数是 null 则抛出 NPE 异常,否者由于 ConcurrentLinkedQueue 是无界队列该方法一直会返回 true。另外由于使用 CAS 无阻塞算法,该方法不会阻塞调用线程,下面具体看看实现原理。
public boolean offer(E e) {
//(1)e为null则抛出空指针异常
checkNotNull(e);

//(2)构造Node节点
final Node newNode = new Node(e);

//(3)从尾节点进行插入
for (Node<E> t = tail, p = t;;) {

    Node<E> q = p.next;

    //(4)如果q==null说明p是尾节点,则执行插入
    if (q == null) {

        //(5)使用CAS设置p节点的next节点
        if (p.casNext(null, newNode)) {
            //(6)cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
            if (p != t)
                casTail(t, newNode);  // Failure is OK.
            return true;
        }
    }
    else if (p == q)//(7)
        //多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
        //重新找新的head,因为新的head后面的节点才是正常的节点。
        p = (t != (t = tail)) ? t : head;
    else
        //(8) 寻找尾节点
        p = (p != t && t != (t = tail)) ? t : q;
}

}
上节类图结构时候谈到构造队列时候参构造函数创建了一个 item 为 null 的哨兵节点,并且 head 和 tail 都是指向这个节点,下面通过图形结合来讲解下 offer 操作的代码实现。
首先看下当一个线程调用 offer(item)时候情况:首先代码(1)对传参判断空检查,如果为 null 则抛出 NPE 异常,然后代码(2)则使用 item 作为构造函数参数创建了一个新的节点,代码(3)从队列尾部节点开始循环,意图是从队列尾部添加元素。
image.png
上图是执行代码(4)时候队列的情况,这时候节点 p,t,head,tail 同时指向了 item 为 null 的哨兵节点,由于哨兵节点的 next 节点为 null, 所以这里 q 指向也是 null。
代码(4)发现q==null则执行代码(5)通过 CAS 原子操作判断 p 节点的 next 节点是否为 null,如果为 null 则使用节点 newNode 替换 p 的 next 节点,然后执行代码(6)由于p==t所以没有设置尾部节点,然后退出 offer 方法,这时候队列的状态图如下:
image.png
上面讲解的是一个线程调用 offer 方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5),假设线程 A 调用 offer(item1), 线程 B 调用 offer(item2), 线程 A 和 B 同时执行到 p.casNext(null, newNode)。而 CAS 的比较并设置操作是原子性的,假设线程 A 先执行了比较设置操作则发现当前 p 的 next 节点确实是 null 则会原子性更新 next 节点为 newNode,这时候线程 B 也会判断 p 的 next 节点是否为 null,结果发现不是 null(因为线程 A 已经设置了 p 的 next 为 newNode)则会跳到步骤(3),然后执行到步骤(4)时候队列分布图为:
image.png
根据这个状态图可知线程 B 会去执行代码(8),然后 q 赋值给了 p,这时候队列状态图为:
image.png
然后线程 B 再次跳转到代码(3)执行,当执行到代码(4)时候队列状态图为:
image.png
由于这时候 q==null, 所以线程 B 会执行步骤(5),通过 CAS 操作判断当前 p 的 next 节点是否是 null,不是则再次循环后尝试,是则使用 newNode 替换,假设 CAS 成功了,那么执行步骤(6)由于 p!=t 所以设置 tail 节点为 newNode,然后退出 offer 方法。这时候队列分布图为:
image.png
分析到现在,offer 代码的执行路径现在就差步骤(7)还没走过,其实这个要在执行 poll 操作后才会出现,这里先看下执行 poll 操作后可能会存在的的一种情况如下图:
image.png
下面分析下当队列处于这种状态时候调用 offer 添加元素代码执行到步骤(4)时候的状态图:
image.png
由于 q 节点不为空并且p==q所以执行步骤(7),由于t==tail所以 p 被赋值为了 head,然后进入循环,循环后执行到代码(4)时候队列状态为:
image.png
由于q==null, 所以执行步骤(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p 的 next 节点被设置为新增节点,然后执行步骤(6),由于p!=t所以设置新节点为队列为节点,现在队列状态如下:
image.png
这里自引用的节点会被垃圾回收掉。
总结:可见 offer 操作里面关键步骤是代码(5)通过原子 CAS 操作来进行控制同时只有一个线程可以追加元素到队列末尾,进行 cas 竞争失败的线程则会通过循环一次次尝试进行 cas 操作,直到 cas 成功才会返回,也就是通过使用无限循环里面不断进行 CAS 尝试方式来替代阻塞算法挂起调用线程,相比阻塞算法这是使用 CPU 资源换取阻塞所带来的开销。
2.2.2 poll 操作
poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null,下面看看实现原理。
public E poll() {
//(1) goto标记
restartFromHead:

//(2)无限循环
for (;;) {
    for (Node<E> h = head, p = h, q;;) {

        //(3)保存当前节点值
        E item = p.item;

        //(4)当前节点有值则cas变为null
        if (item != null && p.casItem(item, null)) {
            //(5)cas成功标志当前节点以及从链表中移除
            if (p != h) 
                updateHead(h, ((q = p.next) != null) ? q : p);
            return item;
        }
        //(6)当前队列为空则返回null
        else if ((q = p.next) == null) {
            updateHead(h, p);
            return null;
        }
        //(7)自引用了,则重新找新的队列头节点
        else if (p == q)
            continue restartFromHead;
        else//(8)
            p = q;
    }
}

}
final void updateHead(Node h, Node p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
同理本节也通过图形结合的方式来讲解代码执行逻辑:
poll 操作是从队头获取元素,所以代码(2)内层循环是从 head 节点开始迭代,代码(3)获取当前队头的节点,当队列一开始为空时候队列状态为:
image.png
由于 head 节点指向的为 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null 如下图:
image.png
所以执行 updateHead 方法,由于 h 等于 p 所以没有设置头结点,poll 方法直接返回 null。
假设执行到代码(6)时候已经有其它线程调用了 offer 方法成功添加一个元素到队列,这时候 q 指向的是新增元素的节点,这时候队列状态为:
image.png
所以代码(6)判断结果为 false,然后会转向代码(7)执行,而此时 p 不等于 q,所以转向代码(8)执行,执行结果是 p 指向了节点 q,此时队列状态为:
image.png
然后程序转向代码(3)执行,p 现在指向的元素值不为 null,则执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时没有其它线程进行 poll 操作,CAS 成功则执行代码(5)由于此时 p!=h 所以设置头结点为 p,poll 然后返回被从队列移除的节点值 item。此时队列状态为:
image.png
这个状态就是讲解 offer 操作时候,offer 代码的执行路径(7)执行的前提状态。
假如现在一个线程调用了 poll 操作,则在执行代码(4) 时候队列状态为:
image.png
可知这时候执行代码(6)返回 null.
现在 poll 的代码还有个分支(7)没有执行过,那么什么时候会执行那?下面来看看,假设线程 A 执行 poll 操作时候当前队列状态为:
image.png
那么执行p.casItem(item, null) 通过 CAS 操作尝试设置 p 的 item 值为 null。
假设 CAS 设置成功则标示该节点从队列中移除了,此时队列状态为:
image.png
然后由于 p!=h, 所以会执行 updateHead 方法,假如线程 A 执行 updateHead 前另外一个线程 B 开始 poll 操作这时候线程 B 的 p 指向 head 节点,但是还没有执行到代码(6)这时候队列状态为:
image.png
然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
image.png
然后线程 B 继续执行代码(6)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(7)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
image.png
总结:poll 方法移除一个元素时候只是简单的使用 CAS 操作把当前节点的 item 值设置 null,然后通过重新设置头结点让该元素从队列里面摘除,被摘除的节点就成了孤立节点,这个节点会被在垃圾回收的时候会回收掉。另外执行分支中如果发现头节点被修改了要跳到外层循环重新获取新的头节点。
2.2.3 peek 操作
peek 操作是获取队列头部一个元素(只不获取不移除),如果队列为空则返回 null,下面看看实现原理。
public E peek() {
//(1)
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
//(2)
E item = p.item;
//(3)
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
//(4)
else if (p == q)
continue restartFromHead;
else
//(5)
p = q;
}
}
}
代码结构与 poll 操作类似,不同在于步骤(3)的使用只是少了 castItem 操作,其实这很正常,因为 peek 只是获取队列头元素值并不清空其值,根据前面我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次 peek 时候代码(3)中会发现 item==null, 然后会执行 q = p.next, 这时候 q 节点指向的才是队列里面第一个真正的元素或者如果队列为 null 则 q 指向 null。
当队列为空时候这时候队列状态为:
image.png
这时候执行 updateHead 由于 h 节点等于 p 节点所以不进行任何操作,然后 peek 操作会返回 null。
当队列至少有一个元素时候(这里假设只有一个)这时候队列状态为:
image.png
这时候执行代码(5)这时候 p 指向了 q 节点,然后执行代码(3)这时候队列状态为:
image.png
执行代码(3)发现 item 不为 null,则执行 updateHead 方法,由于 h!=p, 所以设置头结点,设置后队列状态为:
image.png
也就是剔除了哨兵节点。
总结:peek 操作代码与 poll 操作类似只是前者只获取队列头元素但是并不从队列里面删除,而后者获取后需要从队列里面删除,另外在第一次调用 peek 操作时候,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者 null。
2.2.4 size 操作
获取当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁所以从调用 size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。
public int size() {
int count = 0;
for (Node p = first(); p != null; p = succ(p))
if (p.item != null)
// 最大返回Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
}

//获取第一个队列元素(哨兵元素不算),没有则为null
Node first() {
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

//获取当前节点的next元素,如果是自引入节点则返回真正头节点
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
2.2.5 remove 操作
如果队列里面存在该元素则删除给元素,如果存在多个则删除第一个,并返回 true,否者返回 false
public boolean remove(Object o) {

//查找元素为空,直接返回false
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
    E item = p.item;

    //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其它元素是否有匹配的。
    if (item != null &&
        o.equals(item) &&
        p.casItem(item, null)) {

        //获取next元素
        Node<E> next = succ(p);

        //如果有前驱节点,并且next不为空则链接前驱节点到next,
        if (pred != null && next != null)
            pred.casNext(p, next);
        return true;
    }
    pred = p;
}
return false;

}
注:ConcurrentLinkedQueue 底层使用单向链表数据结构来保存队列元素,每个元素被包装为了一个 Node 节点,队列是靠头尾节点来维护的,创建队列时候头尾节点指向一个 item 为 null 的哨兵节点,第一次 peek 或者 first 时候会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以获取 size 的时候有可能进行了 offer,poll 或者 remove 操作,导致获取的元素个数不精确,所以在并发情况下 size 函数不是很有用。

LinkedBlockingQueue

前面介绍了使用 CAS 算法实现的非阻塞队列 ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现的阻塞队列 LinkedBlockingQueue 的实现
3.1 LinkedBlockingQueue 类图结构
同理首先看下 LinkedBlockingQueue 的类图结构
image.png
如上类图可知 LinkedBlockingQueue 也是使用单向链表实现,也有两个 Node 分别用来存放首尾节点,并且里面有个初始值为 0 的原子变量 count 用来记录队列元素个数。另外里面有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原子性,其中 takeLock 用来控制同时只有一个线程可以从队列获取元素,其它线程必须等待,putLock 控制同时只能有一个线程可以获取锁去添加元素,其它线程必须等待。另外 notEmpty 和 notFull 是信号量,内部分别有一个条件队列用来存放进队和出队时候被阻塞的线程,其实这个是个生产者 - 消费者模型。如下是独占锁创建代码:
/** 执行take, poll等操作时候需要获取该锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 当队列为空时候执行出队操作(比如take)的线程会被放入这个条件队列进行等待 */
private final Condition notEmpty = takeLock.newCondition();

/** 执行put, offer等操作时候需要获取该锁*/
private final ReentrantLock putLock = new ReentrantLock();

/**当队列满时候执行进队操作(比如put)的线程会被放入这个条件队列进行等待 */
private final Condition notFull = putLock.newCondition();

/** 当前队列元素个数 */
private final AtomicInteger count = new AtomicInteger(0);
如下是 LinkedBlockingQueue 无参构造函数代码:
public static final int MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾节点,指向哨兵节点
last = head = new Node(null);
}
从代码可知默认队列容量为 0x7fffffff; 用户也可以自己指定容量,所以一定程度上 LinkedBlockingQueue 可以说是有界阻塞队列。
首先使用一个图来概况该队列,读者在读完本节后在回头体会下:
image.png
3.2 LinkedBlockingQueue 原理介绍
3.2.1 offer 操作
向队列尾部插入一个元素,如果队列有空闲容量则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false,如果 e 元素为 null 则抛出 NullPointerException 异常,另外该方法是非阻塞的。
public boolean offer(E e) {

    //(1)空元素抛空指针异常
    if (e == null) throw new NullPointerException();

    //(2) 如果当前队列满了则丢弃将要放入的元素,然后返回false
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;

    //(3) 构造新节点,获取putLock独占锁
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //(4)如果队列不满则进队列,并递增元素计数
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            //(5)
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        //(6)释放锁
        putLock.unlock();
    }
    //(7)
    if (c == 0)
        signalNotEmpty();
    //(8)
    return c >= 0;
}

private void enqueue(Node node) {
last = last.next = node;
}
步骤(2)判断如果当前队列已满则丢弃当前元素并返回 false
步骤(3)获取到 putLock 锁,当前线程获取到该锁后,则其它调用 put 和 offer 的线程将会被阻塞(阻塞的线程被放到 putLock 锁的 AQS 阻塞队列)。
步骤(4)这里有重新判断了下当前队列是否满了,这是因为在执行代码(2)和获取到 putLock 锁期间可能其它线程通过 put 或者 offer 方法向队列里面添加了新元素。重新判断队列确实不满则新元素入队,并递增计数器。
步骤(5)判断如果新元素入队后队列还有空闲空间,则唤醒 notFull 的条件队列里面因为调用了 notFull 的 await 操作(比如执行 put 方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲所以这里可以提前唤醒一个入队线程。
代码(6) 则释放获取的 putLock 锁,这里要注意锁的释放一定要在 finally 里面做,因为即使 try 块抛异常了,finally 也是会被执行到的。另外释放锁后其它因为调用 put 和 offer 而被阻塞的线程将会有一个获取到该锁。
代码(7)c==0 说明在执行代码(6)释放锁时候队列里面至少有一个元素,队列里面有元素则执行 signalNotEmpty,下面看看 signalNotEmpty 的代码:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
可知作用是激活 notEmpty 的条件队列中因为调用 notEmpty 的 await 方法(比如调用 take 方法并且队列为空的时候)而被阻塞的一个线程,这里也说明了调用条件变量的方法前要首先获取对应的锁。
综上可知 offer 方法中通过使用 putLock 锁保证了在队尾新增元素的原子性和队列元素个数的比较和递增操作的原子性。
3.2.2 put 操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回 true,如果队列已满则阻塞当前线程直到队列有空闲插入成功后返回 true,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。
put 操作的代码结构与 offer 操作类似,代码如下:
public void put(E e) throws InterruptedException {
//(1)空元素抛空指针异常
if (e == null) throw new NullPointerException();
//(2) 构建新节点,并获取独占锁putLock
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//(3)如果队列满则等待
while (count.get() == capacity) {
notFull.await();
}
//(4)进队列并递增计数
enqueue(node);
c = count.getAndIncrement();
//(5)
if (c + 1 < capacity)
notFull.signal();
} finally {
//(6)
putLock.unlock();
}
//(7)
if (c == 0)
signalNotEmpty();
}
代码(2)中使用 putLock.lockInterruptibly() 获取独占锁,相比 offer 方法中这个获取独占锁方法意味着可以被中断,具体说是当前线程在获取锁的过程中,如果被其它线程设置了中断标志则当前线程会抛出 InterruptedException 异常,所以 put 操作在获取锁过程中是可被中断的。
代码(3)如果当前队列已满,则调用 notFull 的 await() 把当前线程放入 notFull 的条件队列,当前线程被阻塞挂起并释放获取到的 putLock 锁,由于 putLock 锁被释放了,所以现在其它线程就有机会获取到 putLock 锁了。
另外考虑下代码(3)判断队列是否为空为何使用 while 循环而不是 if 语句那?其实是考虑到当前线程被虚假唤醒的问题,也就是其它线程没有调用 notFull 的 singal 方法时候 notFull.await() 在某种情况下会自动返回。如果使用 if 语句那么虚假唤醒后会执行代码(4)元素入队,并且递增计数器,而这时候队列已经是满了的,导致队列元素个数大于了队列设置的容量,导致程序出错。而使用 while 循环假如 notFull.await() 被虚假唤醒了,那么循环在检查一下当前队列是否是满的,如果是则再次进行等待。
3.2.3 poll 操作
从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。
public E poll() {
//(1)队列为空则返回null
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
//(2)获取独占锁
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//(3)队列不空则出队并递减计数
if (count.get() > 0) {//3.1
x = dequeue();//3.2
c = count.getAndDecrement();//3.3
//(4)
if (c > 1)
notEmpty.signal();
}
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)返回
return x;
}
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
代码 (1) 如果当前队列为空,则直接返回 null
代码(2)获取独占锁 takeLock,当前线程获取该锁后,其它线程在调用 poll 或者 take 方法会被阻塞挂起
代码 (3) 如果当前队列不为空则进行出队操作,然后递减计数器。
代码(4)如果 c>1 则说明当前线程移除掉队列里面的一个元素后队列不为空(c 是删除元素前队列元素个数),那么这时候就可以激活因为调用 poll 或者 take 方法而被阻塞到 notEmpty 的条件队列里面的一个线程。
代码(6)说明当前线程移除队头元素前当前队列是满的,移除队头元素后队列当前至少有一个空闲位置,那么这时候就可以调用 signalNotFull 激活因为调用 put 或者 offer 而被阻塞放到 notFull 的条件队列里的一个线程,signalNotFull 的代码如下:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll 代码逻辑比较简单,值得注意的是获取元素时候只操作了队列的头节点。
3.2.4 peek 操作
获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。
public E peek() {
//(1)
if (count.get() == 0)
return null;
//(2)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node first = head.next;
//(3)
if (first == null)
return null;
else
//(4)
return first.item;
} finally {
//(5)
takeLock.unlock();
}
}
peek 操作代码也比较简单,这里需要注意的是代码(3)这里还是需要判断下 first 是否为 null 的,不能直接执行代码(4)。正常情况下执行到代码(2)说明队列不为空,但是代码(1)和(2)不是原子性操作,也就是在执行点(1)判断队列不空后,在代码(2)获取到锁前有可能其它线程执行了 poll 或者 take 操作导致队列变为了空,然后当前线程获取锁后,直接执行 first.item 会抛出空指针异常。
3.2.5 take 操作
获取当前队列头部元素并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//(1)获取锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//(2)当前队列为空则阻塞挂起
while (count.get() == 0) {
notEmpty.await();
}
//(3)出队并递减计数
x = dequeue();
c = count.getAndDecrement();
//(4)
if (c > 1)
notEmpty.signal();
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)
return x;
}
代码(1)当前线程获取到独占锁,其它调用 take 或者 poll 的线程将会被阻塞挂起。
代码(2)如果队列为空则阻塞挂起当前线程,并把当前线程放入 notEmpty 的条件队列。
代码(3)进行出队操作并递减计数。
代码(4)如果 c>1 说明当前队列不为空,则唤醒 notEmpty 的条件队列的条件队列里面的一个因为调用 take 或者 poll 而被阻塞的线程。
代码(5)释放锁。
代码(6)如果 c == capacity 则说明当前队列至少有一个空闲位置,则激活条件变量 notFull 的条件队列里面的一个因为调用 put 或者 offer 而被阻塞的线程。
3.2.6 remove 操作
删除队列里面指定元素,有则删除返回 true,没有则返回 false
public boolean remove(Object o) {
if (o == null) return false;

//(1)双重加锁
fullyLock();
try {

    //(2)遍历队列找则删除返回true
    for (Node<E> trail = head, p = trail.next;
         p != null;
         trail = p, p = p.next) {
         //(3)
        if (o.equals(p.item)) {
            unlink(p, trail);
            return true;
        }
    }
    //(4)找不到返回false
    return false;
} finally {
    //(5)解锁
    fullyUnlock();
}

}
代码(1)通过 fullyLock 获取双重锁,当前线程获取后,其它线程进行入队或者出队的操作时候就会被阻塞挂起。
void fullyLock() {
putLock.lock();
takeLock.lock();
}
代码(2)遍历队列寻找要删除的元素,找不到则直接返回 false,找到则执行 unlink 操作,unlik 操作代码如下:
void unlink(Node p, Node trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
如果当前队列满,删除后,也不忘记唤醒等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
可知删除元素后,如果发现当前队列有空闲空间,则唤醒 notFull 的条件队列中一个因为调 用 put 或者 offer 方法而被阻塞的线程。
代码(5)调用 fullyUnlock 方法使用与加锁顺序相反的顺序释放双重锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
总结下,由于 remove 方法在删除指定元素前加了两把锁,所以在遍历队列查找指定元素过程中是线程安全的,并且此时其它调用入队出队操作的线程全部会被阻塞,另外获取多个资源锁与释放的顺序是相反的。
3.2.7 size 操作
int size() : 获取当前队列元素个数。
public int size() {
return count.get();
}
由于在操作出队入队时候操作 Count 的时候是加了锁的,所以相比 ConcurrentLinkedQueue 的 size 方法比较准确。这里考虑下为何 ConcurrentLinkedQueue 中需要遍历链表来获取 size 而不适用一个原子变量那?这是因为使用原子变量保存队列元素个数需要保证入队出队操作和操作原子变量是原子性操作,而 ConcurrentLinkedQueue 是使用 CAS 无锁算法的,所以无法做到这个。
注:LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。

ArrayBlockingQueue

上节介绍了有界链表方式的阻塞队列 LinkedBlockingQueue,本节来研究下有界使用数组方式实现的阻塞队列 ArrayBlockingQueue 的原理
4.1 ArrayBlockingQueue 类图结构
同理为了能从全局一览 ArrayBlockingQueue 的内部构造,先看下类图:
image.png
如图 ArrayBlockingQueue 内部有个数组 items 用来存放队列元素,putindex 变量标示入队元素下标,takeIndex 是出队下标,count 统计队列元素个数,从定义可知并没有使用 volatile 修饰,这是因为访问这些变量使用都是在锁块内,而加锁已经保证了锁块内变量的内存可见性了。
另外有个独占锁 lock 用来保证出入队操作原子性,这保证了同时只有一个线程可以进行入队出队操作,另外 notEmpty,notFull 条件变量用来进行出入队的同步。
另外由于 ArrayBlockingQueue 是有界队列,所以构造函数必须传入队列大小参数,构造函数代码如下:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

可知默认情况下使用的是 ReentrantLock 提供的非非公平独占锁进行出入队操作的加锁。
首先一个图概况该队列,读者可以读完本节后在回头体会下:
image.png
4.2 ArrayBlockingQueue 原理介绍
本节主要讲解下面几个主要函数的原理。
4.2.1 offer 操作
向队列尾部插入一个元素,如果队列有空闲容量则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false,如果 e 元素为 null 则抛出 NullPointerException 异常,另外该方法是不阻塞的。
public boolean offer(E e) {
//(1)e为null,则抛出NullPointerException异常
checkNotNull(e);
//(2)获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(3)如果队列满则返回false
if (count == items.length)
return false;
else {
//(4)否者插入元素
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
代码(2)获取独占锁,当前线程获取该锁后,其它入队和出队操作的线程都会被阻塞挂起后放入 lock 锁的 AQS 阻塞队列。
代码(3)如果队列满则直接返回 false,否者调用 enqueue 方法后返回 true,enqueue 的代码如下:
private void enqueue(E x) {
//(6)元素入队
final Object[] items = this.items;
items[putIndex] = x;
//(7)计算下一个元素应该存放的下标
if (++putIndex == items.length)
putIndex = 0;
count++;
//(8)
notEmpty.signal();
}
如上代码首先把当前元素放入 items 数组,然后计算下一个元素应该存放的下标,然后递增元素个数计数器,最后激活 notEmpty 的条件队列中因为调用 poll 或者 take 操作而被阻塞的的一个线程。这里由于在操作共享变量比如 count 前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在 CPU 缓存或者寄存器里面的值。
代码(5)释放锁,释放锁后会把修改的共享变量值比如 Count 的值刷新回主内存中,这样其它线程通过加锁在次读取这些共享变量后就可以看到最新的值。
4.2.2 put 操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回 true,如果队列已满则阻塞当前线程直到队列有空闲插入成功后返回 true,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。
public void put(E e) throws InterruptedException {
//(1)
checkNotNull(e);
final ReentrantLock lock = this.lock;

//(2)获取锁(可被中断)
lock.lockInterruptibly();
try {

    //(3)如果队列满,则把当前线程放入notFull管理的条件队列
    while (count == items.length)
        notFull.await();

    //(4)插入元素
    enqueue(e);
} finally {
    //(5)
    lock.unlock();
}

}
代码(2)在获取锁的过程中当前线程被其它线程中断了,则当前线程会抛出 InterruptedException 异常而退出。
代码(3)判断如果当前队列满了,则把当前线程阻塞挂起后放入到 notFull 的条件队列,注意这里也是使用了 while 而不是 if。
代码(4)如果队列不满则插入当前元素,此处不再累述。
4.2.3 poll 操作
从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。
public E poll() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)当前队列为空则返回null,否者调用dequeue()获取
return (count == 0) ? null : dequeue();
} finally {
//(3)释放锁
lock.unlock();
}
}
代码(1)获取独占锁
代码(2)如果队列为空则返回 null,否者调用 dequeue() 方法,dequeue 代码如下:
private E dequeue() {
final Object[] items = this.items;

//(4)获取元素值
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//(5)数组中值值为null;
items[takeIndex] = null;

//(6)队头指针计算,队列元素个数减一

if (++takeIndex == items.length)
takeIndex = 0;
count–;

//(7)发送信号激活notFull条件队列里面的一个线程
notFull.signal();
return x;

}
可知首先获取当前队头元素保存到局部变量,然后重置队头元素为 null,并重新设置队头下标,元素计数器递减,最后发送信号激活 notFull 的条件队列里面一个因为调用 put 或者 offer 而被阻塞的线程。
4.2.4 take 操作
获取当前队列头部元素并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {

    //(2)队列为空,则等待,直到队列有元素
    while (count == 0)
        notEmpty.await();
    //(3)获取队头元素
    return dequeue();
} finally {
    //(4) 释放锁
    lock.unlock();
}

}
take 操作的代码也比较简单与 poll 相比只是步骤(2)如果队列为空则把当前线程挂起后放入到 notEmpty 的条件队列,等其它线程调用 notEmpty.signal() 方法后在返回,需要注意的是这里也是使用 while 循环进行检测并等待而不是使用 if。
4.2.5 peek 操作
获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。
public E peek() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)
return itemAt(takeIndex);
} finally {
//(3)
lock.unlock();
}
}

@SuppressWarnings(“unchecked”)
final E itemAt(int i) {
return (E) items[i];
}
peek 的实现更简单,首先获取独占锁,然后从数组 items 中获取当前队头下标的值并返回,在返回前释放了获取的锁。
4.2.6 size 操作
获取当前队列元素个数。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
size 操作是简单的,获取锁后直接返回 count,并在返回前释放锁。也许你会疑问这里有没有修改 Count 的值,只是简单的获取下,为何要加锁那?其实如果 count 声明为 volatile 这里就不需要加锁了,因为 volatile 类型变量保证了内存的可见性,而 ArrayBlockingQueue 的设计中 count 并没有声明为 volatile,是因为 count 的操作都是在获取锁后进行的,而获取锁的语义之一是获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性。
注:ArrayBlockingQueue 通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加 synchronized 的意味。ArrayBlockingQueue 的 size 操作的结果是精确的,因为计算前加了全局锁。

PriorityBlockingQueue

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素,内部是平衡二叉树堆的实现。
5.1 PriorityBlockingQueue 类图结构
下面首先通过类图来从全局了解下 PriorityBlockingQueue 的结构
image.png
如图 PriorityBlockingQueue 内部有个数组 queue 用来存放队列元素,size 用来存放队列元素个数,allocationSpinLock 是个自旋锁,用 CAS 操作来保证同时只有一个线程可以扩容队列,状态为 0 或者 1,其中 0 表示当前没有在进行扩容,1 标示当前正在扩容。
如下构造函数,默认队列容量为 11,默认比较器为 null,也就是使用元素的 compareTo 方法进行比较来确定元素的优先级,这意味着队列元素必须实现了 Comparable 接口;
private static final int DEFAULT_INITIAL_CAPACITY = 11;

public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

首先通过一个图来对该队列进行概况,读者读完本机后,可以回头在体会下:
image.png
5.2 原理介绍
5.2.1 offer 操作
offer 操作作用是在队列插入一个元素,由于是无界队列,所以一直返回 true,如下是 offer 函数的代码:
public boolean offer(E e) {

if (e == null)
    throw new NullPointerException();

//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();

int n, cap;
Object[] array;

//如果当前元素个数>=队列容量,则扩容(1)
while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap);

try {
    Comparator<? super E> cmp = comparator;

    //默认比较器为null (2)
    if (cmp == null)
        siftUpComparable(n, e, array);
    else
        //自定义比较器 (3)
        siftUpUsingComparator(n, e, array, cmp);

    //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程(9)
    size = n + 1;
    notEmpty.signal();//激活调用take()方法被阻塞的线程
} finally {
    //释放独占锁
    lock.unlock();
}
return true;

}
如上代码,主流程比较简单,下面主要看看如何进行扩容的和内部如何建堆的,首先看下扩容逻辑:
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //释放获取的锁
Object[] newArray = null;

//cas成功则扩容(4)
if (allocationSpinLock == 0 &&
    UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                             0, 1)) {
    try {
        //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
        int newCap = oldCap + ((oldCap < 64) ?
                               (oldCap + 2) : // grow faster if small
                               (oldCap >> 1));
        if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
            int minCap = oldCap + 1;
            if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                throw new OutOfMemoryError();
            newCap = MAX_ARRAY_SIZE;
        }
        if (newCap > oldCap && queue == array)
            newArray = new Object[newCap];
    } finally {
        allocationSpinLock = 0;
    }
}

//第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
if (newArray == null) // back off if another thread is allocating
    Thread.yield();
lock.lock();//(6)
if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
}

}
tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用 cas 控制只有一个线程可以扩容成功。其实这里不先释放锁,也是可行的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容时候还占用锁那么其它线程在这个时候是不能进行出队和入队操作的,这大大降低了并发性。所以为了提高性能,使用 CAS 控制只有一个线程可以进行扩容,并且在扩容前释放了锁,让其它线程可以进行入队出队操作。
spinlock 锁使用 CAS 控制只有一个线程可以进行扩容,CAS 失败的线程会调用 Thread.yield() 让出 cpu,目的意在让扩容线程扩容后优先调用 lock.lock 重新获取锁,但是这得不到一定的保证。有可能 yield 的线程在扩容线程扩容完成前已经退出,并执行代码(6)获取到了锁,这时候获取到的锁的线程发现 newArray 为 null 就会执行代码(1)。如果当前数组扩容还没完毕,当前线程会再次调用 tryGrow 方法,然后释放锁,这又给扩容线程获取锁提供了机会,如果这时候扩容线程还没扩容完毕,则当前线程释放锁后有调用 yield 方法出让 CPU。可知当扩容线程进行扩容期间,其他线程是原地自旋通过代码(1)检查当前扩容是否完毕,等扩容完毕后才退出代码(1)的循环。
当扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取了该锁,并且 allocationSpinLock 被修饰为了 volatile。
当扩容线程扩容完毕后会执行代码 (6) 获取锁,获取锁后复制当前 queue 里面的元素到新数组。
然后看下具体建堆算法:
private static void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;

//队列元素个数>0则判断插入位置,否者直接入队(7)
while (k > 0) {
    int parent = (k - 1) >>> 1;
    Object e = array[parent];
    if (key.compareTo((T) e) >= 0)
        break;
    array[k] = e;
    k = parent;
}
array[k] = key;(8)

}
下面用图来解释上面算法过程,假设队列初始化容量为 2, 创建的优先级队列的泛型参数为 Integer。
首先调用队列的 offer(2) 方法,希望插入元素 2 到队列,插入前队列状态如下图:
image.png
首先执行代码(1),从上图变量值可知判断值为 false,所以紧接着执行代码(2),由于 k=n=size=0 所以代码(7)判断结果为 false,所以会执行代码(8)直接把元素 2 入队,最后执行代码(9)设置 size 的值加 1,这时候队列的状态如下图:
image.png
然后调用队列的 offer(4) 时候,首先执行代码(1),从上图变量值可知判断为 false,所以执行代码(2),由于 k=1, 所以进入 while 循环,由于 parent=0;e=2;key=4; 默认元素比较器是使用元素的 compareTo 方法,可知 key>e 所以执行 break 退出 siftUpComparable 中的循环; 然后把元素存到数组下标为 1 的地方,最后执行代码(9)设置 size 的值加 1,这时候队列状态为:
image.png
然后调用队列的 offer(6) 时候,首先执行代码(1),从上图变量值知道这时候判断值为 true, 所以调用 tryGrow 进行数组扩容, 由于 2<64 所以 newCap=2 + (2+2)=6; 然后创建新数组并拷贝,然后调用 siftUpComparable 方法,由于 k=2>0 进入 while 循环,由于 parent=0;e=2;key=6;key>e 所以 break 后退出 while 循环; 并把元素 6 放入数组下标为 2 的地方,最后设置 size 的值加 1,现在队列状态:
image.png
然后调用队列的 offer(1) 时候,首先执行代码(1),从上图变量值知道这次判断值为 false,所以执行代码(2),由于k=3, 所以进入 while 循环,由于parent=0;e=4;key=1; key<e,所以把元素 4 复制到数组下标为 3 的地方,然后 k=0 退出 while 循环;然后把元素 1 存放到下标为 0 地方,现在状态:
image.png
这时候二叉树堆的树形图如下:
image.png
可知堆的根元素是 1,也就是这是一个最小堆,那么当调用这个优先级队列的 poll 方法时候,会一次返回堆里面值最小的元素。
5.2.2 poll 操作
poll 操作作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。poll 函数代码如下:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//获取独占锁
try {
return dequeue();
} finally {
lock.unlock();//释放独占锁
}
}
如上代码可知在进行出队操作过程中要先加锁,这意味着,当当前线程进行出队操作时候,其它线程不能再进行入队和出队操作,但是从前面介绍 offer 函数时候知道这时候可以有其它线程进行扩容,下面主要看下具体执行出队操作的 dequeue 方法的代码:
private E dequeue() {

//队列为空,则返回null
int n = size - 1;
if (n < 0)
    return null;
else {

    //获取队头元素(1)
    Object[] array = queue;
    E result = (E) array[0];

    //获取队尾元素,并值null(2)
    E x = (E) array[n];
    array[n] = null;

    Comparator<? super E> cmp = comparator;
    if (cmp == null)//(3)
        siftDownComparable(0, x, array, n);
    else
        siftDownUsingComparator(0, x, array, n, cmp);
    size = n;//(4)
    return result;
}

}
如上代码,如果队列为空则直接返回 null,否者执行代码(1)获取数组第一个元素作为返回值存放到变量 Result,这里需要注意下数组里面第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。 然后代码(2)获取队列尾部元素存放到变量 x, 并且置空尾部节点,然后执行代码(3)插入变量 x 到数组下标为 0 的位置后,重新调成堆为最大或者最小堆,然后返回。这里重要的是看如何去掉堆的根节点后,使用剩下的节点重新调整为一个最大或者最小堆,下面我们看下 siftDownComparable 的代码实现:
private static void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];(5)
int right = child + 1;(6)
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
c = array[child = right];
if (key.compareTo((T) c) <= 0)(8)
break;
array[k] = c;
k = child;
}
array[k] = key;(9)
}
}
同理下面我们结合图来模拟上面调整堆的算法过程,接着上节队列的状态继续讲解,上节队列元素序列为 1,2,6,4:
第一次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =4;n=3;result=1;x=4; 这时候队列状态
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第二次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =3;n=2;result=2;x=6; 这时候队列状态:
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第三次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =2;n=1;result=4;x=6; 这时候队列状态:
image.png
然后执行代码(3)调整堆后队列状态为:
image.png
第四次直接返回元素 6.
下面重点说说 siftDownComparable 这个调整堆的算法: 首先说下堆调整的思路,由于队列数组第 0 个元素为树根,出队时候要被移除,这时候数组就不在是最小堆了,所以需要调整堆,具体是要从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会看自己作为根节点的树的左右子树里面那个是最小值,这是一个递归,直到树叶节点结束递归,如果还不明白,没关系,下面结合图来说明下,假如当前队列内容如下:
image.png
其对应的二叉堆树为:
image.png
这时候如果调用了 poll(); 那么 result=2;x=11;队列末尾的元素设置为 null 后,剩下的元素调整堆的步骤如下图:
image.png
如上图(1)树根的 leftChildVal = 4;rightChildVal = 6; 4<6; 所以 c=4; 然后 11>4 也就是 key>c;所以使用元素 4 覆盖树根节点的值,现在堆对应的树如图(2)。
然后树根的左子树树根的左右孩子节点中 leftChildVal = 8;rightChildVal = 10; 8<10; 所以 c=8; 然后发现 11>8 也就是 key>c;所以元素 8 作为树根左子树的根节点,现在树的形状如图(3), 这时候判断 k<half 为 false 就会退出循环,然后把 x=11 设置到数组下标为 3 的地方,这时候堆树如图(4),至此调整堆完毕,siftDownComparable 返回 result=2,poll 方法也返回了。
5.2.3 put 操作
put 操作内部调用的 offer, 由于是无界队列,所以不需要阻塞
public void put(E e) {
offer(e); // never need to block
}
5.2.4 take 操作
take 操作作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞,如下代码:
public E take() throws InterruptedException {
//获取锁,可被中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {

    //如果队列为空,则阻塞,把当前线程放入notEmpty的条件队列
    while ( (result = dequeue()) == null)
        notEmpty.await();//阻塞当前线程
} finally {
    lock.unlock();//释放锁
}
return result;

}
如上代码,首先通过 lock.lockInterruptibly() 获取独占锁,这个方式获取的锁是对中断进行响应的。然后调用 dequeue 方法返回堆树根节点元素,如果队列为空,则返回 false,然后当前线程调用 notEmpty.await() 阻塞挂起当前线程,直到有线程调用了 offer()方法(offer 方法内在添加元素成功后调用了 notEmpty.signal 方法会激活一个阻塞在 notEmpty 的条件队列里面的一个线程)。另外这里使用 while 而不是 if 是为了避免虚假唤醒。
5.2.5 size 操作
获取队列元个数,如下代码,在返回 size 前加了锁,保证在调用 size() 方法时候不会有其它线程进行入队和出队操作,另外由于 size 变量没有被修饰为 volatie,这里加锁也保证了多线程下 size 变量的内存可见性。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
注:PriorityBlockingQueue 队列内部使用二叉树堆维护元素优先级,内部使用数组作为元素存储的数据结构,这个数组是可扩容的,当当前元素个数 >= 最大容量时候会通过算法扩容,出队时候始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素,默认元素优先级比较规则是使用元素的 compareTo 方法来做,用户可以自定义优先级的比较规则。

队列对比

上面介绍的各种队列中只有 ConcurrentLinkedQueue 是使用 UNSAFE 类提供的 CAS 非阻塞算法实现的,其他几个队列内部都是使用锁来保证线程安全的。使用 CAS 算法的效率较好,那么是不是所有场景都用 ConcurrentLinkedQueue 那?
其实不然,因为 ConcurrentLinkedQueue 还是无界队列,无界队列使用不当可能造成 OOM。所以当使用 ConcurrentLinkedQueue 的时候在添加元素前应该先判断当前队列元素个数是否已经达到了设定的阈值,如果达到就做一定的处理措施,比如直接丢弃等。这里需要注意判断当前队列元素个数与阈值这个操作不是原子性的,最终会导致队列元素个数比设置的阈值大。
ConcurrentLinkedQueue 在 Tomcat 的的 NioEndPoint 中得到了应用,通过使用 ConcurrentLinkedQueue 将同步转换为异步,可以让 tomcat 同时接受更多请求,模型如下图:
image.png
tomcat 的 NioEndPoint 模式中 acceptor 线程负责接受用户请求,接受后把请求放入到 poll 线程对应的队列,poll 线程从队列里面获取任务后委托给 worker 线程具体处理。
LinkedBlockingQueue 和 ArrayBlockingQueue 都是有界阻塞队列,不同在于一个底层数据结构是链表,一个是数组;另外前者入队出队使用单独的锁,而后者出入队使用同一个锁,所以前者的并发度比后者高。另外创建前者时候可以不指定队列大小,默认队列元素个数为 Integer.MAX_VALUE,而后者必须要指定数组大小。所以使用 LinkedBlockingQueue 时候要记得指定队列大小。
比如比较有名的 LogBack 日志系统的异步日志打印实现中就是用了 ArrayBlockingQueue 作为缓冲队列,如下图,业务检查调用异步 log 进行写入日志时候,实际是把日志放入了 ArrayBlockingQueue 队列就返回了,而具体真正写入日志到磁盘是一个日志线程从队列里面获取任务来做的,这其实是一个多生产单消费模型:
image.png
PriorityBlockingQueue 是无界阻塞队列,是一个队列元素有优先级的队列,前面的队列模式都是 FIFO 先进先出,而 PriorityBlockingQueue 而是优先级最高的元素先出队,而不管谁先进入队列的,所以 PriorityBlockingQueue 经常会用在一些任务具有优先级的场景。还比如上面说的 logback 异步日志模型,如果把日志等级分了优先级,比如 error>warn>info,那么上述模型中队列就可以使用 PriorityBlockingQueue,日志线程会先从队列里面首先获取 error 级别的日志,但是需要注意的是如果业务线程一直向队列里面写入 error 级别日志,那么可能先写入到队列的 warn 和 info 级别的日志将很久甚至永远没机会写入到磁盘。还有一点要注意 PriorityBlockingQueue 是无界限队列,要注意判断队列元素个数不要超过设置的阈值。

无锁栈

Treiber Stack

无锁列表

CopyOnWriteArrayList

CopyOnWriteArrayList 是一个线程安全的 ArrayList,对其进行的修改操作和元素迭代操作都是在底层创建一个拷贝的数组(快照)上进行的,也就是写时拷贝策略。CopyOnWriteArrayList 适合读多写少的场景,但如果应用在写操作频繁的场景下反而会降低性能。
CopyOnWriteArrayList类图

  • lock:保证写操作时的并发安全;

add(E e)

添加操作拷贝了份快照,在快照上添加元素,最后替代原数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean add(E e) {

// 加独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取array
Object[] elements = getArray();

// 拷贝array到新数组,添加元素到新数组
// 新数组长度是原数组长度+1,可见CopyOnWriteArrayList是无界的
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;

// 使用新数组替换添加前的数组
setArray(newElements);
return true;
} finally {
// 释放独占锁
lock.unlock();
}
}

get(int index)

get 操作获取下标处的元素,实际上 get 可以被分解为以下两个步骤:

  1. 获取 array 的引用;
  2. 通过下标访问 array 指定位置的元素。

整个过程并没有加锁,如果在访问期间有另一个线程删除了某个元素,实际上因为修改操作是发生在原数组的一个快照上的,get 操作仍然获取的是原数组上的元素,因此不会发生类似数组越界的问题。但同时也不可避免这个过程带来的弱一致性,因为元素事实上已经被删除了却仍然可以被访问到。

set(int index, E element)

修改 list 中指定元素的值。

  • 如果指定位置的元素不存在则抛出 IndexOutOfBoundsException 异常;
  • 如果指定位置元素与新值不一致,则创建新数组、在新数组上修改,最后设置新数组到 array(COW)。
  • 即使没有变化,也还是需要重新设置一次 array,这主要是因为 array 本身是 volatile 的,set 方法应当提供 volatile 的语义。

remove(int index)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E remove(int index) {

//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {

//获取数组
Object[] elements = getArray();
int len = elements.length;

//获取指定元素
E oldValue = get(elements, index);
int numMoved = len - index - 1;

//如果要删除的是最后一个元素
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
//分两次拷贝除删除后的元素到新数组
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
//使用新数组代替老的
setArray(newElements);
}
return oldValue;
} finally {
//释放锁
lock.unlock();
}
}

remove(Object o)

remove(Object o, Object[] snapshot, int index)

iterator

CopyOnWriteArrayList 中的 iterator 是弱一致性的,其他线程的修改操作对 iterator 不可见的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}

static final class COWIterator<E> implements ListIterator<E> {
//array的快照版本
private final Object[] snapshot;

//数组下标
private int cursor;

//构造函数
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

//是否遍历结束
public boolean hasNext() {
return cursor < snapshot.length;
}

//获取元素
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
  • 如果在该线程使用返回的迭代器遍历元素的过程中,其它线程没有对 list 进行增删改,那么 snapshot 本身就是 list 的 array,因为它们是引用关系。
  • 如果在遍历期间存在其他线程对 list 的增删改操作,那么 snapshot 会成为原 array 的快照,此时其他线程对 list 进行的增删改是不可见的,因为它们操作的是两个不同的数组。

无锁队列

ConcurrentLinkedQueue

  • 线程安全
  • 无界
  • 非阻塞

数据结构

ConcurrentLinkedQueue类图

  • 底层队列使用单向链表实现。
  • 两个volatile的Node节点(head和tail)分别存放队列的首尾节点,从下面无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。
    1
    2
    3
    public ConcurrentLinkedQueue() {
    head = tail = new Node(null);
    }
  • Node节点内部为一个volatile修饰的变量item用来存放节点的值,next用来存放链表的下一个节点,从而链接成一个单向无界链表,如下图所示:
    ConcurrentLinkedQueue的队列结构
  • 入队和出队操作使用CAS来实现线程安全。

入队 - offer

offer操作在队列末尾添加一个元素:

  • 如果传入的是null,抛出NPE,表明ConcurrentLinkedQueue是不允许插入null值的;
  • 其他情况下插入任何元素都会返回true,因为该队列是无界队列;
  • 使用CAS操作实现线程安全
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    // 从尾节点进行插入
    for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;

    // 如果q==null说明p是尾节点,则执行插入
    // (1)
    if (q == null) {
    // 使用CAS设置p节点的next节点
    if (p.casNext(null, newNode)) {
    // cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点
    if (p != t)
    casTail(t, newNode); // Failure is OK.
    return true;
    }
    }
    // (2)
    else if (p == q)
    // 多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要
    // 重新找新的head,因为新的head后面的节点才是正常的节点。
    p = (t != (t = tail)) ? t : head;
    // (3)
    else
    // 寻找尾节点
    p = (p != t && t != (t = tail)) ? t : q;
    }
    }
    注意这里的循环体从队列尾部添加元素:
  1. 刚开始队列为空,代码(1)通过CAS替换p的下一个节点;
    注意有一个哨兵节点null,刚开始队列的head和tail节点都是指向该哨兵节点,因此队列中至少都会有一个节点;
  2. 如果多个线程同时执行插入,总会有一个线程CAS时插入失败,这时会进入下一次循环
    ConcurrentLinkedQueue插入1
    这时不满足(1)和(2)的条件,在代码(3)处会将q赋值给p
    ConcurrentLinkedQueue插入2
    再到下一次循环时q就会移动到null,这时要么正常插入,要么又被别人通过CAS抢了。
  3. 代码(2)是在执行poll时可能出现的情况:
    ConcurrentLinkedQueue插入3
    此时由于t==tail,所以p被赋值为head,然后继续循环插入元素。

出队 - poll

poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public E poll() {
// goto标记
restartFromHead:

// (1)无限循环
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取当前队头节点
E item = p.item;

// (2)当前节点有值则cas变为null
if (item != null && p.casItem(item, null)) {
//(6)cas成功标志当前节点以及从链表中移除
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// (3)当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// (4)自引用了,则重新找新的队列头节点
else if (p == q)
continue restartFromHead;
// (5)
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
  1. 刚开始队列是空的,内层循环代码(3)判断队列为空就直接返回null了;
    这时updateHead执行时由于h等于p所以没有设置头节点,poll直接返回null。
  2. 如果执行到(3)时已经有其他线程调用了offer方法成功添加一个元素到队列末尾,这时q会指向新增元素的节点
    ConcurrentLinkedQueue出队1
    这时会进入(5),令p也指向新q。
    然后在下一次循环时,进入代码(2),执行p.casItem(item, null)时会通过CAS操作设置头节点的值为null。
    代码(6)处,此时h指向哨兵节点,而p指向队列头节点,这时将p设置为新的头节点(这时p里的值已经被清掉了是一个空节点)。
    此时队列的状态为:
    ConcurrentLinkedQueue插入3
    这就是之前讲队列offer时的一种特殊情况。
  3. 自引用的情况
    假设线程A已经执行到(2)将第一个节点值置为null,这时又有一个线程B开始执行poll操作,如下图所示:
    ConcurrentLinkedQueue出队2
    然后线程 A 执行 updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为:
    ConcurrentLinkedQueue出队3
    然后线程 B 继续执行代码(3)q=p.next由于该节点是自引用节点所以p==q所以会执行代码(4)跳到外层循环 restartFromHead,重新获取当前队列队头 head, 现在状态为:
    ConcurrentLinkedQueue出队4

ArrayBlockingQueue

offer是不会阻塞的,如果满了直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果已经超过容量阈值,则直接返回false
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 把它看成一个循环数组,如果超出范围就卷回
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒这个Condition,必须是在加了锁的前提下才能使用
notEmpty.signal();
}

poll同样也不会阻塞,如果空了直接返回null:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 回卷
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒
notFull.signal();
return x;
}

put操作会等待notFull这个条件:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

take操作同理,会等待notEmpty这个条件:

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

LinkedBlockingQueue 内部是通过单向链表实现,使用头尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作,而头尾节点的操作分别使用了单独的独占锁保证了原子性,所以出队和入队操作是可以同时进行的。另外头尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队出队操作实现了一个生产消费模型。

PriorityBlockingQueue

SynchronousQueue

LinkedTransferQueue

队列比较

Disruptor

  • 无锁内存队列

  • 优化 CPU 伪共享

  • RingBuffer
    环形队列,使用定长数组存储,长度是 2^N,可以使用位运算提升性能。
    无锁:无锁设计减少了竞争。
    预热:预先填充好任务/事件,不需要像链表那样每次添加/删除节点时去创建/回收节点,从而可以避免一定的垃圾回收。
    缓存行填充解决了 CPU 伪共享问题。

  • WorkPool
    存储 WorkProcessor 的池子,Disruptor 可以通过 Executor 并发启动每一个 WorkProcessor

  • WorkProcessor
    从 RindBuffer 消费事件/任务,并交由 WorkHandler 处理。

  • WorkHandler
    处理任务的工作者,根据任务类型委托给不同的 EventHandler。

Logback 框架中异步日志打印中 ArrayBlockingQueue 的使用

异步模型是业务线程把要打印的日志任务写入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务写入磁盘,对用户线程来说,耗时只有将数据写入队列中。

并发安全 Map

ConcurrentHashMap

ConcurrentHashMap结构

get

代码:java.util.concurrent.ConcurrentHashMap.get

  1. 计算 key 的散列值,可以使用该散列值定位到散列表中的某个槽。
    如果 key 是自定义类型对象,需要实现重写 hash 方法。
  2. 找到对象
    hash 值是不精确匹配的,hash 值的关键是计算简单而且有一定的区分度,比如取 string 的前 3 位的和作为 hash 值。
    要精确匹配需要使用对象的 equals 方法。
    ConcurrentHashMap 中哈希槽的实现方法有两种:链表和红黑树,链表和红黑树的查找过程就不必细说了。

put

ConcurrentHashMap的put操作
代码:java.util.concurrent.ConcurrentHashMap.put

  1. hash
  2. 找对象
    找对象过程与 get 的区别主要是 put 需要并发控制:
    • 如果槽是空的,则通过 CAS 直接赋值;
    • 如果槽非空,则先用synchronized锁住槽,接下来根据槽的数据结构来插入节点,如果槽是链表,则遍历链表找该 Node 是否已存在,不存在的情况下插入到末尾,如果槽是红黑树,则通过二叉树的遍历找目标 Node,找不到的情况下插入到叶子并重新执行红黑平衡。

rehash

扩容的触发条件与HashMap一致。
扩容流程大致上是:遍历哈希槽,对每个需要迁移的哈希槽进行synchronized加锁。
当扩容开始后,其他线程必须等扩容完成后才能工作,但其他线程也不是就一直阻塞等扩容完成,而是调用helpTransfer方法一起帮助进行扩容,实际上因为扩容的单位是哈希槽,因此多线程并发执行扩容并不会导致明显的冲突增加。

扩容入口:

  1. helpTransfer
    写入操作时协助扩容,即判断hash节点是ForwardingNode则调用helpTransfer将
  2. 全量添加时,需要保证

扩容代码:
java.util.concurrent.ConcurrentHashMap.transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 创建一个新的两倍大小的新nextTab,将老tab中的元素迁移过去
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 标记节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果tab当前位置为null,则设置fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 已经是fwd节点,则遍历下一个位置
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// tab当前位置已有节点,则加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 表示链表节点,如果是树节点则fh=-2
if (fh >= 0) {
// 头节点的hash值
int runBit = fh & n;
Node<K,V> lastRun = f;
// 链表的下一节点p
for (Node<K,V> p = f.next; p != null; p = p.next) {
// p节点的hash值
int b = p.hash & n;
// 避免成环?
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将ln和hn转移到nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
原tab置为fwd,表示已经被转移了
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

size

size操作返回的是一个不精确的值,因为进行统计的过程中,很有可能会有其他线程正在进行插入和删除操作。

1.8之前的size:

  1. 遍历segments数组,将每个segment的count加起来作为总数,将modCount加起来作为修改总数;
    modCount会在每次segment被修改时+1(只增不减),用于比较。
  2. 再做一遍遍历,将这次的modCount总数和上一次的比较,如果一致则计数准确直接返回,否则重试;
  3. 如果重试了2次都不行,则第三次会对segment加锁再统计。

1.8之后,没有了分段锁,size不会每次都遍历segments统计,而是在更新时修改总数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

从源码中可以看到,ConcurrentHashMap#size的结果就是:
baseCount + sum(counterCells)
其中:

  • baseCount:计数,总数发生变化时通过CAS修改
  • counterCells:如果baseCount CAS修改失败,作为兜底,类似LongAdder的思路。

put操作的末尾会调用addCount()更新baseCount的值,如果CAS修改失败了,则使用counterCells,如果CAS修改 counterCells失败了,则使用fullAddCount方法继续死循环操作,直到成功。

QA

  1. JUC 并发包中并发组件 CopyOnWriteArrayList 的实现原理,CopyOnWriteArrayList 是如何通过写时拷贝实现并发安全的 List?

  2. 什么是弱一致性?

  3. 说一下 ConcurrentHashMap。

  4. ConcurrentHashMap 怎么实现并发安全?
    相对 Hashtable 来说 ConcurrentHashMap 的锁粒度是更小的,Hashtable 中使用 synchronized 实现的一种方法级的悲观锁,相当于把整个散列表锁住了,不利于系统整体吞吐量的提升。
    JDK1.7 中它使用的是一种分段锁来保证并发安全,是一种粒度较小的锁,写操作每次只锁住一个哈希槽,
    JDK1.8 之后改为通过实现一种基于 CAS 的乐观锁来保证并发安全,当然,和 HashMap 一样,每个哈希槽在增长到一定程度后会自动转换为红黑树。

参考

  1. 【目录】JUC 集合框架目录
  2. 并发框架 Disruptor 译文

线程和状态机

线程和线程任务

线程任务区别于线程,可以理解为线程需要执行的逻辑,类似 Thread 中要执行的 Runnable。

阅读全文 »

参考

ZooKeeper

  1. Zookeeper error: Cannot open channel to X at election address

Elasticsearch

  1. Install Elasticsearch with Docker
  2. 记一次 Docker 下安装 Logstash+Elasticsearch+Kibana 经历
  3. Kibana on Docker cannot connect to Elasticsearch
  4. Elasticsearch 集群部署

Docker MySQL

  1. Docker MySQL

Docker MySQL 双机热备实现

  1. Mysql Master/Slave Replication With Docker
  2. Mysql 双机热备实现
  3. 2.5.6 Deploying MySQL on Linux with Docker
  4. 让 docker 中的 mysql 启动时自动执行 sql
  5. MySQL 主从复制资料汇总

Redis

  1. docker redis
  2. redis/4.0/Dockerfile
  3. 使用Docker Compose部署基于Sentinel的高可用Redis集群

RabbitMQ

  1. docker rabbitmq
  2. bijukunjummen/docker-rabbitmq-cluster
  3. RabbitMQ集群 Docker一键部署
  4. Docker中实现RabbitMQ集群

Nginx

  1. docker nginx
  2. Docker Swarm Load Balancing with NGINX and NGINX Plus

Visualizer

  1. docker-swarm-visualizer

ZooKeeper

  1. 使用 Docker 一步搞定 ZooKeeper 集群的搭建

Docker容器命令

运行中的镜像我们称之为容器,有点类似程序和进程的概念。

运行容器

1
docker run --name container-name -d image-name

运行一个容器,使用 docker run 命令即可。 另,docker run -参数 含义:

  • – name:为容器起一个名称;
  • -d:detached,执行完这句命令后,控制台将不会阻塞,可以继续输入命令操作,不会阻塞,也就是启动守护式容器,如果执行 docker run –name mycentos -it centos 会进入启动容器的命令控制台,也就是启动交互式容器;
  • -i:以交互方式运行容器,通常与 -t搭配使用;
  • -t:为容器重新分配一个伪输入终端,通常与 -i 搭配使用;
  • -P:随机端口映射;
  • -p:指定端口映射,后面会有端口映射详细讲解;
  • image-name:要运行的镜像名称;
    如果以守护式方式启动 centos 容器,执行如下命令:docker run –name mycentos -d centos,会正常返回 container-id,但是通过 docker ps 查看,却发现没有在运行,通过 docker ps -a 发现,原来已经停止了,这是为什么呢?
    1
    2
    # 接下来我们创建一个守护态的Docker容器
    sudo docker run -itd ubuntu:14.04 /bin/bash

查看容器

1
2
# 查看运行中的容器列表
docker ps

输出内容中:

  • CONTAINER ID:启动时生成的 ID;
  • IMAGE:该容器使用的镜像;
  • COMMAND:容器启动时执行的命令;
  • CREATED:容器创建时间;
  • STATUS:当前容器状态;
  • PORTS:当前容器所使用的默认端口号;
  • NAMES:启动时给容器设置的名称。

另外,docker ps -参数,有:

  • -a:查看所有容器,包括已停止运行的;
  • -q:静默模式,只显示容器编号;
  • -l:显示最近创建的容器;
  • -n 3:显示最近创建的 num(此处为 3)个容器;
  • –no-trunc:不截断输出,显示完整信息。
1
2
# 可用通过如下命令查看容器中正在运行进程:
docker top container-id/container-top
1
2
# 可用通过如下命令查看容器内部细节,返回为 json:
docker insepct container-id

进入容器

docker attach

1
sudo docker attach 44fc0f0582d9

但在,使用该命令有一个问题。当多个窗口同时使用该命令进入该容器时,所有的窗口都会同步显示。如果有一个窗口阻塞了,那么其他窗口也无法再进行操作。
另外,官方文档中说 attach 后可以通过 CTRL-C 来 detach,但实际上经过我的测试,如果 container 当前在运行 bash,CTRL-C 自然是当前行的输入,没有退出;如果 container 当前正在前台运行进程,如输出 nginx 的 access.log日志,CTRL-C 不仅会导致退出容器,而且还 stop 了。这不是我们想要的,detach 的意思按理应该是脱离容器终端,但容器依然运行。好在 attach 是可以带上 –sig-proxy=false 来确保 CTRL-D 或 CTRL-C 不会关闭容器。

1
docker attach --sig-proxy=false 7f237caad43b

因为这些原因,所以docker attach命令不太适合于生产环境,平时自己开发应用时可以使用该命令。

ssh

在生产环境中排除了使用docker attach命令进入容器之后,相信大家第一个想到的就是ssh。在镜像(或容器)中安装SSH Server,这样就能保证多人进入容器且相互之间不受干扰了,相信大家在当前的生产环境中(没有使用Docker的情况)也是这样做的。但是使用了Docker容器之后不建议使用ssh进入到Docker容器内。
但是不建议,原因在

nsenter

在上面两种方式都不适合的情况下,还有一种比较方便的方法,即使用nsenter进入Docker容器。
关于什么是nsenter请参考如下文章:
https://github.com/jpetazzo/nsenter
在了解了什么是nsenter之后,系统默认将我们需要的nsenter安装到主机中
如果没有安装的话,按下面步骤安装即可(注意是主机而非容器或镜像)
具体的安装命令如下:

1
2
3
4
5
6
$ wget https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz  
$ tar -xzvf util-linux-2.24.tar.gz
$ cd util-linux-2.24/
$ ./configure --without-ncurses
$ make nsenter
$ sudo cp nsenter /usr/local/bin

安装好nsenter之后可以查看一下该命令的使用。
nsenter可以访问另一个进程的名称空间。所以为了连接到某个容器我们还需要获取该容器的第一个进程的PID。可以使用docker inspect命令来拿到该PID。
docker inspect命令使用如下:

1
$ sudo docker inspect --help   

inspect命令可以分层级显示一个镜像或容器的信息。比如我们当前有一个正在运行的容器

可以使用docker inspect来查看该容器的详细信息。

1
$ sudo docker inspect 44fc0f0582d9  

由其该信息非常多,此处只截取了其中一部分进行展示。如果要显示该容器第一个进行的PID可以使用如下方式

1
$ sudo docker inspect -f {{.State.Pid}} 44fc0f0582d9  

在拿到该进程PID之后我们就可以使用nsenter命令访问该容器了。

1
2
$ sudo nsenter --target 3326 --mount --uts --ipc --net --pid  
$ sudo nsenter --target 3326 --mount --uts --ipc --net --pid

其中的3326即刚才拿到的进程的PID
当然,如果你认为每次都输入那么多参数太麻烦的话,网上也有许多做好的脚本供大家使用。
地址如下:
http://yeasy.gitbooks.io/docker_practice/content/container/enter.html
http://www.tuicool.com/articles/eYnUBrR

exec

除了上面几种做法之外,docker在1.3.X版本之后还提供了一个新的命令exec用于进入容器,这种方式相对更简单一些。

1
sudo docker exec -it 775c7c9ee1e1 /bin/bash

exec和attach的区别:

  • attach:直接进入容器启动命令的终端,不会启动新的进程;
  • exec:在容器中打开新的终端,并且可以启动新的进程,可在宿主机中直接执行操作容器的命令,比如docker exec -it 7f237caad43b ls /tmp列出容器 /tmp 目录下的文件

容器和宿主机互相拷贝文件

宿主机拷贝文件到容器:

1
2
3
4
docker cp 文件 container-id:目标文件/文件夹
eg.
docker cp /tmp/suzhuji.txt 7f237caad43b:/tmp
将宿主机tem文件夹下suzhujia.txt文件拷贝到容器7f237caad43b中tmp目录中

从容器拷贝文件到宿主机:

1
2
3
4
docker cp container-id:目标文件/文件夹 宿主机目标文件/文件夹
eg.
docker cp 7f237caad43b:/tmp/yum.log /tmp
将容器7f237caad43b中tmp目录下yum.log拷贝到宿主机/tmp目录下

停止、启动、重启容器

1
2
3
4
5
6
7
8
9
10
# 通过以下命令来停止运行中的容器,停止后需要使用docker start命令来重新启动
docker stop container-name/container-id
# 强制停止容器(类似强制关机):
docker kill container-name/container-id
# 停止所有正在运行的容器
docker kill $(docker ps -q)
# 通过以下命令启动容器:
docker start container-name/container-id
# 通过以下命令启动容器:
docker restart container-name/container-id

容器退出(或者显示调用docker stop)后会进入终止(exited)状态,此时可以通过 docker ps -a 查看,其中数据不会丢失,还可以通过docker start 来启动,只有删除容器才会清除数据。

删除容器

1
2
3
4
5
6
# 删除单个容器:
docker rm container-id
# 删除多个容器:
docker rm container-id container-id
# 删除所有容器:
docker rm $(docker ps -a -q )

另,docker rm -参数含义:

  • -f:强制删除,如果在运行中,先停止,再删除

查看容器日志

1
2
# 查看当前容器日志,可通过如下命令:
docker logs container-id/container-name

另,docker logs -参数含义:

  • -t:加入时间戳;
  • -f:跟随最新的日志打印;
  • –tail:显示最后多少条。

端口映射

容器中可以运行一些网络应用,要让外部也可以访问这些应用,可以通过 -P(大写) 或 -p (小写) 参数来指定端口映射。启动容器的时候如果不指定对应参数,在容器外部是无法通过网络来访问容器内的网络应用和服务的。
Docker 的端口映射通过 -p 或 -P 参数实现,-p 和 -P 区别为:

  • -P : 随机映射一个49000~49900的端口到内部容器开放的网络端口
  • -p : 可以指定要映射的IP和端口,但是在一个指定端口上只可以绑定一个容器
1
2
3
# 把主机端口 8888 请求映射到 Docker 容器内部端口 8080
docker run --name tomcat1 -d tomcat
docker run --name tomcat2 -d -p 8888:8080 tomcat

使用docker ps查看端口映射情况,访问localhost:8080和localhost:8888,发现前者无法访问,后者可以。

端口映射格式

  1. ip:hostport:containerport #指定ip、指定主机port、指定容器port
    1
    2
    docker run -d -p 127.0.0.1:5000:5000 training/webapp python app.py
    指定映射使用一个特定地址,比如 localhost地址 127.0.0.1
  2. ip::containerport #指定ip、未指定主机port、指定容器port
    1
    2
    3
    4
    docker run -d -p 127.0.0.1::5000 training/webapp python app.py
    绑定 localhost 的任意端口到容器的 5000 端口,本地主机会自动分配一个端口
    还可以使用 udp 标记来指定 udp 端口
    docker run -d -p 127.0.0.1:5000:5000/udp training/webapp python app.py
  3. hostport:container #未指定ip port、指定主机port、指定容器port
    1
    2
    docker run -d -p 5000:5000 training/webapp python app.py
    将本地的 5000 端口映射到容器的 5000 端口,默认会绑定本地所有接口上的所有地址

查看端口映射

1
2
# 可以通过如下命令查看容器映射了哪些端口及协议:
docker port container-id

如果返回空表示没有进行端口映射。

QA

  1. docker ps(等其他指令)没有响应

我的情况比较简单,是因为云主机内存不够了,所以没有响应。但是如果是某个容器出问题了就需要另外想办法了:docker之 docker ps无响应

  1. 连接进入docker

https://stackoverflow.com/questions/30172605/how-to-get-into-a-docker-container

  1. 查看Docker日志

https://docs.docker.com/config/daemon/#read-the-logs

https://takacsmark.com/docker-logs/

docker logs <Container-ID>docker service logs <Service-ID> 命令能查看某一容器或服务的日志,但是怎么查看Docker守护进程的执行日志?

docker logs 和 docker service logs 命令会显示类似于终端中交互式运行命令的输出。UNIX 和 Linux 命令在运行时通常会打开三个 I/O 流,分别称为 STDIN,STDOUT 和 STDERR。STDIN 是命令的输入流,可能包括来自键盘的输入或来自另一个命令的输入。STDOUT 通常是命令的正常输出,而 STDERR 通常用于输出错误消息。默认情况下,docker logs 显示命令的 STDOUT 和 STDERR。

https://stackoverflow.com/questions/30969435/where-is-the-docker-daemon-log

比如Ubuntu:sudo journalctl -fu docker.service

查看已经退出的容器的日志

  1. 运行容器直接退出

使用docker-logs命令(或者使用docker ps -a查看正在运行中的容器)发现容器启动后直接退出,原因是Docker容器后台运行,就必须有一个前台进程,进程结束,容器就会退出,如果不是那些一直挂起的命令(eg. top, tail等),就会自动退出。

正常情况下,启动服务只需启动相应的service即可,例如:service nginx start && service php5-fpm start,但是这样做的话nginx和fpm均以后台进程模式运行,就会导致docker前台没有正在运行的应用,因此容器会立即自杀,因为已经没有事情能做了。

* 将要运行的程序以前台进程的形式运行,如果容器需要同时启动多个进程,那么只需要将其中的一个挂起到前台即可,比如对上面所说的 web 容器,只需要将启动指令修改为`service php5-fpm start && nginx -g "daemon off;"`

* 对于不知道怎么在前台运行的程序,只需要在启动的命令后面添加类似tail、top这种可以前台运行的程序,以上面所说的web容器为例,可以写成`service nginx start && service php5-fpm start && tail -f /var/log/nginx/error.log`,又如,在启动centos/ubuntu容器的时候,可以做一个死循环,持续输出任意内容,这样容器就不会认为没事可做而自杀了:`docker run -d centos /bin/sh -c "while true; do echo hello world; sleep 1; done"`。

* 可以加上-it选项,-i 开启了input(输入)功能 -t开启了一个连接容器里边的terminal(终端)(是不是相当于在容器内开启了一个bash进程?),如果希望在后台运行,可以再加上-d选项。

一般来说我们不推荐在同一个容器内运行多个应用程序,这会影响容器的效率,因此最后一种方法是最好的。

  1. 日志中出现 not support swap limit capabilities

https://segmentfault.com/q/1010000002888521

  1. 开启服务失败(elasticsearch)

在查看日志(journalctl -fu docker.service)的时候发现是内存不够用,在docker-compose.yml中去掉resources配置项就可以了。

如果没有设置限制,也有可能是物理机本身内存不够了,实在不行可以创建swap来凑空间。

  1. 使用远程主机作为docker-machine时“connection refused”

调用docker-machine env <docker-machine名>出现连接失败的情况

查看了一些资料但并没有什么卵用,/etc/default/docker这个文件根本没有被读进去。

据说是因为docker的daemon默认监听fd,而远程连接时用的是tcp。

根据这里的说明修改配置文件,但是发现无法重启docker服务,查看daemon日志发现-H的配置项重复了,而/etc/systemd/system/下并没有发现配置文件,在这里了解到原来配置文件还有一个位置/lib/systemd/system/,将相关内容(”-H fd://“)去掉就行了。

但是这样配置完毕后仍然不能在ECS上创建Docker Machine,最后采用了generic驱动成功了,具体配置见我的另一篇[App & Cloud]。

就优先级来说,/etc/docker/daemon.json > /etc/systemd/system/ > /lib/systemd/system/,优先级高的会覆盖低的。

  1. 对阿里云美西节点 docker-machine create 时 tls: DialWithDialer timed out

刚开始我也试着regenerate-certs了,但没什么卵用

在stackoverflow上查不到相关的讨论

官网相关页面上也没有。

github上对这个问题的讨论,也有了解决,在代码里把超时时间改大了,但还是写死的,我在连接美国服务器时仍然超时。

有的人提议提议将超时时间作为一个命令行参数提供出来,但是下面一个傻逼一直坚持说“suggest retrying”。

最后还是放弃了在这个节点上部署docker。

PS:在这里看到开发者的想法:用户是愚蠢的,不能把危险的选项公布出来,所以TLS任何方法不能关闭!WTF!!!随便搜索了一遍releases列表,没有发现对TLS的更改,所以我彻底放弃了。

  1. 如何让Docker启动时执行自定义命令/脚本?

如果官方提供的Dockerfile已经内置了这个功能是最好的(比如MySQL官方Dockerfile),如果没有的话,就需要自定义Dockerfile了(How to run bash command after startup?),ENTRYPOINT和CMD命令都可以用于执行命令,CMD会被接到ENTRYPOINT后面,主要用于提供一些可变的参数,docker run后面的参数其实就是CMD。

在docker-compose.yml文件中不适合自定义执行命令,command命令可以定义启动时命令,但是只能定义一条,我尝试command: a.sh && b.sh,结果也只是执行了前面的指令,所以就不要白费力气了。

  1. docker 中怎么修改应用的配置?

    • 官方镜像已有提供该功能

    • 使用volumes将宿主机文件夹挂载到容器内

    • 自定义镜像

  2. 怎么查看容器的启动命令

    • 在宿主机上docker inspect

    • 在容器内部ps -fe,其中1号进程就是启动命令(有可能出现ps命令找不到的情况)

    • 查看这个镜像的Dockerfile,其中的ENTRYPOINT和CMD指定了容器的运行时执行命令

  3. 如何临时退出一个正在交互的容器的终端,而不终止它?

按Ctrl+p,后按Ctrl+q,如果按Ctrl+c会使容器内的应用进程终止,进而会使容器终止。

  1. 使用docker port 命令映射容器的端口时,系统报错Error: No public port ‘80’ published for …,是什么意思?

创建镜像时Dockerfile要指定正确的EXPOSE的端口,容器启动时指定PublishAllport=true

  1. 可以在一个容器中同时运行多个应用进程吗?
    一般不推荐在同一个容器内运行多个应用进程,如果有类似需求,可以通过额外的进程管理机制,比如supervisord来管理所运行的进程

  2. 如何控制容器占用系统资源(CPU,内存)的份额?
    在使用docker create命令创建容器或使用docker run 创建并运行容器的时候,可以使用-c|–cpu-shares[=0]参数来调整同期使用CPU的权重,使用-m|–memory参数来调整容器使用内存的大小。

参考

容器

  1. 命令
  2. Docker 分配宿主机网段 IP
  3. docker容器网络通信原理分析
  4. Service 之间如何通信?- 每天5分钟玩转 Docker 容器技术(101)

容器编排

Swarm基本使用

  1. Play with Docker classroom: Service Discovery under Docker Swarm Mode

  2. 运维之我的docker-swarm集群中删除节点和服务

  3. docker swarm 搭建及跨主机网络互连案例分析

  4. Docker Swarm 入门一篇文章就够了

Swarm运维

  1. Docker 引擎的 Swarm 模式:入门教程

  2. 生产环境中使用Docker Swarm的一些建议

  3. 云计算之路-阿里云上:重启 manager 节点引发 docker swarm 集群宕机

Swarm原理

  1. Consul入门

  2. Service Discovery with Docker and Consul: part 1

  3. Docker Reference Architecture: Universal Control Plane 2.0 Service Discovery and Load Balancing

  4. 浮动IP(FLOAT IP)

持续部署

  1. Docker持续部署图文详解

服务发现

  1. docker swarm获取客户端IP

OpenStack(开源云计算平台)

  1. KVM and Docker LXC Benchmarking with OpenStack

  2. OpenStack

  3. OpenStack-Docker-wiki

客户端

  1. spotify/docker-client
  2. how to copy file from file system into running container
  3. 单元测试 DefaultDockerClientTest.java
0%