Tallate

该吃吃该喝喝 啥事别往心里搁

架构

选型

消息队列 Kafka RocketMQ
适用场景 大量消息快速消费如流式计算 高性能、稳定、高可靠
热度 与周边生态系统的兼容性最好 有活跃中文社区
消息可靠传递
延迟 毫秒级 毫秒级
性能 每秒几十万 每秒几十万
消息丢失 参数优化配置后0丢失 参数优化配置后0丢失
消费模式 Pull Pull + Push(原理都是Pull)
可用性 非常高(分布式) 非常高(主从)
topic数量对吞吐量的影响 topic达到几十,几百个时,吞吐量会大幅度下降 topic达到几百,几千个时,吞吐量会有较小幅度的下降

缺点:

  • Kafka:同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计。当业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
  • RocketMQ:没有太明显的缺点

部署结构

RocketMQ-架构图
RocketMQ-部署图

  • 启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
  • Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  • Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
  • Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

下图来自GitHub
rocketmq_architecture

推拉模型

推模式优缺点

  • 实时性高

  • 推送速率难以适应消费速率

  • 不同消费者的消费速率很有可能不一样,Broker难以平衡每个消费者的推送速率,如果要实现自适应就会大大增加Broker自身的复杂度

因此推模式适用于消息量不大、消费能力强要求实时性高的情况下。

拉模式优缺点

  • 消费者可以根据自己能力拉取消息处理,灵活稳定

  • 可以更合适地进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者能不能一次性处理这么多消息。而拉模式可以根据消费者缓存能力决定拉取多少消息。

  • 会造成消息的延迟消费,如果长时间没有消息,消费端不断轮询拉取,会造成一定时间的忙等,如果轮询时间过长,又会导致消息的延迟加大。

QA

消息队列可以做什么?

异步处理耗时任务
解耦上下游系统
削峰填谷

哪些消息队列可以做到在消息生产、消费过程中不重、不丢(Exactly once)?

Kafka、RocketMQ、RabbitMQ都没有实现这个需求,因为要实现Exactly once,除了重发外还需要做幂等,实现比较复杂,而且对性能影响比较大。

RocketMQ中的Consumer是推还是拉?

RocketMQ支持推和拉,但这两种方式实际上都是通过pull实现的,只是拉是同步的,而推是传个回调函数,当RocketMQ客户端接收到消息后再调用这个回调函数。

RocketMQ发送、存储、接收的流程?

当发现消费者不消费时,如何诊断问题?

  1. 检查连接状态,看消费者是否正常连接Broker;
  2. 看消费者是否有分配到ConsumeQueue,因为一个ConsumeQueue只能被一个消费者消费,所以消费者数量超过ConsumeQueue时,就会出现部分消费者没有ConsumeQueue可消费的情况;
  3. 生产者是否有正常消费,从控制台就可以看;
  4. 如果检查完以上步骤后仍然没有发现问题,则需要查看消费者的客户端日志再进一步分析。

怎么实现消息发送的严格顺序性?

RMQ中的分区算法指的就是把消息发到固定的某些队列上,因为同一队列只能被一个消费者消费,因此可以保证这个队列中消息的顺序性。
可选的分区算法如:

  1. 在表中存储key和分区的对应关系,通过查表确定分区号;
  2. 取模

RocketMQ能否做到单队列的并行消费?

RocketMQ 在消费的时候,为了保证消息的不丢失和严格顺序,每个队列只能串行消费(一个消费者可以消费多个队列),无法做到并发,否则会出现消费空洞的问题。那如果放宽一下限制,不要求严格顺序,能否做到单个队列的并行消费呢?

怎么实现负载均衡?

RocketMQ如何保证消息不丢(消息一定能被消费)?

  1. Producer端重试
    默认push重试3次。
  2. Broker端只有在复制半数以上副本之后才会返回发送成功。

    和MySQL里的semisync有点像。

  3. Consumer端重复消费
    DefaultMQPushConsumer默认超时重试无限次,默认异常重试16次,过期或重试不成功则进入死信队列、默认凌晨 3 点会清除死信队列,为了确保重试不会出现重复消费,业务逻辑一般都需要保证幂等(幂等key可以使用业务oid或uniqId)。
    Consumer有两种返回值,CONSUME_SUCCESS和CONSUME_LATER,后者令Broker将消息转移到另一个Retry队列中供重试使用。

RocketMQ如何实现消息去重?

RocketMQ本身没有实现消息的去重功能,因为RocketMQ是At-Least-Once的。
所以,很多时候我们需要自己通过Redis等来实现消息去重,但是要注意的是不要用错了msgId:

  • MessageExt.msgId:重试时这个msgId是会变的,因此不适合当作幂等key;
  • MessageExt.properties["PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX"]:相对上面那个msgId来说,这个UNIQ_KEY就算重试多次值还是一样的,因此更适合当作幂等key。

消费失败怎么重发的?

怎么判断消息堆积了?

刷盘的原理?

  1. CommitLog
  2. ConsumeLog

怎么实现消息复制(Broker主从之间)?

RocketMQ 如何保证消息的高可用?

  1. NameServer 集群
    NameServer集群节点没有Master、Slave之分,即使挂掉其中几台,其他的仍可提供服务。
  2. Broker多主多从
    Broker支持多主多从集群,即使其中某台Master挂掉了,其他Master照样可以提供服务,而且挂掉的Master,其从节点照样可以通过选举得到一个新的Master。

broker集群的master宕机,slave是怎么提供服务的?master是怎么切换回来的?

为什么 RocketMQ 使用 NameServer 而不是 ZooKeeper 作为服务注册表

NameServer 具有高可用性,就算其中某台挂掉,其他服务器仍然能提供服务注册和查询功能。
ZooKeeper 的设计目标是高一致性,其中某台服务器挂掉,整个 ZooKeeper 集群就无法提供服务了——直到下一个Leader被选举出来为止。

NameServer是怎么感知Broker的变化的?

RocketMQ的事务消息是否完整实现了事务的ACID特性?

为什么要有Half Message?

  1. 可以先确认Broker服务器是否正常,如果半消息都发送失败了,就说明Broker挂了。
  2. 可以通过半消息来回查事务状态,如果半消息发出后一直没有被二次确认,就会回查事务状态。

    事务回查有两种情况:1、由于网络等原因一直没有执行事务的commit和rollback;2、本地事务执行成功了,但是返回commit的时候服务挂了,Broker最终也没有收到消息,因此还是半消息状态,因此仍会进行重试。

为什么说RocketMQ只能保证最终一致性?

比如以一个转帐功能为例,A账户扣减、发MQ消息通知另一个服务增加B账户的余额,扣减和增加是在两个事务中执行的,MQ虽然能保证两个事务最终一定都能执行上,但是并不能保证中间状态不会出现,比如某个时刻A账户扣减了、但是B账户仍为原状。

RocketMQ使用某个消息序号messageID消费某个队列的消息,时间复杂度是多少?(假设消息文件commitLog数量为m,每个消息文件中消息条数是k,索引文件consumerQueue的数量是n,队列中共有j条消息)

复杂度是O(1),因为消息序号中包含了消息在commitLog中的偏移量,因此可以直接通过偏移量来拿到消息。

参考

环境

  1. rocketmq 控制台搭建(rocketmq-console)
  2. Quick Start
  3. RocketMQ 源码解析 —— 调试环境搭建
  4. RocketMQ管理命令说明

原理

  1. 特性(features)
  2. 设计(design)
  3. RocketMQ 实战(三) - 消息的有序性
  4. 事务消息
  5. RocketMQ 源码分析

为什么使用RocketMQ

使用MQ的优点

  1. 解耦
    直连的情况下,每接入一个系统就需要直接改代码调新系统接口;
    使用 RocketMQ 的情况下,新系统可以自己监听消息。
  2. 异步
    非必要的业务逻辑异步处理,加快响应速度。
  3. 削峰
    直连的情况下,所有请求会直接全部打到下游服务,引入消息队列后,消息队列可以暂存消息,让下游服务慢慢消费;

使用 MQ 的缺点

系统可用性降低:消息队列挂掉将影响系统可用性。
系统复杂性增加:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,系统复杂性增大。

使用 RocketMQ

搭建 Name Server 和 Broker Server(单点)

看下这个
为了方便,需要设置 Broker 服务器的一些属性,在 broker.conf 中:

1
2
3
4
# 允许使用SQL语法过滤消息
enablePropertyFilter=true
# 自动创建Topic,否则使用新Topic时还需要在控制台创建
autoCreateTopicEnable=true

对 Broker 服务器的启动可以使用下面的命令:

1
start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf &

如果后续生产消息时报错:maybe your broker machine memory too small
出现这个问题的主要原因是磁盘空间不够了,而我的电脑磁盘一直都是稀缺资源,所以就出问题了。
可以修改 runbroker.sh,在里面增加一句话即可: JAVA_OPT=”${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98” 我这里把磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息。

搭建RocketMQ Console

控制台,方便创建Topic、统计消息、查看集群状态等。
看下这个

多主多从(2主多从)

同上下载源码并编译完成后,第一步是创建配置文件,我计划将所有主从放到同一台服务器上,因此在本地创建所有配置文件和数据存储目录:
1、创建数据存储目录

1
2
mkdir -p /home/hgc/Documents/project/open/rocketmq/store1/{rootdir-a-m,commitlog-a-m,rootdir-a-s,commitlog-a-s}
mkdir -p /home/hgc/Documents/project/open/rocketmq/store2/{rootdir-b-m,commitlog-b-m,rootdir-b-s,commitlog-b-s}

2、配置环境变量
/etc/profile文件中加入:

1
2
3
# RocketMQ
export ROCKETMQ_HOME=/home/hgc/Documents/project/open/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin

3、创建配置文件
/home/hgc/Documents/project/open/rocketmq/conf.1.m/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
brokerClusterName = DefaultCluster
brokerName = rocketmq-nameserver-1
#brokerId 0 表示 Master,>0 表示 Slave
brokerId=0
# Broker 对外服务的监听端口
listenPort=10911
#nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=72
#Broker role有3种:SYNC_MASTER、ASYNC_MASTER、SLAVE。关键词SYNC和ASYNC表示Master和Slave之间同步消息的机制,SYNC即同步更新,指当Slave和Master消息同步完成后,再返回发送成功的状态。ASYNC即异步更新,master与slave有短暂消息延迟,毫> 秒级。本文在此使用了异步复制集群模式,线上环境推荐使用同步双写模式,即SYNC_MASTER
brokerRole=SYNC_MASTER
# 刷盘方式 ASYNC_FLUSH 异步刷盘
flushDiskType=ASYNC_FLUSH
#存储路径
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store1/rootdir-a-m
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store1/commitlog-a-m
# 是否允许 Broker 自动创建Topic
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组
autoCreateSubscriptionGroup=true

/home/hgc/Documents/project/open/rocketmq/conf.2.m/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
brokerName=rocketmq-nameserver-2
listenPort=10921
namesrvAddr=rocketmq-nameserver-1:9876
brokerId=0
deleteWhen=04
fileReservedTime=72
brokerRole=SYNC_MASTER
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store2/rootdir-b-m
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store2/commitlog-b-m
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

/home/hgc/Documents/project/open/rocketmq/conf.1.s/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
brokerClusterName = DefaultCluster
brokerName = rocketmq-nameserver-1
brokerId=1
listenPort=10917
namesrvAddr=127.0.0.1:9876
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store1/rootdir-a-s
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store1/commitlog-a-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true

/home/hgc/Documents/project/open/rocketmq/conf.2.s/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
brokerName=rocketmq-nameserver-2
listenPort=10927
namesrvAddr=127.0.0.1:9876
brokerId=1
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store2/rootdir-b-s
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store2/commitlog-b-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

4、启动NameServer
启动前先修改启动脚本,因为是单机使用,把堆大小稍微改小点:

1
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
1
nohup sh bin/mqnamesrv &

5、启动Broker
同理改下启动脚本:

1
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g"

按master - slave的顺序启动每个Broker

1
2
3
4
nohup sh bin/mqbroker -c conf.1.m/broker.conf > conf.1.m/logs/broker.1.m.log &
nohup sh bin/mqbroker -c conf.2.m/broker.conf > conf.2.m/logs/broker.2.m.log &
nohup sh bin/mqbroker -c conf.1.s/broker.conf > conf.1.s/logs/broker.1.s.log &
nohup sh bin/mqbroker -c conf.2.s/broker.conf > conf.2.s/logs/broker.2.s.log &

6、查看注册情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 看看集群注册情况
./mqadmin clusterList -n 127.0.0.1:9876
# 创建一个Topic
./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t TopicTest
# 看看Topic列表
./mqadmin topicList -n 127.0.0.1:9876
# 看看Topic路由信息
./mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest
# 查看Topic统计信息
mqadmin topicStatus -n 127.0.0.1:9876 -t TopicTest
# 根据消息key查询消息
./mqadmin queryMsgByKey -n "127.0.0.1:9876" -t TopicTest -k messageKey
# 根据消息ID查询消息
./mqadmin queryMsgById -n "127.0.0.1:9876" -t TopicTest -i 240882208B134140BD0FB45F5DBD2075000018B4AAC2130BE69B0001
# 查询Producer的网络连接
./mqadmin producerConnection -n 127.0.0.1:9876 -g ExampleProducerGroup -t TopicTest
# 查询Consumer的网络连接
./mqadmin consumerConnection -n 127.0.0.1:9876 -g ExampleConsumerGroup -t TopicTest
# 查看订阅组的消费状态
consumerProgress
# 添加KV配置信息
updateKvConfig
# 删除KV配置信息
deleteKvConfig
# 添加project group配置信息
updateProjectGroup
# 删除project group配置信息
deleteProjectGroup
# 获取product group配置信息
getProjectGroup
# 设置消费进度
resetOffsetByTime
# 清除特定Broker权限
wipeWritePerm
# 获取Consumer消费进度
getConsumerStatus

7、使用客户端测试

8、关闭服务器

1
2
sh mqshutdown namesrv
sh mqshutdown broker

主从自动切换(DLedger)

  • enableDLegerCommitLog
    是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true 。
  • dLegerGroup
    节点所属的 raft 组,建议与 brokerName 保持一致,例如 broker-a。
  • dLegerPeers
    集群节点信息,示例配置如下:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913,多个节点用英文冒号隔开,单个条目遵循 legerSlefId-ip:端口,这里的端口用作 dledger 内部通信。
  • dLegerSelfId
    当前节点id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字。
  • storePathRootDir
    DLedger 日志文件的存储根目录,为了能够支持平滑升级,该值与 storePathCommitLog 设置为不同的目录。

第一个节点的broker.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store1/
# 与dledger相关的属性
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n0

第二个节点的broker.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10921
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store2/
# 与dledger 相关的配置属性
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n1

第三个节点的broker.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10931
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store3/
# 与dledger 相关的配置属性
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n2

对于DLedger集群来说,一方面要保证消息仍能正常收发,另一方面还要保证Master挂掉后集群会选举出一个新的Master,可以将Master kill掉来模拟这种情况,在rocketmq-console中查看节点角色的变化情况。

客户端配置

消费模式

有以下3种:

  1. CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息
  2. CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
  3. CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

其他概念

msgId:根据 queueNum 取模放到 broker 队列
instanceName:每个集群不一致,同一集群 groupName 不能重,两个 topic 的 groupName 不能一样
两种模式本质都是拉:push 是构建长连接,一次拉完,一个一个给 broker 回执 ack,多个 Consumer 平分(必须是 4 的倍数,不够再给余数,比如 9 个消息三个 Consumer 分别拉到 4、4、1),poll 模式批量,按设定拉取

消息的生成和消费

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>

消息生产:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class ProducerTest {

/**
* 同步消息
*/
@Test
public void sendSyncMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

/**
* 异步消息
*/
@Test
public void sendAsyncMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}

@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

/**
* 单向消息
*/
@Test
public void testSendOnewayMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);

}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

消息消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConsumerTest {

public static void main(String[] args) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

顺序消息

发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
public class OrderedProducer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

String[] tags = new String[]{"TagA", "TagC", "TagD"};

// 订单列表
List<OrderStep> orderList = new OrderedProducer().buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}, orderList.get(i).getOrderId());//订单id

System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}

/**
* 订单的步骤
*/
private static class OrderStep {

private long orderId;
private String desc;

public long getOrderId() {
return orderId;
}

public void setOrderId(long orderId) {
this.orderId = orderId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}

/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();

OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

return orderList;
}
}

接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class OrderedConsumer {

public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + " queueId=" + msg.getQueueId() + ", topic:" + msg.getTopic() + " content:" + new String(msg.getBody()));
}

