Heim  >  Artikel  >  Datenbank  >  Detaillierte Erläuterung des Beispiels eines MySQL-basierten Binlog-Rollback-Tools

Detaillierte Erläuterung des Beispiels eines MySQL-basierten Binlog-Rollback-Tools

PHP中文网
PHP中文网Original
2017-06-21 10:27:261730Durchsuche

Die Bedingungen für Aktualisierung und Löschung werden falsch oder gar nicht geschrieben, was zu Datenoperationsfehlern führt und die Notwendigkeit verursacht, die fehlerhaften Zeilendatensätze wiederherzustellen. Diese Situation tritt tatsächlich auf. Sie können die Sicherungsdatei + das Binlog zur Wiederherstellung in der Testumgebung verwenden und dann die Datenreparatur durchführen. Dies erfordert jedoch tatsächlich eine gewisse Zeit und Ressourcen.

Wenn das Binlog-Format tatsächlich eine Zeile ist, zeichnet die Binlog-Datei die an jeder Transaktion beteiligten Vorgänge im Detail auf und speichert die von jeder Transaktion betroffenen Zeilendatensätze. Kann die Binlog-Datei zum Umkehren der Analyse verwendet werden? Datenbank? Was ist mit Änderungen in Zeilendatensätzen?
Es gibt viele verwandte Skripte und Tools in der Branche. Aufgrund der Aktualisierung der MySQL-Version, der Änderungen im Binlog-Datensatzinhalt und der inkonsistenten Anforderungen sind die meisten Skripte jedoch nicht für den persönlichen aktuellen Nutzungsbedarf geeignet, weshalb ich damit begonnen habe Schreiben Sie sie.


Wenn Sie es erneut drucken, bitte Beachten Sie die Quelle des Ming-Blogs: www.cnblogs.com/xinysu/, das Urheberrecht liegt bei Blog Park Sujia Xiaoluobo. Ich hoffe, Sie unterstützen!


Nur ​​in Beim Testen der MySQL 5.6/5.7-Version erfordert die Python-Betriebsumgebung die Installation des pymysql-Moduls.

1 Implementierungsinhalt

Basierend auf der Binlog-Datei, für bestimmte Transaktionen, bestimmte Tabellen und bestimmte Zeiträume Das Ganze Die Datenbank wird zurückgesetzt, um die Flashback-Funktion zu implementieren. Während der Tool-Verarbeitung werden die durch die Transaktion im Binlog geänderten Zeilendatensätze in der Tabelle gespeichert. Über die Spalte dml_sql können Sie alle Zeilendatensatzänderungen innerhalb jeder Transaktion anzeigen, und über undo_sql können Sie den zurückgesetzten SQL-Inhalt anzeigen . Führen Sie dann wie unten gezeigt den Rollback-Vorgang basierend auf dem Tabelleninhalt durch.
Was sind also die Vorteile dieses Skripts?
  • Rollback ist in zwei Befehle unterteilt: Der erste Befehl analysiert das Binglog und speichert es in der Datenbank; der zweite Befehl führt den Rollback-Vorgang aus; 🎜>Beim Rollback können das Ausführungsskript und das Rollback-Skript in der Datenbank gespeichert und der Aktualisierungsinhalt und Rollback-Inhalt angezeigt werden

  • Gemäß der gespeicherten Analysetabelle; Es ist praktisch, die wiederherzustellende Transaktion oder bestimmte Tabelle anzugeben.

  • Detaillierte Protokollausgabe, die den Analyse- und Ausführungsfortschritt anzeigt.

  • Binlog-Ausgabe-Screenshot analysieren (1G-Binlog-Datei analysieren)

Setzen Sie die Ausgabe der Datenbank zurück Screenshot:
2 Prinzip

Voraussetzung: Die Instanz hat binlog gestartet und das Format ist ROW.

Verwenden Sie Python, um nach mysqlbinlog eine Textanalyse und -verarbeitung für die Protokolldatei durchzuführen. Während des gesamten Verarbeitungsprozesses müssen die folgenden 6 schwierigen Punkte behandelt werden:
Beurteilen Sie den Beginn und das Ende einer Transaktion
  1. Die Ausführungsreihenfolge derselben Transaktion muss in umgekehrter Reihenfolge ausgeführt werden

  2. Rollback-SQL analysieren

  3. Unterschiedliche Tabellenverarbeitung für denselben Transaktionsvorgang

  4. Escape-Zeichenverarbeitung, z. B. Zeilenumbrüche, Tab Zeichen usw.

  5. Zeitstempel-Datentyp-Parameterwertkonvertierung

  6. Verarbeitung negativer Zahlen

  7. A Eine einzelne Transaktion beinhaltet die Zeilenänderung. SQL-Operationen max_allow

  8. Rollback für eine bestimmte Tabelle statt der gesamten Datenbank

