ホームページ  >  記事  >  データベース  >  mysql ベースの binlog ロールバック ツールの例の詳細な説明

mysql ベースの binlog ロールバック ツールの例の詳細な説明

PHP中文网
PHP中文网オリジナル
2017-06-21 10:27:261685ブラウズ

更新および削除の条件が誤って書き込まれているか、まったく書き込まれていない場合、データ操作エラーが発生し、誤操作された行レコードを復元する必要があります。この状況は実際に発生します。バックアップ ファイル + binlog を使用してテスト環境に復元し、その後データ修復を実行することもできますが、これには実際にはある程度の時間とリソースが必要です。

実際、binlog 形式が row の場合、各トランザクションに含まれる操作が binlog ファイルに詳細に記録され、各トランザクションによって影響を受ける行レコードが保存されます。binlog ファイルを使用して行を逆分析することはできますか。データベースに変更を記録するとどうなるでしょうか?
業界には関連するスクリプトやツールが多数ありますが、MySQL バージョンの更新、binlog レコードの内容の変更、および一貫性のない要件により、ほとんどのスクリプトは個人の現在の使用ニーズに適していないため、mysql フラッシュバックを作成し始めました。脚本。


転載する場合は、ブログ投稿の出典を明記してください: www.cnblogs.com/xinysu/ 、著作権は Blog Park Sujia Xiaoluobo に属します。応援していただければ幸いです!


MySQL 5.6/5.7 バージョンでのみテストされており、Python オペレーティング環境には pymysql モジュールがインストールされている必要があります。

1 実装内容

binlogファイルを元に、特定のトランザクション、一定期間の特定テーブル、一定期間のデータベース全体に対してロールバック操作を行い、フラッシュバック機能を実装します。ツールの処理中に、binlog 内のトランザクションによって変更された行レコードがテーブルに保存されます。dml_sql 列を使用して、各トランザクション内のすべての行レコードの変更を表示できます。また、undo_sql を使用して、ロールバックされた SQL コンテンツを表示できます。 。次に示すように、テーブルの内容に基づいてロールバック操作を実行します。
それでは、このスクリプトの利点は何でしょうか?
  • ロールバックは 2 つのコマンドに分かれています。最初のコマンドは binglog を分析してデータベースに保存します。2 番目のコマンドはロールバック操作を実行します。

  • ロールバックする場合、実行スクリプトとロールバック スクリプトを統合できます。データベースに保存されているため、更新内容とロールバック内容を確認できます。

  • 保存された分析テーブルに従って、復元するトランザクションまたは指定したテーブルを指定すると便利です。

  • 詳細なログ出力には、分析の進行状況が表示されます。実行の進行状況。

binlog 出力スクリーンショットを分析する (1G binlog ファイルを分析する)
ロールバックデータベースの出力スクリーンショット:
2 原則

前提条件: インスタンスが binlog を開始しており、形式はROWです。 ️ Python を使用して、mysqlbinlog の後のログ ファイルのテキスト分析と処理を実行します。処理プロセス全体で、次の 6 つの難しい点に対処する必要があります:

トランザクションの開始と終了の決定
    同じトランザクション 実行順序を逆に実行する必要がある
  1. SQLの解析とロールバック
  2. 同じトランザクション操作で異なるテーブル処理
  3. 改行、タブなどのエスケープ文字処理文字など
  4. タイムスタンプのデータ型パラメータ値の変換
  5. 負の数値の処理
  6. 単一トランザクションに行変更SQL操作が含まれるmax_allow
  7. データベース全体ではなく特定のテーブルのロールバックを実行rollback
  8. 2.1 トランザクション
  9. の開始と終了は、Xid が出現する位置に応じて判断されます。このサイクルは、ファイルの順次走査が終了するまで続きます。

2.2
トランザクション内部の逆順序処理

同じトランザクション内で、ロールバック中に複数のテーブルと複数の行レコードが変更された場合、SQL は逆の順序でロールバックされる必要があります。抽出したSQLを逆順に保存するには?アイデアは次のとおりです:

  • レコードの各行の変更されたSQLが分離されます

  • 独立したSQLが逆順に格納されます

順方向のトランザクションSQLステートメントが変数dml_sqlに格納されていると仮定すると、逆順のトランザクションSQLステートメントはSQLはロールバック変数undo_sqlに格納できます。行レコード変更のSQLを順番に抽出して変数record_sqlに格納し、値undo_sql =record_sql + undo_sqlを代入し、record_sql変数を空白に設定することで、リバーストランザクション内でのSQLの実行が実現できます。 。

2.3 ロールバック SQL を解析する

