我們在實際專案中,盡量規避分散式交易。 但是,有些時候是真的需要做一些服務拆分從而會引出分散式事務問題。
同時,分散式事務也是面試中市場被問到,可以拿著這個案例練練手,面試就可以說上個123了。
這裡舉個業務栗子:用戶領取優惠券,需要扣減用戶領取次數,然後記錄一個用戶領取優惠券記錄。
#原本這裡可以使用訊息佇列方式,採用非同步化去新增使用者領取記錄。但是,這裡需求是就是需要用戶領完立刻就能查看到自己的領取記錄,那我們這裡就引入了Atomikos
來實現分散式事務問題。
#分散式交易是指跨越多個電腦或資料庫的事務,這些電腦或資料庫之間可能存在網路延遲、故障或不一致的情況。分散式事務需要確保所有操作的原子性、一致性、隔離性和持久性,以確保資料的正確性和完整性。
分散式事務協定主要有兩種:2PC(Two-Phase Commit)和3PC(Three-Phase Commit)。
2PC是目前最常用的分散式事務協議,其流程分為兩個階段:準備階段和提交階段。在準備階段,事務協調者向所有參與者發出準備請求,參與者將本地事務執行到prepare狀態,並將prepare結果回傳給事務協調者。在提交階段,如果所有參與者都執行成功,則事務協調者向所有參與者發出提交請求,參與者將本地事務提交,否則事務協調者向所有參與者發出回滾請求,參與者將本地事務回滾。
3PC是2PC的改良版,其在2PC的基礎上增加了一個準備提交階段。在準備提交階段,協調者向參與者詢問是否可以提交,如果參與者返回同意,則在提交階段直接提交,否則在提交階段回滾。
分散式事務解決方案有:
JTA(Java Transaction API),是J2EE的程式介面規範,它是XA協定的JAVA實作。它主要定義了:
一個事務管理器的介面javax.transaction.TransactionManager
,定義了有關事務的開始、提交、撤回等>操作。
一個符合XA規範的資源定義介面javax.transaction.xa.XAResource
,一個資源如果要支援JTA事務,就需要讓它的資源實作該XAResource
接口,並實作該接口定義的兩階段提交相關的接口。
如果我們有一個應用,它使用JTA介面實現事務,應用在運行的時候,就需要一個實作JTA的容器,一般情況下,這是一個J2EE容器,像JBoss,Websphere等應用伺服器。
但是,也有一些獨立的框架實作了JTA,例如Atomikos, bitronix都提供了jar包方式的JTA實作框架。這樣我們就能夠在Tomcat或Jetty之類的伺服器上運行使用JTA實作事務的應用系統。
在上面的本地事務和外部事務的區別中說到,JTA事務是外部事務,可以用來實現對多個資源的事務性。它正是透過每個資源實現的XAResource
來進行兩階段提交的控制。有興趣的同學可以看看這個介面的方法,除了commit, rollback等方法以外,還有end()
, forget()
, isSameRM()
, prepare()
等等。光從這些介面就能夠想像JTA在實現兩階段事務的複雜性。
XA是由X/Open組織提出的分散式事務的架構(或稱為協定)。 XA架構主要定義了(全域)事務管理器(Transaction Manager)和(局部)資源管理器(Resource Manager)之間的介面。 XA介面是雙向的系統接口,在事務管理器(Transaction Manager)以及一個或多個資源管理器(Resource Manager)之間形成通訊橋樑。也就是說,在基於XA的一個事務中,我們可以針對多個資源進行事務管理,例如一個系統存取多個資料庫,或即存取資料庫、又存取像訊息中間件這樣的資源。這樣我們就能夠實現在多個資料庫和訊息中間件直接實現全部提交、或全部取消的事務。 XA規範不是java的規範,而是一種通用的規範, 目前各種資料庫、以及許多訊息中間件都支援XA規範。
JTA是滿足XA規格的、用於Java開發的規格。所以,當我們說,使用JTA實現分散式事務的時候,其實是說,使用JTA規範,實現系統內多個資料庫、訊息中間件等資源的事務。
Atomikos是一個非常受歡迎的開源事務管理器,並且可以嵌入到你的Spring Boot應用中。 Tomcat應用程式伺服器沒有實作JTA規範,當使用Tomcat作為應用程式伺服器的時候,需要使用第三方的事務管理器類別來作為全域的事務管理器,而Atomikos框架就是這個作用,將事務管理整合到應用程式中,而不依賴application server。
說一堆的理論沒什麼用,show me the code。
技術堆疊:Spring Boot MyBatis Atomikos MySQL
#如果你依照本文程式碼,注意你的mysql版本。
先建好兩個資料庫(my-db_0和my-db_1),然後每個庫裡各建一張表。
資料庫my-db_0中:
CREATE TABLE `t_user_0` ( `id` bigint NOT NULL AUTO_INCREMENT, `user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `age` int NOT NULL, `gender` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
資料庫my-db_1中:
CREATE TABLE `t_user_1` ( `id` bigint NOT NULL AUTO_INCREMENT, `user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `age` int NOT NULL, `gender` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
這裡只是為了示範分散式事務,不用在意表的具體意義。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tian</groupId> <artifactId>spring-boot-atomikos</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <name>spring-boot-atomikos</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- mybatis依赖 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- mysql依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency> <!--分布式事务--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> </dependencies> <build> <plugins> <!-- 要使生成的jar可运行,需要加入此插件 --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> <resources> <resource> <directory>src/main/java</directory> <excludes> <exclude>**/*.java</exclude> </excludes> </resource> <resource> <!-- 编译xml文件 --> <directory>src/main/resources</directory> <includes> <include>**/*.*</include> </includes> </resource> </resources> </build> </project>
server.port=9001 spring.application.name=atomikos-demo spring.datasource.user0.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.user0.url=jdbc:mysql://localhost:3306/my-db_0?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true spring.datasource.user0.user=root spring.datasource.user0.password=123456 spring.datasource.user1.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.user1.url=jdbc:mysql://localhost:3306/my-db_1?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true spring.datasource.user1.user=root spring.datasource.user1.password=123456 mybatis.mapperLocations=classpath:/com/tian/mapper/*/*.xml mybatis.typeAliasesPackage=com.tian.entity mybatis.configuration.cache-enabled=true
/** * @author tianwc 公众号:java后端技术全栈、面试专栏 * @version 1.0.0 * @date 2023年05月11日 19:38 * 博客地址:<a href="http://woaijava.cc/">博客地址</a> * <p> * 配置好两个数据源 */ @Configuration public class DataSourceConfig { // 将这个对象放入spring容器中(交给Spring管理) @Bean // 读取 application.yml 中的配置参数映射成为一个对象 @ConfigurationProperties(prefix = "spring.datasource.user0") public XADataSource getDataSource0() { // 创建XA连接池 return new MysqlXADataSource(); } /** * 创建Atomikos数据源 * 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean */ @Bean @DependsOn("getDataSource0") @Primary public DataSource dataSourcePre(@Qualifier("getDataSource0") XADataSource xaDataSource) { //这里的AtomikosDataSourceBean使用的是spring提供的 AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setXaDataSource(xaDataSource); atomikosDataSourceBean.setMaxPoolSize(20); return atomikosDataSourceBean; } @Bean @ConfigurationProperties(prefix = "spring.datasource.user1") public XADataSource getDataSource1() { // 创建XA连接池 return new MysqlXADataSource(); } @Bean @DependsOn("getDataSource1") public DataSource dataSourceSit(@Qualifier("getDataSource1") XADataSource xaDataSource) { //这里的AtomikosDataSourceBean使用的是spring提供的 AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setXaDataSource(xaDataSource); return atomikosDataSourceBean; } }
@Configuration @MapperScan(basePackages = {"com.tian.mapper.user0"}, sqlSessionTemplateRef = "preSqlSessionTemplate") public class MybatisPreConfig { @Autowired @Qualifier("dataSourcePre") private DataSource dataSource; /** * 创建 SqlSessionFactory */ @Bean @Primary public SqlSessionFactory preSqlSessionFactory() throws Exception{ SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); bean.setMapperLocations(new PathMatchingResourcePatternResolver(). getResources("classpath*:com/tian/mapper/user0/*.xml")); return bean.getObject(); } /** * 通过 SqlSessionFactory 来创建 SqlSessionTemplate */ @Bean @Primary public SqlSessionTemplate preSqlSessionTemplate(@Qualifier("preSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用 return new SqlSessionTemplate(sqlSessionFactory); } }
("classpath*:com/tian/mapper/user1/*.xml")
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.tian.mapper.user0.User0Mapper"> <!-- --> <cache eviction="LRU" flushInterval="10000" size="1024" /> <resultMap id="BaseResultMap" type="com.tian.entity.User0"> <id column="id" jdbcType="BIGINT" property="id" /> <result column="user_name" jdbcType="VARCHAR" property="userName" /> <result column="age" jdbcType="INTEGER" property="age" /> <result column="gender" jdbcType="INTEGER" property="gender" /> </resultMap> <sql id="Base_Column_List"> id, user_name, age, gender </sql> <insert id="insert" parameterType="com.tian.entity.User0"> insert into t_user_0 (id, user_name,age, gender) values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR},#{age,jdbcType=INTEGER},#{gender,jdbcType=INTEGER}) </insert> </mapper>
public interface User0Mapper { int insert(User0 record); }
/** * @author tianwc 公众号:java后端技术全栈、面试专栏 * @version 1.0.0 * @date 2023年05月11日 19:38 * 博客地址:<a href="http://woaijava.cc/">博客地址</a> * <p> * 模拟三种场景:正常、制造异常、数据库异常 */ @Service public class UserServiceImpl implements UserService { @Resource private User0Mapper user0Mapper; @Resource private User1Mapper user1Mapper; /** * 正常逻辑 同时对两个数据库进行 插入数据 */ @Transactional @Override public int transaction1() throws Exception { User1 user1 = new User1(); user1.setUserName("22222"); user1.setAge(11); user1.setGender(0); user1Mapper.add(user1); System.out.println("---------------------------"); // sit(数据源1) User0 user0 = new User0(); user0.setUserName("111111"); user0.setAge(11); user0.setGender(0); user0Mapper.insert(user0); return 1; } /** * 正常逻辑 同时对两个数据库进行 插入数据 * 数据插入完后 出现异常 */ @Transactional @Override public int transaction2() throws Exception { User1 user1 = new User1(); user1.setUserName("22222"); user1.setAge(11); user1.setGender(0); user1Mapper.add(user1); System.out.println("---------------------------"); // sit(数据源1) User0 user0 = new User0(); user0.setUserName("111111"); user0.setAge(11); user0.setGender(0); user0Mapper.insert(user0); //认为制造一个异常 int a=1/0; return 1; } /** * 第一个数据插入成功 第二个数据插入失败 */ @Transactional @Override public int transaction3() throws Exception { User1 user1 = new User1(); user1.setUserName("22222"); user1.setAge(11); user1.setGender(0); user1Mapper.add(user1); System.out.println("---------------------------"); // sit(数据源1) User0 user0 = new User0(); //故意搞长点,模拟插入失败 让前面的数据回滚 user0.setUserName("111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"); user0.setAge(11); user0.setGender(0); user0Mapper.insert(user0); return 1; } }
## controller
<pre class="brush:php;toolbar:false;">@RestController
@RequestMapping("/user")
public class UserController {
@Resource
private UserService userService;
@PostMapping("/test1")
public CommonResult test1() {
int i = 0;
try {
i = userService.transaction1();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}
@PostMapping("/test2")
public CommonResult test2() {
int i = 0;
try {
i = userService.transaction2();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}
@PostMapping("/test3")
public CommonResult test3() {
int i = 0;
try {
i = userService.transaction3();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}
}</pre>
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; /** * @author tianwc 公众号:java后端技术全栈、面试专栏 * @version 1.0.0 * @date 2023年05月11日 19:38 * 博客地址:<a href="http://woaijava.cc/">博客地址</a> * <p> * 项目启动类 */ @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) //@ComponentScan(basePackages = {"com.tian"}) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }########測試########################################################## #####啟動項目,分別測試以下三個:#########http://localhost:9001/user/test1### 結果:兩個資料庫中,表格資料都新增一條#########http://localhost:9001/user/test2### 結果:拋出除數無法為Zero的例外,兩個資料庫都沒有新增資料。 ###
http://localhost:9001/user/test3
結果:拋出資料欄位值太長異常,兩個資料庫都沒有新增資料。
好了,到此我們已經實作了分散式事務。
以上是Spring Boot+MyBatis+Atomikos+MySQL(附源碼)的詳細內容。更多資訊請關注PHP中文網其他相關文章!