>  기사  >  데이터 베이스  >  mysql 기반 binlog 롤백 도구 예제에 대한 자세한 설명

mysql 기반 binlog 롤백 도구 예제에 대한 자세한 설명

PHP中文网
PHP中文网원래의
2017-06-21 10:27:261684검색

업데이트 및 삭제 조건을 잘못 기재하거나 전혀 기재하지 않아 데이터 연산 오류가 발생하고, 잘못 연산된 행 레코드를 복원해야 하는 상황이 발생합니다. 이런 상황은 실제로 발생합니다. 백업 파일 + binlog를 사용하여 테스트 환경으로 복원한 후 데이터 복구를 수행할 수도 있지만 실제로는 일정량의 시간과 리소스가 필요합니다.

실제로 binlog 형식이 행인 경우 각 트랜잭션에 관련된 작업이 binlog 파일에 자세히 기록되고, 각 트랜잭션에 영향을 받는 행 레코드가 저장되며, binlog 파일을 사용하여 행을 역분석할 수 있나요? 데이터베이스에 변경 사항을 기록하고 있습니까? 상황은 어떻습니까?
업계에는 관련 스크립트와 도구가 많이 있습니다. 그러나 MySQL 버전 업데이트, binlog 기록 내용 변경 및 요구 사항 불일치로 인해 대부분의 스크립트가 개인의 현재 사용 요구 사항에 적합하지 않아 mysql 플래시백을 작성하기 시작했습니다. 스크립트.


재인쇄하는 경우 블로그 게시물의 출처를 표시해 주세요: www.cnblogs.com/xinysu/ , 저작권은 Blog Park Sujia Xiaoluobo에 있습니다. 당신이 지원하기를 바랍니다!


MySQL 5.6/5.7 버전에서만 테스트되었으며 Python 운영 환경에는 pymysql 모듈을 설치해야 합니다.

1 구현 내용

binlog 파일을 기반으로 특정 트랜잭션, 특정 기간 동안 특정 테이블, 특정 기간 동안 전체 데이터베이스에 대해 롤백 작업을 수행하여 플래시백 기능을 구현합니다. 툴 처리 중 트랜잭션에 의해 수정된 행 레코드는 binlog에 테이블에 저장됩니다. dml_sql 컬럼을 통해 각 트랜잭션 내의 모든 행 레코드 변경 사항을 볼 수 있으며, undo_sql을 통해 롤백된 SQL 내용을 볼 수 있습니다. 아래와 같이 테이블 내용을 기반으로 롤백 작업을 수행합니다.
그렇다면 이 스크립트의 장점은 무엇인가요?
  • Rollback은 2가지 명령으로 나누어집니다. 첫 번째 명령은 binglog를 분석하여 데이터베이스에 저장하고, 두 번째 명령은 롤백 작업을 수행합니다.

  • 롤백 시 실행 스크립트와 롤백 스크립트를 통합할 수 있습니다. 데이터베이스에 저장된 업데이트 내용과 롤백 내용을 볼 수 있습니다.

  • 저장된 분석 테이블에 따라 복원할 트랜잭션이나 지정된 테이블을 지정하는 것이 편리합니다.

  • 자세한 로그 출력에는 분석 진행 상황과 실행 진행 상황.

binlog 출력 스크린샷 분석(1G binlog 파일 분석)
롤백 데이터베이스 출력 스크린샷:

2 원칙

전제 조건: 인스턴스가 시작되었습니다. binlog이고 형식은 ROW입니다. . ㅋㅋ          Python을 사용하여 mysqlbinlog 이후 로그 파일의 텍스트 분석 및 처리를 수행합니다. 전체 처리 과정에서 처리해야 할 다음 6가지 어려운 사항이 있습니다.
트랜잭션의 시작과 끝 결정
  1. 동일한 트랜잭션 실행 순서를 역순으로 실행해야 함

  2. SQL 구문 분석 및 롤백

  3. 동일 트랜잭션 작업 및 다른 테이블 처리

  4. 개행 문자, 탭 문자 등 이스케이프 문자 처리 등

  5. timestamp 데이터 유형 매개변수 값 변환

  6. 음수 처리

  7. 단일 트랜잭션에 행 수정 SQL 작업 포함 max_allow

  8. 전체 데이터베이스 롤백 대신 특정 테이블에 대한 롤백 수행

