Tallate

该吃吃该喝喝 啥事别往心里搁

JVM结构
JVM 特指用于运行由 Java 虚拟机规范规定的字节码的运行时系统,它具有以下特征:

  • 平台无关性
    JVM 本身是平台相关的,只是对不同平台都有实现,所以我们编译成的字节码才可以在各个平台上执行,因此称 Java 平台无关指的是字节码是平台无关的。
  • 高度抽象
    JVM 提供了编译器、类加载、动态内存管理、线程等一套工具,作为虚拟机,程序经编译后运行在 JVM 上时,几乎完全不必担心平台的兼容性。
    本地方法接口只是作为一个扩展功能提供,并不是必须的,实际上除了对性能严格要求的场合,本地方法一般不会被使用。

    编译器严格来说和虚拟机并无太大关联,JVM 可以供 Groovy、Kotlin、Scala 等一系列语言作为运行时环境,

下图表示 JVM 的系统结构,包括与 Class 文件打交道的类装载系统、进行运行时数据管理的系统、进行方法执行的系统、本地方法接口(JNI)和本地方法库这些部分。

类装载子系统

类装载子系统负责查找并装载类型,类装载器主要包含两种:启动类装载器(Java 虚拟机实现的一部分)用户自定义类装载器(Java 程序的一部分)

启动类装载器 – Java 虚拟机必须有一个启动类装载器,用于装载受信任的类,如 Java API 的 class 文件。
用户自定义类装载器 – 继承自 ClassLoader 类,ClassLoader 的如下四个方法,是通往 Java 虚拟机的通道。

  • protected final Class defineClass(String name, byte data[], int offset, int length); // data 为二进制的 Java Class 文件数据,表示一个新的可用类型,之后把这个类型导入到方法区中
  • protected final Class defineClass(String name, byte data[], int offset, int length, ProtectionDomain protectionDomain);
  • protected final Class findSystemClass(String name); // 参数为全限定名,通过类装载器进行装载
  • protected final void resolveClass(Class c); // 参数为 Class 实例,完成连接初始化操作

类装载子系统负责定位和导入二进制 class 文件,并且保证导入类的正确性,为类变量分配并初始化内存,以及帮助解析符号引用。类装载器必须严格按照如下顺序进行类的装载。

  1. 装载 – 查找并装载类型的二进制数据
  2. 连接 – 执行验证,准备,以及解析(可选),连接分为如下三个步骤
    验证 – 确保被导入类型的正确性
    准备 – 为类变量(static)分配内存,并将其初始化为默认值
    解析 – 把类型中的符号引用转换为直接引用
  3. 初始化 – 把类变量初始化为正确初始值

内存管理系统

方法区

方法区是线程共享的内存区域,用于存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据,当方法区无法满足内存分配需求时,将抛出 OutOfMemoryError。
类信息包括:

  1. 类型全限定名。
  2. 类型的直接超类的全限定名(除非这个类型是 java.lang.Object,它没有超类)。
  3. 类型是类类型还是接口类型。
  4. 类型的访问修饰符(public、abstract、final 的某个子集)。
  5. 任何直接超接口的全限定名的有序列表。
  6. 类型的常量池。
  7. 字段信息。
  8. 方法信息。
  9. 除了常量以外的所有类(静态)变量。
  10. 一个到类 ClassLoader 的引用。
  11. 一个到 Class 类的引用。

着重介绍常量池,虚拟机必须要为每个被装载的类型维护一个常量池。常量池就是该类型所用常量的一个有序集合,包括直接常量和对其他类型、字段和方法的符号引用。它在 Java 程序的动态连接(运行时解析字节码中使用到的符号引用)中起着核心作用。
除了以上结构外,JVM 的实现者还可以添加一些其他的数据结构,如方法表:对每个加载的非抽象类的类信息中都添加一个方法表,方法表是一组对类实例方法的直接引用——包括从父类继承的方法,JVM 可以通过方法表快速找到实例方法。

C++中 virtual 函数是通过虚函数表实现的,Java 中所有方法都是 virtual 的,因此也就不需要再标注虚拟了,正像 Java 虽然宣称没有指针,但是 Java 里其实全是指针。Java 中类似的检查机制不少,这体现了 Java 的一种设计思想:始终将安全放在效率之上。

元空间

JDK1.8 的一大改动是方法区变成了元空间,其实就是放开了类空间的大小,容量取决于是 32 位或是 64 位操作系统的可用虚拟内存大小,32 位系统本地最多有 4G 虚拟内存空间,物理内存取决于操作系统的配置。新参数MaxMetaspaceSize用于限制本地内存分配给类元数据的大小,如果没有指定这个参数,元空间会在运行时根据需要动态调整,大小随意。对于没用的类及类加载器的垃圾回收将在元数据使用达到 MaxMetaspaceSize 参数的设定值时进行。

一个虚拟机实例只对应一个堆空间,堆是线程共享的,当然还有可能会划分出多个线程私有的分配缓冲区(TLAB)。堆空间是存放对象实例的地方,几乎所有对象实例都在这里分配。堆也是垃圾收集器管理的主要区域,因此也被称为“GC 堆”。
堆可以处于物理上不连续的内存空间中,只要逻辑上连续即可,就像磁盘空间一样。
当堆中没有足够的内存进行对象实例分配时,并且堆也无法扩展时,会抛出 OutOfMemoryError 异常。
对 OOM 的排查通常是先通过内存映像分析工具对 Dump 出来的堆转储快照进行分析,重点是确认内存中的对象是否是必要的,也就是要先分清楚到底是出现了内存泄漏还是内存溢出

  • 如果是内存泄漏,可进一步通过工具查看泄露对象到 GC Roots 的引用链。于是就能找到泄露对象的类型信息及 GC Roots 引用链的信息,就可以比较准确地定位出泄露代码的位置。
  • 如果不存在泄露,就是内存中的对象确实都还必须存活着,那就应当检查虚拟机的堆参数(-Xmx 与 -Xms),与机器物理内存对比看是否还可以调大,从代码上检查是否存在某些对象生命周期过长、持有状态时间过长的情况,尝试减少程序运行期的内存消耗。

程序计数器

每个线程拥有自己的程序计数器,线程之间的程序计数器互不影响,在任何一个确定的时刻,一个处理器(对于多核处理器来说是一个内核)都只会执行一条线程中的指令。
PC 寄存器的内容总是下一条将被执行指令的”地址”,这里的”地址”可以是一个本地指针,也可以是在方法字节码中相对于该方法起始指令的偏移量。

  • 如果该线程正在执行一个本地方法,则程序计数器内容为 undefined。
  • 此区域在 Java 虚拟机规范中没有规定任何 OutOfMemoryError 情况的区域。

虚拟机栈

我们一般会把 Java 内存区分为堆内存和栈内存,而所指的“栈”就是这里的虚拟机栈,或者说是虚拟机栈中局部变量表部分。
Java 栈也是线程私有的,虚拟机只会对栈进行两种操作,以帧为单位的入栈出栈。每个方法在执行时都会创建一个栈帧,并入栈,成为当前帧,每一个方法从调用直至执行完成的过程,就对应着一个栈帧在虚拟机栈中从入栈到出栈的过程。栈帧由三部分组成:局部变量区、操作数栈、帧数据区。
局部变量区被组织为一个以字长为单位、从 0 开始计数的数组。字节码指令通过从 0 开始的索引来使用其中的数据。类型为 int、float、reference 和 returnAddress 的值在数组中只占据一项,而类型为 byte、short 和 char 的值存入时都会转化为 int 类型,也占一项,而 long、double 则连续占据两项。
操作数栈也常被称为操作栈,它是一个后入先出栈。当一个方法刚刚执行的时候,这个方法的操作数栈是空的,在方法执行的过程中,会有各种字节码指令向操作数栈中写入和提取值,也就是入栈与出栈操作。在使用操作数栈时需要注意实例方法和类方法:

1
2
3
4
5
6
7
8
9
10
public class Example {
// 类方法
public static int classMethod(int i, long l, float f, double d, Object o, byte b) {
return 0;
}
// 实例方法
public int instanceMethod(char c, double d, short s, boolean b) {
return 0;
}
}

类方法和实例方法栈帧结构有所不同,从图中可以看到它们之间的区别:

  • 类方法帧里没有隐含的 this 引用,而实例方法帧中会隐含一个 this 引用;

并且注意:

  • byte,char,short,boolean 类型存入局部变量区的时候都会被转化成 int 类型值,当被存回堆或者方法区时,才会转化回原来的类型;
  • returnAddress 类型数据是指向字节码的指针(方法区的字节码指令),它只存在于字节码层面,与编程语言无关,我们在 Java 语言中是不会直接与 returnAddress 类型数据打交道的。

    每个线程所持有的程序计数器(pc)实际上就是 returnAddress 类型的数据,当线程执行 native 方法时,pc 中的值为 undefied。

  • 操作数栈被组织成一个以字长为单位的数组,它是通过标准的栈操作——即入栈和出栈来进行访问,而不是通过索引访问。入栈和出栈也会存在类型的转化;
  • 当一个方法执行完毕之后,要返回之前调用它的地方,因此在栈帧中必须保存一个方法返回地址。方法退出的过程实际上等同于把当前栈帧出栈,因此退出时可能执行的操作有:恢复上层方法的局部变量表和操作数栈,把返回值(如果有的话)压入调用都栈帧的操作数栈中,调用 PC 计数器的值以指向方法调用指令后面的一条指令等。
  • 每个栈帧都包含一个指向运行时常量池中该栈帧所属方法的引用,持有这个引用是为了支持方法调用过程中的动态连接。在 Class 文件的常量池中存有大量的符号引用,字节码中的方法调用指令就以常量池中指向方法的符号引用为参数。这些符号引用一部分会在类加载阶段或第一次使用的时候转化为直接引用,这种转化称为静态解析。另外一部分将在每一次的运行期期间转化为直接引用,这部分称为动态连接。

栈数据区存放一些用于支持常量池解析、正常方法返回以及异常派发机制的信息。即将常量池的符号引用转化为直接地址引用、恢复发起调用的方法的帧进行正常返回,发生异常时转交异常表进行处理。

  • 虚拟机规范允许具体的虚拟机实现增加一些规范里没有描述的信息到栈帧中,例如与高度相关的信息,这部分信息完全取决于具体的虚拟机实现。在实际开发中,一般会把动态连接,方法返回地址与其它附加信息全部归为一类,称为栈帧信息

在 Java 虚拟机规范中,规定了两种异常状况:如果线程请求的栈深度大于虚拟机所允许的深度,将抛出 StackOverflowError 异常;如果虚拟机栈可以动态扩展,当扩展时无法申请到足够的内存,就会抛出 OutOfMemoryError 异常。

系统分配给每个进程的内存是有限制的,除去 Java 堆、方法区、程序计数器,如果虚拟机进程本身耗费的内存不计算在内,剩下内存就由虚拟机栈和本地方法栈“瓜分”了。每个线程分配到的栈容量越大,可以建立的线程数量自然就越少,建立线程时就越容易把剩下的内存耗尽,出现虚拟机栈溢出的情况。

  • 如果线程请求的栈深度大于虚拟机所允许的最大深度,将抛出 StackOverflowError 异常,出现 StackOverflowError 异常时有错误栈可以阅读,栈深度在大多数情况下达到 1000~2000 完全没有问题,对于正常的方法调用(包括递归),这个深度在一半情况下完全够用。
  • 如果虚拟机在扩展栈(建立更多的线程)时无法申请到足够的内存空间,则抛出 OutOfMemoryError 异常,在不能减少线程数或者更换 64 位虚拟机的情况下,就只能通过减少最大堆和减少栈容量来换取更多的线程。

一般可以认为 Java 中的一切变量都是引用,实际的对象存储在堆中,但是基础类型比较特殊,它们可以直接使用操作符进行算数运算,不过基础类型也带来了一些问题,比如装箱拆箱过程中可能产生空指针异常。Java 中引入基础类型的动机应该是性能与空间利用率,毕竟使用引用时还得通过指针碰撞空闲列表方法(根据垃圾收集器不同而不同)来找到实际对象,而且基础类型本身占用的空间不多,如果用引用势必占用的空间会加倍。
基础类型并不是都分配在栈上,具体有以下几种情况:

  1. 在方法中声明的变量,即该变量是局部变量,每当程序调用方法时,系统都会为该方法建立一个方法栈,其所在方法中声明的变量就放在方法栈中,当方法结束系统会释放方法栈,其对应在该方法中声明的变量随着栈的销毁而结束,这就局部变量只能在方法中有效的原因
    在方法中声明的变量可以是基本类型的变量,也可以是引用类型的变量。
    • 当声明是基本类型的变量的时,其变量名及值(变量名及值是两个概念)是放在方法栈中
    • 当声明的是引用变量时,所声明的变量(该变量实际上是在方法中存储的是内存地址值)是放在方法的栈中,该变量所指向的对象是放在堆类存中的。
  2. 在类中声明的变量是成员变量,也叫全局变量,放在堆中的(因为全局变量不会随着某个方法执行结束而销毁)。
    同样在类中声明的变量即可是基本类型的变量 也可是引用类型的变量
    • 当声明的是基本类型的变量其变量名及值是放在堆中的
    • 引用类型时,其声明的变量仍然会存储一个内存地址值,该内存地址值指向所引用的对象。引用变量名和对应的对象仍然存储在相应的堆中

堆外内存

一个例子

1
2
3
4
5
// 分配一块1024字节的堆外内存
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
System.out.println(buffer.capacity());
buffer.putInt(0, 2018);
System.out.println(buffer.getInt(0));

为什么要使用堆外内存

主要是因为堆外内存在 IO 操作方面的优势,举一个例子:在通信中,将存在于堆内存中的数据 flush 到远程时,需要首先将堆内存中的数据拷贝到堆外内存中,然后再写入 Socket 中;如果直接将数据存到堆外内存中就可以避免上述拷贝操作,提升性能。类似的例子还有读写文件。
但是需要注意的是,直接访问堆外内存并不会比访问 Java 堆更快:哪个更快:Java 堆还是本地内存。堆外内存更适合用于操作大块的数据(>2G)、可以直接使用操作系统提供的本地 IO 进行操作。

可用的堆外内存额度

  1. 可以通过设定 -XX:MaxDirectMemorySize 来显式指定最大的堆外内存。
  2. 设定 -Dsun.nio.MaxDirectMemorySize 来显式指定:如果该属性为-1,则取directMemory = Runtime.getRuntime().maxMemory(),即 JVM 运行时的最大内存;否则,如果指定这个属性等于-1,则默认为 64M。
    如果是 JDK8,具体代码见VM.saveAndRemoveProperties(),如果是 JDK8 之前,则代码见VM.maxDirectMemory()
    另外,Runtime.getRuntime().maxMemory()是一个 native 方法,HotSpot 中对应的 C++代码如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    JNIEXPORT jlong JNICALL
    Java_java_lang_Runtime_maxMemory(JNIEnv *env, jobject this)
    {
    return JVM_MaxMemory();
    }

    JVM_ENTRY_NO_ENV(jlong, JVM_MaxMemory(void))
    JVMWrapper("JVM_MaxMemory");
    size_t n = Universe::heap()->max_capacity();
    return convert_size_t_to_jlong(n);
    JVM_END
    其中 max_capacity() 实际返回的是 –Xmx 设置值减去一个 survivor space 的预留区大小,与堆内大小存很接近。

源码

  1. 分配
    ByteBuffer.allocateDirect
    DirectByteBuffer(int)
    -> Bits.reserveMemory 在系统中保存总分配内存(按页分配)的大小和实际内存的大小
    -> Bits.tryReserveMemory 判断系统内存(堆外内存)是否足够,Bits 有一个全局 totalCapacity 变量记录当前已经使用的堆外内存量
    -> JavaLangRefAccess.tryHandlePendingReference 非阻塞,将已经被 JVM 垃圾回收的 DirectBuffer 对象的堆外内存释放。
    -> System.gc 触发一次 FullGC,System.gc并不能保证 FullGC 被马上执行,如果设置了-XX:+DisableExplicitGC则这种显式的 GC 会被禁用
    -> Bits.tryReserveMemory 为了等待 FullGC 释放足够的空间,之后还会重试最多 9 次 tryReserveMemory,每次重试会 sleep 1, 2, 4, 8, 16, 32, 64, 128, 256ms,也就是说最多可以等 0.5s
    -> throw new OutOfMemoryError 分配空间失败,抛出 OOM 异常
    -> Unsafe.allocateMemory native 方法,通过 JNI 调用 C++函数分配堆外内存,并返回内存基地址
    -> Unsafe.setMemory(base, size, (byte) 0) 将分配的内存清零
    -> Cleaner.create 创建一个 Cleaner 用于释放堆空间
  2. 释放
    Cleaner.create Cleaner 内部维护了一个 static 的 Cleaner 对象的链表,create 调用创建了一个 Cleaner 对象并将其加入到该链表中
    -> new Cleaner(DirectByteBuffer, new Deallocator(base, size, cap))

Cleaner 实现 GC 回收堆外内存的原理是PhantomReference(虚引用),虚引用必须和引用队列(ReferenceQueue)一起使用,一般用于实现追踪垃圾收集器的回收动作。虚引用不会影响 JVM 是否要 GC 这个对象的判断,当 GC 某个对象时,如果有此对象上还有虚引用,会将虚引用对象插入 ReferenceQueue 队列。
对于 Cleaner 对象,当 GC 时发现它除了虚引用外已不可达(持有它的 DirectByteBuffer 对象在 GC 中被回收了,此时,只有 Cleaner 对象唯一保存了堆外内存的数据),就会把它放进 Reference 类的 pending 静态变量里。
PhantomReference 的父类是 Reference,Reference 类内部的 static 块会启动 ReferenceHandler 线程,线程优先级很高,这个线程是用来处理 JVM 在 GC 过程中交接过来的 reference(pending)。
下面是该线程的大致执行流程:
ReferenceHandler.run() 死循环处理 JVM 提交的 reference,如果是 Clearner 则调用其 clean 方法
-> Cleaner.clean
-> Cleaner.remove(Cleaner) 首先将 Cleaner 从链表中移除,以便 Cleaner 自身可被 GC 回收掉
-> DirectByteBuffer.Deallocator.run()
-> Unsafe.freeMemory 释放分配的堆外内存
-> Bits.unreserveMemory 更新 Bits.totalCapacity

手动释放堆外内存

如前面所述,可以通过编码调用 DirectByteBuffer 的 cleaner 的 clean 方法来释放堆外内存。但需要注意:cleaner 是 private 访问权限,所以,需使用反射来实现。

MappedByteBuffer

MappedByteBuffer 是用来访问大文件的,其实是利用操作系统的 mapedfile 的机制,把一个大文件映射到物理内存,然后用户进程像访问内存一样访问文件,背后操作系统会把磁盘文件映射到内存中来,这是操作系统预估你想存取哪块提前为你准备好的。

本地方法栈

访问本地方式时使用到的栈,为本地方法服务,与虚拟机栈一样,本地方法区域也会抛出 StackOverflowError 和 OutOfMemoryError 异常。

方法执行引擎

用户所编写的程序如何表现正确的行为需要执行引擎的支持,执行引擎执行字节码指令,完成程序的功能。

本地方法接口(JNI)

本地方法接口称为 JNI,是为可移植性准备的。

参考

  1. 《Hotspot 实战》
  2. GC 性能优化
  3. JVM 菜鸟进阶高手之路
  4. The Java® Virtual Machine Specification
  5. The Java® Language Specification
  6. Java 和操作系统交互细节

了解 JVM 中内存管理的原理后,下一步就是如何调优了。

JVM参数

-D、-X、-XX 区别

其一是标准参数(-),所有的 JVM 实现都必须实现这些参数的功能,而且向后兼容;(包括-D 等)JVM 的标准参数都是以”-“开头,通过输入”java -help”或者”java -?”,可以查看 JVM 标准参数列表
其二是非标准参数(-X),默认 jvm 实现这些参数的功能,但是并不保证所有 jvm 实现都满足,且不保证向后兼容;但是列出来的都是当前可用的。
其三是非 Stable 参数(-XX),此类参数各个 jvm 实现会有所不同,将来可能会随时取消,需要慎重使用;

栈大小

  • Xss512k:用来设置每个线程的堆栈大小;

堆空间结构调整

堆空间结构调整

  • Xmx4g:JVM 最大允许分配的堆内存,按需分配;
  • Xms4g:JVM 初始分配的堆内存,一般和 Xmx 配置成一样以避免每次 gc 后 JVM 重新分配内存;
  • XX:MetaspaceSize=64m 初始化元空间大小;
  • XX:MaxMetaspaceSize=128m 最大化元空间大小。

Metaspace 建议不要设置,一般让 JVM 自己启动的时候动态扩容就好了,没必要自己去设置。如果不动态加载 class ,当启动起来的时候,一般是很少有变化的。
从这个角度我们可以认为我们的 JVM 内存的大小是堆+Metaspace+io(运行时产生的大小)。

  • -Xms[g|m|k]
  • -Xmx[g|m|k] 堆大小
  • -Xmn[g|m|k] 年轻代大小
  • -XX:NewRatio=老年代/新生代 比例
  • -XX:MaxMetaspaceSize=[unit] 元空间
  • -XX:NewSize=[unit] 初始年轻代大小
  • -XX:SurvivorRatio= # 代表分代回收算法新生代中 Eden:Survivor 的比例,注意 Survivor 是有两块的,如果 Eden:Survivor = 3,则新生代将被分割为 5 份,Eden 占 3 份

堆的最大和最小限制,网上很多资料都将二者设置成一样的值,我觉得这是不好的习惯,因为 JVM 只有在内存使用量达到-Xms 的值时才会开始 gc,设置成一样的值也就是说只有在 JVM 使用完内存后才会开始 gc,这会导致最大暂停时间偏长,用户体验不好,当然设置成一样也可以减轻堆伸缩带来的压力。当然这些都是直观的看法,根据 [3] 的说法,这个调整策略和 JVM 中使用的垃圾回收算法相关,如果是 IBM JVM(采用 sweep-compact),设置不一样较好;如果是 Sun JVM(采用分代回收),设置成一样较好。

