查看原文
其他

分布式消息队列 RocketMQ 源码分析 —— Message 顺序发送与消费

2017-12-26 芋艿 芋道源码

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RocketMQ 4.0.x 正式版

  • 1. 概述

  • 2. Producer 顺序发送

  • 3. Consumer 严格顺序消费

  • 3.1 获得(锁定)消息队列

  • 3.2 移除消息队列

  • 3.3 消费消息队列

    • 3.1.1 消费消息

    • 3.1.2 处理消费结果

    • 3.13 消息处理队列核心方法


友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。


1. 概述

建议前置阅读内容:

  • 《RocketMQ 源码分析 —— Message 发送与接收》

  • 《RocketMQ 源码分析 —— Message 拉取与消费(下)》

当然对 Message 发送与消费已经有一定了解的同学,可以选择跳过。


RocketMQ 提供了两种顺序级别:

  • 普通顺序消息 : Producer 将相关联的消息发送到相同的消息队列。

  • 完全严格顺序 :在 普通顺序消息 的基础上, Consumer 严格顺序消费。

绝大部分场景下只需要用到普通顺序消息
例如说:给用户发送短信消息 + 发送推送消息,将两条消息发送到不同的消息队列,若其中一条消息队列消费较慢造成堵塞,用户可能会收到两条消息会存在一定的时间差,带来的体验会相对较差。当然类似这种场景,即使有一定的时间差,不会产生系统逻辑上BUG。另外, 普通顺序消息性能能更加好。
那么什么时候使用使用完全严格顺序?如下是来自官方文档的说明:

目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息


😈上代码!!!

2. Producer 顺序发送

官方发送顺序消息的例子

  1.  1: package org.apache.rocketmq.example.ordermessage;

  2.  2:

  3.  3: import java.io.UnsupportedEncodingException;

  4.  4: import java.util.List;

  5.  5: import org.apache.rocketmq.client.exception.MQBrokerException;

  6.  6: import org.apache.rocketmq.client.exception.MQClientException;

  7.  7: import org.apache.rocketmq.client.producer.DefaultMQProducer;

  8.  8: import org.apache.rocketmq.client.producer.MQProducer;

  9.  9: import org.apache.rocketmq.client.producer.MessageQueueSelector;

  10. 10: import org.apache.rocketmq.client.producer.SendResult;

  11. 11: import org.apache.rocketmq.common.message.Message;

  12. 12: import org.apache.rocketmq.common.message.MessageQueue;

  13. 13: import org.apache.rocketmq.remoting.common.RemotingHelper;

  14. 14: import org.apache.rocketmq.remoting.exception.RemotingException;

  15. 15:

  16. 16: public class Producer {

  17. 17:     public static void main(String[] args) throws UnsupportedEncodingException {

  18. 18:         try {

  19. 19:             MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

  20. 20:             producer.start();

  21. 21:

  22. 22:             String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

  23. 23:             for (int i = 0; i < 100; i++) {

  24. 24:                 int orderId = i % 10;

  25. 25:                 Message msg =

  26. 26:                     new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,

  27. 27:                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

  28. 28:                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  29. 29:                     @Override

  30. 30:                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  31. 31:                         Integer id = (Integer) arg;

  32. 32:                         int index = id % mqs.size();

  33. 33:                         return mqs.get(index);

  34. 34:                     }

  35. 35:                 }, orderId);

  36. 36:

  37. 37:                 System.out.printf("%s%n", sendResult);

  38. 38:             }

  39. 39:

  40. 40:             producer.shutdown();

  41. 41:         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {

  42. 42:             e.printStackTrace();

  43. 43:         }

  44. 44:     }

  45. 45: }

  • 第 28 至 35 行 :实现了根据 id%mqs.size() 来进行消息队列的选择。当前例子,我们传递 orderId 作为参数,那么相同的 orderId 能够进入相同的消息队列


MessageQueueSelector 接口的源码

  1.  1: public interface MessageQueueSelector {

  2.  2:

  3.  3:     /**

  4.  4:      * 选择消息队列

  5.  5:      *

  6.  6:      * @param mqs 消息队列

  7.  7:      * @param msg 消息

  8.  8:      * @param arg 参数

  9.  9:      * @return 消息队列

  10. 10:      */

  11. 11:     MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);

  12. 12: }