try {
//模拟业务逻辑处理中...
// TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}

延时消息

延时消息可以用于一些需要延迟处理业务的场景,比如下单后超过一定时间没支付就自动取消。
发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ScheduledProducer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TopicTest", "TagA", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}

接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ScheduledConsumer {

public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息,此外,这一批消息的总大小不应超过 4MB。
发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class BatchProducer {

public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic = "TopicTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}

接收逻辑并无不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class BatchConsumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

注意我们上边提到的 4MB 的大小限制,如果消息列表中的消息变多了,很有可能会超过这个大小,这时最好对消息列表进行分割再发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class BatchProducer {

public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic = "TopicTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
splitSend(producer, messages);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}

static void splitSend(MQProducer producer, List<Message> messages) {
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ListSplitter implements Iterator<List<Message>> {

private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;

public ListSplitter(List<Message> messages) {
this.messages = messages;
}

@Override
public boolean hasNext() {
return currIndex < messages.size();
}

@Override
public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}

private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while (tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}

private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
return tmpSize;
}
}

消息订阅

RocketMQ 支持用 SQL 语法的语句来“模糊”订阅消息,只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,接口如下:

1
public void subscribe(finalString topic, final MessageSelector messageSelector)
  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:’abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TagProducer {

public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", "2");
SendResult sendResult = producer.send(msg);
producer.shutdown();
}
}

消息消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TagConsumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("localhost:9876");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println(new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

事务消息

事务消息有3种状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

发送事务消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TransactionProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
// 事务监听
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}

实现事务监听接口,在发送半消息成功后会触发executeLocalTransaction方法来执行本地事务,checkLocalTransaction用于检查本地事务状态,并回应消息队列的检查请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

事务消息使用上的限制:

  • 事务消息不支持延时消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
  • 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  • 事务性消息可能不止一次被检查或消费。
  • 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。

RocketMQ的运维

海量数据处理方法

当数据量特别大时,首先排除直接加载到内存中的算法,最核心的思路往往是分治,比如排序时先将大文件分割成多个足以加载到内存中的小文件,然后利用内存排序算法分别排序得到有序的一些小文件,最后通过归并排序得到最终的结果。

阅读全文 »

MyBatis 整体结构

MyBatis结构

配置文件

配置类提供的功能几乎贯穿了整个处理过程:

  1. 解析 Xml 文件
  2. 创建 SQL 处理器 Executor
  3. 对语句进行缓存 MappedStatement

怎么定位路径

  • getResourceAsStream

怎么解析文件

xml 文件的解析方式有两种,一种 DOM 是直接读入整个 xml 文件,根据标签的嵌套关系构建一棵文档树;另一种方式叫 SAX(Simple API for XML),是一种事件驱动的文档解析方式,什么是事件驱动呢?比如说 SAX 驱动扫描到了起始标签,就代表发生了一个事件,它会转而调用某个由用户定义的函数(startElement)执行逻辑。
有一种设计原则叫好莱坞法则(Hollywood),形象地说就是“你不要 call 我,需要你时我会 call 你”,一个例子是异步调用,这是一种通信机制,客户端在发出请求后不必等待服务端处理完毕就可以返回处理自己的逻辑,等到服务端处理完毕后再将结果传回,这种方式一定程度上可以解决客户端长期阻塞的问题、改善用户体验,回调函数也是一个例子。
据网上的说法,DOM 需要一次构建整棵 DOM 树,所以比较占内存,不适合大的 xml 文档解析,但是由于 DOM 树上可以任意遍历,所以自由度很高,相对来说,SAX 是读到什么就调用什么回调函数,所以内存占用小,但是编程多少会复杂一些。

构建数据库连接

数据库连接

SqlSessionFactoryBuilder

应用了建造者模式,根据配置文件来创建 SqlSessionFactory,创建后其任务就结束了,生命周期在一个方法内。

SqlSessionFactory

创建和数据库连接的工具,在整个应用运行期间应该作为一个单例存在,或者使用依赖注入管理其生命周期。

SqlSession

代表和数据库的一次连接,在 MyBatis 中其实现是线程不安全的,生命周期最好控制在一次请求之间。

数据源

  • DBCP
  • C3P0
  • Druid
  • MyBatis 内置数据源(UNPOOLED、POOLED、JNDI)
  • 自定义数据源

映射器

  • mapper 文件
  • 注解

SQL 执行

SqlSession 本身是可以直接执行 sql 语句的,它的所有 update、query 等方法都是对语句进行了包装(MappedStatement),然后再调用 Executor 的相应方法,Executor 是执行器,是 MyBatis 的核心。
SQL 的执行是由 Executor 负责的,Executor 对象是和 SqlSession 同时创建的,SqlSessionFactory 会为 Executor 创建事务,事务类默认为 ManagedTransactionFactory,Executor 需要从事务对象获取数据库连接(包装上一层事务后扩展性更好),事务会从环境对象中获取 DataSource 对象,然后委托 DataSource 创建连接,并且可以根据事务等级来为连接设置事务。说白了,把 Config 对象传给新建的 Transaction,由 Transaction 创建连接。
Executor 并不是直接执行 SQL 语句,SQL 语句由 MappedStatement 包装,再交给 StatementHandler 执行

Executor

MyBatis 提供 4 种 Executor,他们都继承于 BaseExecutor
BaseExecutor 是一个抽象类,实现了延迟加载、一级缓存(PerpetualCache)等功能
SimpleExecutor 语句使用 PreparedStatement 保存,使用 StatementHandler 处理
ReuseExecutor 与 SimpleExecutor 的区别是它使用一个 Map<String, Statement>来缓存 SQL 语句对应的 Statement,如果某些 Sql 复杂且使用频繁的话可以使用这个执行器,因为这个 Map 不是静态的,并且 MyBatis 实际上会为每个新建的 SqlSession 创建一个 Executor,所以这个缓存只在同一个 Session 内有效

1
2
3
4
5
6
7
8
9
10
private final Map<String, Statement> statementMap = new HashMap<String, Statement>();
if (hasStatementFor(sql)) {
//如果缓存中已经有了,直接得到Statement
stmt = getStatement(sql);
} else {
//如果缓存没有找到,则和SimpleExecutor处理完全一样,然后加入缓存
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection);
putStatement(sql, stmt);
}

BatchExecutor(批量执行器) 将一些 SQL 语句放在一个 List 中,最后 doFlushStatements 一块执行,并且如果两个相邻的 SQL 语句是相同的,还会复用前一个 Statement 对象。
CachingExecutor(二级缓存执行器) 为什么说是二级缓存?一级缓存由 BaseExecutor 中的 PerpetualCache 实现,CachingExecutor 会先在二级缓存中查找,如果找不到再委托给 delegate 执行,delegate 是 BaseExecutor 的子类,当然有一级缓存的功能。

参数类型和返回值

我们很多时候会指定 parameterType 和 resultType 为复杂类型,怎么将这些类型和数据库表结构进行映射正是 orm 框架的任务之一。
parameterType 表示传入参数类型,在 sql 语句中可以使用#{参数名}来调用,比如

1
sqlSession.selectOne("com.tallate.UserMapper.selectUser", 1);

传入了一个 Integer 类型的参数 1,那么 PreparedStatementHandler 在准备语句时,应该对这个参数的类型进行判断,这个是由 ParameterHandler 负责的。
resultType 表示返回值类型,PreparedStatementHandler 在获得 ResultSet 后应该将查询到的表记录转换为 Java 对象,这个是由 ResultSetHandler 负责的,它最终会调用对应类的构造函数将查询出的结果传入。

动态代理

我们平常使用 MyBatis 时都会定义一个 XXMapper 接口,对应 mapper.xml 中的一个 namespace,而且我们也不必显示写出其实现类,调用过程都是由动态代理实现的。
一般来说,代理类和被代理类应该实现相同的接口,但是现在我们的被代理类是一个 xxmapper.xml 文件,所以问题现在变成了:怎么将 xxmapper.xml 文件转换成被代理类。
查看源码中的 MapperProxyFactory 和 MapperProxy 可以知道,MyBatis 实现 Mapper 接口其实是调用了 SqlSession 中的方法(select、selectOne 等,已经实现了),但是它们的方法名并不相同,比如 selectUser 怎么和 selectOne 关联上呢?
MapperProxy 的 invoke 方法并不是直接调用被代理对象的方法,而是使用 MapperMethod 来表示映射的方法,通过 MapperMethod 可以判断接口方法的返回值、方法名等来确定应该调用 SqlSession 的哪个方法

  1. 启动时 XMLConfigBuilder 会为 config.xml 中所有 mapper 节点扫描包下所有映射器
  2. 创建对应的映射 interface -> MapperProxyFactory
  3. 添加动态代理对象到 MapperRegistry 中(我为了方便,直接加到 Config 中了,其实是刚开始对 MapperRegistry 的功能理解错了…)
  4. 之后每次 getMapper,都可以根据接口名来找到对应的动态代理对象,调用方法时实际上是调用了相应的 MapperMethod

并发

有哪些资源是中心化的?如果是,会不会被多线程同时访问?在 web 环境中,假设每个用户代表一个线程,当他们同时访问服务器就会出现并发问题。

  • 线程池(数据源)
    如果线程池是使用链表(LinkedList)实现的,可以使用 Collections.synchronizedList 进行包装,或者直接使用 Vector
  • Map<String, MappedStatement> MappedStatements
    使用 Map 容器储存 MappedStatement,MappedStatement 表示调用语句到 sql 语句的映射,比如”namespace.selectUser”到 mapper.xml 中对应的 sql 语句(使用 SqlSource 包装)。
  • List environments
    表示 config.xml 中注册的所有环境对象列表
  • List mappers
    表示 config.xml 中注册的所有 mapper 的列表
  • Map<Method, MapperMethod> methodCache
    MapperProxyFactory 中的映射器方法缓冲是使用 ConcurrentHashMap 实现的

QA

  1. 一级缓存不够吗?为什么要有二级缓存?
    一级缓存是会话级缓存,在 BaseExecutor 中,是成员变量,生命周期在一个 SqlSession 内,连接断开就没了;
    二级缓存是语句级缓存,在 MappedStatement 中,可以跨多个 SqlSession,当一些数据不常发生变化或者允许偶尔的并发的时候,二级缓存可能更有效率。
  2. 为什么不推荐使用 MyBatis 中的缓存?
    一级缓存会产生脏数据。因为作用范围是会话,如果有俩会话,会话 1 加载数据到缓存,会话 2 修改该条数据,之后会话 1 读到的是缓存里的老数据。
    二级缓存同样会产生脏数据。二级缓存作用范围是语句,需要手动刷新或在 xml 中配置需要刷新,一般在写入操作和事务提交后都需要刷新一下。但是如果表 A 的 Amapper.xml 中关联了表 B,即使表 B 的数据有变更,我们在 Amapper.xml 中执行查询语句仍然会读到缓存中的脏数据。
  3. MyBatis 与 JDBC 对象之前的关联?
    ParameterStatement - ParameterStatementHandler
    SimpleStatement - SimpleStatementHandler
    ResultSet - ResultSetHandler

单机环境下的锁

单机环境下,资源竞争者都是来自机器内部(进程/线程),那么实现锁的方案只需要借助单机资源就可以了,比如借助磁盘、内存、寄存器来实现。

竞态条件(Race Condition)

计算的正确性取决于多个线程的交替执行时序时,就会发生竞态条件。比如:

  1. 先检测(查询)后执行。执行依赖于检测的结果,而检测结果依赖于多个线程的执行时序,而多个线程的执行时序通常情况下是不固定不可判断的,从而导致执行结果出现各种问题。
  2. 延迟初始化(如单例的实例化)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class ObjFactory {  
    private Obj instance;

    public Obj getInstance(){
    if(instance == null){
    instance = new Obj();
    }
    return instance;
    }
    }
    如果两个线程同时调用 getInstance()就有可能出现:一个线程 A 创建了一个新对象 instance = obj1,立马被另一个线程 B 覆盖 instance = obj2,线程 A 返回了 obj1,线程 B 返回 obj2,于是 Obj 就相当于被实例化了两次。

锁的分类

  1. 悲观锁,前提是,一定会有并发抢占资源,强行独占资源,在整个数据处理过程中将数据处于锁定状态。
  2. 乐观锁,前提是,不会发生并发抢占资源,只有在执行修改时检查是否违反数据完整性。只能防止脏读后数据的提交,不能解决脏读

悲观锁

乐观锁

乐观锁一般有以下两种实现方法:

  1. 版本号:使用版本标识来确定读到的数据与提交时的数据是否一致。提交后修改版本标识,不一致时可以采取丢弃再次尝试的策略。
  2. CAS:java 中的 compareandswap 即 cas,解决多线程并行情况下使用锁造成性能损耗的一种机制。CAS 操作包含三个操作数,内存位置(V),预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会西东将该位置值更新为新值。否则,处理器不做任何操作。

分布式锁

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP 理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)可用性(Availability)分区容错性(Partition tolerance),最多只能同时满足其中两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,Java 中其实提供了很多并发处理相关的 API,但是这些 API 在分布式场景中就无能为力了。也就是说单纯的 Java Api 并不能提供分布式锁的能力。
对于分布式环境下,资源竞争者生存环境更复杂了,原有依赖单机的方案不再发挥作用,这时候就需要一个大家都认可的协调者出来,帮助解决竞争问题,那这个协调者称之为分布式锁。

实现分布式锁的需求(方法锁,以方法作为临界区,资源锁是类似的)

  1. 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
  2. 这把锁要是一把可重入锁(单线程可重复获取同一把锁,避免死锁)
  3. 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
  4. 有高可用的获取锁和释放锁功能
  5. 获取锁和释放锁的性能要好

基于数据库表

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。
当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
创建这样一张数据库表:

1
2
3
4
5
6
7
8
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

使用锁表实现方法锁

执行 SQL:

1
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)

因为我们对 method_name 做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
当方法执行完毕之后,想要释放锁的话,需要执行以下 Sql:

1
delete from methodLock where method_name ='method_name'

上面这种简单的实现有以下几个问题:

  • 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
  • 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
  • 这把锁只能是非阻塞的,因为数据的 insert 操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
  • 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据库中数据已经存在了。

当然,我们也可以有其他方式解决上面的问题。

  • 数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
  • 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
  • 非阻塞的?搞一个 while 循环,直到 insert 成功再返回成功。
  • 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

使用数据库 X 锁(排他锁)实现分布式锁

除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。
我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。
基于 MySQL 的 InnoDB 引擎,可以使用以下方法来实现加锁操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = select * from methodLock where method_name = xxx for update;
if(result==null){
return true;
}
}catch(Exception e){
log.warn("加锁失败", e);
}
sleep(1000);
}
return false;
}

在查询语句后面增加 for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:

1
2
3
public void unlock(){
connection.commit();
}

通过 connection.commit()操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。

  • 阻塞锁? for update 语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
  • 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。
    但是还是无法直接解决数据库单点和可重入问题。

总结

总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。
数据库实现分布式锁的优点:

  1. 直接借助数据库,容易理解。

数据库实现分布式锁的缺点

  1. 会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。
  2. 操作数据库需要一定的开销,性能问题需要考虑。

基于缓存

使用缓存中间件实现分布式锁的方法我已经在Redis 客户端中有过分析。

基于 ZooKeeper

基于 zookeeper 临时有序节点可以实现的分布式锁。
大致思想即为:每个客户端对某个方法加锁时,在 zookeeper 上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点
判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。
当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
来看下 Zookeeper 能不能解决前面提到的问题。

  • 锁无法释放?使用 Zookeeper 可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在 ZK 中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session 连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
  • 非阻塞锁?使用 Zookeeper 可以实现阻塞的锁,客户端可以通过在 ZK 中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper 会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
  • 不可重入?使用 Zookeeper 也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
  • 单点问题?使用 Zookeeper 可以有效的解决单点问题,ZK 是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。

使用 Curator 实现分布式锁