2.1 Transaction

의 시작과 끝은 Xid가 나타나는 위치에 따라 판단됩니다. binlog 파일의 시작 부분부터 읽어서 Xid를 만날 때까지 추출합니다. , 이전에 추출된 SQL은 트랜잭션으로 요약됩니다. 그런 다음 다음 Xid를 만날 때까지 SQL 문을 계속 추출한 다음 이 트랜잭션의 SQL을 하나의 트랜잭션으로 요약합니다. 이 주기는 파일의 순차적 순회가 끝날 때까지 계속됩니다.

2.2

트랜잭션 내부 역순 처리

동일한 트랜잭션에서 여러 테이블과 여러 행 레코드가 변경되면 롤백 중에 SQL을 역순으로 롤백해야 하는데 어떻게 해야 합니까? 추출된 SQL을 역순으로 저장하려면? 아이디어는 다음과 같습니다:
  • 레코드 각 행의 수정된 SQL을 분리한다

  • 독립적인 SQL을 역순으로 저장한다

정방향 트랜잭션 SQL 문이 변수 dml_sql에 저장된다고 가정하면, 역순으로 SQL은 undo_sql 변수에 롤백으로 저장할 수 있습니다. 행 레코드 수정 SQL을 순서대로 추출하여 Record_sql 변수에 저장한 후 undo_sql =record_sql + undo_sql 값을 할당한 후 Record_sql 변수를 공백으로 설정하면 역트랜잭션 내 SQL 실행이 구현될 수 있다. .

2.3 롤백 SQL 구문 분석

먼저 binlog의 로그 내용을 확인하여 행 수정의 SQL 상황이 다음과 같은지 확인합니다. 추출 과정에서 다음 사항에 주의해야 합니다.
  • 행 레코드의 열 이름 일치, binlog 파일에 저장된 열 번호는 직접 사용할 수 없습니다. WHERE 부분과 SET 부분 사이에 키워드나 기호를 추가해야 합니다. SQL을 INSERT

  • UPDATE SQL로 바꿔야 합니다. INSERT SQL을 DELETE로 바꿔야 하는데, 컬럼 일련번호를 로 바꿀 때 주의가 필요합니다. 열 이름.

  • 각 행 레코드 앞에는 'Table_map' 표시가 포함된 레코드 행이 있으며, 이는 이 레코드 행에서 어떤 테이블이 수정되었는지 나타냅니다. 이 프롬프트에 따라 binlog의 열 일련 번호를 다음으로 바꿀 수 있습니다. 열 이름.
  • 2.5 이스케이프 문자 처리
  • binlog 파일은 이스케이프 문자 문자열 저장소를 사용하여 공백이 아닌 공백 문자를 처리합니다. 예를 들어 테이블의 삽입 열에 있는 레코드에는 개행 문자가 포함되어 있지만 실제로는 binlog 파일에 있습니다. , 개행 연산을 대체하는 데 x0a가 사용되므로 데이터 롤백 과정에서 이스케이프 문자를 처리해야 합니다.

여기서 주목해야 할 점은 039의 이스케이프 문자가 esc_code 함수에서 균일하게 처리되지 않고 별도로 처리된다는 점입니다.

전송 문자 테이블은 아래와 같습니다.

2.6 타임스탬프 데이터 형식 처리

데이터베이스에 실제 저장되는 타임스탬프 값은 INT 형식이므로 필수 기능 사용 from_unixtime으로 변환합니다.

타임스탬프 열이 하나만 있는 테스트 테이블 tbtest를 생성합니다. 값을 저장한 후 binlog의 내용을 확인합니다. 구체적인 스크린샷은 다음과 같습니다.

행 레코드를 처리할 때 타임스탬프를 처리해야 합니다. 값을 입력하고 from_unixtime 함수 Convert를 추가합니다.

2.7 음수 처리
처음 코드를 작성할 때는 고려하지 않았습니다. 광범위한 테스트를 통해 모든 정수 데이터 유형이 음수를 저장할 때 최대 범위 값을 저장한다는 사실이 발견되었습니다. binlog가 이를 처리하는 방법에 대한 메커니즘은 그다지 명확하지 않습니다. 테스트 내용은 다음과 같습니다.

따라서 다양한 데이터 유형이 INT이고 VALUE가 음수인 경우 undo_sql을 실행하기 전에 이 범위 값을 제거해야 합니다.

2.8 단일 트랜잭션 행 레코드의 전체 SQL이 max_allowed_package 처리를 초과합니다
binlog를 분석한 후 두 가지 유형의 SQL이 저장됩니다. 하나는 행 레코드의 수정 SQL, 즉 dml_sql입니다. 행 레코드의 SQL, 즉 undo_sql을 롤백합니다. 이 두 SQL을 저장하는 열은 최대 4G의 콘텐츠를 저장할 수 있는 longtext라는 것을 코드에서 볼 수 있습니다. 그러나 MySQL에서는 단일 세션의 패킷 크기가 제한되어 있습니다. 기본 크기는 4Mb이고 최대 크기는 1G입니다. 따라서 이 스크립트를 사용하기 전에 binlog 파일을 저장하는 데이터베이스 인스턴스를 수동으로 설정하십시오. 온라인 데이터베이스 인스턴스:

set global max_allowed_packet = 1073741824 #나중에 수정해야 함

작동하면 어떻게 되나요? 그러면 롤백은 세그먼트 단위로만 가능합니다. 먼저 이 대규모 트랜잭션으로 롤백한 다음 이 대규모 트랜잭션을 별도로 실행한 다음 계속해서 롤백할 수 있습니다. 이 부분은 pymysql이나 소스 파일을 사용하여 실행할 수 없으므로 이 작업은 가능합니다. 수동으로만 수행할 수 있습니다. 유능한 사람들에게 이 논리 코드를 수정하도록 요청하십시오! ! !

2.9 Targeted Rollback

오작동에 대한 명확한 시점이 없고 간격이 하나만 있고 이 간격에 다른 테이블 작업이 있다고 가정하면 이때 binlog 파일을 분석할 때 필요한 것은 추가 -- 데이터베이스 옵션의 경우 먼저 동일한 데이터베이스에 있는 binlog 파일을 선택합니다.
여기서 처리하는 것은 이 간격의 dml_sql과 undo_sql을 데이터베이스 테이블에 저장한 후 롤백할 필요가 없는 트랜잭션과 롤백해야 할 나머지 트랜잭션을 삭제하는 것입니다. 롤백 작업을 다시 수행하십시오.

3 사용 지침

3.1 매개 변수 설명

이 스크립트에는 약간 더 많은 매개 변수가 있으며 --help를 사용하여 특정 지침을 볼 수 있습니다.

저는 매개변수를 분류할 때 다양한 색상을 사용하는 것을 좋아하므로(블링블링은 컬러풀하고 재미있고 에너지가 넘치게 보입니다) 이러한 매개변수를 색상별로 설명하겠습니다.
  • 노란색 영역: 이 6개의 매개변수는 binlog 파일을 분석하고 저장하기 위한 관련 값을 제공하며, 분석 결과를 저장하는 데이터베이스의 링크 방법, binlog 파일의 위치 및 파일 이름을 나타냅니다. 결과가 저장되는 테이블

  • 파란색 영역: 이 4개 매개변수는 온라인 데이터베이스 테이블 구조와 동일한 DB 인스턴스 연결 방법을 제공하며, 온라인 데이터베이스와 동일한 테이블 구조만 필요합니다. 반드시 마스터-슬레이브 데이터베이스여야 합니다.

  • 녹색 영역: 가장 중요 -a, 0 옵션은 binlog 파일만 분석함을 의미하고, 1은 롤백 작업만 수행함을 의미하며, 1을 실행하기 전에 0을 실행해야 합니다. ;

  • 보라색 영역: 예.

3.2 애플리케이션 시나리오 설명

  • 특정 기간 동안 전체 데이터베이스 롤백

    • 특정 기간의 모든 SQL 작업을 특정 시점으로 롤백해야 함

    • 이런 경우에는 백업파일 + binlog

    • 를 사용하면 대부분 해결됩니다. 하지만 이 스크립트도 요구사항을 충족할 수는 있지만 온라인에서 직접 실행하지 마시기 바랍니다. 우선 -a=0으로 분석 결과를 확인하시기 바랍니다. 일관성이 있는 경우 슬레이브 데이터베이스를 중지한 후 슬레이브 데이터베이스에서 실행하고 마지막으로 지정된 시점으로 복원되었는지, 데이터가 정상인지 확인하는 비즈니스 액세스를 개발합니다.

  • 특정 기간 동안 특정 테이블에서 일부 작업이 롤백됩니다.

    • 예를 들어 개발팀에서 일괄 업데이트 스크립트를 제출했는데 모든 테스트 수준에서 검증에 문제가 없었습니다. 온라인 실행을 위해 실행했지만, 실행 후 비즈니스가 테스트를 놓쳐 일부 필드가 업데이트되고 다른 비즈니스에 영향을 미치는 것으로 나타났습니다. 이제 일괄 업데이트된 테이블을 원래 행 레코드로 긴급히 롤백해야 합니다

    • 기술적인 측면에서만 처리할 수는 없고 종합적으로 고려해야 합니다

      • 이 경우 탭 A 테이블의 수정 작업을 어떻게 검토해야 할까요?

      • 개인적으로는 이 방법이 더 타당하다고 생각합니다. tabA 테이블의 데이터를 테스트 환경에 덤프한 후, binlog 파일 undo sql을 11시부터 12시까지 분석한 후 테이블을 롤백합니다. 11시 테스트 환경 이때 개발 및 사업부는 테스트 환경의 11시 데이터와 온라인상의 기존 데이터를 비교하여 어떤 행과 열을 온라인으로 롤백해야 하는지, 어떤 것이 롤백되어야 하는지 확인하게 됩니다. 필요하지 않으면 개발이 이를 제출하고 SQL 스크립트가 온라인으로 실행됩니다. 실제로 여기서 DBA는 새로운 환경에서 테이블 탭 A를 특정 시점으로 롤백하는 역할만 제공할 뿐, SQL 처리에 대한 직접온라인 롤백은 제공하지 않습니다.

  • 특정 SQL/일부 SQL 롤백

    • 이 상황은 비교적 일반적입니다. 업데이트 또는 삭제에는 where 조건 또는 where 조건 실행 오류가 없습니다.

    • 이 경우 다음을 찾으세요. 해당 트랜잭션의 롤백을 실행해 보세요. 네, 너무 소심하고 두렵습니다.

      3.3 테스트 케이스

3.3.1 데이터베이스 롤백 특정 시간에

9시 10분에서 9시 15분 사이에 데이터베이스의 모든 작업을 롤백해야 한다고 가정해 보겠습니다.

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 线上测试环境修改set global max_allowed_packet = 1073741824

  • 回滚数据,action=1

  • 线上测试环境修改set global max_allowed_packet = 4194304

 1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2 某段时间某些表格回滚某些操作

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

3.3.3 回滚某个/些SQL

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

