前言:分布式事务
在传统的数据库操作中,数据库本身就提供了ACID的相关特性操作,但是由于业务数据增大数据压力导致我们不得不将数据分到多个表中,如果还在一个库里还可以使用ACID原则,只要两个分在不同的数据库上ACID原则就不能实现,数据就会出现不一致。业务的服务化,我们在操作一个业务逻辑过程中,涉及两个或多个数据源,当A数据源出现异常,那么B数据源的操作回滚。
要掌握分布式事务需要了解以下的概念
- CAP原理
- ACID原理与BASE原理
- 基于XA协议的两阶段提交:在实际应用中因为提交过程比较复杂,如果提交中间出现失败就会导致数据库刮起状态
- 事务补偿机制(TCC),符号ACID原理
- 理解起来非常简单,实现比较复杂
- 操作多个数据库时,一个成功了一个失败了,成功的数据库要有一个补偿接口进行失败的回滚,让数据恢复到最初操作
- 基于本地消息的最终一致性(对外对接应用,直接使用http访问和数据库结合进行消息补偿),符号BASE原理
- 基于MQ消息队列的最终一致性(最后用上人工补偿),符号BASE原理
eg:recive money 收钱(支付模块)-> insert order 下订单(订单模块) ,add point 加积分(用户积分模块) ,减库存(仓储模块)
不能使用强一致性,只能基于MQ实现最终一致性,使用多线程不安全(如果线程任务失败无法重来),使用消息队列,消息消费失败进入死信对列,需要人工干预补偿
Java领域针对分布式事务的解决方案就是JTA(Java Transaction API),SpringBoot官方提供的Atomikos 和 Bitronix的两种解决思路。
业务的服务化,就是微服务,把原来单机支撑的应用服务,拆解为一块一块独立的服务。例如用户中心、订单中心、账户中心、库存中心。对于订单中心,有专门的数据库存储订单信息,用户中心也有专门的数据库存储用户信息,库存中心也会有专门的数据库存储库存信息。这就是数据切分方案上所讲的垂直切分,数据应用服务化。但往往就会产生分布式事务问题,如要同时对订单进行操作,那么就会涉及到订单数据库和账户数据库,为了保证数据一致性,就需要用到分布式事务。
1. CAP原理的解析
- C-Consistent,一致性,具体是指操作成功后,所有的节点在同一时间看到数据完全一致,所以一致性就是指数据一致性(典型是Zookeeper,它会让等待所有节点的数据一致时,才让你查询到,例如A节点往B节点同步数据,一个请求路由到B节点查询新增的数据,是查询不到它会让你等待直到B同步完数据 )
- A-Availability,可用性,指服务一致可用,在规定时间内完成响应(典型是Eureka)
- P-Partition tolerance,分区容错性,指分布式系统在遇到某个节点或网络分区故障的时候,依旧可以对外提供服务
CAP原则只能满足其中两项:CP、AP
我们在使用分布式系统的时候就是为了满足某个节点不可用的情况下,系统还能对外服务,这正是P的体现,如果服务不满足P的话就不是一个分布式系统,在分布式系统中P总是成立的
分布式系统,那么A和C能不能同时满足呢?不能
client轮询访问A和B,A —> B同步的过程中
- 能同时访问A和B,那么就是保证可用性A,但这时A和B的数据是不一样的。
- 如何要保证访问A和B都能得到一样的数据,那么就必须等待数据同步完成,等待的过程中,服务对外是不可用的,就是不能访问A也不能访问B,直到数据同步完成对外服务才可用,那么就是保证一致性C
- 所以A和C是不能同时满足的
2. ACID与BASE
在关系型数据库中,最大的特点就是事务处理,也就是ACID
- A 原子性,整个事务中的所有操作,要么全部完成,要么全部不做,没有中间状态。对于事务在执行中发生错误,所有的操作都会被回滚,整个事务就像从没被执行过一样;
- C 一致性,事务的执行必须保证系统的一致性,就拿转账为例,A有500元,B有300元,如果在一个事务里A成功给B转账50元,那么不管并发多少,不管发生什么,只要事务执行成功了,那么最后A账户一定是450元,B账户一定是350元;
- I 隔离性,事务与事务之间不会互相影响,一个事务的中间状态不会被其他事务感知;
- D 持久性,事务完成,那么事务对数据所做的变更就完全保存在数据库中,即使发生停电,系统宕机也是如此。
ACID强调的是强一致性,要么全做,要么全不做,传统的数据库都有ACID的特性,在CAP原理中,保证的是CA(可用和一致,就是说可用的时候一定是一种的数据 ),在分布式系统流行的今天,ACID在逐步向BASE转换
BASE是 Basically Availiable(基本可用),Soft-state(软状态),Eventually consistent(最终一致性)
- Basically Availiable(基本可用):分布式系统在出现故障的情况下,允许损失部分可用性,保障核心可用
- Soft-state(软状态):就是指系统允许出现中间状态,该中间状态不会影响系统的整体可用性,分布式存储中一般一份数据会有两到三个副本,允许不同副本间数据的同步延时就是软状态的体现。Mysql的主从复制也就是软状态的体现
- Eventually consistent(最终一致性):所有的数据副本经过一段时间,最终能够达到一致性,弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况
BASE模型是传统ACID模型的发面,不同与ACID,BASE强调牺牲高一致性,从而获得可用性,数据允许在一段时间内不一致,但最终保持一致就行。
3. 分布式事务的常见解决方案
本地消息表
本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,思路如下:
1、消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。 2、消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。 3、生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。
直接用RocketMQ的事务消息方案得了,它会将本地事务与发送消息绑定为一个原子性操作。
XA实现两阶段协议
什么是2PC?
2PC就是两阶段提交协议,将数据库的事务提交分为两个阶段
- P准备阶段 prepare phase
- C提交阶段 commit phase
XA方案
-
XA是由X/Open组织提出的分布式事务规范
-
整体是由一个事务管理器Transaction Manager(TM)和多个本地资源管理器Resource Manager(RM)组成
在XA规范中,RM就是数据库,TM就是应用程序,TM生成全局的txId事务Id,调用XAResource接口,把多个本地事务协调为全局统一的分布式事务。Oracle、DB2这些商业数据库都实现了XA接口,mysql数据库也支持,但性能没有前者理想。
-
提交分为两个阶段
第一阶段
TM会询问所有的数据库,如果都是ready了才commit,如果其中一个不ready所有都回滚
第二阶段
如果到了第二阶段,第一个commit了,第二个无法commit,这个时候第一个commit就不能回滚了,这个时候就需要人工干预了,进行补偿, 比如发MQ消息进行补偿,短信通知让人工操作。
可以发现在第一阶段与第二阶段都是同步按顺序执行的,效率低。
XA两阶段提交方式的总结:
-
保证数据的强一致性
-
大部分商业数据库都实现了XA接口,使用分布式事务的成本较低,但性能不理想,并发量高的场景,容易造成获取数据库资源阻塞,大量线程会阻塞在获取事务控制器TM。
-
XA方式效率比较低,性能与本地事务相差10倍
准备阶段TM给所有的数据库送指令并接受反馈(同步),提交阶段也是,自然效率就低下了。在prepare阶段需要等待所有参与子事务的反馈,因此可能造成数据库资源锁定时间过长,不适合并发高以及子事务生命周长较长的业务场景。
- commit阶段出现问题,事务出现不一致,需要人工干预回滚
- MySQL v5.7及以上版本均支持XA协议,但是支持的不太理想,mysql的XA实现,没有记录prepare阶段日志,主备切换会导致主库与备库数据不一致。在prepare阶段需要等待所有其他的本地事务反馈,因此可能造成数据库资源锁定时间过长,不适合并发高以及子事务生命周长较长的业务场景。两阶段提交这种解决方案属于牺牲了一部分可用性来换取的一致性。
- mysql-connector-java驱动v5.0以上版本支持XA协议
- Java系统中,数据源可以采用Atomikos充当TM的角色,把多个本地事务协调为全局统一的分布式事务。
SpringBoot整合Atomikos
引入atomikos 依赖包,但是它仅适合并发低的单体应用,因为两阶段协议对数据库连接的持锁时间过长,当应用访问连接数上升,就会导致较多线程阻塞在获取数据库连接上。对账模块rcc的java应用就是因为使用了atomikos数据源来解决分布式事务,访问并发上来后,经常出现查询数据库操作页面就报超时,因为连接池管理对象上存在激烈的锁竞争,后来的请求线程在获取数据库连接被阻塞了,都在等待前面的请求线程释放数据库连接。后来改为使用druid数据源,避免这个问题,分布式事务的问题就要使用其他方案,个人建议微服务应用使用阿里的Seata框架解决分布式事务。
Atomilos对池中connection的管理效率随着连接数的上升,呈现指数级的下降,测试环境中,连接最大配置为300,但实际上最大值没有超过70,线程dump发现连接池管理对象上存在激烈的锁竞争,导致很多线程等待前面的线程释放锁对象;
Atomikos从池里取得connection时,每次都会去进行select test,这个对获取连接的影响很大,取消这个测试,TPS 吞吐量大概能提高近1倍;
pom.xml导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
配置类中添加Jta分布式事务管理器,注入spring ioc
@Configuration
@EnableTransactionManagement
@AutoConfigureAfter(EnvironmentConfig.class)
public class MybatisPlusConfig {
@Bean("rccShardingSqlSessionFactory")
@DependsOn({"transactionManager"})
@Primary
public SqlSessionFactory rccShardingSqlSessionFactory(@Qualifier("rccShardingDataSource") DataSource dataSource, @Qualifier("globalConfiguration") GlobalConfig globalConfig) throws Exception {
String mapperLocations = "classpath:mapping/rcc/**/*.xml";
String typeAliasesPackage = "com.midea.mcsp.settlement.rcc.entity.rcc";
return buildSqlSessionFactory(dataSource, typeAliasesPackage, mapperLocations, globalConfig);
}
@Bean("mccShardingSqlSessionFactory")
@DependsOn({"transactionManager"})
public SqlSessionFactory mccShardingSqlSessionFactory(@Qualifier("mccShardingDataSource") DataSource dataSource, @Qualifier("globalConfiguration") GlobalConfig globalConfig) throws Exception {
String mapperLocations = "classpath:mapping/mcc/**/*.xml";
String typeAliasesPackage = "com.midea.mcsp.settlement.rcc.entity.mcc";
return buildSqlSessionFactory(dataSource, typeAliasesPackage, mapperLocations, globalConfig);
}
...
// Jta 分布式事务控制器
@Bean(name = "transactionManager")
public JtaTransactionManager transactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
UserTransaction userTransaction = new UserTransactionImp();
userTransaction.setTransactionTimeout(10000);
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
}
项目需要连接rcc、smc、mcc、ivc4个数据源,这4个数据源又使用了shardingsphere 分库分表(切分数据),所以有4个sharding的逻辑数据源
首先将数据源注入Spring容器中
@Configuration
public class EnvironmentConfig implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
public static Environment environment;
private static final List<String> modules = new ArrayList<>();
static {
modules.add("rcc");
modules.add("ivc");
modules.add("mcc");
modules.add("smc");
}
@Override
public void setEnvironment(Environment env) {
environment = env;
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
registerBeanDefinition( beanDefinitionRegistry);
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
}
public void registerBeanDefinition( BeanDefinitionRegistry beanDefinitionRegistry){
for(String module : modules){
String shardParam = environment.getProperty(CommonConstants.PREFIX + module + ".shard-count");
int shardCount = Objects.nonNull(shardParam) ? Integer.parseInt(shardParam) : 2;
String driverClass = environment.getProperty(CommonConstants.PREFIX + module + ".driver-class-name");
for(int i=0;i<shardCount;i++){
String read = ".read" + i + ".";
String write = ".write" + i + ".";
String readUrl = environment.getProperty(CommonConstants.PREFIX + module + read + "url");
String readUser = environment.getProperty(CommonConstants.PREFIX + module + read + "username");
String readPassWord = environment.getProperty(CommonConstants.PREFIX + module + read + "password");
String writeUrl = environment.getProperty(CommonConstants.PREFIX + module + write + "url");
String writeUser = environment.getProperty(CommonConstants.PREFIX + module + write + "username");
String writePassWord = environment.getProperty(CommonConstants.PREFIX + module + write + "password");
RootBeanDefinition readBeanDefinition = new RootBeanDefinition(DataSource.class);
configBeanDefinition(readBeanDefinition,module + "-readDataSource" + i, readUser, readPassWord, readUrl,driverClass);
RootBeanDefinition writeBeanDefinition = new RootBeanDefinition(DataSource.class);
configBeanDefinition(writeBeanDefinition,module + "-writeDataSource" + i, writeUser, writePassWord, writeUrl,driverClass);
beanDefinitionRegistry.registerBeanDefinition(module + "-readDataSource" + i, readBeanDefinition);
beanDefinitionRegistry.registerBeanDefinition(module + "-writeDataSource" + i, writeBeanDefinition);
}
// 模块的默认库写入bean定义信息注册器
String defaultDsUrl = environment.getProperty(CommonConstants.PREFIX + module + ".default-ds.url");
String defaultDsUser = environment.getProperty(CommonConstants.PREFIX + module + ".default-ds.username");
String defaultDsPassWord = environment.getProperty(CommonConstants.PREFIX + module + ".default-ds.password");
RootBeanDefinition defaultDsBeanDefinition = new RootBeanDefinition(DataSource.class);
configBeanDefinition(defaultDsBeanDefinition,module + "-defaultDataSource", defaultDsUser, defaultDsPassWord, defaultDsUrl,driverClass);
beanDefinitionRegistry.registerBeanDefinition(module + "-defaultDataSource", defaultDsBeanDefinition);
}
}
private void configBeanDefinition(RootBeanDefinition beanDefinition, String name, String user, String passWord, String url, String driverClass){
beanDefinition.setSource("EnvironmentConfig");
Properties prop = build(url, user, passWord, driverClass);
beanDefinition.getPropertyValues().add("xaProperties", prop);
beanDefinition.getPropertyValues().add("minPoolSize",3);
beanDefinition.getPropertyValues().add("maxPoolSize",30);
beanDefinition.getPropertyValues().add("maxIdleTime",0);
beanDefinition.getPropertyValues().add("testQuery","select 1");
beanDefinition.getPropertyValues().add("xaDataSourceClassName", "com.alibaba.druid.pool.xa.DruidXADataSource");
beanDefinition.setRole(2);
beanDefinition.setBeanClass(AtomikosDataSourceBean.class);
}
private Properties build(String url,String user,String passWord, String driverClass) {
Properties prop = new Properties();
prop.put("url", url);
prop.put("username", user);
prop.put("password", passWord);
prop.put("driverClassName", driverClass);
return prop;
}
}
注意这里DataSource数据源要使用AtomikosDataSourceBean,即所谓的atomikos数据源,每一个数据库连接都是atomikos数据源,atomikos框架底层就是2pc 二阶段提交,大概流程是这样的:
- 将数据源交给atomikos,atomikos将其封装成atomikos数据源
- atomikos开启atomikos全局事务
- 拿atomikos数据源做sql操作
- 第一个atomikos事务做完操作后告诉TM完成了,准备commit或rollback,这时atomikos事务是一直占用着数据库资源的直到第二阶段正常commit或rollback,然后轮到下一个atomikos数据源的事务执行操作,TM等待所有的atomikos数据源执行完事务操作后,都没有问题了,就通知每个atomikos数据源commit,注意整个过程是同步的。如果第一阶段就发生了rollback,TM就会通知前面已经执行完sql操作的RM进行rollback。
这个atomikos数据源要结合JtaTransactionManager分布式事务控制器使用
@Transactional("transactionManager")
去掉Atomikos,两阶段控制分布式事务
后来并发多一点,导致大量请求阻塞在数据库资源获取上,前端经常出现请求超时,把atomikos数据源去掉,改为本地事务控制
@Configuration
public class EnvironmentConfig implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
.....
private void configBeanDefinition(RootBeanDefinition beanDefinition, String name, String user, String passWord, String url, String driverClass){
beanDefinition.setSource("EnvironmentConfig");
// minEvictableIdleTimeMillis 会话连接保持空闲而不被关闭的最小时间,默认30分钟
// maxEvictableIdleTimeMillis 会话连接保持空闲而不被关闭的最大时间,默认7小时
MutablePropertyValues propertyValues = new MutablePropertyValues();
propertyValues.add("url", url).add("username", user).add("password", passWord)
.add("name",name)
.add("driverClassName", driverClass)
.add("initialSize",3)
.add("minIdle",3)
.add("maxActive",20)
.add("minEvictableIdleTimeMillis",1000*60*5) // 空闲连接最小5分钟后才关闭
.add("maxEvictableIdleTimeMillis",1000L*60L*60L) // 空闲连接最大60分钟后才关闭
.add("validationQuery","select 1")
.add("removeAbandoned",true)
.add("removeAbandonedTimeout",3600) // 60*60秒,从连接池获取的连接如果超过这个时间不归还到连接池,
// 就可能被垃圾回收器回收,避免连接泄漏,如果业务超过这个时间还没有处理完,可以适当延长这个时间
.add("logAbandoned",true)
;
beanDefinition.setPropertyValues(propertyValues);
beanDefinition.setBeanClass(DruidDataSource.class);
}
}
使用DruidDataSource
替换AtomikosDataSourceBean
@Configuration
@EnableTransactionManagement
@AutoConfigureAfter(EnvironmentConfig.class)
public class MybatisPlusConfig {
@Bean("rccShardingSqlSessionFactory")
@Primary
public SqlSessionFactory rccShardingSqlSessionFactory(@Qualifier("rccShardingDataSource") DataSource dataSource, @Qualifier("globalConfiguration") GlobalConfig globalConfig) throws Exception {
String mapperLocations = "classpath:mapping/rcc/**/*.xml";
String typeAliasesPackage = "com.midea.mcsp.settlement.rcc.entity.rcc";
return buildSqlSessionFactory(dataSource, typeAliasesPackage, mapperLocations, globalConfig);
}
@Bean("smcShardingSqlSessionFactory")
public SqlSessionFactory smcShardingSqlSessionFactory(@Qualifier("smcShardingDataSource") DataSource dataSource, @Qualifier("globalConfiguration") GlobalConfig globalConfig) throws Exception {
String mapperLocations = "classpath:mapping/smc/**/*.xml";
String typeAliasesPackage = "com.midea.mcsp.settlement.rcc.entity.smc";
return buildSqlSessionFactory(dataSource, typeAliasesPackage, mapperLocations, globalConfig);
}
@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
UserTransaction userTransaction = new UserTransactionImp();
userTransaction.setTransactionTimeout(10000);
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
@Bean
@Primary
public PlatformTransactionManager rccManager(@Qualifier("rccShardingDataSource") DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}
@Bean
public PlatformTransactionManager smcManager(@Qualifier("smcShardingDataSource") DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}
@Bean
public PlatformTransactionManager ivcManager(@Qualifier("ivcShardingDataSource") DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}
@Bean
public PlatformTransactionManager mccManager(@Qualifier("mccShardingDataSource") DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}
}
4个数据源都实例化一个事务控制器DataSourceTransactionManager
,默认的本地事务控制器是rccManager
// 多数据源的情况下,必须要指定事务控制器
@Override
@Transactional(transactionManager = "rccManager", rollbackFor = Exception.class)
public void addWrtOffCstmRule(CommonRequest<AddWrtOffCstmRuleHeadReqDTO> request) {
...
}
事务消息最终一致性
事务消息作为一种异步确保型事务, 将两个事务分支通过MQ进行异步解耦,事务消息的设计流程同样借鉴了两阶段提交理论。
第三方的MQ是支持事务消息的,比如RocketMQ,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。流程如下:
- 事务发起方首先发送prepare消息到MQ。
- 在发送prepare消息成功后执行本地事务。
- 根据本地事务执行结果返回commit或者是rollback。
- 如果消息是rollback,MQ将删除该prepare消息不进行下发,如果是commit消息,MQ将会把这个消息发送给consumer端。
- 如果执行本地事务过程中,执行端挂掉,或者超时,MQ将会不停的询问其同组的其它producer来获取状态。
- Consumer端的消费成功机制有MQ保证。
两阶段提交基于MQ的解耦,那么就可以应用在高并发场景,将一个分布式事务拆成一个消息事务(A系统的本地操作+发消息)+B系统的本地操作,其中B系统的操作由消息驱动,只要消息事务成功,那么A操作一定成功,消息也一定发出来了,这时候B会收到消息去执行本地操作,如果本地操作失败,消息要重投(MQ的ACK确认需要设置为手动确认),直到B操作成功,这样就变相地实现了A与B的分布式事务
TCC 事务补偿机制
Try Confirm Cancel(TCC)是ACID的,保证强一致性 ,操作多个数据库时,一个成功了一个失败了,成功的数据库要有一个补偿接口进行失败的回滚,让数据恢复到最初操作。
其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。TCC模型是把锁的粒度完全交给业务处理。它分为三个阶段:
- Try 阶段主要是对业务系统做检测及资源预留
- Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
- Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
特点:
TCC模型对业务的侵入强,改造的难度大。
- TCC中的每个服务都需要开发对应的Try 、confirm、cancel API接口
- TCC整体性能高,锁加在了服务内部,并不是一个全局锁,锁的粒度小,有锁马上就释放了,提高了吞吐,不像2PC那样,并发上来,吞吐很低
- 通常情况下,采用TCC则认为Confirm阶段是不会出错的。即 :只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
- 通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
- 由于Confirm和Cancel失败需进行重试,因此需要实现为幂等性是指同一个操作无论请求多少次,其结果都相同。
TCC框架 : ByteTCC
Seata框架解决分布式事务
现在最新版本是1.4.2
消息队列与可靠消息服务方案
参考: