一、配置文件

application.yml

spring:
kafka:
one:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
enable-auto-commit: true
two:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
enable-auto-commit: true

二、生产者、消费者配置

2.1 第一个 Kafka

@EnableKafka
@Configuration
public class KafkaOneConfig { @Value("${spring.kafka.one.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.one.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.one.consumer.enable-auto-commit}")
private boolean enableAutoCommit; @Bean
public KafkaTemplate<String, String> kafkaOneTemplate() {
return new KafkaTemplate<>(producerFactory());
} @Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
} private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
} public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
} private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 不能写成 1
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
} private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}

2.2 第二个 Kafka

@Configuration
public class KafkaTwoConfig { @Value("${spring.kafka.two.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.two.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.two.consumer.enable-auto-commit}")
private boolean enableAutoCommit; @Bean
public KafkaTemplate<String, String> kafkaTwoTemplate() {
return new KafkaTemplate<>(producerFactory());
} @Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
} private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
} public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
} private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
} private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}

三、生产者

@Controller
public class TestController { @Autowired
private KafkaTemplate kafkaOneTemplate;
@Autowired
private KafkaTemplate kafkaTwoTemplate; @RequestMapping("/send")
@ResponseBody
public String send() {
final String TOPIC = "TOPIC_1";
kafkaOneTemplate.send(TOPIC, "kafka one");
kafkaTwoTemplate.send(TOPIC, "kafka two"); return "success";
}
}

四、消费者

@Component
public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); final String TOPIC = "TOPIC_1"; // containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同
@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")
public void listenerOne(ConsumerRecord<?, ?> record) {
LOGGER.info(" kafka one 接收到消息:{}", record.value());
} @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")
public void listenerTwo(ConsumerRecord<?, ?> record) {
LOGGER.info(" kafka two 接收到消息:{}", record.value());
}
}

运行结果

c.k.s.consumer.KafkaConsumer             :  kafka one 接收到消息:kafka one
c.k.s.consumer.KafkaConsumer : kafka two 接收到消息:kafka two

完整代码:GitHub

最新文章

  1. [JS]笔记13之Date对象
  2. Sql Server 2008 数据库附加失败提示9004错误解决办法
  3. 创建一个Android项目
  4. HW机试字符串压缩java(1)
  5. C# json与对象之间的相互转换
  6. CSS制作水平垂直居中对齐
  7. 玩转Web之Json(一)-----easy ui+ajax + json 中关于Json的解析问题
  8. springmvc继承activemq(原创)
  9. webservice 尝试加载 Oracle 客户端库时引发 BadImageFormatException。如果在安装 32 位 Oracle 客户端组件的情况下运行,将出现此问题
  10. C语言学习第六章
  11. 系统引导器GRUB
  12. this与base关键字
  13. linux:关于Linux系统中 CPU Memory IO Network的性能监测
  14. (后端)excel设置日期格式的步骤
  15. BZOJ.4298.[ONTAK2015]Bajtocja(Hash 启发式合并)
  16. JVM各垃圾收集器对比
  17. .NetCore中EFCore for MySql整理(二)
  18. JAVA eclipse 安装lombok
  19. myeclipse使用小技巧
  20. C#对json数据的解析

热门文章

  1. React react-redux props或state更新视图无法重新渲染问题
  2. 关于保存批量数据进入mysql
  3. 【Kata Daily 191010】Grasshopper - Summation(加总)
  4. ssh连接缓慢的问题分析
  5. MQ-gogogo
  6. java中保留两位小数的方法
  7. Vue3教程:用 Vue3 开发小程序,这里有一份实际的代码案例!
  8. 92. Reverse Linked List II 翻转链表II
  9. Java基础 之一 基本知识
  10. (3)ElasticSearch在linux环境中安装与配置head插件