可以直接使用 zookeeper 第三方库 Curator 客户端,这个客户端中封装了一个可重入的锁服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
try {
return interProcessMutex.acquire(timeout, unit);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
public boolean unlock() {
try {
interProcessMutex.release();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
}
return true;
}

Curator 提供的 InterProcessMutex 是分布式锁的实现。acquire 方法用户获取锁,release 方法用于释放锁。
使用 ZK 实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper 实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK 中创建和删除节点只能通过Leader服务器来执行,然后将数据同步到所有的 Follower 机器上。

总结

使用 Zookeeper 实现分布式锁的优点

  1. 有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。
  2. 实现起来较为简单。

使用 Zookeeper 实现分布式锁的缺点

  1. 性能上不如使用缓存实现分布式锁。
  2. 需要对 ZK 的原理有所了解。

分布式锁实现需要根据实际需要来选择,比如红锁是AP的,而ZooKeeper是CP的。

QA

  1. 怎么使用 Redis 实现分布式锁?
    set 命令带上 nx 和 ex 参数。
  2. 怎么使用 zk 实现分布式锁?
    先建一个代表锁的持久节点,然后每个线程要加锁就在该持久节点下创建临时有序节点,如果当前线程创建的节点是最小的,则说明可以获取到该锁,否则阻塞等待;释放锁就是将这个临时节点删除。

参考

  1. 分布式锁的几种实现方式
  2. 终极锁实战:单 JVM 锁+分布式锁

常用服务器配置

  • 启动选项和系统变量
    启动选项是运维启动 MySQL 时传入的一些参数,包括命令行启动选项和配置文件 my.cnf
    系统变量会影响 MySQL 进程的运行行为,大部分是由启动选项初始化的,有些是运行时自动生成的
  • 查看系统变量
    show [GLOBAL|SESSION] variables [like 匹配的模式];
  • 配置文件中配置组的概念
  • 配置作用范围
    1、GLOBAL 指配置文件或命令行启动选项设置的系统变量
    2、SESSION(LOCAL)刚连接时会被初始化为 GLOBAL 的变量,可以通过以下命令来设置
    SET [GLOBAL|SESSION] 系统变量名 = 值;
  • 状态变量
    指关于程序运行状态的变量,是只读的,不能手动修改
    比方说 Threads_connected 表示当前有多少客户端与服务器建立了连接,Handler_update 表示已经更新了多少行记录
    SHOW [GLOBAL|SESSION] STATUS [LIKE 匹配的模式];

InnoDB 统计数据

两种统计数据

InnoDB 中有两种统计数据:
1、永久性:服务器重启也不会消失,这些数据被存储到了innodb_table_statsinnodb_index_stats这两张表中;
2、非永久性:重启即消失。
可以通过服务器的innodb_stats_persistent变量来查看这个统计数据的方式。

innodb_table_stats 统计方式

1、n_rows(一个表中的记录行数)统计项的收集
按照一定算法选取几个叶子节点页面,计算每个页面中主键值记录数量,然后计算平均一个页面中主键值的记录数量乘以全部叶子节点的数量就算是该表的 n_rows 值
2、clustered_index_size 和 sum_of_other_index_sizes

  • 从数据字典里找到表的各个索引对应的根页面位置。
    系统表 SYS_INDEXES 里存储了各个索引对应的根页面信息。
  • 从根页面的 Page Header 里找到叶子节点段和非叶子节点段对应的 Segment Header。
    在每个索引的根页面的 Page Header 部分都有两个字段:
    PAGE_BTR_SEG_LEAF:表示 B+树叶子段的 Segment Header 信息。
    PAGE_BTR_SEG_TOP:表示 B+树非叶子段的 Segment Header 信息。
  • 从叶子节点段和非叶子节点段的 Segment Header 中找到这两个段对应的 INODE Entry 结构。
    这个是 Segment Header 结构:
  • 从对应的 INODE Entry 结构中可以找到该段对应所有零散的页面地址以及 FREE、NOT_FULL、FULL 链表的基节点。
    这个是 INODE Entry 结构:
  • 直接统计零散的页面有多少个,然后从那三个链表的 List Length 字段中读出该段占用的区的大小,每个区占用 64 个页,所以就可以统计出整个段占用的页面。
    这个是链表基节点的示意图:
  • 分别计算聚簇索引的叶子结点段和非叶子节点段占用的页面数,它们的和就是 clustered_index_size 的值,按照同样的套路把其余索引占用的页面数都算出来,加起来之后就是 sum_of_other_index_sizes 的值。

innodb_index_stats 统计方式

1
SELECT * FROM mysql.innodb_index_stats WHERE table_name = 'single_table';
  • n_leaf_pages:表示该索引的叶子节点占用多少页面。
  • size:表示该索引共占用多少页面。
  • n_diff_pfxNN:表示对应的索引列不重复的值有多少。其中的 NN 长得有点儿怪呀,啥意思呢?
    其实 NN 可以被替换为 01、02、03… 这样的数字。比如对于 idx_key_part 来说:
    n_diff_pfx01 表示的是统计 key_part1 这单单一个列不重复的值有多少。
    n_diff_pfx02 表示的是统计 key_part1、key_part2 这两个列组合起来不重复的值有多少。
    n_diff_pfx03 表示的是统计 key_part1、key_part2、key_part3 这三个列组合起来不重复的值有多少。
    n_diff_pfx04 表示的是统计 key_part1、key_part2、key_part3、id 这四个列组合起来不重复的值有多少。
  • 在计算某些索引列中包含多少不重复值时,需要对一些叶子节点页面进行采样,sample_size 列就表明了采样的页面数量是多少。

基于内存的非永久性统计数据

开启非永久性统计数据的方法:
1、将innodb_stats_persistent的值设置为 OFF;
2、直接在创建表或修改表时设置STATS_PERSISTENT属性的值为 0;

MySQL Server 统计数据

Server 层而不是 InnoDB(存储引擎层)统计数据。
1、查看连接数配置

1
show variables like '%max_connections%'

2、查看当前连接数

1
show full processlist;

数据恢复

数据的误删基本分以下几种情况:

  1. 使用 delete 语句误删数据行;
  2. 使用 drop table 或 truncate table 误删表;
  3. 使用 drop database 误删数据库;
  4. 使用 rm 命令误删整个 MySQL 实例。

误删行

使用 Flashback 工具通过闪回把数据恢复。
Flashback 恢复数据的原理,是修改 binlog 的内容(事务里的语句顺序颠倒、语句的语义颠倒比如 insert 变成 delete),拿回原库重放。而能够使用这个方案的前提是,需要确保 binlog_format=row 和 binlog_row_image=FULL。

误删库 / 表

误删库表的情况不能使用 Flashback 恢复,因为即使配置 binlog_format=row,truncate/drop 语句在 binlog 中也只会记录一条对应的语句,而用这些信息是无法恢复数据的。
这种情况下,恢复需要使用全量备份,加增量日志。这个方案要求线上有定期的全量备份,并且实时备份 binlog。

rm 删除数据

仅仅删除某个节点的数据的情况,HA 系统可以选出新的主库,从而保证整个集群的正常工作。
之后,我们可以在这个被删节点上把数据恢复回来,再接入整个集群。

中断查询

有时候因为查询耗时过长,或出现死锁等待,我们不得不提早终止执行 SQL 的线程,可以通过information_schema.processlistperformance_schema.threads这两张表来查看正在执行的线程:

  • processlist 表中每一行对应一个客户端连接,也对应一个线程;
  • threads 每一行对应一个线程。

kill query pid可以杀死线程,但是客户端的连接还在,可以看到被 kill 后该连接进入了 Sleep 状态:

1
2
# Id, User, Host, db, Command, Time, State, Info
'494633', 'beta', '192.168.19.142:56193', 'ds_0', 'Sleep', '26', '', NULL

kill pid可以中断连接,执行后再用processlist就找不到那个 pid 了。

在客户端 Ctrl + C 并不能中断服务器线程,只能中断客户端进程,

大表查询

Server 层

MySQL 使用缓存来保证一次性查询大量数据的情况下不会把服务器内存打满,服务器并不需要保存一个完整的结果集。取数据和发数据的流程如下:
MySQL-查询结果发送流程

  1. 获取一行,写到 net_buffer 中。这块内存的大小是由参数 net_buffer_length 定义的,默认是 16k。
  2. 重复获取行,直到 net_buffer 写满,调用网络接口发出去。
  3. 如果发送成功,就清空 net_buffer,然后继续取下一行,并写入 net_buffer。
  4. 如果发送函数返回 EAGAIN 或 WSAEWOULDBLOCK,就表示本地网络栈(socket send buffer)写满了,进入等待。直到网络栈重新可写,再继续发送。

从上面的流程可知,MySQL 一次查询占用的内存是有限的,最大是**min(net_buffer_length, socket send buffer)**,即不能超过 net_buffer_length 和 socket send buffer;

存储引擎层(InnoDB)

InnoDB 使用 Buffer Pool 管理内存数据页,如果 Buffer Pool 命中率足够高,那么大部分时候是不需要读磁盘的,直接从内存拿结果,可以加快查询速度。
执行 show engine innodb status ,可以看到“Buffer pool hit rate”字样,显示的就是当前的命中率,一般一个稳定服务的线上系统,要保证响应时间符合要求的话,内存命中率要在 99% 以上。
Buffer Pool 的空间是有限的,新旧页面的更替是通过 LRU 算法控制的,但 InnoDB 中的 LRU 并不是单纯的新页面替换老页面(因为这样相当于每次大查询都会把整个 Buffer Pool 都刷新一遍),而是将 LRU 链表分成了 young 区和 old 区,页面第一次被访问时会被添加到 old 区,old 区的页面如果是短期内被多次访问,则其不会被移动到链表的头部(young 区),会很快被淘汰掉。

临时表

1
2
3
4
create temporary table temp_t like t1;
alter table temp_t add index(b);
insert into temp_t select * from t2 where b>=1 and b<=2000;
select * from t1 join temp_t on (t1.b=temp_t.b);

临时表特性:

  1. 不同 session 的临时表是可以重名的,常被用在复杂查询的优化过程中,比如有多个 session 同时执行 join 优化,不需要担心表名重复导致建表失败的问题。
  2. 不需要担心数据删除问题。如果使用普通表,在流程执行过程中客户端发生了异常断开,或者数据库发生异常重启,还需要专门来清理中间过程中生成的数据表。而临时表由于会自动回收,所以不需要这个额外的操作。

临时表的使用场景

union 语句

表 t1 在执行前已初始化插入了 1~1000 的数据。

1
(select 1000 as f) union (select id from t1 order by id desc limit 2);

MySQL-union执行流程
上面语句将两个子查询的结果合并去重,union 合并时会生成临时表,这可以通过 explain 来验证。

group by

1
select id%10 as m, count(*) as c from t1 group by m;

MySQL-groupby执行流程
上面语句先创建内存临时表,表里有 m 和 c 两个字段,主键是 m,扫描 t1 索引 a,将id%10的结果插入临时表,如果出现主键冲突则计算 c 值+1。

  1. 加索引
    默认情况下id%10是无序的,所以需要先在临时表中统计排序后再返回,但是如果原表本身就是有序的,则不需要临时表、也不需要额外排序了,实际上只要引入索引就可以解决这个问题,因为索引是有序的
  2. 如果不能加索引,也可以加一列 generated column
    MySQL5.7 支持 generated column 机制,并可以在该列上创建索引:
    1
    alter table t1 add column z int generated always as(id % 100), add index(z);
    上面的 group by 语句可以改成如下的形式:
    1
    select z, count(*) as c from t1 group by z;
  3. 如果不需要排序,可以显式声明忽略排序
    如果对 group by 语句的结果没有排序要求,要在语句后面加 order by null
  4. 数据量小时使用内存临时表
    如果 group by 需要统计的数据量不大,尽量只使用内存临时表;也可以通过适当调大 tmp_table_size 参数,来避免用到磁盘临时表;
  5. 数据量大时使用磁盘临时表
    如果数据量较大,因为内存临时表的空间是有限的,当达到上限后就会转到磁盘内存表,与其这样转一下,不如直接使用磁盘内存表。
    因此,如果数据量实在太大,使用 SQL_BIG_RESULT 这个提示,来告诉优化器直接使用排序算法得到 group by 的结果。

Memory 引擎

Memory 引擎与 InnoDB 引擎区别

  1. 数据组织方式
    InnoDB 引擎采用 B+树来组织数据,主键是有序存储的。InnoDB 引擎把数据放在主键索引上,其他索引上保存的是主键 id。这种方式,我们称之为索引组织表(Index Organizied Table)
    Memory 引擎的数据和索引是分开的,数据以数组的方式单独存放,而主键索引是 hash 索引,存的是每个数据的位置,索引上的 key 并不是有序的:
    MySQL-Memory引擎数据组织
    Memory 引擎采用的是把数据单独存放,索引上保存数据位置的数据组织形式,我们称之为堆组织表(Heap Organizied Table)
  2. 存放顺序
    InnoDB 表的数据总是有序存放的,而内存表的数据就是按照写入顺序存放的;
  3. 当数据文件有空洞的时候,InnoDB 表在插入新数据的时候,为了保证数据有序性,只能在固定的位置写入新值,而内存表找到空位就可以插入新值;
  4. 数据位置发生变化的时候,InnoDB 表只需要修改主键索引,而内存表需要修改所有索引;
  5. InnoDB 表用主键索引查询时需要走一次索引查找,用普通索引查询的时候,需要走两次索引查找。而内存表没有这个区别,所有索引的“地位”都是相同的。
  6. InnoDB 支持变长数据类型,不同记录的长度可能不同;内存表不支持 Blob 和 Text 字段,并且即使定义了 varchar(N),实际也当作 char(N),也就是固定长度字符串来存储,因此内存表的每行数据长度相同。

hash 索引和 B-Tree 索引

内存表也支持 B-Tree 索引:

1
alter table t1 add index a_btree_index using btree (id);

MySQL-内存表B-Tree索引
可以查看以下两个语句的输出:

1
2
3
4
-- 命中索引a_btree_index,因此输出结果是有序的
select * from t1 where id < 5;
-- 强制使用主键id索引,因此是无序的
select * from t1 force index (primary) where id < 5;

不推荐在生产环境使用 Memory 引擎

  1. 锁粒度问题
    内存表不支持行锁,只支持表锁,只要这张表上有更新,就会堵住所有其他在这张表上的读写操作,因此在处理并发事务时性能也不会太好。
  2. 数据持久化问题
    因为数据被存放在内存中,数据库重启时所有的内存表都会被清空。

虽然一般情况下不适合使用内存表,但是还有一种情况可以考虑使用内存表:用户临时表,只是临时数据,如果数据可控,不会消耗过多内存的情况下,可以考虑使用内存表。
内存临时表(通过 create temporary table 语句创建)刚好可以无视内存表的两个不足,主要是下面的三个原因:

  1. 临时表不会被其他线程访问,没有并发性的问题;
  2. 临时表重启后也是需要删除的,清空数据这个问题不存在;
  3. 备库的临时表也不会影响主库的用户线程。

备份

  • 将数据导出成一组 insert 语句
    1
    mysqldump -h$host -P$port -u$user --add-locks=0 --no-create-info --single-transaction  --set-gtid-purged=OFF db1 t --where="a>900" --result-file=/client_tmp/t.sql
    恢复:
    1
    mysql -h127.0.0.1 -P13000  -uroot db2 -e "source /client_tmp/t.sql"
  • 导出 CSV 文件
    1
    select * from db1.t where a>900 into outfile '/server_tmp/t.csv';
    恢复,将数据导入到目标表 db2.t 中:
    1
    load data infile '/server_tmp/t.csv' into table db2.t;
  • 物理拷贝
    不能通过直接拷贝表的.frm 文件和.ibd 文件来实现物理拷贝,因为一个 InnoDB 表除了包含这两个物理文件外,还需要在数据字典中注册,直接拷贝的情况下系统不会识别。
    在 MySQL 5.6 版本引入了可传输表空间(transportable tablespace) 的方法,可以通过导出 + 导入表空间的方式,实现物理拷贝表的功能。
    1. 执行 create table r like t,创建一个相同表结构的空表;
    2. 执行 alter table r discard tablespace,这时候 r.ibd 文件会被删除;
    3. 执行 flush table t for export,这时候 db1 目录下会生成一个 t.cfg 文件;
    4. 在 db1 目录下执行 cp t.cfg r.cfg; cp t.ibd r.ibd;
    5. 这两个命令(这里需要注意的是,拷贝得到的两个文件,MySQL 进程要有读写权限);
    6. 执行 unlock tables,这时候 t.cfg 文件会被删除;
    7. 执行 alter table r import tablespace,将这个 r.ibd 文件作为表 r 的新的表空间,由于这个文件的数据内容和 t.ibd 是相同的,所以表 r 中就有了和表 t 相同的数据。

这三种方法各有优劣:

  1. 物理拷贝的方式速度最快,尤其对于大表拷贝来说是最快的方法。如果出现误删表的情况,用备份恢复出误删之前的临时库,然后再把临时库中的表拷贝到生产库上,是恢复数据最快的方法。但是,这种方法的使用也有一定的局限性:
    • 必须是全表拷贝,不能只拷贝部分数据;
    • 需要到服务器上拷贝数据,在用户无法登录数据库主机的场景下无法使用;……
    • 由于是通过拷贝物理文件实现的,源表和目标表都是使用 InnoDB 引擎时才能使用。
  2. 用 mysqldump 生成包含 INSERT 语句文件的方法,可以在 where 参数增加过滤条件,来实现只导出部分数据。这个方式的不足之一是,不能使用 join 这种比较复杂的 where 条件写法。
  3. 用 select … into outfile 的方法是最灵活的,支持所有的 SQL 写法。但,这个方法的缺点之一就是,每次只能导出一张表的数据,而且表结构也需要另外的语句单独备份。

MySQL 中的自增 ID

表的自增 id

我们经常给表的主键加上自增属性,用于唯一标识一条记录,但是因为自增值达到上限后再申请得到的值不变,因此自增字段的范围应该略大一些,尽可能创建成bigint unsigned

row_id

如果没有指定主键,InnoDB 会创建一个不可见的、长度为 6 字节的 row_id,超过上限后再申请时会得到 0,如果新写入的行的 row_id 在表中已存在,则会直接覆盖原有的行,因此,最好优先使用自增 ID 而不是 row_id。

Xid

Xid 用于唯一标识一个事务。Xid 的值由一个内存变量 global_query_id 给出,重启后清零,但是因为每次重启时 binlog 都会重新生成,所以 binlog 中的 Xid 也不会重复。global_query_id 的长度为 8 个字节,除非 MySQL 实例一直执行了2^64 - 1次查询且期间没有重启,不然不会出现 Xid 重复的情况。

max_trx_id

Xid 由 server 层维护。InnoDB 内部使用 Xid,就是为了能够在 InnoDB 事务和 server 之间做关联。但是,InnoDB 自己的 trx_id,是另外维护的。
InnoDB 内部维护了一个 max_trx_id 全局变量,每次需要申请一个新的 trx_id 时,就获得 max_trx_id 的当前值,然后并将 max_trx_id 加 1。

InnoDB 事务在读操作时不会申请 trx_id,trx_id 的值就是 0,只有在加锁或执行写操作时才会申请。
只读事务不申请 trx_id 的原因是只读事务不影响事务的可见性判断,且能减少 trx_id 的申请次数、减少并发事务申请 trx_id 的锁冲突。

MVCC 判断数据可见性的核心思想:每一行数据都记录了更新它的 trx_id,当一个事务读到一行数据的时候,判断这个数据是否可见的方法,就是通过事务的一致性视图与这行数据的 trx_id 做对比。

thread_id

系统保存一个全局变量 thread_id_counter,每新建一个连接就将 thread_id_counter 赋值给这个新连接的线程变量。

主从同步

MySQL的主从同步是基于bin log实现的。

bin log 同步流程

MySQL主从复制
备库 B 和主库 A 之间维持了一个长连接,主库 A 内部有一个线程专门服务于与 B 的 bin log 同步,一个事务日志同步的过程如下:

  1. 在备库 B 上通过 change master 命令,设置主库 A 的 IP、端口、用户名、密码,以及要从哪个位置开始请求 binlog,这个位置包含文件名和日志偏移量;
  2. 在备库 B 上执行 start slave 命令,这时候备库会启动两个线程,就是图中的 io_threadsql_thread。其中 io_thread 负责与主库建立连接。
  3. 主库 A 校验完用户名、密码后,开始按照备库 B 传过来的位置,从本地读取 bin log,发给 B。
  4. 备库 B 拿到 bin log 后,写到本地文件,称为中转日志(relay log)。
  5. sql_thread 读取中转日志,解析出日志里的命令,并执行。

binlog格式

MySQL的bin log主要支持三种格式,分别是statement、row以及mixed。MySQL是在5.1.5版本开始支持row的、在5.1.8版本中开始支持mixed。

  • statement:binlog中记录的是SQL语句的原文
    容易引起主从不一致,因为binlog是按事务提交顺序记录的,但是两个并发执行的事务中update语句可能是乱序的。
  • row:记录原数据

主备延迟

产生主备延迟的可能情况:

  1. 备库所在的机器性能较主库差;
  2. 备库的压力较大,比如因为备库不跑业务,所以很多人会随意执行一些特别耗时的操作,这些查询耗费大量的 CPU 资源,影响了同步速度,造成主备延迟。
  3. 出现了大事务,比如,一次性用 delete 语句删除大量数据,或者大表的 DDL。
  4. 一个服务器开放N个链接给客户端来连接,但是Slave里读取binlog的线程只有一个,当某个SQL在Slave上执行的时间稍长或由于某个SQL要进行锁表就会导致Master的SQL大量积压,未被同步到Slave,这就导致了主从不一致,也就是主从延迟。

并行复制策略

为了避免备库追不上主库的情况,MySQL 利用并行复制策略提高复制的效率,从上面的主备同步流程图可知,并行化可以加到客户端连接和写入数据两个过程中。

  1. 按表分发策略
    如果两个事务更新不同的表,它们就可以并行。因为数据是存储在表里的,所以按表分发,可以保证两个 worker 不会更新同一行。
    当然,如果有跨表的事务,还是要把两张表放在一起考虑的。
  2. 按行分发策略
    按表复制存在热点表的并行复制问题,即热点表会被分配给一个 worker 执行复制,这样就会退化成单线程复制。
    按行复制的核心思路是:如果两个事务没有更新相同的行,则它们在备库上可以并行执行,为了知道具体修改了哪些行,这种模式需要设置 binlog 的格式为 row(因为 statement 格式直接记录更新语句,row 记录的是受影响的具体数据的 ID)。

半同步

在 MySQL 5.5 版本之前一直采用的是上述的异步复制方案,主库的事务执行不会管备库的同步进度,如果备库落后,主库不幸 crash,那么就会导致数据丢失。
于是在 MySQL 在 5.5 中就顺其自然地引入了半同步复制,主库在应答客户端提交的事务前需要保证至少一个从库接收并写到 relay log 中。

异步 & 半同步 & 全同步

  • 对于异步复制,主库将事务 Binlog 事件写入到 bin log 文件中,此时主库只会通知一下 Dump 线程发送这些新的 Binlog,然后主库就会继续处理提交操作,而此时不会保证这些 Binlog 传到任何一个从库节点上。
  • 对于全同步复制,当主库提交事务之后,所有的从库节点必须收到,APPLY 并且提交这些事务,然后主库线程才能继续做后续操作。这里面有一个很明显的缺点就是,主库完成一个事务的时间被拉长,性能降低。
  • 对于半同步复制,是介于全同步复制和异步复制之间的一种,主库只需要等待至少一个从库节点收到并且 Flush Binlog 到 Relay Log 文件即可,主库不需要等待所有从库给主库反馈。同时,这里只是一个收到的反馈,而不是已经完全执行并且提交的反馈,这样就节省了很多时间。

主备切换

主备功能主要是通过 bin log 实现的。

主备切换流程

MySQL-主备切换流程
如图示,客户端的读写都是直接访问的节点 A,而节点 B 是 A 的备库,通常是只读的,只是将 A 的更新同步过来到本地执行,节点 A 的 update 同步到节点 B 的流程图如下所示:
MySQL-主备同步流程图

  1. 主库接收到客户端的更新请求后,执行内部事务的更新逻辑,同时写 binlog;
  2. 备库 B 跟主库 A 之间维持了一个长连接,专门用于服务备库 B 的事务日志同步;

当需要切换时,切换到状态 2,这时候客户端读写访问的都是节点 B,而节点 A 是 B 的备库;

双 M 架构

实际生产中更多采用的是双 M 架构:
MySQL-双M架构主备切换流程
与原先的方案相比,只是节点 A 和 B 之间多了一条线,这样,节点 A 和 B 之间总是互为主备关系,在切换的时候就不用再修改主备关系。

可靠性优先策略

双 M 结构的可靠性优先主备切换流程如下:

  1. 判断备库 B 现在的 seconds_behind_master,如果小于某个值(比如 5 秒)继续下一步,否则持续重试这一步;
  2. 把主库 A 改成只读状态,即把 readonly 设置为 true;
  3. 判断备库 B 的 seconds_behind_master 的值,直到这个值变成 0 为止;
  4. 把备库 B 改成可读写状态,也就是把 readonly 设置为 false;
  5. 把业务请求切到备库 B。

这个切换流程一般由专门的 HA 系统来完成,称为可靠性优先流程
注意:

  1. 这个过程中,比较耗时的是第 3 步,可能会耗费好几秒的时间,因此一般会先在第 1 步中做判断,确保 seconds_behind_master 足够小后才执行。

可用性优先策略

可靠性优先策略中,同步流程中存在一段系统不可用的时间,如果强行把步骤 4、5 调整到最开始执行,也就是说不等主备数据同步,直接把连接切到备库 B,并且让备库 B 可以读写,那么系统几乎就没有不可用时间了,这个流程称为可用性优先策略。

一主多从

平时使用数据库一般都是读多写少,在发展过程中很可能会先遇到读性能问题,为了解决读性能问题,在架构上的解决方式是一主多从
MySQL-一主多从基本结构
其中:

  • A 和 A’互为主备;
  • 从库 B、C、D 指向主库 A,主库负责所有写入和一部分读,其他的读请求由从库分担。

MySQL-一主多从的主备切换

  • 主备切换后,A’将成为新的主库;
  • 从库 B、C、D 改成连接到 A’。

读写分离

上述的主从结构其实形成了一种读写分离的架构,连接信息一般保存到客户端,由客户端执行负载均衡。
另一种读写分离架构在客户端和服务器之间架设了一个代理层 proxy,客户端全部连接到这个 proxy,由 proxy 根据请求类型和上下文执行请求的路由分发。
MySQL-带proxy的读写分离架构

  • 直连的架构,少了一层 proxy,因此性能稍微更好一点,排查问题也更方便,但是主备切换库迁移时客户端会感知到,所以客户端需要一个后端管理组件,比如 Zookeeper。
  • 带 proxy 架构,对客户端友好,但是同时 proxy 架构也更加复杂。

“过期读”问题

当客户端先写入再读取时可能会读到修改前的值,因为写入是对主库写入,读取是对从库读,而主从同步存在延迟,刚写入主库的数据可能还没有同步到所有的从库。
解决过期读问题的方案:

  • 强制走主库方案;
    一些必须拿到最新结果的请求,可以强制将其发到主库上,比如用户支付后需要马上看到商品是否已经购买成功,这个请求需要马上拿到最新的结果,因此最好走主库;
    一些请求没有必要立刻拿到最新的结果,比如商户发布商品后,用户即使没有马上看到商品也是可以的,因此用户读取商品列表的请求完全可以路由到从库上去。
  • sleep 方案;
    不大靠谱,但是一定程度上还是可以解决问题的。
  • 判断主备无延迟方案;
    判断 show slave status 结果里的 seconds_behind_master 参数的值,等于 0 才执行查询请求,这个参数可以表明从库是否已经完全同步。
  • 配合 semi-sync 方案;
  • 等主库位点方案;
  • 等 GTID 方案。

探活

在一主一备的双 M 架构里,主备切换只需要把客户端流量切到备库;而在一主多从架构里,主备切换除了要把客户端流量切到备库外,还需要把从库接到新主库上。
主备切换有两种场景:主动切换和被动切换,其中被动切换往往是因为主库出问题而由 HA 系统发起的。

select 1

select 1只能用于判断该数据库进程仍能执行,但是不能说明主库没有问题,比如,数据库线程池(由参数 innodb_thread_concurrency 控制)被打满的情况下,虽然select 1能执行,但是线程池还是会被堵住。

innodb_thread_concurrency 控制的是并发查询,而不是并发连接,因为并发连接多只是多占用一些内存空间,并不会占用 CPU 资源。

查表判断

为了知道线程池是否被打满,我们可以创建一张health_check表,里面只放一条数据,然后定时执行:

1
select * from mysql.health_check;

这种方法的缺点是,不能用于判断磁盘空间是否满了,因为如果磁盘空间满了,所有的更新语句和事务提交语句都会被堵塞,但是查询语句仍能执行。

更新判断

更新一行数据,一般会放一个 timestamp 字段,用来表示最后一次执行检测的时间。
但是要注意如果主备都要检测,就不能只有一行数据了,因为会产生行冲突,导致主备同步的停止。一般会采用数据库实例的 server_id 作为主键,因为 MySQL 规定了主备服务器的 server_id 必须不同,这样就能保证主备的检测命令不会冲突了。
这种方式仍然存在一种问题:这种更新语句占用的 IO 资源很少,即使当时 IO 已经 100%,检测语句仍可以获得 IO 资源来执行,但系统可能已经出问题了,也就是说,这种检测存在随机性。

内部统计

前面几种方法都是通过外部调用来发现问题的,更好的方式是利用 MySQL 本身的统计数据:performance_schema库的file_summary_by_event_name表。

QA

主从同步的流程

主备服务器之间维持了一个长连接,备库上回启动两个线程,一个 io_thread 负责与主库建立连接并读取 bin log,另一个 sql_thread 负责解析命令并执行。

MySQL 是怎么保证数据不丢失的

MySQL 是怎么保证高可用的

MySQL如何优化千万级的大表?

  1. 优化SQL和索引;
  2. 加缓存,比如Memcached或Redis;
  3. 主从复制或主主复制,实现读写分离
    可以在应用层做,效率高
    也可以用三方工具,如360的atlas
  4. 使用MySQL自带的分区表
    优点是对应用透明,但是SQL需要针对分区表做一些优化,sql条件中要带上分区条件的列,从而使查询定位到少量的分区上,否则就会扫描全部分区。
  5. 垂直拆分,根据模块耦合情况将一个大系统分为多个小系统
  6. 水平切分,选择合适的sharding key将大表数据拆分到多个小表上

参考

  1. MySQL 5.7 半同步复制技术

为什么使用 Netty

  1. 实现协议的局限性
    今天,我们使用通用的应用程序或者类库来实现互相通讯,比如,我们经常使用一个 HTTP 客户端库来从 web 服务器上获取信息,或者通过 web 服务来执行一个远程的调用。
    然而,有时候一个通用的协议或他的实现并没有很好的满足需求。比如我们无法使用一个通用的 HTTP 服务器来处理大文件、电子邮件以及近实时消息,比如金融信息和多人游戏数据。我们需要一个高度优化的协议来处理一些特殊的场景。例如你可能想实现一个优化了的 Ajax 的聊天应用、媒体流传输或者是大文件传输器,你甚至可以自己设计和实现一个全新的协议来准确地实现你的需求。
    另一个不可避免的情况是当你不得不处理遗留的专有协议来确保与旧系统的互操作性。在这种情况下,重要的是我们如何才能快速实现协议而不牺牲应用的稳定性和性能。
  2. 使用 Netty 可以有效改善这种情况
    Netty 是一个提供 asynchronous event-driven (异步事件驱动)的网络应用框架,是一个用以快速开发高性能、高可靠性协议的服务器和客户端。
    换句话说,Netty 是一个 NIO 客户端服务器框架,使用它可以快速简单地开发网络应用程序,比如服务器和客户端的协议。Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。
    “快速和简单”并不意味着应用程序会有难维护和性能低的问题,Netty 是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如 FTP、SMTP、HTTP、许多二进制和基于文本的传统协议.因此,Netty 已经成功地找到一个方式,在不失灵活性的前提下来实现开发的简易性,高性能,稳定性。
    有一些用户可能已经发现其他的一些网络框架也声称自己有同样的优势,所以你可能会问是 Netty 和它们的不同之处。答案就是 Netty 的哲学设计理念。Netty 从开始就为用户提供了用户体验最好的 API 以及实现设计。正是因为 Netty 的哲学设计理念,才让您得以轻松地阅读本指南并使用 Netty。

架构总览

Netty架构总览
Netty 的架构由三部分组成——缓冲(buffer),通道(channel),事件模型(event model)——所有的高级特性都构建在这三个核心组件之上。

NIO

  1. 想了解 Aio 与 Nio 的利弊,为什么 Netty 没有采用 Aio 实现?

NIO 基于传输层,可以自定义数据处理逻辑来作为应用层,或者基于现有的 HTTP 组件进行升级,在线上环境这样的升级会带来一些兼容性问题,HTTP 已有相应的协议升级机制:Protocol upgrade mechanism

NIO 相对 BIO 优势:

  1. 零拷贝
    零拷贝减少线程上下文切换次数,且数据直接拷贝到内核空间,不占用 JVM 堆空间;
  2. 减少线程资源浪费
    NIO 可以一个线程监听多个 Socket 的连接、读、写请求,而不是像 BIO 那样每个 Socket 创建一个线程,但是同时会有一个问题:

Netty 核心组件

  1. Channel 和 ChannelHandler
  2. ByteBuf
  3. Pipeline

服务端

Netty流程

代码

下面是一个启动Netty服务端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 空闲检测
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(15, 0, 0,
TimeUnit.MINUTES));

