
The most systematic mastery of real-time extraction of Oracle data from the Flink CDC series (mining and tuning practices)

WBOY (forwarded), 2022-01-18 17:59:09

This article covers real-time capture of Oracle data with Flink CDC and the related performance tuning. It also shares some key details from the author's trial run. I hope it is helpful.


Flink CDC released version 2.1 on November 15, 2021. This release adds Oracle support through the built-in Debezium component. The author downloaded it for a trial right away and successfully implemented real-time data capture and performance tuning for Oracle. The key details of the trial are shared below.

Trial environment:

Oracle: 11.2.0.4.0 (RAC deployment)

Flink: 1.13.1

Hadoop: 3.2.1

Deployment mode: Flink on YARN

1. Unable to connect to the database

Following the official documentation, enter the following statement in the Flink SQL CLI:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST' );

Then run select * from TEST to check the result. The connection to Oracle fails with the following error:

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor

The error message suggests that Flink CDC treats MY_SERVICE_NAME (an Oracle service name) as a SID. To confirm this, the author read the source code of the Flink CDC Oracle connector. The connection code in com.ververica.cdc.connectors.oracle.OracleValidator is as follows:

public static Connection openConnection(Properties properties) throws SQLException {
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
    return DriverManager.getConnection(
            "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}

As the code shows, the current version of Flink CDC does not distinguish between the SID and Service Name connection styles. It hard-codes the SID style instead, in which port and dbname are separated by ":".

Since Oracle 8i, Oracle has offered Service Names to support clustered (RAC) deployments. A Service Name acts as a logical name for the database and unifies connections to its different SID instances. Given this, there are two options:

In the Flink CDC create table statement, set database-name to one of the instance SIDs instead of the Service Name. This solves the connection problem, but it does not fit the mainstream real-world scenario of clustered Oracle deployments;

Modify the source code. Specifically, re-implement com.ververica.cdc.connectors.oracle.OracleValidator in your own project and switch openConnection to the Service Name connection style, in which port and dbname are separated by "/". That is:

return DriverManager.getConnection(
        "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname, userName, userpwd);

The author chose the second method. It connects to the database normally while keeping the Oracle Service Name feature.
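The difference between the two URL styles is easy to see side by side. The sketch below builds both forms of an Oracle thin-driver URL. The class OracleJdbcUrls and its useServiceName flag are hypothetical illustrations, not part of Flink CDC or the Oracle driver.

```java
/**
 * Hypothetical helper (not part of Flink CDC) that builds Oracle thin-driver JDBC URLs.
 * SID style:          jdbc:oracle:thin:@host:port:SID
 * Service Name style: jdbc:oracle:thin:@host:port/SERVICE_NAME
 */
final class OracleJdbcUrls {
    private OracleJdbcUrls() {}

    /** SID style, as hard-coded in OracleValidator.openConnection in Flink CDC 2.1. */
    static String sidUrl(String hostname, String port, String sid) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + sid;
    }

    /** Service Name style, which addresses the logical database rather than one instance. */
    static String serviceNameUrl(String hostname, String port, String serviceName) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + serviceName;
    }

    /** Chooses the style from a hypothetical flag instead of hard-coding one of them. */
    static String url(String hostname, String port, String dbname, boolean useServiceName) {
        return useServiceName
                ? serviceNameUrl(hostname, port, dbname)
                : sidUrl(hostname, port, dbname);
    }
}
```

For example, url("10.230.179.125", "1521", "MY_SERVICE_NAME", true) yields jdbc:oracle:thin:@10.230.179.125:1521/MY_SERVICE_NAME. Keeping both styles behind a switch would let a patched validator serve both single-instance SID setups and RAC Service Name setups.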

2. The Oracle table cannot be found

After the steps above, running select * from TEST again still returns no data. Instead, it reports the following error:

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

Note that the table named in the error is MY_SERVICE_NAME.MY_SCHEMA.test. Why are the database and schema names in uppercase while the table name is in lowercase?

The error is raised by the io.debezium package. According to Flink CDC's pom.xml, the Debezium version in use is 1.5.4. Its source code contains the following method in io.debezium.relational.Tables:

private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}

This shows that Debezium's developers uniformly interpret "case insensitive" as "convert the table name to lowercase". That holds for PostgreSQL, MySQL and other databases supported by Debezium. For Oracle, however, "case insensitive" means the table name is converted to uppercase when internal metadata is stored.

Therefore, once Debezium reads the case-insensitive setting, the code above converts the table name to lowercase. The subsequent read of that lowercase table name then fails with the error shown above.
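The mismatch can be reproduced with a toy model. In the sketch below, TableRef is a hypothetical class, not Debezium's TableId. Following the error message above, it folds only the table part of the identifier. It assumes the Oracle dictionary holds the unquoted name in uppercase, as MY_SERVICE_NAME.MY_SCHEMA.TEST.

```java
import java.util.Locale;
import java.util.Set;

/** Toy model (hypothetical, not Debezium code) of the case-folding lookup problem. */
final class TableRef {
    final String catalog;
    final String schema;
    final String table;

    TableRef(String catalog, String schema, String table) {
        this.catalog = catalog;
        this.schema = schema;
        this.table = table;
    }

    /** Same idea as Tables.toLowerCaseIfNeeded: only the table part is folded. */
    TableRef toLowerCaseIfNeeded(boolean caseInsensitive) {
        return caseInsensitive ? new TableRef(catalog, schema, table.toLowerCase(Locale.ROOT)) : this;
    }

    /** The author's patch idea: fold to uppercase, which is how Oracle stores the name. */
    TableRef toUpperCaseIfNeeded(boolean caseInsensitive) {
        return caseInsensitive ? new TableRef(catalog, schema, table.toUpperCase(Locale.ROOT)) : this;
    }

    String identifier() {
        return catalog + "." + schema + "." + table;
    }

    /** Looks the identifier up in a stand-in for the Oracle data dictionary. */
    boolean existsIn(Set<String> oracleDictionary) {
        return oracleDictionary.contains(identifier());
    }
}
```

With case insensitivity on, lowercasing TEST produces MY_SERVICE_NAME.MY_SCHEMA.test, which is missing from the dictionary. Uppercasing finds the table even when the user wrote test.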

Debezium has not fixed this problem as of the latest stable version 1.7.1 or the latest development version 1.8.0. Until it does, the problem can be worked around in one of two ways:

If you need Oracle's case-insensitive feature, modify the source code directly and change toLowercase above to toUppercase (this is the approach the author chose);

If you do not want to modify the source code and do not need Oracle's case-insensitive feature, add 'debezium.database.tablename.case.insensitive'='false' to the create statement, as in the following example:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST',
    'debezium.database.tablename.case.insensitive'='false' );

The drawback of this method is that it gives up Oracle's case-insensitive feature. The table name must be specified explicitly in uppercase in 'table-name'.

Note that Debezium currently defaults database.tablename.case.insensitive to true only for Oracle 11g, and to false for all other Oracle versions. If you are not on Oracle 11g, you do not need to set this parameter, but you still need to specify the table name explicitly in uppercase.
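The SQL examples set Debezium parameters through table options prefixed with debezium.. Flink CDC strips that prefix before passing the options to Debezium, so 'debezium.database.tablename.case.insensitive' arrives as database.tablename.case.insensitive.

The standalone sketch below illustrates that mapping together with the default described above. DebeziumOptionMapping and its methods are hypothetical names. Checking for an "11." version prefix is an assumption about how "Oracle 11g" is detected.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of how 'debezium.'-prefixed options reach Debezium. */
final class DebeziumOptionMapping {
    static final String PREFIX = "debezium.";
    static final String CASE_INSENSITIVE_KEY = "database.tablename.case.insensitive";

    private DebeziumOptionMapping() {}

    /** Keeps only 'debezium.'-prefixed table options and strips the prefix. */
    static Map<String, String> debeziumProperties(Map<String, String> tableOptions) {
        Map<String, String> props = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                props.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    /**
     * An explicit setting wins. Otherwise the default described in the text applies:
     * true for Oracle 11g, false for other versions (the "11." check is an assumption).
     */
    static boolean effectiveCaseInsensitive(Map<String, String> props, String oracleVersion) {
        String explicit = props.get(CASE_INSENSITIVE_KEY);
        return explicit != null ? Boolean.parseBoolean(explicit) : oracleVersion.startsWith("11.");
    }
}
```

With the example options above, the resulting properties contain database.tablename.case.insensitive=false. Case insensitivity is therefore off even on the 11.2.0.4.0 trial environment.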

3. High data latency

Data latency is high: capturing a change sometimes takes 3 to 5 minutes. The Flink CDC FAQ gives a clear fix for this: add the following two options to the create statement:

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'

So why does this help? As before, we can understand the tool better by analyzing its source code and logs together with how Oracle LogMiner works.

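LogMiner-based extraction happens mainly in the execute method of Debezium's io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource. To save space, the actual source code is not listed here. Instead, the key steps are distilled into the flowchart below, which interested readers can compare against the source: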

[Figure: flowchart of the key steps in LogMinerStreamingChangeEventSource.execute (image not available)]

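The redo_log_catalog strategy can track DDL changes to tables. Because archive logs are persisted to disk, it can also recover all DDL and DML operations performed before a database crash once the database is back up. However, it monitors more information than online_catalog, and this causes frequent log switches and log dumps, so its cost is considerable.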

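In the author's tests, debezium.log.mining.strategy was left at its default value, redo_log_catalog. This setting adds step ①, which takes roughly 30 seconds to 1 minute. Step ④ takes between 1 and 4 minutes depending on the volume of archived logs. In step ⑤, the query against the V$LOGMNR_CONTENTS view often takes more than ten seconds.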
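Adding up these per-step timings gives a rough idea of why latency under redo_log_catalog reaches minutes. This is only a back-of-envelope estimate. It counts steps ①, ④ and ⑤ only, with $T_1$, $T_4$, $T_5$ as their durations in seconds. It also assumes 10 to 20 s for step ⑤ ("more than ten seconds").

```latex
T_{\text{cycle}} \approx T_1 + T_4 + T_5, \qquad
100\,\text{s} = 30 + 60 + 10 \;\le\; T_{\text{cycle}} \;\le\; 60 + 240 + 20 = 320\,\text{s} \approx 5.3\,\text{min}
```

This gives roughly 1.7 to 5.3 minutes per mining cycle, which is consistent with the 3 to 5 minute delays described at the start of this section.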

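In addition, archive logs grow quickly in production systems, so expired logs are usually deleted or dumped periodically. Because step ④ takes so long, the author observed a race while it was running. Occasionally, the archive logs added in step ② expired and were deleted or dumped. The query in step ⑤ then could not find those logs and failed with the following error: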

ORA-00308: cannot open archive log '/path/to/archive/log/...'
ORA-27037: unable to obtain file status
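The race can be stated as a simple timing condition. The toy model below (ArchiveLogRace is hypothetical and ignores all other steps) treats step ⑤ as starting only after step ④ has finished. The query succeeds only if that happens before the cleanup job removes the log registered in step ②.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model (hypothetical) of the race between LogMiner and archive-log cleanup. */
final class ArchiveLogRace {
    private ArchiveLogRace() {}

    /**
     * @param registeredAt when step ② added the archive log to the LogMiner session
     * @param deletedAt    when the periodic cleanup job deletes or moves the file
     * @param step4        how long step ④ takes in this cycle
     * @return true if the file still exists when step ⑤ queries V$LOGMNR_CONTENTS;
     *         false corresponds to the ORA-00308 failure above
     */
    static boolean readableAtStep5(Instant registeredAt, Instant deletedAt, Duration step4) {
        Instant step5At = registeredAt.plus(step4);
        return step5At.isBefore(deletedAt);
    }
}
```

Under this model, the longer step ④ runs, the wider the window in which a cleanup can intervene. A 1 to 4 minute step ④ under redo_log_catalog makes such collisions far more likely than a short one.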

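In general, the tables Flink CDC monitors, especially tables critical to business systems, rarely undergo DDL changes, so capturing DML operations is enough. For extreme cases such as a database crash, consistency can be restored with a full data refresh after the database recovers. The online_catalog strategy is therefore sufficient for our needs.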

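Moreover, with either online_catalog or the default redo_log_catalog, the logs found in step ② can fall out of sync with the logs actually needed in step ⑤. Adding 'debezium.log.mining.continuous.mine'='true' hands real-time log collection over to Oracle, which avoids this problem.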

With these two parameters configured, the author saw data latency drop from several minutes to about 5 seconds in most cases.

4. Tuning parameters to reduce latency further

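Steps ③ and ⑦ of the flowchart above use configuration options to determine two things: the SCN range LogMiner mines in each round (the mining range) and the sleep time. This process is analyzed further below, along with a general methodology for further tuning of a single table.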

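The getEndScn method of the io.debezium.connector.oracle.logminer.LogMinerHelper class shows how Debezium adjusts the mining range and the sleep time. To make it easier to follow, the method is illustrated with the flowchart below: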

[Figure: flowchart of the LogMinerHelper.getEndScn method (image not available)]
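The adjustment logic can be approximated by the simplified sketch below. It is written from the description in this section, not copied from Debezium. MiningWindow and its fields are hypothetical, the parameter names in the comments are the Debezium options discussed here, and using batch.size.min as the step size is an assumption.

```java
/** Simplified, hypothetical model of the mining-range and sleep-time adjustment. */
final class MiningWindow {
    // log.mining.batch.size.min / .default / .max (in SCNs)
    private final long batchMin, batchDefault, batchMax;
    // log.mining.sleep.time.min.ms / .max.ms / .increment.ms
    private final long sleepMin, sleepMax, sleepIncrement;

    long batchSize; // current mining range, starts at batch.size.default
    long sleepMs;   // current pause between two mining rounds

    MiningWindow(long batchMin, long batchDefault, long batchMax,
                 long sleepMin, long sleepDefault, long sleepMax, long sleepIncrement) {
        this.batchMin = batchMin;
        this.batchDefault = batchDefault;
        this.batchMax = batchMax;
        this.sleepMin = sleepMin;
        this.sleepMax = sleepMax;
        this.sleepIncrement = sleepIncrement;
        this.batchSize = batchDefault;
        this.sleepMs = sleepDefault;
    }

    /** Returns the SCN to mine up to in this round and adapts batchSize and sleepMs. */
    long endScn(long startScn, long currentScn) {
        long topScn = startScn + batchSize;
        // batch.size.default acts as the threshold for changing the range size
        boolean farAhead = topScn - currentScn > batchDefault;
        if (farAhead) {
            batchSize = Math.max(batchMin, batchSize - batchMin); // range overshoots: shrink
        } else if (currentScn - topScn > batchDefault) {
            batchSize = Math.min(batchMax, batchSize + batchMin); // range lags: grow
        }
        if (currentScn < topScn) {
            // caught up with the database: sleep longer (unless the range was just shrunk)
            if (!farAhead) {
                sleepMs = Math.min(sleepMax, sleepMs + sleepIncrement);
            }
            return currentScn;
        }
        // still behind the database: sleep less
        sleepMs = Math.max(sleepMin, sleepMs - sleepIncrement);
        return topScn;
    }

    /** Mirrors the maximum-batch-size check in OracleStreamingChangeEventSourceMetrics. */
    boolean atMaxBatchSize() {
        return batchSize == batchMax;
    }
}
```

In this model, a database whose SCN runs far ahead keeps growing the range toward batch.size.max while the sleep time falls toward its minimum. A quiet database leaves the range unchanged and lets the sleep time rise.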

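As the flowchart shows, Debezium provides two parameter groups, log.mining.batch.size.* and log.mining.sleep.time.*. Their purpose is to keep the step of each LogMiner run as close as possible to the rate at which the database's own SCN advances. It follows that: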

The settings of log.mining.batch.size.* and log.mining.sleep.time.* depend on the overall behavior of the database, not on how the data of a single table changes;

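log.mining.batch.size.default is not only the initial size of the mining range but also the threshold for changing it. For more flexible adjustment of the mining range, consider lowering this parameter somewhat;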

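Each time the mining range is determined, sleepTime is adjusted according to how topScn compares with currentScn. For more flexible adjustment of the sleep time, consider raising log.mining.sleep.time.increment.ms somewhat;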

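log.mining.batch.size.max must not be too small; otherwise the mining range risks never catching up with the database's current SCN. For this reason, Debezium has the following logic in io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics: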

if (currentBatchSize == batchSizeMax) {
    LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
}

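When the current mining range reaches log.mining.batch.size.max, Debezium writes the message above to its log. In practice, checking whether the logs produced by Flink CDC contain this message tells you whether log.mining.batch.size.max is set reasonably.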
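This check is easy to automate. The minimal sketch below scans a log for the fixed prefix of the message in the code above. MaxBatchSizeCheck is a hypothetical utility, not a Flink or Debezium tool. The log file location (for Flink on YARN, the TaskManager container logs) depends on the deployment and is left as a parameter.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical utility: find Debezium's "maximum batch size" message in a log. */
final class MaxBatchSizeCheck {
    // Fixed prefix of the LOGGER.info format string quoted above
    static final String MARKER = "LogMiner is now using the maximum batch size";

    private MaxBatchSizeCheck() {}

    /** Returns the lines that contain the marker. */
    static List<String> matchingLines(Stream<String> lines) {
        return lines.filter(line -> line.contains(MARKER)).collect(Collectors.toList());
    }

    /** Reads a log file and returns the matching lines; closes the file afterwards. */
    static List<String> scan(Path logFile) throws IOException {
        try (Stream<String> lines = Files.lines(logFile)) {
            return matchingLines(lines);
        }
    }
}
```

If the matches appear regularly, the mining range keeps hitting its ceiling, which suggests raising log.mining.batch.size.max.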

5. Hidden parameters of the Debezium Oracle Connector

We have in fact already met two hidden parameters above: debezium.database.tablename.case.insensitive (see Section 2) and debezium.log.mining.continuous.mine (see Section 3). Neither is described in Debezium's official documentation, yet both work. Based on an analysis of the source code, all hidden parameters of the Debezium Oracle Connector are listed below with their descriptions:

[Table: hidden parameters of the Debezium Oracle Connector and their descriptions (image not available)]

Besides the two parameters used above, the author believes log.mining.history.recorder.class also deserves attention. It currently defaults to io.debezium.connector.oracle.logminer.NeverHistoryRecorder, which is an empty class. When analyzing Flink CDC's behavior, we can therefore write a custom class that implements the io.debezium.connector.oracle.logminer.HistoryRecorder interface. This enables customized monitoring of Flink CDC without modifying its source code.
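The sketch below illustrates the idea. It does not reproduce Debezium's actual HistoryRecorder interface. MinedRowRecorder is a hypothetical local stand-in with a simplified method set, so a real implementation must match the interface as declared in the Debezium version bundled with Flink CDC (1.5.4 here). CountingRecorder only shows one possible kind of monitoring: counting mined rows per table and operation.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for Debezium's HistoryRecorder (simplified, not the real signature). */
interface MinedRowRecorder {
    /** operationCode mirrors OPERATION_CODE in V$LOGMNR_CONTENTS (1 = INSERT, 2 = DELETE, 3 = UPDATE). */
    void record(long scn, String tableName, int operationCode, String redoSql);

    void flush();

    void close();
}

/** Example recorder that counts mined rows per table and operation code. */
final class CountingRecorder implements MinedRowRecorder {
    private final Map<String, Integer> counts = new TreeMap<>();

    @Override
    public void record(long scn, String tableName, int operationCode, String redoSql) {
        counts.merge(tableName + ":" + operationCode, 1, Integer::sum);
    }

    @Override
    public void flush() {
        System.out.println("mined rows so far: " + counts);
    }

    @Override
    public void close() {
        counts.clear();
    }

    int count(String tableName, int operationCode) {
        return counts.getOrDefault(tableName + ":" + operationCode, 0);
    }
}
```

Once adapted to the real interface and placed on the classpath, such a class would presumably be selected like the other hidden parameters. An example is 'debezium.log.mining.history.recorder.class'='com.example.CountingRecorder', where com.example is a placeholder package.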



Statement: This article is reproduced from juejin.im.