Producer 选择队列发送消息方法的源码

  1. 16: private SendResult sendSelectImpl(//

  2. 17:     Message msg, //

  3. 18:     MessageQueueSelector selector, //

  4. 19:     Object arg, //

  5. 20:     final CommunicationMode communicationMode, //

  6. 21:     final SendCallback sendCallback, final long timeout//

  7. 22: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

  8. 23:     this.makeSureStateOK();

  9. 24:     Validators.checkMessage(msg, this.defaultMQProducer);

  10. 25:

  11. 26:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

  12. 27:     if (topicPublishInfo != null && topicPublishInfo.ok()) {

  13. 28:         MessageQueue mq = null;

  14. 29:         try {

  15. 30:             mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);

  16. 31:         } catch (Throwable e) {

  17. 32:             throw new MQClientException("select message queue throwed exception.", e);

  18. 33:         }

  19. 34:

  20. 35:         if (mq != null) {

  21. 36:             return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);

  22. 37:         } else {

  23. 38:             throw new MQClientException("select message queue return null.", null);

  24. 39:         }

  25. 40:     }

  26. 41:

  27. 42:     throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);

  28. 43: }

  • 第 30 行 :选择消息队列。

  • 第 36 行 :发送消息。

3. Consumer 严格顺序消费

Consumer 在严格顺序消费时,通过 把锁保证严格顺序消费。

  • Broker 消息队列锁(分布式锁) :

    • 集群模式下, Consumer 从 Broker 获得该锁后,才能进行消息拉取、消费。

    • 广播模式下, Consumer 无需该锁。

  • Consumer 消息队列锁(本地锁) : Consumer 获得该锁才能操作消息队列。

  • Consumer 消息处理队列消费锁(本地锁) : Consumer 获得该锁才能消费消息队列。

可能同学有疑问,为什么有 Consumer 消息队列锁还需要有 Consumer 消息队列消费锁呢?😈让我们带着疑问继续往下看。


3.1 获得(锁定)消息队列

