[聚合文章] 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试

消息系统 2017-12-05 13 阅读

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-schedule-and-retry/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RocketMQ 4.0.x 正式版

������关注 微信公众号:【芋道源码】 有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

建议前置阅读内容:

:smiling_imp: 为什么把 定时消息消息重试 放在一起?你猜。

:ghost: 你猜我猜不猜。

2. 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

下图是 定时消息 的处理逻辑图:

2.1 延迟级别

RocketMQ 目前只支持 固定精度 的定时消息。官方说法如下:

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

  • 延迟级别:
延迟级别 时间
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h
  • 核心源码如下:

     1: // :arrow_down::arrow_down::arrow_down:【MessageStoreConfig.java】
     2: /**
    3: * 消息延迟级别字符串配置
    4: */
     5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
     6: 
     7: // :arrow_down::arrow_down::arrow_down:【ScheduleMessageService.java】
     8: /**
    9: * 解析延迟级别
    10: *
    11: * @return 是否解析成功
    12: */
    13: public boolean parseDelayLevel(){
    14: HashMap<String, Long> timeUnitTable = new HashMap<>();
    15: timeUnitTable.put("s", 1000L);
    16: timeUnitTable.put("m", 1000L * 60);
    17: timeUnitTable.put("h", 1000L * 60 * 60);
    18: timeUnitTable.put("d", 1000L * 60 * 60 * 24);
    19: 
    20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    21: try {
    22: String[] levelArray = levelString.split(" ");
    23: for (int i = 0; i < levelArray.length; i++) {
    24: String value = levelArray[i];
    25: String ch = value.substring(value.length() - 1);
    26: Long tu = timeUnitTable.get(ch);
    27: 
    28: int level = i + 1;
    29: if (level > this.maxDelayLevel) {
    30: this.maxDelayLevel = level;
    31: }
    32: long num = Long.parseLong(value.substring(0, value.length() - 1));
    33: long delayTimeMillis = tu * num;
    34: this.delayLevelTable.put(level, delayTimeMillis);
    35: }
    36: } catch (Exception e) {
    37: log.error("parseDelayLevel exception", e);
    38: log.info("levelString String = {}", levelString);
    39: return false;
    40: }
    41: 
    42: return true;
    43: }
    

2.2 Producer 发送定时消息

  • ��发送时,设置消息的 延迟级别
Message msg = new Message(...);
msg.setDelayTimeLevel(level);

2.3 Broker 存储定时消息

  • �� 存储消息时,延迟消息进入 TopicSCHEDULE_TOPIC_XXXX
  • �� 延迟级别 与 消息队列编号 做 固定映射:QueueId = DelayLevel - 1

核心代码如下:

 1: // :arrow_down::arrow_down::arrow_down:【CommitLog.java】
 2: /**
3: * 添加消息,返回消息结果
4: *
5: * @param msg 消息
6: * @return 结果
7: */
 8: public PutMessageResult putMessage(final MessageExtBrokerInner msg){
 9: // ....(省略代码)
10: 
11: // 定时消息处理
12: final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
13: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
14: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
15: // Delay Delivery
16: if (msg.getDelayTimeLevel() > 0) {
17: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
18: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
19: }
20: 
21: // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。
22: topic = ScheduleMessageService.SCHEDULE_TOPIC;
23: 
24: // 延迟级别 与 消息队列编号 做固定映射
25: queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
26: 
27: // Backup real topic, queueId
28: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
30: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
31: 
32: msg.setTopic(topic);
33: msg.setQueueId(queueId);
34: }
35: }
36: 
37: // ....(省略代码)
38: }
39: 
40: // :arrow_down::arrow_down::arrow_down:【ScheduleMessageService.java】
41: /**
42: * 根据 延迟级别 计算 消息队列编号
43: * QueueId = DelayLevel - 1
44: *
45: * @param delayLevel 延迟级别
46: * @return 消息队列编号
47: */
48: public static int delayLevel2QueueId(final int delayLevel){
49: return delayLevel - 1;
50: }
  • �� 生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。这样, ScheduleMessageService 在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。

