使用flink FlinkKafkaProducer 往kafka写入数据的时候要求使用EXACTLY_ONCE语义

本以为本以为按照官网写一个就完事,但是却报错了

代码

package com.meda.test

import org.apache.flink.streaming.connectors.kafka.{ FlinkKafkaProducer, KafkaSerializationSchema}

//创建一个DataStream
val dStream: DataStream[MapDt] = ... //kafka配置
val kafkaPro:Properties = ... //创建FlinkKafkaProducer 指定EXACTLY_ONCE
val kafkaSink: FlinkKafkaProducer[ResultDt] = new FlinkKafkaProducer[ResultDt]("top[ic", new ResultDtSerialization("flink-topic-lillcol"), kafkaPro, FlinkKafkaProducer.Semantic.EXACTLY_ONCE) case class ResultDt(id: String, date_h: String, star: String, end: String, watermark: String, pv: Long, uv: Long) class ResultDtSerialization(topic: String) extends KafkaSerializationSchema[ResultDt] {
override def serialize(t: ResultDt, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](topic, t.toString.getBytes())
}
}

遇到问题

FlinkKafkaProducer.Semantic指定为FlinkKafkaProducer.Semantic.AT_LEAST_ONCE时,执行没有问题。

FlinkKafkaProducer.Semantic指定为FlinkKafkaProducer.Semantic.EXACTLY_ONCE时,执行报下面的错误:

org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:984)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at java.lang.Thread.run(Thread.java:748)
[INFO ] 2019-12-24 15:25:35,212 method:org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1324)

错误大意是:

事务超时大于broker允许的最大值(transaction.max.timeout.ms)

一开始想都没想去修改transaction.max.timeout.ms的值,但是没有解决问题。


解决办法

官网关于Kafka Producers and Fault Tolerance有一段说明

Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes.
This property will not allow to set transaction timeouts for the producers larger than it’s value.
FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.

Kafka brokers 默认的最大事务超时(transaction.max.timeout.ms)为15 minutes

生产者设置事务超时不允许大于这个值,这个属性不允许为大于其值的

但是在默认的情况下,FlinkKafkaProducer011设置事务超时属性(transaction.timeout.ms)为1 hour, 超过默认transaction.max.timeout.ms15 minutes。

因此在使用EXACTLY_ONCE语义的时候需要增大transaction.max.timeout.ms的值。

按照个和说法我只要transaction.max.timeout.ms增加到大于1 hour(即3 600 000ms)‬以上就可以了,但是经过测试还是不行。

最后通过修改生产者的事务超时属性transaction.timeout.ms解决问题

transaction.timeout.ms从1hour降到5 minutes 成功解决问题。

//增加配置属性操作如下:
kafkaPro.setProperty("transaction.timeout.ms",1000*60*5+"")

本文原创文章,转载请注明出处!!!

最新文章

  1. Qt之Qwt学习之安装
  2. tinyfox for linux 独立版 fox.sh
  3. JMeter教程01-下载和安装
  4. Page Security
  5. php中count获取多维数组长度的方法
  6. 【原创】如何在Android中为TextView动态设置drawableLeft等
  7. 将服务器返回的URL或者网址截取出来特定的字符,然后将字符返回,一般根据返回的字符判断用户是否登录等即时状态
  8. 作品第一课----循环改变DIV颜色
  9. 通过 iTextSharp 实现PDF 审核盖章
  10. android 中文件加密 解密 算法实战
  11. IntelliJ IDEA如何设置头注释,自定义author和date
  12. AI2(App Inventor 2)离线版服务器单机版
  13. Mad Libs 游戏
  14. 【Teradata Utility】系统工具使用
  15. 16解释器模式Interpreter
  16. NPOI将DataGridView中的数据导出+导出Chart图表图片至Excel
  17. 立个FLAG
  18. linux环境下的c++编程
  19. PHP对象的遍历
  20. 一次简单的C++编译错误

热门文章

  1. 【2017中国大学生程序设计竞赛 - 网络选拔赛】Friend-Graph
  2. 误将SELINUXTYPE看成SELINUX后,将其值改为disabled。导致操作系统服务启动,无法进入单用户模式
  3. Delphi 虚拟桌面
  4. NX二次开发-Block UI C++界面Body Collector(体收集器)控件的获取(持续补充)
  5. 虚拟机安装(Cent OS)
  6. Jmeter断言-所有断言讲解
  7. delphi基础篇之数据类型之一:1.简单类型(Simple)
  8. 3.2 Redux TodoApp
  9. 【转】/bin/bash^M: bad interpreter: 没有那个文件或目录
  10. Intel Pin基础