选择哪个 GC 收集器

  • -XX:+UseSerialGC
  • -XX:+UseParallelGC
  • -XX:+USeParNewGC
  • -XX:+UseG1GC
  • -XX:-UseConcMarkSweepGC
    对老生代采用并发标记交换算法进行 GC

GC 的时候生成文件

  • -XX:+UseGCLogFileRotation 用文件循环策略
  • -XX:NumberOfGCLogFiles=10 最大文件数
  • -XX:GCLogFileSize=50M 文件大小
  • -Xloggc:/home/user/log/gc.log 位置

OOM后排错

  • -XX:+HeapDumpOnOutOfMemoryError 打出 dump 文件当发生 out of memory
  • -XX:HeapDumpPath=./java_pid.hprof 文件名
  • -XX:OnOutOfMemoryError=”< cmd args >;< cmd args >” 发生异常的时候执行的命令
  • -XX:+UseGCOverheadLimit 在 GC 之前

Java Management Extensions

Java 管理扩展,打开 JMX 端口,就可以用标准的端口来监控 server 端的 jvm 了。
-Djava.rmi.server.hostname=
-Dcom.sun.management.jmxremote.port=
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false

-XX:+PrintFlagsFinal
-XX:+PrintFlagsInitial

生产环境参数设置

  1. 实际生产中很少去设置 gc 相关的详细参数,一般只要把 thread dump 处理好(及异常的时候生成 demp 文件)和 jmx 端口打开;
  2. 能设置好几个分代的内存空间就不错了。这个可以通过 jvm 的监控来设置根据 cpu 和 gc 的情况;
  3. 因为随着 JVM 的版本的升级,jvm 垃圾回收会也来越智能,但是我们必须要了解这些,因为面试的时候大牛为了显摆自己会问这些问题。
  4. 根据我们上面介绍的就算你设置+XX 有的时候也不一定有用,说不定哪个小版本里面就失灵了。jdk1.8 关于 gc 的最多开启+XX:+UseConcMarkSweepGC。

调优工具

对 JVM 的监控可以分为以下几个方面:

  • 内存状况分析(GC)
  • 线程状态分析

关于 GC 的监控,比较重要的有三个方面:

  • 各个区的容量,主要是堆中新生代与老年代的内存分配。
  • Full GC、Young GC 发生的次数,原则上尽量避免发生 Full GC,Young GC 能少则少。
  • 当前系统的内存比、CPU 使用率。

jps

jps 列出正在运行的虚拟机进程
包括 PID、主类、jar 包等

1
jps -ml

jinfo

Java 配置信息工具,并且支持运行时动态修改部分参数
查看某些配置值或开关是否打开:

1
2
jinfo -flag MaxTenuringThreshold 8737
jinfo -flag PrintGCDetails 8737

打开开关

1
2
jinfo -flag MaxTenuringThreshold=10 8737
jinfo -flag +PrintGCDetails 8737

查看虚拟机的默认配置参数还可以在运行时打开虚拟机的 PrintFlagsFinal 开关

1
java -XX:+PrintFlagsFinal Test

jstat

常用选项

1
jstat -gcutil # 垃圾收集统计数据

统计数据列含义:

数据列 描述 支持的 jstat 选项
S0C Survivor0 的当前容量 -gc -gccapacity -gcnew -gcnewcapacity
S1C S1 的当前容量 -gc -gccapacity -gcnew -gcnewcapacity
S0U S0的使用量 -gc-gcnew
S1U S1 的使用量 -gc-gcnew
EC Eden 区的当前容量 -gc -gccapacity -gcnew -gcnewcapacity
EU Eden 区的使用量 -gc -gcnew
OC old 区的当前容量 -gc -gccapacity -gcnew -gcnewcapacity
OU old区的使用量 -gc-gcnew
PC 方法区的当前容量 -gc-gccapacity -gcold -gcoldcapacity -gcpermcapacity
PU 方法区的使用量 -gc -gcold
YGC Young GC 次数 -gc -gccapacity -gcnew -gcnewcapacity -gcold -gcoldcapacity -gcpermcapacity -gcutil -gccause
YGCT Young GC 累积耗时 -gc -gcnew -gcutil -gccause
FGC Full GC次数 -gc -gccapacity -gcnew -gcnewcapacity -gcold -gcoldcapacity -gcpermcapacity -gcutil -gccause
FGCT Full GC 累积耗时 -gc-gcold -gcoldcapacity -gcpermcapacity -gcutil -gccause
GCT GC 总的累积耗时 -gc -gcold -gcoldcapacity -gccapacity -gcpermcapacity -gcutil -gccause
NGCMN 新生代最小容量 -gccapacity -gcnewcapacity
NGCMX 新生代最大容量 -gccapacity -gcnewcapacity
NGC 新生代当前容量 -gccapacity -gcnewcapacity
OGCMN 老年代最小容量 -gccapacity -gcoldcapacity
OGCMX 老年代最大容量 -gccapacity -gcoldcapacity
OGC 老年代当前容量 -gccapacity -gcoldcapacity
PGCMN 方法区最小容量 -gccapacity -gcpermcapacity
PGCMX 方法区最大容量 -gccapacity -gcpermcapacity
PGC 方法区当前容量 -gccapacity -gcpermcapacity
PC 方法区的当前容量 -gccapacity -gcpermcapacity
PU 方法区使用量 -gccapacity -gcold
LGCC 上一次 GC 发生的原因 -gccause
GCC 当前 GC 发生的原因 -gccause
TT 存活阀值,如果对象在新生代移动次数超过此阀值,则会被移到老年代 -gcnew
MTT 最大存活阀值,如果对象在新生代移动次数超过此阀值,则会被移到老年代 -gcnew
DSS survivor 区的理想容量 -gcnew

jmap

1
2
jmap -histo pid -F
jmap -dump:format=b, file=heap.bin pid

JConsole

JVisualVM

GC日志

关于输出 GC 日志的参数有以下几种:

  • -XX:+PrintGC 输出 GC 日志
  • -XX:+PrintGCDetails 输出 GC 的详细日志
  • -XX:+PrintGCTimeStamps 输出 GC 的时间戳(以基准时间的形式)
  • -XX:+PrintGCDateStamps 输出 GC 的时间戳(以日期的形式,如 2013-05-04T21:53:59.234+0800)
  • -XX:+PrintHeapAtGC 在进行 GC 的前后打印出堆的信息
  • -Xloggc:../logs/gc.log 日志文件的输出路径

比如对 PrintGCDetails 这个参数:

1
2
3
4
5
6
7
8
9
public class GCLogTest {
public static void main(String[] args) {
int _1m = 1024 * 1024;
byte[] data = new byte[_1m];
// 将data置null让其可被回收
data = null;
System.gc();
}
}

在 IDE 中设置 VM 参数-XX:+PrintGCDetails,再运行,可以得到:

1
2
3
4
5
6
7
8
9
10
11
12
[GC (System.gc()) [PSYoungGen: 5051K->776K(38400K)] 5051K->784K(125952K), 0.0014035 secs] [Times: user=0.01 sys=0.00, real=0.00 secs] 
[Full GC (System.gc()) Disconnected from the target VM, address: '127.0.0.1:55472', transport: 'socket'
[PSYoungGen: 776K->0K(38400K)] [ParOldGen: 8K->684K(87552K)] 784K->684K(125952K), [Metaspace: 2980K->2980K(1056768K)], 0.0040080 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]
Heap
PSYoungGen total 38400K, used 333K [0x0000000795580000, 0x0000000798000000, 0x00000007c0000000)
eden space 33280K, 1% used [0x0000000795580000,0x00000007955d34a8,0x0000000797600000)
from space 5120K, 0% used [0x0000000797600000,0x0000000797600000,0x0000000797b00000)
to space 5120K, 0% used [0x0000000797b00000,0x0000000797b00000,0x0000000798000000)
ParOldGen total 87552K, used 684K [0x0000000740000000, 0x0000000745580000, 0x0000000795580000)
object space 87552K, 0% used [0x0000000740000000,0x00000007400ab0d0,0x0000000745580000)
Metaspace used 2988K, capacity 4568K, committed 4864K, reserved 1056768K
class space used 318K, capacity 392K, committed 512K, reserved 1048576K

