跳到主要内容

十六、RocketMQ源码分析顺序消息消费实现原理


本节目录

1、 消息队列负载;

2、 消息拉取;

3、 消息顺序消息消费;

  • 3.1核心属性与构造函数
  • 3.2 start方法
  • 3.3 submitConsumeRequest
  • 3.4 ConsumeMessageOrderlyService#ConsumeRequest
  • 3.4 消息队列锁实现

所谓顺序消费,rocketmq 支持同一消费队列上的消息顺序消费。

消息消费涉及3个点:
1、 消息队列重新负载;
2、 消息拉取;
3、 消息消费;

按照消息消费步骤来揭开 RocketMQ 顺序消息消费实现原理。

1、消息队列负载

RocketMQ 在同一个 JVM 进程拥有一个 clientConfigId(客户端ID),该JVM进程中不同的消息消费组的消息客户端ID相同,因为在JVM进程中对于每一个 ClientConfig 只会实例化一个 MQClientInstance。消息消费的第一个步骤是首先要为消费组内的所有消息者分配消息消费队列。RocetMQ 中通过RebalanceService线程实现消费队列负载。

RebalanceImpl#updateProcessQueueTableInRebalance
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) { // @1
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}