2.1 Der Anfang und das Ende der Transaktion

Lesen Sie basierend auf der Position, an der Xid erscheint, vom Anfang der Binlog-Datei und extrahieren Sie die SQL-Anweisung, wenn Sie darauf stoßen Zuvor extrahiertes SQL wird in einer Transaktion zusammengefasst. Anschließend werden weiterhin SQL-Anweisungen extrahiert, bis die nächste XID auftritt, und dann wird das SQL dieser Transaktion in einer Transaktion zusammengefasst.

2.2

Transaktionsinterne Rückabwicklung

In derselben Transaktion Wenn es Änderungen an Datensätzen in mehreren Tabellen und Zeilen gibt, sollte das SQL beim Rollback in umgekehrter Reihenfolge zurückgesetzt werden. Wie speichert man also das extrahierte SQL in umgekehrter Reihenfolge? Die Idee ist wie folgt:
  • Das geänderte SQL jeder Datensatzzeile wird getrennt

  • Das unabhängige SQL wird in umgekehrter Reihenfolge gespeichert

Annahme: Die SQL-Anweisungen für Transaktionen in Vorwärtsreihenfolge werden in der Variablen dml_sql gespeichert, und die SQL-Anweisungen in umgekehrter Reihenfolge, die zurückgesetzt werden können, werden in der Variablen undo_sql gespeichert. Extrahieren Sie die SQL der Zeilendatensatzänderung der Reihe nach und speichern Sie sie in der Variablen record_sql, weisen Sie dann den Wert undo_sql =record_sql + undo_sql zu und setzen Sie dann die Variable record_sql leer. Auf diese Weise kann die Ausführung von SQL innerhalb der umgekehrten Transaktion realisiert werden .

2.3 Rollback-SQL analysieren

Überprüfen Sie zunächst den Protokollinhalt von binlog und stellen Sie fest, dass die SQL-Situation der Zeilenänderung wie folgt ist Während des Extraktionsprozesses müssen folgende Probleme beachtet werden:
  • Der Spaltennamenabgleich von Zeilendatensätzen und die in der Binlog-Datei gespeicherte Spaltenseriennummer können nicht direkt verwendet werden

  • WHERE-Teil und SET Es müssen keine Schlüsselwörter oder Symbole zwischen den Teilen UND oder Kommas hinzugefügt werden

  • DELETE SQL muss in INSERT umgekehrt werden

  • UPDATE SQL muss umgekehrt werden. Ersetzen Sie die WHERE- und SET-Teile

  • INSERT SQL muss in DELETE umgekehrt werden

2.4 Verarbeitung verschiedener Tabellen in derselben Transaktion

In derselben Transaktion sind Datenänderungen zulässig Dies muss beim Ersetzen von Spaltennummern durch Spaltennamen beachtet werden.
Vor jedem Zeilendatensatz befindet sich eine Datensatzzeile mit der Markierung „Table_map“, die angibt, welche Tabelle in dieser Datensatzzeile geändert wurde. Sie können dieser Eingabeaufforderung folgen, um die Spaltenseriennummer zu ersetzen im Binlog mit dem Spaltennamen.

2.5 Verarbeitung von Escape-Zeichen

Die Binlog-Datei verarbeitet Nicht-Leerzeichen-Leerzeichen und verwendet die Speicherung von Escape-Zeichenfolgen, die beispielsweise in der Tabelleneinfügungsspalte enthalten sind Tatsächlich wird x0a in der Binlog-Datei verwendet, um die Newline-Operation zu ersetzen. Daher muss beim Zurücksetzen der Daten das Escape-Zeichen verarbeitet werden.

Beachten Sie hier eines: Das Escape-Zeichen von 039 ist nicht im Funktion Es wird einheitlich in esc_code verarbeitet, aber separat verarbeitet.

Die Übertragungszeichentabelle ist unten dargestellt:

2.6 Verarbeitung des Zeitstempel-Datentyps

Der tatsächlich in der Datenbank gespeicherte Zeitstempelwert ist vom Typ INT und muss mit der Funktion from_unixtime konvertiert werden.
Erstellen Sie eine Testtabelle tbtest mit nur einer Zeitstempelspalte. Sehen Sie sich nach dem Speichern des Werts den Inhalt des Binlogs an. Der spezifische Screenshot lautet wie folgt:

Bei der Verarbeitung von Zeilendatensätzen muss der Wert des Zeitstempels verarbeitet und die Funktionskonvertierung from_unixtime hinzugefügt werden.

2.7 Verarbeitung negativer Werte

Dies wurde nicht berücksichtigt, als ich den Code zum ersten Mal schrieb. Bei umfangreichen Tests wurde festgestellt, dass alle ganzzahligen Datentypen beim Speichern negativer Zahlen einen maximalen Bereichswert speichern. Der Mechanismus, wie Binlog damit umgeht, ist nicht ganz klar. Der Test läuft wie folgt ab:

Wenn also verschiedene Datentypen wie INT und VALUE auf eine negative Zahl stoßen, muss dieser Bereichswert vorher entfernt werden Ausführung Führen Sie undo_sql aus.

2.8 Die Gesamt-SQL eines einzelnen Transaktionszeilendatensatzes übersteigt die max_allowed_package-Verarbeitung

Nach der Analyse des Binlogs werden zwei SQL-Typen gespeichert, einer davon ist der geänderte SQL der Zeile Datensatz, d. h. dml_sql; der andere ist Zeilendatensatz-Rollback, d. h. undo_sql. Aus dem Code ist ersichtlich, dass es sich bei der Spalte, in der diese beiden SQLs gespeichert sind, um Langtext handelt, der bis zu 4 GB Inhalt speichern kann. Die Paketgröße einer einzelnen Sitzung in MySQL ist jedoch begrenzt. Die Standardgröße beträgt 4 MB und das Maximum beträgt 1 GB. Legen Sie daher vor der Verwendung dieses Skripts die Datenbankinstanz fest, in der die Binlog-Datei gespeichert wird die Online-Datenbankinstanz:
set global max_allowed_packet = 1073741824; #Denken Sie daran, es später zu ändern
Was ist, wenn es funktioniert? Dann kann das Rollback nur in Abschnitten durchgeführt werden. Führen Sie zunächst ein Rollback zu dieser großen Transaktion durch und führen Sie dann das Rollback fort. Dieser Teil kann nicht mit pymysql oder der Quelldatei ausgeführt werden, daher ist dies möglich nur manuell erfolgen. Bitten Sie einige fähige Leute, diesen Logikcode zu ändern! ! !

2.9 Gezieltes Rollback

Unter der Annahme, dass es keinen klaren Zeitpunkt für die Fehloperation gibt, gibt es nur ein Intervall, und in diesem Intervall gibt es dann andere Tabellenoperationen Zu diesem Zeitpunkt müssen Sie beim Analysieren der Binlog-Datei die Option --database hinzufügen und zunächst die Binlog-Datei in derselben Datenbank auswählen.
Die Verarbeitung hier besteht darin, dml_sql und undo_sql dieses Intervalls in der Datenbanktabelle zu speichern und dann die Transaktionen zu löschen, die nicht zurückgesetzt werden müssen, sowie die verbleibenden Transaktionen, die zurückgesetzt werden müssen. Führen Sie den Rollback-Vorgang erneut durch.

3 Gebrauchsanweisung

3.1 Parameterbeschreibung

Dieses Skript hat etwas mehr Parameter. Sie können spezifische Anweisungen mit --help anzeigen.

Ich verwende gerne verschiedene Farben, um Parameter zu klassifizieren (Blingbling ist bunt, es sieht so interessant und energiegeladen aus), deshalb werde ich diese Parameter durch erklären Farbe .
  • Gelber Bereich: Diese 6 Parameter liefern die relevanten Werte zum Analysieren und Speichern der Binlog-Datei und geben die Verknüpfungsmethode der Datenbank an, in der die Analyseergebnisse gespeichert werden, sowie den Speicherort Die Binlog-Datei und die Methode zum Speichern der Ergebnisse Muss nur die gleiche Tabellenstruktur wie die Online-Bibliothek haben, nicht unbedingt Es muss eine Master-Slave-Bibliothek sein

  • Grüner Bereich: Die wichtigste Option -a, 0 bedeutet nur; Analyse der Binlog-Datei, 1 bedeutet, dass nur Rollback-Vorgänge ausgeführt werden, 0 muss zuerst ausgeführt werden

  • Lila Bereich: Beispiel.

  • 3.2 Beschreibung des Anwendungsszenarios

Rollback der gesamten Datenbank für einen bestimmten Zeitraum

  • Alle SQL-Vorgänge in einem bestimmten Zeitraum müssen auf einen bestimmten Zeitpunkt zurückgesetzt werden
    • In diesem Fall verwenden die meisten von ihnen Backups Dateien + Binlog-Lösung

    • Aber dieses Skript kann es auch erfüllen, aber bitte führen Sie es nicht direkt online aus. Überprüfen Sie zuerst die Analyseergebnisse, um festzustellen, ob sie konsistent sind. Wenn ja, stoppen Sie einen bestimmten Vorgang aus der Datenbank, führen Sie ihn in der Slave-Datenbank aus und entwickeln Sie schließlich den Geschäftszugriff, um zu überprüfen, ob er zum angegebenen Zeitpunkt wiederhergestellt wurde und ob die Daten normal sind.

    • Einige Tabellen haben bestimmte Vorgänge während eines bestimmten Zeitraums zurückgesetzt
  • Zum Beispiel hat die Entwicklung einen Stapel übermittelt Beim Aktualisieren des Skripts wurden alle Teststufen als in Ordnung überprüft und zur Online-Ausführung übermittelt. Nach der Ausführung wurde jedoch festgestellt, dass ein Unternehmen im Test fehlte, was dazu führte, dass einige Felder aktualisiert wurden und sich nun auf andere Unternehmen auswirkte die stapelaktualisierten Tabellen dringend auf die ursprünglichen Zeilen zurückzusetzen
    • Dies kann nicht rein technisch gehandhabt werden, sondern muss umfassend betrachtet werden

      • Wie kann in diesem Fall der Änderungsvorgang der Tab-A-Tabelle überprüft werden?

      • Ich persönlich halte diese Methode für praktikabler. Speichern Sie die Daten der tabA-Tabelle in der Testumgebung und analysieren Sie dann die Binlog-Datei undo sql von 11 bis 12 Uhr. Anschließend wird die Tabelle in der Testumgebung auf 11 Uhr zurückgesetzt. Anschließend vergleichen die Entwicklung und das Unternehmen die 11-Uhr-Daten in der Testumgebung mit den vorhandenen Daten online, um festzustellen, welche Zeilen und Spalten zurückgesetzt werden müssen Wenn dies nicht erforderlich ist, entwickeln und übermitteln Sie das SQL-Skript und führen Sie es dann online aus. Tatsächlich bietet DBA hier nur eine Rolle, nämlich das Zurücksetzen der Tabellenregisterkarte A auf einen bestimmten Zeitpunkt in einer neuen Umgebung, bietet jedoch keine

      • direkte
      • Online-SQL-Rollback-Verarbeitung .

      • Ein bestimmtes/einiges SQL zurücksetzen
  • Diese Situation kommt relativ häufig vor A Beim Löschen fehlt die Where-Bedingung oder der Ausführungsfehler der Where-Bedingung.
    • Suchen Sie in diesem Fall die entsprechende Transaktion und führen Sie den Rollback-Vorgang wie oben beschrieben durch. Ich bin so schüchtern und ängstlich

    • 3.3 Testfall

3.3. 1 Vollständiges Datenbank-Rollback für einen bestimmten Zeitraum

Angenommen, Sie müssen alle Vorgänge der Datenbank zwischen 9:10 und 9:15 Uhr zurücksetzen:
  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 线上测试环境修改set global max_allowed_packet = 1073741824

  • 回滚数据,action=1

  • 线上测试环境修改set global max_allowed_packet = 4194304

 1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2 某段时间某些表格回滚某些操作

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

3.3.3 回滚某个/些SQL

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