核心代码如下:

 1: // :arrow_down::arrow_down::arrow_down:【CommitLog.java】
 2: /**
3: * check the message and returns the message size
4: *
5: * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
6: */
 7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody){
 8: try {
 9: // // ....(省略代码)
10: 
11: // 17 properties
12: short propertiesLength = byteBuffer.getShort();
13: if (propertiesLength > 0) {
14: // ....(省略代码)
15: String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
16: if (tags != null && tags.length() > 0) {
17: tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
18: }
19: 
20: // Timing message processing
21: {
22: String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
23: if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
24: int delayLevel = Integer.parseInt(t);
25: 
26: if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
27: delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
28: }
29: 
30: if (delayLevel > 0) {
31: tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
32: storeTimestamp);
33: }
34: }
35: }
36: }
37: 
38: // ....(省略代码)
39: 
40: return new DispatchRequest(//
41: topic, // 1
42: queueId, // 2
43: physicOffset, // 3
44: totalSize, // 4
45: tagsCode, // 5
46: storeTimestamp, // 6
47: queueOffset, // 7
48: keys, // 8
49: uniqKey, //9
50: sysFlag, // 9
51: preparedTransactionOffset// 10
52: );
53: } catch (Exception e) {
54: }
55: 
56: return new DispatchRequest(-1, false /* success */);
57: }
58: 
59: // :arrow_down::arrow_down::arrow_down:【ScheduleMessageService.java】
60: /**
61: * 计算 投递时间【计划消费时间】
62: *
63: * @param delayLevel 延迟级别
64: * @param storeTimestamp 存储时间
65: * @return 投递时间【计划消费时间】
66: */
67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp){
68: Long time = this.delayLevelTable.get(delayLevel);
69: if (time != null) {
70: return time + storeTimestamp;
71: }
72: 
73: return storeTimestamp + 1000;
74: }

2.4 Broker 发送定时消息

  • �� 对 SCHEDULE_TOPIC_XXXX 每条消费队列对应 单独一个 定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。

下图是发送定时消息的处理逻辑图:

实现代码如下:

 1: /**
2: * :arrow_down::arrow_down::arrow_down: 发送(投递)延迟消息定时任务
3: */
 4: class DeliverDelayedMessageTimerTask extends TimerTask{
 5: /**
6: * 延迟级别
7: */
 8: private final int delayLevel;
 9: /**
10: * 位置
11: */
 12: private final long offset;
 13: 
 14: public DeliverDelayedMessageTimerTask(int delayLevel, long offset){
 15: this.delayLevel = delayLevel;
 16: this.offset = offset;
 17: }
 18: 
 19: @Override
 20: public void run(){
 21: try {
 22: this.executeOnTimeup();
 23: } catch (Exception e) {
 24: // XXX: warn and notify me
 25: log.error("ScheduleMessageService, executeOnTimeup exception", e);
 26: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
 27: this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
 28: }
 29: }
 30: 
 31: /**
32: * 纠正可投递时间。
33: * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。
34: *
35: * @param now 当前时间
36: * @param deliverTimestamp 投递时间
37: * @return 纠正结果
38: */
 39: private long correctDeliverTimestamp(final long now, final long deliverTimestamp){
 40: long result = deliverTimestamp;
 41: 
 42: long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
 43: if (deliverTimestamp > maxTimestamp) {
 44: result = now;
 45: }
 46: 
 47: return result;
 48: }
 49: 
 50: public void executeOnTimeup(){
 51: ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
 52: 
 53: long failScheduleOffset = offset;
 54: 
 55: if (cq != null) {
 56: SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
 57: if (bufferCQ != null) {
 58: try {
 59: long nextOffset = offset;
 60: int i = 0;
 61: for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
 62: long offsetPy = bufferCQ.getByteBuffer().getLong();
 63: int sizePy = bufferCQ.getByteBuffer().getInt();
 64: long tagsCode = bufferCQ.getByteBuffer().getLong();
 65: 
 66: long now = System.currentTimeMillis();
 67: long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 68: 
 69: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 70: 
 71: long countdown = deliverTimestamp - now;
 72: 
 73: if (countdown <= 0) { // 消息到达可发送时间
 74: MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
 75: if (msgExt != null) {
 76: try {
 77: // 发送消息
 78: MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
 79: PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
 80: if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功
 81: continue;
 82: } else { // 发送失败
 83: // XXX: warn and notify me
 84: log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
 85: 
 86: // 安排下一次任务
 87: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
 88: 
 89: // 更新进度
 90: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
 91: return;
 92: }
 93: } catch (Exception e) {
 94: // XXX: warn and notify me
 95: log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
 96: + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
 97: }
 98: }
 99: } else {
100: // 安排下一次任务
101: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
102: 
103: // 更新进度
104: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
105: return;
106: }
107: } // end of for
108: 
109: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
110: 
111: // 安排下一次任务
112: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
113: 
114: // 更新进度
115: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
116: return;
117: } finally {
118: bufferCQ.release();
119: }
120: } // end of if (bufferCQ != null)
121: else { // 消费队列已经被删除部分,跳转到最小的消费进度
122: long cqMinOffset = cq.getMinOffsetInQueue();
123: if (offset < cqMinOffset) {
124: failScheduleOffset = cqMinOffset;
125: log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
126: + cqMinOffset + ", queueId=" + cq.getQueueId());
127: }
128: }
129: } // end of if (cq != null)
130: 
131: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
132: }
133: 
134: /**
135: * 设置消息内容
136: *
137: * @param msgExt 消息
138: * @return 消息
139: */
140: private MessageExtBrokerInner messageTimeup(MessageExt msgExt){
141: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
142: msgInner.setBody(msgExt.getBody());
143: msgInner.setFlag(msgExt.getFlag());
144: MessageAccessor.setProperties(msgInner, msgExt.getProperties());
145: 
146: TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
147: long tagsCodeValue =
148: MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
149: msgInner.setTagsCode(tagsCodeValue);
150: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
151: 
152: msgInner.setSysFlag(msgExt.getSysFlag());
153: msgInner.setBornTimestamp(msgExt.getBornTimestamp());
154: msgInner.setBornHost(msgExt.getBornHost());
155: msgInner.setStoreHost(msgExt.getStoreHost());
156: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
157: 
158: msgInner.setWaitStoreMsgOK(false);
159: MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
160: 
161: msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
162: 
163: String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
164: int queueId = Integer.parseInt(queueIdStr);
165: msgInner.setQueueId(queueId);
166: 
167: return msgInner;
168: }
169: }