第一行是 YoungGC,其结构如下图所示:
JVM-YoungGC日志
第二行是 FullGC,其结构如下图所示:
JVM-FullGC日志

  • GC日志开头的[GC[Full GC说明了这次垃圾收集的停顿类型,如果有Full,说明这次 GC 发生了Stop-The-World。因为是调用了System.gc()方法触发的收集,所以会显示[Full GC (System.gc()),不然是没有后面的(System.gc())的。
  • [PSYoungGen[ParOldGen是指 GC 发生的区域。
  • 在方括号中PSYoungGen:后面的5051K->776K(38400K)代表的是GC前该内存区域已使用的容量->GC后该内存区域已使用的容量(该内存区域总容量)
  • 在方括号之外的5051K->784K(125952K)代表的是GC前Java堆已使用容量->GC后Java堆已使用容量(Java堆总容量),注意已使用容量是减掉一个 Servivor 区、线程栈等区域后的大小。
  • 再往后的0.0014035 secs代表该内存区域 GC 所占用的时间,单位是秒。
  • 再后面的[Times: user=0.01 sys=0.00, real=0.00 secs],user 代表进程在用户态消耗的 CPU 时间,sys 代表进程在内核态消耗的 CPU 时间、real 代表程序从开始到结束所用的时钟时间。这个时间包括其他进程使用的时间片和进程阻塞的时间(比如等待 I/O 完成)。
  • 至于后面的eden代表的是 Eden 空间,还有fromto代表的是 Survivor 空间。

问题排查

OOM

有两种情况可能导致 OOM:

  1. 内存泄露(Memory Leak),某些对象是需要被释放的但是却由于某些原因释放不了,查看泄露对象对 GC Roots 的引用链,再追本溯源进行分析;
  2. 内存溢出(Memory Overflow),对象太多了导致堆放不下了,查看堆是否可以调大,或者某些对象活太久了。

产生 OutOfMemoryError 错误的具体原因有以下几种:

  • java.lang.OutOfMemoryError: Java heap space 表示 Java 堆空间不足。当应用程序申请更多的内存时,若 Java 堆内存已经无法满足应用程序的需要,则将抛出这种异常。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /*
    VM Args: -Xms20m -Xmx20m -XX:+HeapDumpOnOutOfMemoryError
    */
    public class HeapOOM {
    static class OOMObject {}

    public static void main(String[] args) {
    List<OOMObject> list = new ArrayList<>();
    while(true) {
    list.add(new OOMObject());
    }
    }
    }
  • java.lang.OutOfMemoryError: PermGen space,表示 Java 永久代(方法区)的空间不足。永久代用于存放类的字节码和常量池,类的字节码被加载后存放在这个区域,这和存放对象实例的堆区是不同的。大多数 JVM 的实现都不会对永久代进行垃圾回收,因此,只要类加载过多就会出现这个问题。一般的应用程序都不会产生这个错误,然而,对于 Web 服务器会产生大量的 JSP,JSP 在运行时被动态地编译为 Java Servlet 类,然后加载到方法区,因此,有很多 JSP 的 Web 工程可能会产生这个异常。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /*
    VM Args: -XX:PermSize=10M -XX:MaxPermSize=10M
    */
    public class RuntimeConstantPoolOOM {
    public static void main(String[] args) {
    // 使用List保持对常量池的引用,避免Full GC回收常量池
    List<String> list = new ArrayList<String>();
    for(int i = 0;; i++) {
    list.add(String.valueOf(i).intern());
    }
    }
    }
    使用 intern 测试运行时常量池是“永久代”的还是“元空间”的:
    1
    2
    3
    4
    String str1 = new StringBuilder("计算机").append("软件").toString();
    System.out.println(str1.intern() == str1);
    String str2 = new StringBuilder("ja").append("va").toString();
    System.out.println(str2.intern() == str2);
    方法区溢出测试(使用 CGLib 动态生成类):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    /*
    VM Args: -XX:PermSize=10M -XX:MaxPermSize=10M
    */
    public class MethodAreaOOM {
    static class OOMObject {}
    public static void main(String[] args) {
    while(true) {
    Enhancer enhancer = new Enhancer();
    enhancer.setSuperclass(HeapOOM.OOMObject.class);
    enhancer.setUseCache(false);
    enhancer.setCallback(new MethodInterceptor() {
    @Override
    public Object intercept(Object o, Method method,
    Object[] objects, MethodProxy proxy) throws Throwable {
    return proxy.invokeSuper(o, args);
    }
    });
    enhancer.create();
    }
    }
    }
    jdk1.6 及之前版本,因为 HotSpot 实行“永久代”(PermGen),方法区保存到永久代中,虽然逻辑上属于堆,但是在这块空间上并没有实行 GC。在这种情况下,上面代码发生堆溢出是必然的。
    jdk1.7 及之后版本,因为“去永久代”,引入了“元空间”(Metaspace),这种策略使得大部分类元数据都保存在本地内存中,元空间使用一个全局空闲组块列表(一个大数组)表示,每创建一个类加载器都会从这个列表中获取一个自己的组块,用处当然是存储元信息(指针碰撞方式分配),当一个类加载器不再活动后,其所持有的组块列表也就返还给全局组块列表了,也就是说,类也是可能会被 GC 回收掉的。
    运行时常量池是分配于方法区的,所以可以这么认为:1.6 及之前常量是分配在一个“全局静态区”的,而 1.7 及之后则在堆中分配。
    运行时常量池导致的溢出不常见,上面的例子感觉也有点极端。
    方法区导致的溢出在实际应用中常见:一些框架 Spring、Hibernate 在对类进行增强时会使用到 CGLib 这类字节码技术;JVM 上的动态语言(Groovy)会持续创建类来实现语言的动态特性;拥有大量 JSP 页面或会动态生成 JSP 文件的应用(JSP 第一次运行时会被编译为 Servlet);基于 OSGi 的应用(即使是同一个类文件,被不同的加载器加载也会视为不同的类)等。
  • java.lang.OutOfMemoryError: unable to create new native thread,本质原因是创建了太多的线程,而系统允许创建的线程数是有限制的。
  • java.lang.OutOfMemoryError:GC overhead limit exceeded,是并行(或者并发)垃圾回收器的 GC 回收时间过长、超过 98%的时间用来做 GC 并且回收了不到 2%的堆内存时抛出的这种异常,用来提前预警,避免内存过小导致应用不能正常工作。
  • 栈溢出
    可以先使用-Xoss 参数设置本地方法栈大小(在 HotSpot 中无效,因为它将两个栈合并了),-Xss 参数设置栈容量大小,设得稍微小一些都没有问题。
    实验非常简单,就是定义并调用一个无限递归的方法,在调用深度到达一定程度后就会报错,并且使用一个 stackLength 成员变量记录栈深度
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    /*
    VM Args: -Xss228k
    */
    public class StackSOF {
    private int stackLength = 1;

    public void stackLeak() {
    stackLength++;
    stackLeak();
    }
    public static void main(String[] args) {
    StackSOF oom = new StackSOF();
    try {
    oom.stackLeak();
    } catch (Throwable e) {
    System.out.println("stack length:" + oom.stackLength);
    throw e;
    }
    }
    }
    /*
    VM Args: -Xss2M
    */
    public class StackOOM {
    private void dontstop() {
    while(true) {}
    }
    public void stackLeakByThread() {
    while(true) {
    new Thread(new Runnable() {
    @Override
    public void run() {
    dontstop();
    }
    }).start();
    }
    }
    public static void main(String[] args) {
    new StackOOM().stackLeakByThread();
    }
    }
    有两种可能的错误,第一种是线程请求的栈深度超出了虚拟机的允许范围,会产生 StackOverflowError 异常,第二种是虚拟机在扩展栈时无法申请到足够空间,会产生 OutOfMemoryError 异常。
    注意,一个 java 进程的内存容量是由操作系统决定的,Windows 下限制为 2GB,减去 Xmx(最大堆容量)、MaxPermSize(最大方法区容量)、程序计数器(很小)、虚拟机本身耗费的内存,剩下的就由虚拟机栈和本地方法栈瓜分了。
    前者比较容易找出错误,因为会有错误堆栈可以分析;
    后者往往是因为线程分配过多了,导致操作系统分配的内存用尽,事实上,每个线程的主要空间都被栈(虚拟机栈和本地方法栈)占用了,所以为了可以分配更多的线程,可以减少最大堆容量或者减少栈容量。
  • 本机直接内存溢出
    这个例子比较复杂,首先-XX:MaxDirectMemorySize 指定了直接内存大小(默认是-Xmx),然后越过了 DirectByteBuffer,反射获取 Unsafe 实例进行内存分配,allocateMemory 等价于 malloc
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /*
    VM Args: -Xmx20M -XX:MaxDirectMemorySize=10M
    */
    public class DirectMemoryOOM {
    private static final int _1MB = 1024 * 1024;

    public static void main(String[] args) throws IllegalAccessException {
    Field unsafeField = Unsafe.class.getDeclaredFields()[0];
    unsafeField.setAccessible(true);
    Unsafe unsafe = (Unsafe) unsafeField.get(null);
    while(true) {
    unsafe.allocateMemory(_1MB);
    }
    }
    }
    直接内存,或者说堆外内存,不是在 java 虚拟机规范中定义的存储区域,一般是不受虚拟机控制的,但是 NIO 提供了 Native 函数库可以直接分配堆外内存。
    由 DirectMemory 导致的内存溢出在 Heap Dump 中不会有明显的异常,如果 OOM 之后 Dump 文件很小,且程序中又直接或间接使用了 NIO,就可以考虑是这方面的问题。

分析内存

为排查内存泄露、young gc耗时过长等问题,我们需要分析内存结构。

判断死锁

使用 jstack 判断死锁,下面是一段测试用的死锁代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class DeadLockTest {

private static Object obj1 = new Object();
private static Object obj2 = new Object();

public static void main(String[] args) {
new Thread(new Thread1()).start();
new Thread(new Thread2()).start();
}

private static class Thread1 implements Runnable {

@Override
public void run() {
synchronized (obj1) {
System.out.println("Thread1 拿到了 obj1 的锁!");
try {
// 停顿2秒的意义在于,让Thread2线程拿到obj2的锁
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (obj2) {
System.out.println("Thread1 拿到了 obj2 的锁!");
}
}
}
}

private static class Thread2 implements Runnable {

@Override
public void run() {
synchronized (obj2) {
System.out.println("Thread2 拿到了 obj2 的锁!");
try {
// 停顿2秒的意义在于,让Thread1线程拿到obj1的锁
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (obj1) {
System.out.println("Thread2 拿到了 obj1 的锁!");
}
}
}
}
}

使用 jps 查看 Java 进程:

1
2
3
4
5
6
7
# jps

4098 Jps
2644
3991 Launcher
3992 DeadLockTest
2859 RemoteMavenServer

使用 jstack 查看 Java 进程中的所有线程:

1
2
3
# jstack 3992

结果见下图

JVM死锁

服务器 CPU 打满怎么排查

CPU 打满会导致服务器响应速度变慢甚至夯住,一般查看内存没有明显问题后我们就可以怀疑是有线程运行将 CPU 打满了。
1、查 CPU 占用率较高的进程
top 命令查进程占用 CPU
2、查该进程占用 CPU 最高的线程
top -H -p <查出的进程号>

1
2
3
4
5
6
7
8
9
10
11
12
root@app02:~# top -H -p 1153
top - 21:04:21 up 227 days, 6:52, 1 user, load average: 0.78, 0.75, 0.85
Threads: 1007 total, 0 running, 1007 sleeping, 0 stopped, 0 zombie
%Cpu(s): 3.2 us, 0.8 sy, 0.0 ni, 95.8 id, 0.1 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 49049564 total, 295072 free, 42779556 used, 5974936 buff/cache
KiB Swap: 8191996 total, 431488 free, 7760508 used. 4009280 avail Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1487 root 20 0 19.158g 1.113g 6664 S 6.2 2.4 0:36.42 java
1153 root 20 0 19.158g 1.113g 6664 S 0.0 2.4 0:00.00 java
1155 root 20 0 19.158g 1.113g 6664 S 0.0 2.4 0:00.57 java
1160 root 20 0 19.158g 1.113g 6664 S 0.0 2.4 2:01.54 java

转成 16 进制:

1
2
hero@app02:~$ printf "%x\n" 1487
5cf

之后我们会用到这个值,因为 jstack 输出的 log 中使用十六进制表示线程编号。
3、输出

1
root@app02:~# jstack 1153 > test_jstack.txt

从结果中可以搜到上面给出的线程编码5cf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
11179 "New I/O worker #168" #299 daemon prio=5 os_prio=0 tid=0x00007f3a75127000 nid=0x5cf runnable [0x00007f37278f7000]
11180 java.lang.Thread.State: RUNNABLE
11181 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
11182 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
11183 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
11184 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
11185 - locked <0x00000000e4669320> (a sun.nio.ch.Util$2)
11186 - locked <0x00000000e4669310> (a java.util.Collections$UnmodifiableSet)
11187 - locked <0x00000000e46691e8> (a sun.nio.ch.EPollSelectorImpl)
11188 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
11189 at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
11190 at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
11191 at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
11192 at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
11193 at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
11194 at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
11195 at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
11196 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
11197 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
11198 at java.lang.Thread.run(Thread.java:745)

SWAP 影响 GC

SWAP 和 GC 同时发生会导致 GC 时间很长,JVM 严重卡顿,甚至导致服务崩溃。
JVM 进行 GC 时,需要对相应堆分区的已用内存进行遍历;假如 GC 的时候,有堆的一部分内容被交换到 SWAP 中,遍历到这部分的时候就需要将其交换回内存,同时由于内存空间不足,就需要把内存中堆的另外一部分换到 SWAP 中去;于是在遍历堆分区的过程中,(极端情况下)会把整个堆分区轮流往 SWAP 写一遍。

QA

栈溢出和由栈引起的OOM有什么关系?

虽然都是由递归调用引起的,但是这两种异常引起的条件并不相同:

  1. 栈溢出(StackOverflowError)
    方法调用栈深度超出了虚拟机的允许范围。
  2. 栈引起的OOM
    虚拟机在扩展栈时无法申请到足够空间。

类文件结构

类文件结构比较繁琐,我暂时没有整理的兴趣,看一下《深入理解 Java 虚拟机上》上的介绍就可以了。

类加载器

Java 类结构是在运行时而不是在编译时确定的,而是由类加载器在运行期间加载的,因此称类的加载过程是动态加载,当调用某个类型对象的方法时,具体调用哪个方法是在运行期间决定的,因此称之为动态连接
类加载器(ClassLoader)不是 Java 虚拟机的组成部分,它由外部调用,执行加载过程。
类加载器用于加载类,任何类都需要由加载它的类加载器和这个类一同确立其在 Java 虚拟机中的唯一性,每一个类加载器,都有一个独立的类名称空间,因此由不同类加载器加载的类、就算它们真的是一个 class 出来的,也不算是同一个类型的,也不能直接进行交互(可以通过反射进行交互)。实现上,实际上 jvm 将类加载器的引用作为类型信息的一部分保存在方法区,作为判断两个类相同的依据。

写一个自定义的类加载器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;

public class MyClassLoader extends ClassLoader {

private String rootPath;

public MyClassLoader(String rootPath) {
this.rootPath = rootPath;
}

@Override
protected Class<?> findClass(String name) {
Class<?> clz = findLoadedClass(name);
if (clz != null) {
return clz;
}
byte[] classData = getData(name);
clz = defineClass("Hello", classData, 0, classData.length);
return clz;
}

private byte[] getData(String className) {
String pathName = rootPath + className.replace(".", "/") + ".class";
System.out.println("加载的类路径: " + pathName);
byte[] bytes = null;
try (FileInputStream fis = new FileInputStream(pathName);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
byte[] flush = new byte[1024 * 1024];
int len;
while (-1 != (len = fis.read(flush))) {
bos.write(flush, 0, len);
}
bytes = bos.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}
return bytes;
}

public static void main(String[] args) throws IllegalAccessException, InstantiationException {
MyClassLoader classLoader = new MyClassLoader("/tmp/");
Class<?> helloClass = classLoader.findClass("Hello");
Object hello = helloClass.newInstance();
System.out.println(hello);
}
}

这个类加载器:

  1. findClass的时候,先判断之前是否已经加载过这个类,如果加载过就直接返回了(双亲委派);
  2. /tmp目录下面读类文件,对读进的二进制流数据使用defineClass转换为类对象。

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.StringJoiner;

public class Hello {

private int a = 10;

static {
System.out.println("123");
}

@Override
public String toString() {
return new StringJoiner(", ", Hello.class.getSimpleName() + "[", "]")
.add("a=" + a)
.toString();
}
}

使用javac命令编译后,将Hello.class文件移动到/tmp目录下。

类从哪里来

除了从文件加载以外,Java 的类加载机制还支持从网络读取,其实只要根据自己的需要实现类加载器,就能将任何二进制数据作为字节码数据进行加载。

  • -XX:+TraceClassLoading
    跟踪类加载过程,结果形如:[Loaded java.lang.invoke.MethodHandleImpl$Lazy from D:\programme\jdk\jdk8U74\jre\lib\rt.jar]

  • mvn dependency:tree > ~/dependency.txt
    打出所有依赖

  • mvn dependency:tree -Dverbose -Dincludes=groupId:artifactId
    只打出指定 groupId 和 artifactId 的依赖关系

  • -XX:+TraceClassLoading
    vm 启动脚本加入。在 tomcat 启动脚本中可见加载类的详细信息

  • -verbose
    vm 启动脚本加入。在 tomcat 启动脚本中可见加载类的详细信息

  • greys:sc
    greys 的 sc 命令也能清晰的看到当前类是从哪里加载过来的

  • tomcat-classloader-locate
    通过以下 url 可以获知当前类是从哪里加载的
    curl http://localhost:8006/classloader/locate?class=org.apache.xerces.xs.XSObject

类加载器的种类

从虚拟机角度看,只存在两种类加载器:1. 启动类加载器。2. 其他类加载器。
从开发人员角度看,包括如下类加载器:1. 启动类加载器。2. 扩展类加载器。3. 应用程序类加载器。4. 自定义类加载器。

  • 启动类加载器,用于加载 Java API,加载<JAVA_HOME>/lib 目录下的类库。
  • 扩展类加载类,由 sun.misc.Launcher$ExtClassLoader 实现,用于加载<JAVA_HOME>/lib/ext 目录下或者被 java.ext.dirs 系统变量指定路径下的类库。
  • 应用程序类加载器,也成为系统类加载器,由 sun.misc.Launcher$AppClassLoader 实现,用于加载用户类路径(ClassPath)上所指定的类库。
  • 自定义类加载器,继承系统类加载器,实现用户自定义加载逻辑。

双亲委派模型

类加载器执行过程
Java 设计者推荐所有自定义加载器都组合一个父加载器,加载类时先委派父加载器去尝试加载,若不成,再由自身去加载。所以一些基类总是由基加载器去加载,可以避免一个程序中有多个 java.lang.Object 类的情况
注意各个类加载器之间是组合关系,并非继承关系
当一个类加载器收到类加载的请求,它将这个加载请求委派给父类加载器进行加载,每一层加载器都是如此,最终,所有的请求都会传送到启动类加载器中。只有当父类加载器自己无法完成加载请求时,子类加载器才会尝试自己加载。
双亲委派模型可以确保安全性,可以保证所有的 Java 类库都是由启动类加载器加载。如用户编写的 java.lang.Object,加载请求传递到启动类加载器,启动类加载的是系统中的 Object 对象,而用户编写的 java.lang.Object 不会被加载。如用户编写的 java.lang.virus 类,加载请求传递到启动类加载器,启动类加载器发现 virus 类并不是核心 Java 类,无法进行加载,将会由具体的子类加载器进行加载,而经过不同加载器进行加载的类是无法访问彼此的。由不同加载器加载的类处于不同的运行时包。所有的访问权限都是基于同一个运行时包而言的。

类的加载时机

Java 虚拟机规范没有强制规定类的加载时机,但是严格规定了以下 5 种情况必须立即对类进行“初始化”(而加载、验证、准备自然需要在此之前开始)

  1. 遇到字节码指令 new(实例化对象时)、getstatic、putstatic(读取或设置一个类的静态字段)、invokestatic(调用一个类的静态方法);
  2. 遇到反射调用 java.lang.reflect 对类进行反射调用;
  3. 初始化一个类时,其父类还未初始化,则先初始化其父类(对接口不适用,即初始化子接口并不会导致父接口初始化);
  4. 主类(main);
  5. JDK7 的动态语言支持,java.lang.invoke.MethodHandle 实例最后的解析结果 REF_getStatic、REF_putStatic、REF_invokeStatic 方法句柄,并且这个方法句柄对应的类没有进行初始化,则需要先进行初始化。

上边的情况统称为主动引用,其他情况都是被动引用,被动引用都不会触发类的初始化

  • 被动引用的示例如下,主要是使用 类初始化块 进行验证的(即 static 块),只输出了”Super!”,原因是不满足上边的条件,子类是否被加载完全由虚拟机实现说了算
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class SuperClass {
    public static int value = 123;
    static {
    System.out.println("Super!");
    }
    }
    class SubClass {
    static {
    System.out.println("Sub!");
    }
    public static void main(String[] args) {
    System.out.println(SubClass.value);
    }
    }
  • 书上举了另外一个例子说明 数组类初始化 的问题,下面代码并不会触发 SuperClass 类的初始化,而是初始化了一个对应的数组类,可以通过加-XX:+TraceClassLoading 的运行时参数来跟踪类的加载过程
    1
    2
    3
    4
    5
    public class NotInitialization {
    public static void main(String[] args) {
    SuperClass[] sca = new SuperClass[10];
    }
    }
  • 还有一个例子来说明 常量传播优化 会导致的混淆情况,即使用常量字段(static final)时不会触发初始化,编译时会将常量值保存到调用者的类常量池中,所以在调用时就和被调用者没关系了,下面的代码并不会触发 ConstClass 类的加载
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    class ConstClass {
    static {
    System.out.println("ConstClass!");
    }
    public static final String HELLOWORLD = "hello world";
    }
    public class NotInitialization {
    public static void main(String[] args) {
    System.out.println(ConstClass.HELLOWORLD);
    }
    }

类的加载过程

类的加载过程

加载

  1. 通过类的完全限定名获取表示类的二进制流;
  2. 转换为方法区中的类结构;
  3. 在 Java 堆中生成一个代表这个类的 java.lang.Class 对象,作为方法区这些数据的访问入口

对于数组类而言,数组类由 java 虚拟机直接创建,不通过类加载器创建。数组类的创建过程如下:

  1. 如果数组元素类型是引用类型,就采用双亲委派模型进行加载(之后会介绍),数组类将在加载该元素类型的类名称空间上被标识。
  2. 如果数组元素类型为基本类型,数组类被标记为与引导类加载器关联。
  3. 数组类的可见性与其元素类型可见性一致,如果元素类型不是引用类型,那数组类的可见性默认为 public。

验证

此阶段的主要目的是确保 class 文件的字节流包含的信息符合虚拟机的要求,进行一部分的语义分析,主要是防止字节码中存在一些危险操作(数组越界、错误转型、跳转过头等),后来的 Java 虚拟机规范还规定了文件格式、符号引用等的验证。
不同的虚拟机,对类验证的实现可能有所不同,但大致都会完成下面四个阶段的验证:

  1. 文件格式验证,是要验证字节流是否符合 Class 文件格式的规范,并且能被当前版本的虚拟机处理。如验证魔数是否为 0xCAFEBABE;主、次版本号是否正在当前虚拟机处理范围之内;常量池的常量中是否有不被支持的常量类型。
    该验证阶段的主要目的是保证输入的字节流能正确地解析并存储于方法区中,经过这个阶段的验证后,字节流才会进入内存的方法区中存储,所以后面的三个验证阶段都是基于方法区的存储结构进行的。
  2. 元数据验证,对字节码描述的信息进行语义分析,以保证其描述的信息符合 Java 语言规范的要求。可能包括的验证如:这个类是否有父类;这个类的父类是否继承了不允许被继承的类;如果这个类不是抽象类,是否实现了其父类或接口中要求实现的所有方法……
  3. 字节码验证,主要工作是进行数据流和控制流分析,保证被校验类的方法在运行时不会做出危害虚拟机安全的行为。如果一个类方法体的字节码没有通过字节码验证,那肯定是有问题的;但如果一个方法体通过了字节码验证,也不能说明其一定就是安全的。
  4. 符号引用验证,发生在虚拟机将符号引用转化为直接引用的时候,这个转化动作将在“解析阶段”中发生。验证符号引用中通过字符串描述的权限定名是否能找到对应的类;在指定类中是否存在符合方法字段的描述符及简单名称所描述的方法和字段;符号引用中的类、字段和方法的访问性(private、protected、public、default)是否可被当前类访问。

验证阶段对于虚拟机的类加载机制来说,不一定是必要的阶段。如果所运行的全部代码确认是安全的,可以使用-Xverify:none 参数来关闭大部分的类验证措施,以缩短虚拟机类加载时间

准备

为类变量(static 变量)分配内存和并初始化为默认值,它们被分配在方法区中。
准备阶段不分配类中的实例变量的内存,实例变量将会在对象实例化时随着对象一起分配在 Java 堆中。
比如static int a = 1;在准备阶段后 a 的值为 0,在初始化阶段后才变成 1,但是对常量字段(static final),在准备阶段会直接赋予用户设定的值。

解析

将常量池内的符号引用替换为直接引用,类似于将一个字面量 hash 到一个确定的槽中。
符号引用(Symbolic Reference):符号引用以一组符号来描述所引用的目标,符号可以是任何形式的字面量,只要使用时能无歧义地定位到目标即可。符号引用与虚拟机实现的内存布局无关,引用的目标并不一定已经加载到内存中。
直接引用(Direct Reference):直接引用可以是直接指向目标的指针、相对偏移量或是一个能间接定位到目标的句柄。直接引用是与虚拟机实现的内存布局相关的,如果有了直接引用,那么引用的目标必定已经在内存中存在。指向类型、类变量、类方法的直接引用可能是指向方法区的本地指针。

常见的符号引用包括类或接口的全限定名、字段名和描述符、方法名和描述符。类型的直接引用可能简单地指向保存类型数据的方法区中的与实现相关的数据结构。类变量的直接引用可以指向方法区中保存的类变量的值。类方法的直接引用可以指向方法区中的一段数据结构方法区中包含调用方法的必要数据。指向实例变量和实例方法的直接引用都是偏移量。实例变量的直接引用可能是从对象的映像开始算起到这个实例变量位置的偏移量。实例方法的直接引用可能是到方法表的偏移量。

为了加快解析效率,可以对解析结果进行缓存,之后再解析符号引用时直接返回即可,但是对于 invokedynamic 则不能进行缓存。解析主要是针对 CONSTANT_Class_info、CONSTANT_Fieldref_info、CONSTANT_Methodref_info、CONSTANT_InterfaceMethodref_info、CONSTANT_MethodType_info、CONSTANT_MethodHandle_info、CONSTANT_InvokeDynamic_info 七种常量类型。

  1. 类或接口的解析
    将符号引用替换为直接引用包括如下几步。假设符号引用记为 S,当前类记为 C,S 对应的类或接口记为 I。
    • 若 S 不是数组类型,则把 S 传递给当前类 C 的类加载器进行加载,这个过程可能会触发其他的加载,这个过程一旦出现异常,则解析失败。
    • 若 S 是数组类型,并且数组元素类型为对象,则 S 的描述符会形如[java/lang/String,按照第一条去加载该类型,如果 S 的描述符符合,则需要加载的类型就是 java.lang.String,接着有虚拟机生成一个代表此数组唯独和元素的数组对象。
    • 若以上两个步骤没有出现异常,即 I 已经存在于内存中了,但是解析完成时还需要进行符号引用验证,确认 C 是否具备对 I 的访问权限。若不具备,则抛出 java.lang.IllegalAccessError 异常。
  2. 字段解析
    首先将 CONSTANT_Fieldref_info 中的 class_index 索引的 CONSTANT_Class_info 符号引用进行解析,即解析字段所在类或接口,若解析出现异常,则字段解析失败。如解析成功,则进行下面的解析步骤。假设该字段所属的类或接口标记为 C。
    • 如果 C 包含了字段的简单名和描述符与目标相匹配的字段,则返回这个字段的直接引用,查找结束。
    • (字段解析对接口优先搜索)否则,如果 C 实现了接口,按照继承关系从下往上递归搜索各个接口和它的父接口,看是否存在相匹配的字段。存在,则返回直接引用,查找结束。
    • 否则,如果 C 不是 Object 对象,按照继承关系从下往上递归搜索父类,看是否存在相匹配的字段。存在,则返回直接引用,查找结束。
    • 否则,查找失败,抛出 java.lang.NoSuchFieldError 异常。
  3. 类方法解析
    首先将 CONSTANT_Methodref_info 中的 class_index 索引的 CONSTANT_Class_info 符号引用进行解析,即解析方法所在的类或接口,若解析出现异常,则方法解析失败;如解析成功,则进行下面解析步骤。假设该方法所属的类标记为 C。
    • 如果在方法表中发现 CONSTANT_Class_info 中索引的 C 是一个接口而不是一个类,则抛出 java.lang.IncompatibleClassChangeError 异常。
    • 否则,如果 C 中包含了方法的简单名和描述符与目标相匹配的字段,则返回这个方法的直接引用,查找结束。
    • (方法解析对父类优先搜索)否则,在 C 的父类中递归搜索,看是否存在相匹配的方法,存在,则返回直接引用,查找结束。
    • 否则,在 C 实现的接口列表及父接口中递归搜索,看是否存在相匹配的方法,存在,说明 C 是一个抽象类(没有实现该方法,否则,在第一步就查找成功),抛出 java.lang.AbstractMethodError 异常。
    • 否则,查找失败,抛出 java.lang.NoSuchMethodError 异常。
    • 若查找过程成功,则对方法进行权限验证,如果发现不具备对此方法的访问权限,则抛出 java.lang.lllegalAccessError 异常。
  4. 接口方法解析
    首先将 CONSTANT_InterfaceMethodref_info 中的 class_index 索引的 CONSTANT_Class_info 符号引用进行解析,即解析方法所在的类或接口,若解析出现异常,则方法解析失败;如解析成功,则进行下面解析步骤。假设该方法所属的类标记为 C。
    • 如果在方法表中发现 CONSTANT_Class_info 中索引的 C 是一个类而不是接口,则抛出 java.lang.IncompatibleClassChangeError 异常。
    • 否则,如果 C 中包含了方法的简单名和描述符与目标相匹配的字段,则返回这个方法的直接引用,查找结束。
    • 否则,在 C 的父接口中递归搜索,直到 Object 类,看是否存在相匹配的方法,存在,则返回直接引用,查找结束。
    • 否则,查找失败,抛出 java.lang.NoSuchMethodError 异常。
    • 若查找过程成功,不需要进行权限验证,因为接口方法为 public,不会抛出 java.lang.IllegalAccessError 异常。

初始化

类初始化是类加载过程的最后一步,前面的类加载过程,除了在加载阶段用户应用程序可以通过自定义类加载器参与之外,其余动作完全由虚拟机主导和控制,到了初始化阶段,才真正开始执行类中定义的 Java 程序代码。
初始化是执行类构造器<clinit>()的过程,()方法是由编译器自动收集类中的所有类变量的赋值动作和静态语句块(static{}块/类初始化块)中的语句合并产生的。具体地说,应该有以下几条规则:

  • 由编译器收集类中的所有类变量的赋值动作(如果仅仅只是声明,不会被收集)和静态语句块中的语句合并产生的,收集顺序按照语句在源文件中出现的顺序所决定;在静态语句块中只能访问定义在静态语句之前的变量;而对于定义在静态语句块之后的变量,可以进行赋值,但是不能够访问。
  • 不需要显示调用父类构造器,虚拟机会保证在子类的()方法执行之前,父类的()方法已经执行完毕,所以,第一个被执行的()方法的类肯定是 java.lang.Object。
  • 父类中定义的静态语句块优先于子类的静态语句。
  • 此方法对类和接口都不是必须的,若类中没有静态语句块和静态变量赋值操作,则不会生成()方法。
  • 接口会生成此方法,因为对接口的字段可以进行赋值操作。执行接口的()方法不需要先执行父接口的()方法,只有在使用父接口的变量时,才会进行初始化;接口的实现类在初始化时也不会执行接口的()方法。
  • 此方法在多线程环境中会被正确的加锁、同步。

使用

完成了初始化阶段后,我们就可以使用对象了,在程序中可以随意进行访问,只要类还没有被卸载。

卸载

GC 能够对方法区内无用对象进行回收,启动类加载的类型永远是可触及的,回收的是由用户自定义加加载器加载的类,具体内容等到 GC 部分再说。

接口的加载过程

接口的加载和类的加载是类似的,只是接口要初始化时并不会连带父接口一块初始化,只有在真正用到父接口时才会执行初始化。

定义类加载器和初始类加载器

我们知道不同类加载器加载的类位于不同的命名空间,它们之间是相互隔离的,这里说的隔离仅仅指它们存储位置隔离,并不是说一个自定义的类 A 使用了 java.util.List 类就会报错。
自定义的类 A 一般会使用系统类加载器加载,而 java.util.List 则会由启动类加载器加载,当加载类 A 时如果遇到了 java.util.List,会首先尝试通过系统类加载器加载,在它发现自己无法加载后,通过双亲委派模型交给父加载器加载。
初始类加载器和定义类加载器
如上图所示:

  • A 是由系统类加载器加载的,因此系统类加载器是其定义类加载器兼初始类加载器;
  • 系统类加载器加载过 java.util.List,因此是其初始类加载器;
  • 启动类加载器实际加载了 java.util.List,因此是其定义类加载器。

QA

  1. 为什么下面的执行结果为 0?
    说明类加载器在加载一个类时,父类的成员变量就算被覆盖,其存储空间依然还存在。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    class A {
    int a;
    }
    public class JavaTest extends A {
    int a;
    @Test
    public void test() {
    a = 1;
    System.out.println(super.a);
    }
    }
  2. 为什么下面的报错?
    类的初始化阶段有一个细节:类初始化块不能访问定义在其之后的变量
    1
    2
    3
    4
    5
    6
    7
    public class JavaTest {
    static {
    i = 0;
    System.out.println(i); // 报错
    }
    static int i = 1;
    }
  3. 为什么输出两个’A’?
    当我们 new A()时,首先为 A 分配内存空间,此时 A 已经存在了,只是还未初始化,然后调用 A 的构造函数,A 的构造函数又隐式调用了父类的构造函数。
    在父类构造函数中使用 this 调用 draw(),this 实际上指向了 a 对象,平常调用方法时 this 也是隐含的。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    class B {
    int a;
    B() {
    this.draw();
    }
    void draw() {
    System.out.println("B");
    }
    }
    class A extends B {
    int a;
    A() {
    draw();
    }
    @Override
    void draw() {
    System.out.println("A");
    }
    public static void main(String[] args) {
    A a = new A();
    }
    }
  4. 为什么最后输出的 count2 为 0?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    class SingleTon {
    private static SingleTon singleTon = new SingleTon();
    public static int count1;
    public static int count2 = 0;
    private SingleTon() {
    count1++;
    count2++;
    }
    public static SingleTon getInstance() {
    return singleTon;
    }
    }
    public class Test {
    public static void main(String[] args) {
    SingleTon singleTon = SingleTon.getInstance();
    System.out.println("count1=" + singleTon.count1);
    System.out.println("count2=" + singleTon.count2);
    }
    }
    这里有问题的应该是 static 变量的初始化和构造方法被调用的顺序,实际上构造方法是先被调用的。
  5. SingleTon singleTon = SingleTon.getInstance();调用了类的 SingleTon 调用了类的静态方法,触发类的初始化(主动引用)
  6. 类加载的时候在准备过程中为类的静态变量分配内存并初始化默认值 singleton=null count1=0,count2=0(准备)
  7. 类初始化,为类的静态变量赋值和执行静态代码快。singleton 赋值为 new SingleTon()调用类的构造方法(初始化)
  8. 调用类的构造方法后 count=1;count2=1
  9. 继续为 count1 与 count2 赋值,此时 count1 没有赋值操作,所有 count1 为 1,但是 count2 执行赋值操作就变为 0
  10. 读下面的类加载器应用代码,为什么输出 false?
    myLoader 加载的类和虚拟机的默认类加载器(Bootstrap ClassLoader)将加载的类保存在不同的命名空间中,它们相当于不同的类。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class JavaTest {
    @Test
    public void test() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
    ClassLoader myLoader = new ClassLoader() {
    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
    String fileName = name.substring(
    name.lastIndexOf(".") + 1) + ".class";
    InputStream is = getClass().getResourceAsStream(fileName);
    if(is == null) { return super.loadClass(name); }
    try {
    byte[] b = new byte[is.available()]; // 进行验证
    is.read(b);
    return defineClass(name, b, 0, b.length);
    } catch (IOException e) {
    throw new ClassNotFoundException(name);
    }
    }
    };
    Object obj = myLoader.loadClass("com.tallate.JavaTest")
    .newInstance();
    System.out.println(obj.getClass());
    System.out.println(obj instanceof com.tallate.JavaTest); // false?
    }
    }
  11. JVM 怎么知道哪些类应该委托给父加载器加载?
    每个类装载器都有一个 URLClassPath 对象用于保存类路径,在加载时会先在这个路径下查找该类,找不到再返回 null。
  12. 不同命名空间的类为什么能互相使用?
    双亲委派模型中,一个类装载器总是会先委托父类去进行装载,所有这些被委托的类装载器都被称为初始类装载器,而实际装载的被称为定义类装载器,所有初始装载器间的类型是共享的
  13. 可以不可以自己写个 String 类?
    不能,因为根据类加载的双亲委派机制,会去加载父类,父类发现冲突了 String 就不再加载了。
  14. Tomcat 的应用隔离原理是什么?
    Tomcat 实现了两种隔离技术:用于线程隔离的线程池和用于代码隔离的 WebAppClassLoader。
    前者不必赘述,对于后者,大家比较感兴趣的是 Tomcat 中对双亲委派模型的违背,因为它不是先委托父加载器去加载目标类,因为 Tomcat 一个进程可以运行多个 web 服务器,两个 web 项目中可能会出现两个声明完全一致的类,它们必须所处的命名空间必须隔离开,不然可能会发生一个项目启动后调到另一个项目中的代码的情况,这样就乱套了。

参考

类文件结构

  1. 【JVM】JVM 系列之 Class 文件(三)

对象分配

  1. What do Java objects look like in memory during run-time?
  2. What does a Java array look like in memory?

栈帧结构

栈帧主要包括了局部变量表、操作数栈、动态连接、方法返回地址等信息,在内存结构章节中我们已经探讨过栈结构,但是还未从实现层面来讨论过。

  1. 局部变量表
    用于存放方法参数和方法内部的局部变量。局部变量表的大小在方法的 Code 属性中就已经定义好了,为max_locals的值,局部变量表的单位为slot,32位以内的类型只占用一个slot(包括 returnAddress 类型),64 位的类型占用两个 slot。
    • 对于实例方法而言,索引为 0 的 slot 存放的是 this 引用,之后再依次存放方法参数、局部变量;
    • slot 可以被重用,当局部变量已经超出了作用域时,在作用域外再定义局部变量时,可以重用之前的 slot 空间。
    • 同时,局部变量没有赋值是不能够使用的——会产生编译错误,这和类变量和实例变量是有不同的,如下面代码:
      1
      2
      3
      4
      public void test() {
      int i;
      System.out.println(i);
      }
  2. 操作数栈
    执行方法时,存放操作数的栈,栈的深度在方法的 Code 属性中已经定义好了,为max_stack的值,32 位以内的类型占用一个栈单位,64 位的类型占用两个栈单位。操作数栈可以与其他栈的局部变量表共享区域,这样可以共用一部分数据。
  3. 动态连接
    动态连接是为了支持在运行期间将符号引用转化为直接引用的操作。我们知道,每一个方法对应一个栈帧,而每一个栈帧,都包含指向对应方法的引用,这个引用就是为了支持动态连接,如 invokedynamic 指令。动态连接与静态解析对应,静态解析是在类加载(解析阶段)或者第一次使用时将符号引用转化为直接引用,动态连接则是每一次运行的时候都需要进行转化(invokedynamic 指令)。
  4. 方法返回地址
    正常方法返回,返回地址为到调用该方法的指令的下一条指令的地址;异常返回,返回地址由异常表确定。方法返回时,需要恢复上层方法的局部变量表、操作数栈、将返回值压入调用者栈帧的操作数栈、设置 PC 值。

方法的调用和执行

方法调用决定了调用哪个方法,并创建对应的栈帧,接下来会开始方法的执行

解析

在程序执行前就已经确定了方法调用的版本,即编译期就确定了调用方法版本,这个版本在运行时是不可变的。

  • 静态方法私有方法final方法在编译时就可以确定具体的调用版本,静态方法直接与类型相关、私有方法在外部不可访问、final 不可被继承,也可唯一确定,这些方法称为非虚方法,翻译成字节码是 invokestatic(调用静态方法)、invokespecial(调用实例构造器方法、私有方法、父类方法),在类加载的解析阶段就可以进行解析,如下方法调用在编译期就可以确定方法调用的版本。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    class Father {
    public static void print(String str) {
    System.out.println("father " + str);
    }
    private void show(String str) {
    System.out.println("father " + str);
    }
    }
    class Son extends Father {
    }
    public class Test {
    public static void main(String[] args) {
    Son.print("coder"); // 调用的是Father的print()方法
    //Father fa = new Father();
    //fa.show("cooooder"); // 私有方法无法调用
    }
    }
  • 其他方法称为虚方法

分派

分派调用与多态密切相关,分为静态分派动态分派单分派多分派

静态分派

与静态分派相关的就是方法的重载,重载时根据参数的静态类型引用类型而非实际类型决定调用哪个版本。
选取的过程共分为三个阶段:

  1. 在不考虑对基本类型自动装拆箱(auto-boxing,auto-unboxing),以及可变长参数的情况下选取重载方法;
  2. 如果在第 1 个阶段中没有找到适配的方法,那么在允许自动装拆箱,但不允许可变长参数的情况下选取重载方法;
  3. 如果在第 2 个阶段中没有找到适配的方法,那么在允许自动装拆箱以及可变长参数的情况下选取重载方法。

如果 Java 编译器在同一个阶段中找到了多个适配的方法,那么它会在其中选择一个最为贴切的,而决定贴切程度的一个关键就是形式参数类型的继承关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 重载方法在编译器就可以进行确定,不需要等到运行期间
*/
public class StaticDispatch {
static class Human {
}
static class Women extends Human {
}
static class Men extends Human {
}

public void sayHello(Human human) {
System.out.println("say human");
}
public void sayHello(Women women) {
System.out.println("say women");
}
public void sayHello(Men men) {
System.out.println("say men");
}

public static void main(String[] args) {
StaticDispatch ds = new StaticDispatch();
Human women = new Women();
Human men = new Men();
// 编译时确定方法的调用版本是以Human作为参数的方法
ds.sayHello(women);
ds.sayHello(men);
}
}

动态分派

与动态分派相关的就是方法的重写,在子类中我们会重写父类的方法,而在调用的时候根据实际类型来选择适合的调用版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class DynamicDispatch {
abstract static class Human {
abstract public void sayHello();
}

static class Women extends Human {
@Override
public void sayHello() {
System.out.println("say women");
}
}

static class Men extends Human {
@Override
public void sayHello() {
System.out.println("say men");
}
}

public static void main(String[] args) {
Human women = new Women();
Human men = new Men();
women.sayHello(); // 实际类型是Women
men.sayHello(); // 实际类型是Men
}
}

单分派与多分派

方法的接收者(方法的所有者)与方法的参数统称为方法的宗量,根据分派基于多少种宗量,可以将分派划分为单分派多分派
单分派根据一个宗量确定调用方法的版本;多分派根据多个宗量确定调用方法的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Dispatch {
static class QQ {};
static class _360{};

public static class Father {
public void hardChoice(QQ arg) {
System.out.println("father choose qq");
}

public void hardChoice(_360 arg) {
System.out.println("father choose 360");
}
}

public static class Son extends Father {
public void hardChoice(QQ arg) {
System.out.println("son choose qq");
}

public void hardChoice(_360 arg) {
System.out.println("son choose 360");
}
}

public static void main(String[] args) {
Father father = new Father();
Father son = new Son();
father.hardChoice(new _360());
son.hardChoice(new QQ());
}
}

静态分派过程如下,在编译期阶段,会根据静态类型与参数类型确定调用版本,产生两条分别指向 Father.hardChoice(QQ)和 Father.hardChoice(_360)的指令,可以知道,在编译期,是由多个宗量确定调用版本,是静态多分派。
动态分派过程如下,在运行期,在执行 hardChoice(QQ)或者 hardChoice(_360)时,已经确定了参数必须为 QQ、_360,方法签名确定,静态类型和实际类型此时都不会对方法本身产生任何影响,而虚拟机会根据实际类型来确定调用版本,只根据一个宗量进行确定,因此,在运行时,是动态单分派。
在面向对象编程中我们会很频繁地使用到动态分配,虚拟机采用在类的方法区建立一个虚方法表(非虚方法不会出现在表中)来实现。
动态分派的实现

  • 只有虚方法才会出现在虚方法表中,也就是说静态方法私有方法final 方法是不会出现在这张表中的。
  • 从 Object 类继承的方法都会指向 Object 类型数据中各方法的实际入口地址。
  • 类自身的方法会指向类的数据类型中方法的实际入口地址。
  • 父类的没有被重写的方法在虚方法表中的索引与子类方法表中的索引相同,这样,当类型变化时,只需要改变方法表就行,索引还是相同。
  • 方法表一般在类加载的连接阶段进行初始化,准备了类变量的初始值后,方法表也初始化完毕。

方法退出

当一个方法开始执行后,只有两种方式可以退出:

  1. 第一种方式是执行引擎遇到任意一个方法返回的字节码指令,这种方式称为正常完成出口
  2. 另外一种退出方式是,在方法执行过程中遇到异常,且该异常没有被被捕获,称为异常完成出口

无论是哪种退出方式,在方法退出后,都需要返回到该方法被调用的位置(地址),让程序继续执行。一般来说,方法执行前,会保存调用者当前的 PC 计数器中的值,当方法正常退出时,将该 PC 计数器的值会作为返回地址,返回给调用者。在方法异常退出时,返回地址是通过异常处理器表来确定的。

方法退出的过程实际上就等于把当前栈帧出栈,一般过程为:

  1. 恢复上层方法的局部变量表和操作数栈
  2. 把返回值压入调用者栈帧的操作数栈中
  3. 调整 PC 计数器的值,以指向方法调用指令后面的一条指令

动态类型语言支持

Java 是一种静态类型语言,它与 Python、JavaScript 等动态类型语言的主要区别是:

  • 静态类型语言的类型检查主要过程是在编译期进行而不是运行期。

静态类型语言与动态类型语言的比较如下:

  • 静态类型语言在编译期确定类型,最显著的好处是编译器可以提供严谨的类型检查,这样与类型相关的问题能在编码的时候就及时发现,利于稳定性及代码达到更大规模。
  • 动态类型语言在运行期确定类型,这可以为开发人员提供更大的灵活性,某些在静态类型语言中需用大量“臃肿”代码来实现的功能,由动态类型语言来实现可能会更加清晰和简洁,从而提升开发效率。

在 JDK1.7 以前的字节码指令集中,4 条方法调用指令(invokevirtual、invokespecial、invokestatic、invokeinterface)的第一个参数都是被调用的方法的符号引用(CONSTANT_Methodref_info 或者 CONSTANT_InterfaceMethodref_info 常量),前面已经提到过,方法的符号引用在编译时产生,而动态类型语言只有在运行期才能确定接收者类型。
Java 不像 C/C++那样有 Function Pointer 或者 C#里的 Delegate。在 Java 中要实现类似的功能可以有以下两种方式:

  1. 实现一个函数接口,比如 Comparator
  2. MethodHandle,它的实现原理是第 5 条方法调用的字节码指令 invokedynamic,与其他 invoke*指令的最大差别是它的分派逻辑不是由虚拟机决定的,而是由程序员决定的。

MethodHandle 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;

import static java.lang.invoke.MethodHandles.lookup;

/**
* @author hgc
* @date 2/16/20
*/
public class MethodHandleTest {

static class ClassA {
public void println(String s) {
System.out.println(s);
}
}

private static MethodHandle getPrintlnMH(Object reveiver) throws NoSuchMethodException, IllegalAccessException {
// MethodType代表方法类型,包含了方法的返回值(methodType()的第一个参数)和具体参数(methodType()第二个及之后的参数)
MethodType mt = MethodType.methodType(void.class, String.class);
// lookup()方法来自于MethodHandles.lookup,这句的作用是在指定类中查找符合给定方法名称、方法类型,并且符合调用权限的方法句柄
// 因为这里调用的是一个虚方法,按照Java语言的规则,方法第一个参数是隐式的,代表该方法的接收者,也即是this指向的对象,这个参数以前是放在参数列表中进行传递的,而现在提供了bindTo()方法来完成这件事
return lookup().findVirtual(reveiver.getClass(), "println", mt).bindTo(reveiver);
}

public static void main(String[] args) throws Throwable {
Object obj = System.currentTimeMillis() % 2 == 0 ? System.out : new ClassA();
getPrintlnMH(obj).invokeExact("icyfenix");
}
}

MethodHandle 与反射(Reflection)的区别是:

  • Reflection API 的设计初衷是只为 Java 服务,而 MethodHandle 则设计为可服务于所有 Java 虚拟机之上的语言,当然也包括 Java 语言。
  • MethodHandle 与 Reflection 机制都是在模拟方法调用,但 Reflection 是在模拟代码层次的方法调用,而 MethodHandle 则是在模拟字节码层次的方法调用。
    MethodHandles.lookup 中的 3 个方法——findStatic()、findVirtual()、findSpecial()对应了 invokestatic、invokevirtual + invokeinterface 和 invokespecial 这几条字节码指令的执行权限校验行为,而这些底层细节在使用 Reflection API 时无需考虑。
  • Reflection 中的 Method 比 MethodHandle 对象包含的信息多,Reflection 是重量级的,MethodHandle 是轻量级的。
  • MethodHandle 模拟了字节码的方法指令调用,所以理论上虚拟机在这方面做的各种优化(如方法内联)在 MethodHandle 上也可以采用类似思路来支持,而通过反射去调用方法则不行。

至于 MethodHandle 是如何实现的,可以参考《深入理解 Java 虚拟机》,大致上就是运行期去常量表里根据用户指定的参数找方法。

###基于栈的字节码解释执行引擎
JVM 的指令都是基于栈的,比如iadd表示弹出栈顶的两个元素,然后求出二者的和后重新压入栈中。
基于栈的指令集与基于寄存器的指令集各有优势:

  • 基于栈的指令集的主要优点就是可移植。
    寄存器是由硬件直接提供的,基于寄存器的指令集由于直接依赖这些硬件寄存器则不可避免地要受到硬件的约束。
    如果使用栈架构的指令集,用户程序不会直接使用这些寄存器,就可以由虚拟机实现来自行决定把一些访问最频繁的数据(程序计数器、栈顶缓存等)放到寄存器中以获取尽量好的性能,这样实现起来也更加简单一些。
  • 栈架构的指令集还有一些其他优点,比如代码相对来说更加紧凑(字节码中每个字节就对应一条指令,而多地址指令集中还需要存放参数)、编译器实现更加简单(不需要考虑空间分配的问题,所需空间都在栈上操作)等。
  • 栈架构指令集的主要缺点是执行速度相对来说会稍慢一些。所有主流物理机的指令集都是寄存器架构也从侧面印证了这一点。
    一方面,虽然栈架构指令集的代码非常紧凑,但是完成相同功能所需的指令数量一般会比寄存器架构多。比如出栈入栈操作本身就产生了相当多的指令数量。
    另一方面,栈实现在内存之中,频繁的栈访问也就意味着频繁的内存访问,相对于处理器来说,内存始终是执行速度的瓶颈。尽管虚拟机可以采取栈顶缓存的手段,把最常用的操作映射到寄存器中避免直接内存访问,但这也只能是优化措施而不是解决本质问题的方法。

QA

  1. 哪个方法会被调用?
    重载会触发静态分派,会根据传参的静态类型来决定调用哪个方法,因此会调用 print(Father),但输出时调用了 Child 类的 toString 方法,因为方法被重写了,会触发方法的动态分派,根据传参的实际类型来决定调用哪个方法。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public class DynamicDispatchTest {

    public void print(Father father) {
    System.out.println(father);
    }

    public void print(Child child) {
    System.out.println(child);
    }

    public static class Father {

    @Override
    public String toString() {
    return "Father";
    }
    }

    public static class Child extends Father {

    @Override
    public String toString() {
    return "Child";
    }
    }

    public static void main(String[] args) {
    Father father = new Child();
    DynamicDispatchTest dynamicDispatchTest = new DynamicDispatchTest();
    dynamicDispatchTest.print(father);
    }
    }

环境准备

搭建环境

1
2
docker run --name myredis -d -p6379:6379 redis
docker exec -it myredis redis-cli

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# exists 查看某个key是否存在
exists aa
SET 创建一个key;
GET 获取一个key的值;
DEL ***一个key;
TYPE 获取一个key的类型;
EXISTS 判断一个key是否存在,0:存在,1,不存在;
# KEYS 获取给定模糊匹配的key,但要谨慎使用,因为线上的key一般非常多
keys *
keys a?
EXPIRE 设置一个key过期的秒数;
PERSTST ***一个key过期的秒数;
PEXPIRE 设置一个key过期的毫秒数;
RENAME 将一个key重命名;
RENAMENX 将一个key重命名,且新的key必须是不存在的可以;
TTL 获取key的有效时间,以秒为单位,-1表示永不过期,-2表示已过期、已转移
# dbsize 查看当前库中key数量
dbsize
# flushdb 清除数据库(内存)
flushdb
# move 移动key到另一个库
move aa 2

容量预估

在实际部署前一般都会先对所需容量进行一个评估,这样可以尽量避免在上线后容量不够还要扩容、或者容量过大造成浪费。
官方提供了一个容量预估工具,一些博客比如Redis 容量评估模型贴近 Redis 底层数据结构给出了容量的评估分析,可以作为一个参考,但是业务架构一直在变,实际的容量监控还是必须的,我们下面还会谈到这方面的工具。

使用Redis Cluster

搭建Redis Cluster集群

redis集群搭建

1、安装
到官网找到:

1
2
wget http://download.redis.io/releases/redis-4.0.8.tar.gz
make && make install # 默认安装目录为/usr/local/bin

ruby

1
2
yum install ruby
yum install rubygems

还有gem文件在此处下载,安装:

1
gem install /usr/local/redis-3.0.0.gem

2、创建 redis 节点
在一个目录(比如编译目录)下创建 redis_cluster 目录,再在这个目录下创建 7001、7002、7003、7004、7005、7006 的子目录,拷贝配置文件 redis.conf 到各个这些子目录中,并编辑以下内容

1
2
3
4
5
6
7
8
9
port 7001 //端口7001,7002,7003        
bind 本机ip //默认ip为127.0.0.1 需要改为其他节点机器可访问的ip 否则创建集群时无法访问对应的端口,无法创建集群
daemonize yes //redis后台运行
pidfile /var/run/redis_7001.pid //pidfile文件对应7001,7002,7003
logfile /tmp/redis_7001.log // 日志文件
cluster-enabled yes //开启集群 把注释#去掉
cluster-config-file nodes_7001.conf //集群的配置 配置文件首次启动自动生成 7001,7002,7003
cluster-node-timeout 15000 //请求超时 默认15秒,可自行设置
appendonly yes //aof日志开启 有需要就开启,它会每次写操作都记录一条日志

3、创建集群
先安装 ruby,因为 redis 的集群协调程序是用 ruby 写的

1
yum -y install ruby ruby-devel rubygems rpm-build

再安装gem,在编译目录下执行

1
gem install redis

运行每个redis实例:

1
redis-server redis.conf

复制编译目录下的src目录中的redis-trib.rb到/usr/local/bin,然后运行
在编译目录的src子目录下执行,其中host为各redis节点的绑定ip(如果绑定的ip是0.0.0.0则必须指定为对外开放的ip,否则会默认绑定127.0.0.1,在slot重定向时会报错),设置每个主分片有一个副本分片

1
./redis-trib.rb create --replicas 1 host1:port1 host2:port2 ...

4、测试
为了连到集群上,需要在 redis-cli 请求后加上-c 参数,比如

1
redis-cli -h 192.168.31.245 -c -p 7002

在普通set和get时,redis会自动计算出目标地址。
5、Java客户端连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RedissonTest {

private static final Random random = new Random();

static int succeed = 0;

static int failed = 0;

public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7001", "redis://127.0.0.1:7002", "127.0.0.1:7003")
.addNodeAddress("redis://127.0.0.1:7004", "redis://127.0.0.1:7005", "127.0.0.1:7006");
RedissonClient redissonClient = Redisson.create(config);
while (true) {
try {
RBucket<Object> bucket = redissonClient.getBucket(Integer.toString(random.nextInt()));
bucket.get();
succeed++;
log.info("调用成功, 当前 succeed:{}, failed:{}", succeed, failed);
} catch (Exception e) {
failed++;
log.info("调用失败, 当前 succeed:{}, failed:{}", succeed, failed, e.getMessage());
}
Thread.sleep(500);
}
}
}

info 命令中涉及集群部署的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Replication
# 当前副本角色,如果实例不是任何节点的从节点,则该值是“master”,如果实例从某个节点同步数据,则是“slave”
role:master
# 已连接的从节点数
connected_slaves:2
# 每个从节点的信息,包括ID、地址、端口号、状态
slave0:ip=10.32.140.18,port=6222,state=online,offset=1745391794554,lag=0
slave1:ip=10.32.140.15,port=6212,state=online,offset=1745391807778,lag=0
master_replid:6a64bcbbcae91324e72e24745392275e2f1382ea
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:1745391878919
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:16777216
repl_backlog_first_byte_offset:1745375101704
repl_backlog_histlen:16777216

cluster info 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
10.32.140.15:6212> cluster info
# ok状态表示集群可以正常接受查询请求。fail 状态表示,至少有一个哈希槽没有被绑定(说明有哈希槽没有被绑定到任意一个节点),或者在错误的状态(节点可以提供服务但是带有FAIL 标记),或者该节点无法联系到多数master节点。
cluster_state:ok
# 已分配到集群节点的哈希槽数量(不是没有被绑定的数量)。16384个哈希槽全部被分配到集群节点是集群正常运行的必要条件.
cluster_slots_assigned:16384
# 哈希槽状态不是FAIL 和 PFAIL 的数量
cluster_slots_ok:16384
# 哈希槽状态是 PFAIL的数量。只要哈希槽状态没有被升级到FAIL状态,这些哈希槽仍然可以被正常处理。PFAIL状态表示我们当前不能和节点进行交互,但这种状态只是临时的错误状态
cluster_slots_pfail:0
# 哈希槽状态是FAIL的数量。如果值不是0,那么集群节点将无法提供查询服务,除非cluster-require-full-coverage被设置为no
cluster_slots_fail:0
# 集群中节点数量,包括处于握手状态还没有成为集群正式成员的节点
cluster_known_nodes:9
# 至少包含一个哈希槽且能够提供服务的master节点数量
cluster_size:3
# 集群本地Current Epoch变量的值。这个值在节点故障转移过程时有用,它总是递增和唯一的
cluster_current_epoch:3
# 当前正在使用的节点的Config Epoch值. 这个是关联在本节点的版本值
cluster_my_epoch:2
cluster_stats_messages_ping_sent:27870482
cluster_stats_messages_pong_sent:27072640
cluster_stats_messages_meet_sent:4
cluster_stats_messages_sent:54943126
cluster_stats_messages_ping_received:27072636
cluster_stats_messages_pong_received:27865273
cluster_stats_messages_meet_received:4
cluster_stats_messages_fail_received:4
cluster_stats_messages_publish_received:7514286
cluster_stats_messages_received:62452203

cluster nodes

1
2
3
4
5
6
7
8
# 节点ID,IP地址:端口号,标识,上一次发送 ping 包的时间,上一次收到 pong 包的时间,连接状态,节点使用的哈希槽
127.0.0.1:7001> cluster nodes
fcdcafe5482daa80d0a382f675e8caced2d6ce63 127.0.0.1:7001@17001 myself,slave b2d0be87492d75bce14d3e50f687ce8a7872ef73 0 1603765953000 7 connected
75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 127.0.0.1:7002@17002 master - 0 1603765955026 2 connected 5461-10922
b2d0be87492d75bce14d3e50f687ce8a7872ef73 127.0.0.1:7004@17004 master - 0 1603765952012 7 connected 0-5460
8bf573a595a3955293e37f2e34e20c4cfb469060 127.0.0.1:7003@17003 master - 0 1603765954021 3 connected 10923-16383
5d599dea56108003633cd27d13abf87a9ef07d52 127.0.0.1:7005@17005 slave 75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 0 1603765953000 2 connected
68e6ddbf689bc6d51ebfadd24064fc6cc8204210 127.0.0.1:7006@17006 slave 8bf573a595a3955293e37f2e34e20c4cfb469060 0 1603765954000 3 connected

测试结果预期

测试一些Cluster宕机的情况,预期会有以下结论:

  1. 关闭任意一主,会导致部分写操作失败,是由于从节点不能执行写操作,在Slave升级为Master期间会有少量的失败。
  2. 关闭从节点对于整个集群没有影响。
  3. 如果半数以上 Master 处于关闭状态那么整个集群处于不可用状态。
    原因:Redis Cluster的选举需要有Master参与,如果多半的Master都挂掉了,也就不能再支持选举新Master了。
  4. 关闭任意一对主从节点会导致部分(大约为整个集群的1/3)失败。
    Master宕机了,且没有替补的Slave,则分配给这个Master的slot就不可用了。

测试 - 压测

测试 - 宕机1台Master

下面的命令将7001干掉后测试集群的主从迁移情况。
刚开始集群7001、7002、7003是Master:

1
2
3
4
5
6
7
127.0.0.1:7001> cluster nodes
5d599dea56108003633cd27d13abf87a9ef07d52 127.0.0.1:7005@17005 slave 75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 0 1603615592617 2 connected
fcdcafe5482daa80d0a382f675e8caced2d6ce63 127.0.0.1:7001@17001 myself,master - 0 1603615591000 1 connected 0-5460
b2d0be87492d75bce14d3e50f687ce8a7872ef73 127.0.0.1:7004@17004 slave fcdcafe5482daa80d0a382f675e8caced2d6ce63 0 1603615591614 1 connected
8bf573a595a3955293e37f2e34e20c4cfb469060 127.0.0.1:7003@17003 master - 0 1603615591000 3 connected 10923-16383
75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 127.0.0.1:7002@17002 master - 0 1603615592000 2 connected 5461-10922
68e6ddbf689bc6d51ebfadd24064fc6cc8204210 127.0.0.1:7006@17006 slave 8bf573a595a3955293e37f2e34e20c4cfb469060 0 1603615593620 3 connected

将7001 kill掉后,请求7001服务器的请求都会失败,大约20秒后请求恢复,且观察日志可以发现,原来7001的Slave-7004现在替补上来成为了Master:

1
2
3
4
5
6
7
8
9
10
11
18:29:28.186 [main] INFO  c.t.l.r.RedissonTest - 调用成功, 当前 succeed:15, failed:0
18:29:33.475 [main] INFO c.t.l.r.RedissonTest - 调用失败, 当前 succeed:15, failed:1
18:29:38.750 [main] INFO c.t.l.r.RedissonTest - 调用失败, 当前 succeed:15, failed:2
18:29:39.253 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:16, failed:2
18:29:39.756 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:17, failed:2
18:29:40.259 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:18, failed:2
18:29:40.763 [main] INFO c.t.l.r.RedissonTest - 调用成功, 当前 succeed:19, failed:2
18:29:45.271 [redisson-netty-2-4] INFO o.r.c.MasterSlaveEntry - master 127.0.0.1/127.0.0.1:7004 used as slave
18:29:45.274 [redisson-netty-2-14] INFO o.r.c.p.PubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:7004
18:29:45.280 [redisson-netty-2-4] WARN o.r.c.ClusterConnectionManager - slave: redis://127.0.0.1:7001 has down for slot ranges: [[0-5460]]
18:29:45.285 [redisson-netty-2-28] INFO o.r.c.p.SlaveConnectionPool - 24 connections initialized for 127.0.0.1/127.0.0.1:7004

之后重启7001后,发现7001重新加入到了集群中:

1
2
3
4
18:34:18.539 [redisson-netty-2-29] INFO  o.r.c.p.PubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:7001
18:34:18.542 [redisson-netty-2-4] INFO o.r.c.MasterSlaveEntry - master 127.0.0.1/127.0.0.1:7004 excluded from slaves
18:34:18.542 [redisson-netty-2-4] INFO o.r.c.ClusterConnectionManager - slave: redis://127.0.0.1:7001 has up for slot ranges: [[0-5460]]
18:34:18.544 [redisson-netty-2-6] INFO o.r.c.p.SlaveConnectionPool - 24 connections initialized for 127.0.0.1/127.0.0.1:7001

测试 - 宕机2台Master

把两台Master干掉后,两个Master均进入fail状态,这时集群也会进入fail状态,选举不会成功。

1
2
3
4
5
6
7
localhost:7001> cluster nodes
fcdcafe5482daa80d0a382f675e8caced2d6ce63 127.0.0.1:7001@17001 myself,slave b2d0be87492d75bce14d3e50f687ce8a7872ef73 0 1603622378000 7 connected
75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 127.0.0.1:7002@17002 master - 0 1603622381510 2 connected 5461-10922
b2d0be87492d75bce14d3e50f687ce8a7872ef73 127.0.0.1:7004@17004 master - 0 1603622379504 7 connected 0-5460
8bf573a595a3955293e37f2e34e20c4cfb469060 127.0.0.1:7003@17003 master - 0 1603622380507 3 connected 10923-16383
5d599dea56108003633cd27d13abf87a9ef07d52 127.0.0.1:7005@17005 slave 75d6d1e3d4d64eb6fb85d1ac7d883ecef4ac5e7e 0 1603622378498 2 connected
68e6ddbf689bc6d51ebfadd24064fc6cc8204210 127.0.0.1:7006@17006 slave 8bf573a595a3955293e37f2e34e20c4cfb469060 0 1603622380000 3 connected

集群fail后,客户端之后的请求也都失败了。

分布式锁

1
2
3
set key value ex nx
do sth...
del key
  1. 为什么要加过期时间?
    怕中间遇到异常退出,del 没有执行,导致陷入死锁。
    set 命令现在支持加 ex 和 nx 参数,既保证原子性,又支持加过期时间。
  2. 过早过期释放别人的锁
    业务执行时间过长,锁过期了,可能导致其他线程先获取到了锁,这样当前线程 del 的时候相当于将别人的锁释放掉了。
    可以给 value 设置一个随机数,释放前检查一下,这是个“读后写”的过程,为了保证其原子性,一般会使用 Lua 脚本来实现(类似 Redisson 中的实现)。
    Redisson 中的解决方案是加一个“看门狗”,定时刷新过期时间。
  3. 重入性
    为了实现可重入性,一般是在客户端使用 ThreadLocal 存储当前持有锁的计数。
    在 Redisson 中,可重入锁是通过在 Lua 脚本中对客户端线程 ID 进行计数来实现的。

延时队列

异步消息队列

可以使用 list 数据结构来实现异步消息队列,使用 rpush/lpush 操作入队列,使用 lpop 和 rpop 来出队列。

1
2
3
4
5
6
rpush notify-queue a b
llen notify-queue
lpop notify-queue
llen notify-queue
lpop notify-queue
...
  1. 如果队列空了怎么办?
    客户端通过轮询队列 pop 获取消息处理,如果队列空了,那么就会陷入 pop 的死循环。
    一般的解决办法是sleep,每次 pop 后可以暂停个 1 秒。
    但是sleep同样会带来延迟增大的问题,因此,更好的解决办法是blpop / brpop,即阻塞读:
    1
    2
    // 等待一秒若还没有数据则直接返回
    blpop notify-queue 1
  2. 空闲连接
    如果线程一直阻塞,则 Redis 的客户端连接将成为闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用。这个时候 blpop/brpop 会抛出异常来,这时要注意对异常的捕获和重试
  3. 锁冲突处理
    加锁失败一般有 3 种处理方式:
    • 直接抛出异常,由用户稍后重试;
      适合由用户直接发起的请求,比如商城下单,下单失败后由用户决定是否重新请求。
    • sleep
      sleep 会阻塞消息处理线程,会导致后续消息处理出现延迟,不适合高并发(锁冲突频繁)或队列消息较多的情况,并且,如果是由于死锁导致的加锁不成功,sleep 将导致线程一直处于阻塞状态、后续的消息永远得不到处理。
    • 延时队列
      将当前冲突的请求扔到另一个队列延后处理以避开冲突,适合异步消息处理的场景。

下面的实现例子来自《Redis 深度历险》:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class RedisDelayingQueue<T> {

static class TaskItem<T> {
public String id;
public T msg;
}

// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();

private Jedis jedis;
private String queueKey;

public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}

public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
task.id = UUID.randomUUID().toString();
task.msg = msg;
String s = JSON.toJSONString(task);
// key是当前时间
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
}

public void loop() {
while (!Thread.interrupted()) {
// 取时间范围是0到当前时间内的一条记录
// 第3个参数0表示不记分数,
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
// 暂停一会,避免浪费CPU
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
// 说明抢到了
if (jedis.zrem(queueKey, s) > 0) {
TaskItem<T> task = JSON.parseObject(s, TaskType);
this.handleMsg(task.msg);
}
}
}

public void handleMsg(T msg) {
System.out.println(msg);
}

public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {

public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}

};
Thread consumer = new Thread() {

public void run() {
queue.loop();
}

};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}

一些优化点:

  1. 注意zrangeByScorezrem不是一个原子操作,可能会有多个线程争抢同一个 key,这可以通过 lua 脚本来优化。
  2. 使用 Redis 作为消息队列并不能保证 100%的可靠性,因为 rem 执行后如果客户端崩溃了消息就丢失了,或者 Redis 崩溃了消息也有可能丢失(这个可以通过主备来避免)。

阻塞队列

使用Redis实现队列可以利用lpop/rpush或rpop/lpush,但是这两组命令在遇到队列为空或满时还是会直接返回,如果要实现阻塞队列的阻塞等待能力,可以:
1、使用lua脚本轮询
lua脚本中先检测队列是否为空/满,在不空/不满的情况下才执行后续的操作。
如果为空/满,则客户端先等待一会再执行一次该lua脚本。
缺点是会有很多空轮询。
2、使用blpop命令
在list结构尚不存在元素的情况下,blpop命令会先将客户端挂起,等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {

...

// 关联阻塞客户端和键的相关信息
for (j = 0; j < numkeys; j++) {

/* If the key already exists in the dict ignore it. */
// c->bpop.keys 是一个集合(值为 NULL 的字典)
// 它记录所有造成客户端阻塞的键
// 以下语句在键不存在于集合的时候,将它添加到集合
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;

incrRefCount(keys[j]);

/* And in the other "side", to map keys -> clients */
// c->db->blocking_keys 字典的键为造成客户端阻塞的键
// 而值则是一个链表,链表中包含了所有被阻塞的客户端
// 以下程序将阻塞键和被阻塞客户端关联起来
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
// 链表不存在,新创建一个,并将它关联到字典中
int retval;

/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
// 将客户端填接到被阻塞客户端的链表中
listAddNodeTail(l,c);
}
blockClient(c,REDIS_BLOCKED_LIST);
}

执行指令结束后,处理解除了阻塞的键。
redis.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int processCommand(redisClient *c) {

...

/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 在事务上下文中
// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外
// 其他所有命令都会被入队到事务队列中
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 执行命令
call(c,REDIS_CALL_FULL);

c->woff = server.master_repl_offset;
// 处理那些解除了阻塞的键
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}

return REDIS_OK;
}

可以看到,执行命令的末尾需要处理解除了阻塞的键,遍历这些键然后唤醒等待的客户端。
t_list.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/* 这个函数会在 Redis 每次执行完单个命令、事务块或 Lua 脚本之后调用。
*
* 对所有被阻塞在某个客户端的 key 来说,只要这个 key 被执行了某种 PUSH 操作
* 那么这个 key 就会被放到 serve.ready_keys 去。
*
* 这个函数会遍历整个 serve.ready_keys 链表,
* 并将里面的 key 的元素弹出给被阻塞客户端,
* 从而解除客户端的阻塞状态。
*
* 函数会一次又一次地进行迭代,
* 因此它在执行 BRPOPLPUSH 命令的情况下也可以正常获取到正确的新被阻塞客户端。
*/
void handleClientsBlockedOnLists(void) {

// 遍历整个 ready_keys 链表
while(listLength(server.ready_keys) != 0) {
list *l;

/* Point server.ready_keys to a fresh list and save the current one
* locally. This way as we run the old list we are free to call
* signalListAsReady() that may push new elements in server.ready_keys
* when handling clients blocked into BRPOPLPUSH. */
// 备份旧的 ready_keys ,再给服务器端赋值一个新的
l = server.ready_keys;
server.ready_keys = listCreate();

while(listLength(l) != 0) {

// 取出 ready_keys 中的首个链表节点
listNode *ln = listFirst(l);

// 指向 readyList 结构
readyList *rl = ln->value;

/* First of all remove this key from db->ready_keys so that
* we can safely call signalListAsReady() against this key. */
// 从 ready_keys 中移除就绪的 key
dictDelete(rl->db->ready_keys,rl->key);

/* If the key exists and it's a list, serve blocked clients
* with data. */
// 获取键对象,这个对象应该是非空的,并且是列表
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == REDIS_LIST) {
dictEntry *de;

/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
// 取出所有被这个 key 阻塞的客户端
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);

while(numclients--) {
// 取出客户端
listNode *clientnode = listFirst(clients);
redisClient *receiver = clientnode->value;

// 设置弹出的目标对象(只在 BRPOPLPUSH 时使用)
robj *dstkey = receiver->bpop.target;

// 从列表中弹出元素
// 弹出的位置取决于是执行 BLPOP 还是 BRPOP 或者 BRPOPLPUSH
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
REDIS_HEAD : REDIS_TAIL;
robj *value = listTypePop(o,where);

// 还有元素可弹出(非 NULL)
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);

// 取消客户端的阻塞状态
unblockClient(receiver);

// 将值 value 推入到造成客户端 receiver 阻塞的 key 上
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == REDIS_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}

if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
// 如果执行到这里,表示还有至少一个客户端被键所阻塞
// 这些客户端要等待对键的下次 PUSH
break;
}
}
}