まず、binlog のログ内容を確認し、行変更の SQL 状況が次のとおりであることを確認します。抽出プロセス中は、次の問題に注意する必要があります。
  • 行レコードの列名マッチング、binlog ファイルに保存されている列番号を直接使用することはできません。WHERE 部分と SET 部分の間にキーワードや記号を追加する必要はありません。 SQLをINSERT

  • UPDATE SQLに置き換える必要があります。INSERT SQLをDELETEに置き換える必要があります。列のシリアル番号を置き換える場合は、この点に注意する必要があります。列名。

    各行レコードの前には、「Table_map」マークを含むレコードの行があり、この行でどのテーブルが変更されたかを示します。このプロンプトに従って、ビンログ内の列のシリアル番号をその列に置き換えることができます。名前。
  • 2.5 エスケープ文字の処理

  • binlog ファイルは、エスケープ文字列ストレージを使用して、スペース以外の空白文字を処理します。たとえば、テーブルの挿入列のレコードには改行文字が含まれていますが、実際には binlog ファイル内にあります。 、x0a は改行操作の置き換えに使用されるため、データをロールバックするプロセス中にエスケープ文字を処理する必要があります。

  • ここで注意すべき点は、エスケープ文字 039 は関数 esc_code 内で一律に処理されず、個別に処理されることです。

転送文字テーブルを以下に示します。

2.6 タイムスタンプデータ型の処理

データベースに実際に格納されるタイムスタンプの値は必須の INT 型です 関数を使用します変換する from_unixtime 。

タイムスタンプ列を 1 つだけ含むテストテーブル tbtest を作成します。値を保存した後、具体的なスクリーンショットは次のとおりです。

行レコードを処理するときは、タイムスタンプを処理する必要があります。値を入力し、from_unixtime 関数 Convert を追加します。

2.7 負の値の処理

これは、最初にコードを書いたときは考慮されていませんでした。広範なテスト中に、すべての整数データ型が負の数値を格納するときに最大範囲値を格納することが判明しました。 binlog がこれをどのように処理するかのメカニズムはあまり明確ではありません。テストは次のとおりです:

したがって、INT および VALUE が負の数値のさまざまなデータ型に遭遇した場合、undo_sql を実行する前にこの範囲の値を削除する必要があります。

2.8 単一トランザクション行レコードの合計 SQL が max_allowed_pa​​ckage 処理を超えています
バイナリログを分析すると、2 種類の SQL が保存されます。1 つは行レコードの変更 SQL、つまり dml_sql です。行レコードのロールバックSQL、つまりundo_sql。コードから、これら 2 つの SQL を格納する列はロングテキストであり、最大 4G のコンテンツを格納できることがわかります。ただし、MySQL の単一セッションのパケット サイズは制限されており、デフォルトのサイズは 4MB で、最大値は 1G です。そのため、このスクリプトを使用する前に、binlog ファイルを保存するデータベース インスタンスを手動で設定してください。オンライン データベース インスタンス:
set global max_allowed_pa​​cket = 1073741824; #後で変更することを忘れないでください

それが機能したらどうなりますか?この場合、ロールバックはセグメント単位でのみ実行できます。まず、この大規模なトランザクションを個別に実行し、その後、この部分は pymysql またはソース ファイルを使用して実行できないため、この操作は実行できます。手動でのみ実行してください。 有能な人にこのロジック コードの修正を依頼してください。 ! !

2.9 対象を絞ったロールバック

誤操作の明確な時点がないと仮定すると、間隔は 1 つだけで、この間隔には他のテーブル操作があるため、この時点で binlog ファイルを分析するときに、次のことが必要になります。追加する -- データベース オプションの場合、最初に同じデータベース内の binlog ファイルを選択します。
ここでの処理は、この間隔の dml_sql と undo_sql をデータベーステーブルに格納し、ロールバックする必要のないトランザクションと、ロールバックが必要な残りのトランザクションを削除します。ロールバック操作を再度実行してください。

3 使用説明

3.1 パラメータの説明

このスクリプトにはもう少し多くのパラメータがあり、--help を使用して具体的な手順を表示できます。

私はパラメータを分類するためにさまざまな色を使用するのが好きなので (ブリンブリングはカラフルで、とても面白くて元気に見えます)、これらのパラメータを色別に説明します。
  • 黄色の領域: これらの 6 つのパラメーターは、binlog ファイルの分析と保存に関連する値を提供し、分析結果を保存するデータベースのリンク方法、binlog ファイルの場所、名前を示します。結果が保存されるテーブル

  • 青色の領域: これらの 4 つのパラメーターは、オンライン データベースのテーブル構造と一致する DB インスタンスの接続方法を提供します。必要なのはオンライン データベースと同じテーブル構造だけです。必ずマスター/スレーブ データベースである必要があります。

  • 緑色の領域: 最も重要なオプション -a、0 は binlog ファイルの分析のみを意味し、1 はロールバック操作の実行のみを意味し、1 を実行する前に 0 を実行する必要があります。 ;

  • 紫色のエリア: 例。

3.2 アプリケーションシナリオの説明

  • 一定期間のデータベース全体のロールバック

    • 一定期間のすべての SQL 操作を一定の時点までロールバックする必要があります

    • この場合、ほとんどはバックアップファイル+binlogを使うことで解決します

    • このスクリプトでも要件を満たすことができますが、最初に-a=0を直接実行せずに、解析結果を確認してください。整合性がある場合は、スレーブ データベースを停止してから、スレーブ データベース上でビジネス アクセスを実行し、指定した時点に復元されているかどうか、およびデータが正常であるかどうかを確認します。

  • 特定の操作は、特定の期間中に特定のテーブルでロールバックされます

    • たとえば、開発はバッチ更新スクリプトを送信しましたが、すべてのテスト レベルでの検証に問題はありませんでした。オンライン実行のために送信されましたが、実行後にビジネスがテストに失敗したことが判明し、一部のフィールドが更新され、他のビジネスに影響を及ぼしました。バッチ更新されたテーブルを元の行レコードに緊急にロールバックする必要があります。

    • これは技術的な観点だけで対応することはできず、総合的に考える必要があります

      • この場合、タブAテーブルの変更操作をどのように見直すか?

      • 個人的には、tabA テーブルのデータをテスト環境にダンプし、11 時から 12 時までの binlog ファイルの undo SQL を分析してから、テーブルを 12 時にロールバックする方法がより実現可能だと思います。テスト環境の 11 時の時点で、開発とビジネスはテスト環境の 11 時のデータをオンラインの既存のデータと比較して、どの行と列がオンラインでロールバックされる必要があるかを確認します。必要ない場合は、開発側がそれを送信し、SQL スクリプトがオンラインで実行されます。実際、ここで DBA が提供する役割は 1 つだけで、新しい環境でテーブル タブ A を特定の時点にロールバックするというものですが、SQL 処理の直接オンライン ロールバックは提供しません。

  • 特定の SQL/一部の SQL のロールバック

    • この状況は、更新または削除で where 条件が欠落しているか、where 条件の実行エラーが発生することが比較的一般的です

    • この場合、対応する トランザクションのロールバックを実行するだけです。ロールバックのプロセスについては上記を参照してください。データベース全体のロールバック 一定の時間帯で

      9:10 から 9:15 までの間にデータベース上のすべての操作をロールバックする必要があるとします:

      • 准备测试环境实例存储分析后的数据 

      • 测试环境修改set global max_allowed_packet = 1073741824

      • mysqlbinlog分析binlog文件

      • python脚本分析文件,action=0

      • 线上测试环境修改set global max_allowed_packet = 1073741824

      • 回滚数据,action=1

      • 线上测试环境修改set global max_allowed_packet = 4194304

       1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 
      27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 
      42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

      3.3.2 某段时间某些表格回滚某些操作

      • 准备测试环境实例存储分析后的数据 

      • 测试环境修改set global max_allowed_packet = 1073741824

      • mysqlbinlog分析binlog文件

      • python脚本分析文件,action=0

      • 分析帅选需要的事务,rename表格

      • dump 对应的表格到测试环境

      • 回滚数据,action=1

      • 提交给开发业务对比数据

      3.3.3 回滚某个/些SQL

      • 准备测试环境实例存储分析后的数据 

      • 测试环境修改set global max_allowed_packet = 1073741824

      • mysqlbinlog分析binlog文件

      • python脚本分析文件,action=0

      • 分析帅选需要的事务,rename表格

      • dump 对应的表格到测试环境

      • 回滚数据,action=1

      • 提交给开发业务对比数据

      4 python脚本

           脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。

        1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
       31         is test.tbevent 32 \033[1;34;40m 33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37 \033[1;32;40m 38 -a    : action, 
       39         0 just analyse the binlog file ,and store sql in table; 
       40         1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m  
       42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
       47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
       48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
       52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
       53                        -a=1 54 \033[0m                        
       55 ''' 56  57 class flashback: 58     def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 从输入参数获取连接数据库的相关参数值 74  75         # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81         # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82         # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('\033[33mMySQL connection is ok\033[0m') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103     #_get_db用于获取执行命令的输入参数104     def _get_db(self):105         logging.info('begin to assign values to parameters')106         if len(sys.argv) == 1:107             print(usage)108             sys.exit(1)109         elif sys.argv[1] == '--help':110             print(usage)111             sys.exit()112         elif len(sys.argv) > 2:113             for i in sys.argv[1:]:114                 _argv = i.split('=')115                 if _argv[0] == '-h':116                     self.host = _argv[1]117                 elif _argv[0] == '-u':118                     self.user = _argv[1]119                 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121                 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123                 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125                 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128                 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130                 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132                 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134                 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137                 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140                 else:141                     print(usage)142 143     #创建表格,用于存储分析后的BINLOG内容144     def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169         '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173     #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174     def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184         for l in tbcol:185             #self.tbfield.append(l['Field'])186             if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190             else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194     # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195     def rowrecord(self,bl_line):196         try:197             if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208         except Exception:209             return 'funtion rowrecord error'210 211     def dml_tran(self,bl_line):212         try:213 214 215             if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228                 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230                 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232                 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238                 #同个事务内部的行记录修改SQL,反序存储239                 for l in self.undo_sql.splitlines():240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244                     else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250                 #处理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254                 #单独处理 转移字符: \'255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'257 258                 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + '\n')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + '\n')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276         except Exception:277             print( 'funtion dml_tran error')278 279 280     def analyse_binlog(self):281         try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287                 logging.info('\033[36manalyzing...\033[0m')288                 for bline in binlog_file:289                     if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292                     elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295                     elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298                     elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302                         #处理timestamp类型303                         if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307                         #处理负数存储方式308                         if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312                         if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314                         elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320                     elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323                     else:324                         bline = ''325 326                     if bline.rstrip('\n') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328         except Exception:329             return 'function do error'330 331     def esc_code(self,sql):332         esc={333              '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334             #'\\x27':'\'',335             '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336              }337 338         for k,v in esc.items():339             sql=sql.replace(k,v)340         return sql341 342     def binlogdesc(self):343 344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345         print(countsql)346         self.cur.execute(countsql)347         count_row=self.cur.fetchall()348 349         update_count=0350         insert_couont=0351         delete_count=0352         for row in count_row:353             if row['sqltype']==1:354                 insert_couont=row['numbers']355             elif row['sqltype']==2:356                 update_count=row['numbers']357             elif row['sqltype']==3:358                 delete_count=row['numbers']359         logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361     def undosql(self,number):362         #这里会有几个问题:363         #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364         #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365         #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367         tran_num=1368         id=0369 370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372         self.cur.execute(tran_num_sql)373         tran_rows=self.cur.fetchall()374 375         for num in tran_rows:376             tran_num=num['table_rows']377 378         logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380         while id<=tran_num:381             logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))382             undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;&#39;.format(self.tbevent,tran_num-(id+number),tran_num-id)383             self.cur.execute(undo_sql)384 385             undo_rows=self.cur.fetchall()386             f_sql=&#39;&#39;387 388             for u_row in undo_rows:389                 try:390                     self.on_cur.execute(u_row[&#39;undo_sql&#39;])391                     self.on_mysqlconn.commit()392                 except Exception:393                     print(&#39;auto_id:&#39;,u_row[&#39;auto_id&#39;])394             id+=number395 396 397     def undo_file(self,number):398         # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400         tran_num=1401         id=0402 403         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405         self.cur.execute(tran_num_sql)406         tran_rows=self.cur.fetchall()407 408         for num in tran_rows:409             tran_num=num[&#39;table_rows&#39;]410 411         logging.info(&#39;copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql&#39;)412         logging.info(&#39;\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m&#39;.format(tran_num,int(tran_num/number)+1,number))413 414         with open(&#39;/tmp/flashback_undosql/undo_file_flashback.sql&#39;, &#39;w&#39;) as w_f:415             while id<=tran_num:416                 logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))417                 undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418                 self.cur.execute(undo_sql)419 420                 undo_rows=self.cur.fetchall()421                 for u_row in undo_rows:422                     try:423                         w_f.write('begin;' + '\n')424                         w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425                         w_f.write(u_row['undo_sql'] + '\n')426                         w_f.write('commit;' + '\n')427                     except Exception:428                         print('auto_id',u_row['auto_id'])429                     #time.sleep(2)430                 id+=number431 432     def do(self):433         if self.action=='0':434             self.analyse_binlog()435             logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436             #self.binlogdesc()437         elif self.action=='1':438             self.undosql(10000)439 440     def closeconn(self):441         self.cur.close()442         self.on_cur.close()443         logging.info('release all db connections')444         logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447     p = flashback()448     p.do()449     p.closeconn()450 451 if __name__ == "__main__":452     main()

       

以上がmysql ベースの binlog ロールバック ツールの例の詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。