(1.1.5가 많이 바뀌었는데 여기서 클라이언트는 1.1.4를 사용합니다.)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>top.yueshushu</groupId> <artifactId>learn</artifactId> <version>1.0-SNAPSHOT</version> <name>Canal</name> <description>学习 Canal</description> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 导入配置文件处理器,配置文件进行绑定就会有提示,需要重启 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!--导入自动热步署的依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <!--引入MySql的驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--引入springboot与mybatis整合的依赖--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.4</version> </dependency> <!-- 引入pagehelper分页插件 --> <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper-spring-boot-starter</artifactId> <version>1.2.5</version> </dependency> <!--添加 druid-spring-boot-starter的依赖的依赖--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.14</version> </dependency> <!--SpringBoot 的aop 模块--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!--添加canal的依赖. 重要. 使用 1.1.4--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.9.4</version> </dependency> </dependencies> <build> <!--将该目录下的文件全部打包成类的路径--> <resources> <resource> <directory>src/main/resources</directory> </resource> </resources> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
/** * 一个简单的canal 的连接测试程序 */ @Test public void connectionTest() { //1. 创建连接 填充对应的地址信息 ,要监控的实例和相应的用户名和密码 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); //2. 进行连接 canalConnector.connect(); log.info(">>>连接成功:{}", canalConnector); }
17:26:32.179 [main ] INFO top.yueshu.learn.CanalDemoTest - >>>연결 성공: com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3
/** * 获取数据信息. 可以发现,未获取到数据 . 这个应该是实时的. */ @Test public void getDataTest() { //1. 创建连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 进行连接 canalConnector.connect(); //3. 注册,看使用哪个数据库表 canalConnector.subscribe("springboot.user"); //4. 获取 1条数据 Message message = canalConnector.get(1); log.info("获取的数据:id:{},数据:{}", message.getId(), message); if (message.getId() == -1) { log.info(">>>未获取到数据"); return; } //5. 获取相应的数据集合 List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { log.info(">>>获取数据 {}", entry); //获取表名 CanalEntry.Header header = entry.getHeader(); log.info(">>>获取表名:{}", header.getTableName()); CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name()); //获取数据 ByteString storeValue = entry.getStoreValue(); log.info(">>>输出存储的值:{}", storeValue); } }
기본 라이브러리 데이터 조각 삽입
insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用户',24,'男','学习canal');
다시 실행:
/** * 获取数据信息. 获取现在的数据. 再次执行时,就没有这个数据了. */ @Test public void getNowDataTest() { //1. 创建连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 进行连接 canalConnector.connect(); //3. 注册,看使用哪个数据库表 canalConnector.subscribe("springboot.user"); for (;;) { //4. 获取 1条数据 Message message = canalConnector.get(1); log.info("获取的数据:id:{},数据:{}", message.getId(), message); if (message.getId() == -1) { log.info(">>>未获取到数据"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //5. 获取相应的数据集合 List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { log.info(">>>获取数据 {}", entry); //获取表名 CanalEntry.Header header = entry.getHeader(); log.info(">>>获取表名:{}", header.getTableName()); CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name()); //获取数据 ByteString storeValue = entry.getStoreValue(); log.info(">>>输出存储的值:{}", storeValue); } } }
해당 데이터 변경 정보를 언제든지 얻을 수 있습니다.
storeValue의 값은 해석하기 어렵다는 것을 알게 될 것입니다. 이 데이터는 구문 분석이 필요합니다.
/** * 将 storeValue 进行解析,解析成我们能看懂的语句. * 对数据库 cud 进行处理操作观看一下. * 发现,点是不好的,也有多余的记录信息. * * @throws Exception 异常 */ @Test public void convertDataTest() throws Exception { //1. 创建连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); //2. 进行连接 canalConnector.connect(); canalConnector.subscribe("springboot.user"); for (;;) { //获取信息 Message message = canalConnector.get(1); if (message.getId() == -1L) { // log.info("未获取到数据"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } continue; } List<CanalEntry.Entry> entryList = message.getEntries(); //对获取到的数据进行处理 log.info(">>获取到{}条数据", entryList.size()); for (CanalEntry.Entry entry : entryList) { CanalEntry.Header header = entry.getHeader(); log.info(">>>获取表名:{}", header.getTableName()); //获取类型. CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>类型编号 {},类型名称:{}", entryType.getNumber(), entryType.name()); //获取存入日志的值 ByteString storeValue = entry.getStoreValue(); //将这个值进行解析 CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue); String sql = rowChange.getSql(); log.info(">>>获取对应的sql:{}", sql); // 这个sql 可能是 批量的sql语句 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { log.info(">>>获取信息:{}", rowData); //对数据进行处理 List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); beforeColumnsList.forEach( n -> log.info("哪个列{},原先是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated()) ); afterColumnsList.forEach( n -> log.info("哪个列{},后来是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated()) ); } } } }
sql을 다시 실행
insert into springboot.user(id,name,age,sex,description) values(2,'canal添加用户2',25,'男','学习canal2');
TRANSACTIONBEGIN 등 다른 타입도 처리되는 것으로 확인됨
/** * 类型转换数据 * * @throws Exception 异常 */ @Test public void dataTypeTest() throws Exception { CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); canalConnector.subscribe("springboot.user"); for(;;){ Message message = canalConnector.get(1); if (message.getId() == -1) { TimeUnit.SECONDS.sleep(1); continue; } List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); //只要 RowData 数据类型的 if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>对表 {} 进行操作", tableName); ByteString storeValue = entry.getStoreValue(); RowChange rowChange = RowChange.parseFrom(storeValue); //行改变 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } } } private void deleteHandler(RowChange rowChange) { log.info(">>>>执行删除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue()); } } } private void updateHandler(RowChange rowChange) { log.info(">>>执行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowChange 行改变 */ private void insertHandler(RowChange rowChange) { log.info(">>>执行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue()); } } }
삽입, 업데이트, 삭제가 별도로 처리되는 것으로 확인됨 .
테스트 프로그램을 먼저 시작하세요.
정보가 인쇄되지 않습니다.
메인 테이블은 add 문을 실행합니다:
insert into springboot.user(id,name,age,sex,description) values(4,'canal添加用户4',25,'男','学习canal4');
정보가 인쇄됩니다:
이것은 매우 읽기 쉽습니다.
메인 테이블이 수정 작업을 실행합니다.
update springboot.user set name='开开心心',age=26,description='岳泽霖' where id =4;
업데이트할 때 각 필드가 업데이트되면 이전과 마찬가지로 로그 소비가 생성되지 않습니다.
메인 테이블은 삭제 작업을 수행합니다.
delete from springboot.user where id =4;
위 인수는 모두 한 번에 하나의 데이터를 가져옵니다. 효율성이 상대적으로 낮습니다
/** * 一次性获取多条数据。 * sql 执行多条。 */ @Test public void dataMoreTest() throws Exception { //1. 创建 canal连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); // 订阅哪个对象 canalConnector.subscribe("springboot.user"); for (; ; ) { // Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS); Message message = canalConnector.get(3); if (message.getId() == -1) { // 未获取到数据 continue; } List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>>对表{} 执行操作", tableName); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //对类型进行处理 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } } } private void deleteHandler(CanalEntry.RowChange rowChange) { log.info(">>>>执行删除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue()); } } } private void updateHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowChange 行改变 */ private void insertHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue()); } } }
수정 사항:
// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS); Message message = canalConnector.get(3);
.get(3)은 한 번에 3개의 레코드를 가져오는 것을 의미합니다.
canalConnector.get(3, 5L, TimeUnit. SECONDS); 5초 내에 3개의 레코드를 얻는 것을 의미합니다.
두 가지 트리거 조건이 있는데, 하나는 3개의 레코드를 얻는 것이고 다른 하나는 5초에 도달하는 것입니다.
효과 표시 정보는 기존과 동일하므로 다시 시연되지 않습니다.
/** * 一次性获取多条数据。 * sql 执行多条。 */ @Test public void dataMoreTest() throws Exception { //1. 创建 canal连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); // 订阅哪个对象 canalConnector.subscribe("springboot.user"); for (; ; ) { Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS); if (message.getId() == -1) { // 未获取到数据 TimeUnit.MILLISECONDS.sleep(500); continue; } log.info(">>>>获取对应的 id: {}",message.getId()); List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>>对表{} 执行操作", tableName); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //对类型进行处理 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } //进行回滚 // canalConnector.rollback(); //确认ack 配置 canalConnector.ack(message.getId()); } } private void deleteHandler(CanalEntry.RowChange rowChange) { log.info(">>>>执行删除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue()); } } } private void updateHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowChange 行改变 */ private void insertHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue()); } } }
주요 정보:
메시지 메시지 = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
//Rollback//canalConnector.rollback();
//ack 구성 확인 canalConnector.ack(message.getId());
메시지가 소비되었는지 수동으로 확인하세요.
메시지 롤백()이 롤백되면 메시지가 다시 소비됩니다.
canalConnector.rollback();
실행문 :
insert into springboot.user(id,name,age,sex,description) values(5,'canal添加用户5',25,'男','学习canal5');
수동확인이 되면
canalConnector.ack(message.getId());
한번만 소모됩니다.
위 내용은 SpringBoot가 Canal 메소드를 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!