项目总结-bm

  1. 业务背景
  2. 业务逻辑
  3. 收益
  4. 技术选型
    服务器几台、QPS多少、存储、缓存、MQ等
  5. 架构图
  6. 规划
  7. 个人角色 / 做了什么
  8. 技术难点
  9. 遇到的问题
  10. 复盘 / 未来规划

京医通项目介绍

患者通过微信公众号、APP、自助机等多渠道多平台的客户端,透过防火墙和安全中心访问京医通线上服务,包括:预约挂号、京医通卡管理、在线缴费、充值等服务。
京医通

限流

与外部系统的交互需要限流以保护接口:

  • HIS -> benmu:使用Guava的RateLimiter实现;
  • benmu -> HIS:Semaphore+ScheduledThreadPoolExecutor。

幂等

消息会重试,因此Consumer总是要考虑到幂等:

  • 如果和订单相关,可以根据订单的状态来判断,比如有一个订单通知功能需要在状态A时触发,发送完后将订单状态改为B,如果此时订单状态已经是B了,就可以直接跳过这个操作了。
  • 上一种情况其实存在漏洞,因为查状态->发通知这两个操作并不是原子的,很多时候要实现严格幂等的话还需要用到Redis,其实幂等在实现上和加锁一样,这里加锁的过期时间和业务相关,比如要确保一个操作1小时内只能执行1次,则这个过期时间就可以设置为1小时。

容灾

分为业务接口层面和系统层面的容灾,先说业务接口层面的容灾。

  • 如果是号源这样的非实时接口,我们允许短时间内不可用,只要之后能正常访问即可,代价是这段时间内用户查看到的数据不是最新的。
  • 如果是锁号这样的实时接口,因为我们的业务场景不允许超买超卖,因此这个接口没法做异步化,出错了就是出错了,报警后需要人工进行排查,很多时候需要反馈给HIS进行修复。
  • 如果是支付成功通知这样的异步执行接口,则可以通过重试来提高执行成功的概率,如果重试都失败了,则会调用逆向的回滚接口(比如锁号接口的逆向就是退号接口)。

系统层面:

  • 如果是进程本身出问题了,比如内存占用超标、CPU打满等情况,可以通过zabbix来判断(运维管理);
  • 如果是JVM内的问题,比如FULLGC了、中间件客户端连不上服务器了等情况,可以通过一个探活服务来检测;
  • 如果是业务相关的问题,一般是error日志,我们可以统计ELK日志然后发报警。

数据量(QPS)

  • 业务服务器最高的是单机2000qps,30tps(只考虑下单请求),8台机器。
  • RocketMQ的并发量可以通过RocketMQ-console来查看,最高的Topic一天是190W条消息,消息总量一天会有千万的量级,不过平均到每秒也就只有100的量级。
    RocketMQ消息量可以在RocketMQ-console上看到:
    RocketMQDataSize
  • Redis的QPS在低峰期大约在500,高峰期,注意Cluster是有3台Master来提供服务的,且不同业务的集群多少有点差异。
    Redis的QPS可以通过记录两次info命令中的total_commands_processed属性再做差得到。
  • MySQL,像订单中心低峰期的QPS是30左右,不同库的QPS会有差别,但是差得不会太多。
    MySQL的QPS可以通过比较两次执行show global status like 'Queries%';命令的结果来得到。

流程描述

号源抓取

号源抓取
号源相当于是商城系统里的库存,维护号源同样会面临库存维护的诸多问题,比如多级缓存。
号源可以看作医院内医生某个时间段内能坐诊(意思是医生在医院等固定地点给人看病)的人次,这个概念其实完全可以和商城系统中的商品库存作类比,事实上我们代码里就是这么抽象的:将号源称为Product,数量称为Inventory。
我们这边的号源维护逻辑是由His维护号源的数据库,而我们这边则主要维护号源的缓存,并提供多级缓存,供用户查询时使用,并在库存发生变更的时刻及时更新。到此为止似乎业务逻辑还是比较简单的,实际上这块业务逻辑的复杂性主要来自于对多级缓存的存储结构和号源拉取规则设计。

product(号源本地缓存、过滤等功能)实例7台,accessor(号源抓取规则)3台,单机的号源查询QPS约上百,并不多。
使用Redis存储号源缓存,3主3从。
RocketMQ发送消息,2主4从。
MySQL存号源快照。

号源有Redis缓存和本地缓存,缓存的结构简单地说都是<医院-科室, 日期, 库存信息>,当然其实有其他缓存,只是用途不同,比如用于统计某个号源的查询次数时以回源时,只用记下次数。
号源缓存并不能和数据库中的数据保持实时一致,而是尽量保证即使一时不一致,过一段时间后也能最终保持一致,即最终一致性

  1. 写入时触发主动缓存刷新;
  2. 读取一定次数后回源并更新缓存。

