kafka producer partitions分区器(七)
2024-08-29 20:10:15
消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner和key都没有指定,那么就会采用默认的方式进行数据分区。
有没有指定partition可以从源码中看出:
public ProducerRecord(String topic, Integer partition, K key, V value) {} 如果指定的partition,那就指定了数据发往哪个分区上,如果没有就会根据key来进行数据分区,如果2个都没有,那么会采用默认的分区策略来进行数据分区
1.根据key进行分区
public class CustomPartitioner { private static final Logger LOG = LoggerFactory.getLogger(CustomPartitioner.class); public static void main(String[] args) {
//1.加载配置信息
Properties prop = loadProperties(); //2.创建生产者
KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop); String sendContent = "hello_kafka";
IntStream.range(0, 10).forEach(i ->{
try {
ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i); //topic key value
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
} }); }
//配置文件的设置
public static Properties loadProperties() {
Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("acks", "all"); //发送到所有的ISR队列中
return prop;
}
}
2.自定义分区
同样在使用自定义分区的时候,需要写实现类和在producer中配置引用
我们在这个示例中,根据key来分区,key在序列化的时候用的是IntegerSerializer,在ProducerRecord中我们没有指定partition
自定义分区器
public class CustomPartition implements Partitioner{ @Override
public void configure(Map<String, ?> configs) {
// TODO Auto-generated method stub } @SuppressWarnings({ "null", "unused" })
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int partitionNum = cluster.partitionsForTopic(topic).size();
int partition = (Integer)key%partitionNum;
return key == null? 0:partition;
} @Override
public void close() {
// TODO Auto-generated method stub }
}
生产者
public class ProducerDemo { private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class); public static void main(String[] args) throws InterruptedException, ExecutionException {
//1.加载配置信息
Properties prop = loadProperties(); //2.创建生产者
KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop); //3.发送内容
String sendContent = "hello_kafka";
IntStream.range(0, 10).forEach(i ->{
try {
ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
} });
producer.close(); //回调拦截器中的close方法 } //配置文件的设置
public static Properties loadProperties() {
Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("partitioner.class", "com.zpb.partitioner.CustomPartition");
prop.put("acks", "all");
return prop;
}
}
最新文章
- ASCII码而已
- Asp.Net_Mvc_获取当前Url、Controller、Action
- Javascript 事件对象进阶(二)拖拽的应用 - 登录框的拖拽
- iOS学习之iOS沙盒(sandbox)机制和文件操作之NSFileManager(三)
- Excel with COM
- 领导者/追随者(Leader/Followers)模型和半同步/半异步(half-sync/half-async)模型都是常用的客户-服务器编程模型
- Spark实战3:Maven_Java_HelloWorld
- PostgreSQL rule view materialized view examples
- GC学习笔记
- 关于java classpath问题
- hdu 1044(bfs+状压)
- C# 如何以参数的形式调用.exe程序
- iOS UIButton加在window上点击无效果问题
- c#委托和事件(下) 分类: C# 2015-03-09 08:42 211人阅读 评论(0) 收藏
- 5_Navigation Bar
- TCP和UDP的差别
- Android项目--浅析系统通讯录中的那些方法
- Ubuntu超好用软件:markdown编辑器
- mysql安装及常见问题
- Gethub readme 撰写