上一遍我简单介绍了kafka的生成者使用,调用方式比较简单,今天我给大家分享下封装kafka消费者,作为中间件,我们做的就是最大程度的解耦,使业务方接入我们依赖程度降到最低。

  第一步,我们先配置一个消费者核心类

  

package com.meiren.message.kafka.consumer;

import com.meiren.message.kafka.beans.ConsumerProperty;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; /**
* Created by zhangwentao on 2017/5/18.
*/
public class ConsumerHandler {
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors; public ConsumerHandler(ConsumerProperty consumerProperty, List<String> topics) {
Properties props = new Properties();
props.put("bootstrap.servers", consumerProperty.getBrokerList());
props.put("group.id", consumerProperty.getGroupId());
props.put("enable.auto.commit", consumerProperty.getEnableAutoCommit());
props.put("auto.commit.interval.ms", consumerProperty.getAutoCommitInterval());
props.put("session.timeout.ms", consumerProperty.getSessionTimeout());
props.put("key.deserializer", consumerProperty.getKeySerializer());
props.put("value.deserializer", consumerProperty.getValueSerializer());
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);
} public void execute(int workerNum) {
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());
Thread t = new Thread(new Runnable(){//启动一个子线程来监听kafka消息
public void run(){
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
for (final ConsumerRecord record : records) {
System.out.println("监听到kafka消息。。。。。。");
executors.submit(new ConsumerWorker(record));
}
}
}});
t.start(); } public void shutdown() {
if (consumer != null) {
consumer.close();
}
if (executors != null) {
executors.shutdown();
}
try {
if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("Timeout.... Ignore for this case");
}
} catch (InterruptedException ignored) {
System.out.println("Other thread interrupted this shutdown, ignore for this case.");
Thread.currentThread().interrupt();
}
}
}

这个核心类有3个部分组成:1.构造方法(生成一个消费者配置,订阅topic),2.开启多线程监听消费者  3.关闭线程是和消费者

 public ConsumerHandler(ConsumerProperty  consumerProperty, List<String> topics) 
consumerProperty是消费者配置信息类,包含了消费者的配置属性和topic
public class ConsumerProperty {

    private String brokerList;

    private String groupId;

    private  String enableAutoCommit="true";

    private String autoCommitInterval="1000";

    private String sessionTimeout="30000";

    private String keySerializer="org.apache.kafka.common.serialization.StringDeserializer";

    private String valueSerializer="org.apache.kafka.common.serialization.StringDeserializer";
/**
* topic以及消费的实现类
*/
private List<MessageContainer> messageContainers;

  2.监听消费者信息

 public void execute(int workerNum) { } 这段代码的入参是线程数,开启一个线程池ThreadPoolExecutor,建立一个长连接,每200毫秒去kafka服务器拉取消息,每拉到一个消息,就分配给一个线程类ConsumerWorker去处理这个消息
这里要特别注意是,监听kafka的过程需要另起一个线程去监听,不然主线程会一直在while(true)里面阻塞掉。 3.关闭线程池和消费者(一般情况下会一直处于监听状态)
第二步,我们设置服务启动的时候去监听
public class PropertyFactory {

    public  static ProducerProperty producerProperty;

    public  static ConsumerProperty consumerProperty;

    public ProducerProperty getProducerProperty() {
return producerProperty;
} public void setProducerProperty(ProducerProperty producerProperty) {
this.producerProperty = producerProperty;
} public ConsumerProperty getConsumerProperty() {
return consumerProperty;
} public void setConsumerProperty(ConsumerProperty consumerProperty) {
this.consumerProperty = consumerProperty;
} ConsumerHandler consumer=null; @PostConstruct
public void startListerConsumer(){
consumer= new ConsumerListener(consumerProperty).startListen();
} @PreDestroy
public void shutDown(){
if(consumer!=null){
consumer.shutdown();
}
}
}

这是一个属性工程的bean,当这个bean被创建完成后,会执行startListerConsumer()方法(@PostConstruct的含义就是在bean被创建之后执行)  ,startListerConsumer的作用开启监听

ConsumerHandler  consumers = new ConsumerHandler( consumerProperty, topics);
consumers.execute(workerNum);

另外,我们看到这个beanFactory有2个属性ProducerProperty 和ConsumerProperty ,这个2个分别是消费者个和生产者的配置,是bean在初始化的时候注入进去的

这里重点介绍一下说ConsumerProperty 的messageContainers属性,它是一个集合对象,包含需要订阅的topic和处理该Topic的实现了MessageListener接口的实现类

public class MessageContainer {
private String topic; private MessageListener messageHandle; }
public interface MessageListener {

    public void onMessage(ConsumerMessageBO message);
}

上面说到监听到每个消息都会分配一个
ConsumerWorker去处理消息,我们看看具ConsumerWorker的
public class ConsumerWorker implements Runnable {

         private ConsumerRecord<String, String> consumerRecord;