缓存更新最常见的策略是“先更新数据库,后失效缓存”,但是这种策略最大的问题是缓存实时性不高,缓存失效失败后缓存中就还是旧的值,而且每次失效后,查询缓存都会发生一次MISS。
反观号源缓存维护策略,更新缓存采取的是“先更新数据库,后更新缓存”的策略,这种策略存在的问题是:

  1. 存在缓存更新失败的情况,对此我们的解决方式是在抓取失败后重试
    但是如果长时间缓存更新失败,缓存数据就会一直都是老的了,对此,我们的解决方案是查询也会触发抓取号源,只不过查询请求太多,不可能每次查询都抓取一次号源,因此对抓取规则进行限制,规则如下:
    • 如果号源缓存为空,表示之前没拉过,则直接触发拉取;
    • 如果当前时间在配置的放号时间内(一般是下午3点),则每秒拉一次;
    • 如果是热门科室(根据科室的访问次数统计分数排名得到)且当前库存量低于10,则每5秒拉一次;
    • 不是热门科室但是库存量也低于10,则每10秒拉一次;
    • 其他情况下,每30秒拉一次。

      即使我们尽力保证缓存的一致性,但是一定时间的不一致是无法避免的,主要是号源缓存并不要求严格的一致性,而是只需要最终一致性,即使一时失败,也可以通过后续的刷新来保持一致,因此问题不大。

  2. 存在一个问题,事务A和B同时更新一条数据,事务A先更新了数据库,但是后来的事务B却先更新了缓存,那么缓存中存的就是旧值了,我们的主要通过对更新号源、更新缓存这个流程进行加分布式锁实现的。
  3. 以前,我们这边的缓存是Redis单机Master的,通过Sentinel集群实现高可用,效率有问题,因此除了Redis缓存,号源更前端还有一个本地缓存,本地缓存每3秒回源(查Redis),后来Redis集群转为了Cluster模式,所以效率问题已经大大缓解了。本地缓存还有一个优点是减少了网络的交互,不过缺点是耗费了更多的应用内存

号源抓取 - 放号提醒

放号提醒
比对缓存中的老号源信息和新拉取的号源信息。
并不是每次拉取号源都是放号,实际上放号提醒QPS比拉取号源QPS低一个数量级。

锁号生成订单

挂号

  1. 从HIS抓取号源时,经过医院网关系统ha,具备并发控制、限流、验签等功能;
  2. 从HIS抓取号源后,保存到号源缓存;
  3. 订单存储成功后,维护订单缓存,并异步刷新ES以提供多维度查询;
  4. 订单支付时,提交支付表单,跳转到支付页进行支付,支付完后回调通知业务,进行订单状态的转换。

订单的多级缓存

见《订单中心》。

如果失败怎么办?

挂号操作的任务是扣减库存然后生成订单,显然,这两个操作并不是原子的,如果扣减库存成功,然后生成订单失败,那么库存不就浪费了吗?
实际上号源也是有状态的,刚开始,号源被占用一个后,仍然处于PREPARE状态,这个号只是被暂时占用了,被实际占用还要等到用户支付后,如果30分钟内用户不支付,那么这个库存会被自动释放掉。

轻问诊

主要就是答题,使用到了规则引擎,见:《规则引擎》

多渠道订单同步

多渠道订单同步流程

候补

候补流程

电子病历

电子病历

缴费

缴费

重复拉订单问题
拉取缴费单后会先存储到一张中间表,然后在用户自己准备支付时生成订单,这时有可能重复生成订单,所以实际上是会加分布式锁的,而且用户有可能在操作自助机的同时,在微信端操作,这时,由于加锁维度问题,有可能产生重复生成订单的问题。
刚开始:加锁是对缴费单的就诊号、处方组加锁,而且会尽可能拼上用户的就诊卡等。
后来:仅仅对就诊号、处方组加锁,虽然处理方式挺粗暴,但是确实解决问题了。

线上医保分解

线上缴费和线下缴费的主体流程并没有太大的差异,主要区别在于:线上医保分解是后端调自助机的医保分解组件(首信提供)来执行医保分解、医保提交等操作。
这里的主要难点在于:

  1. 自助机是不支持并发的,因此,我们需要对并发进行限制;
    MIP网关图
  2. 医保分解和提交需要是在一个事务内执行,因此我们需要实现分布式事务。
    但是退一步,因为医保分解是可以重复分解的,因此就算医保分解失败多次,重试也不影响一致性,所以最终也就没有实现分布式事务了。

物流系统

物流系统的主要目的是提供和邮政等物流平台的交互,医院HIS本身已经接入物流平台的情况下,我们还会和HIS交互。
express系统表结构
物流系统涉及模块
物流系统交互
物流实现流程

共享轮椅

共享轮椅
共享轮椅项目我曾经维护过一阵,但是后来组织结构变更,项目就交给其他小组维护了。
对共享轮椅印象比较深的一点是后端服务器通过Netty与锁平台交互,ChannelHandler主要是:

  • IdleStateHandler
    空闲检测
  • DelimiterBasedFrameDecoder
    半包/粘包拆分器
  • 一个自定义的数据包解析器
    格式如下:
    数据格式
    在以上数据结构中,比较重要的是命令ID和数据长度,凭这两个字段可以取到完整的命令信息,协议版本主要用于平滑升级,网关mac可以标识请求来自哪个基站,CRC校验码暂时没有使用。

支付中心

pay-center

架构变迁、遇到的问题及解决方法

网络

大致架构

  • 内部服务器 -> 外部系统
    业务服务器 IP -> Nginx -> 防火墙 -> 外部系统(Client)
  • 外部系统 -> 内部服务器
    外部系统(Client) -> 防火墙 -> HA vip(负载均衡 keepalived + HAProxy) -> Nginx -> 业务服务器 IP

探活 - HealthCheck

这并不是特别复杂的功能,稍微介绍下算了。

背景

之前公司内采用Redis哨兵集群,不稳定,偶尔抖动进入TILT,当时的lettuce客户端就连不上了。

解决方案

短期定时调各个实例的Redis客户端,看看是否连通;长期将Redis哨兵集群迁移到Redis Cluster集群(下面细说)。

探活原理

