一、Canal介绍

  Canal的原理就是它自己伪装成slave, 向mysql发送dump协议,MySQL master接收到dump请求之后推送binlog文件给slave, 也就是canal。  

二、Canal安装

  1. 下载Canal

   wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

  2. 解压到/opt/softwares/canal目录, 解压完之后如下图所示:

  3. 配置instance

  4. 修改canal.properties

三、Mysql 安装

  1、mysql 安装

    yum install mysql

    yum install mysql-server

  2、启动mysql

    /etc/init.d/mysqld start 或者sevice mysqld start

  3、设置root用户密码

    mysqladmin -u root password '123456'

  4、登录msyql

    mysql -uroot -p123456

  5、检查并开启binlog复制功能及binlog模式是否为ROW模式

    参考: binlog详解

四、Canal抽取binlog

  Canal只是伪装成slave抽取binlog,Canal拿到binlog之后还需要交给业务方去做响应的处理,那么怎么去交给业务方呢?一般都是Canal获取到binlog之后写到kafka里,业务方订阅kafka topic消费binlog,完成业务逻辑处理。

  但是Canal不能直接写Kafka, 所以还需要有个client连接Canal,Canal获取binlog之后交给Client, Client在往Kafka里写binlog消息,Client代码如下:

  

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress;
import java.util.List; public class CanalClientExample { public static void main(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.41.254", 11111), "example", "canal", "canal");
try {
int batchSize = 1000; connector.connect();
connector.subscribe("zhengxinv6\\..*");
connector.rollback(); while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize); long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} continue;
} System.out.println("batchId = [" + batchId + "]");
printEntry(message.getEntries()); connector.ack(batchId); //提交确认
//connector.rollback(batchId);
} } finally {
connector.disconnect();
}
} private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
} CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException(
"ERROR ## parser of eromanga-event has an error,data:"
+ entry.toString(), e);
} CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] ,name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset()+"",
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType)); for (CanalEntry.RowData rowData: rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
} private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue()
+ " update=" + column.getUpdated());
}
}
}

五、Canal使用过程出现的问题及解决方法

  参考:canal报错解决方法

  

参考:https://www.jianshu.com/p/6299048fad66

最新文章

  1. Ubuntu Server无线连接配置
  2. Asp.Net Core(.net内核)
  3. C# .Net 使用zxing.dll生成二维码,条形码
  4. macOS Sierra安装Apache2.4+PHP7.0+MySQL5.7.16
  5. ## Python中的Package和Jupyter中import包问题
  6. 读书笔记-你不知道的JS中-函数生成器
  7. Linux的基本命令(CentOS)
  8. 可达用户投资额的计算(Java)
  9. Python全站之路----常用模块----configparser模块
  10. 一个PHP session的误区,自己留着长记性看看
  11. Python入门 函数式编程
  12. pixi.js(入门)
  13. Python字符串解析方法汇总
  14. java.awt.headless 模式
  15. Android 获取手机内部信息,内核版本、基带版本、内部版本等
  16. IOS测试-Fastmonkey
  17. mysql修改root密码和对连接授权
  18. # 第二周c实践所遇见的问题
  19. Luogu 4284 [SHOI2014]概率充电器
  20. CCNA 课程 一

热门文章

  1. js模拟冒泡排序动态图(1轮)
  2. Qt 进程和线程之一:运行一个进程和进程间通信
  3. Unity Shader入门精要学习笔记 - 第14章非真实感渲染
  4. yii2 操作数据库
  5. 爬虫的两种解析方式 xpath和bs4
  6. 详解HTML中的表格标签
  7. Struct和Class的区别 转
  8. uvm_globals——告诉这个世界我爱你
  9. jmeter并发定时器
  10. 如何修改tomcat的启动方式为 run