Maison  >  Article  >  base de données  >  Explication détaillée de l'exemple d'outil de restauration de binlog basé sur MySQL

Explication détaillée de l'exemple d'outil de restauration de binlog basé sur MySQL

PHP中文网
PHP中文网original
2017-06-21 10:27:261685parcourir

Les conditions de mise à jour et de suppression sont mal écrites ou même pas écrites du tout, ce qui entraîne des erreurs d'exploitation des données et la nécessité de restaurer les enregistrements de lignes mal exploités. Cette situation se produit réellement. Vous pouvez choisir d'utiliser le fichier de sauvegarde + binlog pour restaurer dans l'environnement de test, puis effectuer une réparation des données, mais cela nécessite en réalité un certain temps et des ressources.

En fait, si le format binlog est ligne, le fichier binlog enregistrera en détail les opérations impliquées dans chaque transaction et stockera les enregistrements de ligne affectés par chaque transaction. Le fichier binlog peut-il être utilisé pour effectuer une analyse inverse. base de données ? Qu'en est-il des changements dans les enregistrements de lignes ?
Il existe de nombreux scripts et outils connexes dans l'industrie. Cependant, avec la mise à jour de la version MySQL, les modifications apportées au contenu des enregistrements du journal binaire et les exigences incohérentes, la plupart des scripts ne sont pas adaptés aux besoins personnels actuels, j'ai donc commencé à le faire. écrivez-les.


Si vous réimprimez, s'il vous plaît Remarque Source du blog Ming : www.cnblogs.com/xinysu/, les droits d'auteur appartiennent au Blog Park Sujia Xiaoluobo. J'espère que vous soutiendrez!


Uniquement en Test de la version MySQL 5.6/5.7, l'environnement d'exploitation python nécessite l'installation du module pymysql.

1 Contenu de l'implémentation

Basé sur le fichier binlog, pour certaines transactions, certaines tables et certaines périodes de temps L'ensemble la base de données est restaurée pour implémenter la fonction flashback. Pendant le traitement de l'outil, les enregistrements de ligne modifiés par la transaction dans le binlog seront stockés dans la table. Grâce à la colonne dml_sql, vous pouvez afficher toutes les modifications des enregistrements de ligne dans chaque transaction, et via undo_sql, vous pouvez afficher le contenu SQL restauré. Comme indiqué ci-dessous, effectuez ensuite l'opération de restauration en fonction du contenu de la table.
Alors quels sont les avantages de ce script ?
  • Le rollback est divisé en 2 commandes : la première commande analyse le binglog et le stocke dans la base de données ; la deuxième commande effectue l'opération de rollback ; 🎜>Lors de la restauration, le script d'exécution et le script de restauration peuvent être stockés dans la base de données, et le contenu de la mise à jour et le contenu de la restauration peuvent être visualisés

  • Selon la table d'analyse stockée ; il est pratique de spécifier la transaction ou la table spécifiée à récupérer

  • Sortie de journal détaillée, indiquant la progression de l'analyse et la progression de l'exécution.

  • Analyser la capture d'écran de la sortie du journal binaire (analyser le fichier binlog 1G)

Restaurer la sortie de la base de données Capture d'écran :
2 Principe

Prérequis : L'instance a démarré binlog et le format est ROW.

Utilisez Python pour effectuer une analyse et un traitement de texte sur le fichier journal après mysqlbinlog Pendant tout le processus de traitement, les 6 points difficiles suivants doivent être traités :
.
Juger le début et la fin d'une transaction
  1. L'ordre d'exécution de la même transaction doit être exécuté dans l'ordre inverse

  2. Analyser le rollback SQL

  3. Traitement de table différent pour la même opération de transaction

  4. Traitement des caractères d'échappement, tels que les sauts de ligne, les tabulations caractères, etc.

  5. Conversion de la valeur du paramètre de type de données d'horodatage

  6. Traitement des nombres négatifs

  7. A une seule transaction implique une modification de ligne Opérations SQL max_allow

  8. Rollback pour une certaine table au lieu de la base de données entière