从应用服务器的探活接口获取到应用健康信息后,Task服务器会执行一个报警汇总和发送过程,报警发送规则:

  1. 每10秒扫描所有服务器一次,判断是UP还是DOWN:
    如果是UP,将异常服务信息从缓存(Redis)清除,因为服务已经恢复正常,所以没有必要再发送异常通知了;
    如果是DOWN,将异常服务器信息保存到缓存(Redis),key为<serverName, serverAddr>,value需要保存内容包括<errorCount, startTime, endTime, alarmInfo>
    注意添加与清除操作存在并发竞争,因此通过分布式锁来同步,同步粒度是一个服务;
  2. 定时任务判断各服务器健康状况,对每个服务器,如果距离最开始汇报时间已经超过3分钟,且汇报次数超过10次,则执行报警;
  3. 发送报警实际上是调用了腾讯云的发送短信接口。
    当然也不是每次都会发送,比如超过最大发送次数、超过最长发送时间,都会限制不再发送;而且,也并不是每次执行步骤2的定时任务就要发送,还需要进行一个限流,原理其实就是Guava里的RateLimiter

收益

接入后至今已经有多次准确检测到服务器宕机的案例,较为灵敏,且是通过短信发送报警,组内成员基本均能及时响应,可以成功探测到:

  1. Redis服务器不可用的情况;
  2. FullGC导致的服务不可用情况;
  3. Redis客户端连不上服务器的情况(比如网络出问题)。

Sentinel -> Cluster

背景

为什么要从Sentinel升级到Cluster?除了上面提到的Sentinel的诸多缺点外,我们还遇到了:

  1. 长久以来(至少为期一年)有个历史问题没有解决,在很偶然随机的情况下,redis哨兵集群不稳定抖动自行恢复之后,业务方客户端重连不上调用抛异常;
  2. Redis服务器版本低,不稳定的因素有很多,比如AOF的io堵塞问题,高版本都有修复和一系列bug fix 及性能优化改进;

比如某一次,很多医院频繁出现挂号、缴费系统繁忙的报错:

  1. 问题反馈时(报警+用户反馈),检查Redis服务器本身没有问题,连服务器调info命令返回内容都是正常的;
  2. 为了及时恢复,将涉及的服务器先重启了,发现确实服务恢复了;
  3. 发现服务日志中出现:
    RedisException: Master is currently unknown: ...

故障review发现:Redis服务端因为网络抖动发生了主观下线,很快自动恢复了,但是客户端并没有恢复,主要是因为lettuce客户端版本相对比较老。
为此,我们所做的:

  1. 探活,针对客户端无法恢复的情况,探活检查服务器挂掉的情况,实时通知开发;
    探活实现了对Redis客户端及服务器可用性的检查,之后发现还可以检查长时间的FULLGC,这是运维的zabbix无法实现的。
  2. sentinel存在单点问题,而且容量已经不大够了且扩容困难,于是重建一套Redis Cluster;
  3. 服务端从Sentinel迁移到Cluster,客户端也需要同时进行迁移,迁移过程中需要双写双读;
    数据库的迁移(扩容)实际上一般也会采取双写双读,注意不能采取监听binlog的方式,因为监听binlog是异步的,数据的写入并不实时。

    数据库检查迁移的过程检测可以看阿里的愚公。

  4. 新客户端采用Redisson,对分布式锁提供了更好的支持,刚开始我们这边采用的分布式锁是setnx+ex,后来采用set nx ex,之后引入Redisson后转为RedLock,提高了可靠性;

迁移原理

存储中间件的迁移一般包括存量迁移和增量迁移这两个步骤,在Redis中,可以:

  • 存量迁移:可参考我的另一篇博文《Redis高可用方案Cluster》。
    存量迁移的主要问题是迁移过程中的请求应该被重定向到数据当时所在的节点上,对于这点Redis中是通过服务端的重定向来实现的,Master会记录一个槽在迁出节点上还是在迁入节点上,请求会被重定向到槽当前所在的节点上;
  • 增量迁移:在线迁移过程中,新数据会源源不断地被添加到Redis集群中,对这些数据,一般的处理方法是同时写入新节点和老节点,可以参考下面的迁移流程;

双读和双写的数据库迁移方案中也提到了我所提的方案,我们采用的正是不停机的渐进式双读方案,而不是带存量迁移的双写双读方案。

迁移流程

客户端修改及增量同步

原Redis Sentinel集群是1主2从3哨兵,使用lettuce作为客户端。
新Redis Cluster集群是3主6从,使用Redisson作为客户端。
集群不同,客户端的连接方式也有所不同,客户端启动及创建连接方式:

  1. 客户端启动时,会从配置中心读取分配给当前服务的Redis集群地址,然后创建Redisson对象;
  2. 因为是3主6从的Cluster架构,因此客户端会先找到Master,之后的读写请求都是针对Master进行的;

迁移过程访问Redis的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public void set(String bizName,String key,String value){
//新开关初始值为true
if(newRedissonFlag){
//为防止新redisson瞬间网络抖动带来的新老redis数据不一致添加重试功能,重试功能只需在写时重试
for (int i = 0; i <3 ; i++) {
try {
//操作新的redisson需要用try..catch 包起来
RBucket<String> bucket = redissonClient.getBucket(this.entireKeyPrefix + bizName + keySeparator + key);
bucket.set(value);
LOGGER.info("走新的redisson,方法:set -> key:"+key+",value:"+value+",重试次数:"+i);
break;
} catch (Exception e) {
LOGGER.error("走新的redisson异常,方法:set -> key:"+key+",value:"+value+",重试次数:"+i,e);
try {
Thread.sleep(5);
} catch (Exception ex) {
LOGGER.error("redisson set() 重试sleep异常",ex);
}

}
}
}
//老开关初始值为true,在新老功能并行至老的redis中key的最大过期时间后方可关闭
if(oldRedisFlag){
sChairRedisCache.set(key, value);
LOGGER.info("走老的redis,方法:set -> key:"+key+",value:"+value);
}
}