// 半包/粘包分解器
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(2048, true, getFirstBytes()
));
ch.pipeline().addLast(其他Handler比如解码之类的);
}
}).option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.bind(10885).sync()

创建EventLoop

在上面的代码中,出现了bossGroupworkerGroup,bossGroup主要负责监听连接,拿到连接后,交给workerGroup中的线程来监听读或写事件。
io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup
EventExecutorGroup会给每个线程创建一个EventLoop

io.netty.channel.nio.NioEventLoop#NioEventLoop
newChild()创建EventLoop实例,其默认实现是NioEventLoop

io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
服务器初始化过程中创建了个线程池ThreadPerTaskExecutor

  • 每次执行任务都会构造一个线程执行
    io.netty.util.concurrent.ThreadPerTaskExecutor#execute

创建及初始化 ServerSocketChannel

Netty 有一个叫做 Channel 的统一的异步 I/O 编程接口,这个编程接口抽象了所有点对点的通信操作。也就是说,如果你的应用是基于 Netty 的某一种传输实现,那么同样的,你的应用也可以运行在 Netty 的另一种传输实现上。Netty 提供了几种拥有相同编程接口的基本传输实现:

  • 基于 NIO 的 TCP/IP 传输 (见 io.netty.channel.nio),
  • 基于 OIO 的 TCP/IP 传输 (见 io.netty.channel.oio),
  • 基于 OIO 的 UDP/IP 传输, 和
  • 本地传输 (见 io.netty.channel.local).