            public ConsumerWorker(ConsumerRecord record) {
this.consumerRecord = record;
} public void run() {
ConsumerMessageBO consumerMessageBO= JSONObject.parseObject(consumerRecord.value(),ConsumerMessageBO.class);
consumerMessageBO.setOffset(consumerRecord.offset());
consumerMessageBO.setPartition(consumerRecord.partition());
for(MessageContainer messageContainer: PropertyFactory.consumerProperty.getMessageContainers()){
if(consumerRecord.topic().equals(messageContainer.getTopic())){
messageContainer.getMessageHandle().onMessage(consumerMessageBO);
}
} }

根据监听到topic,然后和ConsumerProperty 的messageContainers属性的topic进行比对,找到对应topic处理的实现类调用其onMessage方法

我们JAVA的核心代码基本已经写完了

第三步、业务方接入我们封装的部分

新建一个spring-kafka.xml文件


<bean id="consumerProperty" class="com.meiren.message.kafka.beans.ConsumerProperty">
<property name="brokerList" value="${kafka.bootstrap.servers}"/>
<property name="groupId" value="${kafka.group.id}"/>
<property name="messageContainers" >
<list>
<ref bean="smsMessageContainer"></ref>
<ref bean="emailMessageContainer"></ref>
</list>
</property>
</bean>
<bean id="producerProperty" class="com.meiren.message.kafka.beans.ProducerProperty">
<property name="brokerList" value="${kafka.bootstrap.servers}"/>
</bean> <bean id ="emailMessageHandler" class="com.meiren.message.kafka.handle.EmailMessageHandler"/>
<bean id ="smsMessageHandler" class="com.meiren.message.kafka.handle.SmsMessageHandler"/> <bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
<constructor-arg value="sms_async_send"/>
<property name="messageHandle" ref="smsMessageHandler"></property>
</bean>
<bean id ="emailMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
<constructor-arg value="email_async_send"/>
<property name="messageHandle" ref="emailMessageHandler"></property>
</bean>   
  <!--配置工厂类 -->
 
<bean class="com.meiren.message.kafka.beans.PropertyFactory">
    <property name="consumerProperty" ref="consumerProperty"/>
     <property name="producerProperty" ref="producerProperty"/>
  </bean>
</beans>

这个配置文件对应的就是PropertyFactory的属性,其实就是消费者个和生产者的配置。

我们配置好这个文件后,我们需要一个消息实现类

public class SmsMessageHandler  implements MessageListener{
public static final Logger log= LoggerFactory.getLogger(SmsMessageHandler.class);
@Autowired
private SmsSendLogDao smsSendLogDao;
public void onMessage(ConsumerMessageBO consumerMessageBO) { } }catch (Exception e){
System.out.println("转换消息异常:"+e.getMessage());
} }

只要实现了MessageListener接口,并且在spring-kafka.xm配置好对应的topic就可以了

 <bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
<constructor-arg value="sms_async_send"/>
<property name="messageHandle" ref="smsMessageHandler"></property>
</bean>

整个接入就完成了,由于这是第一版本,所以封装的程度还不算很好,但是也基本符合应用(一个配置文件,一个实现类),有不足之处将会在后面版本进行完善迭代。

至此我们已经将kafka集成spring的功能简单实现了,下一篇我将介绍消息队列(kafka)的一些实际应用。


 
 

  

最新文章

  1. windows server 2008 各版本号下载地址(微软官网)
  2. Maximum length of a table name in MySQL
  3. KMP--路过
  4. Linux中profile与bashrc的作用
  5. 封装SqlHelper
  6. 安装完zend server后,无法访问http://localhost:10081/ZendServer/的解决办法
  7. canonical 标签介绍
  8. 将用户信息保存到Cookie中
  9. 【最小费用最大流】【HDU1533】【Going Home】
  10. python3 多线程获取数据实例
  11. Android 工程师
  12. JavaScript for/in 语句 遍历数组内容
  13. ionic2程序调试
  14. iOS开发之资料收集
  15. Eureka控制台参数说明
  16. Asp.Net MVC 获取当前 Controller Action Area
  17. Queue CodeForces - 353D (思维dp)
  18. ZH奶酪:C语言中malloc()和free()函数解析
  19. tomcat 容器下web项目由http改为https操作步骤及相关的坑
  20. layer插件open方法回掉值问题

热门文章

  1. 网络编程----粘包以及粘包问题的解决、FTP上传
  2. 图像BMP格式介绍
  3. c# extern 关键字
  4. nova-api源码分析(APP中用到的开源库)
  5. 解决tomcat占用8080端口问题图文详解
  6. 维护后面的position sg函数概念,离线+线段 bzoj 3339
  7. 2017 ACM Arabella Collegiate Programming Contest(solved 11/13)
  8. Jugs(回溯法)
  9. 关于NuGet
  10. [php]http响应头解析