Maison >Java >javaDidacticiel >Comment implémenter les transactions distribuées Java Spring Boot ?
Tout d'abord, qu'est-ce qu'une transaction distribuée ? Par exemple, lorsque nous exécutons une logique métier, nous avons deux étapes pour faire fonctionner respectivement la source de données A et la source de données B lorsque nous effectuons des modifications de données dans les données A. source, dans les données B Si une exception d'exécution se produit lors de l'exécution de la source, nous devons alors annuler le fonctionnement de la source de données B et annuler le fonctionnement de la source de données A, cette situation se produit souvent dans le secteur des paiements ; l'entreprise d'achat de billets ne parvient pas à payer à la fin.Ensuite, toutes les opérations précédentes doivent être annulées.Si les opérations précédentes sont distribuées dans plusieurs sources de données, il s'agit d'une annulation de transaction distribuée typique ; résoudre les transactions distribuées en Java La solution est JTA (Java Transaction API) ; springboot fournit officiellement les solutions Atomikos ou Bitronix
En fait, dans la plupart des cas, de nombreuses entreprises utilisent des files d'attente de messages pour mettre en œuvre des transactions distribuées ;
Cet article se concentre sur l'intégration d'Atomikos +mysql+mybatis+tomcat/jetty dans l'environnement springboot ;1 Dépendances du projetAjoutez les dépendances liées au springboot d'atomikos dans pom.xml :
<!--分布式事务-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
Cliquez sur Vous le ferez. constatez qu'il est intégré : transactions-jms
, transactions-jta
, transactions-jdbc
, javax.transaction-api
code>
2. Configuration de la source de donnéestransactions-jms
、transactions-jta
、transactions-jdbc
、javax.transaction-api
把数据源的相关配置项单独提炼到一个application.yml中:
注意:
这回我们的spring.datasource.type
是com.alibaba.druid.pool.xa.DruidXADataSource;
spring.jta.transaction-manager-id
的值在你的电脑中是唯一的,这个详细请阅读官方文档;
完整的yml文件如下:
spring: datasource: type: com.alibaba.druid.pool.xa.DruidXADataSource druid: systemDB: name: systemDB url: jdbc:mysql://localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8 username: root password: root # 下面为连接池的补充设置,应用到上面所有数据源中 # 初始化大小,最小,最大 initialSize: 5 minIdle: 5 maxActive: 20 # 配置获取连接等待超时的时间 maxWait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minEvictableIdleTimeMillis: 30 validationQuery: SELECT 1 validationQueryTimeout: 10000 testWhileIdle: true testOnBorrow: false testOnReturn: false # 打开PSCache,并且指定每个连接上PSCache的大小 poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 filters: stat,wall # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 合并多个DruidDataSource的监控数据 useGlobalDataSourceStat: true businessDB: name: businessDB url: jdbc:mysql://localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8 username: root password: root # 下面为连接池的补充设置,应用到上面所有数据源中 # 初始化大小,最小,最大 initialSize: 5 minIdle: 5 maxActive: 20 # 配置获取连接等待超时的时间 maxWait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minEvictableIdleTimeMillis: 30 validationQuery: SELECT 1 validationQueryTimeout: 10000 testWhileIdle: true testOnBorrow: false testOnReturn: false # 打开PSCache,并且指定每个连接上PSCache的大小 poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 filters: stat,wall # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 合并多个DruidDataSource的监控数据 useGlobalDataSourceStat: true #jta相关参数配置 jta: log-dir: classpath:tx-logs transaction-manager-id: txManager
在DruidConfig.java中实现多个数据源的注册;分布式事务管理器的注册;druid的注册
package com.zjt.config; import com.alibaba.druid.filter.stat.StatFilter; import com.alibaba.druid.support.http.StatViewServlet; import com.alibaba.druid.support.http.WebStatFilter; import com.alibaba.druid.wall.WallConfig; import com.alibaba.druid.wall.WallFilter; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.boot.web.servlet.ServletRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.env.Environment; import org.springframework.transaction.jta.JtaTransactionManager; import javax.sql.DataSource; import javax.transaction.UserTransaction; import java.util.Properties; /** * Druid配置 * * */ @Configuration public class DruidConfig { @Bean(name = "systemDataSource") @Primary @Autowired public DataSource systemDataSource(Environment env){ AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); Properties prop = build(env, "spring.datasource.druid.systemDB."); ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); ds.setUniqueResourceName("systemDB"); ds.setPoolSize(5); ds.setXaProperties(prop); return ds; } @Autowired @Bean(name = "businessDataSource") public AtomikosDataSourceBean businessDataSource(Environment env){ AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); Properties prop = build(env, "spring.datasource.druid.businessDB."); ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); ds.setUniqueResourceName("businessDB"); ds.setPoolSize(5); ds.setXaProperties(prop); return ds; } /** * 注入事物管理器 * @return */ @Bean(name = "xatx") public JtaTransactionManager regTransactionManager (){ UserTransactionManager userTransactionManager = new UserTransactionManager(); UserTransaction userTransaction = new UserTransactionImp(); return new JtaTransactionManager(userTransaction, userTransactionManager); } private Properties build(Environment env, String prefix){ Properties prop = new Properties(); prop.put("url", env.getProperty(prefix + "url")); prop.put("username", env.getProperty(prefix + "username")); prop.put("password", env.getProperty(prefix + "password")); prop.put("driverClassName", env.getProperty(prefix + "driverClassName", "")); prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class)); prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class)); prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class)); prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class)); prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class)); prop.put("maxPoolPreparedStatementPerConnectionSize", env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class)); prop.put("maxPoolPreparedStatementPerConnectionSize", env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class)); prop.put("validationQuery", env.getProperty(prefix + "validationQuery")); prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class)); prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class)); prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class)); prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class)); prop.put("timeBetweenEvictionRunsMillis", env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class)); prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class)); prop.put("filters", env.getProperty(prefix + "filters")); return prop; } @Bean public ServletRegistrationBean druidServlet(){ ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*"); //控制台管理用户,加入下面2行 进入druid后台就需要登录 //servletRegistrationBean.addInitParameter("loginUsername", "admin"); //servletRegistrationBean.addInitParameter("loginPassword", "admin"); return servletRegistrationBean; } @Bean public FilterRegistrationBean filterRegistrationBean(){ FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(); filterRegistrationBean.setFilter(new WebStatFilter()); filterRegistrationBean.addUrlPatterns("/*"); filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"); filterRegistrationBean.addInitParameter("profileEnable", "true"); return filterRegistrationBean; } @Bean public StatFilter statFilter(){ StatFilter statFilter = new StatFilter(); statFilter.setLogSlowSql(true); //slowSqlMillis用来配置SQL慢的标准,执行时间超过slowSqlMillis的就是慢。 statFilter.setMergeSql(true); //SQL合并配置 statFilter.setSlowSqlMillis(1000);//slowSqlMillis的缺省值为3000,也就是3秒。 return statFilter; } @Bean public WallFilter wallFilter(){ WallFilter wallFilter = new WallFilter(); //允许执行多条SQL WallConfig config = new WallConfig(); config.setMultiStatementAllow(true); wallFilter.setConfig(config); return wallFilter; } }
分别配置每个数据源对应的sqlSessionFactory,以及MapperScan扫描的包
MybatisDatasourceConfig.java
package com.zjt.config; import com.zjt.util.MyMapper; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import javax.sql.DataSource; /** * * @description */ @Configuration // 精确到 mapper 目录,以便跟其他数据源隔离 @MapperScan(basePackages = "com.zjt.mapper", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory") public class MybatisDatasourceConfig { @Autowired @Qualifier("systemDataSource") private DataSource ds; @Bean public SqlSessionFactory sqlSessionFactory() throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(ds); //指定mapper xml目录 ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml")); return factoryBean.getObject(); } @Bean public SqlSessionTemplate sqlSessionTemplate() throws Exception { SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory()); // 使用上面配置的Factory return template; } //关于事务管理器,不管是JPA还是JDBC等都实现自接口 PlatformTransactionManager // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。 //在Spring容器中,我们手工注解@Bean 将被优先加载,框架不会重新实例化其他的 PlatformTransactionManager 实现类。 /*@Bean(name = "transactionManager") @Primary public DataSourceTransactionManager masterTransactionManager() { //MyBatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的数据源 // 与DataSourceTransactionManager引用的数据源一致即可,否则事务管理会不起作用。 return new DataSourceTransactionManager(ds); }*/ }
MybatisDatasource2Config.java
package com.zjt.config; import com.zjt.util.MyMapper; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import javax.sql.DataSource; /** * * @description */ @Configuration // 精确到 mapper 目录,以便跟其他数据源隔离 @MapperScan(basePackages = "com.zjt.mapper2", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory2") public class MybatisDatasource2Config { @Autowired @Qualifier("businessDataSource") private DataSource ds; @Bean public SqlSessionFactory sqlSessionFactory2() throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(ds); //指定mapper xml目录 ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml")); return factoryBean.getObject(); } @Bean public SqlSessionTemplate sqlSessionTemplate2() throws Exception { SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory2()); // 使用上面配置的Factory return template; } //关于事务管理器,不管是JPA还是JDBC等都实现自接口 PlatformTransactionManager // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。 //在Spring容器中,我们手工注解@Bean 将被优先加载,框架不会重新实例化其他的 PlatformTransactionManager 实现类。 /*@Bean(name = "transactionManager2") @Primary public DataSourceTransactionManager masterTransactionManager() { //MyBatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的数据源 // 与DataSourceTransactionManager引用的数据源一致即可,否则事务管理会不起作用。 return new DataSourceTransactionManager(ds); }*/ }
由于我们本例中只使用一个事务管理器:xatx,故就不在使用TxAdviceInterceptor.java
和TxAdvice2Interceptor.java
中配置的事务管理器了;有需求的童鞋可以自己配置其他的事务管理器;(见DruidConfig.java中查看)
新建分布式业务测试接口JtaTestService.java和实现类JtaTestServiceImpl.java
其实就是一个很简单的test01()方法,在该方法中我们分别先后调用classService.saveOrUpdateTClass(tClass);
和teacherService.saveOrUpdateTeacher(teacher);
实现先后操作两个数据源:然后我们可以自己debug跟踪事务的提交时机,此外,也可以在在两个方法全执行结束之后,手动制造一个运行时异常,来检查分布式事务是否全部回滚;
注意:
在实现类的方法中我使用的是:
@Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })从而指定了使用哪个事务管理器,事务隔离级别(一般都用我这个默认的),回滚的条件(一般可以使用Exception),这三个可以自己根据业务实际修改;
package com.zjt.service3; import java.util.Map; public interface JtaTestService { public Map<String,Object> test01(); }
package com.zjt.service3.impl; import com.zjt.entity.TClass; import com.zjt.entity.Teacher; import com.zjt.service.TClassService; import com.zjt.service2.TeacherService; import com.zjt.service3.JtaTestService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.util.LinkedHashMap; import java.util.Map; @Service("jtaTestServiceImpl") public class JtaTestServiceImpl implements JtaTestService{ @Autowired @Qualifier("teacherServiceImpl") private TeacherService teacherService; @Autowired @Qualifier("tclassServiceImpl") private TClassService tclassService; @Override @Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class }) public Map<String, Object> test01() { LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>(); TClass tClass=new TClass(); tClass.setName("8888"); tclassService.saveOrUpdateTClass(tClass); Teacher teacher=new Teacher(); teacher.setName("8888"); teacherService.saveOrUpdateTeacher(teacher); System.out.println(1/0); resultMap.put("state","success"); resultMap.put("message","分布式事务同步成功"); return resultMap; } }
建立JtaTestContoller.java,接受一个来自前端的http请求,触发JtaTestService 的test01方法
package com.zjt.web; import com.zjt.service3.JtaTestService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.LinkedHashMap; import java.util.Map; @Controller @RequestMapping("/jtaTest") public class JtaTestContoller { @Autowired @Qualifier("jtaTestServiceImpl") private JtaTestService taTestService; @ResponseBody @RequestMapping("/test01") public Map<String,Object> test01(){ LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>(); try { return taTestService.test01(); }catch (Exception e){ resultMap.put("state","fail"); resultMap.put("message","分布式事务同步失败"); return resultMap; } } }
//分布式事务测试 $("#JTATest").click(function(){ $.ajax({ type: "POST", url: "${basePath!}/jtaTest/test01", data: {} , async: false, error: function (request) { layer.alert("与服务器连接失败/(ㄒoㄒ)/~~"); return false; }, success: function (data) { if (data.state == 'fail') { layer.alert(data.message); return false; }else if(data.state == 'success'){ layer.alert(data.message); } } }); }); <button class="layui-btn" id="JTATest">同时向班级和老师表插入名为8888的班级和老师</button>
点击这个按钮,跳转到controller:
当正常执行了sql语句之后,我们可以发现数据库并没有变化,因为整个方法的事务还没有走完,当我们走到1/0这步时:
抛出运行时异常,并被spring事务拦截器拦截,并捕获异常:
在this.completeTransactionAfterThrowing(txInfo, var16);
Extraire les éléments de configuration pertinents de la source de données dans un application.yml séparément :
Remarque :
🎜spring.datasource.type
est com.alibaba.druid.pool.xa.DruidXADataSource ;
🎜 La valeur de spring.jta.transaction-manager-id
est unique sur votre ordinateur. Veuillez lire la documentation officielle pour plus de détails 🎜TxAdviceInterceptor.java code> et <code>TxAdvice2Interceptor.java
sont configurés dans le gestionnaire de transactions ; ceux qui en ont besoin peuvent configurer eux-mêmes d'autres gestionnaires de transactions (voir DruidConfig.java pour voir) 🎜🎜 5. Interface de test 🎜🎜🎜Créer un nouvelle interface de test métier distribuée JtaTestService.java et classe d'implémentation JtaTestServiceImpl.java🎜🎜🎜En fait, il s'agit d'une méthode test01() très simple, dans laquelle nous appelons classService.saveOrUpdateTClass(tClass) ; code>teacherService.saveOrUpdateTeacher(teacher);
🎜🎜 réaliser le fonctionnement de deux sources de données successivement : nous pouvons ensuite déboguer et suivre nous-mêmes le moment de soumission de la transaction. De plus, nous pouvons également utiliser les deux méthodes pour terminer. la transaction.Après l'exécution, créez manuellement une exception d'exécution pour vérifier si toutes les transactions distribuées sont annulées 🎜🎜🎜Remarque : 🎜🎜🎜🎜Dans la méthode d'implémentation de la classe, j'utilise : 🎜🎜rrreeerrreeerrreee🎜 6. Créez JtaTestContoller. java🎜🎜🎜Créez JtaTestContoller.java, acceptez une requête http depuis le front-end et déclenchez la méthode test01 de JtaTestService🎜🎜rrreee🎜Seven Ajoutez un bouton à test.ftl pour tester🎜rrreee🎜Eight. les résultats🎜🎜 🎜Cliquez sur ce bouton pour accéder au contrôleur : 🎜🎜🎜🎜🎜Après avoir exécuté l'instruction sql normalement, nous pouvons constater que la base de données n'a pas changé, car la transaction de l'ensemble de la méthode n'est pas encore terminée. Lorsque nous atteignons l'étape 1/0 : 🎜🎜🎜🎜🎜lance un runtime exception et est bloqué par spring L'intercepteur de transaction intercepte et capture l'exception : 🎜🎜🎜🎜🎜 annulera toutes les transactions dans la méthode this.completeTransactionAfterThrowing(txInfo, var16);
: 🎜🎜🎜22:09:04.243 connexion [http-nio -8080-exec- 5] INFO c.a.i.imp.CompositeTransactionImp - rollback() effectué de la transaction 192.168.1.103.tm0000400006🎜🎜🎜À ce moment, lorsque nous ouvrons à nouveau la vérification de la base de données, il n'y a toujours aucun changement, prouvant que la transaction distribuée la configuration est réussie. 🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!