// 如果列表元素已经为空,那么从数据库中将它删除
if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}

/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}

位图

场景

位图数据结构与 java 中的 Set 类似,占用空间小,但是并不能防止冲突,适合数据离散性比较大且对数据准确性不高的场景。

  1. 统计月活
    统计月活的时候我们需要对 userId 进行去重,每个用户就可以定位到这个位图上的一个确定的位置上,0 表示不活跃,1 表示活跃,遍历一次就可以知道月活用户数有多少。

使用

1
2
3
4
set s a
getbit s 2
setbit s 6 1
get s

注意下标并不是从低位到高位递增的,而是反过来的,a 的 ASCII 码值是01100001setbit s 6 1设置第 7 位为 1 后值就变成了 c。

1
2
3
4
5
6
// 指定范围内1的个数
bitcount s 0 7
// 第一个1的位置
bitpos s 1
// 下标0到7范围内第一个1的位置
bitpos s 1 0 7

比如用位数组记录用户登录的日期,bitcount 可以用于统计用户一共签到了多少天,bitpos 可以用于查找用户从哪一天开始第一次签到。

1
2
3
4
5
6
// 从第一个位(0)开始取4个位,结果是无符号数(u)
bitfield s get u4 0
// 从第三个位(2)开始取3个位,结果是有符号数(i)
bitfield s get i3 2
// 一次性执行多个子命令
bitfield s get u4 0 get u3 2 get i4 0 get i3 2

