kafka拦截器原理|案例实操
拦截器原理
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
(4)close:关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
最新文章
- WAMPServer安装和配置
- 【整理】--【GPIO】OK6410
- 睡觉问题早晚成为我顶头疼的问题。。。-PHP
- phonegap 框架详解
- oracle备忘
- CAGradientLayer
- 矩阵的QR分解(三种方法)Python实现
- 【斜率DP】BZOJ 3675:[Apio2014]序列分割
- MySQL相关知识
- FindControl什么时候才会使用ObjectFromHWnd函数呢?——VCL很难调试,加一个日志函数,记录时间
- 转:使用C#的HttpWebRequest模拟登陆网站
- 如何设置 Internal 类,方法,属性对其他项目可见
- git寻根——^和~的区别(转)
- SQL Server的数据加密简介
- TCP/IP协议栈 --- 网络层(IP 首部 和分片)
- Beta之后的想法
- 3-(基础入门篇)稍微了解一下(需要知道的关于Lua的一些基本的知识)
- PHP 操作MySQL时mysql_connect( )和Mysqli( )的两种报错机制
- 通过Intel XDK编写跨平台app(二)
- Ubuntu : 在主机和虚拟机之间传文件