搜尋
首頁資料庫mysql教程mysql基於binlog回溯工具實例詳解

mysql基於binlog回溯工具實例詳解

Jun 21, 2017 am 10:27 AM
binlogmysql基於工具

 

    update、delete的條件寫錯誤甚至沒有寫,導致資料操作錯誤,需要恢復被誤操作的行記錄。這種情形,其實時有發生,可以選擇用備份檔+binlog來恢復到測試環境,然後再做資料修復,但是這樣其實需要耗費一定的時間跟資源。

    其實,如果binlog format為row,binlog檔案中是會詳細記錄每一個事務涉及到操作,並把每一個事務影響到行記錄均存儲起來,能否給予binlog 文件來反解析數據庫的行記錄變動情況呢?
    業界已有不少相關的腳本及工具,但隨著MySQL版本的更新、binlog記錄內容的變化以及需求不一致,大多數腳本不太適合個人目前的使用需求,所以開始著手編寫mysql的flash back腳本。
 


    若轉載,請註明博文來源: www.cnblogs.com/xinysu/   ,版權歸博客園蘇家小蘿蔔所有。望各位支持!
 


     #只在MySQL 5.6/5.7版本測試,python運行環境需要安裝pymysql模組。

1 實作內容

    根據binlog文件,對某個\某些交易、某段時間某些表格、某段時間全庫做回滾操作,實現閃回功能。工具處理過程中,會把binlog中的交易修改的行記錄儲存到表格中去,透過 dml_sql 列,可以查看每一個事務內部的所有行記錄變更情況,透過 undo_sql 查看回滾的SQL內容。如下圖,然後再根據表格內容做回滾操作。 
      
    那麼這個腳本有哪些優點呢?
  • 回滾分為2個命令:第一個命令分析binglog並儲存進入資料庫;第二個命令執行回滾操作;

##回滾的時候,可以把執行腳本跟回滾腳本統一存放到資料庫中,可以查看更新內容以及回滾內容;

根據儲存的分析表格,方便指定交易或指定表格來來恢復;

詳細的日誌輸出,說明分析進度跟執行進度。
        分析binlog的輸出截圖(分析1G的binlog檔案)
  1.    

  2.  

       回滾資料庫的輸出截圖:
  3.    

  4. 2 原則

  5.     前提:實例啟動了binlog並且格式為ROW。
  6.     

  7.     使用python對mysqlbinlog後的log檔案進行文字分析處理,在整個處理過程中,有以下6個疑點需要處理:
  8. #判斷一個事務的開始跟結束
  9. 同一個事務的執行順序需要反序執行
  10. 解析回滾SQL
  11. #同一個交易操作不同表格處理

轉義字元處理,例如換行符號、tab符等等

#timestamp資料類型參數值轉換

負數處理

#單一交易涉及到行修改SQL操作了max_allow

針對某個表格做回滾,而不是全函式庫回溯
######    #########2.1 交易的開始與結束## #######    依照Xid出現的位置來判斷,從binlog檔案的最開始開始讀取,遇到SQL語句則提取出來,直到遇到Xid,統一把之前提取出來的SQL匯總為一個事務,然後繼續提取SQL語句,直到遇到下一個Xid,再把這個事務的SQL匯總成一個事務,一直這樣循環,直到檔案順序遍歷結束。 ######    ###### ############2.2### ###事務內部反序處理########    同一個事務中,如果有多個表格多行記錄發生變更,在回滾的時候,應該反序回滾SQL,那麼,如何將提取出來的SQL反序儲存呢?思路如下:###
  • 每行記錄的修改SQL獨立出來

  • 將獨立出來的SQL反序儲存

   假設正序的事務SQL語句儲存在變數dml_sql 中,反序後的可以回滾的SQL儲存在變數undo_sql中。依序把行記錄修改的SQL抽取出來 儲存到變數 record_sql 中去,然後 賦值 undo_sql =record_sql + undo_sql ,再置空 record_sql 變量,如此,便可實現反序事務內部的執行SQL。

2.3 解析回溯SQL

    首先,檢視binlog的日誌內容,發現在行修改的SQL情況如下,擷取過程中需要注意這幾個問題:
  • 行記錄的列名配對,binlog file儲存的列序號,不能直接使用

  • ##WHERE部分跟SET部分之間並無關鍵字或符號,需要加入AND 或逗號

  • DELETE SQL 需要反轉為INSERT

  • UPDATE SQL 需要把WHERE 跟著SET的部分替換

  • INSERT SQL需要反轉為DELETE

##2.4 同事務不同表格處理

    同一個事務中,允許對不同表格進行資料修改,這點在列名替換列序號的時候,需要留意處理。

    每一個的行記錄前有一行記錄,含有 'Table_map' 標識,會說明這一行當行記錄是修改哪個表格,可以根據這個提示,來替換binlog裡邊的列序號為列名。
2.5 轉義字元處理

    binlog檔案在對非空格的空白字元處理,採用轉義字元字串存儲,比如,在表格insert一列記錄含換行符,而實際上在binlog檔案中,是使用了\x0a 取代了換行操作,所以在回滾資料的過程中,需要對轉義字元做處理。

   
   

    
    這裡注意一個地方,039的轉義字元是沒有在函數esc_code 中統一處理,而是單獨做另外處理。
   
 

    轉移字元表相見下圖:
   

   

#2.6 timestamp資料型別處理
    timestamp實際在資料庫中的儲存值是INT型,需要使用函數from_unixtime轉換。

    建立測試表格tbtest,只有一列timestamp的列,儲存值後面查看binlog的內容,截圖如下:

   

#在處理行記錄的時候,要對timestamp的value做處理,加入from_unixtime函數轉換。

2.7 負數值處理

    這個一開始寫程式碼的時候,並沒有考慮到。在大量測試的過程中發現,所有整數的資料類型,在儲存負數的時候,都會存入一個最大範圍值。 binlog在處理這塊的機制有些不是很了解。測試如下:

   
 

   所以當遇到INT的各種資料型態並且VALUE為負數的時候,需要把這個範圍值去除,才能執行執行undo_sql。

2.8 單一事務行記錄總SQL超過max_allowed_pa​​ckage處理
    分析binlog後儲存兩種sql類型,一種是行記錄的修改SQL,即dml_sql;一種是行記錄的回滾sql,即undo_sql。從程式碼可知,儲存這兩個sql的欄位是longtext,最大可儲存4G的內容。但是MySQL中單一會話的套件大小是有限制的,限制的參數為 max_allowed_pa​​cket,預設大小為4Mb,最大為1G,所以這個腳本使用前,請手動設定儲存binlog file的資料庫實例以及線上的資料庫實例這個參數:
set global max_allowed_pa​​cket = 1073741824; #記得後續修改回來#########    ###
    萬一操作了呢?那麼回滾只能分段來回滾,先回滾到 這個大事務,然後單獨執行這個大事務,緊接著繼續回滾,這部分不能使用pymysql嗲用source 檔案執行,所以只能手動做這個操作。 求高能人士修改這個邏輯代碼! ! !

2.9 針對性回溯

    假設誤操作的沒有明確的時間點,只有一個區間,而這個區間還有其他的表格操作,那麼這個時候,需要在分析binlog檔的時候,加入--database選項,先帥選到同一個資料庫中binlog檔。
    這裡的處理是將這段區間的dml_sql跟undo_sql都儲存到資料庫表格中,然後再刪除不需要回溯的事務,剩餘需要回溯的事務。再執行回滾操作。

3 使用說明

#3.1 參數說明

    這個腳本的參數稍微多一點,可以--help 查看具體說明。
   

 

    本人喜歡用各種顏色來分類參數(blingbling五顏六色,看著多有趣多精神),所以,按顏色來說明這些參數。
  • 黃色區域:這6個參數,提供的是分析並儲存binlog file的相關值,說明儲存分析結果的資料庫的連結方式、binlog檔案的位置以及儲存結果的表格名字;

  • 藍色區域:這4個參數,提供與線上資料庫表結構一致的DB實例連接方式,僅需跟線上一模一樣的表結構,不一定需要是主從庫;

  • 綠色區域:最最重要的選項-a,0代表僅分析binlog文件,1代表僅執行回滾操作,必須先執行0才可以執行1;

  • 紫色區域:舉例說明。

3.2 應用程式場景說明

  • #全庫回滾某段時間

    • 需要回滾某個時間段的所有SQL操作,回滾到某一個時間點

    • 這種情況下呢,大多數是使用備份檔案+ binlog解決

    • 但這個腳本也可以滿足,但請勿直接在線上操作,先-a=0,看下分析結果,是否符合,符合的話,停掉某個從庫,再在從庫上執行,最後開發業務接入檢查是否恢復到指定時間點,資料是否正常。

  • 某段時間某些表格回滾某些動作

    • #例如,開發提交了一個批次更新腳本,各個測試層面驗證沒有問題,提交線上執行,但是執行後,發現有個業務漏測試,導致某些字段更新後影響到其他業務,現在需要緊急把被批量更新的表格回滾到原先的行記錄

    • 這個並不能單純地從技術角度來處理,要綜合考慮

      • 這種情況下,如何回顧tab A表格的修改操作呢?

      • 個人覺得,這種方式比較行得通,dump tabA表格的資料到測試環境,然後再分析binlog file 從11點-12點的undo sql,接著在測試環境回滾該表格到11點這個時刻,緊接著,由開發跟業務對比測試環境11點的數據跟線上現有的數據中,看下是哪些行哪些列需要在線上進行回滾,哪些是不需要的,然後開發再提交SQL腳本,再在線上執行。其實,這裡邊,DBA只提供一個角色,就是把表格tab A 在一個新的環境上,回滾到某個時間點,但是不提供直接線上回滾SQL的處理。

  • 回滾某個/些SQL

    • 這種情況比較常見,某個update某個delete缺少where條件或where條件執行錯誤

    • 這種情況下,找到對應的事務,執行回滾即可,回滾流程請參考上面一說,對的,我就是這麼膽小怕事 

       

#3.3 測試案例

################################################# #3.3.1 全庫回溯某段時間#########假設需要回滾9點10分到9點15分間資料庫的所有操作:###
  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 线上测试环境修改set global max_allowed_packet = 1073741824

  • 回滚数据,action=1

  • 线上测试环境修改set global max_allowed_packet = 4194304

 1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2 某段时间某些表格回滚某些操作

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

3.3.3 回滚某个/些SQL

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