2.1 Le début et la fin de la transaction

En fonction de la position où Xid apparaît, lisez depuis le début du fichier binlog et extrayez l'instruction SQL lorsque vous la rencontrez, jusqu'à ce que Xid soit rencontré. Le SQL précédemment extrait est résumé dans une transaction. Continuez ensuite à extraire les instructions SQL jusqu'à ce que le prochain Xid soit rencontré, puis résumez le SQL de cette transaction en une seule transaction. Ce cycle se poursuit jusqu'à la fin du parcours séquentiel du fichier.

2.2

Traitement interne des commandes inversées de transaction

Dans la même transaction, S'il y a des modifications dans les enregistrements dans plusieurs tables et lignes, le SQL doit être annulé dans l'ordre inverse lors de la restauration. Alors, comment stocker le SQL extrait dans l'ordre inverse ? L'idée est la suivante :
  • Le SQL modifié de chaque ligne d'enregistrements est séparé

  • Le SQL indépendant est stocké dans l'ordre inverse

Hypothèse L'instruction SQL de transaction en séquence directe est stockée dans la variable dml_sql, et l'instruction SQL en séquence inverse qui peut être annulée est stockée dans la variable undo_sql. Extrayez le SQL de la modification de l'enregistrement de ligne dans l'ordre et stockez-le dans la variable record_sql, puis attribuez la valeur undo_sql = record_sql + undo_sql, puis définissez la variable record_sql vide De cette façon, l'exécution de SQL dans la transaction inverse peut être réalisée. .

2.3 Analyser le rollback SQL

Tout d'abord, vérifiez le contenu du journal de binlog et constatez que la situation SQL de modification de ligne est la suivante. Il faut faire attention pendant le processus d'extraction. Ces problèmes :
  • La correspondance du nom de colonne des enregistrements de ligne, le numéro de série de la colonne stocké dans le fichier binlog, ne peut pas utiliser directement le

  • Partie WHERE et SET Il n'y a pas de mots-clés ou de symboles entre les parties AND ou des virgules doivent être ajoutées

  • DELETE SQL doit être inversé en INSERT

  • UPDATE SQL doit être inversé Remplacer les parties WHERE et SET

  • INSERT SQL doit être inversé en DELETE

2.4 Traitement de différentes tables dans une même transaction

Dans une même transaction, modifications de données dans différentes tables sont autorisés. Il faut y prêter attention lors du remplacement des numéros de colonne par des noms de colonne.
Il y a une ligne d'enregistrements devant chaque enregistrement de ligne, contenant la marque 'Table_map', qui indiquera quelle table est modifiée dans cette ligne d'enregistrements. Vous pouvez suivre cette invite pour remplacer le numéro de série de la colonne. dans le binlog avec le nom de la colonne.

2.5 Traitement des caractères d'échappement

Le fichier binlog gère les caractères d'espacement non-espaces et utilise le stockage de chaînes de caractères d'échappement. Par exemple, dans la colonne d'insertion du tableau, l'enregistrement contient. Caractère de nouvelle ligne, en fait, dans le fichier binlog, x0a est utilisé pour remplacer l'opération de nouvelle ligne, donc pendant le processus de restauration des données, le caractère d'échappement doit être traité.

Notez une chose ici, le caractère d'échappement de 039 n'est pas dans le function Ils sont traités uniformément dans esc_code, mais sont traités séparément.

Le tableau des caractères de transfert est présenté ci-dessous :

2.6 Traitement du type de données d'horodatage

La valeur réelle stockée de l'horodatage dans la base de données est de type INT, qui doit être convertie à l'aide de la fonction from_unixtime.
Créez une table de test tbtest avec une seule colonne d'horodatage. Après avoir stocké la valeur, affichez le contenu du binlog. La capture d'écran spécifique est la suivante :

Lors du traitement des enregistrements de ligne, la valeur de l'horodatage doit être traitée et la conversion de la fonction from_unixtime ajoutée.
2.7 Traitement des valeurs négatives