如果 s 的值是 c,二进制值是0110 0011,取前 4 位结果是 6(0110)。

1
2
3
4
5
6
// +1,结果为0,因为溢出了
bitfield s incrby u2 1 1
// 不执行,返回nil
bitfield s overflow fail incrby u2 1 1
// 结果为3
bitfield s overflow sat incrby u2 1 1

位的增加操作有 3 种策略:

  1. 默认的 wrap:
  2. fail:失败报错不执行
  3. sat:饱和截断,如果有溢出就停留在最大最小值。

HyperLogLog

HyperLogLog 主要用于大数据量的计数,比如访问频繁的页面需要统计 UV(一天内访问的用户数),不同于 PV,UV 需要去重。
HyperLogLog 只能粗略统计,理论上会有不到 1%的误差。

使用

1
2
3
4
5
6
7
8
pfadd s user1
pfcount s
pfadd s user1
pfadd s user2
// 结果仍然为2
pfcount s
// 将两个计数器累加
pfmerge s b

布隆过滤器

HyperLogLog 可以用于计数,但是不能用于判断一个值是否存在于该结构里,这时最好使用布隆过滤器(Bloom Filter):

  1. 去重;
  2. 支持 contains 判断;
  3. 节省空间;
  4. 不精确,存在误判的可能(当布隆过滤器说某个值存在时,这个值可能不存在,当它说不存在时,那就肯定不存在,这个问题是由 hash 函数的冲突引起的);
  5. 不支持计数。

使用

插入值并且判断该值是否存在于 bf 对象内:

1
2
3
4
5
bf.add s user1
bf.add s user2
bf.madd s user3 user4
bf.exists s user1
bf.mexists s user1 user5

第一次 bf.add 会创建一个默认参数的布隆过滤器,也可以显式创建:

1
2
// 创建一个名为key的布隆过滤器,错误率0.1,预计放入的元素数量为10
bf.reserve key 0.1 10
  • 错误率越低,需要的空间越大,默认值为 0.01
    对于不需要非常精确的场合,错误率设置得稍大一点也无伤大雅。
  • 当实际数量超出预计放入数量时,误判率会上升,默认值为 100
    该值设置得过大,会浪费存储空间,估计得过小,就会影响准确率,最理想的情况下是略大于实际元素数量。

当实际元素数超出初始化大小时,应该对布隆过滤器进行重建,重新分配一个 size 更大的过滤器,再将所有的历史元素批量 add 进去(比如如果布隆过滤器保存的是 userId,那么就需要把历史 userId 重新保存到一个新的布隆过滤器中)。

原理

布隆过滤器本身是一个大型的位数组和几个不同的无偏 hash 函数。

