跳到主要内容

二十二、RocketMQ源码分析之RocketMQ事务消息实现原理中篇—-事务消息状态回查


上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器,消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,当执行本地事务,如果返回本地事务状态为UN_KNOW时,第二次提交到服务器时将不会做任何操作,也就是消息还存在与RMQ_SYS_TRANS_HALF_TOPIC主题中,并不能被消息消费者消费,那这些消息最终如何被提交或回滚呢?原来RocketMQ使用TransactionalMessageCheckService线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。TransactionalMessageCheckService的检测频率默认1分钟,可通过在broker.conf文件中设置transactionCheckInterval的值来改变默认值,单位为毫秒。

温馨提示:文末附有流程图。

TransactionalMessageCheckService#onWaitEnd

protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); // @1
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); // @2
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); // @3
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}