集群模式下, Consumer 更新属于自己的消息队列时,会向 Broker 锁定该消息队列(广播模式下不需要)。如果锁定失败,则更新失败,即该消息队列不属于自己,不能进行消费。核心代码如下:

  1.  1: // ⬇️⬇️⬇️【RebalanceImpl.java】

  2.  2: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {

  3.  3: // ..... 此处省略部分代码

  4.  4:     // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。

  5.  5:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组

  6.  6:     for (MessageQueue mq : mqSet) {

  7.  7:         if (!this.processQueueTable.containsKey(mq)) {

  8.  8:             if (isOrder && !this.lock(mq)) { // 顺序消息锁定消息队列

  9.  9:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);

  10. 10:                 continue;

  11. 11:             }

  12. 12:

  13. 13:             this.removeDirtyOffset(mq);

  14. 14:             ProcessQueue pq = new ProcessQueue();

  15. 15:             long nextOffset = this.computePullFromWhere(mq);

  16. 16:             if (nextOffset >= 0) {

  17. 17:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);

  18. 18:                 if (pre != null) {

  19. 19:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);

  20. 20:                 } else {

  21. 21:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);

  22. 22:                     PullRequest pullRequest = new PullRequest();

  23. 23:                     pullRequest.setConsumerGroup(consumerGroup);

  24. 24:                     pullRequest.setNextOffset(nextOffset);

  25. 25:                     pullRequest.setMessageQueue(mq);

  26. 26:                     pullRequest.setProcessQueue(pq);

  27. 27:                     pullRequestList.add(pullRequest);

  28. 28:                     changed = true;

  29. 29:                 }

  30. 30:             } else {

  31. 31:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);

  32. 32:             }

  33. 33:         }

  34. 34:     }

  35. 35:

  36. 36: // ..... 此处省略部分代码

  37. 37: }

  38. 38:

  39. 39: // ⬇️⬇️⬇️【RebalanceImpl.java】

  40. 40: /**

  41. 41:  * 请求Broker获得指定消息队列的分布式锁

  42. 42:  *

  43. 43:  * @param mq 队列

  44. 44:  * @return 是否成功

  45. 45:  */

  46. 46: public boolean lock(final MessageQueue mq) {

  47. 47:     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);

  48. 48:     if (findBrokerResult != null) {

  49. 49:         LockBatchRequestBody requestBody = new LockBatchRequestBody();

  50. 50:         requestBody.setConsumerGroup(this.consumerGroup);

  51. 51:         requestBody.setClientId(this.mQClientFactory.getClientId());

  52. 52:         requestBody.getMqSet().add(mq);

  53. 53:

  54. 54:         try {

  55. 55:             // 请求Broker获得指定消息队列的分布式锁

  56. 56:             Set<MessageQueue> lockedMq =

  57. 57:                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

  58. 58:

  59. 59:             // 设置消息处理队列锁定成功。锁定消息队列成功,可能本地没有消息处理队列,设置锁定成功会在lockAll()方法。

  60. 60:             for (MessageQueue mmqq : lockedMq) {

  61. 61:                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);

  62. 62:                 if (processQueue != null) {

  63. 63:                     processQueue.setLocked(true);

  64. 64:                     processQueue.setLastLockTimestamp(System.currentTimeMillis());

  65. 65:                 }

  66. 66:             }

  67. 67:

  68. 68:             boolean lockOK = lockedMq.contains(mq);

  69. 69:             log.info("the message queue lock {}, {} {}",

  70. 70:                 lockOK ? "OK" : "Failed",

  71. 71:                 this.consumerGroup,

  72. 72:                 mq);

  73. 73:             return lockOK;

  74. 74:         } catch (Exception e) {

  75. 75:             log.error("lockBatchMQ exception, " + mq, e);

  76. 76:         }

  77. 77:     }

  78. 78:

  79. 79:     return false;

  80. 80: }

  • ⬆️⬆️⬆️

  • 第 8 至 11 行 :顺序消费时,锁定消息队列。如果锁定失败,新增消息处理队列失败。


Broker 消息队列锁会过期,默认配置 30s。因此, Consumer 需要不断向 Broker 刷新该锁过期时间,默认配置 20s 刷新一次。核心代码如下:

  1.  1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】

  2.  2: public void start() {

  3.  3:     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {

  4.  4:         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  5.  5:             @Override

  6.  6:             public void run() {

  7.  7:                 ConsumeMessageOrderlyService.this.lockMQPeriodically();

  8.  8:             }

  9.  9:         }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);

  10. 10:     }

  11. 11: }

3.2 移除消息队列

