消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner和key都没有指定,那么就会采用默认的方式进行数据分区。

  有没有指定partition可以从源码中看出:

 public ProducerRecord(String topic, Integer partition, K key, V value) {}

 如果指定的partition,那就指定了数据发往哪个分区上,如果没有就会根据key来进行数据分区,如果2个都没有,那么会采用默认的分区策略来进行数据分区

1.根据key进行分区

public class CustomPartitioner {

    private static final Logger LOG = LoggerFactory.getLogger(CustomPartitioner.class);

    public static void main(String[] args) {
//1.加载配置信息
Properties prop = loadProperties(); //2.创建生产者
KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop); String sendContent = "hello_kafka";
IntStream.range(0, 10).forEach(i ->{
try {
ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);  //topic key value
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
} }); }
//配置文件的设置
public static Properties loadProperties() {
Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("acks", "all"); //发送到所有的ISR队列中
return prop;
}
}

 2.自定义分区

  同样在使用自定义分区的时候,需要写实现类和在producer中配置引用

  我们在这个示例中,根据key来分区,key在序列化的时候用的是IntegerSerializer,在ProducerRecord中我们没有指定partition

  自定义分区器

public class CustomPartition implements Partitioner{

    @Override
public void configure(Map<String, ?> configs) {
// TODO Auto-generated method stub } @SuppressWarnings({ "null", "unused" })
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int partitionNum = cluster.partitionsForTopic(topic).size();
int partition = (Integer)key%partitionNum;
return key == null? 0:partition;
} @Override
public void close() {
// TODO Auto-generated method stub }
}

  生产者

public class ProducerDemo {

    private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);

    public static void main(String[] args) throws InterruptedException, ExecutionException {
//1.加载配置信息
Properties prop = loadProperties(); //2.创建生产者
KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop); //3.发送内容
String sendContent = "hello_kafka";
IntStream.range(0, 10).forEach(i ->{
try {
ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
} });
producer.close(); //回调拦截器中的close方法 } //配置文件的设置
public static Properties loadProperties() {
Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("partitioner.class", "com.zpb.partitioner.CustomPartition");
prop.put("acks", "all");
return prop;
}
}

 

最新文章

  1. ASCII码而已
  2. Asp.Net_Mvc_获取当前Url、Controller、Action
  3. Javascript 事件对象进阶(二)拖拽的应用 - 登录框的拖拽
  4. iOS学习之iOS沙盒(sandbox)机制和文件操作之NSFileManager(三)
  5. Excel with COM
  6. 领导者/追随者(Leader/Followers)模型和半同步/半异步(half-sync/half-async)模型都是常用的客户-服务器编程模型
  7. Spark实战3:Maven_Java_HelloWorld
  8. PostgreSQL rule view materialized view examples
  9. GC学习笔记
  10. 关于java classpath问题
  11. hdu 1044(bfs+状压)
  12. C# 如何以参数的形式调用.exe程序
  13. iOS UIButton加在window上点击无效果问题
  14. c#委托和事件(下) 分类: C# 2015-03-09 08:42 211人阅读 评论(0) 收藏
  15. 5_Navigation Bar
  16. TCP和UDP的差别
  17. Android项目--浅析系统通讯录中的那些方法
  18. Ubuntu超好用软件:markdown编辑器
  19. mysql安装及常见问题
  20. Gethub readme 撰写

热门文章

  1. IO 的底层实现问题
  2. STS工具:mybayis连接oracle数据库
  3. Comet OJ - Contest #2 (D 错综的光影所迷惑的思念是) 容斥计数
  4. PHP基础--traits的应用
  5. Django-中间件实现1分钟内只允许三次访问
  6. LR性能测试课程及视频教程
  7. cmake语法入门记录
  8. Default Keyboard Shortcut Schemes
  9. git如何压栈某一个文件?
  10. 设置Win10默认启动的Linux子系统版本,启动指定Linux发行版