跳到主要内容

十九、RocketMQ 主从同步读写分离机制


关于主从同步最新理解:RocketMQ 主从同步若干问题答疑

RocketMQ在消息拉取时是如何根据消息消费队列MessageQueue来选择Broker的呢?消息消费队列如图所示:
 

RocketMQ根据MessageQueue查找Broker地址的唯一依据便是brokerName,从RocketMQ的Broker组织实现来看,同一组Broker(M-S)服务器,其brokerName相同,主服务器的brokerId为0,从服务器的brokerId大于0,那RocketMQ根据brokerName如何定位到哪一台Broker上来呢?

PullAPIWrapper#pullKernelImpl

FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);

RocketMQ的MQClientInstance类提供了根据brokerName、brokerId查找Broker地址的方法,返回值如图:
 
MQClientInstance#findBrokerAddressInSubscribe

public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName,
final long brokerId,
final boolean onlyThisBroker
) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
brokerAddr = map.get(brokerId);
slave = brokerId != MixAll.MASTER_ID;
found = brokerAddr != null;
if (!found && !onlyThisBroker) {
Entry<Long, String> entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue();
slave = entry.getKey() != MixAll.MASTER_ID;
found = true;
}
}
if (found) {
return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
}
return null;
}