Cela n'a pas été pris en compte lorsque j'ai écrit le code pour la première fois. Au cours de tests approfondis, il a été découvert que tous les types de données entiers stockaient une valeur de plage maximale lors du stockage de nombres négatifs. Le mécanisme par lequel binlog gère cela n'est pas très clair. Le test est le suivant :

Ainsi, lorsque vous rencontrez divers types de données INT et VALUE est un nombre négatif, cette valeur de plage doit être supprimée avant exécution Exécutez undo_sql.
2.8 Le SQL total d'un enregistrement de ligne de transaction unique dépasse le traitement max_allowed_package

Après avoir analysé le binlog, deux types SQL sont stockés, l'un est le SQL modifié de la ligne record, c'est-à-dire dml_sql; l'autre Le premier est la restauration de l'enregistrement de ligne sql, c'est-à-dire undo_sql. Il ressort du code que la colonne stockant ces deux SQL est un texte long, qui peut stocker jusqu'à 4 Go de contenu. Cependant, la taille du paquet d'une seule session dans MySQL est limitée. Le paramètre limitant est max_allowed_packet. La taille par défaut est de 4 Mo et le maximum est de 1 Go. Par conséquent, avant d'utiliser ce script, veuillez définir manuellement l'instance de base de données qui stocke le fichier binlog et. l'instance de base de données en ligne. Paramètres :

set global max_allowed_packet = 1073741824; #N'oubliez pas de le modifier plus tard
Et si ça marchait ? Ensuite, la restauration ne peut être effectuée que par segments. Tout d'abord, revenez à cette transaction volumineuse, puis exécutez cette transaction volumineuse séparément, puis continuez la restauration. Cette partie ne peut pas être exécutée à l'aide de pymysql ou du fichier source, cette opération peut donc être effectuée. ne peut être effectué que manuellement. Veuillez demander à des personnes compétentes de modifier ce code logique ! ! !

2.9 Restauration ciblée

En supposant qu'il n'y a pas de moment précis pour l'opération incorrecte, il n'y a qu'un seul intervalle et il y a d'autres opérations de table dans cet intervalle, alors à ce stade, vous devez ajouter l'option --database lors de l'analyse du fichier binlog et sélectionner d'abord le fichier binlog dans la même base de données.
Le traitement ici consiste à stocker le dml_sql et undo_sql de cet intervalle dans la table de la base de données, puis à supprimer les transactions qui n'ont pas besoin d'être annulées et les transactions restantes qui doivent être annulées. Effectuez à nouveau l’opération de restauration.

3 Instructions d'utilisation

3.1 Description des paramètres

Ce script a un peu plus de paramètres. Vous pouvez afficher des instructions spécifiques avec --help.

J'aime utiliser différentes couleurs pour classer les paramètres (le blingbling est coloré, il a l'air si intéressant et énergique), je vais donc expliquer ces paramètres par couleur .
  • Zone jaune : Ces 6 paramètres fournissent les valeurs pertinentes pour l'analyse et le stockage du fichier binlog, indiquant la méthode de liaison de la base de données qui stocke les résultats de l'analyse, l'emplacement de le fichier binlog et la méthode de stockage des résultats. Nom de la table

  • Zone bleue : Ces 4 paramètres fournissent une méthode de connexion à l'instance de base de données qui est cohérente avec la structure de la table de la base de données en ligne. il doit seulement avoir la même structure de table que celle en ligne, pas nécessairement. Il doit s'agir d'une bibliothèque maître-esclave

  • Zone verte : l'option la plus importante -a, 0 signifie seulement ; en analysant le fichier binlog, 1 signifie effectuer uniquement des opérations de restauration, 0 doit être exécuté en premier

  • Zone violette : Exemple.

3.2 Description du scénario d'application

  • Rétablir l'intégralité de la base de données pendant une certaine période de temps

    • Besoin de restaurer toutes les opérations SQL dans une certaine période de temps jusqu'à un certain moment dans le temps

    • Dans ce cas, la plupart d'entre elles utilisent la sauvegarde files + solution binlog

    • Mais ce script peut également le satisfaire, mais veuillez ne pas l'utiliser directement en ligne. D'abord -a=0, et vérifiez les résultats de l'analyse pour voir s'ils sont cohérents. Si tel est le cas, arrêtez une certaine base de données, exécutez-la sur la base de données esclave et développez enfin l'accès métier pour vérifier si elle a été restaurée au moment spécifié et si les données sont normales.

  • Certaines tables ont annulé certaines opérations pendant une certaine période de temps

    • Par exemple, le développement a soumis un lot script de mise à jour, tous les niveaux de test ont été vérifiés et soumis pour exécution en ligne. Cependant, après l'exécution, il a été constaté qu'une entreprise manquait dans le test, ce qui a entraîné la mise à jour de certains champs et a affecté d'autres entreprises. restaurer de toute urgence les tables mises à jour par lots aux lignes d'origine

    • Cela ne peut pas être traité uniquement d'un point de vue technique, mais doit être considéré de manière globale

      • Dans ce cas, comment revoir l'opération de modification de l'onglet A table ?

      • Personnellement, je pense que cette méthode est plus réalisable. Videz les données de la table tabA dans l'environnement de test, puis analysez le fichier binlog annuler sql de 11 heures à 12 heures. horloge, puis test L'environnement ramène la table à 11 heures. Ensuite, le développement et l'entreprise comparent les données de 11 heures dans l'environnement de test avec les données existantes en ligne pour voir quelles lignes et colonnes doivent être restaurées. en ligne, et lesquels sont Si cela n'est pas nécessaire, développez et soumettez le script SQL, puis exécutez-le en ligne. En fait, ici, DBA ne fournit qu'un seul rôle, qui est de restaurer l'onglet de table A à un certain moment dans le nouvel environnement, mais il ne fournit pas

        direct accord de restauration SQL en ligne avec .

  • Rétablir un certain/certain SQL

    • Cette situation est relativement courante A. la suppression dans une mise à jour ne contient pas la condition Where ou l'erreur d'exécution de la condition Where

    • Dans ce cas, recherchez la transaction correspondante et effectuez une restauration. Veuillez vous référer à ce qui précède pour le processus de restauration. Je suis tellement timide et craintive

3.3 Cas de test

3.3. 1 Restauration complète de la base de données pendant une certaine période

Supposons que vous deviez restaurer toutes les opérations de la base de données entre 9h10 et 9h15 :
  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 线上测试环境修改set global max_allowed_packet = 1073741824

  • 回滚数据,action=1

  • 线上测试环境修改set global max_allowed_packet = 4194304

 1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2 某段时间某些表格回滚某些操作

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

3.3.3 回滚某个/些SQL

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