无偏指的是该 hash 函数能把值分布得比较均匀。

  • add:使用这些 hash 函数计算 hash(key)%length(bit_arr),每个 hash 函数都可以得到一个位置,将这些位置置为 1;
  • exists:同样用这些 hash 函数计算位置,如果有一个不为 1 说明 key 不存在,如果都为 1 也不一定说明 key 一定存在,因为这些位置可能会被其他 key 设置到。

GeoHash

一种简单的方法

找一个节点附近的所有节点,可以近似看做以该节点为中心的一个矩形范围内有哪些节点,可以用类似下面的 SQL 来查询:

1
select id from positions where x0-r < x < x0+r and y0-r < y < y0+r

对 x 和 y 字段加索引后该 SQL 的性能也不会太差。

GeoHash

GeoHash 的基本思想是“降维”,将二维坐标映射到直线上的一个点,要寻找二维平面上的“附近的人”就相当于在直线上找相邻的点。
GeoHash 本质是一个 zset(带 score 的 set),底层结构是 skiplist:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
geoadd company 116.489033 40.007669 meituan
geoadd company 116.562108 39.787602 jd 116.334255 40.027400 xiaomi
// geoadd指令添加一个节点,因为geo存储结构上使用的是zset,因此可以使用zset相关的指令来操作,比如可以使用zrem指令来删除
zrem company jd
geoadd company 116.562108 39.787602 jd
// 计算两个节点之间的距离
geodist company meituan jd
// 单位km
geodist company meituan jd km
// 获取节点位置
geopos company meituan jd
// 获取元素的hash值
geohash company meituan
// 查询指定元素附近的其他元素
// 范围20公里内最多3个元素按距离正序排列,
georadiusbymember company meituan 20 km count 3 asc

GeoHash 算法的执行流程如下:

  1. 将坐标编码为一个 52 位整数,这个整数也可以还原为原坐标;
  2. zset 的 value 是元素的 key(即上面的 meituan、jd 等),score 是 GeoHash 的 52 位整数值,通过 zset 的 score 排序即可得到坐标附近的其它元素。

其中主要问题是坐标是如何编码的,编码后为什么可以通过比较大小来判断是否相邻?
GeoHash 算法的原理是按经纬度区间对半分来进行编码,比如,维度的区间是(-90, 90),如果坐标的维度值大于 0,则记 1,否则记 0,进一步的,如果坐标维度大于 45,则记 1,否则记 0,以此类推,对于39.918118来说,最后的编码值就是10111000110001011011

一般实现中还会进行 BASE64 编码,本质是一样的,只是编码后占用的空间更小了。

Scan

扫描大量 key 找到目标 key 的需求有两种实现方法:

  1. keys 命令
    不支持分页,一次性吐出所有满足条件的 key,如果数据量过大、耗时过长,可能导致服务端卡顿。

    因为 Redis 是单线程模型。

  2. scan
    和 keys 命令一样提供模式匹配,但它是通过游标扫描的,每次只扫描指定数量的数据,并将其中匹配的结果返回。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 从cursor=0开始,匹配"key99*",共扫描1000条
scan 0 match key99* count 1000
1) "11928"
2) 1) "key9956"
2) "key9993"
3) "key9911"
4) "key996"
5) "key9933"
6) "key9962"
7) "key991"
8) "key9981"
9) "key9990"
10) "key9946"
11) "key9971"
12) "key99"
13) "key9951"

返回值中,第一个”11928”是结果中最后一条的 cursor,下一次遍历时可以使用该值作为初始 cursor。

1
2
3
scan 11928 match key99* count 10
1) "3480"
2) (empty list or set)

结果为空集合并不意味着遍历结束了,只有 cursor=0 才是遍历结束的标识。

1
2
3
4
5
6
7
8
scan 11928 match key99* count 10000
1) "0"
2) 1) "key9952"
2) "key9961"
3) "key9988"
4) "key9931"
5) "key9998"
...忽略更多的

原理

Redis 中所有 key 都存储在一个非常大的字典中,这个字典的结构和 Java 的 HashMap 类似:
Redis-字典

  1. 一维数组大小为 2^n(n >= 0),扩容一次大小翻倍,保存的是所有 key 的下标,或者称为槽(slot);
  2. 二维链表保存的是所有的 key,不同 key 是有可能被 hash 到同一个槽上的,这时这个槽里所有元素都会被模式匹配过滤后一次性返回。

scan 的遍历并不是从 0 开始递增,而是通过二进制高位进位加法来遍历,比如遍历顺序:

  1. 0000 -> 0
  2. 1000 -> 8
  3. 0100 -> 4
  4. 1100 -> 12
  5. 以此类推

Redis-高位递增遍历
这种遍历方式的好处是可以避免扩容缩容后相同元素被反复遍历到。
因为槽数组的长度总是 2 的 n 次方,因此取模运算等价于位与操作,比如原来数组长度为 8,15(1111)会被 hash 到 7 号槽,而扩容后数组长度变成 16,增加了一个高位的 1,15(1111)会被 hash 到 15 号槽。
因此从低到高的遍历方式可能会导致重复遍历,而从高到低的方式则可以避免。

Redis 的 rehash 是渐进式的,在 rehash 的过程中,操作需要同时访问新旧的两个数组结构,如果旧的找不到就需要再到新的下面去找,scan 同理。

大 key 问题

大 key 会导致数据迁移和扩容时需要分配更大的一块内存空间,导致卡顿,删除时,同样需要一次性回收大 key,卡顿会再一次产生。
为了定位大 key,可以使用 redis-cli 的扫描功能

1
2
3
4
redis-cli -h 127.0.0.1 -p 7001 –-bigkeys
# 如果担心这个指令会大幅抬升 Redis 的 ops 导致线上报警,还可以增加一个休眠参数
# 每隔100条scan指令就会休眠0.1s
redis-cli -h 127.0.0.1 -p 7001 –-bigkeys -i 0.1

通过操作 Thread、Object 的 API 和内置锁我们可以解决大部分的线程协调问题,但这绝不是最优雅的方式,接下来我们就来见识一下 JUC 中的各种高效线程同步工具。

阅读全文 »

RPC

客户端 OR 服务端 异步

客户端异步比较常见,因为任何网络调用基本都可以方便地包装成异步调用,朴实无华且枯燥,下载这段代码是一个简单的 demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ClientAsync {

private static class ClientAsyncExecutor {

private ExecutorService threadPool;
private Supplier<String> dataSupplier;
private Consumer<String> callback;

public ClientAsyncExecutor(ExecutorService threadPool,
Supplier<String> dataSupplier,
Consumer<String> callback) {
this.threadPool = threadPool;
this.dataSupplier = dataSupplier;
this.callback = callback;
}

public void call() {
threadPool.submit(() -> {
String data = dataSupplier.get();
callback.accept(data);
});
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientAsyncExecutor executor = new ClientAsyncExecutor(
threadPool,
() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "done";
},
res -> System.out.println("结果:" + res));
executor.call();
System.out.println("call returned");
threadPool.shutdown();
threadPool.awaitTermination(10000, TimeUnit.SECONDS);
}
}

客户端异步是一种伪异步,本质上仍是同步调用,只不过等待是放到另外一个线程中去做的,如果这样等待的线程比较多,对客户端的线程池容易造成压力,。
服务端的异步实现起来就比较费劲了,因为客户端需要额外提供一个入口来接收服务端执行完毕的结果:
服务端异步

良好习惯

不论是桌面应用还是 Web 应用,多线程代码都是比较难玩得转的,玩不明白的结果就是一大堆令人毛骨悚然且难以捉摸、难以调试的问题——实际上,一旦你意识到正在处理一个并发问题,你可能就不得不完全放弃调试了,并转而手动检查代码。
鉴于此,我们当然是希望尽量避免并发问题的,理想情况下希望完全避免多线程错误,同样,不存在那种一刀切的方法,但这有一些调试和防止多线程错误的实际考虑因素:

  1. 避免全局状态
    首先,牢记 “全局状态” 问题。如果你正创建一个多线程应用,那么应该密切关注任何可能全局修改的内容,如果可能的话,将他们全部删掉,如果部分全局变量确实有理由保留,那么应该仔细保证其并发安全,并对程序性能进行跟踪,以确定不会因为引入新的等待时间而导致系统性能降低(并发修改时需要同步多个线程)。
  2. 避免可变性
    这点直接来自于 函数式编程,并且适用于 OOP,声明应该避免类和对象状态的改变。简而言之,这意味着放弃 setter 方法,并在需要避免可变性的类或字段上拥有私有的 final 字段,它们的值唯一发生变化的时间是在构造期间。这样,你可以确定不会出现争用问题,且访问对象属性将始终提供正确的值。
  3. 日志及报警
    评估你的程序可能会在何处发生异常,并预先记录所有关键数据。如果发生错误,你将很高兴可以得到信息说明收到了哪些请求,并可更好地了解你的应用程序为什么会出现错误。需要再次注意的是,日志记录引入了额外的文件 I/O,可能会严重影响应用的性能,因此请不要滥用日志。
    在记日志的基础上,有必要根据 SLA 记录一些指标的报警阈值,比如订单中心下单失败,考虑可能是网络出现抖动引起超时(如果用到三方服务这个问题会更明显),因此报警阈值可以稍微调高一些,比如 1 分钟 3 次失败就打电话报警。
  4. 复用现存实现
    每当你需要创建自己的线程时(例如:向不同的服务发出异步请求),复用现有的安全实现来代替创建自己的解决方案。这在很大程度上意味着要使用 ExecutorService 和 Java 8 简洁的函数式 CompletableFuture 来创建线程。Spring 还允许通过 DeferredResult 类来进行异步请求处理。

反模式-异步狂热

上边我们已经讨论过异步存在的优势和劣势,实际上在我读过的项目代码中,确实存在不少那种不异步不开心的“炫技代码”,给维护带来很大困难。
下面是一个非常直观的方法:

1
2
3
public String punch(People t) {
return "Oh, No";
}

经过一个莫名其妙的异步包装,原来的可扩展性、性能均没有提升,甚至性能成功降低了:

1
2
3
4
5
6
7
8
9
10
public Function doPunch(People t) {
return t -> {
return "Oh, No";
}
}

public String punch(People t) {
Function f = doPunch(t);
return f.apply(t);
}

实际情况可能会复杂得多,这样的代码不够简单直观、容易暗藏 Bug、不符合 KISS 原则,但是即便如此,还是有很多人会觉得特别绕的代码能体现一个人的水平、对代码的驾驭能力、能灵活运用设计模式的能力,我觉得大部分情况下事实并非如此。

Spring - DeferredResult

TODO

Hystrix - Command

Hystrix 的 Command 框架在 CompletableFuture 的基础上提供了合并请求的特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class BatchGetDataCommand extends HystrixCommand<List<Double>> {

private Collection<CollapsedRequest<Double, Long>> requests;

public BatchGetDataCommand(Collection<CollapsedRequest<Double, Long>> requests) {
super(Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("batchGetData")));
this.requests = requests;
}

@Override
protected List<Double> run() throws Exception {
// TODO: 做些批量查询操作,这里作为示范直接返回
return requests.stream()
.map(CollapsedRequest::getArgument)
.map(arg -> (double) arg)
.collect(Collectors.toList());
}
}

public class SimpleGetDataCommand extends HystrixCollapser<List<Double>, Double, Long> {

private Long id;

public SimpleGetDataCommand(Long id) {
super(HystrixCollapser.Setter
.withCollapserKey(HystrixCollapserKey.Factory.asKey("getData"))
.andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
.withMaxRequestsInBatch(2)
.withTimerDelayInMilliseconds(5)
// 允许缓存request的结果
.withRequestCacheEnabled(true))
.andScope(Scope.REQUEST));
this.id = id;
}

@Override
public Long getRequestArgument() {
return id;
}

@Override
protected HystrixCommand<List<Double>> createCommand(Collection<CollapsedRequest<Double, Long>> collapsedRequests) {
return new BatchGetDataCommand(collapsedRequests);
}

@Override
protected void mapResponseToRequests(final List<Double> batchResponse, Collection<CollapsedRequest<Double, Long>> collapsedRequests) {
final AtomicInteger count = new AtomicInteger();
collapsedRequests.forEach(request -> {
request.setResponse(
batchResponse.get(count.getAndIncrement()));
});
}

public static void main(String[] args) {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
// Hystrix内部将多个查询合并成一个
SimpleGetDataCommand command1 = new SimpleGetDataCommand(1L);
SimpleGetDataCommand command2 = new SimpleGetDataCommand(2L);
// 这里需要先使用queue而不是execute
Future<Double> f1 = command1.queue();
Future<Double> f2 = command2.queue();
System.out.println(f1.get());
System.out.println(f2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
context.shutdown();
}
}
}

任务调度

补偿执行

异常订单表,定时任务

定时任务的实现

无分布式的,HashTimeWheelTimer
分布式情况下,可以先用 Redis,当复杂时再用 RabbitMQ 等消息队列中间件。

消息 OR 定时任务

很多异步实现的功能既可以通过消息实现又可以通过定时任务来实现,我经历过很多需要抉择的情况,甚至也碰到过对此都不大清楚的架构师,我认为对此没有唯一的答案,只能根据具体业务场景来分析,一些要点可供参考。

  • 分布式

  • 并发安全

  • 运维便利性
    消息本质上是把任务暂存在队列服务里,统计某段时间内发生了什么、会发生什么就比较困难了,因为往往消息队列都会有自定义的数据格式,判断一条消息是否被消费往往得通过日志来判断。
    定时任务一般都会有执行记录,什么时间会执行任务也可以直接用 cron 表达式计算出来,缺点是使用定时任务意味着需要自己维护很多东西,比如在数据库里维护一个队列保存消息,保存有创建时间、重试次数、重试时间等,失败 N 次后还需要存一份失败记录。

以一个“通知拉取”的场景举例,A 系统需要从 B 系统拉取数据,但并不是定时地直接调 B 的接口,而是先由 B 通知 A 哪些数据发生了变更,然后由 A 去拉取这些数据的变更部分:

  • 如果使用消息队列来实现:
  • 如果使用定时任务实现:

使用 Thread 实现任务调度

实现任务调度最简单的方式可以直接利用 Thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
Thread thread = new Thread(() -> {
while (true) {
String task = "";
try {
task = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(task);
}
});
thread.start();
queue.put("hello");

Timer

Timer 内部通过一个 TimerThread 来循环执行提交给 Timer 的任务,因此任务是串行的,前一个任务的延迟会影响后续的所有任务。
Timer 可以看做一种简化版的 ScheduledExecutor,下面是一种 Demo 实现:

1
2
3
4
5
6
7
8
9
10
11
12
public class DemoTimer {
private static ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);

public static void setTimeout(final Runnable function, long time,
TimeUnit timeUnit, final Executor executor) {
Preconditions.checkArgument(time >= 0);
Preconditions.checkNotNull(function);
Preconditions.checkNotNull(timeUnit);
Preconditions.checkNotNull(executor);
timer.schedule(() -> executor.execute(function), time, timeUnit);
}
}

ScheduledExecutor

ScheduledExecutor 能提供一种更灵活的周期性任务处理功能,

Quartz

Spring Task

Quartz 集群版

tbschedule

xxl-job

Elastic-Job

Elastic-Job架构
Elastic-Job 分为 Elastic-Job-Lite 和 Elastic-Job-Console 两个模块。Elastic-Job-Lite 实现了分布式任务调度、动态扩容缩容、任务分片、失效转移等功能。
如上图所示,Elastic-Job-Lite 采用去中心化的调度方式,由 Elastic-Job-Lite 的客户端定时自动触发任务调度,通过任务分片的概念实现服务器负载的动态扩容和缩容,并且使用 ZooKeeper 作为分布式任务调度的注册中心;当某任务实例崩溃后,自动失效转移,实现高可用。

消息队列(MQ)原理

MQ 有很多优势,当我们选择 MQ 时,主要是为了:

  • 解耦……
    比如 A 系统要将用户提交的数据推送到 B、C 两个系统的时候,最初的想法很有可能是直接用 http 或 rpc 调用实现。像这样的下游系统后来又会多出 D、E、F…,对 A 系统的压力就会越来越大,更复杂的场景中,数据通过接口传给其他系统有时候还要考虑重试、超时等一些异常情况。
    这时,对 A 来说更好的方案是将消息发送给 mq,不管有哪个下游系统需要这个数据都可以直接订阅这个 subject。
  • 异步
    当请求比较复杂,而其中有部分数据没必要实时更新时,可以用 mq 实现异步化。比如取消订单后需要做订单状态的更新、对账、退款等操作,而其中只有状态的变更是有必要实时反馈给用户的,那么后续的所有操作就完全可以做成异步的。
  • 削峰填谷
    消息队列作为缓冲队列应对突发流量时,并不能使处理速度变快,而是使处理速度变平滑,从而不会因瞬时压力过大而压垮应用。
    举个例子,比如我们的订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒 1000 左右的并发写入,并发量再高就容易宕机。
    低峰期的时候并发也就 100 多个,但是在高峰期时候,并发量会突然激增到 5000 以上,这个时候数据库肯定死了。
    但是使用了 MQ 之后,情况就变了,消息被 MQ 保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒 1000 个数据,这样慢慢写入数据库,这样就不会打死数据库了。
    如果没有用 MQ 的情况下,并发量高峰期的时候是有一个“顶峰”的,然后高峰期过后又是一个低并发的“谷”。
    但是使用了 MQ 之后,限制消费消息的速度为 1000,但是这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了。
    但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在 1000QPS,直到消费完积压的消息,这就叫做“填谷”。

高可用

使用了 MQ 之后,我们肯定是希望 MQ 有高可用特性,因为不可能接受机器宕机了,就无法收发消息的情况。
这一块我们也是基于 RabbitMQ 这种经典的 MQ 来说明一下:
RabbitMQ 是比较有代表性的,因为是基于主从做高可用性的,我们就以他为例子讲解第一种 MQ 的高可用性怎么实现。
rabbitmq 有三种模式:单机模式,普通集群模式,镜像集群模式
单机模式
单机模式就是 demo 级别的,就是说只有一台机器部署了一个 RabbitMQ 程序。
这个会存在单点问题,宕机就玩完了,没什么高可用性可言。一般就是你本地启动了玩玩儿的,没人生产用单机模式。
普通集群模式
这个模式的意思就是在多台机器上启动多个 rabbitmq 实例。类似的 master-slave 模式一样。
但是创建的 queue,只会放在一个 master rabbtimq 实例上,其他实例都同步那个接收消息的 RabbitMQ 元数据。
在消费消息的时候,如果你连接到的 RabbitMQ 实例不是存放 Queue 数据的实例,这个时候 RabbitMQ 就会从存放 Queue 数据的实例上拉去数据,然后返回给客户端。
总的来说,这种方式有点麻烦,没有做到真正的分布式,每次消费者连接一个实例后拉取数据,如果连接到不是存放 queue 数据的实例,这个时候会造成额外的性能开销。如果从放 Queue 的实例拉取,会导致单实例性能瓶颈。
如果放 queue 的实例宕机了,会导致其他实例无法拉取数据,这个集群都无法消费消息了,没有做到真正的高可用。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性可言了,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式
镜像集群模式才是真正的 rabbitmq 的高可用模式,跟普通集群模式不一样的是:创建的 queue 无论元数据还是 queue 里的消息都会存在于多个实例上,
每次写消息到 queue 的时候,都会自动把消息到多个实例的 queue 里进行消息同步。
这样的话任何一个机器宕机了别的实例都可以用提供服务,这样就做到了真正的高可用了。
但是也存在着不好之处:

  • 性能开销过高,消息需要同步所有机器,会导致网络带宽压力和消耗很重
  • 扩展性低:无法解决某个 queue 数据量特别大的情况,导致 queue 无法线性拓展。就算加了机器,那个机器也会包含 queue 的所有数据,queue 的数据没有做到分布式存储。
    对于 RabbitMQ 的高可用一般的做法都是开启镜像集群模式,这样起码来说做到了高可用,一个节点宕机了,其他节点可以继续提供服务。

高性能

  • 对于内存操作的线程分离,大部分中间件做法是将数据文件缓存与内存中,通过异步线程 flush 至硬盘
  • 合理的存储引擎对应不同的服务场景 B+树,hash,LSM
  • 对于消息队列,选取顺序读写磁盘的方式,可以高效的提升磁盘 IO 速度
  • 顺序写磁盘可以带来足够的写入速度,其读取方式为二分查找
  • 对于 LSM 存储引擎,同样采用顺序写磁盘方式,牺牲一部分读性能从而获得更优越的写性能

消息队列如何选型

异步处理

延时消费

应用隔离(系统解耦)

比如有两个主题的消息,其中 A 主题的消息特别多,别的消息就会来不及处理。
这种情况有点类似于服务治理中的隔离策略:一个服务出错不能影响别的服务不可用。一般会采用线程池、信号量来实现。
MQ 消息中的主题隔离

数据同步

Canel 订阅数据库 binlog 可以实现数据库数据变更捕获,然后业务端订阅 Canel 进行业务处理,这种方式可以保证一致性,且不会有乱序问题。

数据异构

反模式-为了撇清关系所以使用 MQ 消息

反模式-为了解耦过度使用 MQ 消息

反模式-利用数据差异化来触发事件

公司里有几位老员工基于 MQ 监听器组件开发了一套 MQ 客户端,这套 MQ 客户端的核心就是能通过修改数据来触发监听对应字段修改事件的监听器,这样可以避免定义一大堆主题,看起来似乎变简单了对吗?但是经过一段时间的维护发现情况并非如此,大量监听器不再根据主题来相互关联,而是数据中的一大堆字段,最开始的一批开发爽了,因为需要定义的主题少了,少了很多手动发消息的代码,但是后续维护的人就糟了,试想,每次希望修改某个字段的时候都需要把监听该字段修改事件的监听器都找一遍。
两条业务同时修改

MQ 存在的缺陷