切换不同的传输实现通常只需对代码进行几行的修改调整,例如选择一个不同的 ChannelFactory 实现。
此外,你甚至可以利用新的传输实现没有写入的优势,只需替换一些构造器的调用方法即可,例如串口通信。而且由于核心 API 具有高度的可扩展性,你还可以完成自己的传输实现。

  1. 入口
    io.netty.bootstrap.AbstractBootstrap#bind(int)
    用户代码调用bind绑定端口时会触发Channel的创建和初始化

io.netty.bootstrap.ServerBootstrap#init
对Channel的使用可以追溯到这个init方法,包括Channel的创建、属性等的设置。

  1. 创建
    NioServerSocketChannel的构造方法 -> io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
    可以看到,Netty中的ServerSocketChannel其实就对应JDK NIO中的ServerSocketChannel,在创建NioServerSocketChannel的同时创建了一个NIO中的ServerSocketChannel

  2. 初始化
    中间包含对childOptionschildAttrs等的设置。

  3. 添加一个连接处理器ServerBootstrapAcceptor

注册Selector

紧接着上面的初始化过程,接下来是注册NIO中的Selector。
io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)
总而言之最终还是使用NIO注册了 Selector。
io.netty.channel.nio.AbstractNioChannel#doRegister

启动 NioEventLoop

io.netty.bootstrap.AbstractBootstrap#doBind0
绑定端口号的同时,执行一个线程。
io.netty.util.concurrent.SingleThreadEventExecutor#startThread
NioEventLoop启动流程的最终启动了一个线程。
io.netty.channel.nio.NioEventLoop#run
该线程任务根据EventLoop的实现不同而有所不同,在NioEventLoop中,主要任务为以下3步:

  1. 接收事件(selectionKey)
    io.netty.channel.nio.NioEventLoop#select
    当检查没有需要处理的selectionKey时就会发生空轮询,Netty在轮询时会记录空轮询次数,当空轮询达到一定次数时,将之前注册的事件先取消,从而避免了NIO的空轮询Bug

  2. 检测新连接并创建NioSocketChannel
    io.netty.channel.nio.NioEventLoop#processSelectedKeys
    处理连接请求,并分发请求到pipeline
    io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

    • 每个连接创建一个ServerSocketChannel
      io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
    • 读取数据并分发到pipeline
      io.netty.channel.ChannelPipeline#fireChannelReadComplete
  3. 执行线程任务
    io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)

pipeline中的第一个ChannelHandler

pipeline的第一个Handler为ServerBootstrapAcceptor,它的主要任务包括:

  1. 将用户自定义ChannelHandler添加到pipeline

  2. 选择一个NioEventLoop传播事件
    io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)

  3. 注册selector
    代码流程非常长,但是最终可以跟到doRegister这个方法,可以发现最后还是调用了JDK的SocketChannel注册Selector。
    io.netty.channel.AbstractChannel.AbstractUnsafe#register0 -> io.netty.channel.nio.AbstractNioChannel#doRegister

  4. 注册读事件
    代码最后判断第一次连接则触发连接激活事件,代码位置仍然是上边的register0
    io.netty.channel.AbstractChannel.AbstractUnsafe#register0
    继续往下看可以看到最终将读事件(selectionKey)注册到了Selector
    io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
    -> io.netty.channel.DefaultChannelPipeline.HeadContext#readIfIsAutoRead
    -> io.netty.channel.nio.AbstractNioChannel#doBeginRead

选择EventLoop:
io.netty.util.concurrent.MultithreadEventExecutorGroup#chooser
每当有客户端连接进来时,Netty需要决定选择哪个EventLoop,这个工作是由EventExecutorChooser负责的:

  • GenericEventExecutorChooser:循环选择。
  • PowerOfTwoEventExecutorChooser:也是循环选择,只不过GenericEventExecutorChooser使用了取模运算,而PowerOfTwoEventExecutorChooser是通过位运算实现的。

Pipeline

  1. 创建Pipeline
    创建NioSocketChannel时会创建Pipeline:
    io.netty.channel.AbstractChannel#AbstractChannel
    Pipeline本身是一个双向链表的结构,且有两个哨兵节点headtail

  2. 添加Pipeline
    添加到链表
    io.netty.channel.ChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
    检查是否重复添加,如果加了@Sharable注解是可以重复添加的
    io.netty.channel.DefaultChannelPipeline#checkMultiplicity
    添加到链表末尾,也就是添加到tail节点的前面。
    io.netty.channel.DefaultChannelPipeline#addLast0

  3. 删除Pipeline
    有时候我们需要删除一个Pipeline上的某些ChannelHandler,比如已经进行过了授权校验,那下次就不需要再执行授权校验了,我们就可以直接把授权相关的那些ChannelHandler删除掉。
    首先遍历Pipeline找到目标ChannelHandler。
    io.netty.channel.DefaultChannelPipeline#getContextOrDie
    然后从Pipeline中移除。
    io.netty.channel.DefaultChannelPipeline#remove(AbstractChannelHandlerContext)

  4. inBound事件传播
    ChannelHandler中每个事件都有一个接口,ChannelInboundHandler专门处理输入事件,以channelRead为例。
    EventLoop会将读事件传给Pipeline,然后按责任链模式的逻辑从head节点开始传播事件。
    io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

  5. outBound事件传播
    ChannelOutboundHandler专门用于处理输出事件,以write为例。

    1
    2
    3
    4
    5
    6
    7
    public class EchoServerOutHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.channel().write("Hello");
    }
    }

    当我们在Handler中调用Context的write方法时,就是将写事件传给了Pipeline,Pipeline会从tail节点开始往前传播。
    io.netty.channel.AbstractChannelHandlerContext#write

心跳检测

应用协议层的心跳是必须的,它和 tcp keepalive 是完全不同的概念。应用层协议层的心跳检测的是连接双方的存活性,兼而连接质量,而 keepalive 检测的是连接本身的存活性。而且后者的超时时间默认过长,完全不能适应现代的网络环境。
Netty 内置通过增加 IdleStateHandler 产生 IDLE 事件进行便捷的心跳控制。你要处理的,就是心跳超时的逻辑,比如延迟重连。但它的轮训时间是固定的,无法动态修改,高级功能需要自己定制。
不同场景下需要切换不同的保活机制,在一些客户端比如 Android,频繁心跳的唤起会浪费大量的网络和电量,它的心跳策略会更加复杂一些。

优雅退出

Java 的优雅停机通常通过注册 JDK ShutdownHook 来实现。
Runtime.getRuntime().addShutdownHook();
一般通过 kill -15 进行 java 进程的关闭,以便在进程死亡之前进行一些清理工作。

注意:kill -9 会立马杀死进程,不给遗言的机会,比较危险。

虽然 netty 做了很多优雅退出的工作,通过 EventLoopGroup 的 shutdownGracefully 方法对 nio 进行了一些状态设置,但在很多情况下,这还不够多。它只负责单机环境的优雅关闭。
流量可能还会通过外层的路由持续进入,造成无效请求。一种可行的做法是首先在外层路由进行一次本地实例的摘除,把流量截断,然后再进行 netty 本身的优雅关闭。

示例协议实现

不少中间件会实现自己的协议,比如 Redis、MySQL,MyCat、TiDB 用的就是 MySQL 协议。
netty 默认实现了 dns、haproxy、http、http2、memcache、mqtt、redis、smtp、socks、stomp、xml 等协议。
协议分为两种:

  • 文本协议在调试起来是比较直观和容易的,但安全性欠佳;
  • 二进制协议就需要依赖日志、wireshark 等其他方式进行分析,增加了开发难度。
  1. 示例协议 - echo
  2. 示例协议 - discard
  3. 示例协议 - uptime
  4. 示例二进制协议 - factorial
  5. 示例文本协议 - telnet

数据结构 - ByteBuf

Netty 使用自建的 buffer API,而不是使用 NIO 的 ByteBuffer 来表示一个连续的字节序列。与 ByteBuffer 相比这种方式拥有明显的优势。Netty 使用新的 buffer 类型 ByteBuf,被设计为一个可从底层解决 ByteBuffer 问题,并可满足日常网络应用开发需要的缓冲类型。这些很酷的特性包括:

  • 如果需要,允许使用自定义的缓冲类型。
  • 复合缓冲类型中内置的透明的零拷贝实现。
  • 开箱即用的动态缓冲类型,具有像 StringBuffer 一样的动态缓冲能力。
  • 不再需要调用的 flip()方法。
  • 正常情况下具有比 ByteBuffer 更快的响应速度。

ByteBuf结构
以上就是一个 ByteBuf 的结构图,从上面这幅图可以看到

  1. ByteBuf 是一个字节容器,容器里面的的数据分为三个部分,第一个部分是已经丢弃的字节,这部分数据是无效的;第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;最后一部分的数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段。最后一部分虚线表示的是该 ByteBuf 最多还能扩容多少容量
  2. 以上三段内容是被两个指针给划分出来的,从左到右,依次是读指针(readerIndex)、写指针(writerIndex),然后还有一个变量 capacity,表示 ByteBuf 底层内存的总容量
  3. 从 ByteBuf 中每读取一个字节,readerIndex 自增 1,ByteBuf 里面总共有 writerIndex-readerIndex 个字节可读, 由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读
  4. 写数据是从 writerIndex 指向的部分开始写,每写一个字节,writerIndex 自增 1,直到增到 capacity,这个时候,表示 ByteBuf 已经不可写了
  5. ByteBuf 里面其实还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错

使用 ByteBuf 有以下好处:

  1. 可以有效地区分可读数据和可写数据,读写之间相互没有冲突
  2. Extensibility 可扩展性
    ByteBuf 具有丰富的操作集,可以快速的实现协议的优化。例如,ByteBuf 提供各种操作用于访问无符号值和字符串,以及在缓冲区搜索一定的字节序列。你也可以扩展或包装现有的缓冲类型用来提供方便的访问。自定义缓冲式仍然实现自 ByteBuf 接口,而不是引入一个不兼容的类型
  3. Transparent Zero Copy 透明的零拷贝
    网络应用中需要减少内存拷贝操作次数。你可能有一组缓冲区可以被组合以形成一个完整的消息。网络提供了一种复合缓冲,允许你从现有的任意数的缓冲区创建一个新的缓冲区而无需内存拷贝。例如,一个信息可以由两部分组成:header 和 body。在一个模块化的应用,当消息发送出去时,这两部分可以由不同的模块生产和装配。
    1
    2
    3
    +--------+------+
    | header | body |
    +--------+------+
    如果你使用的是 ByteBuffer ,你必须要创建一个新的大缓存区用来拷贝这两部分到这个新缓存区中。或者,你可以在 NIO做一个收集写操作,但限制你将复合缓冲类型作为 ByteBuffer 的数组而不是一个单一的缓冲区,这样打破了抽象,并且引入了复杂的状态管理。此外,如果你不从 NIO channel 读或写,它是没有用的。
    1
    2
    // 复合类型与组件类型不兼容。
    ByteBuffer[] message = new ByteBuffer[] { header, body };
    通过对比, ByteBuf 不会有警告,因为它是完全可扩展并有一个内置的复合缓冲区。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 复合类型与组件类型是兼容的。
    ByteBuf message = Unpooled.wrappedBuffer(header, body);
    // 因此,你甚至可以通过混合复合类型与普通缓冲区来创建一个复合类型。
    ByteBuf messageWithFooter = Unpooled.wrappedBuffer(message, footer);
    // 由于复合类型仍是 ByteBuf,访问其内容很容易,
    //并且访问方法的行为就像是访问一个单独的缓冲区,
    //即使你想访问的区域是跨多个组件。
    //这里的无符号整数读取位于 body 和 footer
    messageWithFooter.getUnsignedInt(
    messageWithFooter.readableBytes() - footer.readableBytes() - 1);
  4. Automatic Capacity Extension 自动容量扩展
    许多协议定义可变长度的消息,这意味着没有办法确定消息的长度,直到你构建的消息。或者,在计算长度的精确值时,带来了困难和不便。这就像当你建立一个字符串。你经常估计得到的字符串的长度,让 StringBuffer 扩大了其本身的需求。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 一种新的动态缓冲区被创建。在内部,实际缓冲区是被“懒”创建,从而避免潜在的浪费内存空间。
    ByteBuf b = Unpooled.buffer(4);
    // 当第一个执行写尝试,内部指定初始容量 4 的缓冲区被创建
    b.writeByte('1');
    b.writeByte('2');
    b.writeByte('3');
    b.writeByte('4');
    // 当写入的字节数超过初始容量 4 时,
    //内部缓冲区自动分配具有较大的容量
    b.writeByte('5');
  5. Better Performance 更好的性能
    最频繁使用的缓冲区 ByteBuf 的实现是一个非常薄的字节数组包装器(比如,一个字节)。与 ByteBuffer 不同,它没有复杂的边界和索引检查补偿,因此对于 JVM 优化缓冲区的访问更加简单。更多复杂的缓冲区实现是用于拆分或者组合缓存,并且比 ByteBuffer 拥有更好的性能。

粘包拆包和半包合并

基于流的传输比如 TCP/IP, 接收到数据是存在 socket 接收的 buffer 中。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。造成粘包的原因,主要是由于缓冲区的介入,所以需要严格约定去所传输的包的格式——何时开始何时结束。意味着,即使你发送了 2 个独立的数据包,操作系统也不会作为 2 个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的 TCP/TP 协议栈已经接收了 3 个数据包,在应用程序中读取数据的时候可能被分成下面的片段:
粘包和半包问题
因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意义并且能够让程序的业务逻辑更好理解的数据。
在没有 Netty 的情况下,用户如果自己需要拆包,基本原理就是不断从 TCP 缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包

  • 半包:如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。
  • 粘包:如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。

解码器 - ByteToMessageDecoder

入口:io.netty.handler.codec.ByteToMessageDecoder#channelRead

  1. 累加字节流
    累加器累加已读入的字节数,如果超过ByteBuf当前可读入的空间大小,则执行扩容。
    io.netty.handler.codec.ByteToMessageDecoder.Cumulator#cumulate
  2. 调用子类的decode方法进行解析(模板方法)
    io.netty.handler.codec.ByteToMessageDecoder#callDecode
  3. 将子类解析出的ByteBuf向下传播
    io.netty.handler.codec.ByteToMessageDecoder#fireChannelRead(io.netty.channel.ChannelHandlerContext, io.netty.handler.codec.CodecOutputList, int)

Netty中的一些拆箱即用的解码器

如果要自己实现所有协议的拆包无疑是非常麻烦的,实际上 Netty 已经自带了一些开箱即用的拆包器:

  1. 固定长度的拆包器 FixedLengthFrameDecoder
    如果你的应用层协议非常简单,每个数据包的长度都是固定的,比如 100,那么只需要把这个拆包器加到 pipeline 中,Netty 会把一个个长度为 100 的数据包 (ByteBuf) 传递到下一个 channelHandler。
  2. 行拆包器 LineBasedFrameDecoder
    从字面意思来看,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,接收端通过 LineBasedFrameDecoder 将粘过的 ByteBuf 拆分成一个个完整的应用层数据包。
  3. 分隔符拆包器 DelimiterBasedFrameDecoder
    DelimiterBasedFrameDecoder 是行拆包器的通用版本,只不过我们可以自定义分隔符。
  4. 基于长度域拆包器 LengthFieldBasedFrameDecoder
    最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。由于上面三种拆包器比较简单,读者可以自行写出 demo,接下来,我们就结合我们小册的自定义协议,来学习一下如何使用基于长度域的拆包器来拆解我们的数据包。

编码 - MessageToByteEncoder

编码器是一个ChannelHandler,一般是第一个添加到Pipeline内,然后write的最后会将数据进行编码再输出。

  1. 匹配对象
    io.netty.handler.codec.MessageToByteEncoder#acceptOutboundMessage
  2. 内存分配
    io.netty.handler.codec.MessageToByteEncoder#allocateBuffer
  3. 调用子类的编码实现
    io.netty.handler.codec.MessageToByteEncoder#encode
  4. 释放内存
    io.netty.util.ReferenceCountUtil#release(java.lang.Object)
  5. 放到Pipeline里传播
    默认情况下会一直传播到head节点
    io.netty.channel.ChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
    io.netty.channel.Channel.Unsafe#write
  6. 输出
    将数据暂存到ByteBuf,将堆内对象转换为堆外内存
    io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
    插入写队列
    io.netty.channel.ChannelOutboundBuffer#addMessage
    TODO: 什么时候刷新buffer队列?

