Spring for Apache Kafka @KafkaListener使用及注意事项
官方文档: https://docs.spring.io/spring-kafka/reference/html/
@KafkaListener
The @KafkaListener
annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter
configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
If, say, six TopicPartition
instances are provided and the concurrency
is 3
; each container gets two partitions. For five TopicPartition
instances, two containers get two partitions, and the third gets one. If the concurrency
is greater than the number of TopicPartitions
, the concurrency
is adjusted down such that each container gets one partition.
You can now configure a KafkaListenerErrorHandler
to handle exceptions. See Handling Exceptions for more information.
By default, the @KafkaListener
id
property is now used as the group.id
property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId
on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id
values for listeners. To restore the previous behavior of using the factory configured group.id
, set the idIsGroup
property on the annotation to false
.
示例:
demo类:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
配置类及注解:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
最新文章
- PHP 函数(数组字符串)
- HDU 1242 (BFS搜索+优先队列)
- HDU 2087 kmp模板题
- window.opener用法
- MyEclipse server窗口 Could not create the view: An unexpected exception was thrown 错误解决
- ASP.NET页面生命周期总结(完结篇)
- python logging模块使用
- 关于微软公有云Azure会计标准
- 关于post与get请求参数存在特殊字符问题
- 【ASP.NET Core】如何隐藏响应头中的 “Kestrel”
- SQL Server - DISTINCT
- bat路径中有空格
- Python的迭代器和生成器
- 【详解】WebSocket相关知识整理
- 表id关联数据获取至页面,制作下拉框多选进行数据多项获取(字段处理)
- shutdown命令详解
- 用C代码简要模拟实现一下RPC(远程过程调用)并谈谈它在代码调测中的重要应用【转】
- Navicat安装激活
- [转载]字符串匹配的Boyer-Moore算法
- Hadoop和HBase中出现 ssh登录 The authenticity of host 192.168.0.xxx can&#39;t be established.