上边已经说过了优点,那么 mq 又有哪些缺点呢?

  • 系统可用性降低
    上面的说解耦的场景,本来 A 系统的哥们要把系统关键数据发送给 B、C 系统的,现在突然加入一个 MQ,现在 BC 系统接收数据要通过 MQ 来接收。
    万一 MQ 挂了怎么办?这就引出一个问题,加入了 MQ 之后,系统的可用性是不是就降低了?
    因为多了一个风险因素:MQ 可能会挂掉。只要 MQ 挂了,数据没了,系统运行就不对了。
  • 系统复杂度提高
    本来我的系统通过接口调用一下就能完事的,但是加入一个 MQ 之后,需要考虑消息重复消费、消息丢失、甚至消息顺序性的问题
    为了解决这些问题,又需要引入很多复杂的机制,这样一来是不是系统的复杂度提高了。
  • 数据一致性问题
    本来好好的,A 系统调用 BC 系统接口,如果 BC 系统出错了,会抛出异常,返回给 A 系统让 A 系统知道,这样的话就可以做回滚操作了
    但是使用了 MQ 之后,A 系统发送完消息就完事了,认为成功了。而刚好 C 系统写数据库的时候失败了,但是 A 认为 C 已经成功了?这样一来数据就不一致了。

并发修改

消息从发出到被消费会有一小段时间,这一段时间内数据可能会经过其他线程的多次修改,所以在消息消费方的编程中尤其需要注意并发修改的问题。
如果是同步操作——比如用户购买商品扣款的场景——需要在比较高并发的情况下才会出现并发问题,但是如果功能是基于消息实现的,由于消息消费具有不确定性,这种风险会被放大。
并发扣款

  • 这些并发查询是在不同的站点实例 / 服务实例上完成的,进程内互斥锁无法解决问题。
  • 不确定性来自于很多方面,比如同一主题的消息可能被多个业务线的触发、被重试、被手动重发。

解决这种不一致问题的解决办法一般是加锁,因为异步处理并没有直接被用户感知,因此对效率并没有特别高的要求,悲观锁或乐观锁都是可行的。

悲观锁会牺牲一定的吞吐量,乐观锁实现起来比较有技巧性、且可能会和业务数据耦合。

以乐观锁为例:

1
2
3
4
5
6
7
8
9
// 查询订单,Order中包含了版本信息
Order order = queryOrder();
// 这里使用状态机校验订单状态
if (isStatusInvalid(order)) {
记一下日志
return ;
}
// 扭转状态的同时也做了版本的校验,相当于一个原子操作,如果校验失败则抛出异常、交给外层MQ组件重试
changeOrderStatus(order, targetStatus);

At-Least-Once(消息丢失)

有很多情况可能发生 MQ 消息的丢失:

  • 生产者向 MQ 发送消息时,网络传输出现问题;
  • 消息在 MQ 中存储时,发生磁盘故障等不可控问题;
  • 消费者从 MQ 接收消息时,网络传输出现问题;

一般 MQ 中间件都会保证At-Least-Once的消费,就需要避免消息丢失的情况,有两种方式可以解决这种情况:

事务方式:
在生产者发送消息之前,通过channel.txSelect开启一个事务,接着发送消息
如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚channel.txRollback然后重新发送。假如 RabbitMQ 收到了这个消息,就可以提交事务channel.txCommit
但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。

另外一种方式就是通过 confirm 机制:
这个 confirm 模式是在生产者哪里设置的,就是每次写消息的时候会分配一个唯一的 id,然后 RabbitMQ 收到之后会回传一个 ack,告诉生产者这个消息 ok 了。
如果 rabbitmq 没有处理到这个消息,那么就回调一个 nack 的接口,这个时候生产者就可以重发。
事务机制和 cnofirm 机制最大的不同在于事务机制是同步的,提交一个事务之后会阻塞在那儿
但是 confirm 机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息 rabbitmq 接收了之后会异步回调你一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

Rabbitmq 弄丢了数据
RabbitMQ 集群也会弄丢消息,这个问题在官方文档的教程中也提到过,就是说在消息发送到 RabbitMQ 之后,默认是没有落地磁盘的,万一 RabbitMQ 宕机了,这个时候消息就丢失了。
所以为了解决这个问题,RabbitMQ 提供了一个持久化的机制,消息写入之后会持久化到磁盘
这样哪怕是宕机了,恢复之后也会自动恢复之前存储的数据,这样的机制可以确保消息不会丢失。
设置持久化有两个步骤:

  • 第一个是创建 queue 的时候将其设置为持久化的,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化 queue 里的数据
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 rabbitmq 就会将消息持久化到磁盘上去。
    但是这样一来可能会有人说:万一消息发送到 RabbitMQ 之后,还没来得及持久化到磁盘就挂掉了,数据也丢失了,怎么办?
    对于这个问题,其实是配合上面的 confirm 机制一起来保证的,就是在消息持久化到磁盘之后才会给生产者发送 ack 消息。
    万一真的遇到了那种极端的情况,生产者是可以感知到的,此时生产者可以通过重试发送消息给别的 RabbitMQ 节点
    消费端弄丢了数据
    RabbitMQ 消费端弄丢了数据的情况是这样的:在消费消息的时候,刚拿到消息,结果进程挂了,这个时候 RabbitMQ 就会认为你已经消费成功了,这条数据就丢了。
    对于这个问题,要先说明一下 RabbitMQ 消费消息的机制:在消费者收到消息的时候,会发送一个 ack 给 RabbitMQ,告诉 RabbitMQ 这条消息被消费到了,这样 RabbitMQ 就会把消息删除。
    但是默认情况下这个发送 ack 的操作是自动提交的,也就是说消费者一收到这个消息就会自动返回 ack 给 RabbitMQ,所以会出现丢消息的问题。
    所以针对这个问题的解决方案就是:关闭 RabbitMQ 消费者的自动提交 ack,在消费者处理完这条消息之后再手动提交 ack。
    这样即使遇到了上面的情况,RabbitMQ 也不会把这条消息删除,会在你程序重启之后,重新下发这条消息过来。

消息重复

一般情况下 MQ 除了不保证消息的有序性外、还不保证消息不重复。
因为在「网络不可达」的情况下,MQ 不能确认消息接收方收到了消息必然会重试。重试除了本文讲的幂等处理外,还可以采用每个消息有唯一的 ID+去重表实现。

消息的有序性

因为 MQ 消息在服务器上是分区存储的,每个分区自己是有序的。分区被接收端消费的时候。一般也是多个接收端一起消费。中间的每个环节都是只能保证局部有序。如果想全局有序。就需要分区只有一个,并且接收端服务器是单点,而且一次只处理一个请求。
TODO: TCP 是怎么做的。

消息积压

参考

任务调度

  1. cron
    crontab 定时任务
  2. 中心化-去中心化调度设计
    xxl-job 和 elastic-job 在设计上的本质区别是中心化还是去中心化。

Disruptor 相对于传统方式(普通)的优点

  1. 无锁
    相对 Lock 来说效率更高(线程不需要挂起,只涉及到一次内存交换速度快),但是同时会带来 ABA 问题,且多线程下竞争容易产生空转。
  2. 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构。
  3. 在每个对象中都能跟踪序列号(ring buffer,claim Strategy,生产者和消费者),加上神奇的 cache line padding,就意味着没有为伪共享和非预期的竞争。

如何使用 Disruptor

1
2
3
4
5
6
7
8
9
10
11
12
public class LongEvent {

private long value;

public void set(long value) {
this.value = value;
}

public long getValue() {
return this.value;
}
}
1
2
3
4
5
6
7
public class LongEventFactory implements EventFactory<LongEvent> {

@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
1
2
3
4
5
6
7
public class LongEventHandler implements EventHandler<LongEvent> {

@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Event: " + event.getValue() + " sequence:+" + sequence + " endOfBatch:" + endOfBatch);
}
}
1
2
3
4
5
6
7
public class LongEventHandler implements EventHandler<LongEvent> {

@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Event: " + event.getValue() + " sequence:+" + sequence + " endOfBatch:" + endOfBatch);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class LongEventProducer {

private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

public void onData(ByteBuffer bb) {
// Grab the next sequence
long sequence = ringBuffer.next();
try {
// Get the entry in the Disruptor
LongEvent event = ringBuffer.get(sequence);
// for the sequence Fill with data
event.set(bb.getLong(0));
} finally {
ringBuffer.publish(sequence);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class LongEventMain {

public static void main(String[] args) {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();

// The factory for the event
LongEventFactory factory = new LongEventFactory();

// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;

// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());

// Start the Disruptor, starts all threads running
disruptor.start();

// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

LongEventProducer producer = new LongEventProducer(ringBuffer);

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
producer.onData(bb);
}
}
}

QA

  1. 并发框架 Disruptor 译文

使用主从复制

  1. 运行 Master
    调整 Master 内存中保存的缓冲积压部分(replication backlog),以便执行部分重同步。
    1
    2
    3
    # 缓冲区越大,可断开连接再重连执行部分重同步的时间越长,缓冲区会在每次连接时分配。
    repl-backlog-size 1mb
    repl-backlog-ttl 3600
  2. 运行 Slave
    先在配置文件中设置 Master 和 logfile 路径再运行
    1
    2
    slaveof 172.16.205.141 6379
    logfile "/usr/redis/log/slave.log"
  3. 级联复制(从从复制)
    之前是所有 Slave 连到一个 Master 上,这是一种中心化的办法,对 Master 的负担较大,事实上我们完全可以不全部连到 Master 上,而是 Master->Slave1->Slave2 这样传递。
    实现级联复制也较简单,只用修改 Slave2 配置文件的slaveof属性即可。
  4. Master write,Slave read
    通过程序(客户端)实现数据的读写分离,即在程序中判断请求是读是写,让 Master 负责处理写请求,Slave 负责处理读请求;通过扩展 Slave 处理更多的并发请求,减轻 Master 端的负载。

只读 Slave

Redis2.6 之后,Redis 支持只读模式,可以使用slave-read-only配置来控制这个行为。
只读模式下的 slave 将会拒绝所有写入命令,因此实践中不可能由于某种出错而将数据写入 slave 。但这并不意味着该特性旨在将一个 slave 实例暴露到 Internet ,或者更广泛地说,将之暴露在存在不可信客户端的网络,因为像 DEBUG 或者 CONFIG 这样的管理员命令仍在启用。但是,在 redis.conf 文件中使用 rename-command 指令可以禁用上述管理员命令以提高只读实例的安全性。

同步复制和异步复制

Redis 使用默认的异步复制,其特点是低延迟和高性能,不会影响 Redis 主线程的响应效率。

  • Redis 复制在 master 侧是非阻塞的。这意味着 master 在一个或多个 slave 进行初次同步或者是部分重同步时,可以继续处理查询请求。
  • 复制在 slave 侧大部分也是非阻塞的。当 slave 进行初次同步时,它可以使用旧数据集处理查询请求,假设你在 redis.conf 中配置了让 Redis 这样做的话。否则,你可以配置如果复制流断开, Redis slave 会返回一个 error 给客户端。但是,在初次同步之后,旧数据集必须被删除,同时加载新的数据集。 slave 在这个短暂的时间窗口内(如果数据集很大,会持续较长时间),会阻塞到来的连接请求。自 Redis 4.0 开始,可以配置 Redis 使删除旧数据集的操作在另一个不同的线程中进行,但是,加载新数据集的操作依然需要在主线程中进行并且会阻塞 slave 。

Redis 虽然声称是单线程模型,但是很多功能仍然是采用多线程实现的。

什么时候触发复制

  • 当一个 Master 和一个 Slave 实例连接正常时,Master 通过向 Slave 发送命令流来增量同步自身数据集的改变情况,包括客户端的写入、key 的过期等;
  • Master 与 Slave 之间因为网络问题或宕机,之后 Slave 重新连上 Master 时会尝试进行部分重同步,即只获取在断开连接期间内丢失的命令流;
    为此,slave 会记住旧 master 的旧 replication ID复制偏移量,因此即使询问旧的 replication ID,其也可以将部分复制缓冲提供给连接的 slave 。
  • 当无法进行部分重同步时,Slave 会请求进行全量重同步。Master 需要创建所有数据的快照,将之发送给 Slave,之后在数据集发生更改时持续发送命令流到 Slave。

主从复制原理

当用户往 Master 端写入数据时,通过Redis Sync机制将数据文件发送至 Slave,Slave 也会执行相同的操作确保数据一致。

  1. 同一个 Master 可以拥有多个 Slaves。Master 下的 Slave 还可以接受同一架构中其它 Slave 的链接与同步请求,实现数据的级联复制,即 Master->Slave->Slave 模式;
    repl-diskless-sync-delay参数可以延迟启动数据传输,目的可以在第一个 slave 就绪后,等待更多的 slave 就绪。
    主从复制最好配置成级联复制,因为这样更容易解决单点问题,避免Master承受过大的复制压力
  2. Master 以非阻塞的方式同步数据至 slave,这将意味着 Master 会继续处理一个或多个 slave 的读写请求;
  3. Slave 端同步数据也可以修改为非阻塞的方式,当 slave 在执行新的同步时,它仍可以用旧的数据信息来提供查询;否则,当 slave 与 master 失去联系时,slave 会返回一个错误给客户端;
  4. 主从复制可以做到读写分离,保证了可扩展性,即多个 slave 专门提供只读查询与数据的冗余,Master 端专门提供写操作;
  5. 通过配置禁用 Master 数据持久化机制,将其数据持久化操作交给 Slaves 完成,避免在 Master 中要有独立的进程来完成此操作。
  6. Redis 主从复制的性能问题,为了主从复制的速度和连接的稳定性,Slave 和 Master 最好在同一个局域网内。

标识同步进程:

  1. 每个 Master 都有一个Replication ID:这是一个较大的伪随机字符串,标记了一个给定的数据集。
  2. 每个 Master 持有一个偏移量offset,Master 将自己产生的复制流发送给 slave 时,发送多少个字节的数据,自身的偏移量就会增加多少,目的是当有新的操作修改自己的数据集时,它可以以此更新 Slave 的状态。即使没有 Slave 连接到 Master,offset 也会自增,所以基本上每一对 <Replication ID, offset> 都会标识一个 Master 数据集的确切版本。
  3. Slave 也维护了一个复制偏移量offset,代表从库同步的字节数,从库每收到主节点传来的 N 个字节数据时,从库的 offset 增加 N。
    Master 和 Slave 的offset总是不断增大,这也是判断主从数据是否同步的标志,若主从的 offset 相同则表示数据同步量,不通则表示数据不同步。

复制积压缓冲区
主节点(master)响应写命令时,不但会把命名发送给从节点,还会写入复制积压缓冲区,用于复制命令丢失的数据补救。
Slave 连接中断时主节点仍然可以响应命令,但因复制连接中断命令无法发送给 Slave。之后,当 Slave 重启并触发部分复制时,Master 可以将复制积压缓冲区的内容同步给 Slave,从而提高复制效率;

部分重同步过程:

  1. 当 Slave 连接到 Master,发送一个PSYNC命令表明自己记录的旧的 Master Replication ID和它们至今为止处理的偏移量offset
  2. Master 仅发送 Slave 所需的增量部分的命令流,即上次同步偏移量offset之后执行的写命令;
  3. 但是如果 master 的缓冲区中没有足够的命令积压缓冲记录,或者如果 slave 引用了不再知道的历史记录(replication ID),则会转而进行一个全量重同步:在这种情况下, slave 会得到一个完整的数据集副本,从头开始。

全量同步(完整重同步):

  1. Slave 向 Master 发送PSYNC命令;
  2. Master 执行BGSAVE命令,开启一个后台进程用于生成一个 RDB 文件;
  3. 同时它开始缓冲所有从客户端接收到的新的写入命令;
  4. 当后台保存完成时, master 将数据集文件传输给 slave, slave 将之保存在磁盘上,然后加载文件到内存;
  5. 再然后 master 会将所有缓冲的写命令发给 slave,这个过程以指令流的形式完成并且和 Redis 协议本身的格式相同。

    可以通过telnet连接到 Redis 服务器上然后发送SYNC命令来模拟这个过程,但是因为SYNC功能有限(比如不支持部分重同步),现在的版本用PSYNC作为代替。
    正常情况下,全量同步会先在磁盘上创建一个 RDB 文件,传输时将其加载进内存,然后 Slave 对此进行数据的同步,如果磁盘性能很低,这个过程压力会比较大,Redis 2.8.18之后支持直接传输 RDB 文件,可以使用repl-diskless-sync配置参数配置。

全量同步完成以后,在此后的时间里主从维护着心跳检查来确认对方是否在线,每隔一段时间(默认 10 秒,通过repl-ping-slave-period参数指定)主节点向从节点发送 PING 命令判断从节点是否在线,而从节点每秒 1 次向主节点发送 REPLCONF ACK 命令,命令格式为:REPLCONF ACK {offset},其中 offset 指的是从节点保存的复制偏移量,作用是:

  1. 向主节点报告自己复制进度,主节点会对比复制偏移量向从节点发送未同步的命令;
  2. 判断主节点是否在线。

主从复制执行过程 - Slave怎么与Master建立连接

主从复制
1、Slave Redis实例上配置slaveof xxx,表示将成为另一台Redis实例的从服务器,启动 Slave时,需要设置当前节点的Master信息,并开始主从同步过程;
代码位置:replication.c/slaveofCommand()

1
2
3
4
// 进入连接状态(重点)
server.repl_state = REDIS_REPL_CONNECT;
server.master_repl_offset = 0;
server.repl_down_since = 0;

2、上边设置复制信息成功后,Redis服务器会有一个cron任务(serverCron)定时判断需要进行同步操作,向Master建立连接,也就是一个握手的过程;
代码位置:replication.c/replicationCron()

1
2
3
4
5
if (server.repl_state == REPL_STATE_CONNECT) {
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}

serverCron是Redis的主事件循环,负责超多的任务,包括过期key处理、rehash、备份RDB文件、AOF重写等等。

3、确定连接后,接下来,cron任务里还有比较关键的一项是确定复制方案,
会先向 Master 发送一个 PSYNC Command,Master会返回复制方案,也就是下面的全量、增量及不支持这3种情况:
代码位置:replication.c/syncWithMaster()
replication.c/slaveTryPartialResynchronization()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 向主服务器发送 PSYNC 命令
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

// 全量复制
if (!strncmp(reply,"+FULLRESYNC",11)) {
...
}

// 增量复制
if (!strncmp(reply,"+CONTINUE",9)) {
...
}

// 错误,目前master不支持PSYNC
if (strncmp(reply,"-ERR",4)) {
...
}

注意PSYNC命令的两个参数:

  • 主库的runID:每个Redis实例启动时都会自动生成的一个随机ID,用来唯一标识这个实例。
    当从库和主库第一次复制时,因为不知道主库的runID,因此会将runID设为”?”。
  • 复制进度offset:设为-1表示第一次复制。

4、Master接收到命令后需要判断需要全量同步还是部分同步
这部分代码在replication.c/syncCommand()中,接下来我们再讨论主节点如何判断同步方式及同步的流程。

主从复制执行过程 - Master如何处理PSYNC命令

1、无论是第一次连接还是重新连接,Master 都会启动一个后台进程(fork),将数据快照保存到数据文件中,同时 Master 会记录所有修改数据的命令并缓存在数据文件中(持久化),Master会将文件内容加载到内存中,等之后回传给Slave(复制);
2、Master端与Slave端完成握手后,需要判断是需要进行全量还是增量复制(也就是上面的返回+FULLRESYNC还是+CONTINUE
处理Slave的PSYNC命令的代码位置:replication.c/syncCommand()
判断是否需要执行全量复制的代码位置:replication.c/masterTryPartialResynchronization()
判断执行全量复制的条件如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 检查 master id 是否和 runid 一致,只有一致的情况下才考虑执行psync
if (strcasecmp(master_runid, server.runid)) {
/* Run id "?" is used by slaves that want to force a full resync. */
// 从服务器提供的 run id 和服务器的 run id 不一致
if (master_runid[0] != '?') {
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
// 从服务器提供的 run id 为 '?' ,表示强制 FULL RESYNC
} else {
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
}
// 需要 full resync
goto need_full_resync;
}

// 判断当前Slave带来的offset在Master的backlog中是否还能找到,找不到则执行全量复制
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
REDIS_OK) goto need_full_resync;

// 如果没有backlog
if (!server.repl_backlog ||
// 或者 psync_offset 小于 server.repl_backlog_off
// (想要恢复的那部分数据已经被覆盖)
psync_offset < server.repl_backlog_off ||
// psync offset 大于 backlog 所保存的数据的偏移量
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
// 执行 FULL RESYNC
redisLog(REDIS_NOTICE,
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
if (psync_offset > server.master_repl_offset) {
redisLog(REDIS_WARNING,
"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
}
goto need_full_resync;
}

3、如果是部分复制
Master会向Slave发送 backlog 中从 offset 到 backlog 尾部之间的数据
代码:replication.c/addReplyReplicationBacklog()
部分复制在3.0版本和之后的版本中的实现有比较大的差异。
在3.0时,部分复制发生在Slave向Master发送PSYNC命令时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void syncCommand(redisClient *c) {
...

if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 尝试进行 PSYNC
if (masterTryPartialResynchronization(c) == REDIS_OK) {
// 可执行 PSYNC
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// 不可执行 PSYNC
char *master_runid = c->argv[1]->ptr;

/* Increment stats for failed PSYNCs, but only if the
* runid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
}

...
}

int masterTryPartialResynchronization(redisClient *c) {
...

/* If we reached this point, we are able to perform a partial resync:
* 程序运行到这里,说明可以执行 partial resync
*
* 1) Set client state to make it a slave.
* 将客户端状态设为 salve
*
* 2) Inform the client we can continue with +CONTINUE
* 向 slave 发送 +CONTINUE ,表示 partial resync 的请求被接受
*
* 3) Send the backlog data (from the offset to the end) to the slave.
* 发送 backlog 中,客户端所需要的数据
*/
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* emtpy so this write will never fail actually. */
// 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
// 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器
psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE,
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);

...
}

3.0后,在每次命令执行完之后,还会触发命令传播:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void processInputBufferAndReplicate(client *c) {
// 处理命令然后广播命令
// if this is a slave, we just process the commands
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
/* If the client is a master we need to compute the difference
* between the applied offset before and after processing the buffer,
* to understand how much of the replication stream was actually
* applied to the master state: this quantity, and its corresponding
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
size_t prev_offset = c->reploff;
processInputBuffer(c);
// applied is how much of the replication stream was actually applied to the master state
size_t applied = c->reploff - prev_offset;
if (applied) {

replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}

所谓命令传播,就是当Master节点每处理完一个命令都会把命令广播给所有的子节点,而每个子节点接收到Master的广播过来的命令后,会在处理完之后继续广播给自己的子节点。
命令传播也是异步的操作,即Master节点处理完客户端的命令之后会立马向客户端返回结果,而不会一直等待所有的子节点都确认完成操作后再返回以保证Redis高效的性能。
4、什么时候会改为采用全量复制
上面的增量复制中,我们看到Redis实际上是将repl_backlog中的内容复制给了Slave,backlog是一块内存缓冲区(默认大小为1M),每次处理完命令之后,先写入缓冲区repl_backlog, 然后再发送给Slave。
如果一个Slave断连了一段时间,重启后Master可以将这块缓冲区内的内容复制给Slave,但是如果断连的时间比较长,也有可能会触发全量复制,因为缓冲区能保存的命令有限,只能至多保存的命令长度为repl_backlog_length,如果某个子节点落后当前最新命令的长度大于了repl_backlog_length,那么就会触发全量复制。
5、如果是全量复制
这种情况下,Master并不会直接将RDB文件传给Slave,而是先发给Slave+FULLRESYNC,;
代码:replication.c/masterTryPartialResynchronization()的末尾
什么时候Master会将RDB文件传给Slave呢?如果当前已经有可用的RDB文件,则直接将RDB文件传输给Slave;如果当前RDB正在备份过程中,Master会在每次RDB文件备份完毕后执行一次传输任务。
replication.c/syncCommand()末尾Master判断RDB当前的备份状态,设置标识表示当前RDB文件是否可用于复制,如果可以复制则会在之后的主事件循环中触发文件的发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
void syncCommand(redisClient *c) {
...

/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
// 检查是否有 BGSAVE 在执行
if (server.rdb_child_pid != -1) {
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save */
redisClient *slave;
listNode *ln;
listIter li;

// 如果有至少一个 slave 在等待这个 BGSAVE 完成
// 那么说明正在进行的 BGSAVE 所产生的 RDB 也可以为其他 slave 所用
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
}

if (ln) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
// 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB
copyClientOutputBuffer(c,slave);
// 设置复制状态
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences */
// 不好运的情况,必须等待下个 BGSAVE
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {
/* Ok we don't have a BGSAVE in progress, let's start one */
// 没有 BGSAVE 在进行,开始一个新的 BGSAVE
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
// 设置状态
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
/* Flush the script cache for the new slave. */
// 因为新 slave 进入,刷新复制脚本缓存
replicationScriptCacheFlush();
}

...

}

主事件循环中发RDB文件的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

...

/* Check if a background saving or AOF rewrite in progress terminated. */
// 检查 BGSAVE 或者 BGREWRITEAOF 是否已经执行完毕
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;

// 接收子进程发来的信号,非阻塞
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;

if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

// BGSAVE 执行完毕
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);

// BGREWRITEAOF 执行完毕
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);

} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
}