自定义数据处理逻辑

基于拦截链模式的事件模型 - pipeline

一个定义良好并具有扩展能力的事件模型是事件驱动开发的必要条件。Netty 具有定义良好的 I/O 事件模型。由于严格的层次结构区分了不同的事件类型,因此 Netty 也允许你在不破坏现有代码的情况下实现自己的事件类型。这是与其他框架相比另一个不同的地方。很多 NIO 框架没有或者仅有有限的事件模型概念;在你试图添加一个新的事件类型的时候常常需要修改已有的代码,或者根本就不允许你进行这种扩展。
在 Netty 中一条连接对应一个 Channel,该 Channel 的所有处理逻辑都在一个 ChannelPipeline 对象内,ChannelPipeline 是一个双向链表结构,在一个 ChannelPipeline 内部一个 ChannelEvent 被一组 ChannelHandler 处理。这个管道是 Intercepting Filter (拦截过滤器)模式的一种高级形式的实现,因此对于一个事件如何被处理以及管道内部处理器间的交互过程,你都将拥有绝对的控制力。例如,你可以定义一个从 socket 读取到数据后的操作:

1
2
3
4
5
6
7
8
9
public class MyReadHandler implements SimpleChannelHandler {
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {
Object message = evt.getMessage();
// Do something with the received message.
...
// And forward the event to the next handler.
ctx.sendUpstream(evt);
}
}

同时你也可以定义一种操作响应其他处理器的写操作请求:

1
2
3
4
5
6
7
8
9
public class MyWriteHandler implements SimpleChannelHandler {
public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt) {
Object message = evt.getMessage();
// Do something with the message to be written.
...
// And forward the event to the next handler.
ctx.sendDownstream(evt);
}
}

ChannelHandler 分为两种:

  • ChannelInboundHandler
    处理读数据逻辑,核心方法是 channelRead。
  • ChannelOutBoundHandler
    处理些数据逻辑,核心方法是 write,在链式处理中总是位于 ChannelInboundHandler 之后。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// inBound,处理读数据的逻辑链
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());

// outBound,处理写数据的逻辑链
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});

其执行顺序如下图所示:
pipeline执行顺序

异常处理

netty 由于其异步化的开发方式,以及其事件机制,在异常处理方面就显得异常重要。为了保证连接的高可靠性,许多异常需要静悄悄的忽略,或者在用户态没有感知。
netty 的异常会通过 pipeline 进行传播,所以在任何一层进行处理都是可行的,但编程习惯上,习惯性抛到最外层集中处理。
为了最大限度的区别异常信息,通常会定义大量的异常类,不同的错误会抛出不同的异常。发生异常后,可以根据不同的类型选择断线重连(比如一些二进制协议的编解码紊乱问题),或者调度到其他节点。

Codec 框架

我们可以使用 POJO 代替 ChannelBuffer,从业务逻辑代码中分离协议处理部分总是一个很不错的想法。然而如果一切从零开始便会遭遇到实现上的复杂性。你不得不处理分段的消息。一些协议是多层的(例如构建在其他低层协议之上的协议)。一些协议过于复杂以致难以在一台独立状态机上实现。
因此,一个好的网络应用框架应该提供一种可扩展,可重用,可单元测试并且是多层的 codec 框架,为用户提供易维护的 codec 代码。
Netty 提供了一组构建在其核心模块之上的 codec 实现,这些简单的或者高级的 codec 实现帮你解决了大部分在你进行协议处理开发过程会遇到的问题,无论这些协议是简单的还是复杂的,二进制的或是简单文本的。

SSL / TLS 支持

不同于传统阻塞式的 I/O 实现,在 NIO 模式下支持 SSL 功能是一个艰难的工作。你不能只是简单的包装一下流数据并进行加密或解密工作,你不得不借助于 javax.net.ssl.SSLEngine,SSLEngine 是一个有状态的实现,其复杂性不亚于 SSL 自身。你必须管理所有可能的状态,例如密码套件,密钥协商(或重新协商),证书交换以及认证等。此外,与通常期望情况相反的是 SSLEngine 甚至不是一个绝对的线程安全实现。
在 Netty 内部,SslHandler 封装了所有艰难的细节以及使用 SSLEngine 可 能带来的陷阱。你所做的仅是配置并将该 SslHandler 插入到你的 ChannelPipeline 中。同样 Netty 也允许你实现像 StartTlS 那样所拥有的高级特性,这很容易。

HTTP 实现

HTTP 无 疑是互联网上最受欢迎的协议,并且已经有了一些例如 Servlet 容器这样的 HTTP 实现。因此,为什么 Netty 还要在其核心模块之上构建一套 HTTP 实现?
与现有的 HTTP 实现相比 Netty 的 HTTP 实现是相当与众不同的。在 HTTP 消息的低层交互过程中你将拥有绝对的控制力。这是因为 Netty 的 HTTP 实现只是一些 HTTP codec 和 HTTP 消息类的简单组合,这里不存在任何限制——例如那种被迫选择的线程模型。你可以随心所欲的编写那种可以完全按照你期望的工作方式工作的客户端或服务器端代码。这包括线程模型,连接生命期,快编码,以及所有 HTTP 协议允许你做的,所有的一切,你都将拥有绝对的控制力。
由于这种高度可定制化的特性,你可以开发一个非常高效的 HTTP 服务器,例如:

  • 要求持久化链接以及服务器端推送技术的聊天服务(如,Comet )
  • 需要保持链接直至整个文件下载完成的媒体流服务(如,2 小时长的电影)
  • 需要上传大文件并且没有内存压力的文件服务(如,上传 1GB 文件的请求)
  • 支持大规模混合客户端应用用于连接以万计的第三方异步 web 服务。

WebSockets 实现

WebSockets 允许双向,全双工通信信道,在 TCP socket 中。它被设计为允许一个 Web 浏览器和 Web 服务器之间通过数据流交互。
WebSocket 协议已经被 IETF 列为 RFC 6455 规范。
Netty 已经实现了 WebSocket 和一些老版本的规范:http://netty.io/4.0/api/io/netty/handler/codec/http/websocketx/package-frame.html

Google Protocol Buffer 整合

Google Protocol Buffers 是快速实现一个高效的二进制协议的理想方案。通过使用 ProtobufEncoderProtobufDecoder,你可以把 Google Protocol Buffers 编译器 (protoc) 生成的消息类放入到 Netty 的 codec 实现中。请参考“LocalTime”实例,这个例子也同时显示出开发一个由简单协议定义 的客户及服务端是多么的容易。

性能优化

FastThreadLocal

重写了JDK的ThreadLocal,但是速度更快

Recycle

对象池

单机百万连接

Netty应用级别性能优化

QA

如何使用 Netty

Netty 是 Java 中的一个 NIO 框架:

  1. 易用的 API;
  2. NIO 模型相对 BIO 更高效。
  3. 解决了 Java 原生 NIO 接口存在的一些问题。
    包括粘包半包问题、心跳检测等问题。

ServerBootstrap - 默认情况下Netty服务端会起多个线程?又是什么时候启动这些线程的?

Netty中线程主要用于执行EventLoop的for循环任务,当ServerBootstrap
默认情况下创建2倍CPU核心线程数的线程。
io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
可以看到最终创建了个线程池ThreadPerTaskExecutor
io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)

ServerBootstrap - Netty是如何解决JDK的空轮询Bug的?

NioEventLoop

ServerBootstrap - Netty是如何保证异步串行无锁化的?

执行需要保证并发安全的操作时先判断是否是刚开始创建的线程,如果不是则放入一个单线程的线程池中执行。
线程创建位置:SingleThreadEventExecutor的构造方法
判断位置:io.netty.util.concurrent.AbstractEventExecutor#inEventLoop

NioEventLoop - Netty如何检测新连接的接入?

初始化ServerBootstrap时

NioEventLoop - 新连接怎样被注册NioEventLoop线程?

调用bind时会启动一个NioEventLoop线程,用于监听连接请求。

pipeline - Netty如何判断ChannelHandler类型?

ChannelHandler分为Inbound类型和Outbound类型,在Netty中将ChannelHandler添加到Pipeline时会判断这个ChannelHandler的类型,然后设置到一个bool类型的成员变量里,在传播时使用。
io.netty.channel.DefaultChannelHandlerContext#isInbound
io.netty.channel.DefaultChannelHandlerContext#isOutbound

pipeline - 对ChannelHandler的添加会遵循什么样的顺序?

根据Pipeline的传播逻辑可以看出,Inbound类型的ChannelHandler按添加顺序传播,而Outbound类型的ChannelHandler是按逆顺序传播的。

pipeline - 用户手动触发事件传播,不同的触发方式有什么区别?

如果是在Pipeline中间的某个ChannelHandler中调用了read,则就是从这个节点开始往后传播,如果是write,就是从这个节点开始往前传播。

ByteBuf - 内存的类别有哪些?

ByteBuf - 如何减少多线程之间内存分配的竞争?

ByteBuf - 不同大小的内存是如何进行分配的?

ByteBuf - 粘包半包问题是什么

解码器抽象的解码过程?

Netty里面有哪些拆箱即用的解码器?

如何把对象变成字节流,并最终写到socket底层?

如何使用Netty实现长短连接?

长连接是为了复用连接资源,长连接下,多个请求可以使用同一个连接传输数据包。

如何使用Netty实现长短轮询?

长轮询的特点是请求发到服务器上时若没有资源(比如库存),请求会被挂起,直到资源充足后才返回。

参考

  1. Netty 4.x 用户指南
  2. User guide for 4.x(上面这个文档的英文原版)
  3. github - netty / netty
  4. Netty Source Xref (4.0.56.Final)(同上为源码)

  • 锁 / 封锁
    • 锁的类型
      • 排他锁 / X锁
      • 共享锁 / S锁
    • 封锁协议
      • 一级封锁:对应读未提交
      • 二级封锁:对应读已提交
      • 三级封锁:对应可重复读
    • MySQL 并发安全
      • 行锁 + 间隙锁 + MVCC
      • 隔离级别实现
        • 读已提交:二级封锁协议(行锁)+MVCC
        • 可重复读:三级封锁协议+MVCC
        • 解决幻读:Next-Key Locks = 行锁 + 间隙锁
    • 死锁

封锁(Blockade)

封锁是数据库原理范畴内的概念,封锁分为排它锁、共享锁、活锁、死锁这4种,以及封锁协议定义了数据库如何使用这些锁来保证并发安全。
所谓封锁就是事务在对某个数据对象例如表、记录等操作之前,先向系统发出请求对其加锁。
加锁后事务 T 就对该数据对象有了一定的控制,在事务 T 释放它的锁之前,其他事务不能更新此数据对象。例如,事务 T1 要修改 A,若在读出 A 之前先锁住 A,其他事务就不能再读取和修改 A 了,直到 T1 修改并写回 A 后解除了对 A 的封锁为止。这样,就不会丢失 T1 的修改。
确切的控制由封锁的类型决定。基本的封锁类型有两种:排他锁(exclusive locks,简称 X 锁)和共享锁(share locks,简称 S 锁)

  • **X 锁(排他写锁)**:若事务 T1 对数据对象 A 加上 X 锁,则只允许 T 读取和修改 A,其他任何事物都不能再对 A 加任何类型的锁,直到 T 释放 A 上的锁为止。这就保证了其他事务在 T 释放 A 上的锁之前不能再读取和修改 A;
  • **S 锁(共享读锁)**:若事务 T 对数据 A 加上 S 锁,则事务 T 可以读 A 但是不能修改 A,其他事务只能对 A 加 S 锁而不能加 X 锁,直到 T 释放 A 上的 S 锁为止。这就保证了其他食物可以读 A,但在 T 释放 A 上的 S 锁之前不能对 A 进行任何修改。

封锁种类

  • 排它锁(X锁)
    可读可写,一个事务对表加了X锁,其他事务必须等该事务操作完这张表后,才可以对这张表操作。
    如果一个事务给表加了 X 锁(意味着该事务要独占这个表),那么:
    别的事务不可以继续获得该表的 S 锁
    别的事务不可以继续获得该表中的某些记录的 S 锁
    别的事务不可以继续获得该表的 X 锁
    别的事务不可以继续获得该表中的某些记录的 X 锁
  • 共享锁(S锁)
    只读,多个事务可以同时对都某一张表加共享锁
    如果一个事务给表加了 S 锁,那么:
    别的事务可以继续获得该表的 S 锁
    别的事务可以继续获得该表中的某些记录的 S 锁
    别的事务不可以继续获得该表的 X 锁
    别的事务不可以继续获得该表中的某些记录的 X 锁

封锁协议

封锁有 3 级的封锁协议:

  1. 一级封锁协议
    事务 T 在对数据对象 A 进行修改之前,必须对其加 X 锁,直至事务结束才释放。事务结束包括正常结束(COMMIT)和非正常结束(ROLLBACK);
    在一级加锁协议中,如果仅仅是对数据进行读操作而不进行修改,是不需要进行加锁的。所以只能避免修改丢失而不能避免不可重复读和脏读。
  2. 二级封锁协议
    在一级加锁协议的基础上增加事务 T 在读取数据 R 之前必须先对其加 S 锁,读完后即可释放 S 锁;
    二级加锁协议除防止了丢失修改,还可进一步防止读脏数据。例如:事务 T1 正在对数据对象 R 进行修改,此前已经对 R 加上了 X 锁,此时事务 T2 想读取 R,就必须对 R 加上 S 锁,但是 T2 发现 R 已经被 T1 加上了 X 锁,于是 T2 只能等待 T1 释放了在 R 上加的锁之后才能对 R 加 S 锁并读取。这能防止 T2 读取到 T1 未提交的数据,从而避免了脏读。
    但是在二级封锁协议中,由于读完数据后即可释放 S 锁,所以它不能保证可重复读。
  3. 三级封锁协议
    三级封锁协议是指,在一级封锁协议的基础上增加事务 T 在读取数据 R 之前对其加 S 锁直至事务结束才释放。
    三级封锁协议除了防止丢失修改和读“脏”数据之外,还进一步防止了不可重复读。
    上述三级协议的主要区别在于什么操作需要申请加锁,以及何时释放锁(即锁的持有时间)。不同的封锁协议使事务达到的一致性是不同的,封锁协议越高,一致性程度越强。

封锁粒度

封锁粒度指的是封锁对象的大小。
封锁粒度与系统的并发度和并发控制的开销有关:封锁粒度越大,数据库能封锁的数据单元越少,并发度越小,系统开销也变小。
一般来说,处理个别元组的事务以元组为封锁粒度;处理某个关系的大量元组的事务以关系为封锁粒度;处理多个关系的大量元组的事务以数据库为封锁粒度。

在一个系统中,提供多种封锁粒度给不同的事务选择,这种封锁方法称为多粒度封锁( multiple granularity locking)

  • 封锁在实现上,由一棵多粒度树组织,根结点是整个数据库,表示最大的封锁粒度,叶结点是最小的封锁粒度,如元组、属性值等。
  • 封锁协议中的给一个节点加锁的同时,该结点的所有后裔结点也会被加上同样的锁。对于该结点来说,这种加锁方式为显式封锁,而对于其后裔结点来说,这样的方式为隐式封锁
    显然,这样的检查方法效率很低。为此人们引进了一种新型锁,称为意向锁(intention lock)

意向锁

在具有意向锁的多粒度封锁方法中,对任何一个结点加锁时,必须先对它的上层结点加意向锁。
申请封锁时应按自上而下的次序进行,释放封锁时则应按自下而上的次序进行。(栈)

  1. 意向共享锁(IS锁):
    对一个数据对象加IS锁,表示它的后裔结点拟(意向)加S锁。
    事务T1对数据对象A加上IS锁后,事务T2可以继续加除X锁以外的锁。
  2. 意向排他锁(IX锁):
    对一个数据对象加IX锁,表示它的后裔结点拟(意向)加X锁。
    事务T1对数据对象A加上IX锁后,事务T2只能继续加IS或IX锁。
  3. 共享意向排他锁(SIX = S+IX锁):
    对一个数据对象先加S锁,再加IX锁。例如对某个表加SIX锁,则表示该事务要读(S)整个表,同时会更新(IX)个别元组。
    事务T1对数据对象A加上SIX锁后,事务T2只能加IS锁。

活锁和死锁

封锁可能引起活锁和死锁问题。

活锁

根据事务的优先级顺序,可能会出现某个事务永远在等待封锁的情况,即事务T1封锁了数据对象A后,T2、T3陆续请求封锁,但是T1释放锁后,系统优先批准了T3的请求,T2仍然在等待。
最简单的解决方法就是先来先服务(FCFS),不考虑事务的优先级。

