参考网址:

https://blog.csdn.net/lansetiankong12/article/details/54946641

1.新建Maven项目-KafkaMaven

-》点击next按钮

-》点击next按钮

-》点击finish按钮,项目新建成功

2.生产者配置文件:kafka-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="group.id" value="0"/>
<entry key="retries" value="10"/>
<entry key="batch.size" value="16384"/>
<entry key="linger.ms" value="1"/>
<entry key="buffer.memory" value="33554432"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>

<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
</bean>

<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<property name="defaultTopic" value="myTopic"/>
</bean>
</beans>

3.消费者配置文件:kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 定义consumer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="group.id" value="0"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>

<!-- 创建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>

<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.kafka.service.impl.MsgConsumerServiceImpl"/>

<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="test"/><!-- test为topic主题名 -->
<property name="messageListener" ref="messageListernerConsumerService"/>
</bean>

<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
</beans>

4.生产者测试类

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/config/kafka-producer.xml" })
public class ProduceMessage {
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;
@Test
public void testTemplateSend() {

//topic主题test是上篇http://www.cnblogs.com/Bud-blog/p/9020018.html中新建的主题名称
kafkaTemplate.send("test", "www.686868.com");
}
}

5.消费者监听消息

package com.kafka.service.impl;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

/**
* 消费者监听
*
*/
public class MsgConsumerServiceImpl implements MessageListener<Integer, String>
{
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
}
}

6.消费者测试类

import org.springframework.beans.BeansException;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerTest {
public static void main(String[] args) {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("config/kafka-consumer.xml");
context.start();
} catch (BeansException e) {
e.printStackTrace();
}

synchronized (ConsumerTest.class) {
while (true) {
try {
ConsumerTest.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

7.pom依赖如下:

8.项目视图

9.启动消费者测试类(注意:启动测试类前,需要启动zookeeper和kafka程序)

启动成功控制台如下显示

10.启动生产者测试类,启动成功控制台如下显示

9.consumer接收消息,控制台显示如下:

最新文章

  1. ASP.NET WebAPi(selfhost)之文件同步或异步上传
  2. 使用TaskManager爬取2万条代理IP实现自动投票功能
  3. ReactNative 当前url和cookies的获取
  4. apache通过cgi调用exe程序
  5. spring事物传播机制与隔离级别
  6. commondline 之三 执行jar文件
  7. [cocos2d-x&#183;解Bug]关于cocos2d-x游戏在android锁屏状态下播放Bgm的解决方法
  8. [置顶] VB6基本数据库应用(三):连接数据库与SQL语句的Select语句初步
  9. mysql中TPS, QPS 的计算方式
  10. RSA加解密
  11. MYSQL修改字段
  12. 函数和常用模块【day05】:装饰器高潮(三)
  13. C# 多种方式连接Oracle。
  14. WebSphere集群环境修改IHS端口号的方法 分类: WebSphere 2015-08-06 13:41 14人阅读 评论(0) 收藏
  15. (zhuan) 资源|TensorFlow初学者必须了解的55个经典案例
  16. [Docker] Linking Node.js and MongoDB Containers
  17. FFmpeg(10)-利用FFmpeg进行视频像素格式和尺寸的转换(sws_getCachedContext(), sws_scale())
  18. Sublime text 3 中Package Control 的安装与使用方法
  19. django实现瀑布流、组合搜索、阶梯评论、验证码
  20. Bakery

热门文章

  1. 数据库【redis】基本命令
  2. 最新webstorm
  3. python day09
  4. 增强for
  5. MySQL数据库执行计划(简单版)
  6. Arguments Optional 计算两个参数之和的 function
  7. 编译安装MySQL5.6失败的相关问题解决方案
  8. Linux C启动时创建pid文件
  9. jQuery的deferred对象实战应用(附:Exchar动态多条数据展示并在topic展示详细数据)
  10. 正则化(Regularization)本质