2.5 Broker 持久化定时发送进度

  • �� 定时消息发送进度存储在文件( ../config/delayOffset.json )里
  • �� 每 10s 定时持久化发送进度。

核心代码如下:

 1: // :arrow_down::arrow_down::arrow_down:【ScheduleMessageService.java】
 2: /**
3: public void start() {
4: // 定时发送消息
5: for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
6: Integer level = entry.getKey();
7: Long timeDelay = entry.getValue();
8: Long offset = this.offsetTable.get(level);
9: if (null == offset) {
10: offset = 0L;
11: }
12:
13: if (timeDelay != null) {
14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
15: }
16: }
17:
18: // 定时持久化发送进度
19: this.timer.scheduleAtFixedRate(new TimerTask() {
20:
21: @Override
22: public void run() {
23: try {
24: ScheduleMessageService.this.persist();
25: } catch (Exception e) {
26: log.error("scheduleAtFixedRate flush exception", e);
27: }
28: }
29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
30: }

3. 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。

  • �� Consumer 将消费失败的消息发回 Broker ,进入 延迟消息队列 。即,消费失败的消息,不会立即消费。

核心代码如下:

 1: // :arrow_down::arrow_down::arrow_down:【SendMessageProcessor.java】
 2: /**
3: * 消费者发回消息
4: *
5: * @param ctx ctx
6: * @param request 请求
7: * @return 响应
8: * @throws RemotingCommandException 当远程调用异常
9: */
10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
11: throws RemotingCommandException{
12: // ....(省略代码)
13: // 处理 delayLevel(独有)。
14: int delayLevel = requestHeader.getDelayLevel();
15: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
16: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
17: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
18: }
19: if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
20: // ....(省略代码)
21: } else {
22: if (0 == delayLevel) {
23: delayLevel = 3 + msgExt.getReconsumeTimes();
24: }
25: msgExt.setDelayTimeLevel(delayLevel);
26: }
27: 
28: // ....(省略代码)
29: return response;
30: }

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。