1、单独KafkaConsumer实例and多worker线程。
将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,同时维护一个或者若各干consumer实例执行消息获取任务。
本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。

 package com.bie.kafka.kafkaWorker;

 import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException; /**
*
* @Description TODO
* @author biehl
* @Date 2019年6月1日 下午3:28:53
*
* @param <K>
* @param <V>
*
* 1、consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。 另外consumer位移提交也在该类中完成。
*
*/
public class ConsumerThreadHandler<K, V> { // KafkaConsumer实例
private final KafkaConsumer<K, V> consumer;
// ExecutorService实例
private ExecutorService executors;
// 位移信息offsets
private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); /**
*
* @param brokerList
* kafka列表
* @param groupId
* 消费组groupId
* @param topic
* 主题topic
*/
public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
Properties props = new Properties();
// broker列表
props.put("bootstrap.servers", brokerList);
// 消费者组编号Id
props.put("group.id", groupId);
// 非自动提交位移信息
props.put("enable.auto.commit", "false");
// 从最早的位移处开始消费消息
props.put("auto.offset.reset", "earliest");
// key反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// value反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// 将配置信息装配到消费者实例里面
consumer = new KafkaConsumer<>(props);
// 消费者订阅消息,并实现重平衡rebalance
// rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
// 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { /**
* 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 提交位移
consumer.commitSync(offsets);
} /**
* rebalance完成后会调用onPartitionsAssigned方法。
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 清除位移信息
offsets.clear();
}
});
} /**
* 消费主方法
*
* @param threadNumber
* 线程池中的线程数
*/
public void consume(int threadNumber) {
executors = new ThreadPoolExecutor(
threadNumber,
threadNumber,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy());
try {
// 消费者一直处于等待状态,等待消息消费
while (true) {
// 从主题中获取消息
ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L));
// 如果获取到的消息不为空
if (!records.isEmpty()) {
// 将消息信息、位移信息封装到ConsumerWorker中进行提交
executors.submit(new ConsumerWorker<>(records, offsets));
}
// 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
this.commitOffsets();
}
} catch (WakeupException e) {
// 此处忽略此异常的处理.WakeupException异常是从poll方法中抛出来的异常
//如果不忽略异常信息,此处会打印错误哦,亲
//e.printStackTrace();
} finally {
// 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
this.commitOffsets();
// 关闭consumer
consumer.close();
}
} /**
* 尽量降低synchronized块对offsets锁定的时间
*/
private void commitOffsets() {
// 尽量降低synchronized块对offsets锁定的时间
Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
// 保证线程安全、同步锁,锁住offsets
synchronized (offsets) {
// 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
if (offsets.isEmpty()) {
return;
}
// 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
// 清除位移信息offsets
offsets.clear();
}
// 将封装好的位移信息unmodfiedMap集合进行同步提交
// 手动提交位移信息
consumer.commitSync(unmodfiedMap);
} /**
* 关闭消费者
*/
public void close() {
// 在另一个线程中调用consumer.wakeup();方法来触发consume的关闭。
// KafkaConsumer不是线程安全的,但是另外一个例外,用户可以安全的在另一个线程中调用consume.wakeup()。
// wakeup()方法是特例,其他KafkaConsumer方法都不能同时在多线程中使用
consumer.wakeup();
// 关闭ExecutorService实例
executors.shutdown();
} }
 package com.bie.kafka.kafkaWorker;

 import java.util.List;
