Canal的简单使用


canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据,用于实际工作中,比较实用,特此记录一下

Canal简介

canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务

基于日志增量订阅&消费支持的业务:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息

keyword:数据库同步,增量订阅&消费。

首先,下载canal安装包:

https://github.com/alibaba/canal/releases

安装步骤:

在opt下(或者其他目录)创建一个canal文件夹

因为canal解压后并没有整体的文件夹,是把里面的内容一下解压出来的

然后进入canal文件夹,将安装包canal.deployer-1.0.24.tar.gz传输进来

解压:

tar -zxvf canal.deployer-1.0..tar.gz

可以删除安装包canal.deployer-1.0.24.tar.gz了

修改canal配置文件

vi conf/example/instance.properties

修改红框内的内容 
canal.instance.dbUsername = ****          #数据库用户名 
canal.instance.dbPassword = ****           #数据库密码 
canal.instance.defaultDatabaseName = ****   #指定需要同步的数据库 
canal.instance.connectionCharset = UTF-8      #指定编码方式

然后保存:

esc

:wq

配置mysql数据库 

vi /etc/my.cnf 

添加以下三行内容,如果原来存在,则不需要添加,只需对当前配置项进行修改即可

log-bin=mysql-bin             #添加这一行就ok 
binlog-format=ROW           #选择row模式 
server-id=1       #配置mysql replaction需要定义,不能和canal的slaveId重复

然后保存:

esc

:wq

配置canal用户 

用root用户登录mysql:

mysql -uroot -p

回车,输入密码

创建“canal”用户:

create user canal identified by 'canal';

有可能会出现如下情况:

直接刷新:

FLUSH PRIVILEGES;

然后再次创建“canal”用户

为“canal”用户赋予相应权限:

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

然后刷新权限:

FLUSH PRIVILEGES;

退出mysql

exit;

重启mysql服务

sudo service mysqld restart

cd到bin目录,启动canal-server

cd bin/

./startup.sh

查看启动状态:

tail -f -n  ../logs/canal/canal.log

用客户端代码进行检测

package com.test;

import java.net.InetSocketAddress;
import java.util.List; import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message; public class TestCanal { public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("***canal所在的主机ip***", 11111),
"example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
// System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
} connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
} System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
} private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
} RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
} EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
} private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
} }

直接启动mian函数

然后任意选择一张表,插入数据库一条数据

控制台输出内容

更多信息可以参考:

谈谈对Canal( 增量数据订阅与消费 )的理解

http://www.importnew.com/25189.html

canal集群部署与java接入

https://blog.csdn.net/my201110lc/article/details/78836270

最新文章

  1. logging 日志模块学习
  2. 第五百八十五天 how can I 坚持
  3. 常用天气预报API接口整理(转)
  4. UIStepper步进器 ——事件驱动型控件,(一个+和-按钮的)
  5. JDK中的Timer和TimerTask详解(zhuan)
  6. kettle的job
  7. codeforces 392B Tower of Hanoi
  8. WPF的MVVM模式
  9. javascript face ++
  10. Android数据库升级、降级、创建(onCreate() onUpgrade() onDowngrade())[4]
  11. bzoj 3238 Ahoi2013 差异
  12. Oracle查询指定某一天数据,日期匹配
  13. cookie 和session 详解
  14. Spring Data JPA方法定义规范
  15. Linux内核哈希表分析与应用
  16. 20145127 《Java程序设计》第四次实验报告
  17. go语言之进阶篇面向对象编程
  18. Specified VM install not found: type Standard VM, name jdk1.7
  19. Android基础总结(三)SQLite,ListView,对话框
  20. MYSQL--表分区、查看分区(转)

热门文章

  1. Solaris 11 配置IP地址
  2. Python合并多个Excel数据
  3. 使用ML.NET实现白葡萄酒品质预测
  4. SignalR学习笔记(二)高并发应用
  5. 华为oj之字符串反转
  6. C++、Java语法差异对照表
  7. ConcurrentModificationException 异常处理
  8. Android--逐帧动画FrameAnimation
  9. Spring Boot分布式系统实践【1】-架构设计
  10. 【深度学习与TensorFlow 2.0】卷积神经网络(CNN)