一 引言
在经典的CAP理论中一致性是指分布式或多副本系统中数据在任一时刻均保持逻辑与物理状态的统一,这是确保业务逻辑正确性和系统可靠性的核心要素。在单体应用单一数据库中可以直接通过本地事务(ACID)保证数据的强一致性。
然而随着微服务架构的普及和业务场景的复杂化,原来的原子性操作会随着系统拆分而无法保障原子性从而产生一致性问题,但业务实际又需要保障一致性,为此BASE理论提出了最终一致性来解决这类问题。那么如何在跨服务、跨数据库的事务中保证数据最终一致性。
二 CAP理论与BASE理论
在经典的CAP理论中提到一个分布式系统中,一致性(C)、可用性(A)、分区容错性(P)最多只能同时实现两点,不可能三者兼顾。实际上这是一个伪命题,必须从 A 和 C 选择一个和 P 组合,更进一步基本上都会选择 A,相比一致性,系统一旦不可用或不可靠都可能会造成整个站点崩溃,所以一般都会选择 AP。
BASE理论源于对大规模互联网分布式系统实践的总结,作为CAP定理中一致性与可用性矛盾的实践性补充逐步演化形成。该理论主张在无法保证强一致性的场景下,系统可基于业务特性灵活调整架构设计,通过基本可用性保障、允许短暂中间状态等机制,确保数据最终达成一致性状态,从而在分布式环境中实现可靠服务能力与业务需求的平衡。
三 一致性失效场景及其解决方案
这里有一个简化的仓库上架的流程(在实际业务中可能还会涉及到履约,仓储库存等等),体现分布式系统中可能出现的一致性问题,在分布式系统中的处理流程可能如下所示:
1.操作员操作商品仓库上架
商品在仓储系统(WMS)中上架,写入仓储数据库
仓储系统通知中央库存系统(SCI)添加可用库存
仓储系统通知交易该商品可以进行售卖
多服务协作交互示例
简化代码示例:
@Transactionalpublic void upper(upperRequest request) {
// 1. 写入仓储数据库UpperDo upperDo = buildUpperDo(request);wmsService.upper(upperDo);
// 2. 调用rpc添加中央库存系统库存SciAInventoryRequest sciInventoryRequest = buildSciAInventoryRequest(request);sciRpcService.addInventory(sciInventoryRequest)
// 3. 发送商品可以售卖的消息TradeMessageRequest tradeMessage = buildTradeMessageRequest(request);sendMessageToDealings(tradeMessage);
// 4. 其他处理recordLog(buildLogRequest(request))return;}
整个时序逻辑拆解到事务层面执行流程如下:
发送消息
在第5步添加sci库存之前任意一步出现问题,事务都会回滚,对其他系统的影响为0,所以不存在一致性问题。
但是,在此之后出现问题都有可能会出现事务问题。
调用写RPC
在分布式系统中,调用RPC一般可以分为着两类:
1.读RPC:当前数据结构不完整,需要通过其他服务补充数据,对其他服务无影响。
2.写RPC:当前业务操作、数据变更需要通知其他服务,对其他服务有影响。
调用写RPC添加sci可用库存可能出现的问题:
调用处理成功,返回成功。【数据一致】
调用处理成功,返回失败。【数据不一致】
对于这种情况,最简单的做法是直接操作重试,但是需要下游幂等处理,保证同样的请求效果一致。这里重试的方式,即重新操作上架,此外也可以直接在rpc方法中异步重试机制(这种方式不会阻塞整体流程,但是增大了数据不一致的风险)。如果重试失败可能需要研发介入排查具体失败的原因(对于写RPC的接口超时问题,需要研发关注,配置告警或抛出特定异常等)。
针对RPC方法重试,可以考虑采用本地消息表的方式实现,具体参考3.3.本地消息表。
消息发送
写RPC调用成功后,会给trade服务发送消息,而后提交事务,整个流程结束。
Rocket消息发送有多种方式,不同的方式适用场景不一,一般业务逻辑使用同步发送消息配合重试机制即可,对于一致性要求高的场景,可以考虑事务消息确保消息与本地事务的原子性。
同步消息+重试
同步消息比异步消息更可靠,比事务消息性能更高是一种广泛采用的方式。
同步消息通过confirm机制能保证消息发送成功:生产者发送同步消息后,等待Broker返回确认结果(SendResult)。如果 Broker 成功接收并存储消息,返回成功状态;否则返回失败状态。消息发送失败时,Rocket默认自动重试2次,支持手动设置,提高消息发送的可靠性。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setRetryTimesWhenSendFailed(3); // 设置重试次数为 3 次producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());SendResult sendResult = producer.send(msg); // 同步发送if (sendResult.getSendStatus() == SendStatus.SEND_OK) {log.info("Send Success: " + sendResult);} else {log.warn("Send Failed: " + sendResult);}
同步消息+重试机制能尽可能的保证消息成功发送,但是在这种情况下仍可能出现一致性问题:消息成功发送,在提交事务之前,依然可能出现问题(第8步出现问题),导致事务回滚,但是下游的消息是无法回滚的。
为此在RocketMQ中提供了事务消息作为一种解决方案。
RocketMQ事务消息
RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
发送事物消息
Rocket的事务消息可以确保消息和本地事务的原子性,但是实现起来很复杂,性能也比较低,特别是需要实现回查本地事务状态,这是一个比较复杂的问题,需要case by case,每一个消息都需要单独写逻辑,还必须确保消息体中的数据支持回查本地事务状态,对代码入侵度较高。
在笔者的了解中我司事务消息的使用情况不多,对于低并发且强一致性的场景可以考虑使用这种方式。在这个业务场景中使用事务消息可以解决3.2.1中出现的消息发送成功但事务回滚的问题,但是这个场景使用这种方式并不太合适。最终结果可能是整体数据一致性提升2%-3%,但是业务性能下降20%-30%。
spring提供给了一种事件发布-订阅机制可以解决事务回滚但消息依然发送成功的问题,并且性能损失几乎可以忽略。
事务事件+同步消息
事务事件是指在事务执行的不同阶段触发的事件。这些事件通常用于处理次要逻辑,例如发送领域事件、消息或者邮件等。
spring通过事务管理@Transactional和事件发布机制ApplicationEventPublisher,可以实现类似事务事件的功能。事件发布后事件广播器(SimpleApplicationEventMulticaster)接收事件,根据事件类型匹配所有的监听者(getApplicationListeners)。
@Servicepublic class wmsService {@Autowiredprivate ApplicationEventPublisher eventPublisher;
@Transactionalpublic void upper(upperRequest request) {
// 1. 写入仓储数据库UpperDo upperDo = buildUpperDo(request);wmsService.upper(upperDo);
// 3. 发布上架事件UpperFinishEvent upperFinishEvent = buildUpperFinishEvent(request)eventPublisher.publishEvent(upperFinishEvent);return;}}
@Componentpublic class upperFinishEventListener {@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)public void handleUpperFinishEvent(UpperFinishEvent event) {// 处理事件
// 1. 调用rpc添加中央库存系统库存SciAInventoryRequest sciInventoryRequest = buildSciAInventoryRequest(event);sciRpcService.addInventory(sciInventoryRequest)
// 2. 发送商品可以售卖的消息TradeMessageRequest tradeMessage = buildTradeMessageRequest(event);sendMessageToDealings(tradeMessage);
// 2. 其他处理recordLog(buildLogRequest(event))}}
上述流程在写完DB,调用写RPC之后,发布上架完成的事件并提交事务。upperFinishEventListener订阅上架完成的事件,并发送可以售卖的消息。
通过这种方式可以在事务提交之后再发送消息。通过事务事件保证事务提交,通过重试机制和confirm机制确保生产者发送消息成功。
本地消息表
在上述过程中我们选择使用事务事件+同步消息可以来替代事务消息,但是事务事件对RPC调用并不太友好,本地事务提交之后,调用写RPC就一定要成功,不然一致性问题就无法保证。
为此可以考虑使用本地消息表这个方案:将需要分布式处理的事件通过本地消息日志存储的方式来异步执行,通过异步线程或者自动Job发起重试,确保上下游一致。
将上述流程抽象为代码可以实现一个一致性框架,通过注解实现无侵入、策略化、通用性和高复用性的能力。然后本地消息表的方式仍然存在一些问题:
高并发场景不适用,写本地消息会带来延迟可能出现数据积压,影响系统的吞吐量。
业务逻辑过程会长时间的占用事务,造成大事务问题。
本地消息报文巨大,难以存储等。
四 总结
本文分析的场景都是解决生产者端的一致性问题。结合部分场景探讨不同方式的优缺点。
事务事件+普通消息&重试 :适合对实时一致性要求不高、需要异步处理的场景、适合高并发场景,可靠性一般,实现简单但需手动处理重试和幂等性。
事务消息 :适合一致性要求较高的场景(如金融交易),性能较低,实现复杂但能确保消息与事务的原子性。
本地消息表 :适合跨服务事务、异步任务处理和最终一致性场景,高并发场景可能出现数据积压,实现简单且可靠性高,但存在延迟性和资源占用问题。
在分布式系统中,很难有能100%保证一致性的方案,正如《人月神话》中说的“没有不存在缺陷的软件,只是尚未发现缺陷”。
在上面提到的各种方案中,笔者所在团队高并发场景很少,所以一般都采用本地详细表的方式来处理一致性问题,这既可以处理写RPC的调用问题,也能通过消息状态显示的统一失败情况,统一进行重试。