public String get(String bizName,String key){
String obj = null;
String objNew=null;
String objOld=null;

// 新Redis开关,打开的情况下查下新Redis
if(newRedissonFlag){
RBucket<String> bucket = redissonClient.getBucket(entireKeyPrefix + bizName + keySeparator + key);
objNew = bucket.get();
obj = objNew;
LOGGER.info("走新的redisson,方法:get -> key:"+entireKeyPrefix + bizName + keySeparator + key+",objNew:"+objNew);
}
//老开关初始值为true,在新老功能并行至老的redis中key的最大过期时间后方可关闭
if(oldRedisFlag){
objOld = sChairRedisCache.get(key);
LOGGER.info("走老的redis,方法:get ->key:"+key+",objOld:"+objOld);
obj = objOld;
if (StringUtils.isBlank(objNew) && StringUtils.isNotBlank(objOld)){
if(newRedissonFlag){
//为防止新redisson瞬间网络抖动带来的新老redis数据不一致添加重试功能,重试功能只需在写时重试
for (int i = 0; i <3 ; i++) {
try {
//操作新的redisson需要用try..catch 包起来
RBucket<String> bucket = redissonClient.getBucket(entireKeyPrefix + bizName + keySeparator + key);
bucket.set(obj);
LOGGER.info("走新查询未命中老的重新set的redisson,方法:get -> key:"+entireKeyPrefix + bizName + keySeparator + key+",value:"+obj+",重试次数:"+i);
break;
} catch (Exception e) {
LOGGER.error("走新查询未命中老的重新set的redisson异常,方法:get -> key:"+entireKeyPrefix + bizName + keySeparator + key+",value:"+obj+",重试次数:"+i,e);
try {
Thread.sleep(5);
} catch (Exception ex) {
LOGGER.error("redisson set 重试sleep异常",ex);
}
}
}
}
}
}
return obj;
}
public void delKey(String bizName,String key){
//新开关初始值为true
if(newRedissonFlag){
//为防止新redisson瞬间网络抖动带来的新老redis数据不一致添加重试功能,重试功能只需在写时重试
try {
//操作新的redisson需要用try..catch 包起来
RBucket<String> bucket = redissonClient.getBucket(entireKeyPrefix + bizName + keySeparator + key);
bucket.delete();
LOGGER.info("走新的redisson,方法:delkey -> key:"+entireKeyPrefix + bizName + keySeparator + key);
} catch (Exception e) {
LOGGER.error("走新的redisson异常,方法:delkey 删除key:"+entireKeyPrefix + bizName + keySeparator + key,e);

}
}
//老开关初始值为true,在新老功能并行至老的redis中key的最大过期时间后方可关闭
if(oldRedisFlag){
sChairRedisCache.del(key);
LOGGER.info("走老的redis,删除key:"+key);
}
}
  1. 刚开始,老Redis集群和新Redis集群是同时工作的,对Redis的访问也是同时访问新老Redis集群。
    如上代码所示,在迁移过程中新老开关都是打开的(新老Redis开关可以统称双写开关),这时get以老Redis中的值为准,可能新Redis中没有值,所以最后还把老Redis中的值设置到了新Redis中。
    set会先设置新Redis,然后再设置老Redis,del同理。
  2. 有全量同步吗?
    实际上我们没有做全量同步,因为没必要:
    如基础数据这样的服务,它本身所有的数据都是定时全量加载的,到时候全刷新一遍新Redis就可以了。
    其他服务不能保证会不会存在一些存量数据(指不会通过任何手段重新set的),可以参考上边的get的逻辑:新Redis不存在而老Redis存在的情况下,会把这些kv直接塞给新Redis,这样实际上就避开了需要全量同步的情况。
  3. 之后,观察业务是否稳定,然后关闭老Redis的开关
    什么时候关闭?同步完成的时候,即双写开关打开前老Redis里的数据已经全部转移到了新Redis里,更具体地说,等老Redis中最老的数据的过期时间到了后,并且get时会将老Redis中的数据转移到新Redis里并打日志,如果这样的日志不打了,就说明转移完毕了。
    正如上边代码逻辑所示,关闭老Redis开关后,所有对Redis的操作都不走老Redis了。
    我们会先切换比较边缘的业务,最后再切换核心业务,保证Redis迁移不影响业务。

用量和性能

单机服务器内存总量是125.56G,平均用量是1.27G,而峰值用量是4.63G,基本上资源是过剩的。
压测命令:

1
ab -n2000000 -c 5000  http://localhost:9001/testRedisson

经过压测,单个4c8g的qps的都在8000左右,单个8c16g的qps在16000左右。

扩容方式

暂时没有遇到需要扩容的场景,但是如果遇到了,可以参考一下我记录的扩容原理:Redis高可用方案Cluster - 扩容-缩容

多级缓存

打个比方,我们现在有这样一个场景:需要存储10亿个value,每个value占用100k,怎么设计缓存存储?
如果直接有多存多少,那么服务器肯定是不够用的,假设一台Redis占用1G(当然实际一台并不会这么小),那么题中所述的数量至少要100W台(当然还要考虑key占用的内存大小),这肯定是不能接受的,所以一般会采用多级缓存来存储,上层缓存服务器快,分配的空间少,下层服务器负责持久化存储,分配的空间大。