import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition; /**
*
* @Description TODO
* @author biehl
* @Date 2019年6月1日 下午3:45:38
*
* @param <K>
* @param <V>
*
* 1、本质上是一个Runnable,执行真正的消费逻辑并且上报位移信息给ConsumerThreadHandler。
*
*/
public class ConsumerWorker<K, V> implements Runnable { // 获取到的消息
private final ConsumerRecords<K, V> records;
// 位移信息
private final Map<TopicPartition, OffsetAndMetadata> offsets; /**
* ConsumerWorker有参构造方法
*
* @param records
* 获取到的消息
* @param offsets
* 位移信息
*/
public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
this.records = records;
this.offsets = offsets;
} /**
*
*/
@Override
public void run() {
// 获取到分区的信息
for (TopicPartition partition : records.partitions()) {
// 获取到分区的消息记录
List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition);
// 遍历获取到的消息记录
for (ConsumerRecord<K, V> record : partConsumerRecords) {
// 打印消息
System.out.println("topic: " + record.topic() + ",partition: " + record.partition() + ",offset: "
+ record.offset()
+ ",消息记录: " + record.value());
}
// 上报位移信息。获取到最后的位移消息,由于位移消息从0开始,所以最后位移减一获取到位移位置
long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - ).offset();
// 同步锁,锁住offsets位移
synchronized (offsets) {
// 如果offsets位移不包含partition这个key信息
if (!offsets.containsKey(partition)) {
// 就将位移信息设置到map集合里面
offsets.put(partition, new OffsetAndMetadata(lastOffset + ));
} else {
// 否则,offsets位移包含partition这个key信息
// 获取到offsets的位置信息
long curr = offsets.get(partition).offset();
// 如果获取到的位置信息小于等于上一次位移信息大小
if (curr <= lastOffset + ) {
// 将这个partition的位置信息设置到map集合中。并保存到broker中。
offsets.put(partition, new OffsetAndMetadata(lastOffset + ));
}
}
}
}
} }
 package com.bie.kafka.kafkaWorker;

 /**
*
* @Description TODO
* @author biehl
* @Date 2019年6月1日 下午4:13:25
*
* 1、单独KafkaConsumer实例和多worker线程。
* 2、将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,
* 同时维护一个或者若各干consumer实例执行消息获取任务。
* 3、本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,
* 之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
*
*
*/ public class ConsumerMain { public static void main(String[] args) {
// broker列表
String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
// 主题信息topic
String topic = "topic1";
// 消费者组信息group
String groupId = "group2";
// 根据ConsumerThreadHandler构造方法构造出消费者
final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
final int cpuCount = Runtime.getRuntime().availableProcessors();
System.out.println("cpuCount : " + cpuCount);
// 创建线程的匿名内部类
Runnable runnable = new Runnable() { @Override
public void run() {
// 执行consume,在此线程中执行消费者消费消息。
handler.consume(cpuCount);
}
};
// 直接调用runnable此线程,并运行
new Thread(runnable).start(); try {
// 此线程休眠20000
Thread.sleep(20000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Starting to close the consumer...");
// 关闭消费者
handler.close();
} }

待续......

最新文章

  1. EBS中加载FORM使用的JavaBean的JAR包
  2. Phabricator是什么,代码审查工具
  3. 用excel做分组散点图
  4. cookie和session详解
  5. .net 使用memcache做缓存
  6. innodb_buffer_pool_size 大小建议
  7. Android开发, 如何看logcat
  8. [ionic开源项目教程] - 第13讲 Service层优化,提取公用Service,以及生活和农业两大模块的实现
  9. 通过userAgent判断手机浏览器类型
  10. RedHat 7 常用命令总结
  11. html5+ XMLHttpRequest
  12. qsc oj 22 哗啦啦村的刁难(3)(随机数,神题)
  13. TCP/IP协议三次握手与四次握手流程解析(转)
  14. sololearn的c++学习记录_4m11d
  15. jquery 设置占位符
  16. android 给view添加阴影
  17. c# 集合去重并筛选
  18. java-IO流-字节流-概述及分类、FileInputStream、FileOutputStream、available()方法、定义小数组、BufferedInputStream、BufferedOutputStream、flush和close方法的区别、流的标准处理异常代码
  19. [转] initrd详解
  20. Windows系统下运行某些程序时缺少“Msflxgrd.ocx”的解决方法

热门文章

  1. org.springframework.util.Base64Utils线程安全问题
  2. go-变量
  3. PHP+Ajax点击加载更多列表数据实例
  4. 树莓派4B到货开箱体验
  5. linux shell通过curl获取HTTP请求的状态码
  6. 细数C++中的for循环
  7. C# Dictionary增加的方法
  8. QNetworkRequest加Authorization头,适应Rest风格的API
  9. Word模板注入攻击
  10. Android中使用WebView实现全屏切换播放网页视频