集群模式下, Consumer 移除自己的消息队列时,会向 Broker 解锁该消息队列(广播模式下不需要)。核心代码如下:

  1.  1: // ⬇️⬇️⬇️【RebalancePushImpl.java】

  2.  2: /**

  3.  3:  * 移除不需要的队列相关的信息

  4.  4:  * 1. 持久化消费进度,并移除之

  5.  5:  * 2. 顺序消费&集群模式,解锁对该队列的锁定

  6.  6:  *

  7.  7:  * @param mq 消息队列

  8.  8:  * @param pq 消息处理队列

  9.  9:  * @return 是否移除成功

  10. 10:  */

  11. 11: @Override

  12. 12: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {

  13. 13:     // 同步队列的消费进度,并移除之。

  14. 14:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);

  15. 15:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);

  16. 16:     // 集群模式下,顺序消费移除时,解锁对队列的锁定

  17. 17:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()

  18. 18:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {

  19. 19:         try {

  20. 20:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {

  21. 21:                 try {

  22. 22:                     return this.unlockDelay(mq, pq);

  23. 23:                 } finally {

  24. 24:                     pq.getLockConsume().unlock();

  25. 25:                 }

  26. 26:             } else {

  27. 27:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //

  28. 28:                     mq, //

  29. 29:                     pq.getTryUnlockTimes());

  30. 30:

  31. 31:                 pq.incTryUnlockTimes();

  32. 32:             }

  33. 33:         } catch (Exception e) {

  34. 34:             log.error("removeUnnecessaryMessageQueue Exception", e);

  35. 35:         }

  36. 36:

  37. 37:         return false;

  38. 38:     }

  39. 39:     return true;

  40. 40: }

  41. 41:

  42. 42: // ⬇️⬇️⬇️【RebalancePushImpl.java】

  43. 43: /**

  44. 44:  * 延迟解锁 Broker 消息队列锁

  45. 45:  * 当消息处理队列不存在消息,则直接解锁

  46. 46:  *

  47. 47:  * @param mq 消息队列

  48. 48:  * @param pq 消息处理队列

  49. 49:  * @return 是否解锁成功

  50. 50:  */

  51. 51: private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {

  52. 52:     if (pq.hasTempMessage()) { // TODO 疑问:为什么要延迟移除

  53. 53:         log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);

  54. 54:         this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {

  55. 55:             @Override

  56. 56:             public void run() {

  57. 57:                 log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);

  58. 58:                 RebalancePushImpl.this.unlock(mq, true);

  59. 59:             }

  60. 60:         }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);

  61. 61:     } else {

  62. 62:         this.unlock(mq, true);

  63. 63:     }

  64. 64:     return true;

  65. 65: }

  • ⬆️⬆️⬆️

  • 第 20 至 32 行 :获取消息队列消费锁,避免和消息队列消费冲突。如果获取锁失败,则移除消息队列失败,等待下次重新分配消费队列时,再进行移除。如果未获得锁而进行移除,则可能出现另外的 Consumer 和当前 Consumer 同时消费该消息队列,导致消息无法严格顺序消费。

  • 第 51 至 64 行 :解锁 Broker 消息队列锁。如果消息处理队列存在剩余消息,则延迟解锁 Broker 消息队列锁。❓为什么消息处理队列存在剩余消息不能直接解锁呢?😈我也不知道,百思不得其解。如果有知道的同学麻烦教育下俺。

3.3 消费消息队列

😏本节会类比并发消费消费队列,建议对照 PushConsumer并发消费消息 一起理解。

3.1.1 消费消息


  1.  1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】

  2.  2: class ConsumeRequest implements Runnable {

  3.  3:

  4.  4:     /**

  5.  5:      * 消息处理队列

  6.  6:      */

  7.  7:     private final ProcessQueue processQueue;

  8.  8:     /**

  9.  9:      * 消息队列

  10. 10:      */

  11. 11:     private final MessageQueue messageQueue;

  12. 12:

  13. 13:     @Override

  14. 14:     public void run() {

  15. 15:         if (this.processQueue.isDropped()) {

  16. 16:             log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);

  17. 17:             return;

  18. 18:         }

  19. 19:

  20. 20:         // 获得 Consumer 消息队列锁

  21. 21:         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);

  22. 22:         synchronized (objLock) {

  23. 23:             // (广播模式) 或者 (集群模式 && Broker消息队列锁有效)

  24. 24:             if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

  25. 25:                 || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {

  26. 26:                 final long beginTime = System.currentTimeMillis();

  27. 27:                 // 循环

  28. 28:                 for (boolean continueConsume = true; continueConsume; ) {

  29. 29:                     if (this.processQueue.isDropped()) {

  30. 30:                         log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);

  31. 31:                         break;

  32. 32:                     }

  33. 33:

  34. 34:                     // 消息队列分布式锁未锁定,提交延迟获得锁并消费请求

  35. 35:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

  36. 36:                         && !this.processQueue.isLocked()) {

  37. 37:                         log.warn("the message queue not locked, so consume later, {}", this.messageQueue);

  38. 38:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);

  39. 39:                         break;

  40. 40:                     }

  41. 41:                     // 消息队列分布式锁已经过期,提交延迟获得锁并消费请求

  42. 42:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

  43. 43:                         && this.processQueue.isLockExpired()) {

  44. 44:                         log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);

  45. 45:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);

  46. 46:                         break;

  47. 47:                     }

  48. 48:

  49. 49:                     // 当前周期消费时间超过连续时长,默认:60s,提交延迟消费请求。默认情况下,每消费1分钟休息10ms。

  50. 50:                     long interval = System.currentTimeMillis() - beginTime;

  51. 51:                     if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {

  52. 52:                         ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);

  53. 53:                         break;

  54. 54:                     }

  55. 55:

  56. 56:                     // 获取消费消息。此处和并发消息请求不同,并发消息请求已经带了消费哪些消息。

  57. 57:                     final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

  58. 58:                     List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);

  59. 59:                     if (!msgs.isEmpty()) {

  60. 60:                         final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

  61. 61:

  62. 62:                         ConsumeOrderlyStatus status = null;

  63. 63:

  64. 64:                         // ....省略代码:Hook:before

  65. 65:

  66. 66:                         // 执行消费

  67. 67:                         long beginTimestamp = System.currentTimeMillis();

  68. 68:                         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

  69. 69:                         boolean hasException = false;

  70. 70:                         try {

  71. 71:                             this.processQueue.getLockConsume().lock(); // 锁定队列消费锁

  72. 72:

  73. 73:                             if (this.processQueue.isDropped()) {

  74. 74:                                 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",

  75. 75:                                     this.messageQueue);

  76. 76:                                 break;

  77. 77:                             }

  78. 78:

  79. 79:                             status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);

  80. 80:                         } catch (Throwable e) {

  81. 81:                             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //

  82. 82:                                 RemotingHelper.exceptionSimpleDesc(e), //

  83. 83:                                 ConsumeMessageOrderlyService.this.consumerGroup, //

  84. 84:                                 msgs, //

  85. 85:                                 messageQueue);

  86. 86:                             hasException = true;

  87. 87:                         } finally {

  88. 88:                             this.processQueue.getLockConsume().unlock(); // 锁定队列消费锁

  89. 89:                         }

  90. 90:

  91. 91:                         // ....省略代码:解析消费结果状态

  92. 92:

  93. 93:                         // ....省略代码:Hook:after

  94. 94:

  95. 95:                         ConsumeMessageOrderlyService.this.getConsumerStatsManager()

  96. 96:                             .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

  97. 97:

  98. 98:                         // 处理消费结果

  99. 99:                         continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);

  100. 100:                     } else {

  101. 101:                         continueConsume = false;

  102. 102:                     }

  103. 103:                 }

  104. 104:             } else {

  105. 105:                 if (this.processQueue.isDropped()) {

  106. 106:                     log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);

  107. 107:                     return;

  108. 108:                 }

  109. 109:

  110. 110:                 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);

  111. 111:             }

  112. 112:         }

  113. 113:     }

  114. 114:

  115. 115: }

  • ⬆️⬆️⬆️

  • 第 20 行 :获得 Consumer 消息队列锁。

  • 第 58 行 :从消息处理队列顺序获得消息。和并发消费获得消息不同。并发消费请求在请求创建时,已经设置好消费哪些消息。

  • 第 71 行 :获得 Consumer 消息处理队列消费锁。相比【 Consumer消息队列锁】,其粒度较小。这就是上文提到的❓为什么有 Consumer消息队列锁还需要有 Consumer 消息队列消费锁呢的原因。

  • 第 79 行 :执行消费

  • 第 99 行 :处理消费结果。

3.1.2 处理消费结果

顺序消费消息结果 ( ConsumeOrderlyStatus) 有四种情况:

  • SUCCESS :消费成功但不提交

  • ROLLBACK :消费失败,消费回滚。

  • COMMIT :消费成功提交并且提交。

  • SUSPEND_CURRENT_QUEUE_A_MOMENT :消费失败,挂起消费队列一会会,稍后继续消费。

考虑到 ROLLBACKCOMMIT 暂时只使用在 MySQLbinlog 场景,官方将这两状态标记为 @Deprecated。当然,相应的实现逻辑依然保留。

并发消费场景时,如果消费失败, Consumer 会将消费失败消息发回到 Broker 重试队列,跳过当前消息,等待下次拉取该消息再进行消费。