死锁

事务T1封锁了数据A,事务T2封锁了数据B,然后T1请求封锁B,与此同时T2也请求封锁A,但因为两个事务的请求都需要等待对方释放锁,这样就出现了永远在等待对方的死锁。
在数据库中,解决死锁问题主要有两类方法:预防和诊断解除。

死锁预防

  1. 一次封锁法:每个事务一次将所有要使用的数据加锁,否则事务不继续执行
    一次性获取所有锁,锁粒度比较大,降低系统的并发度。
  2. 顺序封锁法:预先对数据规定一个封锁顺序,所有事务都按照这个顺序加锁
    需要预处理的信息太多,开销大,成本高。

死锁诊断和解除

  1. 超时法
    时间过短可能误判死锁,时间过长可能无法及时发现死锁
  2. 等待图法
    一般是撤销一个处理代价最小的事务,下面MySQL中的死锁解除算法也是基于这种方法。

两段锁协议

可串行化调度

可串行性是并发事务正确调度的准则。
当且仅当多个事务的并发执行结果,与按某一次序的串行执行结果相同,这种并发调度策略才是可串行化调度,即具有可串行性。
在一个调度策略中,交换两个事务的不冲突操作的次序,得到另一个调度策略,如果另一个调度策略的执行结果与原来的调度相同,则称原来的调度为冲突可串行化调度冲突可串行化调度是可串行化调度的充分条件,但不是必要条件
冲突操作是指不同事务对同一个数据的读写和写写操作,其他操作都属于不冲突操作。

两段锁协议

为了保证并发调度的正确性,DBMS的并发控制机制必须提供一定的手段来保证调度是可串行化的。
目前DBMS普遍采用两段锁协议(TwoPhase Locking,2PL)来实现,所有事务遵守两段锁协议是可串行化调度的充分条件,但不是必要条件。
两段锁的两个阶段:

  • 第一阶段(扩展阶段):所有事务对数据加锁,但不能解锁;
  • 第二阶段(收缩阶段):所有事务对数据解锁,但不能加锁。

预防死锁的一次封锁法遵守两段锁协议;但是两段锁协议并不要求事务必须一次将所有要使用的数据全部加锁,因此遵守两段锁协议的事务可能发生死锁。

由于两阶段锁协议的存在,如果我们的事务中需要锁住多个行,最好把最可能造成锁冲突、最可能影响并发度的锁尽量往后放。

MySQL 如何保证并发安全

latch 和 lock

MySQL 中的锁主要分为闩锁(latch)锁(lock)
latch对象是除了数据库对象外的其他对象,包括操作缓冲池汇总的LRU列表、删除、添加、移动LRU列表中的元素,为了保证一致性必须要有锁介入,因此引入了latch锁。latch是轻量级的锁,因为它要求锁定的时间必须非常短,主要用于保护临界资源的线程安全。
lock对象是事务,用来锁定数据库中的对象,如表、行、页。一般lock的对象仅在事务commit或rollback后进行释放。lock有死锁机制。

- lock latch
对象 事务 线程
保护 数据库内容 内存数据结构
持续时间 整个事务过程 临界资源
模式 行锁、表锁、意向锁 读写锁、互斥量
死锁 通过 waits-for graph、time out 等机制进行死锁检测与处理 无死锁检测与处理机制,仅通过控制应用程序加锁顺序(lock leveling)来保证无死锁发生
存在位置 Lock Manager 的哈希表中 每个数据结构的对象中

latch 不能显式添加,而是线程在获取行锁时前,先对行所在的页面添加 latch,然后再对行添加 lock,添加完行 lock 后再释放页面的 latch。
如果行被其他线程占有,则线程会先释放页面 latch,等待行锁,待获取行锁后会再次对页面添加 latch,查看页面数据是否有改动,再次获取改动后的行。
这种机制主要是为了保证线程获取的行数据的一致性和完整性。

MVCC

MySQL 的特色之一是提供了 MVCC(多版本并发控制)机制,MVCC 给每行数据增加了版本号,事务在执行读操作时只能读到数据的历史版本,因此可以避免脏读等问题。
MVCC 与事务紧密关联,因此我放到事务小节中去论述了。

封锁与 MVCC 之间的关系

封锁与 MVCC 并不是互斥的,MySQL 实现隔离级别时结合了这二者,比如:

  • 读已提交:二级封锁协议+MVCC,二级封锁协议在读之前加 S 锁,读完之后就释放 S 锁,所以不能保证不可重复读与幻读;
  • 可重复读:三级封锁协议+MVCC,读完之后不会立刻释放 S 锁,直到事务提交时才会释放,可以解决可重复读。

封锁协议+MVCC 并不能解决幻读问题,在 MVCC 中是通过 Next-Key Lock 解决的。

Record Locks(行锁)

该锁的官方类型名为 LOCK_REC_NOT_GAP。
和前面提到的表锁一样,分 S 锁和 X 锁,只是作用粒度精确到行了。

Gap Locks(间隙锁)

该锁的官方类型名为 LOCK_GAP。
MySQL 解决幻读问题有两种方案:
第一种是 MVCC,因为新插入的数据事务 ID 必然不在 ReadView 内,因此读取这些记录后会被直接忽略,但是快照读只在普通读操作中生效,如果发生了当前读仍然会有幻读问题;
第二种是加锁,但是加锁有一个问题,就是事务没法给尚不存在的记录加锁。
如果我们希望为 number 值为 8 的记录加 gap 锁,则该记录的前后间隙都不允许别的事务立即插入记录:
MySQL-gap锁
如图中为 number 值为 8 的记录加了 gap 锁,意味着不允许别的事务在 number 值为 8 的记录前边的间隙插入新记录,其实就是 number 列的值(3, 8)这个区间的新记录是不允许立即插入的。比方说有另外一个事务再想插入一条 number 值为 4 的新记录,它定位到该条新记录的下一条记录的 number 值为 8,而这条记录上又有一个 gap 锁,所以就会阻塞插入操作,直到拥有这个 gap 锁的事务提交了之后,number 列的值在区间(3, 8)中的新记录才可以被插入。
另外,如何为(20, +∞)这个区间加 gap 锁?其实是为数据页中的 Infimum 和 Supremum 记录加上了 gap 锁。
比如假设此时表里有 5、25 这两条数据,则SELECT c1 FROM t WHERE c1 BETWEEN 10 and 20 FOR UPDATE查询 10 到 20 范围内的记录,并加上范围(5, 10)[10, 20](20, 25]以内的 gap 锁。
比如假设此时表里有 102、105、107 三个值,则select * from test where n = 105 for update;这个语句会对(102, 105)(105, 107]这两个区间加 gap 锁。

间隙锁可以通过innodb_locks_unsafe_for_binlog参数来开启关闭:

  • 设置为ON,表示关闭区间锁,此时一致性会被破坏(所以是unsafe)
  • 设置为OFF,表示开启区间锁

Next-Key Locks

该锁的官方类型名为 LOCK_ORDINARY。
Next-Key Lock 其实是Record Lock 和 Gap Lock 的组合,它既会保护该条记录,又能阻止别的事务将新记录插入被保护记录的前后间隙。

意向锁

加表锁时怎么知道该表上有没有行锁?InnoDB 通过意向锁(Intention Locks)来解决这个问题:
1、意向共享锁,英文名:Intention Shared Lock,简称IS 锁。当事务准备在某条记录上加 S 锁时,需要先在表级别加一个 IS 锁。
2、意向独占锁,英文名:Intention Exclusive Lock,简称IX 锁。当事务准备在某条记录上加 X 锁时,需要先在表级别加一个 IX 锁。

IS、IX 锁是表级锁,它们的提出仅仅为了在之后加表级别的 S 锁和 X 锁时可以快速判断表中的记录是否被上锁,以避免用遍历的方式来查看表中有没有上锁的记录,也就是说其实 IS 锁和 IX 锁是兼容的,IX 锁和 IX 锁是兼容的。

兼容性 X IX S IS
X 不兼容 不兼容 不兼容 不兼容
IX 不兼容 兼容 不兼容 兼容
S 不兼容 不兼容 兼容 兼容
IS 不兼容 兼容 兼容 兼容

Insert Intention Locks(插入意向锁)

该锁的官方类型名为 LOCK_INSERT_INTENTION。
InnoDB 中事务在等待gap 锁的释放时,还需要在内存里生成一个锁结构,表示事务现在正想往某个间隙中插入记录,但是现在正在等待。

如上图所示,有 3 个事务,其中 T1 持有 gap 锁,所以 T2 和 T3 需要生成一个插入意向锁的锁结构,等待 T1 释放后才能获取到插入意向锁(本质上是将 is_waiting 属性改成了 false),然后再继续执行插入操作。

隐式锁

一般来说间隙锁可以避免 gap 锁锁住的区间被其他事务修改(当要插入的记录所在的区间有 gap 锁,事务会先再该间隙上加一个插入意向锁),但是还有一种情况正好是反过来的:如果一个事务首先插入了一条记录,别的记录如果直接读(SELECT … LOCK IN SHARE MODE 或 SELECT … FOR UPDATE)则会产生脏读问题,如果直接修改则又会产生脏写问题。
这个问题在 InnoDB 中是通过事务 ID 解决的:

  • 聚簇索引中有一个隐藏列 trx_id,存储的是最后改动该记录的事务 ID,新插入记录的 trx_id 当然就是当前事务的事务 ID,如果别的事务想对该记录添加 S 锁或 X 锁,会首先看一下该记录 trx_id 是否是当前正活跃的事务,如果是的话就会创建一个 X 锁然后进入等待状态;
  • 二级索引本身没有 trx_id 列,但是在二级索引页面的 Page Header 部分有一个 PAGE_MAX_TRX_ID 属性,该属性代表对该页面做改动的最大的事务 id,如果 PAGE_MAX_TRX_ID 属性值小于当前最小的活跃事务 id,那么说明对该页面做修改的事务都已经提交了,否则就需要在页面中定位到对应的二级索引记录,然后回表找到它对应的聚簇索引记录,然后再重复聚簇索引的做法。

表锁

表锁种类

  1. lock tables …… read/write
    锁住整个表会对数据库效率产生比较大的影响。
  2. MDL(metadata lock)
    MDL 不需要显式使用,在访问一个表的时候会被自动加上,表结构变更操作之间、表结构变更操作与读表操作之间都是互斥的,保证表结构变更的正确性。
    MDL 可能会导致表的锁死,比如一个 alter 语句正在等待一个长事务(该事务中有 select 语句)释放读 MDL,这时 alter 会加上写 MDL,因此之后的所有事务都需要等待该写 MDL 释放了,因此在变更表结构时最好先将长事务终止,或者给 alter 语句设置等待时间:
    1
    2
    ALTER TABLE tbl_name NOWAIT add column ...
    ALTER TABLE tbl_name WAIT N add column ...

不会加表锁的情况

1、在对某个表执行 SELECT、INSERT、DELETE、UPDATE 语句时,InnoDB 存储引擎是不会为这个表添加表级别的 S 锁或者 X 锁的;
2、执行 DDL 语句时(ALTER TABLE、DROP TABLE)时,使用的是 Server 层的元数据锁(Metadata Locks)

手动获取表级 S 锁和 X 锁的方式:
1、LOCK TABLES t READ:InnoDB 存储引擎会对表 t 加表级别的 S 锁。
2、LOCK TABLES t WRITE:InnoDB 存储引擎会对表 t 加表级别的 X 锁。

不过一般表锁不会用到,只会在崩溃恢复之类的场景下会用到。

表级 IS 锁、IX 锁,和之前的描述一致。

如果实现自增列:
1、表级别 AUTO-INC 锁:当表中某列设置了 auto_increment 属性,那么该列的值是会自动生成的,插入时会在表级加一个 AUTO-INC 锁,保证这个字段是严格递增的;当插入语句执行完毕后,该锁会自动释放,而不是在事务结束后再释放。
2、一个轻量级锁:生成 auto_increment 列的值后马上释放。

InnoDB 提供了一个称之为 innodb_autoinc_lock_mode 的系统变量来控制到底使用上述两种方式中的哪种来为 AUTO_INCREMENT 修饰的列进行赋值,当 innodb_autoinc_lock_mode 值为 0 时,一律采用 AUTO-INC 锁;当 innodb_autoinc_lock_mode 值为 2 时,一律采用轻量级锁;当 innodb_autoinc_lock_mode 值为 1 时,两种方式混着来(也就是在插入记录数量确定时采用轻量级锁,不确定时使用 AUTO-INC 锁)。不过当 innodb_autoinc_lock_mode 值为 2 时,可能会造成不同事务中的插入语句为 AUTO_INCREMENT 修饰的列生成的值是交叉的,在有主从复制的场景中是不安全的。

MySQL 全局锁

一般只有全库逻辑备份时才会使用到全局锁

  1. Flush tables with read lock(FTWRL)
    这个命令不保证备份时数据库是否处于一个一致性视图,可能有的事务刚执行一半。
  2. mysqldump
    在导数据前会启动事务,确保可以拿到一致性视图,前提是数据库中所有表都使用了支持事务的引擎,否则就只能使用FTWRL来备份了。

其他存储引擎中的锁

MyISAM、MEMORY、MERGE 这些引擎不支持事务,因此加锁一般都是针对当前会话来说的,比如 Session1 先对表加 S 锁,之后 Session2 再对该表执行 UPDATE 操作时,获取 X 锁的过程就会被阻塞了。
相当于这些存储引擎同一时刻只允许一个会话对表执行写操作,因此这些存储引擎最好用于读多写少的场景下。

常见操作利用锁方式

有以下几种方式:
1、一致性读 - 读操作利用 MVCC(多版本并发控制)
读操作只能读取记录的历史版本:读操作时生成一个 ReadView,记录当时正在执行的事务 ID,记录的每个版本都有事务 ID,查询数据时只能读到在生成 ReadView 之前已提交事务所做的修改,在生成 ReadView 之前未提交的事务或之前才开启的事务所做的修改都看不到。
2、锁定读 - 显式加锁
利用 MVCC 的方式,读写操作彼此并不冲突,性能更高。而加锁的方式读写操作之间都是互斥的,需要排队执行,比较影响性能。
3、当前读 - 隐式加锁
写操作只能针对最新版本的记录,因此写操作前需要加锁。

一致性读(Consistent Reads、一致性无锁读、快照读)

事务利用 MVCC 进行的读取操作称之为一致性读,所有普通的 SELECT 语句在 READ COMMITTED、REPEATABLE READ 隔离级别下都算是一致性读。
也就是上面提到的“读操作利用 MVCC”。

锁定读(Locking Reads)

读-读的情况并不会引起并发冲突,我们不希望对这种情况造成影响,因此 MySQL 给锁分了几个类:
1、共享锁(Shared Locks、S 锁):在事务要读取一条记录时,需要先获取该记录的 S 锁。
2、独占锁(排他锁、Exclusive Locks、X 锁):在事务要改动一条记录时,需要先获取该记录的 X 锁。

兼容性 X S
X 不兼容 不兼容
S 不兼容 兼容

一般读取一条记录时我们会获取这条记录的 S 锁,但是如果我们想在读取记录时就获取记录的 X 锁,来禁止别的事务读写该记录,则需要使用一些特殊的 SELECT 语句格式:
1、对读取的记录加 S 锁

1
SELECT ... LOCK IN SHARE MODE;

上面语句为记录加 S 锁,允许多个事务同时发起读请求,但是当别的事务尝试获取 X 锁(SELECT … FOR UPDATE 或修改这些记录)则会阻塞,直到当前事务提交之后将这些记录上的 S 锁释放掉。
2、对读取的记录加 X 锁

1
SELECT ... FOR UPDATE;

上面语句为记录加 X 锁,之后别的事务来获取该记录的 S 锁或 X 锁时都会被阻塞,直到当前事务提交之后将这些记录上的 X 锁释放掉。

写操作(write)如何利用锁

DELETE

DELETE 操作会先在 B+树中定位到这条记录,然后获取这条记录的 X 锁,并执行 delete mark 操作(逻辑删除)。

UPDATE

如果未修改该记录的键值并且被更新的列占用的存储空间在修改前后未发生变化,则先在 B+树种定位到这条记录然后获取该记录的 X 锁,最后在原记录的位置执行修改操作。
如果未修改该记录的键值并且至少有一个被更新的列占用的存储空间在修改前后发生变化,则先在 B+树种定位到这条记录后获取 X 锁,将这条记录彻底删除(移入垃圾链表),最后插入一条记录。
如果修改了该记录的键值,则相当于在原记录上 DELETE 后 INSERT。

INSERT

一般 INSERT 操作并不加锁,MySQL 引入了一种称为隐式锁的技术来保护这条新插入的记录在本事务提交之前不被其他事务访问。

显式锁即 select .. for update 语句,从语法上就能看出这个语句加了锁。

