Kafka API实战


package com.study.kafka; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties;
import java.util.Random; /**
* Demo producer that sends random words taken from a fixed paragraph to a topic. Each record is
* keyed by a random {@code 172.16.20.x} address and routed by the custom
* {@code MySamplePartitioner}.
*
* @author allen
* @Date: 2019/2/17 16:05
*/
public class ProducerNew {

    private final KafkaProducer<String, String> producer;
    private final String topic;

    /**
     * Creates a producer that writes to {@code topic}.
     *
     * @param topic topic to send records to
     * @param args  optional command-line arguments; when non-empty, {@code args[0]} is used as
     *              {@code bootstrap.servers}, otherwise the empty placeholder is kept and must be
     *              filled in before running
     */
    public ProducerNew(String topic, String[] args) {
        Properties props = new Properties();
        // Kafka broker host:port list (fill in, or pass it as the first program argument)
        props.put("bootstrap.servers", args != null && args.length > 0 ? args[0] : "");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // Maximum size of one per-partition batch, in bytes (16 KB)
        props.put("batch.size", 16384);
        // How long to wait for more records before sending a partially filled batch
        props.put("linger.ms", 10);
        // Total memory used to buffer records not yet sent, in bytes (32 MB)
        props.put("buffer.memory", 33554432);
        // Custom partitioner; when this is not set, Kafka's default partitioner is used
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.study.kafka.partition.MySamplePartitioner");
        // Key and value serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    /**
     * Sends 10 records: each value is a random word from the sample text and each key is a random
     * {@code 172.16.20.x} address. Flushes before returning so buffered records are actually sent.
     *
     * @throws InterruptedException not thrown by the current body; kept in the signature so existing
     *                              callers that catch it still compile
     */
    public void producerMsg() throws InterruptedException {
        String data = "Apache Storm is a free and open source distributed realtime computation system Storm makes it easy to reliably process unbounded streams of data doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!\n" +
                "Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.\n" +
                "Storm integrates with the queueing and database technologies you already use. A Storm topology consumes streams of data and processes those streams in arbitrarily complex ways, repartitioning the streams between each stage of the computation however needed. Read more in the tutorial.";
        // Strip punctuation (including CJK quotes), then split on any whitespace so that the
        // line breaks in the text do not end up inside words
        String[] words = data.replaceAll("[\\pP‘’“”]", "").split("\\s+");
        Random rnd = new Random();
        int events = 10;
        for (int i = 0; i < events; i++) {
            String ip = "172.16.20." + rnd.nextInt(255);
            String msg = words[rnd.nextInt(words.length)];
            try {
                // send() is asynchronous: delivery failures only reach the callback; the catch
                // below sees errors raised synchronously (e.g. serialization, buffer exhaustion)
                producer.send(new ProducerRecord<>(topic, ip, msg), (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send message: (" + ip + ", " + msg + "): " + exception);
                    }
                });
                System.out.println("Sent message: (" + ip + ", " + msg + ")");
            } catch (Exception e) {
                System.err.println("Failed to send message: (" + ip + ", " + msg + "): " + e);
            }
        }
        // With linger.ms > 0 records may still sit in the buffer; push them out before returning
        producer.flush();
    }

    /** Sends any buffered records and releases the producer's resources. */
    public void close() {
        producer.close();
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerNew producer = new ProducerNew("test", args);
        try {
            producer.producerMsg();
        } finally {
            producer.close();
        }
    }
}
package com.study.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration;
import java.util.Collections;
import java.util.Properties; /**
* Demo consumer that joins group {@code group-test}, subscribes to a topic and prints every
* record it receives.
*
* @author allen
* @Date: 2019/2/17 16:06
*/
public class ConsumerNew {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    /**
     * Creates a consumer and subscribes it to {@code topic}.
     *
     * @param topic topic to consume from
     */
    public ConsumerNew(String topic) {
        Properties props = new Properties();
        // Kafka broker host:port list; fill in before running
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        // The Java KafkaConsumer talks only to the brokers and commits offsets to Kafka itself,
        // so no "zookeeper.connect" setting is needed (it would only trigger an unknown-config warning)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Where to start when the group has no committed offset:
        // "latest" = only new records, "earliest" = from the beginning, "none" = fail
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // Keys and values are both Strings, matching the consumer's type parameters
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        // poll() throws IllegalStateException unless the consumer is subscribed or assigned
        consumer.subscribe(Collections.singletonList(this.topic));
    }

    /**
     * Polls once, waiting up to 2 seconds, and prints every record received. Errors are reported
     * on stderr instead of being thrown, so callers can keep polling.
     */
    public void consumerMsg() {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("*******************Received message: (" + record.key() + ", " + record.value() + ") at partition "+record.partition()+" offset " + record.offset());
            }
        } catch (Exception e) {
            System.err.println("Failed to poll topic " + topic + ": " + e);
        }
    }

    /** Leaves the consumer group and releases the consumer's resources. */
    public void close() {
        consumer.close();
    }

    public static void main(String[] args) {
        ConsumerNew consumer = new ConsumerNew("test");
        try {
            while (true) {
                consumer.consumerMsg();
            }
        } finally {
            consumer.close();
        }
    }
}



package com.study.springboot.producer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.Date;
import java.util.UUID; /**
* @Auther: allen
* @Date: 2019/2/26 14:19
public class Producer {
private static final Logger log = LoggerFactory.getLogger(Producer.class); @Autowired
private KafkaTemplate<String, String> kafkaTemplate; private static Gson gson = new GsonBuilder().create(); // 发送消息
public void sendMessage(Message message) {
log.info("kafka sendMessage start"); // 内部组织下消息
message.setSendTime(new Date()); try {
kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), gson.toJson(message));
} catch (Exception e) {
log.error("发送数据出错!!!{}{}", kafkaTemplate.getDefaultTopic(), gson.toJson(message));
log.error("发送数据出错=====>", e);
} // 消息发送的监听器,用于回调返回信息
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
} @Override
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
} @Override
public boolean isInterestedInSuccess() {
return false;
log.info("kafka sendMessage end");
} public void sendMessage(String topic, String data) {
log.info("kafka sendMessage start");
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
public void onFailure(Throwable ex) {
log.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
} @Override
public void onSuccess(SendResult<String, String> result) {
log.info("kafka sendMessage success topic = {}, data = {}",topic, data);
}); log.info("kafka sendMessage end");
package com.study.springboot.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; /**
* Spring Kafka listener that prints every message received on topics {@code test2} and {@code test}.
*
* @author allen
* @Date: 2019/2/26 14:20
*/
@Component
public class Consumer {

    /**
     * Called by Spring Kafka for each record on the listed topics. The class must be a Spring bean
     * ({@code @Component}) for the listener to be registered.
     *
     * @param content the record value
     */
    @KafkaListener(topics = {"test2", "test"})
    public void processMessage(String content) {
        System.out.println("消息被消费" + content);
    }
}

Kafka producer 拦截器(interceptor)


Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化处理,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息,从而形成一个拦截链(interceptor chain)。Interceptor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:




(1)configure(configs):

获取配置信息和初始化数据时调用。

(2)onSend(ProducerRecord):

该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。

(3)onAcknowledgement(RecordMetadata, Exception):

该方法会在消息被应答或消息发送失败时调用,并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率


(4)close():

关闭 interceptor,主要用于执行一些资源清理工作。

如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外,倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并且仅仅是捕获每个 interceptor 可能抛出的异常并记录到错误日志中,而非向上传递。这在使用过程中要特别留意。


实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

package com.study.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /**
* Producer interceptor that counts successful and failed sends and prints the totals when the
* producer is closed. Records pass through unchanged.
*
* @author allen
* @Date: 2019/2/17 11:58
*/
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    // Guarded by "this": onAcknowledgement runs on the producer's I/O thread, while close()
    // runs on whichever thread closes the producer
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> configs) {
    }

    /** Returns the record unchanged; this interceptor only observes send results. */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    /** Counts the send as a success when {@code exception} is null, otherwise as a failure. */
    @Override
    public synchronized void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    /** Prints the final success and failure counts. */
    @Override
    public synchronized void close() {
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}
package com.study.kafka.interceptor;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import java.util.ArrayList;
import java.util.List;
import java.util.Properties; /**
* Demo that sends ten records through an interceptor chain made of {@link TimeInterceptor}
* followed by {@link CounterInterceptor}.
*
* @author allen
* @Date: 2019/2/17 12:01
*/
public class InterceptorProducer {

    public static void main(String[] args) throws Exception {
        // 1. Producer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "");
        // Defaulted to 1 in pre-3.0 clients; "all" (same as -1) waits for every in-sync replica:
        // strongest consistency, at some cost in throughput
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Build the interceptor chain; interceptors are invoked in list order
        List<String> interceptors = new ArrayList<>();
        interceptors.add(TimeInterceptor.class.getName());
        interceptors.add(CounterInterceptor.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        String topic = "test";
        // 4. The producer must be closed so that each interceptor's close() is called;
        //    try-with-resources guarantees that even if a send throws
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Send messages: TimeInterceptor turns "message0" into "<timestamp>,message0",
            //    CounterInterceptor tallies successful and failed acknowledgements
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
                producer.send(record);
            }
        }
    }
}
package com.study.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /**
* Producer interceptor that prefixes each record value with the current time in milliseconds,
* e.g. {@code "message0"} becomes {@code "1550372220000,message0"}.
*
* @author allen
* @Date: 2019/2/17 11:57
*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Returns a new record whose value is {@code "<currentTimeMillis>,<original value>"}.
     * Topic, partition, timestamp, key and headers are carried over unchanged. A record with a
     * null value is returned as-is instead of being turned into non-null data.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        if (record.value() == null) {
            return record;
        }
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), System.currentTimeMillis() + "," + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }
}






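如果消息的 key 为 null,默认的 partitioner 分区器不会按 key 计算分区:Kafka 2.4 之前以轮询(round-robin)方式将消息分布到 topic 的可用 partition 中;2.4 及之后的版本采用粘性分区(sticky partition)策略,先把消息持续发往同一个可用 partition,直到当前批次完成后再切换到另一个 partition(即下方源码中的 stickyPartitionCache)。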

如果 key 不为 null,并且使用了默认的分区器,kafka 会使用 murmur2 哈希算法对 key 序列化后的字节求 hash 值,将其转为非负数后与 partition 数量取模,从而确定发送到哪个分区。

注意:此时 key 相同的消息会发送到相同的分区(只要 partition 的数量不变化)。

=== 默认的分区器的实现



    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;



// Partitioner method a custom partitioner implements; returns the partition index for the record (body elided)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){…………}



package com.study.kafka.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo; import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; /**
* 定义Kafka分区器
* @Auther: allen
* @Date: 2019/2/22 14:33
public class MySamplePartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
private Random random = new Random(); //我的分区器定义
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitioners = cluster.partitionsForTopic(topic);
int numPartitions = partitioners.size(); /**
* 由于我们按key分区,在这里我们规定:key值不允许为null。
* 在实际项目中,key为null的消息*,可以发送到同一个分区,或者随机分区。
int res = 1;
if (keyBytes == null) {
System.out.println("value is null");
res = random.nextInt(numPartitions);
} else {
// System.out.println("value is " + value + "\n hashcode is " + value.hashCode());
res = Math.abs(key.hashCode()) % numPartitions;
System.out.println("data partitions is " + res);
return res;
} @Override
public void close() { } @Override
public void configure(Map<String, ?> map) { }


增加机器,例如原来三台服务器的 kafka 集群增加两台机器成为有五台机器的 kafka 集群,跟搭建差不多




bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 2 --replication-factor 2 --config flush.messages=1


bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 3

1. Kafka集群partition replication默认自动分配分析

下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:





上述图的 Broker Partition 中,箭头指向的是副本。以 Partition-0 为例:Broker1 中的 Partition-0 为 Leader,Broker2 中的 Partition-0 为副本。



将所有 n 个 Broker 和待分配的 Partition 分别排序。

将第i个Partition分配到第(i mod n)个Broker上.

将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.


ISR(in sync replica)的含义是同步的replica,相对的就有out of sync replica,也就是跟不上同步节奏的replica


1. kafka 的 topic 可以设置 N 个副本(replica)。副本数不能大于 broker 的数量,因为同一个 partition 在一个 broker 上最多只能有一个 replica;创建时也可以通过 broker id 手动指定 Partition replica 的分配。

2. 创建副本的单位是 topic 的分区,每个分区有 1 个 leader 和 0 到多个 follower,我们把多个 replica 分为 leader replica 和 follower replica。

3. 当 producer 向 partition 写数据时,只会写入 leader;根据 ack 机制(Kafka 3.0 之前默认 acks=1,3.0 起默认 acks=all),acks=1 时 leader 写入成功即返回确认。随后 follower 周期性地从 leader 拉取(pull)数据完成复制。默认情况下,数据的读写操作都在 leader replica 上进行,follower 并不对外提供服务,只在 leader 副本失效时参与新 leader 的选举。


kafka 的副本复制既不是完全同步,也不是完全异步,而是采用 ISR(In Sync Replica)机制:一条消息只有被 ISR 中的所有副本同步后,才被视为已提交(committed)。






