一、前言

Binlog是MySQL数据库的二进制日志,用于记录用户对数据库操作的SQL语句(除了数据查询语句)信息。而Binlog格式也有三种,分别为STATEMENT、ROW、MIXED。STATMENT模式基于SQL语句的复制,每一条会修改数据的SQL语句会记录。ROW模式除了记录SQL语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,会占用较多的空间。MIXED比较灵活的记录,当遇到表结构变更的时候,就会记录为STATMENT模式,当遇到了数据更新或者删除情况下就会变为ROW模式。Binlog三个用途分别为数据恢复、复制、审计。

Canal是阿里MySQL数据库Binlog的增量订阅&消费组件 ,基于数据库Binlog可以监控数据库数据的变化,进而用于数据同步等业务。分为Canal Server与Canal Client,前者读取Binlog解析后存储,后者连接前者消费。

二、安装搭建

1、下载安装包。并上传至服务器中。下载地址为:https://github.com/alibaba/canal/releases

2、将home文件夹中的压缩包解压至安装路径(如下图所示)。

 tar -xzf /home/canal.deployer-1.1.3.tar.gz -C /usr/java/canal

3、进入canal文件夹,修改配置文件(如下图所示)。

 vi conf/example/instance.properties

 canal.instance.dbUsername=root #数据库账号
canal.instance.dbPassword=1234 #数据库密码
canal.instance.defaultDatabaseName = corporate_genealogy #数据库
canal.instance.connectionCharset = UTF-8 #数据库编码

4、配置MySQL数据库,开启Binlog,并选择模式为ROW(如下图所示)。

 vi /etc/my.cnf

 #canal
log-bin=mysql-bin
binlog-format=ROW
server_id=1

5、数据库创建canal用户,赋予权限,并刷新(如下图所示)。

ps:这里遇到一个异常信息,是因为数据库密码过于简单,不符合密码策略,需要修改一下策略。。。

 mysql -uroot -p1234
 SHOW VARIABLES LIKE 'validate_password%';
 set global validate_password_policy=LOW;
 set global validate_password_length=4;
 create user canal identified by 'canal';
 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
 FLUSH PRIVILEGES;

6、退出并重启MySQL。

 exit;
 sudo service mysqld restart;

7、进入canal的bin文件夹,启动canal-server。

 ./startup.sh

8、查看logs文件中日志是否启动成功(如下图所示)。

三、客户端代码检测

ps:需要注意的是服务器防火墙需打开对应端口号,这里是11111。

1、添加Maven依赖

 <!-- Canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>

2、测试类代码

 import java.net.InetSocketAddress;
import java.util.List; import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message; public class TestCanal { public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服务器IP", 11111),
"example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
} private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
} RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
} EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
} private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
} }

3、Navicat 连接对应数据库进行一些添加删除更新操作,控制台输出如下图所示。

四、总结展望

考虑到Canal的堆积能力并不强。堆积数据到10W+时,速度会变慢,并会出现假死现象。因此介入消息中间件MQ非常有必要,解决堆积能力问题,可以延后消费,能够方便的得到积压数据,进行监控报警。

本文部分学习参考了:https://www.cnblogs.com/java-spring/p/8930740.html

至此是关于介绍在Linux系统中阿里Canal中间件的初步搭建和使用,后续会介绍配合消息中间件等方式处理数据同步及其它业务逻辑。

如有疏漏错误之处,还请不吝赐教!

最新文章

  1. ssh框架的工作原理
  2. sql server 触发器应用 insert
  3. 用 MyEclipse 开发 Spring 入门操作
  4. [转]World Wind Java开发之五——读取本地shp文件
  5. Spring web Flow2学习笔记
  6. linux samba.tar.gz安装和配置
  7. iOS 中的 block 是如何持有对象的
  8. PYTHON的CGIServer的进化
  9. Mysql时间戳开始时间1970-01-01 00:00:00和PHP date慢8小时
  10. Hql中占位符(转)
  11. Mybatis简单的入门之增删改查
  12. jmeter配置、安装
  13. jquery $(document).ready() 与window.onload的区别(转)
  14. 拾遗与填坑《深度探索C++对象模型》3.3节
  15. Mac Segmentation fault: 11
  16. Windows10下使用python+selenium实现谷歌浏览器的自动控制
  17. SpringBoot统一处理异常
  18. 一键解包/打包boot.img/recovery.img工具(高通/MTK双版 支持android 5.1以上)
  19. solr特点三: defType(查询权重排序)
  20. Many-to-many relationships in EF Core 2.0 – Part 3: Hiding as ICollection

热门文章

  1. day28 作业
  2. 接口测试基础——session认证和token认证
  3. (四)学习了解OrchardCore笔记——将模块的名字添加到程序集的ModuleName
  4. 随笔java面试基础
  5. Python 简明教程 --- 23,Python 异常处理
  6. C++语法小记---抽象类和接口
  7. MySQL主从分离实现
  8. Azure Load Balancer(一) 为我们的Web项目提供负载均衡
  9. 第四课 OOP封装继承多态解析,接口抽象类选择 2019-04-21
  10. animation动画汇总(一阶段项目)