4 python脚本

     脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
 31         is test.tbevent 32 \033[1;34;40m 33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37 \033[1;32;40m 38 -a    : action, 
 39         0 just analyse the binlog file ,and store sql in table; 
 40         1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m  
 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 53                        -a=1 54 \033[0m                        
 55 ''' 56  57 class flashback: 58     def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 从输入参数获取连接数据库的相关参数值 74  75         # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81         # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82         # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('\033[33mMySQL connection is ok\033[0m') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103     #_get_db用于获取执行命令的输入参数104     def _get_db(self):105         logging.info('begin to assign values to parameters')106         if len(sys.argv) == 1:107             print(usage)108             sys.exit(1)109         elif sys.argv[1] == '--help':110             print(usage)111             sys.exit()112         elif len(sys.argv) > 2:113             for i in sys.argv[1:]:114                 _argv = i.split('=')115                 if _argv[0] == '-h':116                     self.host = _argv[1]117                 elif _argv[0] == '-u':118                     self.user = _argv[1]119                 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121                 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123                 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125                 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128                 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130                 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132                 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134                 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137                 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140                 else:141                     print(usage)142 143     #创建表格,用于存储分析后的BINLOG内容144     def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169         '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173     #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174     def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184         for l in tbcol:185             #self.tbfield.append(l['Field'])186             if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190             else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194     # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195     def rowrecord(self,bl_line):196         try:197             if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208         except Exception:209             return 'funtion rowrecord error'210 211     def dml_tran(self,bl_line):212         try:213 214 215             if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228                 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230                 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232                 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238                 #同个事务内部的行记录修改SQL,反序存储239                 for l in self.undo_sql.splitlines():240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244                     else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250                 #处理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254                 #单独处理 转移字符: \'255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'257 258                 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + '\n')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + '\n')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276         except Exception:277             print( 'funtion dml_tran error')278 279 280     def analyse_binlog(self):281         try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287                 logging.info('\033[36manalyzing...\033[0m')288                 for bline in binlog_file:289                     if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292                     elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295                     elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298                     elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302                         #处理timestamp类型303                         if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307                         #处理负数存储方式308                         if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312                         if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314                         elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320                     elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323                     else:324                         bline = ''325 326                     if bline.rstrip('\n') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328         except Exception:329             return 'function do error'330 331     def esc_code(self,sql):332         esc={333              '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334             #'\\x27':'\'',335             '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336              }337 338         for k,v in esc.items():339             sql=sql.replace(k,v)340         return sql341 342     def binlogdesc(self):343 344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345         print(countsql)346         self.cur.execute(countsql)347         count_row=self.cur.fetchall()348 349         update_count=0350         insert_couont=0351         delete_count=0352         for row in count_row:353             if row['sqltype']==1:354                 insert_couont=row['numbers']355             elif row['sqltype']==2:356                 update_count=row['numbers']357             elif row['sqltype']==3:358                 delete_count=row['numbers']359         logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361     def undosql(self,number):362         #这里会有几个问题:363         #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364         #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365         #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367         tran_num=1368         id=0369 370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372         self.cur.execute(tran_num_sql)373         tran_rows=self.cur.fetchall()374 375         for num in tran_rows:376             tran_num=num['table_rows']377 378         logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380         while id {} and auto_id  {} and auto_id 

 

以上是mysql基於binlog回溯工具實例詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
您什麼時候應該使用複合索引與多個單列索引?您什麼時候應該使用複合索引與多個單列索引?Apr 11, 2025 am 12:06 AM

在數據庫優化中,應根據查詢需求選擇索引策略:1.當查詢涉及多個列且條件順序固定時,使用複合索引;2.當查詢涉及多個列但條件順序不固定時,使用多個單列索引。複合索引適用於優化多列查詢,單列索引則適合單列查詢。

如何識別和優化MySQL中的慢速查詢? (慢查詢日誌,performance_schema)如何識別和優化MySQL中的慢速查詢? (慢查詢日誌,performance_schema)Apr 10, 2025 am 09:36 AM

要優化MySQL慢查詢,需使用slowquerylog和performance_schema:1.啟用slowquerylog並設置閾值,記錄慢查詢;2.利用performance_schema分析查詢執行細節,找出性能瓶頸並優化。

MySQL和SQL:開發人員的基本技能MySQL和SQL:開發人員的基本技能Apr 10, 2025 am 09:30 AM

MySQL和SQL是開發者必備技能。 1.MySQL是開源的關係型數據庫管理系統,SQL是用於管理和操作數據庫的標準語言。 2.MySQL通過高效的數據存儲和檢索功能支持多種存儲引擎,SQL通過簡單語句完成複雜數據操作。 3.使用示例包括基本查詢和高級查詢,如按條件過濾和排序。 4.常見錯誤包括語法錯誤和性能問題,可通過檢查SQL語句和使用EXPLAIN命令優化。 5.性能優化技巧包括使用索引、避免全表掃描、優化JOIN操作和提升代碼可讀性。

描述MySQL異步主奴隸複製過程。描述MySQL異步主奴隸複製過程。Apr 10, 2025 am 09:30 AM

MySQL異步主從復制通過binlog實現數據同步,提升讀性能和高可用性。 1)主服務器記錄變更到binlog;2)從服務器通過I/O線程讀取binlog;3)從服務器的SQL線程應用binlog同步數據。

mysql:簡單的概念,用於輕鬆學習mysql:簡單的概念,用於輕鬆學習Apr 10, 2025 am 09:29 AM

MySQL是一個開源的關係型數據庫管理系統。 1)創建數據庫和表:使用CREATEDATABASE和CREATETABLE命令。 2)基本操作:INSERT、UPDATE、DELETE和SELECT。 3)高級操作:JOIN、子查詢和事務處理。 4)調試技巧:檢查語法、數據類型和權限。 5)優化建議:使用索引、避免SELECT*和使用事務。

MySQL:數據庫的用戶友好介紹MySQL:數據庫的用戶友好介紹Apr 10, 2025 am 09:27 AM

MySQL的安裝和基本操作包括:1.下載並安裝MySQL,設置根用戶密碼;2.使用SQL命令創建數據庫和表,如CREATEDATABASE和CREATETABLE;3.執行CRUD操作,使用INSERT,SELECT,UPDATE,DELETE命令;4.創建索引和存儲過程以優化性能和實現複雜邏輯。通過這些步驟,你可以從零開始構建和管理MySQL數據庫。

InnoDB緩衝池如何工作,為什麼對性能至關重要?InnoDB緩衝池如何工作,為什麼對性能至關重要?Apr 09, 2025 am 12:12 AM

InnoDBBufferPool通過將數據和索引頁加載到內存中來提升MySQL數據庫的性能。 1)數據頁加載到BufferPool中,減少磁盤I/O。 2)臟頁被標記並定期刷新到磁盤。 3)LRU算法管理數據頁淘汰。 4)預讀機制提前加載可能需要的數據頁。

MySQL:初學者的數據管理易用性MySQL:初學者的數據管理易用性Apr 09, 2025 am 12:07 AM

MySQL適合初學者使用,因為它安裝簡單、功能強大且易於管理數據。 1.安裝和配置簡單,適用於多種操作系統。 2.支持基本操作如創建數據庫和表、插入、查詢、更新和刪除數據。 3.提供高級功能如JOIN操作和子查詢。 4.可以通過索引、查詢優化和分錶分區來提升性能。 5.支持備份、恢復和安全措施,確保數據的安全和一致性。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
3 週前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解鎖Myrise中的所有內容
3 週前By尊渡假赌尊渡假赌尊渡假赌

熱工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

將Eclipse與SAP NetWeaver應用伺服器整合。