...
}

接下来的调用包括:
replication.c/backgroundSaveDoneHandler()
replication.c/updateSlavesWaitingBgsave()
replication.c/sendBulkToSlave()

全量同步的大致流程如此,主要分为以下几步:

  1. Master节点开启子进程进行RDB文件生成
  2. Master节点将RDB文件发送给Slave节点
  3. Slave节点清空内存中的所有数据并删除之前的RDB文件
  4. Slave节点使用从Master接收的RDB文件恢复数据到内存中

需要注意的是,这个过程中的每一步都是耗时的IO操作,所以大部分时候Redis都是尽可能采用增量复制,而不是全量复制。
下面再来讨论Master如何发送及Slave如何接收这份数据。

主从复制执行过程 - Master如何发送及Slave如何接收复制数据

1、如果是全量复制
Slave和Master刚开始握手完毕后,会注册一个readSyncBulkPayload处理器,用于读取从Master发送过来的RDB文件。
2、Slave 将数据文件保存到磁盘上,然后再加载到内存中;
从库接收到RDB文件后,会先清空当前数据库,然后加载RDB文件,这是因为从库在开始和主库同步前可能保存了其他数据,为了避免之前数据的影响,从库需要先把当前数据库清空。
3、同步过程中主库产生的新数据也要同步给从库
主库同步数据给从库的过程中,主库不会被阻塞,仍然可以正常接收请求(否则Redis服务不就中断了?),但是这些请求中的写操作并没有记录到刚刚生成的RDB文件中,为了保证主从库的数据一致性,主库会在内存中用专门的replication buffer(代码中对应repl_backlog_buffer记录RDB文件生成后收到的所有写操作。
repl_backlog_buffer是一个环形缓冲区,主库会记录自己写到的位置,而从库则会记录自己已经读到的位置,可以使用repl_backlog_size来配置这个缓冲区的大小,如果配得过小,可能会导致增量复制阶段从库复制进度赶不上主库,进而导致从库重新进行全量复制。
在Master端定义的offset是master_repl_offset,在Slave端定义的offset是slave_repl_offset,正常情况下这两个偏移量是基本相等的。
增量同步期间,从库在发送psync的同时,会把自己当前的slave_repl_offset发给主库,主库判断自己的master_repl_offset和slave_repl_offset之间的差距,如果断连了,master_repl_offset可能会超过slave_repl_offset,那么将这超过的部分发给slave就可以恢复同步了。

主从复制存在的问题

主从库间网络断了怎么办?

Redis2.8之前,如果主从同步过程中出现了网络闪断,那么主从是会重新进行一次全量复制的,开销非常大。
Redis2.8之后,网络闪断后,主从会采取增量复制,将闪断期间的命令发给从库。

宕机恢复

因为 slave 顶多只负责处理读请求,slave 挂掉不会造成数据丢失的问题。
slave 宕机的情况下,应该要求客户端具有一定的熔断恢复能力,并且能在重启后快速恢复:

  1. 恢复正常后重新连接;
  2. Master 收到 Slave 的连接后,第一次同步时,主节点做一次 bgsave,并同时将后续修改操作记录到内存 buffer;
  3. Master 将其完整的 rdb 数据文件全量发送给 Slave;
  4. Slave 接收完成后将 rdb 镜像文件加载到内存,加载完成后,再通知 Master 将期间修改的操作记录同步到 Slave 节点进行重放就完成了同步过程;
  5. 如果 Master 同时收到多个 Slave 发来的同步请求,Master 只会在后台启动一个进程保存数据文件,然后将其发送给所有的 Slave,确保 Slave 正常。

主从复制无法应对 Master 挂掉的情况,实际上这种方案只能尽量保证数据不会丢失,不能保证服务的高可用性,为此,需要引入 Redis 的 Sentinel 机制。

客户端可以使用 WAIT 命令来请求同步复制某些特定的数据。但是,WAIT 命令只能确保在其他 Redis 实例中有指定数量的已确认的副本:在故障转移期间,由于不同原因的故障转移或是由于 Redis 持久性的实际配置,故障转移期间确认的写入操作可能仍然会丢失。

是否可以关闭持久化

作为复制方案中的一环,可以考虑关闭 Master 或 Slave 的持久化功能,但是并不建议关掉它们,因为:

  • 如果关闭 Master 的持久化:重启(重启功能可以由一些只能运维工具来保证,比如 K8S)的 Master 将从一个空数据集开始,如果一个 Slave 试图与它同步,那么这个 Slave 也会被清空。
  • 如果关闭 Slave 的持久化:重启的 Slave 需要从 Master 全量同步数据。

正如前所述,关闭了持久化并配置了自动重启的 Master 是危险的——会导致整个集群的数据全部被清空。
如果 Sentinel 集群用于需要高可用的场景、且 Master 被关闭掉了持久化功能,也是非常危险的:

  • 如果重启比较慢,Sentinel 的故障迁移机制重新选主,一个 Slave 会上升为 Master;
  • 如果重启得足够快,Sentinel 没有探测到故障,此时 Master 数据被清空了,而 Slave 仍从 Master 同步数据,这将引起上边提到的故障模式——数据将丢失。

因此,如果考虑磁盘性能过慢会导致延迟、关掉了持久化,那么自动重启进程这项应该被禁用。

如何保证主从数据的一致性 - 数据丢失窗口的存在

由于 Redis 使用异步复制,无法保证Slave和Master的实时一致性,因此总会有一个数据丢失窗口
那在什么情况下,从库会滞后执行同步命令呢?

  1. 一方面,主从库间的网络可能会有传输延迟,所以从库不能及时地收到主库发送的命令,从库上执行同步命令的时间就会被延后。
  2. 另一方面,即使从库及时收到了主库的命令,但是,也可能会因为正在处理其它复杂度高的命令(例如集合操作命令)而阻塞。此时,从库需要处理完当前的命令,才能执行主库发送的命令操作,这就会造成主从数据不一致。而在主库命令被滞后处理的这段时间内,主库本身可能又执行了新的写操作。这样一来,主从库间的数据不一致程度就会进一步加剧。

因为异步复制的本质,Redis主从复制无法完全避免数据的丢失,除了尽量保证网络连接状况良好外,还可以写一些监控程序来监控主从库间的复制进度,原理是实时给Redis实例发info replication命令得到master_repl_offsetslave_repl_offset这两个进度信息,计算这二者的差值即可得到主从复制进度的实时程度,如果某个从库进度差值大于我们预设的一个阈值,我们可以让客户端不再和这个从库连接进行数据读取,从而减少读到不一致数据的情况。

这个阈值当然不能设置得过低,否则可能导致所有从库都连不上了。

既然无法避免,那么只能退一步、控制影响范围了,Redis 可以保证:

  1. Redis slave 每秒钟都会 ping master,确认已处理的复制流的数量。
  2. Redis master 会记得上一次从每个 slave 都收到 ping 的时间。
  3. 用户可以配置一个最小的 slave 数量,使得它滞后 <= 最大秒数。
  4. 如果至少有 N 个 slave ,并且滞后小于 M 秒,则写入将被接受。如果条件不满足,master 将会回复一个 error 并且写入将不被接受。

这些条件是通过min-slaves-to-writemin-slaves-max-lag这两个配置来实现的:

  • min-slaves-to-write:最少有n个slave的连接还是健康的情况下才能提供服务,至于怎么判断连接是否健康,需要看下面一个配置;
  • min-slaves-max-lag:判断连接健康的最大延迟时间,slave每次PING Master时Master都会记录该Slave 最后一次PING的时间,如果最后一次PING成功的时间距今比较长了,就说明该Slave的连接状态很有可能已经出问题了。

对于给定的写入来说,虽然不能保证绝对实时的一致性,但至少数据丢失的时间窗限制在给定的秒数内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# It is possible for a master to stop accepting writes if there are less than
# N slaves connected, having a lag less or equal than M seconds.
#
# The N slaves need to be in "online" state.
#
# The lag in seconds, that must be <= the specified value, is calculated from
# the last ping received from the slave, that is usually sent every second.
#
# This option does not GUARANTEES that N replicas will accept the write, but
# will limit the window of exposure for lost writes in case not enough slaves
# are available, to the specified number of seconds.
#
# For example to require at least 3 slaves with a lag <= 10 seconds use:
#
# min-slaves-to-write 3
# min-slaves-max-lag 10
#
# Setting one or the other to 0 disables the feature.
#
# By default min-slaves-to-write is set to 0 (feature disabled) and
# min-slaves-max-lag is set to 10.

min-slaves-to-write <slave 数量>
min-slaves-max-lag <秒数>

过期的 key 问题

由于复制的异步特性,对 key 设置过期时间和写入操作很容易导致 race condition 及导致数据集不一致,比如:

1
2
3
(1) sadd x 1
(2) expire x 100
(3) sadd x 2

在 Master 上,命令(3)是在过期前执行的,而 Slave 上可能因为延后导致命令(3)执行前 x 就已经过期了,此时 x 是没有过期时间的(ttl x 得到-1 表示不过期),这就导致了数据的不一致。

set 命令不会出现这个问题,因为 set 会将过期时间给覆盖成-1。当然情况比较复杂,也有可能是我没有想到。

为了保证针对过期的 key 的复制能够正确工作,Redis 提供如下保证:

  1. slave 不会让 key 过期,而是等待 master 让 key 过期。当一个 master 让一个 key 到期(或由于 LRU 算法将之驱逐)时,它会合成一个 DEL 命令并传输到所有的 slave。一旦一个 slave 被提升为一个 master ,它将开始独立地过期 key,而不需要任何旧 master 的帮助。
  2. 但是,由于这是 master 驱动的 key 过期行为,master 无法及时提供 DEL 命令,所以有时候 slave 的内存中仍然可能存在在逻辑上已经过期的 key 。为了处理这个问题,slave 使用它的逻辑时钟以报告只有在不违反数据集的一致性的读取操作(从主机的新命令到达)中才存在 key。用这种方法,slave 避免报告逻辑过期的 key 仍然存在。在实际应用中,使用 slave 程序进行缩放的 HTML 碎片缓存,将避免返回已经比期望的时间更早的数据项。
  3. 在 Lua 脚本执行期间,不执行任何 key 过期操作。当一个 Lua 脚本运行时,从概念上讲,master 中的时间是被冻结的,这样脚本运行的时候,一个给定的键要么存在要么不存在。这可以防止 key 在脚本中间过期,保证将相同的脚本发送到 slave ,从而在二者的数据集中产生相同的效果。

QA

AOF日志更全,为什么主从同步不使用AOF而是RDB呢?

网络传输效率:RDB直接存储数据,而不是命令,数据量更小,传输更快。
恢复效率:因为使用AOF恢复数据库的话是需要将AOF中记录的命令再执行一次的,这个效率远不如直接将RDB中的数据直接加载到内存里。

主从切换过程中,客户端能正常进行请求吗?

主库故障后从库仍能正常接收读请求,但主库挂掉了所以无法处理写请求。

如果实现应用程序不感知服务器的中断?

  1. 客户端可以缓存写请求,因为使用Redis的场景同步写请求比较少,且一般都不会在应用程序的关键路径上,所以在不能立刻执行写请求的情况下,客户端完全可以先把请求缓存起来,给应用程序返回一个确认即可。
  2. 另外,主从切换后,客户端要能及时和新主库重新建立连接。

主从数据发生不一致怎么办?

参考

  1. Redis复制实现原理
  2. Redis集群——主从复制数据同步

为什么 Redis 这么快

Redis 采用的是一种单线程工作模型,它能这么快主要归功于下面几个策略:

  1. 完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。数据存在内存中,类似于 HashMap,HashMap 的优势就是查找和操作的时间复杂度都是 O(1);
  2. 数据结构简单,对数据操作也简单,Redis 中的数据结构是专门进行设计的;
  3. 采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;
  4. 使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis 直接自己构建了 VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;
  5. 使用多路 I/O 复用模型,非阻塞 IO;
    多路 I/O 复用模型是利用 select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。
    Redis-Client 在操作的时候,会产生具有不同事件类型的 socket,在服务端,有一段 I/O 多路复用程序,将其置入队列之中,然后,文件事件分派器依次去队列中取,转发到不同的事件处理器中(对这个 I/O 多路复用机制,Redis 还提供了 select、epoll、evport、kqueue 等多路复用函数库)。
    这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 IO 的时间消耗),且 Redis 在内存中操作数据的速度非常快,也就是说内存内的操作不会成为影响 Redis 性能的瓶颈,主要由以上几点造就了 Redis 具有很高的吞吐量。
    多路IO复用模型

一些常见的进程模型

  1. 单进程多线程模型:MySQL、Memcached、Oracle(Windows 版本);
  2. 多进程模型:Oracle(Linux 版本);
  3. Nginx 有两类进程,一类称为 Master 进程(相当于管理进程),另一类称为 Worker 进程(实际工作进程)。启动方式有两种:
    1. 单进程启动:此时系统中仅有一个进程,该进程既充当 Master 进程的角色,也充当 Worker 进程的角色。
    2. 多进程启动:此时系统有且仅有一个 Master 进程,至少有一个 Worker 进程工作。
    3. Master 进程主要进行一些全局性的初始化工作和管理 Worker 的工作;事件处理是在 Worker 中进行的。

为什么是 NIO

对于优化单个 server 节点的网络层,多使用 NIO 方式,server 端与 client 端在多次通讯的情况下使用 TCP 长连接维持会话,比如 Redis epoll 模型,RocketMq 的 netty 模型
对于高性能 Server 节点,在处理好网络请求同时,还要保证 server 端逻辑可以快速执行完成,这就涉及到合理的数据结构与线程模型。
在 Redis 中,采用的是 Reactor 模式实现文件事件处理器:
Redis-事件处理模型

  1. IO 多路复用
    根据平台不同选择不同的 IO 复用模型,比如 Linux 就是选择 epoll,select 是备选方案,不过正常情况下根本不会采用,因为 select 效率低,且有文件描述符监听上限。
  2. 封装不同 IO 模型,为事件处理器提供统一接口

Redis单线程多路复用IO模型实现 - 事件注册

Redis服务器的初始化过程中包括了对事件处理器的初始化。
1、服务器启动期间初始化事件处理器
服务器初始化代码:redis.c/initServer
初始化事件处理器代码:ae.c/aeCreateEventLoop
2、根据系统的不同,选择不同的底层IO事件处理实现
比如linux的话,会选择epoll作为实现:epoll.c/aeApiCreate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* 创建一个新的 epoll 实例,并将它赋值给 eventLoop
*/
static int aeApiCreate(aeEventLoop *eventLoop) {

aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;

// 初始化事件槽空间
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}

// 创建 epoll 实例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}

// 赋值给 eventLoop
eventLoop->apidata = state;
return 0;
}

当服务器需要监听某些事件时,会注册对这些事件的监听器,下面是注册监听器的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*
* 根据 mask 参数的值,监听 fd 文件的状态,
* 当 fd 可用时,执行 proc 函数
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}

if (fd >= eventLoop->setsize) return AE_ERR;

// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];

// 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;

// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;

// 私有数据
fe->clientData = clientData;

// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;

return AE_OK;
}

比如Redis服务器要接受客户端的请求,就要注册一个监听连接事件,回调函数中会为客户端连接创建一个socket,并注册可读文件事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void initServer() {

...

/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}

...
}

Redis单线程多路复用IO模型实现 - 事件循环(处理)

Redis中定义了两种事件:时间事件TimeEvents、文件事件FileEvents:

  • TimeEvents:一般都是一些定时任务,实际上现在时间事件只应用于服务器启动时注册的serverCron定时任务的执行;
  • FileEvents:socket文件的IO事件,比如上面的监听连接的事件就是文件事件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    /*
    * 事件处理器的主循环
    */
    void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

    // 如果有需要在事件处理前执行的函数,那么运行它
    if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);

    // 开始处理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
    }

    // 1、看是否有事件到达了执行时间
    // 2、如果有,则执行这些事件
    /* Process every pending time event, then every pending file event
    * (that may be registered by time event callbacks just processed).
    *
    * 处理所有已到达的时间事件,以及所有已就绪的文件事件。
    *
    * Without special flags the function sleeps until some file event
    * fires, or when the next time event occurs (if any).
    *
    * 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪,
    * 或者下个时间事件到达(如果有的话)。
    *
    * If flags is 0, the function does nothing and returns.
    * 如果 flags 为 0 ,那么函数不作动作,直接返回。
    *
    * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
    * 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。
    *
    * if flags has AE_FILE_EVENTS set, file events are processed.
    * 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。
    *
    * if flags has AE_TIME_EVENTS set, time events are processed.
    * 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。
    *
    * if flags has AE_DONT_WAIT set the function returns ASAP until all
    * the events that's possible to process without to wait are processed.
    * 如果 flags 包含 AE_DONT_WAIT ,
    * 那么函数在处理完所有不许阻塞的事件之后,即刻返回。
    *
    * The function returns the number of events processed.
    * 函数的返回值为已处理事件的数量
    */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
    * file events to process as long as we want to process time
    * events, in order to sleep until the next time event is ready
    * to fire.
    */
    if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;

    // 获取最近的时间事件
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
    shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
    // 如果时间事件存在的话
    // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
    long now_sec, now_ms;

    /* Calculate the time missing for the nearest
    * timer to fire. */
    // 计算距今最近的时间事件还要多久才能达到
    // 并将该时间距保存在 tv 结构中
    aeGetTime(&now_sec, &now_ms);
    tvp = &tv;
    tvp->tv_sec = shortest->when_sec - now_sec;
    if (shortest->when_ms < now_ms) {
    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
    tvp->tv_sec --;
    } else {
    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
    }

    // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
    if (tvp->tv_sec < 0) tvp->tv_sec = 0;
    if (tvp->tv_usec < 0) tvp->tv_usec = 0;
    } else {

    // 执行到这一步,说明没有时间事件
    // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

    /* If we have to check for events but need to return
    * ASAP because of AE_DONT_WAIT we need to set the timeout
    * to zero */
    if (flags & AE_DONT_WAIT) {
    // 设置文件事件不阻塞
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
    } else {
    /* Otherwise we can block */
    // 文件事件可以阻塞直到有事件到达为止
    tvp = NULL; /* wait forever */
    }
    }

    // 处理文件事件,阻塞时间由 tvp 决定
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
    // 从已就绪数组中获取事件
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;

    /* note the fe->mask & mask & ... code: maybe an already processed
    * event removed an element that fired and we still didn't
    * processed, so we check if the event is still valid. */
    // 读事件
    if (fe->mask & mask & AE_READABLE) {
    // rfired 确保读/写事件只能执行其中一个
    rfired = 1;
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 写事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }

    processed++;
    }
    }

    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
    }
0%