QMQ -> RocketMQ

背景

qmq大体流程是这样的:生产者在本地写数据库记录,调用qmq的服务端的dubbo接口,qmq的dubbo接口里是将消息存到数据库,写入redis,根据订阅qmq消息情况,从qmq的服务端发起dubbo接口调用,去调用consumer提供的dubbo接口。

刚开始公司内所有老项目都是使用的QMQ,后来转到了RocketMQ,具体原因如下:

  1. qmq基于redis和mysql的存储,系统复杂性太高,稳定性风险较大。
    相较起来,RocketMQ是基于文件系统的,不需要维护数据库的索引,而且是顺序读的,CommitLog文件达到一定大小就会生成一个新文件,所以效率更高更稳定。

    出现过qmq连不上Redis,消息没被删掉,导致Redis内存几乎被打满的情况。当时排查出是qmq中的Redis客户端连不上服务器导致的,所以后来做了探活功能,并计划迁移到RocketMQ。

  2. qmq的存储瓶颈主要在于mysql调用,消息堆积能力远不及rocketmq,而且数据量增大后已经出现热点库表问题。
  3. 为了弥补消息堆积能力的不足,qmq的做法是起一个task服务定时清理redis和mysql里的历史消息,热消息即时删除导致task服务压力较大而且职责重大,历史消息需要转送往backup服务进行额外存储,不仅又增加了task服务的工作负担,而且新增加的backup服务也占用了额外的资源,尤其是其中包括hdfs的大数据组的存储资源。
  4. 内部设计有缺陷,主要是 “推模式” 的弊端
    完全依靠中间件去主动推送消息给到消费者,完全忽视了业务方消费者的多样性情况和每个消费者项目各自的特殊性,比如断网、掉线、处理能力有限或者业务复杂容易处理超时的情况,都一视同仁的设置超时时间和失败重试机制,导致业务方如果处理时间较长,中间件等不及也抛出超时异常并强行杀掉连接启动失败重试,即使业务方过了一段时间这个消息是消费处理成功的也没用了,中间件会一直失败重试直到达到重试次数上限值,不仅大大增加了失败重试的工作量,抛出了大量超时异常,而且一直重试也占用了中间件服务的大量资源导致中间件吞吐能力下降。

之前有一次QMQ出现问题:

  1. 消息消费出现问题,我们这边订单的通知HIS支付成功这一部出现了问题,好多订单没有及时通知,导致自动退掉;
  2. DBA发现Redis暴涨,原来占用不到1G,花了一早上暴涨到超过3G,过了1小时的过期时间key才因为过期而自动被清除;
  3. 怀疑半夜Redis发生抖动导致QMQ连接Redis出问题,因此重启QMQ,服务恢复;
  4. 发现QMQ的task服务在Redis发生抖动后就没有再进行消息的清除了,相当于业务一直在往Redis里塞消息却不把历史消息删掉,导致Redis被打满,Redis打满后QMQ即不可用、不再执行消息的推送(QMQ是“推模式”),所以业务也消费不到消息了。

相比之下,rocketmq架构简单,只依赖本地文件系统读写,系统相对更加稳定,吞吐和消息堆积能力更高。利用commitlog的数据文件进行本地文件存储,每个不同的topic下都有多个队列可以增加吞吐量,通过设置消息队列的offset,记录业务方消费者的下一次消费起始位置,而不是中间件主动重复推送和失败再推送,完全交由业务方消费者自己控制消费节奏,即使下线了不工作了等到重新上线又自己控制重新开始消费,这样不仅提高了业务方消费者的自主控制灵活性,也降低了中间件的工作负担,提高了整个系统的效率和消息处理能力。

平滑迁移步骤

按Topic逐步迁移:

  1. 搭建RocketMQ集群;
  2. 在使用qmq消息集群的系统里,选定一个topic,在新的rocketmq创建对应的topic;
  3. 找到该topic的所有消费者group,保持这些group的消费逻辑不变,增加RocketMQ对应消费者group的处理逻辑,与qmq的保持一致;
  4. 用基于RocketMQ的生产者替换原来基于qmq的生产者;
    此时对消费者来说既有来自qmq的消息正在消费中,又有来自RocketMQ的新消息。
  5. 等qmq消费不到数据时,把qmq消费者从代码逻辑中删除,这样就完成了一个Topic的平滑迁移。

为什么不直接将消息迁移到RocketMQ以实现迁移?

  1. QMQ数据结构和RocketMQ差异比较大;
  2. QMQ消息会存储到数据库中,且消费完毕后会被删除;
  3. 业务中不同的使用场景,包括延迟消费等等场景相对复杂。

RocketMQ架构

公司内RocketMQ采用两主四从的集群架构,集群模式是DLedger。

RocketMQ客户端接入

我们定义一个类似下面这样的客户端Consumer注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RMQConsumer {
String consumerGroup();

String topic();

String messageMode() default "CLUSTERING";

String tag() default "*";

String consumeFromWherec() default "CONSUME_FROM_LAST_OFFSET";

int consumeThreadMax() default 20;

int consumeThreadMin() default 20;

int pullThresholdForQueue() default 1000;
}

同时,会在应用启动期间把这个注解标记的类注册为消费者:

1
RMQConsumerRegisterContainer

用量

最高的是抓取号源消息,达到百万级的数量,其他如订单更新、发送推送之类的消息也达到了近百万。
总消息量约10亿的量级。

