Rumah >pangkalan data >Oracle >Penguasaan paling sistematik pengekstrakan masa nyata data Oracle daripada siri Flink CDC (amalan perlombongan dan penalaan)

Penguasaan paling sistematik pengekstrakan masa nyata data Oracle daripada siri Flink CDC (amalan perlombongan dan penalaan)

WBOY
WBOYke hadapan
2022-01-18 17:59:093873semak imbas

Artikel ini membawakan kepada anda penangkapan data masa nyata dan penalaan prestasi Oracle, dan berkongsi beberapa butiran penting semasa proses percubaan saya harap ia akan membantu semua orang.

Penguasaan paling sistematik pengekstrakan masa nyata data Oracle daripada siri Flink CDC (amalan perlombongan dan penalaan)

Flink CDC mengeluarkan versi terbaharu 2.1 pada 15 November 2021, yang menambah sokongan untuk Oracle dengan memperkenalkan komponen Debezium terbina dalam. Penulis segera memuat turun versi ini untuk kegunaan percubaan dan berjaya melaksanakan penangkapan data masa nyata dan penalaan prestasi Oracle Sekarang saya akan berkongsi beberapa butiran penting semasa proses percubaan.

Persekitaran percubaan:

Oracle: 11.2.0.4.0 (pengerahan RAC)

Flink: 1.13.1

Hadoop: 3.2.1

Sebarkan dan gunakan melalui Flink on Yarn

1 Tidak dapat menyambung ke pangkalan data

Menurut dokumentasi rasmi, dalam. Flink SQL CLI Masukkan pernyataan berikut:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST' );

Kemudian cuba amati melalui pilih * daripada TEST dan mendapati anda tidak boleh menyambung ke Oracle seperti biasa:

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor

Berdasarkan mesej ralat, ini mungkin disebabkan oleh Flink CDC tersilap MY_SERVICE_NAME (nama perkhidmatan Oracle) yang disediakan dalam maklumat sambungan untuk SID. Jadi saya cuba membaca kod sumber Flink CDC yang melibatkan Oracle Connector, dan mendapati bahawa dalam com.ververica.cdc.connectors.oracle.OracleValidator, kod untuk sambungan Oracle adalah seperti berikut:

public static Connection openConnection(Properties properties) throws SQLException {
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
    return DriverManager.getConnection(
            "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}

Sebagaimana boleh dilihat dari atas, dalam Dalam versi semasa Flink CDC, tiada perbezaan antara kaedah sambungan SID dan Nama Perkhidmatan Sebaliknya, kaedah sambungan SID ditulis secara langsung dalam kod (iaitu, port dan dbname dipisahkan oleh ":").

Bermula dari Oracle 8i, Oracle telah memperkenalkan konsep Nama Perkhidmatan untuk menyokong penggunaan kluster (RAC) bagi pangkalan data A Nama Perkhidmatan boleh digunakan sebagai konsep logik pangkalan data untuk menyatukan sambungan kepada kejadian SID yang berbeza daripada pangkalan data. Berdasarkan ini, dua kaedah berikut boleh dipertimbangkan:

Dalam penyata jadual cipta Flink CDC, gantikan nama pangkalan data dengan Nama Perkhidmatan dengan salah satu SID. Kaedah ini boleh menyelesaikan masalah sambungan, tetapi ia tidak dapat menyesuaikan diri dengan senario sebenar penggunaan gugusan Oracle arus perdana

Ubah suai kod sumber. Khususnya, dalam projek baharu, anda boleh menulis semula kaedah com.ververica.cdc.connectors.oracle.OracleValidator dan menukarnya kepada kaedah sambungan Nama Perkhidmatan (iaitu, gunakan "/" untuk memisahkan port dan nama db), itu ialah:

"jdbc:oracle:thin:@" hostname ":" port "/" dbname, userName, userpwd);

Pengarang menggunakan kaedah kedua untuk mencapai sambungan biasa ke pangkalan data sambil mengekalkan Penggunaan ciri Nama Perkhidmatan Oracle.

2. Jadual Oracle tidak ditemui

Ikuti langkah di atas dan amati semula melalui pilih * daripada TEST data masih tidak dapat diperolehi secara normal dan ralat dilaporkan Seperti berikut:

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

Diperhatikan bahawa jadual yang disebut dalam log ralat ialah MY_SERVICE_NAME.MY_SCHEMA.test Mengapa nama pangkalan data dan Nama skema dalam huruf besar dan huruf kecil dalam nama jadual?

Perhatikan bahawa ralat ini dilaporkan oleh pakej io.debezium Dengan menganalisis kod sumber pakej (daripada fail pom.xml Flink CDC, kami sedang menggunakan versi debezium 1.5.4). boleh dilihat bahawa dalam io .debezium.relational.Tables mengandungi kod berikut:

private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}