4 python脚本

     脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
 31         is test.tbevent 32 \033[1;34;40m 33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37 \033[1;32;40m 38 -a    : action, 
 39         0 just analyse the binlog file ,and store sql in table; 
 40         1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m  
 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 53                        -a=1 54 \033[0m                        
 55 ''' 56  57 class flashback: 58     def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 从输入参数获取连接数据库的相关参数值 74  75         # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81         # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82         # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('\033[33mMySQL connection is ok\033[0m') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103     #_get_db用于获取执行命令的输入参数104     def _get_db(self):105         logging.info('begin to assign values to parameters')106         if len(sys.argv) == 1:107             print(usage)108             sys.exit(1)109         elif sys.argv[1] == '--help':110             print(usage)111             sys.exit()112         elif len(sys.argv) > 2:113             for i in sys.argv[1:]:114                 _argv = i.split('=')115                 if _argv[0] == '-h':116                     self.host = _argv[1]117                 elif _argv[0] == '-u':118                     self.user = _argv[1]119                 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121                 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123                 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125                 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128                 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130                 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132                 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134                 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137                 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140                 else:141                     print(usage)142 143     #创建表格,用于存储分析后的BINLOG内容144     def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169         '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173     #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174     def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184         for l in tbcol:185             #self.tbfield.append(l['Field'])186             if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190             else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194     # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195     def rowrecord(self,bl_line):196         try:197             if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208         except Exception:209             return 'funtion rowrecord error'210 211     def dml_tran(self,bl_line):212         try:213 214 215             if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228                 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230                 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232                 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238                 #同个事务内部的行记录修改SQL,反序存储239                 for l in self.undo_sql.splitlines():240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244                     else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250                 #处理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254                 #单独处理 转移字符: \'255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'257 258                 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + '\n')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + '\n')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276         except Exception:277             print( 'funtion dml_tran error')278 279 280     def analyse_binlog(self):281         try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287                 logging.info('\033[36manalyzing...\033[0m')288                 for bline in binlog_file:289                     if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292                     elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295                     elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298                     elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302                         #处理timestamp类型303                         if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307                         #处理负数存储方式308                         if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312                         if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314                         elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320                     elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323                     else:324                         bline = ''325 326                     if bline.rstrip('\n') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328         except Exception:329             return 'function do error'330 331     def esc_code(self,sql):332         esc={333              '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334             #'\\x27':'\'',335             '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336              }337 338         for k,v in esc.items():339             sql=sql.replace(k,v)340         return sql341 342     def binlogdesc(self):343 344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345         print(countsql)346         self.cur.execute(countsql)347         count_row=self.cur.fetchall()348 349         update_count=0350         insert_couont=0351         delete_count=0352         for row in count_row:353             if row['sqltype']==1:354                 insert_couont=row['numbers']355             elif row['sqltype']==2:356                 update_count=row['numbers']357             elif row['sqltype']==3:358                 delete_count=row['numbers']359         logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361     def undosql(self,number):362         #这里会有几个问题:363         #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364         #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365         #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367         tran_num=1368         id=0369 370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372         self.cur.execute(tran_num_sql)373         tran_rows=self.cur.fetchall()374 375         for num in tran_rows:376             tran_num=num['table_rows']377 378         logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380         while id<=tran_num:381             logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))382             undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;&#39;.format(self.tbevent,tran_num-(id+number),tran_num-id)383             self.cur.execute(undo_sql)384 385             undo_rows=self.cur.fetchall()386             f_sql=&#39;&#39;387 388             for u_row in undo_rows:389                 try:390                     self.on_cur.execute(u_row[&#39;undo_sql&#39;])391                     self.on_mysqlconn.commit()392                 except Exception:393                     print(&#39;auto_id:&#39;,u_row[&#39;auto_id&#39;])394             id+=number395 396 397     def undo_file(self,number):398         # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400         tran_num=1401         id=0402 403         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405         self.cur.execute(tran_num_sql)406         tran_rows=self.cur.fetchall()407 408         for num in tran_rows:409             tran_num=num[&#39;table_rows&#39;]410 411         logging.info(&#39;copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql&#39;)412         logging.info(&#39;\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m&#39;.format(tran_num,int(tran_num/number)+1,number))413 414         with open(&#39;/tmp/flashback_undosql/undo_file_flashback.sql&#39;, &#39;w&#39;) as w_f:415             while id<=tran_num:416                 logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))417                 undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418                 self.cur.execute(undo_sql)419 420                 undo_rows=self.cur.fetchall()421                 for u_row in undo_rows:422                     try:423                         w_f.write('begin;' + '\n')424                         w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425                         w_f.write(u_row['undo_sql'] + '\n')426                         w_f.write('commit;' + '\n')427                     except Exception:428                         print('auto_id',u_row['auto_id'])429                     #time.sleep(2)430                 id+=number431 432     def do(self):433         if self.action=='0':434             self.analyse_binlog()435             logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436             #self.binlogdesc()437         elif self.action=='1':438             self.undosql(10000)439 440     def closeconn(self):441         self.cur.close()442         self.on_cur.close()443         logging.info('release all db connections')444         logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447     p = flashback()448     p.do()449     p.closeconn()450 451 if __name__ == "__main__":452     main()

 

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn