업데이트 및 삭제 조건을 잘못 기재하거나 전혀 기재하지 않아 데이터 연산 오류가 발생하고, 잘못 연산된 행 레코드를 복원해야 하는 상황이 발생합니다. 이런 상황은 실제로 발생합니다. 백업 파일 + binlog를 사용하여 테스트 환경으로 복원한 후 데이터 복구를 수행할 수도 있지만 실제로는 일정량의 시간과 리소스가 필요합니다.
1 구현 내용

Rollback은 2가지 명령으로 나누어집니다. 첫 번째 명령은 binglog를 분석하여 데이터베이스에 저장하고, 두 번째 명령은 롤백 작업을 수행합니다.
롤백 시 실행 스크립트와 롤백 스크립트를 통합할 수 있습니다. 데이터베이스에 저장된 업데이트 내용과 롤백 내용을 볼 수 있습니다.
저장된 분석 테이블에 따라 복원할 트랜잭션이나 지정된 테이블을 지정하는 것이 편리합니다.
자세한 로그 출력에는 분석 진행 상황과 실행 진행 상황.


2 원칙
동일한 트랜잭션 실행 순서를 역순으로 실행해야 함
SQL 구문 분석 및 롤백
동일 트랜잭션 작업 및 다른 테이블 처리
개행 문자, 탭 문자 등 이스케이프 문자 처리 등
timestamp 데이터 유형 매개변수 값 변환
음수 처리
단일 트랜잭션에 행 수정 SQL 작업 포함 max_allow
전체 데이터베이스 롤백 대신 특정 테이블에 대한 롤백 수행
의 시작과 끝은 Xid가 나타나는 위치에 따라 판단됩니다. binlog 파일의 시작 부분부터 읽어서 Xid를 만날 때까지 추출합니다. , 이전에 추출된 SQL은 트랜잭션으로 요약됩니다. 그런 다음 다음 Xid를 만날 때까지 SQL 문을 계속 추출한 다음 이 트랜잭션의 SQL을 하나의 트랜잭션으로 요약합니다. 이 주기는 파일의 순차적 순회가 끝날 때까지 계속됩니다.

2.2
동일한 트랜잭션에서 여러 테이블과 여러 행 레코드가 변경되면 롤백 중에 SQL을 역순으로 롤백해야 하는데 어떻게 해야 합니까? 추출된 SQL을 역순으로 저장하려면? 아이디어는 다음과 같습니다:
레코드 각 행의 수정된 SQL을 분리한다
독립적인 SQL을 역순으로 저장한다
정방향 트랜잭션 SQL 문이 변수 dml_sql에 저장된다고 가정하면, 역순으로 SQL은 undo_sql 변수에 롤백으로 저장할 수 있습니다. 행 레코드 수정 SQL을 순서대로 추출하여 Record_sql 변수에 저장한 후 undo_sql =record_sql + undo_sql 값을 할당한 후 Record_sql 변수를 공백으로 설정하면 역트랜잭션 내 SQL 실행이 구현될 수 있다. .
2.3 롤백 SQL 구문 분석
먼저 binlog의 로그 내용을 확인하여 행 수정의 SQL 상황이 다음과 같은지 확인합니다. 추출 과정에서 다음 사항에 주의해야 합니다.
행 레코드의 열 이름 일치, binlog 파일에 저장된 열 번호는 직접 사용할 수 없습니다. WHERE 부분과 SET 부분 사이에 키워드나 기호를 추가해야 합니다. SQL을 INSERT
UPDATE SQL로 바꿔야 합니다. INSERT SQL을 DELETE로 바꿔야 하는데, 컬럼 일련번호를 로 바꿀 때 주의가 필요합니다. 열 이름.
각 행 레코드 앞에는 'Table_map' 표시가 포함된 레코드 행이 있으며, 이는 이 레코드 행에서 어떤 테이블이 수정되었는지 나타냅니다. 이 프롬프트에 따라 binlog의 열 일련 번호를 다음으로 바꿀 수 있습니다. 열 이름. -
2.5 이스케이프 문자 처리
binlog 파일은 이스케이프 문자 문자열 저장소를 사용하여 공백이 아닌 공백 문자를 처리합니다. 예를 들어 테이블의 삽입 열에 있는 레코드에는 개행 문자가 포함되어 있지만 실제로는 binlog 파일에 있습니다. , 개행 연산을 대체하는 데 x0a가 사용되므로 데이터 롤백 과정에서 이스케이프 문자를 처리해야 합니다.
-

여기서 주목해야 할 점은 039의 이스케이프 문자가 esc_code 함수에서 균일하게 처리되지 않고 별도로 처리된다는 점입니다.
전송 문자 테이블은 아래와 같습니다.
2.6 타임스탬프 데이터 형식 처리 
데이터베이스에 실제 저장되는 타임스탬프 값은 INT 형식이므로 필수 기능 사용 from_unixtime으로 변환합니다.
타임스탬프 열이 하나만 있는 테스트 테이블 tbtest를 생성합니다. 값을 저장한 후 binlog의 내용을 확인합니다. 구체적인 스크린샷은 다음과 같습니다.
행 레코드를 처리할 때 타임스탬프를 처리해야 합니다. 값을 입력하고 from_unixtime 함수 Convert를 추가합니다.
2.7 음수 처리
처음 코드를 작성할 때는 고려하지 않았습니다. 광범위한 테스트를 통해 모든 정수 데이터 유형이 음수를 저장할 때 최대 범위 값을 저장한다는 사실이 발견되었습니다. binlog가 이를 처리하는 방법에 대한 메커니즘은 그다지 명확하지 않습니다. 테스트 내용은 다음과 같습니다.
따라서 다양한 데이터 유형이 INT이고 VALUE가 음수인 경우 undo_sql을 실행하기 전에 이 범위 값을 제거해야 합니다.
2.8 단일 트랜잭션 행 레코드의 전체 SQL이 max_allowed_package 처리를 초과합니다
binlog를 분석한 후 두 가지 유형의 SQL이 저장됩니다. 하나는 행 레코드의 수정 SQL, 즉 dml_sql입니다. 행 레코드의 SQL, 즉 undo_sql을 롤백합니다. 이 두 SQL을 저장하는 열은 최대 4G의 콘텐츠를 저장할 수 있는 longtext라는 것을 코드에서 볼 수 있습니다. 그러나 MySQL에서는 단일 세션의 패킷 크기가 제한되어 있습니다. 기본 크기는 4Mb이고 최대 크기는 1G입니다. 따라서 이 스크립트를 사용하기 전에 binlog 파일을 저장하는 데이터베이스 인스턴스를 수동으로 설정하십시오. 온라인 데이터베이스 인스턴스:

set global max_allowed_packet = 1073741824 #나중에 수정해야 함
작동하면 어떻게 되나요? 그러면 롤백은 세그먼트 단위로만 가능합니다. 먼저 이 대규모 트랜잭션으로 롤백한 다음 이 대규모 트랜잭션을 별도로 실행한 다음 계속해서 롤백할 수 있습니다. 이 부분은 pymysql이나 소스 파일을 사용하여 실행할 수 없으므로 이 작업은 가능합니다. 수동으로만 수행할 수 있습니다. 유능한 사람들에게 이 논리 코드를 수정하도록 요청하십시오! ! !
2.9 Targeted Rollback
오작동에 대한 명확한 시점이 없고 간격이 하나만 있고 이 간격에 다른 테이블 작업이 있다고 가정하면 이때 binlog 파일을 분석할 때 필요한 것은 추가 -- 데이터베이스 옵션의 경우 먼저 동일한 데이터베이스에 있는 binlog 파일을 선택합니다.
여기서 처리하는 것은 이 간격의 dml_sql과 undo_sql을 데이터베이스 테이블에 저장한 후 롤백할 필요가 없는 트랜잭션과 롤백해야 할 나머지 트랜잭션을 삭제하는 것입니다. 롤백 작업을 다시 수행하십시오.
3 사용 지침
3.1 매개 변수 설명
이 스크립트에는 약간 더 많은 매개 변수가 있으며 --help를 사용하여 특정 지침을 볼 수 있습니다.

저는 매개변수를 분류할 때 다양한 색상을 사용하는 것을 좋아하므로(블링블링은 컬러풀하고 재미있고 에너지가 넘치게 보입니다) 이러한 매개변수를 색상별로 설명하겠습니다.
노란색 영역: 이 6개의 매개변수는 binlog 파일을 분석하고 저장하기 위한 관련 값을 제공하며, 분석 결과를 저장하는 데이터베이스의 링크 방법, binlog 파일의 위치 및 파일 이름을 나타냅니다. 결과가 저장되는 테이블
파란색 영역: 이 4개 매개변수는 온라인 데이터베이스 테이블 구조와 동일한 DB 인스턴스 연결 방법을 제공하며, 온라인 데이터베이스와 동일한 테이블 구조만 필요합니다. 반드시 마스터-슬레이브 데이터베이스여야 합니다.
녹색 영역: 가장 중요 -a, 0 옵션은 binlog 파일만 분석함을 의미하고, 1은 롤백 작업만 수행함을 의미하며, 1을 실행하기 전에 0을 실행해야 합니다. ;
보라색 영역: 예.
3.2 애플리케이션 시나리오 설명
특정 기간 동안 전체 데이터베이스 롤백
특정 기간의 모든 SQL 작업을 특정 시점으로 롤백해야 함
-
이런 경우에는 백업파일 + binlog
를 사용하면 대부분 해결됩니다. 하지만 이 스크립트도 요구사항을 충족할 수는 있지만 온라인에서 직접 실행하지 마시기 바랍니다. 우선 -a=0으로 분석 결과를 확인하시기 바랍니다. 일관성이 있는 경우 슬레이브 데이터베이스를 중지한 후 슬레이브 데이터베이스에서 실행하고 마지막으로 지정된 시점으로 복원되었는지, 데이터가 정상인지 확인하는 비즈니스 액세스를 개발합니다.
특정 기간 동안 특정 테이블에서 일부 작업이 롤백됩니다.
-
예를 들어 개발팀에서 일괄 업데이트 스크립트를 제출했는데 모든 테스트 수준에서 검증에 문제가 없었습니다. 온라인 실행을 위해 실행했지만, 실행 후 비즈니스가 테스트를 놓쳐 일부 필드가 업데이트되고 다른 비즈니스에 영향을 미치는 것으로 나타났습니다. 이제 일괄 업데이트된 테이블을 원래 행 레코드로 긴급히 롤백해야 합니다
기술적인 측면에서만 처리할 수는 없고 종합적으로 고려해야 합니다
-

이 경우 탭 A 테이블의 수정 작업을 어떻게 검토해야 할까요?
개인적으로는 이 방법이 더 타당하다고 생각합니다. tabA 테이블의 데이터를 테스트 환경에 덤프한 후, binlog 파일 undo sql을 11시부터 12시까지 분석한 후 테이블을 롤백합니다. 11시 테스트 환경 이때 개발 및 사업부는 테스트 환경의 11시 데이터와 온라인상의 기존 데이터를 비교하여 어떤 행과 열을 온라인으로 롤백해야 하는지, 어떤 것이 롤백되어야 하는지 확인하게 됩니다. 필요하지 않으면 개발이 이를 제출하고 SQL 스크립트가 온라인으로 실행됩니다. 실제로 여기서 DBA는 새로운 환경에서 테이블 탭 A를 특정 시점으로 롤백하는 역할만 제공할 뿐, SQL 처리에 대한 직접온라인 롤백은 제공하지 않습니다.
특정 SQL/일부 SQL 롤백
이 상황은 비교적 일반적입니다. 업데이트 또는 삭제에는 where 조건 또는 where 조건 실행 오류가 없습니다.
-
이 경우 다음을 찾으세요. 해당 트랜잭션의 롤백을 실행해 보세요. 네, 너무 소심하고 두렵습니다. 
3.3 테스트 케이스
3.3.1 데이터베이스 롤백 특정 시간에
9시 10분에서 9시 15분 사이에 데이터베이스의 모든 작업을 롤백해야 한다고 가정해 보겠습니다.
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
线上测试环境修改set global max_allowed_packet = 1073741824
回滚数据,action=1
线上测试环境修改set global max_allowed_packet = 4194304
1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3 4 5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name | Value | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
3.3.2 某段时间某些表格回滚某些操作
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
3.3.3 回滚某个/些SQL
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
4 python脚本
脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。
1 # -*- coding: utf-8 -*- 2 __author__ = 'xinysu' 3 __date__ = '2017/6/15 10:30' 4 5 6 7 import re 8 import os 9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16 17 import pymysql 18 from pymysql.cursors import DictCursor 19 20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h : host, the database host,which database will store the results after analysis 24 -u : user, the db user 25 -p : password, the db user's password 26 -P : port, the db port 27 28 -f : file path, the binlog file 29 -t : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30 when you want to store in `test` db and the table name is `tbevent`,then this parameter
31 is test.tbevent 32 \033[1;34;40m 33 -oh : online host, the database host,which database have the online table schema 34 -ou : online user, the db user 35 -op : online password, the db user's password 36 -oP : online port, the db port 37 \033[1;32;40m 38 -a : action,
39 0 just analyse the binlog file ,and store sql in table;
40 1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m
42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
47 -oh=192.168.9.244 -oP=3310 -u=root -op=***
48 -a=0 49 50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
52 -oh=192.168.9.244 -oP=3310 -u=root -op=***
53 -a=1 54 \033[0m
55 ''' 56 57 class flashback: 58 def __init__(self): 59 self.host='' 60 self.user='' 61 self.password='' 62 self.port='3306' 63 self.fpath='' 64 self.tbevent='' 65 66 self.on_host='' 67 self.on_user='' 68 self.on_password='' 69 self.on_port='3306' 70 71 self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72 73 self._get_db() # 从输入参数获取连接数据库的相关参数值 74 75 # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76 logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77 self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78 self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79 logging.info('MySQL which userd to store binlog event connection is ok') 80 81 # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82 # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83 logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84 self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85 self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86 logging.info('MySQL which userd to analyse online table schema connection is ok') 87 88 logging.info('\033[33mMySQL connection is ok\033[0m') 89 90 self.dml_sql='' 91 self.undo_sql='' 92 93 self.tbfield_where = [] 94 self.tbfield_set = [] 95 96 self.begin_time='' 97 self.db_name='' 98 self.tb_name='' 99 self.end_time=''100 self.end_pos=''101 self.sqltype=0102 103 #_get_db用于获取执行命令的输入参数104 def _get_db(self):105 logging.info('begin to assign values to parameters')106 if len(sys.argv) == 1:107 print(usage)108 sys.exit(1)109 elif sys.argv[1] == '--help':110 print(usage)111 sys.exit()112 elif len(sys.argv) > 2:113 for i in sys.argv[1:]:114 _argv = i.split('=')115 if _argv[0] == '-h':116 self.host = _argv[1]117 elif _argv[0] == '-u':118 self.user = _argv[1]119 elif _argv[0] == '-P':120 self.port = int(_argv[1])121 elif _argv[0] == '-f':122 self.fpath = _argv[1]123 elif _argv[0] == '-t':124 self.tbevent = _argv[1]125 elif _argv[0] == '-p':126 self.password = _argv[1]127 128 elif _argv[0] == '-oh':129 self.on_host = _argv[1]130 elif _argv[0] == '-ou':131 self.on_user = _argv[1]132 elif _argv[0] == '-oP':133 self.on_port = int(_argv[1])134 elif _argv[0] == '-op':135 self.on_password = _argv[1]136 137 elif _argv[0] == '-a':138 self.action = _argv[1]139 140 else:141 print(usage)142 143 #创建表格,用于存储分析后的BINLOG内容144 def create_tab(self):145 logging.info('creating table {} to store binlog event'.format(self.tbevent))146 create_tb_sql ='''147 CREATE TABLE IF NOT EXISTS {}(148 auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149 binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150 dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151 dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152 end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153 db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154 table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155 sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156 dml_sql LONGTEXT NULL COMMENT 'what sql excuted',157 undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158 PRIMARY KEY (auto_id),159 INDEX sqltype(sqltype),160 INDEX dml_start_time (dml_start_time),161 INDEX dml_end_time (dml_end_time),162 INDEX end_log_pos (end_log_pos),163 INDEX db_name (db_name),164 INDEX table_name (table_name)165 )166 COLLATE='utf8_general_ci' ENGINE=InnoDB;167 TRUNCATE TABLE {};168 169 '''.format(self.tbevent,self.tbevent)170 self.cur.execute(create_tb_sql)171 logging.info('created table {} '.format(self.tbevent))172 173 #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174 def tbschema(self,dbname,tbname):175 self.tbfield_where = []176 self.tbfield_set = []177 178 sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180 self.on_cur.execute(sql_tb)181 tbcol=self.on_cur.fetchall()182 183 i = 0184 for l in tbcol:185 #self.tbfield.append(l['Field'])186 if i==0:187 self.tbfield_where.append('`'+l['Field']+'`')188 self.tbfield_set.append('`'+l['Field']+'`')189 i+=1190 else:191 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194 # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195 def rowrecord(self,bl_line):196 try:197 if bl_line.find('Table_map:') != -1:198 l = bl_line.index('server')199 m = bl_line.index('end_log_pos')200 n = bl_line.index('Table_map')201 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207 self.tbschema(self.db_name,self.tb_name)208 except Exception:209 return 'funtion rowrecord error'210 211 def dml_tran(self,bl_line):212 try:213 214 215 if bl_line.find('Xid =') != -1:216 217 l = bl_line.index('server')218 m = bl_line.index('end_log_pos')219 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228 if self.dml_sql.startswith(' INSERT INTO '):229 self.sqltype=1230 elif self.dml_sql.startswith(' UPDATE '):231 self.sqltype=2232 elif self.dml_sql.startswith(' DELETE '):233 self.sqltype=3234 235 record_sql = ''236 undosql_desc = ''237 238 #同个事务内部的行记录修改SQL,反序存储239 for l in self.undo_sql.splitlines():240 if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241 undosql_desc = record_sql + undosql_desc242 record_sql = ''243 record_sql = record_sql + l244 else:245 record_sql = record_sql + l246 247 self.undo_sql = record_sql + undosql_desc248 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250 #处理非空格的空白特殊字符251 self.dml_sql = self.esc_code(self.dml_sql)252 self.undo_sql = self.esc_code(self.undo_sql)253 254 #单独处理 转移字符: \'255 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''") # + ';'256 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''") # + ';'257 258 if len(self.dml_sql)>500000000:259 with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260 w_f.write('begin;' + '\n')261 w_f.write(self.undo_sql)262 w_f.write('commit;' + '\n')263 self.dml_sql=''264 self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265 logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268 self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269 self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271 self.cur.execute(insert_sql)272 self.mysqlconn.commit()273 274 self.dml_sql = ''275 self.undo_sql = ''276 except Exception:277 print( 'funtion dml_tran error')278 279 280 def analyse_binlog(self):281 try:282 sqlcomma=0283 self.create_tab()284 285 with open(self.fpath,'r') as binlog_file:286 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287 logging.info('\033[36manalyzing...\033[0m')288 for bline in binlog_file:289 if bline.find('Table_map:') != -1:290 self.rowrecord(bline)291 bline=''292 elif bline.rstrip()=='### SET':293 bline = bline[3:]294 sqlcomma=1295 elif bline.rstrip()=='### WHERE':296 bline = bline[3:]297 sqlcomma = 2298 elif bline.startswith('### @'):299 len_f=len('### @')300 i=bline[len_f:].split('=')[0]301 302 #处理timestamp类型303 if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305 bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307 #处理负数存储方式308 if bline.split('=')[1].startswith('-'):309 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310 bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312 if sqlcomma==1:313 bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314 elif sqlcomma==2:315 bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317 elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318 bline = bline[3:]319 320 elif bline.find('Xid =') != -1:321 self.dml_tran(bline)322 bline=''323 else:324 bline = ''325 326 if bline.rstrip('\n') != '':327 self.dml_sql = self.dml_sql + bline + ' '328 except Exception:329 return 'function do error'330 331 def esc_code(self,sql):332 esc={333 '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334 #'\\x27':'\'',335 '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336 }337 338 for k,v in esc.items():339 sql=sql.replace(k,v)340 return sql341 342 def binlogdesc(self):343 344 countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345 print(countsql)346 self.cur.execute(countsql)347 count_row=self.cur.fetchall()348 349 update_count=0350 insert_couont=0351 delete_count=0352 for row in count_row:353 if row['sqltype']==1:354 insert_couont=row['numbers']355 elif row['sqltype']==2:356 update_count=row['numbers']357 elif row['sqltype']==3:358 delete_count=row['numbers']359 logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361 def undosql(self,number):362 #这里会有几个问题:363 #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364 #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365 #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367 tran_num=1368 id=0369 370 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372 self.cur.execute(tran_num_sql)373 tran_rows=self.cur.fetchall()374 375 for num in tran_rows:376 tran_num=num['table_rows']377 378 logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380 while id {} and auto_id {} and auto_id
레코드 각 행의 수정된 SQL을 분리한다
독립적인 SQL을 역순으로 저장한다
행 레코드의 열 이름 일치, binlog 파일에 저장된 열 번호는 직접 사용할 수 없습니다. WHERE 부분과 SET 부분 사이에 키워드나 기호를 추가해야 합니다. SQL을 INSERT
UPDATE SQL로 바꿔야 합니다. INSERT SQL을 DELETE로 바꿔야 하는데, 컬럼 일련번호를 로 바꿀 때 주의가 필요합니다. 열 이름.
binlog 파일은 이스케이프 문자 문자열 저장소를 사용하여 공백이 아닌 공백 문자를 처리합니다. 예를 들어 테이블의 삽입 열에 있는 레코드에는 개행 문자가 포함되어 있지만 실제로는 binlog 파일에 있습니다. , 개행 연산을 대체하는 데 x0a가 사용되므로 데이터 롤백 과정에서 이스케이프 문자를 처리해야 합니다.

데이터베이스에 실제 저장되는 타임스탬프 값은 INT 형식이므로 필수 기능 사용 from_unixtime으로 변환합니다.
타임스탬프 열이 하나만 있는 테스트 테이블 tbtest를 생성합니다. 값을 저장한 후 binlog의 내용을 확인합니다. 구체적인 스크린샷은 다음과 같습니다.



set global max_allowed_packet = 1073741824 #나중에 수정해야 함

노란색 영역: 이 6개의 매개변수는 binlog 파일을 분석하고 저장하기 위한 관련 값을 제공하며, 분석 결과를 저장하는 데이터베이스의 링크 방법, binlog 파일의 위치 및 파일 이름을 나타냅니다. 결과가 저장되는 테이블
파란색 영역: 이 4개 매개변수는 온라인 데이터베이스 테이블 구조와 동일한 DB 인스턴스 연결 방법을 제공하며, 온라인 데이터베이스와 동일한 테이블 구조만 필요합니다. 반드시 마스터-슬레이브 데이터베이스여야 합니다.
녹색 영역: 가장 중요 -a, 0 옵션은 binlog 파일만 분석함을 의미하고, 1은 롤백 작업만 수행함을 의미하며, 1을 실행하기 전에 0을 실행해야 합니다. ;
보라색 영역: 예.
특정 기간 동안 전체 데이터베이스 롤백
특정 기간의 모든 SQL 작업을 특정 시점으로 롤백해야 함
-
이런 경우에는 백업파일 + binlog
를 사용하면 대부분 해결됩니다. 하지만 이 스크립트도 요구사항을 충족할 수는 있지만 온라인에서 직접 실행하지 마시기 바랍니다. 우선 -a=0으로 분석 결과를 확인하시기 바랍니다. 일관성이 있는 경우 슬레이브 데이터베이스를 중지한 후 슬레이브 데이터베이스에서 실행하고 마지막으로 지정된 시점으로 복원되었는지, 데이터가 정상인지 확인하는 비즈니스 액세스를 개발합니다.
특정 기간 동안 특정 테이블에서 일부 작업이 롤백됩니다.
-
예를 들어 개발팀에서 일괄 업데이트 스크립트를 제출했는데 모든 테스트 수준에서 검증에 문제가 없었습니다. 온라인 실행을 위해 실행했지만, 실행 후 비즈니스가 테스트를 놓쳐 일부 필드가 업데이트되고 다른 비즈니스에 영향을 미치는 것으로 나타났습니다. 이제 일괄 업데이트된 테이블을 원래 행 레코드로 긴급히 롤백해야 합니다
기술적인 측면에서만 처리할 수는 없고 종합적으로 고려해야 합니다
-
이 경우 탭 A 테이블의 수정 작업을 어떻게 검토해야 할까요?
개인적으로는 이 방법이 더 타당하다고 생각합니다. tabA 테이블의 데이터를 테스트 환경에 덤프한 후, binlog 파일 undo sql을 11시부터 12시까지 분석한 후 테이블을 롤백합니다. 11시 테스트 환경 이때 개발 및 사업부는 테스트 환경의 11시 데이터와 온라인상의 기존 데이터를 비교하여 어떤 행과 열을 온라인으로 롤백해야 하는지, 어떤 것이 롤백되어야 하는지 확인하게 됩니다. 필요하지 않으면 개발이 이를 제출하고 SQL 스크립트가 온라인으로 실행됩니다. 실제로 여기서 DBA는 새로운 환경에서 테이블 탭 A를 특정 시점으로 롤백하는 역할만 제공할 뿐, SQL 처리에 대한 직접온라인 롤백은 제공하지 않습니다.
특정 SQL/일부 SQL 롤백
이 상황은 비교적 일반적입니다. 업데이트 또는 삭제에는 where 조건 또는 where 조건 실행 오류가 없습니다.
-
이 경우 다음을 찾으세요. 해당 트랜잭션의 롤백을 실행해 보세요. 네, 너무 소심하고 두렵습니다.
3.3 테스트 케이스
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
线上测试环境修改set global max_allowed_packet = 1073741824
回滚数据,action=1
线上测试环境修改set global max_allowed_packet = 4194304
1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3 4 5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name | Value | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
1 # -*- coding: utf-8 -*- 2 __author__ = 'xinysu' 3 __date__ = '2017/6/15 10:30' 4 5 6 7 import re 8 import os 9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16 17 import pymysql 18 from pymysql.cursors import DictCursor 19 20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h : host, the database host,which database will store the results after analysis 24 -u : user, the db user 25 -p : password, the db user's password 26 -P : port, the db port 27 28 -f : file path, the binlog file 29 -t : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30 when you want to store in `test` db and the table name is `tbevent`,then this parameter 31 is test.tbevent 32 \033[1;34;40m 33 -oh : online host, the database host,which database have the online table schema 34 -ou : online user, the db user 35 -op : online password, the db user's password 36 -oP : online port, the db port 37 \033[1;32;40m 38 -a : action, 39 0 just analyse the binlog file ,and store sql in table; 40 1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 47 -oh=192.168.9.244 -oP=3310 -u=root -op=*** 48 -a=0 49 50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 52 -oh=192.168.9.244 -oP=3310 -u=root -op=*** 53 -a=1 54 \033[0m 55 ''' 56 57 class flashback: 58 def __init__(self): 59 self.host='' 60 self.user='' 61 self.password='' 62 self.port='3306' 63 self.fpath='' 64 self.tbevent='' 65 66 self.on_host='' 67 self.on_user='' 68 self.on_password='' 69 self.on_port='3306' 70 71 self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72 73 self._get_db() # 从输入参数获取连接数据库的相关参数值 74 75 # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76 logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77 self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78 self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79 logging.info('MySQL which userd to store binlog event connection is ok') 80 81 # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82 # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83 logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84 self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85 self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86 logging.info('MySQL which userd to analyse online table schema connection is ok') 87 88 logging.info('\033[33mMySQL connection is ok\033[0m') 89 90 self.dml_sql='' 91 self.undo_sql='' 92 93 self.tbfield_where = [] 94 self.tbfield_set = [] 95 96 self.begin_time='' 97 self.db_name='' 98 self.tb_name='' 99 self.end_time=''100 self.end_pos=''101 self.sqltype=0102 103 #_get_db用于获取执行命令的输入参数104 def _get_db(self):105 logging.info('begin to assign values to parameters')106 if len(sys.argv) == 1:107 print(usage)108 sys.exit(1)109 elif sys.argv[1] == '--help':110 print(usage)111 sys.exit()112 elif len(sys.argv) > 2:113 for i in sys.argv[1:]:114 _argv = i.split('=')115 if _argv[0] == '-h':116 self.host = _argv[1]117 elif _argv[0] == '-u':118 self.user = _argv[1]119 elif _argv[0] == '-P':120 self.port = int(_argv[1])121 elif _argv[0] == '-f':122 self.fpath = _argv[1]123 elif _argv[0] == '-t':124 self.tbevent = _argv[1]125 elif _argv[0] == '-p':126 self.password = _argv[1]127 128 elif _argv[0] == '-oh':129 self.on_host = _argv[1]130 elif _argv[0] == '-ou':131 self.on_user = _argv[1]132 elif _argv[0] == '-oP':133 self.on_port = int(_argv[1])134 elif _argv[0] == '-op':135 self.on_password = _argv[1]136 137 elif _argv[0] == '-a':138 self.action = _argv[1]139 140 else:141 print(usage)142 143 #创建表格,用于存储分析后的BINLOG内容144 def create_tab(self):145 logging.info('creating table {} to store binlog event'.format(self.tbevent))146 create_tb_sql ='''147 CREATE TABLE IF NOT EXISTS {}(148 auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149 binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150 dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151 dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152 end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153 db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154 table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155 sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156 dml_sql LONGTEXT NULL COMMENT 'what sql excuted',157 undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158 PRIMARY KEY (auto_id),159 INDEX sqltype(sqltype),160 INDEX dml_start_time (dml_start_time),161 INDEX dml_end_time (dml_end_time),162 INDEX end_log_pos (end_log_pos),163 INDEX db_name (db_name),164 INDEX table_name (table_name)165 )166 COLLATE='utf8_general_ci' ENGINE=InnoDB;167 TRUNCATE TABLE {};168 169 '''.format(self.tbevent,self.tbevent)170 self.cur.execute(create_tb_sql)171 logging.info('created table {} '.format(self.tbevent))172 173 #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174 def tbschema(self,dbname,tbname):175 self.tbfield_where = []176 self.tbfield_set = []177 178 sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180 self.on_cur.execute(sql_tb)181 tbcol=self.on_cur.fetchall()182 183 i = 0184 for l in tbcol:185 #self.tbfield.append(l['Field'])186 if i==0:187 self.tbfield_where.append('`'+l['Field']+'`')188 self.tbfield_set.append('`'+l['Field']+'`')189 i+=1190 else:191 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194 # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195 def rowrecord(self,bl_line):196 try:197 if bl_line.find('Table_map:') != -1:198 l = bl_line.index('server')199 m = bl_line.index('end_log_pos')200 n = bl_line.index('Table_map')201 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207 self.tbschema(self.db_name,self.tb_name)208 except Exception:209 return 'funtion rowrecord error'210 211 def dml_tran(self,bl_line):212 try:213 214 215 if bl_line.find('Xid =') != -1:216 217 l = bl_line.index('server')218 m = bl_line.index('end_log_pos')219 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228 if self.dml_sql.startswith(' INSERT INTO '):229 self.sqltype=1230 elif self.dml_sql.startswith(' UPDATE '):231 self.sqltype=2232 elif self.dml_sql.startswith(' DELETE '):233 self.sqltype=3234 235 record_sql = ''236 undosql_desc = ''237 238 #同个事务内部的行记录修改SQL,反序存储239 for l in self.undo_sql.splitlines():240 if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241 undosql_desc = record_sql + undosql_desc242 record_sql = ''243 record_sql = record_sql + l244 else:245 record_sql = record_sql + l246 247 self.undo_sql = record_sql + undosql_desc248 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250 #处理非空格的空白特殊字符251 self.dml_sql = self.esc_code(self.dml_sql)252 self.undo_sql = self.esc_code(self.undo_sql)253 254 #单独处理 转移字符: \'255 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''") # + ';'256 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''") # + ';'257 258 if len(self.dml_sql)>500000000:259 with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260 w_f.write('begin;' + '\n')261 w_f.write(self.undo_sql)262 w_f.write('commit;' + '\n')263 self.dml_sql=''264 self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265 logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268 self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269 self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271 self.cur.execute(insert_sql)272 self.mysqlconn.commit()273 274 self.dml_sql = ''275 self.undo_sql = ''276 except Exception:277 print( 'funtion dml_tran error')278 279 280 def analyse_binlog(self):281 try:282 sqlcomma=0283 self.create_tab()284 285 with open(self.fpath,'r') as binlog_file:286 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287 logging.info('\033[36manalyzing...\033[0m')288 for bline in binlog_file:289 if bline.find('Table_map:') != -1:290 self.rowrecord(bline)291 bline=''292 elif bline.rstrip()=='### SET':293 bline = bline[3:]294 sqlcomma=1295 elif bline.rstrip()=='### WHERE':296 bline = bline[3:]297 sqlcomma = 2298 elif bline.startswith('### @'):299 len_f=len('### @')300 i=bline[len_f:].split('=')[0]301 302 #处理timestamp类型303 if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305 bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307 #处理负数存储方式308 if bline.split('=')[1].startswith('-'):309 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310 bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312 if sqlcomma==1:313 bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314 elif sqlcomma==2:315 bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317 elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318 bline = bline[3:]319 320 elif bline.find('Xid =') != -1:321 self.dml_tran(bline)322 bline=''323 else:324 bline = ''325 326 if bline.rstrip('\n') != '':327 self.dml_sql = self.dml_sql + bline + ' '328 except Exception:329 return 'function do error'330 331 def esc_code(self,sql):332 esc={333 '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334 #'\\x27':'\'',335 '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336 }337 338 for k,v in esc.items():339 sql=sql.replace(k,v)340 return sql341 342 def binlogdesc(self):343 344 countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345 print(countsql)346 self.cur.execute(countsql)347 count_row=self.cur.fetchall()348 349 update_count=0350 insert_couont=0351 delete_count=0352 for row in count_row:353 if row['sqltype']==1:354 insert_couont=row['numbers']355 elif row['sqltype']==2:356 update_count=row['numbers']357 elif row['sqltype']==3:358 delete_count=row['numbers']359 logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361 def undosql(self,number):362 #这里会有几个问题:363 #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364 #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365 #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367 tran_num=1368 id=0369 370 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372 self.cur.execute(tran_num_sql)373 tran_rows=self.cur.fetchall()374 375 for num in tran_rows:376 tran_num=num['table_rows']377 378 logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380 while id {} and auto_id {} and auto_id
위 내용은 mysql 기반 binlog 롤백 도구 예제에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

데이터베이스 및 프로그래밍에서 MySQL의 위치는 매우 중요합니다. 다양한 응용 프로그램 시나리오에서 널리 사용되는 오픈 소스 관계형 데이터베이스 관리 시스템입니다. 1) MySQL은 웹, 모바일 및 엔터프라이즈 레벨 시스템을 지원하는 효율적인 데이터 저장, 조직 및 검색 기능을 제공합니다. 2) 클라이언트 서버 아키텍처를 사용하고 여러 스토리지 엔진 및 인덱스 최적화를 지원합니다. 3) 기본 사용에는 테이블 작성 및 데이터 삽입이 포함되며 고급 사용에는 다중 테이블 조인 및 복잡한 쿼리가 포함됩니다. 4) SQL 구문 오류 및 성능 문제와 같은 자주 묻는 질문은 설명 명령 및 느린 쿼리 로그를 통해 디버깅 할 수 있습니다. 5) 성능 최적화 방법에는 인덱스의 합리적인 사용, 최적화 된 쿼리 및 캐시 사용이 포함됩니다. 모범 사례에는 거래 사용 및 준비된 체계가 포함됩니다

MySQL은 소규모 및 대기업에 적합합니다. 1) 소기업은 고객 정보 저장과 같은 기본 데이터 관리에 MySQL을 사용할 수 있습니다. 2) 대기업은 MySQL을 사용하여 대규모 데이터 및 복잡한 비즈니스 로직을 처리하여 쿼리 성능 및 트랜잭션 처리를 최적화 할 수 있습니다.

InnoDB는 팬텀 읽기를 차세대 점화 메커니즘을 통해 효과적으로 방지합니다. 1) Next-Keylocking은 Row Lock과 Gap Lock을 결합하여 레코드와 간격을 잠그기 위해 새로운 레코드가 삽입되지 않도록합니다. 2) 실제 응용 분야에서 쿼리를 최적화하고 격리 수준을 조정함으로써 잠금 경쟁을 줄이고 동시성 성능을 향상시킬 수 있습니다.

MySQL은 프로그래밍 언어가 아니지만 쿼리 언어 SQL은 프로그래밍 언어의 특성을 가지고 있습니다. 1. SQL은 조건부 판단, 루프 및 가변 작업을 지원합니다. 2. 저장된 절차, 트리거 및 기능을 통해 사용자는 데이터베이스에서 복잡한 논리 작업을 수행 할 수 있습니다.

MySQL은 오픈 소스 관계형 데이터베이스 관리 시스템으로, 주로 데이터를 신속하고 안정적으로 저장하고 검색하는 데 사용됩니다. 작업 원칙에는 클라이언트 요청, 쿼리 해상도, 쿼리 실행 및 반환 결과가 포함됩니다. 사용의 예로는 테이블 작성, 데이터 삽입 및 쿼리 및 조인 작업과 같은 고급 기능이 포함됩니다. 일반적인 오류에는 SQL 구문, 데이터 유형 및 권한이 포함되며 최적화 제안에는 인덱스 사용, 최적화 된 쿼리 및 테이블 분할이 포함됩니다.

MySQL은 데이터 저장, 관리, 쿼리 및 보안에 적합한 오픈 소스 관계형 데이터베이스 관리 시스템입니다. 1. 다양한 운영 체제를 지원하며 웹 응용 프로그램 및 기타 필드에서 널리 사용됩니다. 2. 클라이언트-서버 아키텍처 및 다양한 스토리지 엔진을 통해 MySQL은 데이터를 효율적으로 처리합니다. 3. 기본 사용에는 데이터베이스 및 테이블 작성, 데이터 삽입, 쿼리 및 업데이트가 포함됩니다. 4. 고급 사용에는 복잡한 쿼리 및 저장 프로 시저가 포함됩니다. 5. 설명 진술을 통해 일반적인 오류를 디버깅 할 수 있습니다. 6. 성능 최적화에는 인덱스의 합리적인 사용 및 최적화 된 쿼리 문이 포함됩니다.

MySQL은 성능, 신뢰성, 사용 편의성 및 커뮤니티 지원을 위해 선택됩니다. 1.MYSQL은 효율적인 데이터 저장 및 검색 기능을 제공하여 여러 데이터 유형 및 고급 쿼리 작업을 지원합니다. 2. 고객-서버 아키텍처 및 다중 스토리지 엔진을 채택하여 트랜잭션 및 쿼리 최적화를 지원합니다. 3. 사용하기 쉽고 다양한 운영 체제 및 프로그래밍 언어를 지원합니다. 4. 강력한 지역 사회 지원을 받고 풍부한 자원과 솔루션을 제공합니다.

