一简介

python-mysql-replication 是由python实现的 MySQL复制协议工具,我们可以用它来解析binlog 获取日志的insert,update,delete等事件 ,并基于此做其他业务需求。比如数据更改时失效缓存,监听dml事件通知下游业务方做对应处理。

其项目信息

网址     http://www.github.com/noplay/python-mysql-replication
官方文档 https://python-mysql-replication.readthedocs.io

二 实践

2.1 安装配置

获取源代码

git clone http://www.github.com/noplay/python-mysql-replication

使用pip 安装

pip install mysql-replication

权限:

可以直接使用复制账号也可以使用其他账号,但是该账号必须SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'replicator'@'%' IDENTIFIED BY 'xxxxx';

数据库日志相关的参数设置如下:

log_bin=on ,binlog_format=row,binlog_row_image=FULL

2.2 核心类介绍

python-mysql-replication 的入口是类BinLogStreamReader(),我们在使用该工具时需要实例化一个BinLogStreamReader()对象 stream,BinLogStreamReader 通过 ReportSlave 向主库注册作为一个slave角色,用于接受MySQL的binlog广播。有兴趣的可以研究其代码具体实现。

该实例提供解析 binlog 各种事件的集合,每个事件也是一个对象。

初始化BinLogStreamReader()实例需要使用的参数如下:

connection_settings: 数据库的连接配置信息
resume_stream:从位置或binlog的最新事件或旧的可用事件开始
log_file:设置复制开始日志文件
log_pos:设置复制开始日志pos(resume_stream应该为true)
auto_position:使用master_auto_position gtid设置位置
blocking:如果设置为True,会持续监听binlog事件,如果设置为False 则会一次性解析所有可获取的binlog。
only_events:只解析指定的事件 比如only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],参数类型是一个数组。 #### 以上是比较常用的参数 ignored_events:设置哪些事件可以被忽略。也是一个数组。 only_tables,ignored_tables,only_schemas,ignored_schemas ##根据字面意思理解 freeze_schema:如果为true,则不支持ALTER TABLE速度更快。
skip_to_timestamp:在达到指定的时间戳之前忽略所有事件,否则会解析所有可访问的binlog
report_slave:用于向主库注册SHOW SLAVE HOSTS中slave,该值可以是字典比如{'hostname':'127.0.0.1', 'username':'root', 'password':'rep', 'port':3306} slave_uuid:在SHOW SLAVE HOSTS中slave_uuid。
fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常。

2.3 如何使用呢?

最简单的用法 脚本名 pyreplica.py

from pymysqlreplication import BinLogStreamReader
MYSQL_SETTINGS = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": ""
} def main():
# server_id is your slave identifier, it should be unique.
# set blocking to True if you want to block and wait for the next event at
# the end of the stream
stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
server_id=3,
blocking=True) for binlogevent in stream:
binlogevent.dump()
stream.close() ###如果blocking=True ,改行记录可以不用。
if __name__ == "__main__":
main()

开启两个窗口,一个窗口执行,另外一个窗口操作mysql 写入或者修改数据

python pyreplica.py

输出如下:

=== GtidEvent ===
Date: 2019-06-25T17:41:34
Log position: 339
Event size: 42
Read bytes: 25
Commit: False
GTID_NEXT: cc726403-93d1-11e9-90b7-ecf4bbde7778:13
()
=== QueryEvent ===
Date: 2019-06-25T17:41:34
Log position: 411
Event size: 49
Read bytes: 49
Schema: test
Execution time: 0
Query: BEGIN
()
=== TableMapEvent ===
Date: 2019-06-25T17:41:34
Log position: 456
Event size: 22
Read bytes: 21
Table id: 126
Schema: test
Table: x
Columns: 2
()
=== WriteRowsEvent ===
Date: 2019-06-25T17:41:34
Log position: 500
Event size: 21
Read bytes: 12
Table: test.x
Affected columns: 2
Changed rows: 1
Values:
--
('*', u'a', ':', 1)
('*', u'id', ':', 18)
()
=== XidEvent ===
Date: 2019-06-25T17:41:34
Log position: 531
Event size: 8
Read bytes: 8
Transaction ID: 1293393
()

2.3 拓展

基于该工具提供的日志事件解析我们可以做很多事情,比较有名的工具 binlog2sql 利用该工具解析binlog 做数据回滚 。

mysql-replication.py

#!/usr/bin/env python
# -*- coding: utf-8 -*- from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
import sys
import json mysql_settings = {'host': '127.0.0.1','port': 3306,
'user': 'replica', 'passwd': 'xxxx'}
def main(): stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=1,
blocking=True,
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent]) for binlogevent in stream:
for row in binlogevent.rows:
event = {"schema": binlogevent.schema, "table": binlogevent.table, "log_pos": binlogevent.packet.log_pos}
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["values"] = dict(row["values"].items())
event = dict(event.items())
elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["before_values"] = dict(row["before_values"].items())
event["after_values"] = dict(row["after_values"].items())
event = dict(event.items())
elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["values"] = dict(row["values"].items())
event = dict(event.items())
print json.dumps(event)
sys.stdout.flush() if __name__ == "__main__":
main()

执行脚本结果 如下图

除了解析binlog,我们还可以用python-mysql-replication 做数据全量加增量迁移。比如仅仅迁移某些大表而不是整个库的时候,可以用到。有兴趣的朋友可以想想大概的算法。

最新文章

  1. Laravel5.0学习--01 入门
  2. windows下配置Faster-RCNN
  3. 修改dedecms默认文章来源 "未知"改为关键词
  4. 【C语言入门教程】2.4 浮点型数据
  5. cocos2dx从入门到精通课程
  6. CVE漏洞爬虫java代码依赖-TestNG
  7. jQuery技术内幕预览版.pdf2
  8. GPU 的硬件基本概念,Cuda和Opencl名词关系对应
  9. H.数7(模拟)
  10. 载入OpenSSL的动态库——学会使用tryToLoadOpenSslWin32Library和QPair
  11. 高性能JSON库---FastJson(阿里巴巴)
  12. 候选键(unique)
  13. 互联网创业应该如何找到创意 - RethinkDB创始人Slava Akhmechet的几点建议
  14. Web 版 powerdesigner (Canvas) 技术分享
  15. Ansible - 简介和应用自动化基础实践
  16. 强大的oracle分析函数
  17. form表单利用iframe高仿ajax
  18. [工作日志]2018-11-15 主要: 改bug
  19. windows注册表
  20. arm linux下编译库System.Net.Primitives.dll和System.Xml.XmlSerializer.dll

热门文章

  1. 洛谷P2265 路边的水沟
  2. Excel催化剂开源第43波-Excel选择对象Selection在.Net开发中的使用
  3. JVM调优之探索CMS和G1的物理内存归还机制
  4. 从后端到前端之Vue(三)小结以及一颗真实的大树
  5. Java EE.Servlet.会话管理
  6. codemirror使用
  7. 并查集_HDU 1232_畅通工程
  8. 单页面(如react,vue)网站的服务器渲染 SSR 之 SEO 大杀器 Rendertron
  9. html5教程 《实用技巧》—让你的网站变成响应式的3个简单步骤
  10. java中dao层和service层的区别是什么