ホームページ >Java >&#&チュートリアル >SpringBoot が Canal メソッドを統合する方法

SpringBoot が Canal メソッドを統合する方法

WBOY
WBOY転載
2023-05-13 09:22:051277ブラウズ

pom.xml canal.client 依存関係を追加

(1.1.5 は大きく変更されており、ここのクライアントは 1.1.4 を使用しています)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>top.yueshushu</groupId>
    <artifactId>learn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Canal</name>
    <description>学习 Canal</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 导入配置文件处理器,配置文件进行绑定就会有提示,需要重启 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--导入自动热步署的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!--引入MySql的驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--引入springboot与mybatis整合的依赖-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.4</version>
        </dependency>
        <!-- 引入pagehelper分页插件 -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!--添加 druid-spring-boot-starter的依赖的依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.14</version>
        </dependency>
        <!--SpringBoot 的aop 模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!--添加canal的依赖. 重要.  使用  1.1.4-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.4</version>
        </dependency>
    </dependencies>
    <build>
        <!--将该目录下的文件全部打包成类的路径-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

ビジネス関数処理

シンプル接続プログラム

/**
     * 一个简单的canal 的连接测试程序
     */
    @Test
    public void connectionTest() {
        //1. 创建连接  填充对应的地址信息 ,要监控的实例和相应的用户名和密码
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal",
                "canal"
        );
        //2. 进行连接
        canalConnector.connect();
        log.info(">>>连接成功:{}", canalConnector);
    }

17:26:32.179 [main] 情報top.yueshushu.learn.CanalDemoTest - >>>接続成功: com.alibaba.otter.canal.client.impl SimpleCanalConnector@31ef45e3

データの単一取得

/**
     * 获取数据信息. 可以发现,未获取到数据 .  这个应该是实时的.
     */
    @Test
    public void getDataTest() {
        //1. 创建连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal",
                "canal"
        );
        // 进行连接
        canalConnector.connect();
        //3. 注册,看使用哪个数据库表
        canalConnector.subscribe("springboot.user");
        //4. 获取 1条数据
        Message message = canalConnector.get(1);
        log.info("获取的数据:id:{},数据:{}", message.getId(), message);
        if (message.getId() == -1) {
            log.info(">>>未获取到数据");
            return;
        }
        //5. 获取相应的数据集合
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            log.info(">>>获取数据 {}", entry);
            //获取表名
            CanalEntry.Header header = entry.getHeader();
            log.info(">>>获取表名:{}", header.getTableName());
            CanalEntry.EntryType entryType = entry.getEntryType();
            log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
            //获取数据
            ByteString storeValue = entry.getStoreValue();
            log.info(">>>输出存储的值:{}", storeValue);
        }
    }

SpringBoot が Canal メソッドを統合する方法

データをメイン ライブラリに挿入

insert into springboot.user(id,name,age,sex,description) values(1,&#39;canal添加用户&#39;,24,&#39;男&#39;,&#39;学习canal&#39;);

実行again:

SpringBoot が Canal メソッドを統合する方法

データを取得するためのループ

/**
     * 获取数据信息. 获取现在的数据.  再次执行时,就没有这个数据了.
     */
    @Test
    public void getNowDataTest() {
        //1. 创建连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal",
                "canal"
        );
        // 进行连接
        canalConnector.connect();
        //3. 注册,看使用哪个数据库表
        canalConnector.subscribe("springboot.user");
        for (;;) {
            //4. 获取 1条数据
            Message message = canalConnector.get(1);
            log.info("获取的数据:id:{},数据:{}", message.getId(), message);
            if (message.getId() == -1) {
                log.info(">>>未获取到数据");
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            //5. 获取相应的数据集合
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                log.info(">>>获取数据 {}", entry);
                //获取表名
                CanalEntry.Header header = entry.getHeader();
                log.info(">>>获取表名:{}", header.getTableName());
                CanalEntry.EntryType entryType = entry.getEntryType();
                log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
                //获取数据
                ByteString storeValue = entry.getStoreValue();
                log.info(">>>输出存储的值:{}", storeValue);
            }
        }
    }

対応するデータ変更情報をいつでも取得できます。

storeValue の値は解釈が難しいことがわかります。このデータは解析する必要があります。

storeValue 値を解析する

