Detailed explanation of mysql-based binlog rollback tool example

PHP中文网, 2017-06-21 10:27:26

When the WHERE condition of an UPDATE or DELETE is written incorrectly, or omitted altogether, data is modified by mistake and the affected row records need to be restored. This does happen in practice. One option is to restore a backup file plus the binlog into a test environment and then repair the data, but that takes a considerable amount of time and resources.

In fact, if the binlog format is ROW, the binlog file records in detail the operations of each transaction, including the row records each one affected. Can the binlog file then be parsed in reverse to recover the changed row records?
There are many related scripts and tools in the industry. However, as MySQL versions change, the binlog content changes, and requirements differ, most of them did not fit my current needs, so I started writing my own MySQL flashback script.


If you reprint this article, please credit the blog source: www.cnblogs.com/xinysu/. Copyright belongs to Sujia Xiaoluobo on Blog Park (cnblogs). Thank you for your support!


The script has only been tested on MySQL 5.6 and 5.7. The Python environment needs the pymysql module installed.

1 Implementation content

Based on the binlog file, the tool rolls back specific transactions, specific tables, or the entire database over a given period of time, implementing a flashback function. While processing, it stores the row records modified by each transaction in the binlog in a table. The dml_sql column shows all row record changes within each transaction, and the undo_sql column shows the corresponding rollback SQL. The rollback is then performed based on the table content.
So what are the advantages of this script?
  • Rollback is split into 2 commands: the first analyzes the binlog and stores the result in the database; the second performs the rollback;

  • Both the executed SQL and the rollback SQL are stored in the database, so the updated content and the rollback content can be inspected before rolling back;

  • Using the stored analysis table, it is easy to pick specific transactions or specific tables to recover;

  • Detailed log output shows the analysis progress and the execution progress.

[Screenshot: output of analyzing a 1 GB binlog file]
[Screenshot: output of rolling back the database]

2 Principle

Prerequisite: the instance has binlog enabled and the format is ROW.
Python is used to parse the text that mysqlbinlog produces from the log file. The following 9 difficulties need to be handled during processing:
  1. Determine the start and end of a transaction

  2. The statements of one transaction must be rolled back in reverse order

  3. Parse the rollback SQL

  4. Handle different tables modified within the same transaction

  5. Escape character processing, such as line breaks and tab characters

  6. Convert values of the timestamp data type

  7. Negative number processing

  8. The row-modification SQL of a single transaction exceeds max_allowed_packet

  9. Roll back a specific table instead of the entire database

2.1 The beginning and end of the transaction

Transactions are delimited by the position of Xid events. Reading starts at the beginning of the binlog file and extracts SQL statements as they are encountered. When an Xid is reached, the SQL extracted so far is grouped into one transaction. Extraction then continues until the next Xid, where the SQL is again grouped into one transaction, and so on until the file has been traversed sequentially.
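The Xid-based grouping can be illustrated with a minimal sketch. The function name `split_transactions` and the sample lines are hypothetical and only approximate mysqlbinlog's verbose output; this is not the script's own code.

```python
def split_transactions(lines):
    """Group the '### ' pseudo-SQL lines into transactions, closing one at each Xid."""
    transactions = []
    current = []
    for line in lines:
        if line.startswith('### '):
            # Row-event pseudo-SQL: drop the '### ' prefix and the newline.
            current.append(line[4:].rstrip('\n'))
        elif 'Xid =' in line:
            # An Xid event closes the transaction collected so far.
            if current:
                transactions.append(current)
            current = []
    # Lines after the last Xid belong to an unfinished transaction and are dropped.
    return transactions


# Hypothetical sample resembling `mysqlbinlog --base64-output=decode-rows -v` output.
sample = [
    "#170619  9:10:01 server id 1  end_log_pos 300 CRC32 0x0 \tTable_map: `db`.`t` mapped to number 70\n",
    "### INSERT INTO `db`.`t`\n",
    "### SET\n",
    "###   @1=1\n",
    "#170619  9:10:01 server id 1  end_log_pos 400 CRC32 0x0 \tXid = 11\n",
    "### DELETE FROM `db`.`t`\n",
    "### WHERE\n",
    "###   @1=2\n",
    "#170619  9:10:02 server id 1  end_log_pos 500 CRC32 0x0 \tXid = 12\n",
]

for number, tx in enumerate(split_transactions(sample), start=1):
    print(number, tx)
```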

2.2 Transaction internal reverse order processing

If a transaction changes records in several tables or several rows, the rollback must apply the undo SQL in reverse order. So how can the extracted SQL be stored in reverse order? The idea is as follows:
  • Separate out the modification SQL of each row record

  • Store these independent SQL statements in reverse order

Suppose the forward-order SQL statements of the transaction are stored in the variable dml_sql, and the reverse-order rollback SQL statements are stored in the variable undo_sql. Extract the SQL of each row modification in order into the variable record_sql, assign undo_sql = record_sql + undo_sql, and then reset record_sql to empty. This yields the SQL of the transaction in reverse execution order.
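The prepend step undo_sql = record_sql + undo_sql can be shown in isolation. This is a minimal sketch; the function name `reverse_statements` and the sample statements are made up for illustration.

```python
def reverse_statements(statements):
    """Concatenate per-row statements so that the last one comes first."""
    undo_sql = ''
    for record_sql in statements:
        # Prepending each new statement reverses the overall order.
        undo_sql = record_sql + undo_sql
    return undo_sql


forward = [
    "DELETE FROM t WHERE id=1;",
    "DELETE FROM t WHERE id=2;",
    "DELETE FROM t WHERE id=3;",
]
print(reverse_statements(forward))
```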

2.3 Parse the rollback SQL

First, inspecting the binlog content shows what the row-modification SQL looks like. The following points need attention during extraction:
  • Row records must be matched to column names: the binlog file stores column ordinals, which cannot be used directly

  • There are no keywords or separators between the columns in the WHERE and SET parts, so AND or commas need to be added

  • DELETE SQL needs to be reversed into INSERT

  • UPDATE SQL needs its WHERE and SET parts swapped

  • INSERT SQL needs to be reversed into DELETE
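The three reversal rules above can be sketched on already-parsed row images. The function `undo_statement` and its argument layout are assumptions made for this illustration; the script itself works through string replacement on the binlog text. NULL handling (which would need IS NULL in a WHERE clause) is deliberately left out.

```python
def undo_statement(kind, table, where=None, set_=None):
    """Build the undo SQL for one row event.

    kind  : 'INSERT', 'UPDATE' or 'DELETE'
    where : list of (column, literal) pairs from the WHERE part (before image)
    set_  : list of (column, literal) pairs from the SET part (after image)
    Literals are expected to be rendered as SQL text already.
    """
    def join(pairs, separator):
        return separator.join('`{}`={}'.format(col, val) for col, val in pairs)

    if kind == 'INSERT':
        # Undo an INSERT by deleting the row that was written.
        return 'DELETE FROM {} WHERE {};'.format(table, join(set_, ' AND '))
    if kind == 'DELETE':
        # Undo a DELETE by re-inserting the row that was removed.
        return 'INSERT INTO {} SET {};'.format(table, join(where, ', '))
    if kind == 'UPDATE':
        # Undo an UPDATE by swapping the before and after images.
        return 'UPDATE {} SET {} WHERE {};'.format(
            table, join(where, ', '), join(set_, ' AND '))
    raise ValueError('unsupported event type: {}'.format(kind))


print(undo_statement('UPDATE', '`db`.`t`',
                     where=[('id', '1'), ('qty', '5')],
                     set_=[('id', '1'), ('qty', '7')]))
```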

2.4 Processing of different tables in the same transaction

A single transaction may modify data in different tables. This matters when replacing column ordinals with column names.
Each row record is preceded by a line containing the 'Table_map' marker, which indicates which table the row record modifies. Use this marker to look up the table and replace the column ordinals in the binlog with column names.
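A minimal sketch of this lookup follows, assuming the Table_map line has the usual mysqlbinlog shape (`Table_map: `db`.`table` mapped to number N`). The helper names and the column list are hypothetical; the script obtains the real column order by running desc against the online-schema instance.

```python
import re

TABLE_MAP_RE = re.compile(
    r'Table_map: `(?P<db>[^`]+)`\.`(?P<tb>[^`]+)` mapped to number \d+')
COLUMN_RE = re.compile(r'^###\s+@(\d+)=(.*)$')


def parse_table_map(line):
    """Return (database, table) from a Table_map line, or None if absent."""
    match = TABLE_MAP_RE.search(line)
    return (match.group('db'), match.group('tb')) if match else None


def name_column(line, columns):
    """Replace the '@N' ordinal of a row-image line with the N-th column name."""
    match = COLUMN_RE.match(line)
    if not match:
        return None
    ordinal = int(match.group(1))  # ordinals in the binlog are 1-based
    return '`{}`={}'.format(columns[ordinal - 1], match.group(2))


header = ("#170619  9:10:01 server id 1  end_log_pos 354 CRC32 0x5a3f9d0b "
          "\tTable_map: `flashback`.`tbtest` mapped to number 108")
print(parse_table_map(header))
print(name_column('###   @2=5', ['id', 'qty']))
```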

2.5 Escape character processing

The binlog file stores non-space whitespace characters as escape sequences. For example, if an inserted column value contains a newline, the binlog file writes \x0a in its place, so the escape sequences need to be converted back when rolling back the data.

Note that the escape sequence for the single quote (\x27, decimal 039) is not handled together with the others in the function esc_code; it is processed separately.
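As a rough illustration of the decoding step, the sketch below turns \xHH sequences back into characters while leaving \x27 untouched for separate handling. It decodes every two-digit hex escape, which is broader than the fixed lookup table in esc_code, and the name `decode_hex_escapes` is made up for this example.

```python
import re

_HEX_ESCAPE = re.compile(r'\\x([0-9a-fA-F]{2})')


def decode_hex_escapes(text, skip=('27',)):
    """Replace '\\xHH' sequences by the character they encode.

    Codes listed in `skip` (the single quote by default) are left as they are
    so that they can be quoted for SQL in a separate step.
    """
    def replace(match):
        code = match.group(1).lower()
        if code in skip:
            return match.group(0)
        return chr(int(code, 16))

    return _HEX_ESCAPE.sub(replace, text)


print(repr(decode_hex_escapes("first\\x0asecond\\x09tabbed")))
```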

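The escape sequences handled by esc_code are: \x07 (bell), \x08 (backspace), \x0c (form feed), \x0a (newline), \x0d (carriage return), \x09 (tab), \x0b (vertical tab), \x5c (backslash), \x22 (double quote), \x3f (question mark) and \x00 (NUL).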

2.6 Timestamp data type processing

A timestamp value is actually stored as an INT in the database, so it needs to be converted with the from_unixtime function.
As a test, create a table tbtest with a single timestamp column, store a value, and inspect the binlog content. [Screenshot: binlog content for the timestamp column]

When processing row records, every timestamp value must be wrapped in a from_unixtime conversion.
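A minimal sketch of the wrapping step, assuming the row image carries a type comment of the form `/* TIMESTAMP(n) meta=... */` after the value. The script checks for TIMESTAMP(0) only; the pattern here accepts any precision, and the function name is hypothetical. Keep in mind that from_unixtime renders the value in the session time zone.

```python
import re

TIMESTAMP_RE = re.compile(
    r'^(?P<lhs>###\s+@\d+)=(?P<val>\d+(?:\.\d+)?) /\* TIMESTAMP\(\d+\) ')


def wrap_timestamp(line):
    """Wrap the integer value of a TIMESTAMP column in from_unixtime()."""
    match = TIMESTAMP_RE.match(line)
    if not match:
        return line  # not a TIMESTAMP row image, leave untouched
    return '{}=from_unixtime({})'.format(match.group('lhs'), match.group('val'))


row = '###   @1=1497839400 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */'
print(wrap_timestamp(row))
```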

2.7 Negative value processing

I did not consider this when first writing the code. Extensive testing showed that for every integer data type, a negative value is followed in the binlog by a maximum-range value. How the binlog handles this is not entirely clear to me. [Screenshot: test of negative values]

Therefore, when a column is one of the INT types and its value is negative, this range value must be removed before undo_sql is executed.
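In mysqlbinlog's verbose output a negative integer typically looks like `@1=-1 (4294967295)`: the row event does not appear to record whether the column is signed, so the unsigned reading is printed in parentheses next to the signed one. The sketch below strips that suffix together with the trailing type comment; the function name is an assumption for illustration.

```python
import re

NEGATIVE_RE = re.compile(r'^(###\s+@\d+=-\d+) \(\d+\)')


def strip_unsigned_image(line):
    """Keep only the signed value of a negative integer row image."""
    match = NEGATIVE_RE.match(line)
    return match.group(1) if match else line


row = '###   @1=-1 (4294967295) /* INT meta=0 nullable=0 is_null=0 */'
print(strip_unsigned_image(row))
```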

2.8 Handling a single transaction whose total row SQL exceeds max_allowed_packet

After the binlog is analyzed, two kinds of SQL are stored: the row-modification SQL, dml_sql, and the row rollback SQL, undo_sql. As the code shows, the columns storing them are LONGTEXT, which can hold up to 4 GB. However, MySQL limits the packet size of a single session through the max_allowed_packet parameter, which defaults to 4 MB and has a maximum of 1 GB. So before using this script, manually set this parameter on both the database instance that stores the analyzed binlog and the online database instance:
set global max_allowed_packet = 1073741824; #Remember to change it back afterwards
What if a transaction still exceeds the limit? Then the rollback can only be done in segments: roll back up to this large transaction, execute the rollback of the large transaction separately, and then continue rolling back. This part cannot be executed through pymysql or by sourcing a file, so it can only be done manually. Anyone capable is welcome to improve this part of the logic!
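Whether a stored statement is likely to fit can be checked before sending it. The helper below is a hypothetical sketch: it compares the UTF-8 size of the SQL text with the limit, and the 1024-byte headroom for protocol overhead is an arbitrary assumption rather than a documented figure.

```python
def fits_in_packet(sql, max_allowed_packet=4 * 1024 * 1024, headroom=1024):
    """Return True if the encoded statement stays under max_allowed_packet."""
    return len(sql.encode('utf-8')) + headroom <= max_allowed_packet


small = "DELETE FROM t WHERE id=1;"
large = "x" * (4 * 1024 * 1024)
print(fits_in_packet(small))
print(fits_in_packet(large))
print(fits_in_packet(large, max_allowed_packet=1073741824))
```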

2.9 Targeted rollback

Suppose the exact time of the misoperation is unknown and only an interval is known, and other tables were also modified during that interval. In that case, add the --database option when parsing the binlog file with mysqlbinlog, so that the binlog is first filtered down to the same database.
The approach here is to store the dml_sql and undo_sql of this interval in the database table, delete the transactions that do not need to be rolled back, and then run the rollback on the remaining transactions.

3 Instructions for use

3.1 Parameter description

This script has quite a few parameters. Run it with --help to see the details.

I like to color-code parameters (colorful output looks lively and fun), so the parameters are explained by color below.
  • Yellow area: these 6 parameters are used for analyzing and storing the binlog file. They specify how to connect to the database that stores the analysis results, the location of the binlog file, and the name of the table that stores the results;

  • Blue area: these 4 parameters specify how to connect to a DB instance whose table structure matches the online database. It only needs the same table structure as the online one; it does not have to be a master or slave of it;

  • Green area: the most important option, -a. 0 means only analyze the binlog file; 1 means only perform the rollback. 0 must be run before 1;

  • Purple area: usage examples.

3.2 Application scenario description

  • Rolling back the entire database for a certain period of time

    • All SQL operations in a period need to be rolled back to a certain point in time

    • In most such cases, the backup file + binlog approach is used

    • This script can also handle it, but do not operate directly online. First run with -a=0 and check whether the analysis results are consistent. If so, stop one slave, run the rollback on that slave, and finally let development and business teams connect to check whether the data has been restored to the specified point in time and is correct.

  • Rolling back certain operations on certain tables during a certain period of time

    • For example, development submitted a batch update script. It passed every level of testing and was submitted for online execution. After execution it turned out that one business case had been missed in testing, so some fields were updated that affect other business. The batch-updated tables now urgently need to be rolled back to their original row records

    • This cannot be handled from a purely technical perspective; it must be considered comprehensively

      • In this case, how should the modifications to table tabA be reviewed?

      • Personally, I think the following is more feasible. Dump table tabA to a test environment, analyze the binlog file from 11:00 to 12:00 to get the undo SQL, and roll the table back to 11:00 in the test environment. Development and business teams then compare the 11:00 data in the test environment with the current online data to decide which rows and columns need to be rolled back online and which do not. Development then submits an SQL script to be executed online. The DBA's only role here is to roll table tabA back to a point in time in a new environment; the DBA does not roll back SQL directly online.

  • Rolling back one or more SQL statements

    • This is relatively common: an UPDATE or DELETE lacks its WHERE condition, or the WHERE condition is wrong

    • In this case, find the corresponding transaction and roll it back. For the rollback process, refer to the above. Yes, I am that cautious

3.3 Test cases

3.3.1 Full database rollback for a certain period of time

Suppose you need to roll back all operations on the database between 9:10 and 9:15:
  • Prepare a test-environment instance to store the analyzed data

  • In the test environment, run set global max_allowed_packet = 1073741824

  • Parse the binlog file with mysqlbinlog

  • Analyze the file with the Python script, action=0

  • In the online test environment, run set global max_allowed_packet = 1073741824

  • Roll back the data, action=1

  • In the online test environment, run set global max_allowed_packet = 4194304

-- Test environment (pymysql must be installed): IP: 192.168.9.242, PORT: 3310, database: flashback, table: tbevent
-- DB with the online table structure: IP: 192.168.9.243, PORT: 3310


mysql> show global variables like 'max_allowed_packet';
+--------------------+----------+
| Variable_name      | Value    |
+--------------------+----------+
| max_allowed_packet | 16777216 |
+--------------------+----------+
1 row in set (0.00 sec)

mysql> set global max_allowed_packet = 1073741824;
Query OK, 0 rows affected (0.00 sec)

[root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log

[root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=0
2017-06-19 10:59:39,041 INFO begin to assign values to parameters
2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent
2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok
2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=3310
2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok
2017-06-19 10:59:39,054 INFO MySQL connection is ok
2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event
2017-06-19 10:59:39,058 INFO created table flashback.tbevent
2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!
2017-06-19 10:59:39,061 INFO analyzing...
2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!
2017-06-19 11:49:53,782 INFO release all db connections
2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310


[root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=1
2017-06-19 16:30:20,633 INFO begin to assign values to parameters
2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent
2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok
2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=3310
2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok
2017-06-19 16:30:20,871 INFO MySQL connection is ok
2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions
2017-06-19 16:30:21,243 INFO doing batch : 1
2017-06-19 16:31:01,182 INFO doing batch : 2
2017-06-19 16:31:16,909 INFO doing batch : 3
------- (intermediate batches omitted to save space) -------
2017-06-19 16:41:11,287 INFO doing batch : 34
2017-06-19 16:41:25,577 INFO doing batch : 35
2017-06-19 16:41:44,629 INFO release all db connections
2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
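The log above reports 35 batches of 10000 transactions for 347868 stored transactions, processed from the newest auto_id downwards. A minimal sketch of how such batch bounds could be derived is shown below. The generator `batch_ranges` is hypothetical and slightly simpler than the loop in the script, but it produces the same kind of (low, high] auto_id windows.

```python
def batch_ranges(tran_num, number):
    """Yield (low, high) auto_id bounds, newest first.

    Each batch covers the rows with low < auto_id <= high.
    """
    high = tran_num
    while high > 0:
        low = max(high - number, 0)
        yield low, high
        high = low


print(list(batch_ranges(25, 10)))
print(len(list(batch_ranges(347868, 10000))))
```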

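3.3.2 Rolling back certain operations on certain tables during a certain period of time

  • Prepare a test-environment instance to store the analyzed data

  • In the test environment, run set global max_allowed_packet = 1073741824

  • Parse the binlog file with mysqlbinlog

  • Analyze the file with the Python script, action=0

  • Analyze and filter out the transactions that are needed, then rename the table

  • Dump the corresponding tables to the test environment

  • Roll back the data, action=1

  • Hand the result over to development and business teams to compare the data

3.3.3 Rolling back one or more SQL statements

  • Prepare a test-environment instance to store the analyzed data

  • In the test environment, run set global max_allowed_packet = 1073741824

  • Parse the binlog file with mysqlbinlog

  • Analyze the file with the Python script, action=0

  • Analyze and filter out the transactions that are needed, then rename the table

  • Dump the corresponding tables to the test environment

  • Roll back the data, action=1

  • Hand the result over to development and business teams to compare the data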

4 Python script

The script gets bug fixes from time to time. If you are interested, you can download it from GitHub (the mysql_xinysu_flashback project).

# -*- coding: utf-8 -*-
__author__ = 'xinysu'
__date__ = '2017/6/15 10:30'



import re
import os
import sys
import datetime
import time
import logging
import importlib
importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')

import pymysql
from pymysql.constants import CLIENT
from pymysql.cursors import DictCursor

usage='''\nusage: python [script's path] [option]
ALL options need to assign:
\033[1;33;40m
-h    : host, the database host,which database will store the results after analysis
-u    : user, the db user
-p    : password, the db user's password
-P    : port, the db port

-f    : file path, the binlog file
-t    : table name, the table name to store the results after analysis , {dbname}.{tbname},
        when you want to store in `test` db and the table name is `tbevent`,then this parameter
        is test.tbevent
\033[1;34;40m
-oh   : online host, the database host,which database have the online table schema
-ou   : online user, the db user
-op   : online password, the db user's password
-oP   : online port, the db port
\033[1;32;40m
-a    : action,
        0 just analyse the binlog file ,and store sql in table;
        1 after execute self.dotype=0, execute the undo_sql in the table
\033[0m
--help: help document
\033[1;35;40m
Example:
analyse binlog:
python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
                       -oh=192.168.9.244 -oP=3310 -ou=root -op=***
                       -a=0

flash back:
python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
                       -oh=192.168.9.244 -oP=3310 -ou=root -op=***
                       -a=1
\033[0m
'''

class flashback:
    def __init__(self):
        self.host=''
        self.user=''
        self.password=''
        self.port=3306  # pymysql expects an integer port
        self.fpath=''
        self.tbevent=''

        self.on_host=''
        self.on_user=''
        self.on_password=''
        self.on_port=3306  # pymysql expects an integer port

        self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table

        self._get_db() # read the connection parameters from the command-line arguments

        # connect to the database that stores the analysed binlog content
        logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent))
        self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
        self.cur = self.mysqlconn.cursor(cursor=DictCursor)
        logging.info('MySQL which userd to store binlog event connection is ok')

        # connect to the database whose table schema must match the one the binlog file is based on
        # this database supplies the table schemas referenced in the binlog file
        # undo_sql may hold several statements, so multi-statement execution is requested explicitly
        # (newer PyMySQL releases no longer enable it by default)
        logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port))
        self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8',client_flag=CLIENT.MULTI_STATEMENTS)
        self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor)
        logging.info('MySQL which userd to analyse online table schema connection is ok')

        logging.info('\033[33mMySQL connection is ok\033[0m')

        self.dml_sql=''
        self.undo_sql=''

        self.tbfield_where = []
        self.tbfield_set = []

        self.begin_time=''
        self.db_name=''
        self.tb_name=''
        self.end_time=''
        self.end_pos=''
        self.sqltype=0

    # _get_db reads the input parameters of the command
    def _get_db(self):
        logging.info('begin to assign values to parameters')
        if len(sys.argv) == 1:
            print(usage)
            sys.exit(1)
        elif sys.argv[1] == '--help':
            print(usage)
            sys.exit()
        elif len(sys.argv) > 2:
            for i in sys.argv[1:]:
                _argv = i.split('=')
                if _argv[0] == '-h':
                    self.host = _argv[1]
                elif _argv[0] == '-u':
                    self.user = _argv[1]
                elif _argv[0] == '-P':
                    self.port = int(_argv[1])
                elif _argv[0] == '-f':
                    self.fpath = _argv[1]
                elif _argv[0] == '-t':
                    self.tbevent = _argv[1]
                elif _argv[0] == '-p':
                    self.password = _argv[1]

                elif _argv[0] == '-oh':
                    self.on_host = _argv[1]
                elif _argv[0] == '-ou':
                    self.on_user = _argv[1]
                elif _argv[0] == '-oP':
                    self.on_port = int(_argv[1])
                elif _argv[0] == '-op':
                    self.on_password = _argv[1]

                elif _argv[0] == '-a':
                    self.action = _argv[1]

                else:
                    print(usage)

    # create the table that stores the analysed binlog content
    def create_tab(self):
        logging.info('creating table {} to store binlog event'.format(self.tbevent))
        create_tb_sql ='''
        CREATE TABLE IF NOT EXISTS {}(
            auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
            binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',
            dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',
            dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',
            end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',
            db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',
            table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',
            sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
            dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',
            undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',
            PRIMARY KEY (auto_id),
            INDEX sqltype(sqltype),
            INDEX dml_start_time (dml_start_time),
            INDEX dml_end_time (dml_end_time),
            INDEX end_log_pos (end_log_pos),
            INDEX db_name (db_name),
            INDEX table_name (table_name)
        )
        COLLATE='utf8_general_ci' ENGINE=InnoDB
        '''.format(self.tbevent)
        # run CREATE and TRUNCATE as two calls so that no multi-statement support is needed
        self.cur.execute(create_tb_sql)
        self.cur.execute('TRUNCATE TABLE {}'.format(self.tbevent))
        logging.info('created table {} '.format(self.tbevent))

    # map column ordinals to column names, and prepare the separator between columns:
    # a comma in the SET part, 'and' in the WHERE part
    def tbschema(self,dbname,tbname):
        self.tbfield_where = []
        self.tbfield_set = []

        sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)

        self.on_cur.execute(sql_tb)
        tbcol=self.on_cur.fetchall()

        i = 0
        for l in tbcol:
            #self.tbfield.append(l['Field'])
            if i==0:
                self.tbfield_where.append('`'+l['Field']+'`')
                self.tbfield_set.append('`'+l['Field']+'`')
                i+=1
            else:
                self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')
                self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )

    # one row is stored per transaction; a binlog line containing Table_map marks the start of a transaction
    def rowrecord(self,bl_line):
        try:
            if bl_line.find('Table_map:') != -1:
                l = bl_line.index('server')
                m = bl_line.index('end_log_pos')
                n = bl_line.index('Table_map')
                begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')

                self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]
                self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]
                self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]

                self.tbschema(self.db_name,self.tb_name)
        except Exception:
            return 'funtion rowrecord error'

    def dml_tran(self,bl_line):
        try:


            if bl_line.find('Xid =') != -1:

                l = bl_line.index('server')
                m = bl_line.index('end_log_pos')
                end_time = bl_line[:l:].rstrip(' ').replace('#', '20')
                self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]
                self.end_pos = int(bl_line[m::].split(' ')[1])



                self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')
                self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')

                if self.dml_sql.startswith(' INSERT INTO '):
                    self.sqltype=1
                elif self.dml_sql.startswith(' UPDATE '):
                    self.sqltype=2
                elif self.dml_sql.startswith(' DELETE '):
                    self.sqltype=3

                record_sql = ''
                undosql_desc = ''

                # store the row-modification SQL within one transaction in reverse order
                for l in self.undo_sql.splitlines():
                    if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):
                        undosql_desc = record_sql + undosql_desc
                        record_sql = ''
                        record_sql = record_sql + l
                    else:
                        record_sql = record_sql + l

                self.undo_sql = record_sql + undosql_desc
                self.undo_sql = self.undo_sql.lstrip()[1:]+';'

                # handle non-space whitespace and other special characters
                self.dml_sql = self.esc_code(self.dml_sql)
                self.undo_sql = self.esc_code(self.undo_sql)

                # handle the escape character \' separately
                self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'
                self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'

                if len(self.dml_sql)>500000000:
                    # make sure the spill directory exists before writing the oversized transaction
                    os.makedirs('/tmp/flashback_undosql/', exist_ok=True)
                    with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:
                        w_f.write('begin;' + '\n')
                        w_f.write(self.undo_sql)
                        w_f.write('commit;' + '\n')
                    self.dml_sql=''
                    self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'
                    # log the file path (the original passed the undo_file method object here)
                    logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_sql))

                insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(
                    self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,
                    self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)

                self.cur.execute(insert_sql)
                self.mysqlconn.commit()

                self.dml_sql = ''
                self.undo_sql = ''
        except Exception:
            print( 'funtion dml_tran error')


    def analyse_binlog(self):
        try:
            sqlcomma=0
            self.create_tab()

            with open(self.fpath,'r') as binlog_file:
                logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')
                logging.info('\033[36manalyzing...\033[0m')
                for bline in binlog_file:
                    if bline.find('Table_map:') != -1:
                        self.rowrecord(bline)
                        bline=''
                    elif bline.rstrip()=='### SET':
                        bline = bline[3:]
                        sqlcomma=1
                    elif bline.rstrip()=='### WHERE':
                        bline = bline[3:]
                        sqlcomma = 2
                    elif bline.startswith('###   @'):
                        len_f=len('###   @')
                        i=bline[len_f:].split('=')[0]

                        # handle the timestamp type
                        if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':
                            stop_pos = bline.find(' /* TIMESTAMP(0) meta=')
                            bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'

                        # handle how negative numbers are stored
                        if bline.split('=')[1].startswith('-'):
                            stop_pos = bline.find(' /* TIMESTAMP(0) meta=')
                            bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'

                        if sqlcomma==1:
                            bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]
                        elif sqlcomma==2:
                            bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]

                    elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):
                        bline = bline[3:]

                    elif bline.find('Xid =') != -1:
                        self.dml_tran(bline)
                        bline=''
                    else:
                        bline = ''

                    if bline.rstrip('\n') != '':
                        self.dml_sql = self.dml_sql + bline + ' '
        except Exception:
            return 'function do error'

    def esc_code(self,sql):
        esc={
             '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',
            #'\\x27':'\'',
            '\\x22':'\"','\\x3f':'\\?','\\x00':'\0'
             }

        for k,v in esc.items():
            sql=sql.replace(k,v)
        return sql

    def binlogdesc(self):

        countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)
        print(countsql)
        self.cur.execute(countsql)
        count_row=self.cur.fetchall()

        update_count=0
        insert_couont=0
        delete_count=0
        for row in count_row:
            if row['sqltype']==1:
                insert_couont=row['numbers']
            elif row['sqltype']==2:
                update_count=row['numbers']
            elif row['sqltype']==3:
                delete_count=row['numbers']
        logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))

    def undosql(self,number):
        # There are a few problems here:
        # 1 with hundreds of thousands of transactions or more, this script uses a great deal of memory and may fail;
        # 2 if a single transaction modifies hundreds of thousands of rows and its binlog file reaches several GB, memory is also a problem;
        # So for point 1, transactions are executed in batches of `number` to avoid loading them all into Python at once; point 2 is not handled yet

        tran_num=1
        id=0

        tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)

        self.cur.execute(tran_num_sql)
        tran_rows=self.cur.fetchall()

        for num in tran_rows:
            tran_num=num['table_rows']

        logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))

        while id<=tran_num:
            logging.info('doing batch : {} '.format(int(id/number)+1))
            undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)
            self.cur.execute(undo_sql)

            undo_rows=self.cur.fetchall()
            f_sql=''

            for u_row in undo_rows:
                try:
                    self.on_cur.execute(u_row['undo_sql'])
                    self.on_mysqlconn.commit()
                except Exception:
                    print('auto_id:',u_row['auto_id'])
            id+=number


    def undo_file(self,number):
        # alternatively, use undo_file to export undo_sql to a file and then source it

        tran_num=1
        id=0

        tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)

        self.cur.execute(tran_num_sql)
        tran_rows=self.cur.fetchall()

        for num in tran_rows:
            tran_num=num['table_rows']
logging.info(&#39;copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql&#39;)412         logging.info(&#39;\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m&#39;.format(tran_num,int(tran_num/number)+1,number))413 414         with open(&#39;/tmp/flashback_undosql/undo_file_flashback.sql&#39;, &#39;w&#39;) as w_f:415             while id<=tran_num:416                 logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))417                 undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418                 self.cur.execute(undo_sql)419 420                 undo_rows=self.cur.fetchall()421                 for u_row in undo_rows:422                     try:423                         w_f.write('begin;' + '\n')424                         w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425                         w_f.write(u_row['undo_sql'] + '\n')426                         w_f.write('commit;' + '\n')427                     except Exception:428                         print('auto_id',u_row['auto_id'])429                     #time.sleep(2)430                 id+=number431 432     def do(self):433         if self.action=='0':434             self.analyse_binlog()435             logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436             #self.binlogdesc()437         elif self.action=='1':438             self.undosql(10000)439 440     def closeconn(self):441         self.cur.close()442         self.on_cur.close()443         logging.info('release all db connections')444         logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447     p = flashback()448     p.do()449     p.closeconn()450 451 if __name__ == "__main__":452     main()
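The batching used by undosql and undo_file walks the event table from the newest auto_id to the oldest, in windows of `number` rows, taking the row count as the highest auto_id. The following is a minimal sketch of that window arithmetic only. `batch_windows` and `undo_query` are hypothetical helpers that are not part of the script, and `tbevent` is a placeholder table name. The sketch assumes, as the script's queries do, that auto_id runs contiguously from 1 to the row count. Unlike the script's loop, it clamps the lower bound at 0 and skips a trailing empty window.

```python
def batch_windows(total, batch_size):
    """Yield (low, high) bounds, newest first, meaning low < auto_id <= high."""
    if batch_size <= 0:
        raise ValueError('batch_size must be positive')
    high = total
    while high > 0:
        # Clamp at 0 so the final, smaller window never goes negative
        low = max(high - batch_size, 0)
        yield low, high
        high = low


def undo_query(table, low, high):
    """Build a per-batch SELECT in the same shape as the script's query."""
    return ('select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} '
            'order by auto_id desc;').format(table, low, high)


if __name__ == '__main__':
    # 'tbevent' is a placeholder; the script takes the real name from self.tbevent
    for low, high in batch_windows(25, 10):
        print(undo_query('tbevent', low, high))
```

With 25 stored transactions and a batch size of 10, the windows are (15, 25], (5, 15] and (0, 5], so the most recent transactions are undone first and only one batch of rows is held in memory at a time.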

 
