转载:手写实现基于消息队列的分布式事务框架
分布式事务在现在的分布式开发领域已经成为必须考虑的因素,随着微服务、SOA 架构思想日益被开发者锁熟知,分布式事务的解决思路也不断被提出并被开源社区实现。
在互联网开发中,通常情况下,我们常常会采用一揽子“柔性事务”解决方案来保证系统内模块之间的数据的最终一致性。
业界知名的分布式事务解决方案有:
TCC 方案,它的开源实现有 TCC-Transaction、ByteTCC,阿里开源的 Seata 框架也加入了对 TCC 模式的支持;
可靠消息最终一致方案,代表的实现方式为 RocketMQ 的事务消息;
SAGA 事务,代表实现方式为华为开源的 serviceComb;
最大努力通知型解决方案,该方案与业务耦合较为严重,因此业界也没有一个较为抽象的开源实现;
消息溯源方案(或称为本地消息表方案),该方案实现较为简单,但也与业务耦合较为严重,据我调查暂时没有抽象的开源实现。
上述方案中,我挑选了第五种**消息溯源方案(本地消息表方案,后文均称为本地消息表方案)**作为自己写分布式事务框架的核心机制,旨在实现一个与业务无关的、基于消息的、异步确保的、最终一致的柔性事务框架。
框架的实现主要基于 RocketMQ 的普通消息,我们都知道 RocketMQ 已经支持了事务消息,这里只是基于它对本地消息表方案进行实现,原则上,本地消息表方案支持任意具有发布订阅能力的消息中间件。
上述其他实现在笔者的博客中有过较为系统的讲解,感兴趣的同学可以移步笔者的博客,本文就不再展开说明。
1. 本地消息表方案简介
为了加深读者的理解,方便行文,此处对本地消息表方案做一个介绍。
本地消息表方案源自 eBay,传入国内后多次被大厂(如:支付宝)落地,经过布道被大家所熟知。它的核心机理分为事务发起方(我们称事务上游),事务联动方(我们称下游),二者为分布式系统内两组不同的应用,它们应当使用独立的数据源。
1.1 事务上游处理逻辑
事务上游、执行本地业务的同时,将消息持久化到业务数据源中(持久化到数据库中),持久化流程与业务操作处于同一个事务中,保证了业务与消息同时成功持久化。
如果消息队列支持事务机制呢?就不用选择了本地消息表了,本地消息表是针对不支持事务机制的消息队列。
到这里,事务上游的业务就执行结束了,通过线程异步地轮询消息表,将待发送的消息投递到消息中间件的事务执行队列(我们将该队列的 Topic 称为事务执行 Topic)中。投递成功则更新消息状态为 [已投递]。
1.2 事务下游处理逻辑
事务下游拉取消息进行消费,在业务消费之前,首先将消息持久化到本地,持久化成功后执行消费逻辑。
下游通过重试最大限度地保证业务消费逻辑执行成功,如果达到某个设定的消费次数阈值仍旧消费失败,那么我们认为事务下游没办法将事务处理成功,此时事务下游拷贝之前持久化的消息,标记为回滚状态消息,并投递到业务回滚队列(我们将该队列的 Topic 称为事务回滚 Topic)中。投递成功则更新消息状态为 [已投递]。
事务上游需要实现回滚逻辑,接收到事务下游投递的回滚消息后,执行回滚逻辑,对业务进行回滚操作。该流程通过消费重试实现,如果达到最大消费次数仍旧不能回滚,则回滚消息会进入消息中间件的死信队列。此时需要人工干预,取出死信消息进行手工回滚操作,保证业务的正常运行。
一般而言,如果环境稳定,业务逻辑无严重 Bug,是不会出现一直重试都执行不完的情况,如果有,很大可能是代码逻辑有问题需要做进一步的排查。
2. 本地消息表方案注意点
本地消息表方案依赖消息中间件,通过消息发送阶段的 ACK 判断消息是否被持久化,一旦返回消息投递成功,则通过消息中间件本身的配置即可保证该消息不会丢失;通过消费阶段的重试加上业务系统的幂等保证事务下游与事务上游能够最大可能的达成最终一致。
如果还是存在异常,则需要人工干预,此处也能看出一点,技术方案往往都是折中产物,这也是最终一致性本身的特点,我们能够容忍一定时间的不一致状态,但是我们能够确保该不一致时间窗口之后,业务的上下游能够达成数据的一致性,建立在该前提下,我们才能够探讨分布式事务的柔性解决方案。
由于本地消息表方案依赖了消息中间件,因此我们需要保证消息中间件的高可靠,否则系统的可用性会因为引入第三方组件而下降。如:配置 RocketMQ 的多主多从集群模式,使用 Kafka 的多副本集群等,生产环境坚决不能出现单点风险。
除了消息中间件选型,我们还要保证消息持久化与本地事务要处于同一个事务域中。为什么要这么做呢?
本地消息表方案的使用,主要的目的是为了解决消息发送与事务提交不能保证同时成功同时失败,也就是消息发送本身与本地的事务并非是原子性的。
我们设想一个错误场景,某系统为了解决跨应用的分布式事务问题,因此引入了消息中间件,设想通过消息通讯作为上下游应用间的事务交互方式。即:上游处理业务完成后(往 DB 中插入了若干业务记录),然后发布一个普通消息到 MQ 的某 Topic,下游订阅该 Topic,对消息进行拉取并消费,完成下游业务。
那么这么做能够保证上下游数据的一致性吗?如果没有异常情况出现,当然是可以的,但是由于 DB 与 MQ 是不同的系统,因此可能出现插入 DB 成功,但发送消息到 MQ 失败;或者出现插入 DB 失败,但发送消息到 MQ 成功。如果出现这类异常情况,业务的上下游系统间数据便不是一致的。
我们分析一下异常情况发生的机理:
当上游进行业务处理完成后,提交了本地事务,接着进行消息发送时,上游系统宕机,当上游系统恢复后,消息也不会再发送了,那么上下游就会出现数据的不一致。
这个异常出现的原因就是没有保证消息发送与本地事务的原子性,而引入本地消息表则能解决这一问题。
当引入本地消息表,在业务事务内将消息进行持久化,由于业务数据与消息数据处于同一事务,因此二者一定是同时成功同时失败,也就是原子的。**当消息持久化后,通过异步线程扫表进行发送,如果出现系统宕机,在恢复之后,由于消息已经存在,因此能够将该持久化的消息扫描出来进行相应的投递操作。**这就保证了本地事务执行成功后,消息一定能发送出去。这里可能会有疑问,万一消息发送失败呢?当然是继续进行重发了,直到发出去为止。
如果本地事务直接就执行失败了,那么消息也不会持久化,此时业务就是失败的,需要业务方进行重试,这种情况下是不存在数据不一致的情况的。
3. shieldTXC 框架原理及实现
上文中,我们已经对本地消息表方案做了一个较为全面的了解,并对它是如何保证事务执行与消息发送同时成功同时失败的机理有了清晰的认知。
从本节开始,我们正式进入自己写分布式事务框架的部分。
笔者将这个本地消息表分布式事务框架命名为 shieldTXC,意思是神盾分布式事务框架,框架内核及 Demo 案例的代码已经打包上传至 GitHub 上,地址为:shieldTXC 源码地址。如果觉得这个喜欢可以点个 star 支持下。
体外话不多说,接下来我们一边看框架的原理图,在宏观上对框架的机理做一个了解,一边对相应的原理进行代码实现讲解,这样理论与实战相结合,相信会加深读者朋友的理解。
首先看下框架的核心原理图。
看起来还是比较整洁的,这也是笔者写代码的一个宗旨:
好的框架可以复杂,但架构一定是优雅的、清晰的。
我们配合代码深入分析一下这张图,主要分为两个主要的阶段:
首先来看下事务提交阶段的实现原理及相对应的代码实现。
3.1 框架分析之事务提交
图中左侧的红字标记该部分为上游应用(后续统称为上游)的事务提交阶段的运行逻辑,上游在这个阶段又分为两个子阶段。
3.1.1 第一阶段:本地事务与消息持久化
上游在执行本地事务成功在,在同一事务内对业务实体封装为消息体,调用 shieldTXC 提供的消息持久化接口将消息持久化到业务库中,框架的消息持久化方法需要保证事务性。
代码如下:
@Transactional(rollbackFor = Exception.class)
public void testTran() {
// 本地事务
doLocalTransaction();
// 消息持久化
TestTxMessage testTxMessage = new TestTxMessage();
testTxMessage.setName(UUID.randomUUID().toString().replace("-", "").substring(0, 10));
shieldTxcRocketMQProducerClient
.putMessage(testTxMessage, EventType.INSERT, TXType.COMMIT, testTxMessage.getName(),
UUID.randomUUID().toString());
}
这个阶段的重点在于消息持久化的实现。一起来看一下 shieldTXC 是如何实现的:
/**
* 消息持久化
* @param shieldTxcMessage
* @param eventType
* @param txType
* @param appId
*/
@Transactional(rollbackFor = Exception.class)
public void putMessage(AbstractShieldTxcMessage shieldTxcMessage,
EventType eventType,
TXType txType,
String appId,
String bizKey) {
Preconditions.checkNotNull(eventType, "Please insert eventType, type is:[com.shield.txc.constant.EventType]");
Preconditions.checkNotNull(bizKey, "Please insert unique bizKey!");
ShieldEvent event = new ShieldEvent();
event.setEventType(eventType.toString())
.setTxType(txType.toString())
.setEventStatus(EventStatus.PRODUCE_INIT.toString())
.setContent(shieldTxcMessage.encode())
.setBizKey(bizKey)
.setAppId(appId);
try {
// 入库失败回滚
boolean insertResult = this.getBaseEventService().insertEvent(event);
if (!insertResult) {
throw new BizException("insert ShieldEvent into DB occurred Exception!");
}
} catch (Exception e) {
// 异常回滚
throw new BizException("insert ShieldEvent into DB occurred Exception!", e);
}
}
为了便于统一管理,shieldTXC 定义了一个抽象消息类,业务方需要继承该抽象类,通过实现其中的 decode 与 encode 方法为业务消息提供编解码能力。
public abstract class AbstractShieldTxcMessage implements Serializable {
private static final long serialVersionUID = -2416427331208398607L;
/**消息序列化*/
public abstract String encode();
/**消息反序列化*/
public abstract void decode(String msg);
}
我们接着看上面的 putMessage 方法。
通过传参,组装了一个 ShieldEvent 持久化消息实体,并将其进行 insert 操作,通过判断入库是否成功决定是否抛出异常让事务进行回滚。
如果消息持久化成功,则本地事务提交;如果消息持久化失败,则本地事务回滚,业务结束。
3.1.2 第二阶段:消息投递
业务方完成上述第一阶段的消息持久化操作,就不需要进行其他的额外操作了。
此时框架开始执行第二阶段:消息投递阶段。
我们从图中可以清晰的看到,ShieldEvent 会在内部维护一个定时任务,扫描状态为初始化待发送消息,组装成功消息体并调用 MQ 的发送消息接口,将投递到事务提交队列。这个过程中要保证消息可达。
这个子阶段用文字描述起来比较简洁,我们接着看下框架代码是如何实现的,加深认知。
1. 定时任务初始化
整个 shieldTXC 内核已经打包为一个 Spring Boot 的 starter,因此可以通过 @Enable 注解简单的整合到 Spring Boot 应用中,框架本身提供了对核心 Bean 的初始化过程,这里我们主要看下扫描待发送消息的定时任务的初始化过程。
定时任务的初始化过程是在框架的 ShieldEventTxcConfiguration.java 类中实现的,类声明如下:
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
public class ShieldEventTxcConfiguration {}
通过 @Configuration 标记为一个配置 Bean,通过 @EnableConfigurationProperties(RocketMQProperties.class) 开启可配置能力。shieldTXC 支持通过配置文件进行配置。
这里重点分析一下扫描待发送消息定时任务的初始化:
/**
* 异步消息调度构造
* @param rocketMQProperties
* @return
*/
@Bean
@ConditionalOnMissingBean
@Order(value = 3)
public SendTxcMessageScheduler sendTxcMessageScheduler(RocketMQProperties rocketMQProperties) {
SendTxcMessageScheduler sendTxcMessageScheduler = new SendTxcMessageScheduler();
// 设置调度线程池参数
sendTxcMessageScheduler.setInitialDelay(rocketMQProperties.getTranMessageSendInitialDelay());
sendTxcMessageScheduler.setPeriod(rocketMQProperties.getTranMessageSendPeriod());
sendTxcMessageScheduler.setCorePoolSize(rocketMQProperties.getTranMessageSendCorePoolSize());
// 数据库操作
sendTxcMessageScheduler.setBaseEventService(baseEventService(baseEventRepository()));
// 消息发送
sendTxcMessageScheduler.setShieldTxcRocketMQProducerClient(rocketMQEventProducerClient(rocketMQProperties));
LOGGER.debug("Initializing [sendTxcMessageScheduler] instance success.");
// 执行调度
sendTxcMessageScheduler.schedule();
return sendTxcMessageScheduler;
}
通过 @Bean 标记为 Spring 容器中的 Bean,Spring 在初始化过程中会加载我们的 Bean,Bean 的 name 就是方法名,这个方式是 Spring 提供的通过 JavaConfig 方式进行 Bean 定义的方式,这里不做展开。
我们首先初始化了一个 SendTxcMessageScheduler 实例,对其进行参数的配置,诸如线程池参数、数据库操作相关的 Bean 依赖(通过相同方式进行初始化的)、消息发送的 Bean 依赖(通过相同方式进行初始化的)。依赖设置完成后调用 schedule() 方法开启任务调度。当应用启动完成之后便会自动开始进行待发送消息的扫描操作。
关于数据库操作 Bean、RocketMQ 操作的 Bean 初始化过程,感兴趣的读者可以去源码的 com.shield.txc.configuration.ShieldEventTxcConfiguration.java 类中查看,请恕本文不再展开。
2. 定时任务核心逻辑分析
初始化完成 SendTxcMessageScheduler 之后,具体又是如何对待发送消息进行处理的呢?带着疑问,我们深入 SendTxcMessageScheduler 这个调度类中一探究竟。
SendTxcMessageScheduler 方法的声明如下:
public class SendTxcMessageScheduler extends AbstractMessageScheduler implements Runnable {}
可以看到 SendTxcMessageScheduler 实例本身也是一个 Runnable 实例,这里暂且放一下,后续的分析中会用到,我们接着往下看。
上述的代码中在初始化完成 SendTxcMessageScheduler 实例后,调用了 schedule() 方法,那么我们重点看下这个方法。
public void schedule() {
executorService.scheduleAtFixedRate(
this,
this.initialDelay,
this.period,
this.timeUnit);
}
代码逻辑很简单,通过内部的 executorService 开启了调度流程,executorService 的初始化是在构造方法中完成的,默认构造方法如下:
public SendTxcMessageScheduler() {
executorService = Executors.newScheduledThreadPool(corePoolSize);
}
在 schedule() 方法中,调用 scheduleAtFixedRate 方法,第一个参数传入了 this 引用,线程池会定时执行 this 引用(也是一个 Runnable 引用)的 run 方法,方法逻辑:
@Override
public void run() {
// 查询并发送消息
try {
// 获取待调度的消息,初始态==初始化
List<ShieldEvent> shieldEvents = baseEventService.queryEventListByStatus(EventStatus.PRODUCE_INIT.toString());
if (CollectionUtils.isEmpty(shieldEvents)) {
return;
}
for (ShieldEvent shieldEvent : shieldEvents) {
// 发送前改状态
processBeforeSendMessage(shieldEvent);
// 发送消息核心逻辑
sendMessage(shieldEvent);
// 判断发送结果,成功则更新为已发送
processAfterSendMessage(shieldEvent);
}
} catch (Exception e) {
LOGGER.error("Sending rollback message occurred Exception!", e);
return;
}
}
这里主要采用了模板方法对业务逻辑进行了封装。
首先获取待调度的消息,状态为 PRODUCE_INIT(生产初始化)。默认获取 50 条。
如果未查询到消息,则结束本次调度,对于查询到的消息列表 shieldEvents 进行迭代:
首先进行发送前置操作,即方法 processBeforeSendMessage。将消息的状态从 PRODUCE_INIT 改为 PRODUCE_PROCESSING(生产处理中)。这么做的目的在于通过状态机方式的乐观锁达到支持并发的目的。这个思路在日常业务开发中也经常用到。
接着进行发送核心操作,这里需要看一下代码是如何实现的:
/**
* 发送事务消息
* @param shieldEvent
*/
@Override
public void sendMessage(ShieldEvent shieldEvent) {
int eventId = shieldEvent.getId();
// 组装Message
ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage
.setId(String.valueOf(eventId))
.setAppId(shieldEvent.getAppId())
.setContent(shieldEvent.getContent())
.setEventType(shieldEvent.getEventType())
.setEventStatus(shieldEvent.getEventStatus())
.setTxType(shieldEvent.getTxType())
.setBizKey(shieldEvent.getBizKey());
String messgeBody = shieldTxcMessage.encode();
String topic = null;
BizResult bizResult = null;
// 发送commit消息,判断消息类型
if (TXType.COMMIT.toString().equals(shieldTxcMessage.getTxType())) {
topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE,
shieldTxcRocketMQProducerClient.getTopic());
Message commitMessage = new Message(topic, messgeBody.getBytes());
bizResult = shieldTxcRocketMQProducerClient.sendCommitMsg(commitMessage, eventId);
}
// 发送rollback消息
if (TXType.ROLLBACK.toString().equals(shieldTxcMessage.getTxType())) {
topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_ROLLBACK_STAGE,
shieldTxcRocketMQProducerClient.getTopic());
Message rollbackMessage = new Message(topic, messgeBody.getBytes());
bizResult = shieldTxcRocketMQProducerClient.sendRollbackMsg(rollbackMessage, eventId);
}
if (bizResult.getBizCode() != BizCode.SEND_MESSAGE_SUCC) {
LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[FAIL], Message Body:[{}]", messgeBody);
return;
}
LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[SUCCESS], Message Body:[{}]", messgeBody);
}
首先将消息实体转换为 ShieldTxcMessage。ShieldTxcMessage 是框架封装的消息类型,内部的 encode、decode 方法提供了序列化、反序列化能力。
接着判断消息类型,如果是 TXType.COMMIT 则将消息投递到事务提交队列;如果是 TXType.ROLLBACK 则将消息投递到事务回滚队列。
在上游业务中发送的消息均为事务提交消息,因此会被投递到事务提交队列中。
最后判断消息发送结果,如果发送失败会进行重试,该重试能力是 MQ 中间件客户端提供的。对于多次发送都失败的消息需要更改状态为初始化,继续进行投递,保证消息一定能发出去。长时间发布出去的消息进发送失败表,后续需要对该失败表进行扫描,触发本地业务回滚。这种情况因为极其少见,因此框架当前版本暂未实现,后续更新后在 GitHub 的页面中注明。
消息投递成功后,调用 processAfterSendMessage 方法进行状态更新,将处理中状态 PRODUCE_PROCESSING 改为 PRODUCE_PROCESSED (生产处理成功)。
到此就完成了分布式事务上游的消息发布流程。
3.1.3 第三阶段:消息消费,下游事务提交
为了方便读者对照,这里再贴一下原理图。我们接着分析事务下游应用(后文称下游)是如何对事务提交队列中的消息进行消费从而完成本地事务的。
1. 消费适配器初始化
下游应用启动过程中需要完成消费适配器的初始化,实现对事务提交队列消息的消费。该消费逻辑实现了下游与上游的最终一致性。
@Service
public class TxConsumeService implements InitializingBean {
@Value("${shield.event.rocketmq.nameSrvAddr}")
String nameSerAddr;
@Value("${shield.event.rocketmq.topicSource}")
String topic;
@Override
public void afterPropertiesSet() throws Exception {
new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcCommitListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("测试消费ShieldTxcCommitListener开始......");
Random ra =new Random();
int randomInt = ra.nextInt(10) + 1;
if (randomInt <= 5) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}));
}
}
上面这段代码为下游应用需要实现的,在应用启动阶段初始化了 ShieldTxcConsumerListenerAdapter 消费适配器。
ShieldTxcConsumerListenerAdapter
ShieldTxcConsumerListenerAdapter 的主要构造方法有三个,分别为:
事务下游可选构造方法,即分布式事务的最终执行者。
/**
* 事务下游可选
* @param nameSrvAddr
* @param topic
* @param txCommmtListener
*/
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
String topic,
ShieldTxcCommitListener txCommmtListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.txCommmtListener = txCommmtListener;
init();
}
事务上游可选构造方法,即分布式事务的最初发起者。
/**
* 事务上游可选
* @param nameSrvAddr
* @param topic
* @param txRollbackListener
*/
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
String topic,
ShieldTxcRollbackListener txRollbackListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.txRollbackListener = txRollbackListener;
init();
}
事务上下游可选构造方法,即一个应用处于上游和下游之间,它既是该分布式事务的上游又是该分布式事务的下游。
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
String topic,
ShieldTxcCommitListener txCommmtListener,
ShieldTxcRollbackListener txRollbackListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.txCommmtListener = txCommmtListener;
this.txRollbackListener = txRollbackListener;
init();
}
上述的 Demo 代码所在的应用为分布式事务的最终下游,因此只需要实现 ShieldTxcCommitListener 接口,完成 consumeMessage 回调方法。我们在其中模拟百分之五十提交,百分之五十重试的情况,测试正常与异常情况下框架的表现。
在 ShieldTxcConsumerListenerAdapter 构造方法中均调用了 init() 方法,该方法初始化了真正的消费者客户端,对 MQ 进行监听。
public ShieldTxcConsumerListenerAdapter init() {
// 初始化shieldTxcRocketMQConsumerClient
Preconditions.checkNotNull(this.nameSrvAddr, "please insert RocketMQ NameServer address");
shieldTxcRocketMQConsumerClient =
new ShieldTxcRocketMQConsumerClient(this.topic, this.nameSrvAddr, this.getTxCommmtListener(), this.getTxRollbackListener());
LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient] instance init success.");
return this;
}
shieldTxcRocketMQConsumerClient
我们进入 shieldTxcRocketMQConsumerClient 类中,观察一下它是如何进行初始化的。
public ShieldTxcRocketMQConsumerClient(String topic,
String nameSrvAddr,
ShieldTxcCommitListener txCommtListener,
ShieldTxcRollbackListener txRollbackListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
if (txCommtListener == null && txRollbackListener == null) {
throw new BizException("Please define at least one MessageListenerConcurrently instance, such as [ShieldTxcCommitListener] or [ShieldTxcRollbackListener] or both.");
}
if (txCommtListener != null) {
// 初始化事务提交消费者
initCommitConsumer(this.topic, this.nameSrvAddr, txCommtListener);
LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.CommmitConsumer] instance init success.");
}
if (txRollbackListener != null) {
// 初始化事务回滚消费者
initRollbackConsumer(this.topic, this.nameSrvAddr, txRollbackListener);
LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.RollbackListener] instance init success.");
}
}
上述代码是 ShieldTxcRocketMQConsumerClient 初始化流程,可以看到主要是判断了 ShieldTxcCommitListener、ShieldTxcRollbackListener 实例是否为空,如果不为空则进行对应消费者的初始化。两个初始化流程基本相同,我们重点看下 initCommitConsumer 方法的逻辑。
/**
* 初始化事务提交消费者
* @param topic
* @param nameSrvAddr
*/
private void initCommitConsumer(String topic, String nameSrvAddr, ShieldTxcCommitListener txCommtListener) {
commitConsumer =
new DefaultMQPushConsumer(
MessagePropertyBuilder.groupId(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic));
commitConsumer.setNamesrvAddr(nameSrvAddr);
// 从头开始消费
commitConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
commitConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
commitConsumer.registerMessageListener(txCommtListener);
// 订阅所有消息
try {
commitConsumer.subscribe(
MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic), "*");
// 启动消费者
commitConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("Loading [com.shield.txc.RocketMQEventConsumerClient.commmitConsumer] occurred exception", e);
}
}
相信不需要我再做多余的解释了,这里其实就是启动了一个 DefaultMQPushConsumer 消费者客户端,并对对应的 Topic 进行订阅,从而实现对对应队列中消息的消费。如果读者对 RocketMQ 不是很了解,可以到 RocketMQ 开发者中心进行相关的学习。
RocketMQ 中文开发者中心
回到原理图中,可以看到下游应用的逻辑中有一个 shieldTXC Commit 拦截器,这个拦截器才是消费阶段的重点,框架通过该拦截器对消费过程进行了代理,加入了前置后置操作,从而保证了消费阶段的分布式事务的一致性。
拦截器代码在 ShieldTxcCommitListener 中。
ShieldTxcCommitListener
ShieldTxcCommitListener 的类声明及构造方法如下:
public class ShieldTxcCommitListener implements MessageListenerConcurrently {
public ShieldTxcCommitListener(MessageListenerConcurrently txCommmtListener) {
this.txCommmtListener = txCommmtListener;
}
}
可以看到 ShieldTxcCommitListener 实现了 MessageListenerConcurrently 接口。当 ShieldTxcCommitListener 在构造过程中将外界传入的 MessageListenerConcurrently 实例的引用指向了内部的 MessageListenerConcurrently 引用。
ShieldTxcCommitListener 本身也是 MessageListenerConcurrently 实例,通过它的 consumeMessage 代理了外部传入的 MessageListenerConcurrently 实例,通过加入了切面逻辑对消费过程做了进一步的处理。
这里就对 ShieldTxcCommitListener 如何对真实的消费过程进行代理做深入的分析。
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgBody = new String(msg.getBody());
String msgId = msg.getMsgId();
LOGGER.debug("[ShieldTxcCommitListener]Consuming [COMMIT] Message start... msgId={},msgBody={}", msgId, msgBody);
}
}
首先进行前置参数的获取,这里不需要过多解释。
ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);
ShieldEvent event = new ShieldEvent();
event.convert(shieldTxcMessage);
BaseEventService baseEventService = (BaseEventService) SpringApplicationHolder.getBean("baseEventService");
将消息体进行反序列化,并转换为 ShieldEvent 消息实体便于后续操作;通过 Spring 上下文获取到数据库操作 Bean,便于进行消息状态修改。
try {
// 消费幂等,查询消息是否存在,入库带唯一索引
// 消费次数大于等于阈值,回滚事务
int currReconsumeTimes = msg.getReconsumeTimes();
if (currReconsumeTimes >= CommonProperty.MAX_COMMIT_RECONSUME_TIMES) {
// 事务回滚操作,消息复制为回滚生产者,持久化
LOGGER.debug("[ShieldTxcCommitListener] START transaction rollback sequence! msgId={},currReconsumeTimes={}", msgId, currReconsumeTimes);
if (doPutRollbackMsgAfterMaxConsumeTimes(baseEventService, event, msgId)) {
LOGGER.debug("[ShieldTxcCommitListener] transaction rollback sequence executed SUCCESS! msgId={}", msgId);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
// 如果一直失败最后会进死信
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
这里获取了消息的当前消费次数,并与最大消费次数进行比对。
如果消费次数已经超过最大消费次数(默认为 10 次),则进行分布式事务的回滚操作。这么做的原因为:下游无论如何都没办法提交本地事务,如果不回滚则上下数据一致性就被破坏了,因此这里通过 doPutRollbackMsgAfterMaxConsumeTimes 进行了如下的业务处理:
将消息的状态由 CONSUME_PROCESSING(消费处理中,该状态在首次消费开始时就由 CONSUME_INIT 修改得到)改为 CONSUME_MAX_RECONSUMETIMES(达到最大消费次数)。
克隆消息,插入事务回滚消息,状态为 PRODUCE_INIT
完成后等待事务下游的 SendTxcMessageScheduler 对回滚消息进行投递。
String bizKey = shieldTxcMessage.getBizKey();
String txType = shieldTxcMessage.getTxType();
String eventStatua = shieldTxcMessage.getEventStatus();
String appId = shieldTxcMessage.getAppId();
String eventType = shieldTxcMessage.getEventType();
// 进行消息持久化
event.setEventType(eventType)
.setTxType(txType)
.setEventStatus(EventStatus.CONSUME_INIT.toString())
.setContent(shieldTxcMessage.getContent())
.setAppId(shieldTxcMessage.getAppId())
.setBizKey(bizKey)
.setId(Integer.valueOf(shieldTxcMessage.getId()));
// 入库失败回滚
boolean insertResult = baseEventService.insertEventWithId(event);
if (!insertResult) {
LOGGER.warn("[ShieldTxcCommitListener] insert shieldEvent Consume Message failed,msgId={}", msgId);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
如果当前没有达到最大消费次数则将消息进行持久化操作,这里在数据库中对消息的 id 与业务唯一键 bizkey 加了唯一索引,如果重复插入会报唯一约束异常,便不会重复插入,保证了消息的唯一性。
// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, event);
// 真实消费
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
消息入库后将消息状态改为 CONSUME_PROCESSING(消费处理中),接着进行真实的消费过程,通过拦截判断真实的消费结果对消息状态进行对应的修改。
} catch (Exception e) {
// 幂等处理:唯一约束触发则直接进行消费
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
LOGGER.debug("[ShieldTxcCommitListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
}
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
LOGGER.debug("[ShieldTxcCommitListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
}
// 其他异常重试
LOGGER.warn("ShieldTxcCommitListener Consume Message occurred Exception,msgId={}", msgId, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return null;
}
这里为对消息重复插入的处理流程,如果消息重复入库,会抛出异常并被捕获,则直接进行真实消息消费流程,通过方法 doUpdateAfterConsumed 实现。
/**
* 拦截真实消费结果,根据消费结果更新消息状态
*
* @param consumeConcurrentlyStatus
* @param baseEventService
* @param shieldEvent
* @return
*/
private ConsumeConcurrentlyStatus doUpdateAfterConsumed(BaseEventService baseEventService,
ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
ShieldEvent shieldEvent) {
LOGGER.debug("[ShieldTxcCommitListener::doUpdateAfterConsumed] The Real ConsumeConcurrentlyStatus is : [{}]", consumeConcurrentlyStatus);
if (ConsumeConcurrentlyStatus.RECONSUME_LATER.name().equals(consumeConcurrentlyStatus.name())) {
// 消费失败,消费状态仍旧处理中
return consumeConcurrentlyStatus;
}
if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name().equals(consumeConcurrentlyStatus.name())) {
// 消费成功,处理中改完成,更新前状态:消费处理中
shieldEvent.setBeforeUpdateEventStatus(shieldEvent.getEventStatus());
// 更新后状态:消费完成
shieldEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
boolean updateBefore = baseEventService.updateEventStatusById(shieldEvent);
if (!updateBefore) {
// 更新失败,幂等重试.此时必定是系统依赖组件出问题了
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
调用 doUpdateAfterConsumed 时传入的参数 ConsumeConcurrentlyStatus 代表业务层返回的消费状态。该状态只有两个值:CONSUME_SUCCESS/RECONSUME_LATER。
如果返回 RECONSUME_LATER,表明事务提交消息消费失败,后续会继续进行重试。消息表中消息的状态仍旧为 CONSUME_PROCESSING,我们不做多余的处理。
如果返回 CONSUME_SUCCESS,表明事务提交消息消费成功,则将数据库中的这条消息状态改为 CONSUME_PROCESSED。如果修改不成功,则进行重试即可,业务侧的消费逻辑要注意保证消费幂等。
整个消费过程如果达到重试次数上限仍旧不能成功,则会触发全局事务的回滚操作,这就保证了整个分布式事务的闭环。
3.2 框架分析之事务回滚
上文我们对事务提交部分的框架逻辑及代码实现做了较为详细的讲解,我们接着分析一下事务回滚阶段的机理及代码实现逻辑。
这里主要看图的下半部分。
当事务下游应用达到最大消费次数,事务回滚被消息持久化之后,shieldTXC 的消息发送线程 sendTxcMessageScheduler 会扫描到待发送的回滚消息并投递到 [事务回滚队列]。
3.2.1 第一阶段:实现回滚逻辑
事务上游应用在启动过程中初始化了 ShieldTxcConsumerListenerAdapter 消费适配器,并通过 ShieldTxcRollbackListener 实现了回滚逻辑。
@Service
public class TxConsumeService implements InitializingBean {
@Value("${shield.event.rocketmq.nameSrvAddr}")
String nameSerAddr;
@Value("${shield.event.rocketmq.topicSource}")
String topic;
@Override
public void afterPropertiesSet() throws Exception {
new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcRollbackListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("测试消费【回滚】ShieldTxcRollbackListener");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}));
}
}
这段代码在之前的事务提交阶段已经讲解过,事务发起端需要自行实现回滚逻辑,这样才能在异常发生时与事务下游保持数据一致性。更多的细节此处就不再赘述。
3.2.2 第二阶段:回滚拦截
同 ShieldTxcCommitListener 类似,ShieldTxcRollbackListener 也实现了对回滚消费逻辑的拦截,它的声明如下:
ShieldTxcRollbackListener
public class ShieldTxcRollbackListener implements MessageListenerConcurrently {
private MessageListenerConcurrently txRollbackListener;
public ShieldTxcRollbackListener(MessageListenerConcurrently txRollbackListener) {
this.txRollbackListener = txRollbackListener;
}
}
ShieldTxcRollbackListener 同样实现了 MessageListenerConcurrently 接口。在构造过程中将外界传入的 MessageListenerConcurrently 实例的引用指向了内部的 MessageListenerConcurrently 引用。
ShieldTxcRollbackListener 同样是 MessageListenerConcurrently 实例,通过它的 consumeMessage 方法代理了外部传入的 MessageListenerConcurrently 实例,通过加入了切面逻辑对消费过程做了进一步的处理。
我们重点对 ShieldTxcCommitListener 如何对真实的消费过程进行代理做深入的分析,核心思路同样是在真实的回滚消费逻辑之前加入前置处理,在真实消费逻辑之后加入后置处理。
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 测试打印消息体
for (MessageExt msg : msgs) {
String msgBody = new String(msg.getBody());
String msgId = msg.getMsgId();
LOGGER.debug("[ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId={},msgBody={}", msgId, msgBody);
}
}
获取参数进行日志打印。
ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);
ShieldEvent rollbackEvent = new ShieldEvent();
rollbackEvent.convert(shieldTxcMessage);
BaseEventService baseEventService = (BaseEventService) SpringApplicationHolder.getBean("baseEventService");
将消息协议进行反序列化,并转换为消息实体,从 Spring 上下文中获取数据库持久化服务 BaseEventService,为后续数据库操作做准备。
try {
// 取参数
String bizKey = shieldTxcMessage.getBizKey();
String txType = shieldTxcMessage.getTxType();
String eventStatua = shieldTxcMessage.getEventStatus();
String appId = shieldTxcMessage.getAppId();
String eventType = shieldTxcMessage.getEventType();
// 回滚消息持久化
rollbackEvent.setEventType(eventType)
.setTxType(TXType.ROLLBACK.toString())
.setEventStatus(EventStatus.CONSUME_INIT.toString())
.setContent(shieldTxcMessage.getContent())
.setAppId(shieldTxcMessage.getAppId())
.setBizKey(bizKey)
.setId(Integer.valueOf(shieldTxcMessage.getId()));
// 入库失败回滚
boolean insertResult = baseEventService.insertEventWithId(rollbackEvent);
if (!insertResult) {
LOGGER.warn("[ShieldTxcRollbackListener] insert RollbackShieldEvent Consume Message failed,msgId={}", msgId);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
上游执行回滚消息持久化,持久化成功后状态为 CONSUME_INIT(消费初始化):
// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, rollbackEvent);
// 真实消费
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
消息持久化后将消息状态改为消费处理中 [CONSUME_PROCESSING],进行真实消费过程,真实消费完成后对消费结果做后置处理。
} catch (Exception e) {
// 幂等处理:唯一约束触发则直接进行消费
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
LOGGER.debug("[ShieldTxcRollbackListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
}
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
LOGGER.debug("[ShieldTxcRollbackListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
}
// 其他异常重试
LOGGER.warn("ShieldTxcRollbackListener Consume Message occurred Exception,msgId={}", msgId, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return null;
}
这里的逻辑与 ShieldTxcCommitListener 的类似,对消费做幂等处理,重复消息不再入库,直接进行消费,上游回滚消费逻辑需要满足幂等性。
我们接着看一下 doUpdateAfterRollbackConsumed 方法如何进行消费后置处理。
/**
* 拦截真实消费结果,根据消费结果更新消息状态
*
* @param consumeConcurrentlyStatus
* @param baseEventService
* @param rollbackEvent
* @return
*/
private ConsumeConcurrentlyStatus doUpdateAfterRollbackConsumed(BaseEventService baseEventService,
ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
ShieldEvent rollbackEvent) {
if (ConsumeConcurrentlyStatus.RECONSUME_LATER == consumeConcurrentlyStatus) {
// 消费失败,消费状态仍旧处理中
return consumeConcurrentlyStatus;
}
if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus) {
// 消费成功,处理中改完成,更新前状态:消费处理中
rollbackEvent.setBeforeUpdateEventStatus(rollbackEvent.getEventStatus());
// 更新后状态:消费处理中
rollbackEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
boolean updateBefore = baseEventService.updateEventStatusById(rollbackEvent);
if (!updateBefore) {
// 更新失败,幂等重试.此时必定是系统依赖组件出问题了
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
拦截真实回滚消费逻辑获取消费结果,如果是消费成功 [CONSUME_SUCCESS] ,更改消息状态为消费完成 [CONSUME_PROCESSED]。如果是消费失败,则会进行重试。
上游的回滚逻辑通过重试保证成功,如果达到 MQ 最大重试次数(RocketMQ 是 16 次)会进死信,此时需要人工介入进行补偿操作。
在上线之前只要进行了充分的测试,保证业务无严重 Bug,确保线上 MQ 集群、应用集群的高可用,一般通过重试的方式都能够达成最终一致。如果出现大量数据不一致的情况,那么大概率就是应用存在 Bug 或者 MQ 集群不稳定,此时需要人工介入进行排错。
3.3 小结
到这里我们就完成了整个框架原理图与代码实现的分析。
实现告一段落,我们接着看一个简单的应用 Demo,直观感受一下 shieldTXC 分布式事务框架的魅力。
4. shieldTXC 实战
项目整体的结构如下:
shieldTxc
|-txc-core ShieldTXC内核
|-txc-demo-up 分布式事务上游应用
|-txc-demo-down 分布式事务下游应用
首先对项目进行整体的编译,接着对应用 txc-demo-up、txc-demo-down,修改配置文件 application.properties:
########################################################################
#
# shield-txc
#
#########################################################################
# RocketMQ的nameserver地址:必填
shield.event.rocketmq.nameSrvAddr=127.0.0.1:9876
# 事务消息topic:非必填,默认为DEFAULT_TXC_XXX
shield.event.rocketmq.topicSource=DEFAULT_TXC_XXX
# 发送失败重试次数:非必填,默认为10次
shield.event.rocketmq.retryTimesWhenSendFailed=3次
# 消息发送调度初始化延时,单位:秒:非必填,默认为0
shield.event.rocketmq.tranMessageSendInitialDelay=0秒
# 消息发送调度间隔,单位:秒,非必填,默认为5秒
shield.event.rocketmq.tranMessageSendPeriod=5
# 消息发送调度核心线程数:非必填,默认为1
shield.event.rocketmq.tranMessageSendCorePoolSize=1
如果仅仅是对框架进行尝鲜,只需要配置 NameServer 地址即可。
接着需要初始化数据库,在两个应用的 application.properties 中配置数据源,并在各自的数据源中放置消息表,初始化脚本在项目根路径下的 script 目录下,直接导入数据库即可。
在正式测试 Demo 前需要保证有可用的 RocketMQ 环境,关于如何搭建 RocketMQ 读者可以自行学习。
MQ 环境就绪后,分别启动两个应用, 等待生产者端完成本地事务并将事务提交消息持久化。
上游应用打印日志如下:
事务上游本地事务开始,消息持久化.....
2019-08-04 12:35:27.117 |-DEBUG [pool-1-thread-1] com.shield.txc.schedule.SendTxcMessageScheduler [141] -| [SendTxcMessageScheduler] Send ShieldTxc Message result:[SUCCESS], Message Body:[{"id":"112","eventType":"INSERT","txType":"COMMIT","eventStatus":"PRODUCE_PROCESSING","content":"{\"name\":\"23abaae6a2\"}","appId":"23abaae6a2","bizKey":"2425b011-bdf8-4a64-977a-ffa73e8c23c6"}]
日志表明本地事务执行完成,消息持久化,我们看下数据库中的记录:
可以看到消息已经入库,并且应被处理为 [PRODUCE_PROCESSED](生产处理完成),也就是消息被投递到 MQ。
下游应用打印如下:
测试消费ShieldTxcCommitListener开始......
2019-08-04 12:35:27.802 |-DEBUG [ConsumeMessageThread_1] com.shield.txc.listener.ShieldTxcCommitListener [165] -| [ShieldTxcCommitListener::doUpdateAfterConsumed] The Real ConsumeConcurrentlyStatus is : [RECONSUME_LATER]
下游开始对事务提交消息进行消费,执行本地业务。
下游事务提交逻辑返回的状态为 [RECONSUME_LATER],重试三次后事务回滚,持久化事务回滚消息,并投递到 MQ。
下游数据库中消息记录如下:
可以看到符合我们的预期,事务提交消息消费三次后不再消费,状态更改为 [CONSUME_MAX_RECONSUMETIMES],同时投递了事务回滚消息,消息状态为 [PRODUCE_PROCESSED]。
我们回到上游应用,查看到日志如下:
2019-08-04 12:37:09.202 |-DEBUG [ConsumeMessageThread_1] com.shield.txc.listener.ShieldTxcRollbackListener [48] -| [ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId=C0A801653B9818B4AAC2122845CC0000,msgBody={"id":"113","eventType":"INSERT","txType":"ROLLBACK","eventStatus":"PRODUCE_PROCESSING","content":"{\"name\":\"23abaae6a2\"}","appId":"23abaae6a2","bizKey":"2425b011-bdf8-4a64-977a-ffa73e8c23c6"}
测试消费【回滚】ShieldTxcRollbackListener
表明上游应用消费了事务回滚消息,持久的回滚消息状态如图:
从这个完整的业务执行链路可以看到,shieldTXC 能够很好的处理分布式事务,保证分布式系统上下游数据的一致性。
如果下游消费事务提交消息返回 [CONSUME_SUCCESS],则下游直接提交事务并更改消息状态为 [CONSUME_PROCESSED],整个事务流程就结束了,这也是符合我们预期的。
5. 全文总结
分布式事务是分布式领域一个较为棘手的难题,在几年前,各种分布式事务解决方案作为大厂的技术壁垒是不会轻易对外公布的。
随着近年来开源社区的活跃,我们得以对各种解决方案一探究竟。
本文中,笔者从理论入手,辅助以代码讲解,对本地消息表方案做了一次较为深入的讲解及实现,希望本文的方案及实现思路能够对读者理解并实现分布式事务解决方案有所启发。
如果你有什么疑问和建议,也欢迎在评论区进行留言,在此先行谢过。