4 python脚本

     脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
 31         is test.tbevent 32 \033[1;34;40m 33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37 \033[1;32;40m 38 -a    : action, 
 39         0 just analyse the binlog file ,and store sql in table; 
 40         1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m  
 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 53                        -a=1 54 \033[0m                        
 55 ''' 56  57 class flashback: 58     def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 从输入参数获取连接数据库的相关参数值 74  75         # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81         # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82         # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('\033[33mMySQL connection is ok\033[0m') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103     #_get_db用于获取执行命令的输入参数104     def _get_db(self):105         logging.info('begin to assign values to parameters')106         if len(sys.argv) == 1:107             print(usage)108             sys.exit(1)109         elif sys.argv[1] == '--help':110             print(usage)111             sys.exit()112         elif len(sys.argv) > 2:113             for i in sys.argv[1:]:114                 _argv = i.split('=')115                 if _argv[0] == '-h':116                     self.host = _argv[1]117                 elif _argv[0] == '-u':118                     self.user = _argv[1]119                 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121                 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123                 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125                 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128                 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130                 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132                 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134                 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137                 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140                 else:141                     print(usage)142 143     #创建表格,用于存储分析后的BINLOG内容144     def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169         '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173     #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174     def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184         for l in tbcol:185             #self.tbfield.append(l['Field'])186             if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190             else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194     # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195     def rowrecord(self,bl_line):196         try:197             if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208         except Exception:209             return 'funtion rowrecord error'210 211     def dml_tran(self,bl_line):212         try:213 214 215             if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228                 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230                 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232                 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238                 #同个事务内部的行记录修改SQL,反序存储239                 for l in self.undo_sql.splitlines():240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244                     else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250                 #处理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254                 #单独处理 转移字符: \'255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'257 258                 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + '\n')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + '\n')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276         except Exception:277             print( 'funtion dml_tran error')278 279 280     def analyse_binlog(self):281         try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287                 logging.info('\033[36manalyzing...\033[0m')288                 for bline in binlog_file:289                     if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292                     elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295                     elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298                     elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302                         #处理timestamp类型303                         if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307                         #处理负数存储方式308                         if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312                         if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314                         elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320                     elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323                     else:324                         bline = ''325 326                     if bline.rstrip('\n') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328         except Exception:329             return 'function do error'330 331     def esc_code(self,sql):332         esc={333              '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334             #'\\x27':'\'',335             '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336              }337 338         for k,v in esc.items():339             sql=sql.replace(k,v)340         return sql341 342     def binlogdesc(self):343 344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345         print(countsql)346         self.cur.execute(countsql)347         count_row=self.cur.fetchall()348 349         update_count=0350         insert_couont=0351         delete_count=0352         for row in count_row:353             if row['sqltype']==1:354                 insert_couont=row['numbers']355             elif row['sqltype']==2:356                 update_count=row['numbers']357             elif row['sqltype']==3:358                 delete_count=row['numbers']359         logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361     def undosql(self,number):362         #这里会有几个问题:363         #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364         #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365         #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367         tran_num=1368         id=0369 370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372         self.cur.execute(tran_num_sql)373         tran_rows=self.cur.fetchall()374 375         for num in tran_rows:376             tran_num=num['table_rows']377 378         logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380         while id<=tran_num:381             logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))382             undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;&#39;.format(self.tbevent,tran_num-(id+number),tran_num-id)383             self.cur.execute(undo_sql)384 385             undo_rows=self.cur.fetchall()386             f_sql=&#39;&#39;387 388             for u_row in undo_rows:389                 try:390                     self.on_cur.execute(u_row[&#39;undo_sql&#39;])391                     self.on_mysqlconn.commit()392                 except Exception:393                     print(&#39;auto_id:&#39;,u_row[&#39;auto_id&#39;])394             id+=number395 396 397     def undo_file(self,number):398         # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400         tran_num=1401         id=0402 403         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405         self.cur.execute(tran_num_sql)406         tran_rows=self.cur.fetchall()407 408         for num in tran_rows:409             tran_num=num[&#39;table_rows&#39;]410 411         logging.info(&#39;copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql&#39;)412         logging.info(&#39;\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m&#39;.format(tran_num,int(tran_num/number)+1,number))413 414         with open(&#39;/tmp/flashback_undosql/undo_file_flashback.sql&#39;, &#39;w&#39;) as w_f:415             while id<=tran_num:416                 logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))417                 undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418                 self.cur.execute(undo_sql)419 420                 undo_rows=self.cur.fetchall()421                 for u_row in undo_rows:422                     try:423                         w_f.write('begin;' + '\n')424                         w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425                         w_f.write(u_row['undo_sql'] + '\n')426                         w_f.write('commit;' + '\n')427                     except Exception:428                         print('auto_id',u_row['auto_id'])429                     #time.sleep(2)430                 id+=number431 432     def do(self):433         if self.action=='0':434             self.analyse_binlog()435             logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436             #self.binlogdesc()437         elif self.action=='1':438             self.undosql(10000)439 440     def closeconn(self):441         self.cur.close()442         self.on_cur.close()443         logging.info('release all db connections')444         logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447     p = flashback()448     p.do()449     p.closeconn()450 451 if __name__ == "__main__":452     main()

 

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung des Beispiels eines MySQL-basierten Binlog-Rollback-Tools. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn