一、引入依赖

 <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>

二、编写配置文件

spring:
application: kafka
kafka:
bootstrap-servers: ip:9092
producer:
retries: 1
batch-size: 16384
buffer-memory: 1024000
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
enable-auto-commit: true
auto-commit-interval: 10
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

三、注入生产者

@Configuration
public class ProductorConfiugutration { @Value("${spring.kafka.bootstrap-servers}")
private String server;
@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.retries}")
private int retries;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerlializer; @Bean("peopleTemplate")
public KafkaTemplate<String, String> getPeopleTemplate() {
KafkaTemplate<String, String> peopleTemple = new KafkaTemplate<String, String>(getFactory());
peopleTemple.setDefaultTopic("topic.people");
return peopleTemple;
} public ProducerFactory getFactory() {
return new DefaultKafkaProducerFactory(getProductorConfig());
} public Map<String, Object> getProductorConfig() {
Map<String, Object> hashMap = new HashMap<String, Object>();
hashMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
hashMap.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
hashMap.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
hashMap.put(ProducerConfig.RETRIES_CONFIG, retries);
hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);
hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerlializer);
return hashMap;
}
}

四、注入消费者

@Configuration
public class ConsumerConfiguration { @Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String interval;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String rest;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;
@Value("${spring.kafka.bootstrap-servers}")
private String server; @Bean("userConsumer")
public KafkaConsumer<String,String> getUserConsumer(){
List<String> topic=new ArrayList<String>();
topic.add("topic.people");
KafkaConsumer<String,String> userConsumner=new KafkaConsumer<String, String>(getConfig());
userConsumner.subscribe(topic);
return userConsumner;
} public ConsumerFactory<String,String> getFaactory() {
return new DefaultKafkaConsumerFactory(getConfig());
} public Map<String, Object> getConfig() {
Map<String, Object> hashMap = new HashMap<String, Object>();
hashMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, rest);
hashMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, interval);
hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
hashMap.put(ConsumerConfig.GROUP_ID_CONFIG,"2");
return hashMap;
}
}

五、发送消息和消费消息

@RestController
public class KafkaController { @Resource
KafkaTemplate<String,String> peopleTemplate; @Resource
KafkaConsumer<String,String> userConsumer; @RequestMapping("/send")
public String sendMessage(){ People people = new People();
people.setAddress("北京通州");
people.setAge(24);
people.setName("小白");
people.setSex(1);
String jsonString = JSON.toJSONString(people);
peopleTemplate.sendDefault(jsonString); return "发送成功";
} @RequestMapping("getmesssage")
public String getMessage(){ ConsumerRecords<String, String> message= userConsumer.poll(100); for(ConsumerRecord<String,String> msg:message){
System.out.println(msg.value());
} return "获取消息";
}
}

结果

本文简单粗暴,爱看不看

最新文章

  1. iOS. PercentEscape是错用的URLEncode,看看AFN和Facebook吧
  2. HTML5
  3. 【小白的CFD之旅】09 初识FLUENT
  4. Jmeter测试数据库
  5. shell学习之路:流程控制(for)
  6. Qt4过渡至Qt5
  7. poj 1474 Video Surveillance (半平面交)
  8. PHP - PHPExcel操作xls文件
  9. Understanding GC pauses in JVM, HotSpot&#39;s minor GC.
  10. 使用PLSql连接Oracle时报错ORA-12541: TNS: 无监听程序
  11. HDU 5618 Jam&#39;s problem again
  12. 取得phpcms网站下所有栏目的内容链接
  13. HashMap完全解读
  14. C/C++调用Golang 二
  15. 初入python 用户输入,if,(while 循环)
  16. Linux内核入门到放弃-网络-《深入Linux内核架构》笔记
  17. Node.js创建本地简易服务器
  18. 【C++ Primer | 07】常用算法
  19. 安装mysql以及修改mysql字符集问题
  20. HTTP 总结

热门文章

  1. MySQL索引分析及使用
  2. DCL单例模式中的缺陷及单例模式的其他实现
  3. 用微信小程序做一个小电商 sku
  4. js练习题之查找数组中的位子
  5. leetcode 38:path-sum
  6. Qt基础之菜单栏
  7. 依赖注入DI(IOC)容器快速入门
  8. springMVC请求调用过程
  9. Docker学习第三天
  10. Service Cloud零基础学习(二)Entitlement &amp; MileStone