平滑迁移

从原来QMQ集群迁移到新RocketMQ集群,可以按如下流程进行:

  1. 增加RocketMQ消费者,逻辑和原来QMQ消费者一致;
  2. 在RocketMQ上创建对应的Topic;
  3. 增加RocketMQ生产者,新发布的服务器均生产RocketMQ消息,也就是说会有一段时间RocketMQ和QMQ的Broker、Consumer都会同时接收消息;
  4. 最终QMQ会把历史消息消费完,这时可以将原来QMQ的代码删掉。

扩容方式

暂时没有遇到需要扩容的场景,但是如果遇到了,可以参考一下我记录的RocketMQ原理:RocketMQ 原理总结

示例用法

发消息前diff老订单和新订单的字段,把新增、更新和删除的字段放到消息体里,消费者监听这种消息,然后根据配置执行一个监听器。

医院网关(ha / hospital-adaptor)

医院网关(ha)

业务背景

医院网关负责所有和医院HIS系统的交互,包括之前提到过的抓取号源、挂号下单、通知支付成功等接口。
HIS系统有一个非常鲜明的特征是:效率非常差。一般我们指的高并发是单机QPS至少也要上千,但是对于HIS系统来说,可能上百就不大行了,这样的情况常见于下午3点的放号时间,这是一个比较小的秒杀场景,到3点整时几乎所有医院未来第8天的号都会放出来,用户经常会为寥寥十来个号抢破了头,甚至有的抢不到的还会投诉。
对我们来说,虽然我们不能保证你一定能抢到号,但是我们需要保证抢号过程中HIS不要崩溃,因此限流的规则也是比较严格的:

  1. 非公平:每个请求(新用户请求或刚从队列里被唤醒的)进来时都会先尝试获取一遍信号量
  2. 排队:按等待时间排序,

QConfig -> Apollo

  • 使用的注解改一下;
  • 把配置手动粘到Apollo。

使用模板引擎实现问卷功能

背景:今年做的新功能,需要知道用户是否有患病的风险,所以做了个线上的问诊功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1.您的孩子本次就诊的症状中是否有发热(体温≧37.2℃)?
□有 □无
2.您的孩子本次就诊的症状中是否有咳嗽、流涕、咽痛?
□有 □无
3.发病前14天内孩子或父母及其他看护人是否有武汉市及周边地区,或其他有病例报告社区的旅行史或居住史?
□有 □无
4.发病前14天内孩子或父母及其他看护人是否与新型冠状病毒感染者(核酸检测阳性者)有接触史?
□有 □无
5.发病前14天内孩子或父母及其他看护人是否曾接触过来自武汉市及周边地区,或来自有病例报告社区的发热或有呼吸道症状的患者?
□是 □否
6.周围是否有2人及2人以上同时或先后出现发热或/和呼吸道症状?
□有 □否
·“1”和“2”中只要有一项答“有”的患者,如果同时有“3”、“4”、“5”、“6”中的任意一项,直接挂“感染门诊(一级科室)”下的“感染门诊(二级科室)”
·“1”和“2”中只要有一项答“有”的患者,如果“3”、“4”、“5”、“6”均答“无”的患者,挂“内科综合门诊”下任意二级科室的号,或“呼吸内科门诊(一级科室)”下的“呼吸内科门诊(二级科室)”任意号别的号
·“1”和“2”中均答“无”的患者,如果“3”、“4”、“5”、“6”均答“无”的患者,正常挂号。
·“1”和“2”中均答“无”的患者,如果“3”、“4”、“5”、“6”任意一项答“有”的患者,提示患者 “非急诊居家隔离14天,如果是急诊,建议到医院挂号就诊”并微信限制挂号。

刚开始思路就是做一个计算器,把表达式转成后缀表达式来计算结果。
后来发现用框架实现更直接,对比当时考虑的几种方式:

*

订单中心的变迁

订单中心
订单中心-ER图
订单中心的查询
订单中心的写入

为什么查询缓存的同时还要查一次库?

上面的流程图中可以看到,为了确定缓存中的数据是最新版本的,还查了一次数据库中的order_main表中的版本号。
直观来看,似乎查缓存的同时还查一次数据库,缓存的地位就变得非常尴尬了。实际上这个表只存版本号,用订单号来查索引的话,查询速度是非常快的,使用缓存而不是直接查数据库,原因其实是:数据库中的订单结构比较复杂,如果每次都查出整个订单进行反序列化,比较低效,因此加入了缓存。

写order_main和order_detail这两个操作并不是原子的操作,会不会发生并发问题?

不会,原来设计中在更新order_detail时也会校验version:

1
2
3
4
5
6
update ${table}
<set>
data = #{detail.data} ,
version = #{detail.version}
</set>
where id = #{detail.id} and order_id = #{detail.orderId} and section = #{detail.section} and version &lt;= #{oldVersion}

订单状态机

好多功能都需要监听订单状态,比如订单支付成功后要其他系统,如果直接写到原来的支付代码里,一方面耦合严重,以后不好维护,另一方面要实现重试、限流等功能并不简单,所以何不将这部分功能打包一下呢?
对于发送方,需要发送一个包含特定前缀的消息,一般这种消息会被包装成事件,在订单被修改时发送;也可以设计成订单修改时总会发送一个固定前缀的消息,而提供状态机功能的服务就主动监听这种消息就行了。
发送的内容不能单单是订单号,因为我们的状态机需要实现比对新老订单哪些字段发生了变更,因此,发送的消息中还需要包含所有发生变更的字段,大致逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// this为新节点,other为老订单,flatten将订单字段打平,这样更方便一个一个字段地比较
Map<Path, ValueNode> t = this.flatten();
Map<Path, ValueNode> o = other.flatten();
// diffs保存所有发生了变更的字段
List<DiffValue> diffs = Lists.newLinkedList();