Dapat dilihat bahawa pembangun Debezium secara seragam mentakrifkan "ketidakpekaan huruf besar" sebagai "nama jadual perlu ditukar kepada huruf kecil". Ini benar untuk PostgreSQL, Mysql, dsb. yang disokong oleh Debezium. Walau bagaimanapun, untuk pangkalan data Oracle, "ketidakpekaan huruf besar" bermaksud bahawa apabila menyimpan metamaklumat dalaman, nama jadual perlu ditukar kepada huruf besar

Oleh itu, selepas Debezium membaca konfigurasi "ketidakpekaan huruf besar", Menurut perkara di atas logik kod, ralat hanya akan dilaporkan apabila cuba membaca nama jadual huruf kecil.

Memandangkan Debezium tidak membetulkan masalah ini sehingga versi stabil terkini 1.7.1, dan versi pembangunan terkini 1.8.0, kami boleh memintas masalah ini melalui dua kaedah berikut:

Jika anda perlu menggunakan ciri "tidak sensitif huruf besar" Oracle, anda boleh mengubah suai kod sumber secara langsung dan menukar di atas kepada Huruf Kecil kepada Huruf Besar (ini juga merupakan kaedah yang dipilih oleh pengarang); kod sumber, tidak perlu menggunakan ciri Oracle " "Case insensitive", anda boleh menambah 'debezium.database.tablename.case.insensitive'='false' dalam kenyataan cipta, seperti yang ditunjukkan dalam contoh berikut:

Kelemahan kaedah ini ialah ia hilang Disebabkan oleh ciri "tidak peka huruf besar" Oracle, nama jadual huruf besar mesti dinyatakan secara jelas dalam 'nama jadual'.
create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
'schema-name'='MY_SCHEMA',
'table-name'='TEST',
'debezium.database.tablename.case.insensitive'='false' );

Perlu diambil perhatian bahawa untuk parameter database.tablename.case.insensitive, Debezium pada masa ini hanya menetapkannya kepada benar secara lalai untuk Oracle 11g, dan ia ditetapkan kepada palsu secara lalai untuk versi Oracle yang lain. Oleh itu, jika pembaca tidak menggunakan versi Oracle 11g, tidak perlu mengubah suai parameter ini, tetapi nama jadual huruf besar masih perlu dinyatakan dengan jelas.

3. Kelewatan data adalah besar Lengah data adalah besar, kadangkala ia mengambil masa 3-5 minit untuk menangkap perubahan data. Untuk masalah ini, penyelesaian yang jelas telah diberikan dalam FAQ CDC Flink: tambahkan dua item konfigurasi berikut pada kenyataan cipta:

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'

那么为什么要这样做呢?我们依然可以通过分析源码和日志,结合 Oracle Logminer 的工作原理来加深对工具的理解。

对 Logminer 的抽取工作,主要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 方法进行。为节约篇幅,本文不列出实际的源码,仅提炼出关键过程绘于下面的流程图,有兴趣的读者可以对照该流程图,结合实际源码进行分析:

Penguasaan paling sistematik pengekstrakan masa nyata data Oracle daripada siri Flink CDC (amalan perlombongan dan penalaan)

采用 redo_log_catalog 的方式,可以监控数据表的 DDL 信息,且由于 archive logs 被永久保存到磁盘上,可以在数据库宕机后依然正常获取到宕机前的所有 DDL 和 DML 操作。但由于涉及到比 online catalog 更多的信息监控,以及由此带来的频繁的日志切换和日志转储操作,其代价也是惊人的。

根据笔者实际测试情况,如果 debezium.log.mining.strategy 为默认配置 redo_log_catalog,则不仅需要多执行第 ① 步操作 (该操作耗时约为半分钟到 1 分钟之间),在第 ④ 步,根据 archived logs 的数据量,耗时也会在 1 分钟到 4 分钟之间浮动;在第 ⑤ 步,实际查询 V$LOGMNR_CONTENTS 视图也常常需要十几秒才能完成。

此外,由于 archive logs 在实际系统中增长速度较快,因此在实际使用中,常会配合进行定期删除或转储过期日志的操作。由于上述第 ④ 步的耗时较长,笔者观察到在第 ④ 步执行的过程中,在一定概率下会发生第 ② 步加入的a rchive logs 已过期而被删除转储的情况,于是在第 ⑤ 步查询的时候,会由于找不到第 ② 步加入的日志,而报下面的错误:

ORA-00308: cannot open archive log '/path/to/archive/log/...'
ORA-27037: unable to obtain file status

一般来说,Flink CDC 所需要监控的表,特别是对于业务系统有重大意义的表,一般不会进行 DDL 操作,仅需要捕捉 DML 操作即可,且对于数据库宕机等极特殊情况,也可使用在数据库恢复后进行全量数据更新的方式保障数据的一致性。因而,online_catalog 的方式足以满足我们的需要。

另外,无论使用 online_catalog,还是默认的 redo_log_catalog,都会存在第 ② 步找到的日志和第 ⑤ 步实际需要的日志不同步的问题,因此,加入 'debezium.log.mining.continuous.mine'='true' 参数,将实时搜集日志的工作交给 Oracle 自动完成,即可规避这一问题。

笔者按照这两个参数配置后,数据延迟一般可以从数分钟降至 5 秒钟左右。

四、调节参数继续降低数据延迟

上述流程图的第 ③ 步和第 ⑦ 步,提到了根据配置项来确定 LogMiner 监控时序范围,以及确定休眠时间。下面对该过程进行进一步分析,并对单个表的进一步调优给出一般性的方法论。

通过观察 io.debezium.connector.oracle.logminer.LogMinerHelper 类中的 getEndScn 方法,可了解到 debezium 对监控时序范围和休眠时间的调节原理。为便于读者理解,将该方法用流程图说明如下:

Penguasaan paling sistematik pengekstrakan masa nyata data Oracle daripada siri Flink CDC (amalan perlombongan dan penalaan)

从上述的流程图中可以看出,debezium 给出 log.mining.batch.size.* 和 log.mining.sleep.time.* 两组参数,就是为了让每一次 logMiner 运行的步长能够尽可能和数据库自身 SCN 增加的步长一致。由此可见:

log.mining.batch.size.* 和 log.mining.sleep.time.* 参数的设定,和数据库整体的表现有关,和单个表的数据变化情况无关;

log.mining.batch.size.default 不仅仅是监控时序范围的起始值,还是监控时序范围变化的阈值。所以如果要实现更灵活的监控时序范围调整,可考虑适当减小该参数;

由于每一次确定监控时序范围时,都会根据 topScn 和 currentScn 的大小来调整 sleepTime,所以为了实现休眠时间更灵活的调整,可考虑适当增大 log.mining.sleep.time.increment.ms;

log.mining.batch.size.max 不能过小,否则会有监控时序范围永远无法追上数据库当前 SCN 的风险。为此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下逻辑:

if (currentBatchSize == batchSizeMax) {
    LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
}

如果当前的监控时序范围达到了 log.mining.batch.size.max,那么 debezium 会在日志中给出如上提示。在实际应用中,观察 Flink CDC 产生的 log 是否包含该提示,便可得知 log.mining.batch.size.max 的值是否合理。

五、Debezium Oracle Connector 的隐藏参数

Malah, kami telah mempelajari dua parameter tersembunyi daripada perkara di atas: debezium.database.tablename.case.insensitive (lihat bahagian kedua) dan debezium.log.mining.continuous.mine (lihat bahagian ketiga), kedua-duanya parameter tidak mempunyai penerangan sebenar dalam dokumentasi rasmi Debezium, tetapi ia sebenarnya boleh digunakan. Dengan menganalisis kod sumber, semua parameter tersembunyi Debezium Oracle Connector kini diberikan, dan penerangannya adalah seperti berikut:

Penguasaan paling sistematik pengekstrakan masa nyata data Oracle daripada siri Flink CDC (amalan perlombongan dan penalaan)

Pengarang percaya bahawa sebagai tambahan kepada dua parameter kami telah menggunakan di atas, sama Apa yang patut diberi perhatian khusus ialah parameter log.mining.history.recorder.class. Memandangkan parameter ini pada masa ini lalai kepada io.debezium.connector.oracle.logminer.NeverHistoryRecorder, iaitu kelas kosong, apabila menganalisis gelagat Flink CDC, kami menyesuaikan kelas yang melaksanakan antara muka io.debezium.connector.oracle.logminer.HistoryRecorder. , yang boleh merealisasikan pemantauan diperibadikan kelakuan Flink CDC tanpa mengubah suai kod sumber.

Tutorial yang disyorkan: "Tutorial Oracle"

Atas ialah kandungan terperinci Penguasaan paling sistematik pengekstrakan masa nyata data Oracle daripada siri Flink CDC (amalan perlombongan dan penalaan). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:juejin.im. Jika ada pelanggaran, sila hubungi admin@php.cn Padam