canal使用非flatmessage方式获取mysql bin log日志发至kafka比直接发送json效率要高很多,数据发到kafka后需要实时解析为json,这里可以使用strom或者flink,公司本来就是使用strom解析,但是在吞吐量上有瓶颈,优化空间不大。所以试一试通过flink来做。

非flatmessage需要使用特定的反序列化方式来处理为Message对象,所以这里需要自定义一个类

 /**
* 反序列化canal binlog
*
* @author @ 2019-02-20
* @version 1.0.0
*/
@PublicEvolving
public class MessageDeserializationSchema implements KeyedDeserializationSchema<Message> { private static final long serialVersionUID = -678988040385271953L;
private MessageDeserializer mesDesc; @Override
public Message deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
try {
if (mesDesc == null) {
mesDesc = new MessageDeserializer();
}
Message result = mesDesc.deserialize(topic, message);
//result.setMetaData(topic, partition, offset);
return result;
} catch (Exception e) {
System.out.println(e);
}
return null;
} @Override
public boolean isEndOfStream(Message nextElement) {
return false;
} @Override
public TypeInformation<Message> getProducedType() {
return getForClass(Message.class);
}
}

然后就可以获取到DataStream[Message],但是在做算子操作的时候就报错了,意思是不支持kryo序列化

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
props_ (com.alibaba.otter.canal.protocol.CanalEntry$Header)
header_ (com.alibaba.otter.canal.protocol.CanalEntry$Entry)
entries (com.alibaba.otter.canal.protocol.Message)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 29 more

参考官方文档,需要注册类的序列化方式:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html

  //message 不支持kryo序列化 不然在map flatmap的时候报错

  env.getConfig.addDefaultKryoSerializer(classOf[Message], classOf[StringSerializer])

如果在算子之间会有其他对象传输的话,也同样需要注册。最后通过测试,flink解析的量大概在单个solt 1W+/s 左右。

最新文章

  1. mybatis 使用经验小结
  2. ThinkPHP 自动验证相关注意
  3. cPage分页,asp.net自定义分页,url传值分页,支持datalist、gridview、Repeater等
  4. java中wait/notify机制
  5. mysql主从复制 转
  6. 给groupBox添加滚动条
  7. Socket 通讯
  8. linux 查看剩余内存数
  9. 【iOS】Quartz2D截屏
  10. [Algorithms(Princeton)] Week1 - Percolation
  11. 对比C++中的指针和引用
  12. 通过dbcc page来查看表中的数据
  13. 解决linux下oracle进入sqlplus环境中后退键显示^H、上下键无效与ctrl+l无法清屏等问题【weber出品必属精品】
  14. iScroll的简单使用
  15. 我的世界 ParaCraft 结合开源地图 OpenStreetMap 生成3D校园的方法简介
  16. 使用echarts-for-react 绘制折线图 报错:`series.type should be specified `
  17. JAVA递归实现全排列
  18. Unity3D手机斗地主游戏开发实战(02)_叫地主功能实现
  19. fiddler 抓手机包 and post get
  20. macbook hive安装

热门文章

  1. 一个Ajax读数据并使用IScroll显示辅助类
  2. Centos7添加新源
  3. 简单文本悬浮div提示效果
  4. 查表法解决calendar中月份及星期初始值为0的情况。
  5. imageview设置图片时超长超大图片超出限制(OpenGLRenderer: Bitmap too large to be uploaded into a texture (996x9116, max=4096x4096))
  6. Dubbo+Nacos做注册中心和配置中心
  7. 论文笔记:Fast(er) RCNN
  8. c++ Qt向PHP接口POST文件流
  9. 中国交建 WAF 基础平台 http://waf.ccccltd.cn/
  10. 强行杀windows服务