for (Entry<Path, ValueNode> entry : t.entrySet()) {
Path path = entry.getKey();
ValueNode tNode = entry.getValue();
ValueNode oNode = o.remove(path);
if (!tNode.equals(oNode)) {
// 如果新节点和原节点不一致,是更新的字段
if (oNode != null) {
addDiff(diffs, DiffValue.DiffType.CHANGE, path, oNode, tNode);
} else {
// 新节点是新增的字段
addDiff(diffs, DiffValue.DiffType.ADD, path, null, this.find(path.getPath()).get(0));
}
}
}
// 老订单有而新订单里没有的字段其实就是已经被删除的字段
for (Entry<Path, ValueNode> entry : o.entrySet()) {
addDiff(diffs, DiffValue.DiffType.DELETE, entry.getKey(), other.find(entry.getKey().getPath()).get(0), null);
}
return diffs;

对于接收方,需要扫描注解注册监听哪些字段的变更,然后注册一个消费者来监听消息并尝试触发用户自定义的回调。

  1. 配置注解后,扫描指定的注解,该注解中需要指定监听哪些字段的变更事件
    注解扫描器:ActorScanner
    注解:@MActor(retry_step = 1, retry_maxCount = ConfirmOrderUtils.RETRY_MAX_COUNT)
    @MDiff(section = "main-order", path = "orderPayInfo.payStatus", newValue = "PAY_SUCCESS")
  2. 启动后设置事件监听器
    监听指定的订单变更事件
  3. 事件处理逻辑
    基本上就是遍历事件里的每个字段变更,判断是否与某个注解中指定的字段变更一致。

如何实现幂等?
消费成功后,将messageId保存到Redis。
消费前需要检查Redis里是否已经保存了该messageId。
幂等是有状态的,消费失败意味着可以重试;消费成功意味着下次再来相同messageId的消费可以忽略掉;如果是消费中,可以考虑阻塞等待,但更多时候是直接抛出异常然后等消息队列重试它。

ES索引

订单中心除了将数据保存到数据库和缓存外,还会异步地将数据索引到ES,供多维度的查询。

数据量:
总共60G的内存,其中20G分给JVM,40G分给LUA。

链路追踪

问到ThreadLocal的时候可以带出来。
如果问到有没有应用,说下消息队列迁移时,给RocketMQ接入TraceId。
链路追踪用ThreadLocal存TraceId,需要注意一个线程调起另一个线程时TraceId的传播:

  • 线程池
    构造Runnable时传进去。
  • HTTP
    放到请求头
    HttpTraceUtils
  • Dubbo
    写个Filter,通过SPI生效,将TraceId放到请求的attachments里。
    QTraceFilter
  • MQ
    放到消息体里

SpringMVC项目改造为SpringBoot

SpringMVC项目改造为SpringBoot项目。

收益

  1. 统一项目结构,为后续接入其他SpringCloud项目提供方便;
  2. 内存结构占用得更少了,

ServerManager -> DNS

ServerManager依赖DNS,导致应用服务并不是无状态的,所以难以实现自由的弹性伸缩,加机器比较复杂,要运维配置一大堆东西。

SpringBoot

SpringBoot使用一个parent统一指定各包的版本,benmu老项目里已经有一个parent包了,因此改为新建一个boot-parent,在里面声明boot版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<artifactId>boot-parent</artifactId>

<dependencyManagement>
<dependencies>
<!--声明boot版本-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.2.6.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

扫描包的位置,改到注解里

1
2
3
4
@SpringBootApplication(scanBasePackages = {
"com.benmu.platform.robin",
"com.benmu.root.bom.bundles.spring",
"com.benmu.common.redis"})

Dubbo

registries、protocol配到application.yaml里,reference写到Config类里。

MyBatis

原来定义在mybatis-config.xml中的typeHandler,现在需要用注解的方式定义。
比如:

1
2
3
4
<typeHandlers>
<!-- 把对象转成json存入数据库 -->
<typeHandler handler="package.JsonTypeHandler"
javaType="package.BaseData"/>

改成:

1
2
3
4
5
6
7
@MappedJdbcTypes(JdbcType.VARCHAR)
@MappedTypes(value = BaseData.class)
public class BaseDataTypeHandler extends JsonTypeHandler<BaseData> {
public BaseDataTypeHandler(Class<BaseData> clazz) {
super(clazz);
}
}

然后在配置文件中定义typeHandler所处的包:

1
2
3
mybatis:
# 需要把类型传到JsonTypeHandler的构造里,JsonTypeHandler没有默认构造,因此这里不能直接配
type-handlers-package: package

Config -> Apollo

1、在application-xxx.yaml中增加apollo配置

1
2
3
4
5
6
7
8
9
10
11
app:
id: robin
apollo:
meta: http://apollo.test.bmsre.com:18080
cluster: beta
# meta: #meta.url#
bootstrap:
enabled: true
eagerLoad:
enabled: true
namespaces: rocketmq,qmq,profile,json

2、配置注解
之前用的注解根本不是@Value,可以手动改掉,也可以给原注解设置别名(假设原注解名是MValue):

1
2
3
4
5
6
7
8
9
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Value(value = "")
public @interface MValue {

@AliasFor(annotation = Value.class, attribute = "value")
String value();
}

这样配置的含义是:@MValue被看作一种@Value,其value是@Value的value属性的别名。
接下来,把对应QConfig中的配置文件拷贝到Apollo,创建一个同名的Namespace,就可以使用了。

另附:

  1. Apollo接入Spring的原理:https://github.com/ctripcorp/apollo/wiki/Apollo%E9%85%8D%E7%BD%AE%E4%B8%AD%E5%BF%83%E8%AE%BE%E8%AE%A1#31-%E5%92%8Cspring%E9%9B%86%E6%88%90%E7%9A%84%E5%8E%9F%E7%90%86
  2. Apollo配置的更新和读取
    见:https://tallate.github.io/ea705bce.html

其他配置

将SpringMVC的xml配置改成代码里的Config配置,各种中间件的客户端啊、Filter之类的,要改的东西还是蛮多的。

遇到了什么问题?

  1. 应用服务GC时间超长
    半夜报警
    用pinpoint观察发现GC时间特别长
  2. 发现swap影响了GC时长
    观察gc日志时间长
    free看到空闲内存少、swap被占用
    sar查看有swap交换
    swap原理
    隐患:速度慢、Linux会把进程杀了
  3. 解决办法
    怎么查到对象占用内存多的(dump内存),修改后怎么样了

QA

有没有使用MQ实现过削峰?

MQ的3大功能:异步、解耦、削峰。
这个问题回答时容易把异步和削峰搞混,要知道削峰属于异步处理,但是异步并不都是削峰。
一般情况下我们要异步处理的主要目的是某个复合操作包含了好多耗时的接口调用,我们可以让这些耗时的操作被同时处理,甚至如果一些操作不重要的话也可以放到mq中处理。
我接触的削峰场景其实并发量并不高,只是三方接口承载量比较低,导致高峰期我们的业务也被拖垮了,所以把这些会调用失败的接口放到MQ中延迟处理。

有没有优化过SQL?

有一个印象比较深刻的SQL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
select
id,
card_no as cardno,
card_type as cardtype
from
mts_order_booking.hot_cancel
where
id >= (
select id
from mts_order_booking.hot_cancel
where to_days(create_date) = to_days(now())
limit 1
)
and to_days(create_date) = to_days(now())
limit 1000;

explain结果如下:

1
2
3
id  select_type table   partitions  type    possible_keys   key key_len ref rows    filtered    Extra
1 PRIMARY hot_cancel - range PRIMARY PRIMARY 8 - 66390 100 Using where
2 SUBQUERY hot_cancel - ALL - - - - 4764239 100 Using where

第二行是子查询,目的是找出今天的第一条记录,但是create_date并没有加索引,因此:type=ALL表示全表查询,possible_keys中是空的表示没有命中任何索引,rows是通过抽样统计出来的命中行数,并不准确,实际上该表有4510596行而不是4764239,最后一个Using where是额外信息,表示用where条件执行了全表查询。
第一行是外部查询,目的是查当天的所有记录,type=range表示范围查询,possible_keys和key都指明了这次查询使用的是主键,key_len=8表示这次使用的索引(主键)的物理长度。

显然问题主要出在子查询中,子查询是全表扫描,子查询扫描完毕后外部查询又以id为范围查了一次,这次是可以命中id主键的,优化方法就是给create_date加索引,子查询实际上也是冗余的,完全可以去掉。

ha网关怎么检查的超时、队列溢出,怎么看哪些接口出错了,怎么报警

超时的话是在每次重试时叠加的执行时间,然后在执行前查看是否达到了阈值,再报警;
队列溢出是通过一个AtomicLong实现的,和Semaphore同时操作,累加的是<hos_code, interface>,达到阈值后发报警。
报警通过falcon实现,发到微信里的。

为什么是Redis而不是Memcache

Memcache和Redis功能上是重复的,维护两套缓存系统会带来维护成本,Memcache在性能上相对Redis高一些,但是不支持复杂的数据结构、不支持持久化,虽然有高可用方案,但是是基于内存的,服务器挂掉数据就没了。
顺便主动介绍下Redis中的数据结构,和持久化、高可用方案。

另外,对于获取号源这样的功能,还提供本地缓存。

为什么用RocketMQ而不是QMQ或Kafka

看上面对迁移的介绍。

  1. 对RocketMQ更熟悉
  2. Kafka批量发,对于小量数据发送效率可能还没有RocketMQ高

为什么用ES而不是(只用)MySQL

首先需要强调的是,并不是所有订单都会建立ES索引,ES索引专门用于一些需要多维度查询的订单场景。
比如挂号前有很多限制规则,比如某个专家号在一周内一个用户只能挂出去3个号这种。
这样的规则是有很多的,如果要完全用MySQL的索引来实现,需要创建很多索引,且很多索引的长度都很长,会给MySQL带来比较大的负担。

使用Redis期间有没有遇到什么坑?

使用RocketMQ期间有没有遇到什么坑?

其他

平滑迁移的理论原理

迁移分类:

  • 停机迁移
  • 平滑迁移

平滑迁移的原则:从一个存储中间件转移到另一个存储中间件,期间不影响线上服务;

平滑迁移的常用方案:

  • 双写
  • 迁移历史数据
    这一步可能不是必须的,比如Redis对历史数据没有那么大的需求,只要双写的时候判断大部分数据都已经写入到了新Redis中即可。
  • 切读
  • 下双写

迁移后的比对:

  • 全量比对
  • 抽样比对

参考

  1. 库存系统设计
  2. 如何设计订单系统?