실제 프로젝트에서는 분산 트랜잭션을 피하려고 노력합니다. 그러나 때로는 일부 서비스 분할을 수행해야 하므로 분산 트랜잭션 문제가 발생할 수 있습니다.
동시에 분산 거래도 면접 시 시장 질문으로 이 경우에 연습할 수 있고, 면접에서는 123으로 말할 수 있습니다.
여기 비즈니스 밤이 있습니다. 사용자가 쿠폰을 받을 때 사용자가 쿠폰을 받은 횟수를 차감한 다음 사용자가 쿠폰을 받은 기록이 기록됩니다.
원래 여기서는 메시지 큐 방식을 사용하고 비동기식을 사용하여 사용자 수집 기록을 추가할 수 있습니다. 하지만 여기서 요구 사항은 사용자가 수집 기록을 받은 후 즉시 확인할 수 있어야 하므로 분산 트랜잭션 문제를 구현하기 위해 여기에 Atomikos
을 도입했습니다.
분산 트랜잭션은 여러 컴퓨터나 데이터베이스에 걸쳐 있는 트랜잭션으로, 이러한 컴퓨터나 데이터베이스 간에 네트워크 지연, 오류 또는 불일치가 발생할 수 있습니다. 분산 트랜잭션은 데이터의 정확성과 무결성을 보장하기 위해 모든 작업의 원자성, 일관성, 격리 및 내구성을 보장해야 합니다.
분산 트랜잭션 프로토콜에는 2PC(2단계 커밋)와 3PC(3단계 커밋)의 두 가지 주요 유형이 있습니다.
2PC는 현재 가장 일반적으로 사용되는 분산 트랜잭션 프로토콜이며 프로세스는 준비 단계와 제출 단계의 두 단계로 나뉩니다. 준비 단계에서 트랜잭션 코디네이터는 모든 참가자에게 준비 요청을 발행하고 참가자는 로컬 트랜잭션을 준비 상태로 실행하고 준비 결과를 트랜잭션 코디네이터에게 반환합니다. 커밋 단계에서 모든 참가자가 성공적으로 실행되면 트랜잭션 코디네이터는 모든 참가자에게 커밋 요청을 보내고 참가자는 로컬 트랜잭션을 커밋합니다. 그렇지 않으면 트랜잭션 코디네이터는 모든 참가자에게 롤백 요청을 보내고 참가자는 로컬 트랜잭션을 롤백합니다. 거래롤.
3PC는 2PC를 기반으로 준비 및 제출 단계를 추가한 향상된 버전입니다. 제출 준비 단계에서 코디네이터는 참가자에게 제출할 수 있는지 묻습니다. 참가자가 동의하면 제출 단계에서 직접 제출되고, 그렇지 않으면 제출 단계에서 롤백됩니다.
분산 트랜잭션 솔루션 구현 솔루션에는 다음이 포함됩니다.
JTA(Java Transaction API)는 J2EE의 프로그래밍 인터페이스 사양이며 XA 프로토콜의 JAVA 구현입니다. 이는 주로 다음을 정의합니다:
A 트랜잭션 관리자 인터페이스 javax.transaction .TransactionManager
는 트랜잭션의 시작, 커밋, 철회 등의 작업을 정의합니다. javax.transaction.TransactionManager
,定义了有关事务的开始、提交、撤回等>操作。
一个满足XA规范的资源定义接口javax.transaction.xa.XAResource
,一种资源如果要支持JTA事务,就需要让它的资源实现该XAResource
javax.transaction .xa.XAResource
, 리소스가 JTA 트랜잭션을 지원하려는 경우 해당 리소스는 XAResource
인터페이스를 구현하고 이 인터페이스에 의해 정의된 2단계 제출 관련 인터페이스를 구현합니다.
JTA 인터페이스를 사용하여 트랜잭션을 구현하는 애플리케이션이 있는 경우 애플리케이션이 실행 중일 때 JTA를 구현하는 컨테이너가 필요합니다. 일반적으로 이는 JBoss, Websphere 및 기타 애플리케이션 서버와 같은 J2EE 컨테이너입니다. 그러나 JTA를 구현하는 일부 독립적인 프레임워크도 있습니다. 예를 들어 Atomikos와 bitronix는 모두 jar 패키지 형태로 JTA 구현 프레임워크를 제공합니다. 이러한 방식으로 JTA를 사용하여 Tomcat 또는 Jetty와 같은 서버에서 트랜잭션을 구현하는 애플리케이션 시스템을 실행할 수 있습니다. 🎜위의 로컬 트랜잭션과 외부 트랜잭션의 차이점에서 언급했듯이 JTA 트랜잭션은 외부 트랜잭션이며 여러 리소스에 대한 트랜잭션을 구현하는 데 사용할 수 있습니다. 모든 리소스 XAResource
来进行两阶段提交的控制。感兴趣的同学可以看看这个接口的方法,除了commit, rollback等方法以外,还有end()
, forget()
, isSameRM()
, prepare()
등에서 정확히 그렇게 합니다. 이러한 인터페이스만으로도 2단계 트랜잭션 구현 시 JTA의 복잡성을 상상할 수 있습니다.
XA는 X/Open 조직에서 제안한 분산 트랜잭션 아키텍처(또는 프로토콜)입니다. XA 아키텍처는 주로 (글로벌) 트랜잭션 관리자와 (로컬) 리소스 관리자 간의 인터페이스를 정의합니다. XA 인터페이스는 트랜잭션 관리자와 하나 이상의 리소스 관리자 사이에 통신 브리지를 형성하는 양방향 시스템 인터페이스입니다. 즉, XA 기반 트랜잭션에서는 여러 리소스에 대한 트랜잭션 관리를 수행할 수 있습니다. 예를 들어 시스템이 여러 데이터베이스에 액세스하거나 데이터베이스와 메시지 미들웨어와 같은 리소스에 모두 액세스할 수 있습니다. 이러한 방식으로 여러 데이터베이스 및 메시지 미들웨어에서 제출되거나 취소된 모든 트랜잭션을 직접 구현할 수 있습니다. XA 사양은 Java 사양이 아니라 현재 다양한 데이터베이스와 많은 메시지 미들웨어가 XA 사양을 지원합니다.
JTA는 XA 사양을 충족하는 Java 개발을 위한 사양입니다. 따라서 분산 트랜잭션을 구현하기 위해 JTA를 사용한다고 말할 때 실제로는 JTA 사양을 사용하여 시스템의 여러 데이터베이스, 메시지 미들웨어 및 기타 리소스와의 트랜잭션을 구현한다는 의미입니다.
Atomikos는 매우 인기 있는 오픈 소스 트랜잭션 관리자이며 Spring Boot 애플리케이션에 포함될 수 있습니다. Tomcat 애플리케이션 서버는 JTA 사양을 구현하지 않습니다. Tomcat을 애플리케이션 서버로 사용하는 경우 타사 트랜잭션 관리자 클래스를 전역 트랜잭션 관리자로 사용해야 하며 Atomikos 프레임워크는 이를 수행하여 트랜잭션 관리를 애플리케이션에 통합합니다. 애플리케이션 서버에 의존하지 않습니다.
이론을 많이 이야기하는 것은 쓸모가 없습니다. 코드를 보여주세요.
기술 스택: Spring Boot+MyBatis+Atomikos+MySQL
이 문서의 코드를 따르는 경우 mysql 버전에 주의하세요.
먼저 두 개의 데이터베이스(my-db_0 및 my-db_1)를 구축한 다음 각 데이터베이스에 테이블을 생성합니다.
데이터베이스 my-db_0:
CREATE TABLE `t_user_0` ( `id` bigint NOT NULL AUTO_INCREMENT, `user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `age` int NOT NULL, `gender` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
데이터베이스 my-db_1:
CREATE TABLE `t_user_1` ( `id` bigint NOT NULL AUTO_INCREMENT, `user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `age` int NOT NULL, `gender` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
이는 분산 트랜잭션을 보여주기 위한 것일 뿐이므로 테이블의 구체적인 의미에 대해 걱정하지 마세요.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tian</groupId> <artifactId>spring-boot-atomikos</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <name>spring-boot-atomikos</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- mybatis依赖 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- mysql依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency> <!--分布式事务--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> </dependencies> <build> <plugins> <!-- 要使生成的jar可运行,需要加入此插件 --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> <resources> <resource> <directory>src/main/java</directory> <excludes> <exclude>**/*.java</exclude> </excludes> </resource> <resource> <!-- 编译xml文件 --> <directory>src/main/resources</directory> <includes> <include>**/*.*</include> </includes> </resource> </resources> </build> </project>
server.port=9001 spring.application.name=atomikos-demo spring.datasource.user0.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.user0.url=jdbc:mysql://localhost:3306/my-db_0?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true spring.datasource.user0.user=root spring.datasource.user0.password=123456 spring.datasource.user1.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.user1.url=jdbc:mysql://localhost:3306/my-db_1?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true spring.datasource.user1.user=root spring.datasource.user1.password=123456 mybatis.mapperLocations=classpath:/com/tian/mapper/*/*.xml mybatis.typeAliasesPackage=com.tian.entity mybatis.configuration.cache-enabled=true
/** * @author tianwc 公众号:java后端技术全栈、面试专栏 * @version 1.0.0 * @date 2023年05月11日 19:38 * 博客地址:<a href="http://woaijava.cc/">博客地址</a> * <p> * 配置好两个数据源 */ @Configuration public class DataSourceConfig { // 将这个对象放入spring容器中(交给Spring管理) @Bean // 读取 application.yml 中的配置参数映射成为一个对象 @ConfigurationProperties(prefix = "spring.datasource.user0") public XADataSource getDataSource0() { // 创建XA连接池 return new MysqlXADataSource(); } /** * 创建Atomikos数据源 * 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean */ @Bean @DependsOn("getDataSource0") @Primary public DataSource dataSourcePre(@Qualifier("getDataSource0") XADataSource xaDataSource) { //这里的AtomikosDataSourceBean使用的是spring提供的 AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setXaDataSource(xaDataSource); atomikosDataSourceBean.setMaxPoolSize(20); return atomikosDataSourceBean; } @Bean @ConfigurationProperties(prefix = "spring.datasource.user1") public XADataSource getDataSource1() { // 创建XA连接池 return new MysqlXADataSource(); } @Bean @DependsOn("getDataSource1") public DataSource dataSourceSit(@Qualifier("getDataSource1") XADataSource xaDataSource) { //这里的AtomikosDataSourceBean使用的是spring提供的 AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setXaDataSource(xaDataSource); return atomikosDataSourceBean; } }
@Configuration @MapperScan(basePackages = {"com.tian.mapper.user0"}, sqlSessionTemplateRef = "preSqlSessionTemplate") public class MybatisPreConfig { @Autowired @Qualifier("dataSourcePre") private DataSource dataSource; /** * 创建 SqlSessionFactory */ @Bean @Primary public SqlSessionFactory preSqlSessionFactory() throws Exception{ SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); bean.setMapperLocations(new PathMatchingResourcePatternResolver(). getResources("classpath*:com/tian/mapper/user0/*.xml")); return bean.getObject(); } /** * 通过 SqlSessionFactory 来创建 SqlSessionTemplate */ @Bean @Primary public SqlSessionTemplate preSqlSessionTemplate(@Qualifier("preSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用 return new SqlSessionTemplate(sqlSessionFactory); } }
다른 하나는 기본적으로 동일합니다. 즉, 스캔 경로가 다음으로 변경됩니다.
("classpath*:com/tian/mapper/user1/*.xml")
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.tian.mapper.user0.User0Mapper"> <!-- --> <cache eviction="LRU" flushInterval="10000" size="1024" /> <resultMap id="BaseResultMap" type="com.tian.entity.User0"> <id column="id" jdbcType="BIGINT" property="id" /> <result column="user_name" jdbcType="VARCHAR" property="userName" /> <result column="age" jdbcType="INTEGER" property="age" /> <result column="gender" jdbcType="INTEGER" property="gender" /> </resultMap> <sql id="Base_Column_List"> id, user_name, age, gender </sql> <insert id="insert" parameterType="com.tian.entity.User0"> insert into t_user_0 (id, user_name,age, gender) values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR},#{age,jdbcType=INTEGER},#{gender,jdbcType=INTEGER}) </insert> </mapper>
다른 하나는 기본적으로 동일하며 여기에 게시됩니다.
해당 매퍼 인터페이스도 매우 간단합니다.
public interface User0Mapper { int insert(User0 record); }
/** * @author tianwc 公众号:java后端技术全栈、面试专栏 * @version 1.0.0 * @date 2023年05月11日 19:38 * 博客地址:<a href="http://woaijava.cc/">博客地址</a> * <p> * 模拟三种场景:正常、制造异常、数据库异常 */ @Service public class UserServiceImpl implements UserService { @Resource private User0Mapper user0Mapper; @Resource private User1Mapper user1Mapper; /** * 正常逻辑 同时对两个数据库进行 插入数据 */ @Transactional @Override public int transaction1() throws Exception { User1 user1 = new User1(); user1.setUserName("22222"); user1.setAge(11); user1.setGender(0); user1Mapper.add(user1); System.out.println("---------------------------"); // sit(数据源1) User0 user0 = new User0(); user0.setUserName("111111"); user0.setAge(11); user0.setGender(0); user0Mapper.insert(user0); return 1; } /** * 正常逻辑 同时对两个数据库进行 插入数据 * 数据插入完后 出现异常 */ @Transactional @Override public int transaction2() throws Exception { User1 user1 = new User1(); user1.setUserName("22222"); user1.setAge(11); user1.setGender(0); user1Mapper.add(user1); System.out.println("---------------------------"); // sit(数据源1) User0 user0 = new User0(); user0.setUserName("111111"); user0.setAge(11); user0.setGender(0); user0Mapper.insert(user0); //认为制造一个异常 int a=1/0; return 1; } /** * 第一个数据插入成功 第二个数据插入失败 */ @Transactional @Override public int transaction3() throws Exception { User1 user1 = new User1(); user1.setUserName("22222"); user1.setAge(11); user1.setGender(0); user1Mapper.add(user1); System.out.println("---------------------------"); // sit(数据源1) User0 user0 = new User0(); //故意搞长点,模拟插入失败 让前面的数据回滚 user0.setUserName("111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"); user0.setAge(11); user0.setGender(0); user0Mapper.insert(user0); return 1; } }
@RestController @RequestMapping("/user") public class UserController { @Resource private UserService userService; @PostMapping("/test1") public CommonResult test1() { int i = 0; try { i = userService.transaction1(); return CommonResult.success(i); } catch (Exception e) { e.printStackTrace(); } return CommonResult.success(i); } @PostMapping("/test2") public CommonResult test2() { int i = 0; try { i = userService.transaction2(); return CommonResult.success(i); } catch (Exception e) { e.printStackTrace(); } return CommonResult.success(i); } @PostMapping("/test3") public CommonResult test3() { int i = 0; try { i = userService.transaction3(); return CommonResult.success(i); } catch (Exception e) { e.printStackTrace(); } return CommonResult.success(i); } }
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; /** * @author tianwc 公众号:java后端技术全栈、面试专栏 * @version 1.0.0 * @date 2023年05月11日 19:38 * 博客地址:<a href="http://woaijava.cc/">博客地址</a> * <p> * 项目启动类 */ @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) //@ComponentScan(basePackages = {"com.tian"}) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
프로젝트를 시작하고 다음 세 가지를 각각 테스트합니다.
http :/ /localhost:9001/user/test1
결과: 두 데이터베이스 모두에 새 테이블 데이터가 추가되었습니다http://localhost:9001/user/test1
结果:两个数据库中,表数据都新增一条
http://localhost:9001/user/test2
http://localhost:9001/user/test2
결과: 제수가 0이 될 수 없다는 예외가 발생하고 새 데이터가 추가되지 않습니다. 데이터베이스 중 하나입니다. 🎜http://localhost:9001/user/test3
결과: 데이터 필드 값이 너무 길다는 예외가 발생하고 두 데이터베이스에 새 데이터가 추가되지 않습니다.
자, 이제 분산 트랜잭션을 구현했습니다.
위 내용은 Spring Boot+MyBatis+Atomikos+MySQL(소스 코드 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!