この記事では、Oracle のリアルタイム データ キャプチャとパフォーマンス チューニングについて説明し、試用プロセス中のいくつかの重要な詳細を共有します。
#Flink CDC は、2021 年 11 月 15 日に最新バージョン 2.1 をリリースしました。これにより、組み込みの Debezium コンポーネントが導入され、Oracle のサポートが追加されました。著者はすぐにこのバージョンを試用するためにダウンロードし、Oracle のリアルタイム データ キャプチャとパフォーマンス チューニングの実装に成功しました。ここで、試用プロセス中の重要な詳細をいくつか共有します。
トライアル環境:
Oracle: 11.2.0.4.0 (RAC デプロイ)
Flink: 1.13.1
Hadoop: 3.2.1
Flink on Yarn を使用してデプロイされます
1. データベースに接続できません
公式ドキュメントによると、Flink SQL ではCLI に次の文を入力します:
create table TEST (A string) WITH ('connector'='oracle-cdc', 'hostname'='10.230.179.125', 'port'='1521', 'username'='myname', 'password'='***', 'database-name'='MY_SERVICE_NAME', 'schema-name'='MY_SCHEMA', 'table-name'='TEST' );
その後、select * from TEST で観察してみると、Oracle が正常に接続できないことがわかります。エラーは次のとおりです:
[ERROR] Could not execute SQL statement. Reason: oracle.net.ns.NetException: Listener refused the connection with the following error: ORA-12505, TNS:listener does not currently know of SID given in connect descriptor
エラー メッセージから判断すると、 Flink CDC が誤って接続していることが原因である可能性があります。メッセージで提供された MY_SERVICE_NAME (Oracle のサービス名) が SID と間違っています。そこで、Oracle Connector に関連する Flink CDC のソース コードを読もうとしたところ、com.ververica.cdc.connectors.oracle.OracleValidator で、Oracle 接続のコードが次のとおりであることがわかりました。上記からわかるように、現在のバージョンの Flink CDC では、SID とサービス名の接続方法の区別はなく、SID の接続方法がコード内に直接記述されています (つまり、ポートとデータベース名が で区切られています)。 ":")。
Oracle 8i 以降、Oracle はデータベースのクラスター (RAC) デプロイメントをサポートするためにサービス名の概念を導入しました。サービス名は、異なる SID インスタンスへの接続を統合するためのデータベースの論理概念として使用できます。データベースの。これに基づいて、次の 2 つの方法を検討できます。
Flink CDC のテーブル作成ステートメントで、database-name を SID の 1 つを使用したサービス名に置き換えます。この方法では接続の問題は解決できますが、主流の Oracle クラスター展開の実際のシナリオには適応できません。
ソース コードを変更してください。具体的には、新しいプロジェクトの com.ververica.cdc.connectors.oracle.OracleValidator メソッドを書き換えて、サービス名の接続メソッド (つまり、「/」を使用してポートと DB 名を区切る) に変更できます。 :
"jdbc:oracle:thin:@" hostname ":" port "/" dbname, userName, userpwd);
作者は 2 番目の方法を使用してデータベースへの通常の接続を実現しますOracle サービス名機能の使用を維持しながら。
2. Oracle テーブルが見つかりません#上記の手順に従い、select * from TEST を実行して再度観察すると、次のことがわかります。
public static Connection openConnection(Properties properties) throws SQLException { DriverManager.registerDriver(new oracle.jdbc.OracleDriver()); String hostname = properties.getProperty("database.hostname"); String port = properties.getProperty("database.port"); String dbname = properties.getProperty("database.dbname"); String userName = properties.getProperty("database.user"); String userpwd = properties.getProperty("database.password"); return DriverManager.getConnection( "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd); }
エラー ログに記載されているテーブルは MY_SERVICE_NAME.MY_SCHEMA.test であることがわかりますが、データベース名とスキーマはなぜですか。テーブル名に大文字と小文字の両方を使用しますか?
このエラーは io.debezium パッケージによって報告されていることに注意してください。パッケージのソース コード (Flink CDC の pom.xml ファイルから) を分析すると、現在 debezium 1.5.4 バージョンを使用しています。 io には、.debezium.relational.Tables に次のコードがあることがわかります:
[ERROR] Could not execute SQL statement. Reason: io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test. Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
Debezium の開発者は、「大文字と小文字を区別しない」ことを「テーブル名を変換する必要がある」と統一して定義していることがわかります。小文字にします。」これは、Debezium でサポートされている PostgreSQL、Mysql などに当てはまります。ただし、Oracle データベースの場合、「大文字と小文字を区別しない」とは、内部メタ情報を保存するときにテーブル名を大文字に変換する必要があることを意味します
したがって、Debezium が「大文字と小文字を区別しない」設定を読み取った後、上記のコード ロジックに従って、 、小文字のテーブル名を読み取ろうとした場合にのみエラーが報告されます。
Debezium は最新の安定バージョン 1.7.1 および最新の開発バージョン 1.8.0 までこの問題を修正していないため、次の 2 つの方法でこの問題を回避できます。 Oracle の「大文字と小文字を区別しない」機能を使用する必要がある場合は、ソース コードを直接変更して、上記の toLowercase を toUppercase に変更できます (これは作成者が選択した方法でもあります)。ソース コードでは、Oracle の "大文字と小文字を区別しない" 機能を使用する必要はありません。次の例に示すように、create ステートメントに 'debezium.database.tablename.case.insensitive'='false' を追加できます。
private TableId toLowerCaseIfNeeded(TableId tableId) { return tableIdCaseInsensitive ? tableId.toLowercase() : tableId; }この方法の欠点は、Oracle の大文字と小文字を区別しない機能が失われることです。大文字のテーブル名を 'table-name' で明示的に指定する必要があります。 database.tablename.case.insensitive パラメータについては、Debezium は現在、Oracle 11g の場合のみデフォルトで true に設定されており、他の Oracle バージョンではデフォルトで false に設定されていることに注意してください。したがって、リーダーが Oracle 11g バージョンを使用していない場合、このパラメータを変更する必要はありませんが、大文字のテーブル名を明示的に指定する必要があります。 3. データ遅延が大きい データ遅延が大きく、データの変更をキャプチャするのに 3 ~ 5 分かかる場合があります。この問題については、Flink CDC FAQ に明確な解決策が記載されています。次の 2 つの構成項目を create ステートメントに追加します。
'debezium.log.mining.strategy'='online_catalog', 'debezium.log.mining.continuous.mine'='true'
那么为什么要这样做呢?我们依然可以通过分析源码和日志,结合 Oracle Logminer 的工作原理来加深对工具的理解。
对 Logminer 的抽取工作,主要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 方法进行。为节约篇幅,本文不列出实际的源码,仅提炼出关键过程绘于下面的流程图,有兴趣的读者可以对照该流程图,结合实际源码进行分析:
采用 redo_log_catalog 的方式,可以监控数据表的 DDL 信息,且由于 archive logs 被永久保存到磁盘上,可以在数据库宕机后依然正常获取到宕机前的所有 DDL 和 DML 操作。但由于涉及到比 online catalog 更多的信息监控,以及由此带来的频繁的日志切换和日志转储操作,其代价也是惊人的。
根据笔者实际测试情况,如果 debezium.log.mining.strategy 为默认配置 redo_log_catalog,则不仅需要多执行第 ① 步操作 (该操作耗时约为半分钟到 1 分钟之间),在第 ④ 步,根据 archived logs 的数据量,耗时也会在 1 分钟到 4 分钟之间浮动;在第 ⑤ 步,实际查询 V$LOGMNR_CONTENTS 视图也常常需要十几秒才能完成。
此外,由于 archive logs 在实际系统中增长速度较快,因此在实际使用中,常会配合进行定期删除或转储过期日志的操作。由于上述第 ④ 步的耗时较长,笔者观察到在第 ④ 步执行的过程中,在一定概率下会发生第 ② 步加入的a rchive logs 已过期而被删除转储的情况,于是在第 ⑤ 步查询的时候,会由于找不到第 ② 步加入的日志,而报下面的错误:
ORA-00308: cannot open archive log '/path/to/archive/log/...' ORA-27037: unable to obtain file status
一般来说,Flink CDC 所需要监控的表,特别是对于业务系统有重大意义的表,一般不会进行 DDL 操作,仅需要捕捉 DML 操作即可,且对于数据库宕机等极特殊情况,也可使用在数据库恢复后进行全量数据更新的方式保障数据的一致性。因而,online_catalog 的方式足以满足我们的需要。
另外,无论使用 online_catalog,还是默认的 redo_log_catalog,都会存在第 ② 步找到的日志和第 ⑤ 步实际需要的日志不同步的问题,因此,加入 'debezium.log.mining.continuous.mine'='true' 参数,将实时搜集日志的工作交给 Oracle 自动完成,即可规避这一问题。
笔者按照这两个参数配置后,数据延迟一般可以从数分钟降至 5 秒钟左右。
四、调节参数继续降低数据延迟
上述流程图的第 ③ 步和第 ⑦ 步,提到了根据配置项来确定 LogMiner 监控时序范围,以及确定休眠时间。下面对该过程进行进一步分析,并对单个表的进一步调优给出一般性的方法论。
通过观察 io.debezium.connector.oracle.logminer.LogMinerHelper 类中的 getEndScn 方法,可了解到 debezium 对监控时序范围和休眠时间的调节原理。为便于读者理解,将该方法用流程图说明如下:
从上述的流程图中可以看出,debezium 给出 log.mining.batch.size.* 和 log.mining.sleep.time.* 两组参数,就是为了让每一次 logMiner 运行的步长能够尽可能和数据库自身 SCN 增加的步长一致。由此可见:
log.mining.batch.size.* 和 log.mining.sleep.time.* 参数的设定,和数据库整体的表现有关,和单个表的数据变化情况无关;
log.mining.batch.size.default 不仅仅是监控时序范围的起始值,还是监控时序范围变化的阈值。所以如果要实现更灵活的监控时序范围调整,可考虑适当减小该参数;
由于每一次确定监控时序范围时,都会根据 topScn 和 currentScn 的大小来调整 sleepTime,所以为了实现休眠时间更灵活的调整,可考虑适当增大 log.mining.sleep.time.increment.ms;
log.mining.batch.size.max 不能过小,否则会有监控时序范围永远无法追上数据库当前 SCN 的风险。为此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下逻辑:
if (currentBatchSize == batchSizeMax) { LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize); }
如果当前的监控时序范围达到了 log.mining.batch.size.max,那么 debezium 会在日志中给出如上提示。在实际应用中,观察 Flink CDC 产生的 log 是否包含该提示,便可得知 log.mining.batch.size.max 的值是否合理。
五、Debezium Oracle Connector 的隐藏参数
実際には、上記から 2 つの隠しパラメーターを学習しました。 debezium.database.tablename.case.insensitive (2 番目のセクションを参照) と debezium.log.mining.continuous.mine (3 番目のセクションを参照) です。これら 2 つは、 Debezium の公式ドキュメントにはパラメータに関する具体的な説明はありませんが、実際には使用できます。ソース コードを分析すると、Debezium Oracle Connector のすべての隠しパラメータが明らかになり、その説明は次のとおりです。
著者は、2 つのパラメータに加えて、上で使用したものと同じです。 log.mining.history.recorder.class パラメーターに注目する価値があります。現在、このパラメータのデフォルトは空のクラスである io.debezium.connector.oracle.logminer.NeverHistoryRecorder であるため、Flink CDC の動作を分析するときは、io.debezium.connector.oracle.logminer.HistoryRecorder インターフェイスを実装するクラスをカスタマイズします。 、ソースコードを変更することなく、Flink CDC の動作のパーソナライズされた監視を実現できます。
推奨チュートリアル: 「Oracle チュートリアル 」
以上がFlink CDC シリーズからの Oracle データのリアルタイム抽出に関する最も体系的な習得 (マイニングとチューニングの実践)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。