让我们分几种情况来分析 insert 语句的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE `t` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`c` int(11) DEFAULT NULL,
`d` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `c` (`c`)
) ENGINE=InnoDB;

insert into t values(null, 1,1);
insert into t values(null, 2,2);
insert into t values(null, 3,3);
insert into t values(null, 4,4);

create table t2 like t

insert … select 语句

1
insert into t2(c,d) select c,d from t;

上面Session A语句需要对表 t 的所有行和间隙加锁。原因见下面的例子:

session A session B
insert into t values(-1, -1, -1); insert into t2(c, d) select c, d from t;

上面的两个session,如果过session B先执行,由于对表t的主键索引加了(-∞, 1]这个next-key lock,会在语句执行完毕后,才允许session A的insert语句执行。
但如果没有锁,就有可能出现session B的insert语句先执行,但是后写入binlog的情况,于是,在 binlog_format=statement 的情况下,binlog 里面就记录了这样的语句序列:

1
2
insert into t values(-1,-1,-1);
insert into t2(c,d) select c,d from t;

这个语句到了备库执行,就会把 id=-1 这一行也写到表 t2 中,出现主备不一致。

insert 唯一键冲突

insert 发生唯一键冲突时可能引起死锁,例:

T session A session B session C
T1 begin; insert into t values(null, 5, 5)
T2 insert into t values(null, 5, 5) insert into t values(null, 5, 5)
T3 rollback; (Deadlock fount)
  • 正如前面的《加锁规则》所述,session A执行insert语句时,会在所以c的c=5这行上加记录锁,由于这个索引是唯一索引,因此会退化为记录锁;
  • session B要执行相同的insert语句,发现了唯一键冲突,加上读锁,同时session C也会在同一个记录上加上读锁;

    为什么加读锁?应该是为了保证不被删掉的同时,可以不影响读操作。

  • T3时刻,session A 回滚。这时候,session B 和 session C 都试图继续执行插入操作,都要加上写锁。两个 session 都要等待对方的行锁,所以就出现了死锁。

insert into … on duplicate key update

上面的例子是主键冲突后直接报错,如果写成如下形式:

1
insert into t values(11,10,10) on duplicate key update d=100;

则会给索引c上(5, 10]加一个排他的next-key lock(写锁)。

加锁场景例举(InnoDB)

  1. 原则 1:加锁的基本单位是 next-key lock。

    next-key lock 是前开后闭区间。

  2. 原则 2:查找过程中访问到的对象才会加锁。
  3. 原则 3:只有明确指定主键时 InnoDB 才会使用行锁,否则会使用表锁
  4. 原则 4:命中一个索引并加锁后,执行更新删除操作时还需要对其他索引中对应记录加X锁。
    这个规则涉及加锁顺序,可能引起死锁。
  5. 优化 1:索引上的等值查询,给唯一索引加锁的时候,next-key lock 退化为行锁。
  6. 优化 2:索引上的等值查询,向右遍历时且最后一个值不满足等值条件的时候,next-key lock 退化为间隙锁。
  7. 一个 bug:唯一索引上的范围查询会访问到不满足条件的第一个值为止。

例 1、等值查询间隙锁

  1. 根据原则 1,通过 next-key lock 加锁范围(5, 10]
  2. 因为第一个查询是等值查询(id = 7),而 id = 10 不满足查询条件,因此 next-key lock 退化为间隙锁,因此最终加锁范围是(5, 10)

因此 Session B 要往这个间隙中插入 id = 8 会被锁住,但是 Session C 修改 id = 10 这行是可以的。

例 2、主键不明确导致锁表

下面的情况不会锁表:

1
2
3
4
5
6
明确指定主键,并且有此行数据,row lock
SELECT * FROM products WHERE id = '3' FOR UPDATE;
SELECT * FROM products WHERE id = '3' and type = 1 FOR UPDATE;
明确指定主键,若查无此行数据,无lock
SELECT * FROM products WHERE id = '-1' FOR UPDATE;

下面的情况会锁表:

1
2
3
4
5
6
非索引字段,table lock
SELECT * FROM products WHERE name = 'Mouse' FOR UPDATE;
主键不明确,table lock
SELECT * FROM products WHERE id <> '3' FOR UPDATE;
主键不明确,table lock
SELECT * FROM products WHERE id LIKE '3' FOR UPDATE;

例 3、update未命中行触发间隙锁

1
2
3
4
5
CREATE TABLE `hgc_test` (
`a` int(11) DEFAULT NULL,
`b` int(11) DEFAULT NULL
) ENGINE=InnoDB;
insert into hgc_test values(1, 0), (5, 0), (10, 0);

事务S1执行:

1
2
begin;
update hgc_test set b = 10 where a = 7;

事务S2执行:

1
2
begin;
insert into hgc_test values(8, 1);

由于事务S1已经锁住了(5, 10]这个区间,因此S2执行插入时会被阻塞。

例 4、自增长与锁

InnoDB 中,对每个含有自增长值的表都有一个自增长计数器。

  • 初始化:当对这样的表进行插入操作时,这个计数器会被初始化。
  • 更新:插入操作根据这个自增长计数器值加 1 赋予自增长列,对该计数器的更新需要保证线程安全,这可以通过设置使用表锁还是互斥量来实现。

活锁、死锁与死锁检测

活锁

活锁:如果事务 T1 封锁了数据 R,事务 T2 又请求封锁 R,于是 T2 等待。T3 也请求封锁 R,当 T1 释放了 R 上的锁之后系统首先批准了 T3 的请求,T2 继续等待;然后 T4 又请求封锁 R,T3 在释放 R 上的锁之后系统又批准了 T4 的请求,T2 有可能永远等待,这就是活锁的情形。
避免活锁的简单方法就是采用先来先服务的策略。当多个事务请求封锁同一数据对象时,封锁子系统按请求锁的先后次序对事务进行排队,数据对象上的锁一旦释放就批准批准申请队列中第一个事务获得锁。

死锁

1
死锁在许多操作系统书中都有描述,简而言之,就是多个线程出现循环资源依赖,涉及的线程都在等待别的线程释放资源时,都会导致这些线程都进入无限等待的状态,称为死锁。
在 InnoDB 中,也会出现两个事务互相等待对方释放某条记录的行锁的情况,从而导致进入死锁状态。死锁可以预防,也可以等发生死锁后再作处理。

如何处理死锁

MySQL 有两种死锁处理方式:

  1. 等待直到超时(show variables like 'innodb_lock_wait_timeout'
  2. 发起死锁检测,主动回滚一条事务,让其他事务继续执行(show variables like 'innodb_deadlock_detect'

死锁预防

在数据库中,产生死锁的原因是两个或多个事务都已经封锁了一些数据对象,然后又都请求对已被事务封锁的对象加锁,从而出现死锁。防止死锁的发生其实就是要破坏产生死锁的条件。预防死锁发生通常有以下两种方法。

  • 一次封锁法:一次封锁法要求每个事务必须一次将所有要使用的数据全部加锁,否则就不能继续执行下去。一次封锁法虽然可以有效防止死锁的发生,但是增加了锁的粒度,从而降低了系统的并发性。并且数据库是不断变化的,所以事先很难精确地确定每个事务所需进行加锁的对象,为此只能扩大封锁范围,将事务在执行过程中可能需要封锁的数据对象全部加锁,这就进一步降低了并发度;
  • 顺序封锁法:顺序封锁法是预先对数据对象规定一个封锁顺序,所有事务都按这个顺序实施封锁。例如在 B 树结构的索引中,可规定封锁的顺序必须是从根节点开始,然后是下一级的子节点,逐级封锁。顺序封锁法可以有效地避免死锁,但是要实现顺序封锁法十分的困难,因为很难事先确定每一个事务要封锁哪些对象,因此也就很难按规定的顺序去实施加锁

由此可见数据库中不适合预防死锁,只适合进行死锁的诊断与解除。

死锁检测与解除

  • 设置最大等待时间,等待超过目标时间后自动释放之前获取到的锁,让别的事务先执行;
    可以通过参数innodb_lock_wait_timeout来设置
    超时法实现简单,但其不足也十分明显,一是有可能误判了死锁,如事务因为其他原因而使等待时间超过时限,系统就会误认为发生了死锁;二是若时限设置得太长,则不能及时发现死锁。
  • 发起死锁检测,发现死锁后,主动回滚死锁链条中的某个事务,让其他事务得以继续执行
    将参数 innodb_deadlock_detect 设置为 on 即表示开启死锁检测。
    死锁检测是一个耗时操作,因为每当一个事务被锁的时候,都要看看它所依赖的线程有没有被别人锁住,如此循环,最后判断是否出现了循环等待,也就是死锁。
    死锁检测的基础是事务等待图,事务等待图是一个有向图 G=(T,U),T 为结点的集合,每个结点表示正在运行的事务;U 为边的集合,每条边表示事务等待的情况。若 T1 等待 T2,则在 T1,T2 之间画一条有向边,从 T1 指向 T2。事务等待图动态地反应了所有事务的等待情况。并发控制子系统周期性(比如每隔数秒)生成事务等待图,并进行检测。如果发现图中存在回路,则表示系统中出现了死锁。

数据库管理系统的并发控制系统一旦检测到系统中存在死锁,就要设法解除死锁。通常采用的方法是选择一个处理死锁代价最小的事务,将其撤销,释放此事务持有的所有的锁,使其他事务得以继续运行下去。当然,对撤销的事务所进行的数据修改必须加以恢复。

死锁的检测会产生一定的性能损耗,因此解决热点行更新导致的性能问题需要结合业务来进行权衡:

  1. 如果能确保业务一定不会出现死锁,可以临时把死锁检测关掉。
    但是需要注意的是,死锁检测可以保证出现死锁后可以通过业务回滚然后重试来解决,这是业务无损的,而关掉死锁检测意味着可能会出现大量的超时,这是业务有损的。
  2. 控制并发度
    保证对于相同行的更新,在进入引擎之前排队,这样就可以避免大量的死锁检测工作了。
  3. 将对同一行的操作改成多行
    比如,将库存分成多份,减库存时随机取出一份来操作,这样冲突的概率就会变成原本的 1/10 了,既减少了锁等待个数,又减少了死锁检测的 CPU 消耗。

可能发生死锁的情况

  1. 注意加锁顺序
    比如下面语句查3行数据,而且由于desc,该查询语句是倒序在索引树上遍历的,遍历过程中会给查到的记录和区间加行锁和间隙锁。
    1
    select id from t where c in(5,20,10) order by c desc for update;
    在上面这条语句执行期间,如果有另外一条语句是正序遍历并加锁的,就很有可能会导致死锁,比如如下语句:
    1
    select id from t where c in(5,20,10) lock in share mode;
    因此对同一组咨询,要尽量按照相同的顺序访问

如何查看死锁

出现死锁后,执行 show engine innodb status 命令,这个命令会输出很多信息,有一节 LATESTDETECTED DEADLOCK

死锁示例1 - 间隙锁互斥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 设置使用区间锁(默认使用,因此不需要设置)
-- 设置事务隔离级别为RR(默认为RR,因此不需要设置)
-- 设置需要手动提交
set session autocommit=0;

create table t (
id int(10) primary key
) engine = innodb;

start transaction;
insert into t values(1);
insert into t values(3);
insert into t values(10);
commit;

在开启区间锁且RR隔离级别的情况下,插入上面数据后会出现以下4个区间:

  • (-infinity, 1)
  • (1, 3)
  • (3, 10)
  • (10, infinity)

线程A:

1
2
3
set session autocommit=0;
start transaction;
delete from t where id=5;

线程B:

1
2
3
4
5
6
set session autocommit=0;
start transaction;
insert into t values(0);
insert into t values(2);
insert into t values(12);
insert into t values(7);

事务A删除某个区间内的一条不存在记录,获取到共享间隙锁,会阻止其他事务B在相应的区间插入数据,因为插入需要获取排他间隙锁,也就是在插入7的时候会发生阻塞。

使用show engine innodb status;命令可以查看锁的情况:
死锁示例1_间隙锁互斥
如果事务A一直不提交或回滚,则事务B会一直等待直到超时,并显示:
Error Code: 1205. Lock wait timeout exceeded; try restarting transaction

死锁示例2 - 共享排他锁死锁

线程A、B、C均执行以下SQL:

1
2
3
set session autocommit=0;
start transaction;
insert into t values(7);
  • A先执行,插入成功并获取到id=7的排他锁

  • B、C在执行时,需要进行PK校验,需要先获取id=7的共享锁,因此阻塞。

  • 如果此时A执行rollback;回滚了,此时会释放掉id=7的排他锁。

  • B、C继续进行主键校验,并同时获取到id=7的共享锁;

  • 如果B、C想要插入成功,必须获得id=7的排他锁,但由于双方都已经获取到共享锁,因此它们都无法获取到排他锁,死锁就出现了。

  • MySQL有死锁检测机制,因此B和C中有一个会插入成功,而另一个事务会自动放弃,并报错:Error Code: 1213. Deadlock found when trying to get lock; try restarting transaction

死锁示例3 - 并发间隙锁的死锁

1
2
3
4
5
6
7
8
A:set session autocommit=0;
A:start transaction;
A:delete from t where id=6;
         B:set session autocommit=0;
         B:start transaction;
         B:delete from t where id=7;
A:insert into t values(5);
         B:insert into t values(8);
  • A、B线程都会在执行删除语句后获取到(3, 10)的共享间隙锁
  • A、B在执行insert语句时希望获得(3, 10)的排他间隙锁,于是出现死锁。
    可以使用show engine innodb status;命令来查看死锁情况。
    死锁示例3_并发间隙锁的死锁
  • 检测到死锁后,事务2自动回滚了,报错:insert into t values(8) Error Code: 1213. Deadlock found when trying to get lock; try restarting transaction
    同时,事务1将执行成功。

QA

如何安全地给表加字段

首先需要处理掉长事务,因为长事务不提交的话会一直占用 MDL 锁。

information_schema 库的 innodb_trx 表中可以看到当前执行中的长事务。
但是如果这样的事务比较多,kill 掉并不一定管用,因为新的请求总是会源源不断地到来,所以最好的方法是在 alter table 语句里面设定等待时间,如果在这个指定的等待时间里面能够拿到 MDL 写锁最好,拿不到也不要阻塞后面的业务语句,先放弃:

1
2
ALTER TABLE tbl_name NOWAIT add column ...
ALTER TABLE tbl_name WAIT N add column ...

MySQL 连接池被打满怎么办

这里的连接池指的是应用服务器里访问 MySQL 服务的连接池,比如 Druid,

以下哪些场景会导致语句Q1: select * from t1 limit 1被堵住?

  1. 另一个线程在Q1执行前,执行了alter table t1 add index(f1),当前正处于拷贝数据到临时表阶段。

以下什么情况会发生”等待行锁”的状态?

RR隔离级别下,表t的建表结构和初始化数据如下:

1
2
create table t (id int primary key, c int) engine = innoDB;
insert into t values (1, 1), (11, 11), (21, 21);

在会话1中执行如下语句:

1
2
begin;
select * from t lock in share mode;

可见这条语句希望对表t加一个表级的读锁。

  1. 会进入”等待行锁”的情况1
    insert into t values (15, 15);
    插入时会先给表加IX意向锁,IX意向锁是和会话1中对表t加上的读锁互斥的,因此会导致阻塞。
  2. 不会进入”等待行锁”的情况1
    update t set c = c + 1 where id = 15;
    因为id = 15这条数据不存在,因此这条语句实际上不会加锁。
  3. 不会进入”等待行锁”的情况2
    delete from t where id = 15;
    因为找不到id = 15这条数据,因此也不会加锁。
  4. 不会进入”等待行锁”的情况3
    alter table t add d int;
    alter table 会加MDL,这并不是行锁。

DB的CPU打满怎么排查?

有很多情况都可能引起DB的CPU打满

  1. 业务量上涨了
    现象:如果是突发的业务上涨,业务监控(接口、API)应该会有报警,其次就是看有没有执行次数特别多的SQL
    解决:临时可以对SQL进行限流,后续可以对具体业务做下限流、或DB做扩容
  2. 慢SQL
    现象:有SQL执行速度特别慢,慢SQL一般只是表象,引起慢SQL背后的原因可能有很多,比如死锁、扫描行数过多/读取数据过多(没命中索引)等
    解决:如果有SQL一直在运行中,可以先把对应的线程kill掉,排查这个sql产生的原因,进行优化。
  3. 热点行更新导致行锁过多
    现象:用相关工具查看DB中的行锁数量是否出现异常上涨,行锁过多是有可能把CPU打满的
    解决:排查是否有具体业务场景会导致多线程同时更新同一行数据。

参考

  1. [1] Mysql造成死锁的原因有哪些呢?如何避免?
  2. [2] Mysql 死锁如何排查:insert on duplicate 死锁一次排查分析过程
0%