官方文档:   https://docs.spring.io/spring-kafka/reference/html/

@KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

demo类:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
} }
配置类及注解:
@Configuration
@EnableKafka
public class KafkaConfig { @Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
} @Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
} @Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}

最新文章

  1. PHP 函数(数组字符串)
  2. HDU 1242 (BFS搜索+优先队列)
  3. HDU 2087 kmp模板题
  4. window.opener用法
  5. MyEclipse server窗口 Could not create the view: An unexpected exception was thrown 错误解决
  6. ASP.NET页面生命周期总结(完结篇)
  7. python logging模块使用
  8. 关于微软公有云Azure会计标准
  9. 关于post与get请求参数存在特殊字符问题
  10. 【ASP.NET Core】如何隐藏响应头中的 “Kestrel”
  11. SQL Server - DISTINCT
  12. bat路径中有空格
  13. Python的迭代器和生成器
  14. 【详解】WebSocket相关知识整理
  15. 表id关联数据获取至页面,制作下拉框多选进行数据多项获取(字段处理)
  16. shutdown命令详解
  17. 用C代码简要模拟实现一下RPC(远程过程调用)并谈谈它在代码调测中的重要应用【转】
  18. Navicat安装激活
  19. [转载]字符串匹配的Boyer-Moore算法
  20. Hadoop和HBase中出现 ssh登录 The authenticity of host 192.168.0.xxx can&#39;t be established.

热门文章

  1. mysql大数据的分表
  2. thinkphp自带的验证码出现的问题
  3. 【转】Linux下history命令用法
  4. &lt;stddef.h&gt;
  5. 实现div毛玻璃背景
  6. RecyclerView中item无法充满的问题
  7. 基于TensorFlow的车牌号识别系统
  8. cms中某些标题链接的单独写法
  9. 【PL/SQL】触发器示例:记录加薪
  10. html5——3D案例(立方体)