pom.xml 添加 canal.client 依赖
(1.1.5 改动很大,这儿客户端用 1.1.4)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>top.yueshushu</groupId> <artifactId>learn</artifactId> <version>1.0-SNAPSHOT</version> <name>Canal</name> <description>学习 Canal</description> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 导入配置文件处理器,配置文件进行绑定就会有提示,需要重启 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!--导入自动热步署的依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <!--引入MySql的驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--引入springboot与mybatis整合的依赖--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.4</version> </dependency> <!-- 引入pagehelper分页插件 --> <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper-spring-boot-starter</artifactId> <version>1.2.5</version> </dependency> <!--添加 druid-spring-boot-starter的依赖的依赖--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.14</version> </dependency> <!--SpringBoot 的aop 模块--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!--添加canal的依赖. 重要. 使用 1.1.4--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.9.4</version> </dependency> </dependencies> <build> <!--将该目录下的文件全部打包成类的路径--> <resources> <resource> <directory>src/main/resources</directory> </resource> </resources> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
业务功能处理
简单连接程序
/** * 一个简单的canal 的连接测试程序 */ @Test public void connectionTest() { //1. 创建连接 填充对应的地址信息 ,要监控的实例和相应的用户名和密码 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); //2. 进行连接 canalConnector.connect(); log.info(">>>连接成功:{}", canalConnector); }
17:26:32.179 [main] INFO top.yueshushu.learn.CanalDemoTest - >>>连接成功:com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3
单次获取数据
/** * 获取数据信息. 可以发现,未获取到数据 . 这个应该是实时的. */ @Test public void getDataTest() { //1. 创建连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 进行连接 canalConnector.connect(); //3. 注册,看使用哪个数据库表 canalConnector.subscribe("springboot.user"); //4. 获取 1条数据 Message message = canalConnector.get(1); log.info("获取的数据:id:{},数据:{}", message.getId(), message); if (message.getId() == -1) { log.info(">>>未获取到数据"); return; } //5. 获取相应的数据集合 List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { log.info(">>>获取数据 {}", entry); //获取表名 CanalEntry.Header header = entry.getHeader(); log.info(">>>获取表名:{}", header.getTableName()); CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name()); //获取数据 ByteString storeValue = entry.getStoreValue(); log.info(">>>输出存储的值:{}", storeValue); } }
在主库里面插入一条数据
insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用户',24,'男','学习canal');
再次执行:
循环获取数据
/** * 获取数据信息. 获取现在的数据. 再次执行时,就没有这个数据了. */ @Test public void getNowDataTest() { //1. 创建连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 进行连接 canalConnector.connect(); //3. 注册,看使用哪个数据库表 canalConnector.subscribe("springboot.user"); for (;;) { //4. 获取 1条数据 Message message = canalConnector.get(1); log.info("获取的数据:id:{},数据:{}", message.getId(), message); if (message.getId() == -1) { log.info(">>>未获取到数据"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //5. 获取相应的数据集合 List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { log.info(">>>获取数据 {}", entry); //获取表名 CanalEntry.Header header = entry.getHeader(); log.info(">>>获取表名:{}", header.getTableName()); CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name()); //获取数据 ByteString storeValue = entry.getStoreValue(); log.info(">>>输出存储的值:{}", storeValue); } } }
可以随时获取相应的数据变更信息。
会发现, storeValue 的值是很难解读的。 需要将这个数据解析出来。
解析 storeValue 值
/** * 将 storeValue 进行解析,解析成我们能看懂的语句. * 对数据库 cud 进行处理操作观看一下. * 发现,点是不好的,也有多余的记录信息. * * @throws Exception 异常 */ @Test public void convertDataTest() throws Exception { //1. 创建连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal" ); //2. 进行连接 canalConnector.connect(); canalConnector.subscribe("springboot.user"); for (;;) { //获取信息 Message message = canalConnector.get(1); if (message.getId() == -1L) { // log.info("未获取到数据"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } continue; } List<CanalEntry.Entry> entryList = message.getEntries(); //对获取到的数据进行处理 log.info(">>获取到{}条数据", entryList.size()); for (CanalEntry.Entry entry : entryList) { CanalEntry.Header header = entry.getHeader(); log.info(">>>获取表名:{}", header.getTableName()); //获取类型. CanalEntry.EntryType entryType = entry.getEntryType(); log.info(">>类型编号 {},类型名称:{}", entryType.getNumber(), entryType.name()); //获取存入日志的值 ByteString storeValue = entry.getStoreValue(); //将这个值进行解析 CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue); String sql = rowChange.getSql(); log.info(">>>获取对应的sql:{}", sql); // 这个sql 可能是 批量的sql语句 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { log.info(">>>获取信息:{}", rowData); //对数据进行处理 List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); beforeColumnsList.forEach( n -> log.info("哪个列{},原先是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated()) ); afterColumnsList.forEach( n -> log.info("哪个列{},后来是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated()) ); } } } }
再次执行sql
insert into springboot.user(id,name,age,sex,description) values(2,'canal添加用户2',25,'男','学习canal2');
不同的类型进行不同的处理
发现 其他类型的 如: TRANSACTIONBEGIN 也进行了处理
/** * 类型转换数据 * * @throws Exception 异常 */ @Test public void dataTypeTest() throws Exception { CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); canalConnector.subscribe("springboot.user"); for(;;){ Message message = canalConnector.get(1); if (message.getId() == -1) { TimeUnit.SECONDS.sleep(1); continue; } List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); //只要 RowData 数据类型的 if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>对表 {} 进行操作", tableName); ByteString storeValue = entry.getStoreValue(); RowChange rowChange = RowChange.parseFrom(storeValue); //行改变 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } } } private void deleteHandler(RowChange rowChange) { log.info(">>>>执行删除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue()); } } } private void updateHandler(RowChange rowChange) { log.info(">>>执行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowChange 行改变 */ private void insertHandler(RowChange rowChange) { log.info(">>>执行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue()); } } }
插入,更新,删除,分别进行了处理.
先启动测试程序:
不打印任何信息。
主表执行添加语句:
insert into springboot.user(id,name,age,sex,description) values(4,'canal添加用户4',25,'男','学习canal4');
会打印信息:
这个可读性就非常高了.
主表执行修改的操作.
update springboot.user set name='开开心心',age=26,description='岳泽霖' where id =4;
更新时,若每一个字段都跟原先一样,不会产生日志消费。
主表执行删除的操作:
delete from springboot.user where id =4;
上面的获取,都是一条数据一条数据获取的。效率比较低
一次性获取多条数据
/** * 一次性获取多条数据。 * sql 执行多条。 */ @Test public void dataMoreTest() throws Exception { //1. 创建 canal连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); // 订阅哪个对象 canalConnector.subscribe("springboot.user"); for (; ; ) { // Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS); Message message = canalConnector.get(3); if (message.getId() == -1) { // 未获取到数据 continue; } List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>>对表{} 执行操作", tableName); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //对类型进行处理 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } } } private void deleteHandler(CanalEntry.RowChange rowChange) { log.info(">>>>执行删除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue()); } } } private void updateHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowChange 行改变 */ private void insertHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue()); } } }
修改点:
// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS); Message message = canalConnector.get(3);
.get(3) 表示 一次性获取3条记录.
canalConnector.get(3, 5L, TimeUnit.SECONDS); 表示5秒之内获取3条记录,
有两个触发条件,一个是获取了3条,一个是到了5秒。
效果展示信息与之前是一致的,就不重新演示了。
ack 配置信息
/** * 一次性获取多条数据。 * sql 执行多条。 */ @Test public void dataMoreTest() throws Exception { //1. 创建 canal连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalConnector.connect(); // 订阅哪个对象 canalConnector.subscribe("springboot.user"); for (; ; ) { Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS); if (message.getId() == -1) { // 未获取到数据 TimeUnit.MILLISECONDS.sleep(500); continue; } log.info(">>>>获取对应的 id: {}",message.getId()); List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { CanalEntry.EntryType entryType = entry.getEntryType(); if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) { continue; } String tableName = entry.getHeader().getTableName(); log.info(">>>>对表{} 执行操作", tableName); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //对类型进行处理 CanalEntry.EventType eventType = rowChange.getEventType(); switch (eventType) { case INSERT: { insertHandler(rowChange); break; } case UPDATE: { updateHandler(rowChange); break; } case DELETE: { deleteHandler(rowChange); break; } default: { break; } } } //进行回滚 // canalConnector.rollback(); //确认ack 配置 canalConnector.ack(message.getId()); } } private void deleteHandler(CanalEntry.RowChange rowChange) { log.info(">>>>执行删除的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue()); } } } private void updateHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行更新的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, String> beforeValueMap = beforeColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); Map<String, String> afterValueMap = afterColumnsList.stream().collect( Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue ) ); beforeValueMap.forEach((column, beforeValue) -> { String afterValue = afterValueMap.get(column); Boolean update = beforeValue.equals(afterValue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowChange 行改变 */ private void insertHandler(CanalEntry.RowChange rowChange) { log.info(">>>执行添加 的方法"); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (!StringUtils.hasText(column.getValue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue()); } } }
主要信息:
Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
//进行回滚 // canalConnector.rollback();
//确认ack 配置canalConnector.ack(message.getId());
手动确认消息消费了.
当消息 rollback() 回滚后,会再次消费这条消息.
canalConnector.rollback();
执行语句:
insert into springboot.user(id,name,age,sex,description) values(5,'canal添加用户5',25,'男','学习canal5');
如果变成 手动确认,
canalConnector.ack(message.getId());
则只消费一次.
以上是SpringBoot怎么整合Canal方法的详细内容。更多信息请关注PHP中文网其他相关文章!

新兴技术对Java的平台独立性既有威胁也有增强。1)云计算和容器化技术如Docker增强了Java的平台独立性,但需要优化以适应不同云环境。2)WebAssembly通过GraalVM编译Java代码,扩展了其平台独立性,但需与其他语言竞争性能。

不同JVM实现都能提供平台独立性,但表现略有不同。1.OracleHotSpot和OpenJDKJVM在平台独立性上表现相似,但OpenJDK可能需额外配置。2.IBMJ9JVM在特定操作系统上表现优化。3.GraalVM支持多语言,需额外配置。4.AzulZingJVM需特定平台调整。

平台独立性通过在多种操作系统上运行同一套代码,降低开发成本和缩短开发时间。具体表现为:1.减少开发时间,只需维护一套代码;2.降低维护成本,统一测试流程;3.快速迭代和团队协作,简化部署过程。

Java'splatformindependencefacilitatescodereusebyallowingbytecodetorunonanyplatformwithaJVM.1)Developerscanwritecodeonceforconsistentbehavioracrossplatforms.2)Maintenanceisreducedascodedoesn'tneedrewriting.3)Librariesandframeworkscanbesharedacrossproj

要解决Java应用程序中的平台特定问题,可以采取以下步骤:1.使用Java的System类查看系统属性以了解运行环境。2.利用File类或java.nio.file包处理文件路径。3.根据操作系统条件加载本地库。4.使用VisualVM或JProfiler优化跨平台性能。5.通过Docker容器化确保测试环境与生产环境一致。6.利用GitHubActions在多个平台上进行自动化测试。这些方法有助于有效地解决Java应用程序中的平台特定问题。

类加载器通过统一的类文件格式、动态加载、双亲委派模型和平台无关的字节码,确保Java程序在不同平台上的一致性和兼容性,实现平台独立性。

Java编译器生成的代码是平台无关的,但最终执行的代码是平台特定的。1.Java源代码编译成平台无关的字节码。2.JVM将字节码转换为特定平台的机器码,确保跨平台运行但性能可能不同。

多线程在现代编程中重要,因为它能提高程序的响应性和资源利用率,并处理复杂的并发任务。JVM通过线程映射、调度机制和同步锁机制,在不同操作系统上确保多线程的一致性和高效性。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

VSCode Windows 64位 下载
微软推出的免费、功能强大的一款IDE编辑器

Atom编辑器mac版下载
最流行的的开源编辑器

EditPlus 中文破解版
体积小,语法高亮,不支持代码提示功能

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 英文版
推荐:为Win版本,支持代码提示!