InnoDB의 잠금 장치에는 공유 잠금 장치, 독점 잠금, 의도 잠금 장치, 레코드 잠금, 갭 잠금 및 다음 키 잠금 장치가 포함됩니다. 1. 공유 잠금을 사용하면 다른 트랜잭션을 읽지 않고 트랜잭션이 데이터를 읽을 수 있습니다. 2. 독점 잠금은 다른 트랜잭션이 데이터를 읽고 수정하는 것을 방지합니다. 3. 의도 잠금은 잠금 효율을 최적화합니다. 4. 레코드 잠금 잠금 인덱스 레코드. 5. 갭 잠금 잠금 장치 색인 기록 간격. 6. 다음 키 잠금은 데이터 일관성을 보장하기 위해 레코드 잠금과 갭 잠금의 조합입니다.


핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

드림위버 CS6
시각적 웹 개발 도구

안전한 시험 브라우저
안전한 시험 브라우저는 온라인 시험을 안전하게 치르기 위한 보안 브라우저 환경입니다. 이 소프트웨어는 모든 컴퓨터를 안전한 워크스테이션으로 바꿔줍니다. 이는 모든 유틸리티에 대한 액세스를 제어하고 학생들이 승인되지 않은 리소스를 사용하는 것을 방지합니다.

에디트플러스 중국어 크랙 버전
작은 크기, 구문 강조, 코드 프롬프트 기능을 지원하지 않음

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

WebStorm Mac 버전
유용한 JavaScript 개발 도구