但是在完全严格顺序消费消费时,这样做显然不行。也因此,消费失败的消息,会挂起队列一会会,稍后继续消费。

不过消费失败的消息一直失败,也不可能一直消费。当超过消费重试上限时, Consumer 会将消费失败超过上限的消息发回到 Broker 死信队列。

让我们来看看代码:

  1.  1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】

  2.  2: /**

  3.  3:  * 处理消费结果,并返回是否继续消费

  4.  4:  *

  5.  5:  * @param msgs 消息

  6.  6:  * @param status 消费结果状态

  7.  7:  * @param context 消费Context

  8.  8:  * @param consumeRequest 消费请求

  9.  9:  * @return 是否继续消费

  10. 10:  */

  11. 11: public boolean processConsumeResult(//

  12. 12:     final List<MessageExt> msgs, //

  13. 13:     final ConsumeOrderlyStatus status, //

  14. 14:     final ConsumeOrderlyContext context, //

  15. 15:     final ConsumeRequest consumeRequest//

  16. 16: ) {

  17. 17:     boolean continueConsume = true;

  18. 18:     long commitOffset = -1L;

  19. 19:     if (context.isAutoCommit()) {

  20. 20:         switch (status) {

  21. 21:             case COMMIT:

  22. 22:             case ROLLBACK:

  23. 23:                 log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());

  24. 24:             case SUCCESS:

  25. 25:                 // 提交消息已消费成功到消息处理队列

  26. 26:                 commitOffset = consumeRequest.getProcessQueue().commit();

  27. 27:                 // 统计

  28. 28:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  29. 29:                 break;

  30. 30:             case SUSPEND_CURRENT_QUEUE_A_MOMENT:

  31. 31:                 // 统计

  32. 32:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  33. 33:                 if (checkReconsumeTimes(msgs)) { // 计算是否暂时挂起(暂停)消费N毫秒,默认:10ms

  34. 34:                     // 设置消息重新消费

  35. 35:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);

  36. 36:                     // 提交延迟消费请求

  37. 37:                     this.submitConsumeRequestLater(//

  38. 38:                         consumeRequest.getProcessQueue(), //

  39. 39:                         consumeRequest.getMessageQueue(), //

  40. 40:                         context.getSuspendCurrentQueueTimeMillis());

  41. 41:                     continueConsume = false;

  42. 42:                 } else {

  43. 43:                     commitOffset = consumeRequest.getProcessQueue().commit();

  44. 44:                 }

  45. 45:                 break;

  46. 46:             default:

  47. 47:                 break;

  48. 48:         }

  49. 49:     } else {

  50. 50:         switch (status) {

  51. 51:             case SUCCESS:

  52. 52:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  53. 53:                 break;

  54. 54:             case COMMIT:

  55. 55:                 // 提交消息已消费成功到消息处理队列

  56. 56:                 commitOffset = consumeRequest.getProcessQueue().commit();

  57. 57:                 break;

  58. 58:             case ROLLBACK:

  59. 59:                 // 设置消息重新消费

  60. 60:                 consumeRequest.getProcessQueue().rollback();

  61. 61:                 this.submitConsumeRequestLater(//

  62. 62:                     consumeRequest.getProcessQueue(), //

  63. 63:                     consumeRequest.getMessageQueue(), //

  64. 64:                     context.getSuspendCurrentQueueTimeMillis());

  65. 65:                 continueConsume = false;

  66. 66:                 break;

  67. 67:             case SUSPEND_CURRENT_QUEUE_A_MOMENT: // 计算是否暂时挂起(暂停)消费N毫秒,默认:10ms

  68. 68:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  69. 69:                 if (checkReconsumeTimes(msgs)) {

  70. 70:                     // 设置消息重新消费

  71. 71:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);

  72. 72:                     // 提交延迟消费请求

  73. 73:                     this.submitConsumeRequestLater(//

  74. 74:                         consumeRequest.getProcessQueue(), //

  75. 75:                         consumeRequest.getMessageQueue(), //

  76. 76:                         context.getSuspendCurrentQueueTimeMillis());

  77. 77:                     continueConsume = false;

  78. 78:                 }

  79. 79:                 break;

  80. 80:             default:

  81. 81:                 break;

  82. 82:         }

  83. 83:     }

  84. 84:

  85. 85:     // 消息处理队列未dropped,提交有效消费进度

  86. 86:     if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {

  87. 87:         this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);

  88. 88:     }

  89. 89:

  90. 90:     return continueConsume;

  91. 91: }

  92. 92:

  93. 93: private int getMaxReconsumeTimes() {

  94. 94:     // default reconsume times: Integer.MAX_VALUE

  95. 95:     if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {

  96. 96:         return Integer.MAX_VALUE;

  97. 97:     } else {

  98. 98:         return this.defaultMQPushConsumer.getMaxReconsumeTimes();

  99. 99:     }

  100. 100: }

  101. 101:

  102. 102: /**

  103. 103:  * 计算是否要暂停消费

  104. 104:  * 不暂停条件:存在消息都超过最大消费次数并且都发回broker成功

  105. 105:  *

  106. 106:  * @param msgs 消息

  107. 107:  * @return 是否要暂停

  108. 108:  */

  109. 109: private boolean checkReconsumeTimes(List<MessageExt> msgs) {

  110. 110:     boolean suspend = false;

  111. 111:     if (msgs != null && !msgs.isEmpty()) {

  112. 112:         for (MessageExt msg : msgs) {

  113. 113:             if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {

  114. 114:                 MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));

  115. 115:                 if (!sendMessageBack(msg)) { // 发回失败,中断

  116. 116:                     suspend = true;

  117. 117:                     msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

  118. 118:                 }

  119. 119:             } else {

  120. 120:                 suspend = true;

  121. 121:                 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

  122. 122:             }

  123. 123:         }

  124. 124:     }

  125. 125:     return suspend;

  126. 126: }

  127. 127:

  128. 128: /**

  129. 129:  * 发回消息。

  130. 130:  * 消息发回broker后,对应的消息队列是死信队列。

  131. 131:  *

  132. 132:  * @param msg 消息

  133. 133:  * @return 是否发送成功

  134. 134:  */

  135. 135: public boolean sendMessageBack(final MessageExt msg) {

  136. 136:     try {

  137. 137:         // max reconsume times exceeded then send to dead letter queue.

  138. 138:         Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

  139. 139:         String originMsgId = MessageAccessor.getOriginMessageId(msg);

  140. 140:         MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

  141. 141:         newMsg.setFlag(msg.getFlag());

  142. 142:         MessageAccessor.setProperties(newMsg, msg.getProperties());

  143. 143:         MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

  144. 144:         MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));

  145. 145:         MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));

  146. 146:         newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

  147. 147:

  148. 148:         this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);

  149. 149:         return true;

  150. 150:     } catch (Exception e) {

  151. 151:         log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);

  152. 152:     }

  153. 153:

  154. 154:     return false;

  155. 155: }

  • ⬆️⬆️⬆️

  • 第 21 至 29 行 :消费成功。在自动提交进度( AutoCommit )的情况下, COMMIT、 ROLLBACK、 SUCCESS 逻辑已经统一

  • 第 30 至 45 行 :消费失败。当消息重试次数超过上限(默认 :16次)时,将消息发送到 Broker 死信队列,跳过这些消息。此时,消息队列无需挂起,继续消费后面的消息。

  • 第 85 至 88 行 :提交消费进度。

3.13 消息处理队列核心方法

😈涉及到的四个核心方法的源码:

  1.  1: // ⬇️⬇️⬇️【ProcessQueue.java】

  2.  2: /**

  3.  3:  * 消息映射

  4.  4:  * key:消息队列位置

  5.  5:  */

  6.  6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();    /**

  7.  7:  * 消息映射临时存储(消费中的消息)

  8.  8:  */

  9.  9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();

  10. 10:

  11. 11: /**

  12. 12:  * 回滚消费中的消息

  13. 13:  * 逻辑类似于{@link #makeMessageToCosumeAgain(List)}

  14. 14:  */

  15. 15: public void rollback() {

  16. 16:     try {

  17. 17:         this.lockTreeMap.writeLock().lockInterruptibly();

  18. 18:         try {

  19. 19:             this.msgTreeMap.putAll(this.msgTreeMapTemp);

  20. 20:             this.msgTreeMapTemp.clear();

  21. 21:         } finally {

  22. 22:             this.lockTreeMap.writeLock().unlock();

  23. 23:         }

  24. 24:     } catch (InterruptedException e) {

  25. 25:         log.error("rollback exception", e);

  26. 26:     }

  27. 27: }

  28. 28:

  29. 29: /**

  30. 30:  * 提交消费中的消息已消费成功,返回消费进度

  31. 31:  *

  32. 32:  * @return 消费进度

  33. 33:  */

  34. 34: public long commit() {

  35. 35:     try {

  36. 36:         this.lockTreeMap.writeLock().lockInterruptibly();

  37. 37:         try {

  38. 38:             // 消费进度

  39. 39:             Long offset = this.msgTreeMapTemp.lastKey();

  40. 40:

  41. 41:             //

  42. 42:             msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));

  43. 43:

  44. 44:             //

  45. 45:             this.msgTreeMapTemp.clear();

  46. 46:

  47. 47:             // 返回消费进度

  48. 48:             if (offset != null) {

  49. 49:                 return offset + 1;

  50. 50:             }

  51. 51:         } finally {

  52. 52:             this.lockTreeMap.writeLock().unlock();

  53. 53:         }

  54. 54:     } catch (InterruptedException e) {

  55. 55:         log.error("commit exception", e);

  56. 56:     }

  57. 57:

  58. 58:     return -1;

  59. 59: }

  60. 60:

  61. 61: /**

  62. 62:  * 指定消息重新消费

  63. 63:  * 逻辑类似于{@link #rollback()}

  64. 64:  *

  65. 65:  * @param msgs 消息

  66. 66:  */

  67. 67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {

  68. 68:     try {

  69. 69:         this.lockTreeMap.writeLock().lockInterruptibly();

  70. 70:         try {

  71. 71:             for (MessageExt msg : msgs) {

  72. 72:                 this.msgTreeMapTemp.remove(msg.getQueueOffset());

  73. 73:                 this.msgTreeMap.put(msg.getQueueOffset(), msg);

  74. 74:             }

  75. 75:         } finally {

  76. 76:             this.lockTreeMap.writeLock().unlock();

  77. 77:         }

  78. 78:     } catch (InterruptedException e) {

  79. 79:         log.error("makeMessageToCosumeAgain exception", e);

  80. 80:     }

  81. 81: }

  82. 82:

  83. 83: /**

  84. 84:  * 获得持有消息前N条

  85. 85:  *

  86. 86:  * @param batchSize 条数

  87. 87:  * @return 消息

  88. 88:  */

  89. 89: public List<MessageExt> takeMessags(final int batchSize) {

  90. 90:     List<MessageExt> result = new ArrayList<>(batchSize);

  91. 91:     final long now = System.currentTimeMillis();

  92. 92:     try {

  93. 93:         this.lockTreeMap.writeLock().lockInterruptibly();

  94. 94:         this.lastConsumeTimestamp = now;

  95. 95:         try {

  96. 96:             if (!this.msgTreeMap.isEmpty()) {

  97. 97:                 for (int i = 0; i < batchSize; i++) {

  98. 98:                     Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();

  99. 99:                     if (entry != null) {

  100. 100:                         result.add(entry.getValue());

  101. 101:                         msgTreeMapTemp.put(entry.getKey(), entry.getValue());

  102. 102:                     } else {

  103. 103:                         break;

  104. 104:                     }

  105. 105:                 }

  106. 106:             }

  107. 107:

  108. 108:             if (result.isEmpty()) {

  109. 109:                 consuming = false;

  110. 110:             }

  111. 111:         } finally {

  112. 112:             this.lockTreeMap.writeLock().unlock();

  113. 113:         }

  114. 114:     } catch (InterruptedException e) {

  115. 115:         log.error("take Messages exception", e);

  116. 116:     }

  117. 117:

  118. 118:     return result;

  119. 119: }


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存