并发和中间件
RPC
客户端 OR 服务端 异步
客户端异步比较常见,因为任何网络调用基本都可以方便地包装成异步调用,朴实无华且枯燥,下载这段代码是一个简单的 demo:
1 | public class ClientAsync { |
客户端异步是一种伪异步,本质上仍是同步调用,只不过等待是放到另外一个线程中去做的,如果这样等待的线程比较多,对客户端的线程池容易造成压力,。
服务端的异步实现起来就比较费劲了,因为客户端需要额外提供一个入口来接收服务端执行完毕的结果:
良好习惯
不论是桌面应用还是 Web 应用,多线程代码都是比较难玩得转的,玩不明白的结果就是一大堆令人毛骨悚然且难以捉摸、难以调试的问题——实际上,一旦你意识到正在处理一个并发问题,你可能就不得不完全放弃调试了,并转而手动检查代码。
鉴于此,我们当然是希望尽量避免并发问题的,理想情况下希望完全避免多线程错误,同样,不存在那种一刀切的方法,但这有一些调试和防止多线程错误的实际考虑因素:
- 避免全局状态
首先,牢记 “全局状态” 问题。如果你正创建一个多线程应用,那么应该密切关注任何可能全局修改的内容,如果可能的话,将他们全部删掉,如果部分全局变量确实有理由保留,那么应该仔细保证其并发安全,并对程序性能进行跟踪,以确定不会因为引入新的等待时间而导致系统性能降低(并发修改时需要同步多个线程)。 - 避免可变性
这点直接来自于 函数式编程,并且适用于 OOP,声明应该避免类和对象状态的改变。简而言之,这意味着放弃 setter 方法,并在需要避免可变性的类或字段上拥有私有的 final 字段,它们的值唯一发生变化的时间是在构造期间。这样,你可以确定不会出现争用问题,且访问对象属性将始终提供正确的值。 - 日志及报警
评估你的程序可能会在何处发生异常,并预先记录所有关键数据。如果发生错误,你将很高兴可以得到信息说明收到了哪些请求,并可更好地了解你的应用程序为什么会出现错误。需要再次注意的是,日志记录引入了额外的文件 I/O,可能会严重影响应用的性能,因此请不要滥用日志。
在记日志的基础上,有必要根据 SLA 记录一些指标的报警阈值,比如订单中心下单失败,考虑可能是网络出现抖动引起超时(如果用到三方服务这个问题会更明显),因此报警阈值可以稍微调高一些,比如 1 分钟 3 次失败就打电话报警。 - 复用现存实现
每当你需要创建自己的线程时(例如:向不同的服务发出异步请求),复用现有的安全实现来代替创建自己的解决方案。这在很大程度上意味着要使用 ExecutorService 和 Java 8 简洁的函数式 CompletableFuture 来创建线程。Spring 还允许通过 DeferredResult 类来进行异步请求处理。
反模式-异步狂热
上边我们已经讨论过异步存在的优势和劣势,实际上在我读过的项目代码中,确实存在不少那种不异步不开心的“炫技代码”,给维护带来很大困难。
下面是一个非常直观的方法:
1 | public String punch(People t) { |
经过一个莫名其妙的异步包装,原来的可扩展性、性能均没有提升,甚至性能成功降低了:
1 | public Function doPunch(People t) { |
实际情况可能会复杂得多,这样的代码不够简单直观、容易暗藏 Bug、不符合 KISS 原则,但是即便如此,还是有很多人会觉得特别绕的代码能体现一个人的水平、对代码的驾驭能力、能灵活运用设计模式的能力,我觉得大部分情况下事实并非如此。
Spring - DeferredResult
TODO
Hystrix - Command
Hystrix 的 Command 框架在 CompletableFuture 的基础上提供了合并请求的特性:
1 | public class BatchGetDataCommand extends HystrixCommand<List<Double>> { |
任务调度
补偿执行
异常订单表,定时任务
定时任务的实现
无分布式的,HashTimeWheelTimer
分布式情况下,可以先用 Redis,当复杂时再用 RabbitMQ 等消息队列中间件。
消息 OR 定时任务
很多异步实现的功能既可以通过消息实现又可以通过定时任务来实现,我经历过很多需要抉择的情况,甚至也碰到过对此都不大清楚的架构师,我认为对此没有唯一的答案,只能根据具体业务场景来分析,一些要点可供参考。
分布式
并发安全
运维便利性
消息本质上是把任务暂存在队列服务里,统计某段时间内发生了什么、会发生什么就比较困难了,因为往往消息队列都会有自定义的数据格式,判断一条消息是否被消费往往得通过日志来判断。
定时任务一般都会有执行记录,什么时间会执行任务也可以直接用 cron 表达式计算出来,缺点是使用定时任务意味着需要自己维护很多东西,比如在数据库里维护一个队列保存消息,保存有创建时间、重试次数、重试时间等,失败 N 次后还需要存一份失败记录。
以一个“通知拉取”的场景举例,A 系统需要从 B 系统拉取数据,但并不是定时地直接调 B 的接口,而是先由 B 通知 A 哪些数据发生了变更,然后由 A 去拉取这些数据的变更部分:
- 如果使用消息队列来实现:
- 如果使用定时任务实现:
使用 Thread 实现任务调度
实现任务调度最简单的方式可以直接利用 Thread:
1 | BlockingQueue<String> queue = new ArrayBlockingQueue<>(10); |
Timer
Timer 内部通过一个 TimerThread 来循环执行提交给 Timer 的任务,因此任务是串行的,前一个任务的延迟会影响后续的所有任务。
Timer 可以看做一种简化版的 ScheduledExecutor,下面是一种 Demo 实现:
1 | public class DemoTimer { |
ScheduledExecutor
ScheduledExecutor 能提供一种更灵活的周期性任务处理功能,
Quartz
Spring Task
Quartz 集群版
tbschedule
xxl-job
Elastic-Job
Elastic-Job 分为 Elastic-Job-Lite 和 Elastic-Job-Console 两个模块。Elastic-Job-Lite 实现了分布式任务调度、动态扩容缩容、任务分片、失效转移等功能。
如上图所示,Elastic-Job-Lite 采用去中心化的调度方式,由 Elastic-Job-Lite 的客户端定时自动触发任务调度,通过任务分片的概念实现服务器负载的动态扩容和缩容,并且使用 ZooKeeper 作为分布式任务调度的注册中心;当某任务实例崩溃后,自动失效转移,实现高可用。
消息队列(MQ)原理
MQ 有很多优势,当我们选择 MQ 时,主要是为了:
- 解耦……
比如 A 系统要将用户提交的数据推送到 B、C 两个系统的时候,最初的想法很有可能是直接用 http 或 rpc 调用实现。像这样的下游系统后来又会多出 D、E、F…,对 A 系统的压力就会越来越大,更复杂的场景中,数据通过接口传给其他系统有时候还要考虑重试、超时等一些异常情况。
这时,对 A 来说更好的方案是将消息发送给 mq,不管有哪个下游系统需要这个数据都可以直接订阅这个 subject。 - 异步
当请求比较复杂,而其中有部分数据没必要实时更新时,可以用 mq 实现异步化。比如取消订单后需要做订单状态的更新、对账、退款等操作,而其中只有状态的变更是有必要实时反馈给用户的,那么后续的所有操作就完全可以做成异步的。 - 削峰填谷
消息队列作为缓冲队列应对突发流量时,并不能使处理速度变快,而是使处理速度变平滑,从而不会因瞬时压力过大而压垮应用。
举个例子,比如我们的订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒 1000 左右的并发写入,并发量再高就容易宕机。
低峰期的时候并发也就 100 多个,但是在高峰期时候,并发量会突然激增到 5000 以上,这个时候数据库肯定死了。
但是使用了 MQ 之后,情况就变了,消息被 MQ 保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒 1000 个数据,这样慢慢写入数据库,这样就不会打死数据库了。
如果没有用 MQ 的情况下,并发量高峰期的时候是有一个“顶峰”的,然后高峰期过后又是一个低并发的“谷”。
但是使用了 MQ 之后,限制消费消息的速度为 1000,但是这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了。
但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在 1000QPS,直到消费完积压的消息,这就叫做“填谷”。
高可用
使用了 MQ 之后,我们肯定是希望 MQ 有高可用特性,因为不可能接受机器宕机了,就无法收发消息的情况。
这一块我们也是基于 RabbitMQ 这种经典的 MQ 来说明一下:
RabbitMQ 是比较有代表性的,因为是基于主从做高可用性的,我们就以他为例子讲解第一种 MQ 的高可用性怎么实现。
rabbitmq 有三种模式:单机模式,普通集群模式,镜像集群模式
单机模式
单机模式就是 demo 级别的,就是说只有一台机器部署了一个 RabbitMQ 程序。
这个会存在单点问题,宕机就玩完了,没什么高可用性可言。一般就是你本地启动了玩玩儿的,没人生产用单机模式。
普通集群模式
这个模式的意思就是在多台机器上启动多个 rabbitmq 实例。类似的 master-slave 模式一样。
但是创建的 queue,只会放在一个 master rabbtimq 实例上,其他实例都同步那个接收消息的 RabbitMQ 元数据。
在消费消息的时候,如果你连接到的 RabbitMQ 实例不是存放 Queue 数据的实例,这个时候 RabbitMQ 就会从存放 Queue 数据的实例上拉去数据,然后返回给客户端。
总的来说,这种方式有点麻烦,没有做到真正的分布式,每次消费者连接一个实例后拉取数据,如果连接到不是存放 queue 数据的实例,这个时候会造成额外的性能开销。如果从放 Queue 的实例拉取,会导致单实例性能瓶颈。
如果放 queue 的实例宕机了,会导致其他实例无法拉取数据,这个集群都无法消费消息了,没有做到真正的高可用。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性可言了,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式
镜像集群模式才是真正的 rabbitmq 的高可用模式,跟普通集群模式不一样的是:创建的 queue 无论元数据还是 queue 里的消息都会存在于多个实例上,
每次写消息到 queue 的时候,都会自动把消息到多个实例的 queue 里进行消息同步。
这样的话任何一个机器宕机了别的实例都可以用提供服务,这样就做到了真正的高可用了。
但是也存在着不好之处:
- 性能开销过高,消息需要同步所有机器,会导致网络带宽压力和消耗很重
- 扩展性低:无法解决某个 queue 数据量特别大的情况,导致 queue 无法线性拓展。就算加了机器,那个机器也会包含 queue 的所有数据,queue 的数据没有做到分布式存储。
对于 RabbitMQ 的高可用一般的做法都是开启镜像集群模式,这样起码来说做到了高可用,一个节点宕机了,其他节点可以继续提供服务。
高性能
- 对于内存操作的线程分离,大部分中间件做法是将数据文件缓存与内存中,通过异步线程 flush 至硬盘
- 合理的存储引擎对应不同的服务场景 B+树,hash,LSM
- 对于消息队列,选取顺序读写磁盘的方式,可以高效的提升磁盘 IO 速度
- 顺序写磁盘可以带来足够的写入速度,其读取方式为二分查找
- 对于 LSM 存储引擎,同样采用顺序写磁盘方式,牺牲一部分读性能从而获得更优越的写性能
消息队列如何选型
异步处理
延时消费
应用隔离(系统解耦)
比如有两个主题的消息,其中 A 主题的消息特别多,别的消息就会来不及处理。
这种情况有点类似于服务治理中的隔离策略:一个服务出错不能影响别的服务不可用。一般会采用线程池、信号量来实现。
MQ 消息中的主题隔离
数据同步
Canel 订阅数据库 binlog 可以实现数据库数据变更捕获,然后业务端订阅 Canel 进行业务处理,这种方式可以保证一致性,且不会有乱序问题。
数据异构
反模式-为了撇清关系所以使用 MQ 消息
反模式-为了解耦过度使用 MQ 消息
反模式-利用数据差异化来触发事件
公司里有几位老员工基于 MQ 监听器组件开发了一套 MQ 客户端,这套 MQ 客户端的核心就是能通过修改数据来触发监听对应字段修改事件的监听器,这样可以避免定义一大堆主题,看起来似乎变简单了对吗?但是经过一段时间的维护发现情况并非如此,大量监听器不再根据主题来相互关联,而是数据中的一大堆字段,最开始的一批开发爽了,因为需要定义的主题少了,少了很多手动发消息的代码,但是后续维护的人就糟了,试想,每次希望修改某个字段的时候都需要把监听该字段修改事件的监听器都找一遍。
两条业务同时修改
MQ 存在的缺陷
上边已经说过了优点,那么 mq 又有哪些缺点呢?
- 系统可用性降低
上面的说解耦的场景,本来 A 系统的哥们要把系统关键数据发送给 B、C 系统的,现在突然加入一个 MQ,现在 BC 系统接收数据要通过 MQ 来接收。
万一 MQ 挂了怎么办?这就引出一个问题,加入了 MQ 之后,系统的可用性是不是就降低了?
因为多了一个风险因素:MQ 可能会挂掉。只要 MQ 挂了,数据没了,系统运行就不对了。 - 系统复杂度提高
本来我的系统通过接口调用一下就能完事的,但是加入一个 MQ 之后,需要考虑消息重复消费、消息丢失、甚至消息顺序性的问题
为了解决这些问题,又需要引入很多复杂的机制,这样一来是不是系统的复杂度提高了。 - 数据一致性问题
本来好好的,A 系统调用 BC 系统接口,如果 BC 系统出错了,会抛出异常,返回给 A 系统让 A 系统知道,这样的话就可以做回滚操作了
但是使用了 MQ 之后,A 系统发送完消息就完事了,认为成功了。而刚好 C 系统写数据库的时候失败了,但是 A 认为 C 已经成功了?这样一来数据就不一致了。
并发修改
消息从发出到被消费会有一小段时间,这一段时间内数据可能会经过其他线程的多次修改,所以在消息消费方的编程中尤其需要注意并发修改的问题。
如果是同步操作——比如用户购买商品扣款的场景——需要在比较高并发的情况下才会出现并发问题,但是如果功能是基于消息实现的,由于消息消费具有不确定性,这种风险会被放大。
- 这些并发查询是在不同的站点实例 / 服务实例上完成的,进程内互斥锁无法解决问题。
- 不确定性来自于很多方面,比如同一主题的消息可能被多个业务线的触发、被重试、被手动重发。
解决这种不一致问题的解决办法一般是加锁,因为异步处理并没有直接被用户感知,因此对效率并没有特别高的要求,悲观锁或乐观锁都是可行的。
悲观锁会牺牲一定的吞吐量,乐观锁实现起来比较有技巧性、且可能会和业务数据耦合。
以乐观锁为例:
1 | // 查询订单,Order中包含了版本信息 |
At-Least-Once(消息丢失)
有很多情况可能发生 MQ 消息的丢失:
- 生产者向 MQ 发送消息时,网络传输出现问题;
- 消息在 MQ 中存储时,发生磁盘故障等不可控问题;
- 消费者从 MQ 接收消息时,网络传输出现问题;
一般 MQ 中间件都会保证At-Least-Once的消费,就需要避免消息丢失的情况,有两种方式可以解决这种情况:
事务方式:
在生产者发送消息之前,通过channel.txSelect
开启一个事务,接着发送消息
如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚channel.txRollback
然后重新发送。假如 RabbitMQ 收到了这个消息,就可以提交事务channel.txCommit
。
但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。
另外一种方式就是通过 confirm 机制:
这个 confirm 模式是在生产者哪里设置的,就是每次写消息的时候会分配一个唯一的 id,然后 RabbitMQ 收到之后会回传一个 ack,告诉生产者这个消息 ok 了。
如果 rabbitmq 没有处理到这个消息,那么就回调一个 nack 的接口,这个时候生产者就可以重发。
事务机制和 cnofirm 机制最大的不同在于事务机制是同步的,提交一个事务之后会阻塞在那儿
但是 confirm 机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息 rabbitmq 接收了之后会异步回调你一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
Rabbitmq 弄丢了数据
RabbitMQ 集群也会弄丢消息,这个问题在官方文档的教程中也提到过,就是说在消息发送到 RabbitMQ 之后,默认是没有落地磁盘的,万一 RabbitMQ 宕机了,这个时候消息就丢失了。
所以为了解决这个问题,RabbitMQ 提供了一个持久化的机制,消息写入之后会持久化到磁盘
这样哪怕是宕机了,恢复之后也会自动恢复之前存储的数据,这样的机制可以确保消息不会丢失。
设置持久化有两个步骤:
- 第一个是创建 queue 的时候将其设置为持久化的,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化 queue 里的数据
- 第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 rabbitmq 就会将消息持久化到磁盘上去。
但是这样一来可能会有人说:万一消息发送到 RabbitMQ 之后,还没来得及持久化到磁盘就挂掉了,数据也丢失了,怎么办?
对于这个问题,其实是配合上面的 confirm 机制一起来保证的,就是在消息持久化到磁盘之后才会给生产者发送 ack 消息。
万一真的遇到了那种极端的情况,生产者是可以感知到的,此时生产者可以通过重试发送消息给别的 RabbitMQ 节点
消费端弄丢了数据
RabbitMQ 消费端弄丢了数据的情况是这样的:在消费消息的时候,刚拿到消息,结果进程挂了,这个时候 RabbitMQ 就会认为你已经消费成功了,这条数据就丢了。
对于这个问题,要先说明一下 RabbitMQ 消费消息的机制:在消费者收到消息的时候,会发送一个 ack 给 RabbitMQ,告诉 RabbitMQ 这条消息被消费到了,这样 RabbitMQ 就会把消息删除。
但是默认情况下这个发送 ack 的操作是自动提交的,也就是说消费者一收到这个消息就会自动返回 ack 给 RabbitMQ,所以会出现丢消息的问题。
所以针对这个问题的解决方案就是:关闭 RabbitMQ 消费者的自动提交 ack,在消费者处理完这条消息之后再手动提交 ack。
这样即使遇到了上面的情况,RabbitMQ 也不会把这条消息删除,会在你程序重启之后,重新下发这条消息过来。
消息重复
一般情况下 MQ 除了不保证消息的有序性外、还不保证消息不重复。
因为在「网络不可达」的情况下,MQ 不能确认消息接收方收到了消息必然会重试。重试除了本文讲的幂等处理外,还可以采用每个消息有唯一的 ID+去重表实现。
消息的有序性
因为 MQ 消息在服务器上是分区存储的,每个分区自己是有序的。分区被接收端消费的时候。一般也是多个接收端一起消费。中间的每个环节都是只能保证局部有序。如果想全局有序。就需要分区只有一个,并且接收端服务器是单点,而且一次只处理一个请求。
TODO: TCP 是怎么做的。
消息积压
参考
任务调度
- cron
crontab 定时任务 - 中心化-去中心化调度设计
xxl-job 和 elastic-job 在设计上的本质区别是中心化还是去中心化。