4 python脚本

     脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
 31         is test.tbevent 32 \033[1;34;40m 33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37 \033[1;32;40m 38 -a    : action, 
 39         0 just analyse the binlog file ,and store sql in table; 
 40         1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m  
 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 53                        -a=1 54 \033[0m                        
 55 ''' 56  57 class flashback: 58     def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 从输入参数获取连接数据库的相关参数值 74  75         # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81         # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82         # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('\033[33mMySQL connection is ok\033[0m') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103     #_get_db用于获取执行命令的输入参数104     def _get_db(self):105         logging.info('begin to assign values to parameters')106         if len(sys.argv) == 1:107             print(usage)108             sys.exit(1)109         elif sys.argv[1] == '--help':110             print(usage)111             sys.exit()112         elif len(sys.argv) > 2:113             for i in sys.argv[1:]:114                 _argv = i.split('=')115                 if _argv[0] == '-h':116                     self.host = _argv[1]117                 elif _argv[0] == '-u':118                     self.user = _argv[1]119                 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121                 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123                 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125                 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128                 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130                 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132                 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134                 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137                 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140                 else:141                     print(usage)142 143     #创建表格,用于存储分析后的BINLOG内容144     def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169         '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173     #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174     def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184         for l in tbcol:185             #self.tbfield.append(l['Field'])186             if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190             else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194     # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195     def rowrecord(self,bl_line):196         try:197             if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208         except Exception:209             return 'funtion rowrecord error'210 211     def dml_tran(self,bl_line):212         try:213 214 215             if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228                 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230                 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232                 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238                 #同个事务内部的行记录修改SQL,反序存储239                 for l in self.undo_sql.splitlines():240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244                     else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250                 #处理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254                 #单独处理 转移字符: \'255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'257 258                 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + '\n')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + '\n')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276         except Exception:277             print( 'funtion dml_tran error')278 279 280     def analyse_binlog(self):281         try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287                 logging.info('\033[36manalyzing...\033[0m')288                 for bline in binlog_file:289                     if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292                     elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295                     elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298                     elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302                         #处理timestamp类型303                         if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307                         #处理负数存储方式308                         if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312                         if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314                         elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320                     elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323                     else:324                         bline = ''325 326                     if bline.rstrip('\n') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328         except Exception:329             return 'function do error'330 331     def esc_code(self,sql):332         esc={333              '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334             #'\\x27':'\'',335             '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336              }337 338         for k,v in esc.items():339             sql=sql.replace(k,v)340         return sql341 342     def binlogdesc(self):343 344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345         print(countsql)346         self.cur.execute(countsql)347         count_row=self.cur.fetchall()348 349         update_count=0350         insert_couont=0351         delete_count=0352         for row in count_row:353             if row['sqltype']==1:354                 insert_couont=row['numbers']355             elif row['sqltype']==2:356                 update_count=row['numbers']357             elif row['sqltype']==3:358                 delete_count=row['numbers']359         logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361     def undosql(self,number):362         #这里会有几个问题:363         #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364         #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365         #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367         tran_num=1368         id=0369 370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372         self.cur.execute(tran_num_sql)373         tran_rows=self.cur.fetchall()374 375         for num in tran_rows:376             tran_num=num['table_rows']377 378         logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380         while id<=tran_num:381             logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))382             undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;&#39;.format(self.tbevent,tran_num-(id+number),tran_num-id)383             self.cur.execute(undo_sql)384 385             undo_rows=self.cur.fetchall()386             f_sql=&#39;&#39;387 388             for u_row in undo_rows:389                 try:390                     self.on_cur.execute(u_row[&#39;undo_sql&#39;])391                     self.on_mysqlconn.commit()392                 except Exception:393                     print(&#39;auto_id:&#39;,u_row[&#39;auto_id&#39;])394             id+=number395 396 397     def undo_file(self,number):398         # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400         tran_num=1401         id=0402 403         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405         self.cur.execute(tran_num_sql)406         tran_rows=self.cur.fetchall()407 408         for num in tran_rows:409             tran_num=num[&#39;table_rows&#39;]410 411         logging.info(&#39;copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql&#39;)412         logging.info(&#39;\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m&#39;.format(tran_num,int(tran_num/number)+1,number))413 414         with open(&#39;/tmp/flashback_undosql/undo_file_flashback.sql&#39;, &#39;w&#39;) as w_f:415             while id<=tran_num:416                 logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))417                 undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418                 self.cur.execute(undo_sql)419 420                 undo_rows=self.cur.fetchall()421                 for u_row in undo_rows:422                     try:423                         w_f.write('begin;' + '\n')424                         w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425                         w_f.write(u_row['undo_sql'] + '\n')426                         w_f.write('commit;' + '\n')427                     except Exception:428                         print('auto_id',u_row['auto_id'])429                     #time.sleep(2)430                 id+=number431 432     def do(self):433         if self.action=='0':434             self.analyse_binlog()435             logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436             #self.binlogdesc()437         elif self.action=='1':438             self.undosql(10000)439 440     def closeconn(self):441         self.cur.close()442         self.on_cur.close()443         logging.info('release all db connections')444         logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447     p = flashback()448     p.do()449     p.closeconn()450 451 if __name__ == "__main__":452     main()

 

위 내용은 mysql 기반 binlog 롤백 도구 예제에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.