Applications of RocketMQ

Why Use RocketMQ

Advantages of Using an MQ

  1. Decoupling
    With direct connections, integrating each new system means changing code to call the new system's interface;
    with RocketMQ, a new system can simply subscribe to the messages itself.
  2. Asynchrony
    Non-essential business logic is processed asynchronously, speeding up responses.
  3. Peak shaving
    With direct connections, all requests hit the downstream services at once; with a message queue in between, the queue buffers messages so downstream services can consume them at their own pace.

Disadvantages of Using an MQ

Reduced availability: if the message queue goes down, the whole system is affected.
Increased complexity: introducing a message queue raises new concerns, such as consistency, preventing duplicate consumption, and guaranteeing reliable message delivery. There is more to design for, so overall system complexity grows.
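Of the concerns above, duplicate consumption is usually addressed by making the consumer idempotent. A minimal sketch in plain Java (the class name `DedupConsumer` is illustrative; a production version would deduplicate against a persistent store, e.g. a database unique key, rather than an in-memory set):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DedupConsumer {
    // ids of messages already processed; in production use a DB unique key or similar
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Runs businessLogic once per msgId; returns false for duplicates. */
    public boolean consume(String msgId, Runnable businessLogic) {
        if (!processed.add(msgId)) {
            return false; // already consumed, drop the duplicate
        }
        businessLogic.run();
        return true;
    }

    public static void main(String[] args) {
        DedupConsumer c = new DedupConsumer();
        System.out.println(c.consume("MSG-1", () -> {})); // true: first delivery
        System.out.println(c.consume("MSG-1", () -> {})); // false: redelivery ignored
    }
}
```

The same check-then-execute pattern applies whatever the store is; the key point is that the "have I seen this id" test and the marking must be atomic, which `Set.add` (or a DB unique constraint) provides.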

Using RocketMQ

Setting Up a Name Server and a Broker (Standalone)

See this guide.
For convenience, set a few Broker properties in broker.conf:

# allow filtering messages with SQL syntax
enablePropertyFilter=true
# auto-create topics; otherwise every new topic must first be created in the console
autoCreateTopicEnable=true

The Broker can then be started with:

start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf

If producing messages later fails with: maybe your broker machine memory too small,
the usual cause is insufficient disk space (my laptop's disk has always been a scarce resource, hence the problem).
The fix is to add one line to runbroker.sh: JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98". This raises the disk-protection threshold to 98%, so the broker only rejects producer messages once disk usage reaches 98%.

Setting Up the RocketMQ Console

The console makes it easy to create topics, view message statistics, check cluster status, and so on.
See this guide.

Multi-Master Multi-Slave (2 Masters, Multiple Slaves)

After downloading and building the source as above, the first step is to create the configuration files. I plan to run all masters and slaves on one machine, so all config files and data directories are created locally:
1. Create the data directories

mkdir -p /home/hgc/Documents/project/open/rocketmq/store1/{rootdir-a-m,commitlog-a-m,rootdir-a-s,commitlog-a-s}
mkdir -p /home/hgc/Documents/project/open/rocketmq/store2/{rootdir-b-m,commitlog-b-m,rootdir-b-s,commitlog-b-s}

2. Configure environment variables
Add to /etc/profile:

# RocketMQ
export ROCKETMQ_HOME=/home/hgc/Documents/project/open/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin

3. Create the configuration files
/home/hgc/Documents/project/open/rocketmq/conf.1.m/broker.conf

brokerClusterName = DefaultCluster
brokerName = rocketmq-nameserver-1
# brokerId 0 means Master, >0 means Slave
brokerId=0
# port the Broker listens on
listenPort=10911
# NameServer addresses, separated by semicolons
namesrvAddr=127.0.0.1:9876
# time of day to delete expired files, default 4 a.m.
deleteWhen=04
# file retention time in hours, default 48
fileReservedTime=72
# brokerRole has 3 values: SYNC_MASTER, ASYNC_MASTER, SLAVE. SYNC/ASYNC describe how the
# Master replicates messages to the Slave: SYNC returns success only once the Slave also has
# the message; ASYNC returns immediately, leaving a millisecond-level lag between Master and
# Slave. Synchronous replication (SYNC_MASTER) is recommended for production and is used here.
brokerRole=SYNC_MASTER
# flush mode: ASYNC_FLUSH = asynchronous flush to disk
flushDiskType=ASYNC_FLUSH
# storage paths
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store1/rootdir-a-m
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store1/commitlog-a-m
# whether the Broker may auto-create topics
autoCreateTopicEnable=true
# whether the Broker may auto-create subscription groups
autoCreateSubscriptionGroup=true

/home/hgc/Documents/project/open/rocketmq/conf.2.m/broker.conf

brokerName=rocketmq-nameserver-2
listenPort=10921
namesrvAddr=127.0.0.1:9876
brokerId=0
deleteWhen=04
fileReservedTime=72
brokerRole=SYNC_MASTER
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store2/rootdir-b-m
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store2/commitlog-b-m
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

/home/hgc/Documents/project/open/rocketmq/conf.1.s/broker.conf

brokerClusterName = DefaultCluster
brokerName = rocketmq-nameserver-1
brokerId=1
listenPort=10917
namesrvAddr=127.0.0.1:9876
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store1/rootdir-a-s
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store1/commitlog-a-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true

/home/hgc/Documents/project/open/rocketmq/conf.2.s/broker.conf

brokerName=rocketmq-nameserver-2
listenPort=10927
namesrvAddr=127.0.0.1:9876
brokerId=1
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store2/rootdir-b-s
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store2/commitlog-b-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

4. Start the NameServer
Before starting, edit the startup script; since this is a single machine, shrink the heap a bit:

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
nohup sh bin/mqnamesrv &

5. Start the Brokers
Adjust runbroker.sh the same way:

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

Start each Broker, masters before slaves (make sure the logs directories used below exist first):

nohup sh bin/mqbroker -c conf.1.m/broker.conf > conf.1.m/logs/broker.1.m.log &
nohup sh bin/mqbroker -c conf.2.m/broker.conf > conf.2.m/logs/broker.2.m.log &
nohup sh bin/mqbroker -c conf.1.s/broker.conf > conf.1.s/logs/broker.1.s.log &
nohup sh bin/mqbroker -c conf.2.s/broker.conf > conf.2.s/logs/broker.2.s.log &

6. Check registration

# check cluster registration
./mqadmin clusterList -n 127.0.0.1:9876
# create a topic
./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t TopicTest
# list topics
./mqadmin topicList -n 127.0.0.1:9876
# show a topic's route information
./mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest
# show a topic's statistics
./mqadmin topicStatus -n 127.0.0.1:9876 -t TopicTest
# query messages by message key
./mqadmin queryMsgByKey -n "127.0.0.1:9876" -t TopicTest -k messageKey
# query a message by message ID
./mqadmin queryMsgById -n "127.0.0.1:9876" -t TopicTest -i 240882208B134140BD0FB45F5DBD2075000018B4AAC2130BE69B0001
# show a producer group's network connections
./mqadmin producerConnection -n 127.0.0.1:9876 -g ExampleProducerGroup -t TopicTest
# show a consumer group's network connections
./mqadmin consumerConnection -n 127.0.0.1:9876 -g ExampleConsumerGroup -t TopicTest
# show a subscription group's consumption progress
consumerProgress
# add KV config
updateKvConfig
# delete KV config
deleteKvConfig
# add project group config
updateProjectGroup
# delete project group config
deleteProjectGroup
# get project group config
getProjectGroup
# reset consumption offset to a point in time
resetOffsetByTime
# clear a broker's write permission
wipeWritePerm
# get a consumer's consumption progress
getConsumerStatus

7. Test with a client

8. Shut down the servers

sh mqshutdown namesrv
sh mqshutdown broker

Automatic Master-Slave Failover (DLedger)

  • enableDLegerCommitLog
    Whether to enable DLedger, i.e. RocketMQ master-slave failover. Defaults to false; set it to true to enable failover.
  • dLegerGroup
    The raft group the node belongs to; recommended to match brokerName, e.g. broker-a.
  • dLegerPeers
    The cluster's node list, for example: n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913. Entries are separated by semicolons; each entry follows dLegerSelfId-ip:port, where the port is used for DLedger's internal communication.
  • dLegerSelfId
    The current node's id, taken from the start of its dLegerPeers entry, i.e. n0 in the example above. Note that only the first character may be a letter; the remaining characters must be digits.
  • storePathRootDir
    The root directory for DLedger log files. To support smooth upgrades, set it to a different directory from storePathCommitLog.

broker.conf for the first node:

brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store1/
# DLedger-related properties
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n0

broker.conf for the second node:

brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10921
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store2/
# DLedger-related properties
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n1

broker.conf for the third node:

brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10931
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store3/
# DLedger-related properties
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n2

For a DLedger cluster we want two things: messages must keep flowing, and when the Master dies the cluster must elect a new one. You can simulate this by killing the Master process and watching the node roles change in rocketmq-console.

Client Configuration

Consumption Start Position

There are three options:

  1. CONSUME_FROM_LAST_OFFSET: the default; start from the tail of the queue, skipping historical messages
  2. CONSUME_FROM_FIRST_OFFSET: start from the head of the queue, consuming all historical messages still stored on the broker
  3. CONSUME_FROM_TIMESTAMP: start from a point in time, used together with setConsumeTimestamp(); the default is half an hour ago
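For CONSUME_FROM_TIMESTAMP, setConsumeTimestamp() takes a timestamp string in yyyyMMddHHmmss format. A small plain-Java sketch of building the "half an hour ago" default (the class and method names here are illustrative, not part of the RocketMQ API):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class ConsumeTimestampDemo {
    // the pattern setConsumeTimestamp expects
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Format the instant `minutesAgo` minutes before `now` as a consume timestamp. */
    static String timestampMinutesAgo(LocalDateTime now, long minutesAgo) {
        return now.minusMinutes(minutesAgo).format(FMT);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 2, 12, 0, 0);
        // the default start position is half an hour before consumer start
        System.out.println(timestampMinutesAgo(now, 30)); // 20240102113000
    }
}
```

In real code you would pass this string to `consumer.setConsumeTimestamp(...)` alongside `CONSUME_FROM_TIMESTAMP`.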

Other Concepts

Queue selection: by default a producer spreads messages across a topic's queues, e.g. by taking a key modulo the queue count (queueNum), so each message lands in exactly one broker queue.
instanceName: must differ per cluster when one process connects to multiple clusters; within a cluster, consumer groups with different subscriptions must not share a groupName.
Both consumption modes are pull-based underneath: push mode keeps a long-polling connection open, delivers messages as they arrive and acks them back to the broker one by one, with the topic's queues divided evenly among the group's consumers; pull mode fetches batches at whatever pace the caller sets.
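The "divided evenly" part is what RocketMQ's default AllocateMessageQueueAveragely strategy does when rebalancing a group: each consumer gets an equal share of the queues, with the remainder going to the first consumers. A simplified re-implementation of just that arithmetic (the method name is illustrative, not the client API):

```java
import java.util.ArrayList;
import java.util.List;

public class AverageAllocation {
    /**
     * Number of queues assigned to the consumer at `index` (0-based) out of
     * `consumerCount`, when the topic has `queueCount` queues: everyone gets
     * queueCount / consumerCount, and the first queueCount % consumerCount
     * consumers get one extra.
     */
    static int queuesFor(int queueCount, int consumerCount, int index) {
        int base = queueCount / consumerCount;
        int remainder = queueCount % consumerCount;
        return base + (index < remainder ? 1 : 0);
    }

    public static void main(String[] args) {
        // 8 queues across 3 consumers
        List<Integer> split = new ArrayList<>();
        for (int i = 0; i < 3; i++) split.add(queuesFor(8, 3, i));
        System.out.println(split); // [3, 3, 2]
    }
}
```

Note the allocation is over queues, not individual messages; a consumer then consumes every message in the queues assigned to it.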

Producing and Consuming Messages

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

Producing messages:

public class ProducerTest {

    /**
     * Synchronous send
     */
    @Test
    public void sendSyncMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            // create a message, specifying topic, tag and body
            Message msg = new Message("TopicTest",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // send the message to a broker
            SendResult sendResult = producer.send(msg);
            // sendResult tells us whether the message was delivered
            System.out.printf("%s%n", sendResult);
        }
        // shut the producer down once no more messages will be sent
        producer.shutdown();
    }

    /**
     * Asynchronous send
     */
    @Test
    public void sendAsyncMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // a latch sized to the number of messages
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // create a message, specifying topic, tag, key and body
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback receives the asynchronous result
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // wait up to 5 s for all callbacks
        countDownLatch.await(5, TimeUnit.SECONDS);
        // shut the producer down once no more messages will be sent
        producer.shutdown();
    }

    /**
     * One-way send
     */
    @Test
    public void testSendOnewayMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            // create a message, specifying topic, tag and body
            Message msg = new Message("TopicTest",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // one-way send returns no result at all
            producer.sendOneway(msg);
        }
        // shut the producer down once no more messages will be sent
        producer.shutdown();
    }
}

Consuming messages:

public class ConsumerTest {

    public static void main(String[] args) throws MQClientException {
        // instantiate the consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");

        // subscribe to one or more topics, with a tag expression to filter messages
        consumer.subscribe("TopicTest", "*");
        // register a callback to handle messages pulled from the broker
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // mark the messages as successfully consumed
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // start the consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Ordered Messages

Sending:

public class OrderedProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // the order list
        List<OrderStep> orderList = new OrderedProducer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            // add a time prefix
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
                Long id = (Long) arg; // select the queue by order id
                long index = id % mqs.size();
                return mqs.get((int) index);
            }, orderList.get(i).getOrderId()); // order id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }
        producer.shutdown();
    }

    /**
     * One step of an order
     */
    private static class OrderStep {

        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * Build mock order data
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("created");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("created");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("paid");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("created");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("paid");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("paid");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("completed");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("pushed");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("completed");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("completed");
        orderList.add(orderDemo);

        return orderList;
    }
}

Receiving:

/**
 * Ordered message consumption, transactional style (the application controls when the offset is committed)
 */
public class OrderedConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * On first start, consume from the head or the tail of the queue.<br>
         * On subsequent starts, consumption resumes from the last committed position.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // each queue is consumed by a single consume thread, so ordering holds per queue (partition)
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + " queueId=" + msg.getQueueId() + ", topic:" + msg.getTopic() + " content:" + new String(msg.getBody()));
                }

                try {
                    // simulate business processing...
                    // TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

Delayed Messages

Delayed messages suit business logic that must run after a delay, such as automatically cancelling an order that is not paid within a certain time.
Sending:

public class ScheduledProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TopicTest", "TagA", ("Hello scheduled message " + i).getBytes());
            // delay level 3 means the message is delivered 10 s later
            // (only a fixed set of delays is supported; see delayTimeLevel)
            message.setDelayTimeLevel(3);
            producer.send(message);
        }
        producer.shutdown();
    }
}
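The fixed delays come from the broker's default messageDelayLevel setting, "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", where delayTimeLevel is a 1-based index into that list. A small lookup sketch (the class and method names are illustrative):

```java
public class DelayLevels {
    // broker default: messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    static final String[] LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    /** Delay for a 1-based delayTimeLevel, as used by Message.setDelayTimeLevel. */
    static String delayFor(int level) {
        return LEVELS[level - 1];
    }

    public static void main(String[] args) {
        System.out.println(delayFor(3)); // 10s, matching the example above
    }
}
```

The list itself can be changed in the broker configuration, so arbitrary delays are possible only by redefining the levels, not per message.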

Receiving:

public class ScheduledConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            for (MessageExt message : messages) {
                // print the approximate delay
                System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

Batch Messages

Sending messages in batches significantly improves throughput for small messages. The constraints: all messages in a batch must share the same topic and the same waitStoreMsgOK setting, none may be a delayed message, and the batch's total size must not exceed 4 MB.
Sending:

public class BatchProducer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String topic = "TopicTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        try {
            producer.send(messages);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

The consuming side is unchanged:

public class BatchConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            for (MessageExt message : messages) {
                // print each received message
                System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

Remember the 4 MB limit mentioned above: as the message list grows it can easily exceed that size, in which case the list should be split before sending:

public class BatchProducer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String topic = "TopicTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        try {
            splitSend(producer, messages);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

    static void splitSend(MQProducer producer, List<Message> messages) {
        // split a large batch into several smaller ones
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message> listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                e.printStackTrace();
                // handle the error
            }
        }
    }
}
public class ListSplitter implements Iterator<List<Message>> {

    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    private int getStartIndex() {
        // skip any single message that exceeds the limit on its own
        Message currMessage = messages.get(currIndex);
        int tmpSize = calcMessageSize(currMessage);
        while (tmpSize > SIZE_LIMIT) {
            currIndex += 1;
            Message message = messages.get(currIndex);
            tmpSize = calcMessageSize(message);
        }
        return currIndex;
    }

    private int calcMessageSize(Message message) {
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 20; // add 20 bytes for the per-message log overhead
        return tmpSize;
    }
}

Message Filtering

RocketMQ supports SQL-style expressions to filter which messages a subscription receives. Only consumers in push mode can use SQL92 expressions; the interface is:

public void subscribe(final String topic, final MessageSelector messageSelector)

Supported expressions include:

  • numeric comparison: >, >=, <, <=, BETWEEN, =;
  • character comparison: =, <>, IN;
  • IS NULL or IS NOT NULL;
  • logical operators AND, OR, NOT;

Supported constant types:

  • numbers, e.g. 123, 3.1415;
  • strings, e.g. 'abc', which must be wrapped in single quotes;
  • NULL, a special constant;
  • booleans, TRUE or FALSE.

Sending:

public class TagProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message msg = new Message("TopicTest",
                "TagA",
                ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        // set a user property to filter on
        msg.putUserProperty("a", "2");
        SendResult sendResult = producer.send(msg);
        producer.shutdown();
    }
}

Consuming:

public class TagConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("localhost:9876");
        // only receive messages that carry property a with 0 <= a <= 3
        consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.println(new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

Transactional Messages

A transactional message can be in one of three states:

  • TransactionStatus.CommitTransaction: the transaction is committed and consumers may consume the message.
  • TransactionStatus.RollbackTransaction: the transaction is rolled back; the message is deleted and must not be consumed.
  • TransactionStatus.Unknown: an intermediate state; the message queue must check back to determine the final state.

Sending a transactional message:

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        });
        producer.setExecutorService(executorService);
        // transaction listener
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // keep the process alive so the broker's transaction-check callbacks can reach us
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

Implement the transaction listener interface. After the half message is sent successfully, executeLocalTransaction is invoked to run the local transaction; checkLocalTransaction reports the local transaction's state in response to the broker's check-back requests:

public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Restrictions on transactional messages:

  • Transactional messages cannot be delayed or batched.
  • To avoid a single message being checked too many times and piling up in the half-message queue, the number of checks per message is limited to 15 by default; this can be changed via the transactionCheckMax parameter in the Broker configuration. Once a message has been checked more than transactionCheckMax times, the Broker discards it and, by default, logs an error. This behavior can be changed by overriding the AbstractTransactionalMessageCheckListener class.
  • A transactional message is checked after the period set by the transactionTimeout parameter in the Broker configuration. The sender can also override this per message via the user property CHECK_IMMUNITY_TIME_IN_SECONDS, which takes precedence over transactionTimeout.
  • A transactional message may be checked or consumed more than once.
  • Delivering the committed message to the target topic may fail; currently this depends on the log records. High availability rests on RocketMQ's own HA mechanism; to make sure transactional messages are not lost and transactional integrity holds, use a synchronous dual-write (SYNC_MASTER) setup.
  • A transactional producer's group ID cannot be shared with producers of other message types. Unlike other messages, transactional messages support reverse lookup: the MQ server can query its clients by producer group ID.

RocketMQ Operations