/**
     * 将 storeValue 进行解析,解析成我们能看懂的语句.
     * 对数据库 cud 进行处理操作观看一下.
     * 发现,点是不好的,也有多余的记录信息.
     *
     * @throws Exception 异常
     */
    @Test
    public void convertDataTest() throws Exception {
        //1. 创建连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal", "canal"
        );
        //2. 进行连接
        canalConnector.connect();
        canalConnector.subscribe("springboot.user");
        for (;;) {
            //获取信息
            Message message = canalConnector.get(1);
            if (message.getId() == -1L) {
                // log.info("未获取到数据");
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            List<CanalEntry.Entry> entryList = message.getEntries();
            //对获取到的数据进行处理
            log.info(">>获取到{}条数据", entryList.size());
            for (CanalEntry.Entry entry : entryList) {
                CanalEntry.Header header = entry.getHeader();
                log.info(">>>获取表名:{}", header.getTableName());
                //获取类型.
                CanalEntry.EntryType entryType = entry.getEntryType();
                log.info(">>类型编号 {},类型名称:{}", entryType.getNumber(), entryType.name());
                //获取存入日志的值
                ByteString storeValue = entry.getStoreValue();
                //将这个值进行解析
                CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue);
                String sql = rowChange.getSql();
                log.info(">>>获取对应的sql:{}", sql);
                // 这个sql 可能是 批量的sql语句
                List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                for (CanalEntry.RowData rowData : rowDatasList) {
                    log.info(">>>获取信息:{}", rowData);
                    //对数据进行处理
                    List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                    List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                    beforeColumnsList.forEach(
                            n -> log.info("哪个列{},原先是{},是否被更新{}", n.getName(),
                                    n.getValue(), n.getUpdated())
                    );
                    afterColumnsList.forEach(
                            n -> log.info("哪个列{},后来是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated())
                    );
                }
            }
        }
    }

SQL を再度実行する

insert into springboot.user(id,name,age,sex,description) 
values(2,&#39;canal添加用户2&#39;,25,&#39;男&#39;,&#39;学习canal2&#39;);

SpringBoot が Canal メソッドを統合する方法

異なる型は異なる方法で処理されます

他の型を検出する例: TRANSACTIONBEGIN も処理されます

/**
     * 类型转换数据
     *
     * @throws Exception 异常
     */
    @Test
    public void dataTypeTest() throws Exception {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal", "canal"
        );
        canalConnector.connect();
        canalConnector.subscribe("springboot.user");
        for(;;){
            Message message = canalConnector.get(1);
            if (message.getId() == -1) {
                TimeUnit.SECONDS.sleep(1);
                continue;
            }
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                //只要 RowData 数据类型的
                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                log.info(">>>对表 {} 进行操作", tableName);
                ByteString storeValue = entry.getStoreValue();
                RowChange rowChange = RowChange.parseFrom(storeValue);
                //行改变
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT: {
                        insertHandler(rowChange);
                        break;
                    }
                    case UPDATE: {
                        updateHandler(rowChange);
                        break;
                    }
                    case DELETE: {
                        deleteHandler(rowChange);
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
        }
    }
    private void deleteHandler(RowChange rowChange) {
        log.info(">>>>执行删除的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
            }
        }
    }
    private void updateHandler(RowChange rowChange) {
        log.info(">>>执行更新的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            beforeValueMap.forEach((column, beforeValue) -> {
                String afterValue = afterValueMap.get(column);
                Boolean update = beforeValue.equals(afterValue);
                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
                        update);
            });
        }
    }
    /**
     * 插入数据. 只有后的数据.
     *
     * @param rowChange 行改变
     */
    private void insertHandler(RowChange rowChange) {
        log.info(">>>执行添加 的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (!StringUtils.hasText(column.getValue())) {
                    continue;
                }
                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
            }

        }
    }

挿入、更新、削除がそれぞれ処理されます。

最初にテスト プログラムを開始します:

SpringBoot が Canal メソッドを統合する方法

情報を印刷しないでください。

#メイン テーブルは add ステートメントを実行します:

insert into springboot.user(id,name,age,sex,description) 
values(4,&#39;canal添加用户4&#39;,25,&#39;男&#39;,&#39;学习canal4&#39;);

情報が出力されます:

SpringBoot が Canal メソッドを統合する方法

##これは非常に読みやすいです。

メイン テーブルは変更操作を実行します。

update springboot.user set name=&#39;开开心心&#39;,age=26,description=&#39;岳泽霖&#39; where id =4;

更新時に、各フィールドが元のものと同じであれば、ログの消費は発生しません。

SpringBoot が Canal メソッドを統合する方法

メイン テーブルは削除操作を実行します。

delete from springboot.user where id =4;

SpringBoot が Canal メソッドを統合する方法

上記の取得はすべて、一度に 1 つのデータを取得します。 . .効率は比較的低くなります。

一度に複数のデータを取得する

/**
     * 一次性获取多条数据。
     * sql 执行多条。
     */
    @Test
    public void dataMoreTest() throws Exception {
        //1. 创建 canal连接对象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal",
                "canal"
        );
        canalConnector.connect();
        // 订阅哪个对象
        canalConnector.subscribe("springboot.user");
        for (; ; ) {
           // Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
            Message message = canalConnector.get(3);
            if (message.getId() == -1) {
                // 未获取到数据
                continue;
            }
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                log.info(">>>>对表{} 执行操作", tableName);
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                //对类型进行处理
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT: {
                        insertHandler(rowChange);
                        break;
                    }
                    case UPDATE: {
                        updateHandler(rowChange);
                        break;
                    }
                    case DELETE: {
                        deleteHandler(rowChange);
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
        }
    }
    private void deleteHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>>执行删除的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
            }
        }
    }
    private void updateHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>执行更新的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            beforeValueMap.forEach((column, beforeValue) -> {
                String afterValue = afterValueMap.get(column);
                Boolean update = beforeValue.equals(afterValue);
                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
                        update);
            });
        }
    }
    /**
     * 插入数据. 只有后的数据.
     *
     * @param rowChange 行改变
     */
    private void insertHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>执行添加 的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (!StringUtils.hasText(column.getValue())) {
                    continue;
                }
                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
            }
        }
    }

変更点:

// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
    Message message = canalConnector.get(3);

.get(3) は、一度に 3 つのレコードを取得することを意味します。

# #canalConnector.get(3, 5L, TimeUnit.SECONDS); 5 秒以内に 3 レコードを取得することを意味します

トリガー条件は 2 つあり、1 つは 3 レコードの取得、もう 1 つは5秒に達するまで。

エフェクト表示情報は前回と同じですので、再度の実証は行いません。

ack 構成情報

/**
     * 一次性获取多条数据。
     * sql 执行多条。 
     */
    @Test
    public void dataMoreTest() throws Exception {
        //1. 创建 canal连接对象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal",
                "canal"
        );
        canalConnector.connect();
        // 订阅哪个对象
        canalConnector.subscribe("springboot.user");
        for (; ; ) {
             Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
            if (message.getId() == -1) {
                // 未获取到数据
                TimeUnit.MILLISECONDS.sleep(500);
                continue;
            }
            log.info(">>>>获取对应的 id: {}",message.getId());
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                log.info(">>>>对表{} 执行操作", tableName);
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                //对类型进行处理
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT: {
                        insertHandler(rowChange);
                        break;
                    }
                    case UPDATE: {
                        updateHandler(rowChange);
                        break;
                    }
                    case DELETE: {
                        deleteHandler(rowChange);
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
            //进行回滚
           // canalConnector.rollback();
            //确认ack 配置
           canalConnector.ack(message.getId());
        }
    }
    private void deleteHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>>执行删除的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
            }
        }
    }
    private void updateHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>执行更新的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            beforeValueMap.forEach((column, beforeValue) -> {
                String afterValue = afterValueMap.get(column);
                Boolean update = beforeValue.equals(afterValue);
                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
                        update);
            });
        }
    }
    /**
     * 插入数据. 只有后的数据.
     *
     * @param rowChange 行改变
     */
    private void insertHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>执行添加 的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (!StringUtils.hasText(column.getValue())) {
                    continue;
                }
                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
            }
        }
    }

メイン情報:

メッセージ message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);

//Rollback//canalConnector.rollback();

//確認応答設定の確認 canalConnector.ack(message.getId());

メッセージが消費されたことを手動で確認します。

メッセージ rollback() がロールバックされると、メッセージは再び消費されます。

canalConnector.rollback();

実行ステートメント:
insert into springboot.user(id,name,age,sex,description) 
values(5,&#39;canal添加用户5&#39;,25,&#39;男&#39;,&#39;学习canal5&#39;);

SpringBoot が Canal メソッドを統合する方法手動確認に変更すると、

canalConnector.ack(message.getId());

は 1 回だけ消費されます。

以上がSpringBoot が Canal メソッドを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。