创建Kafka生产者:

  Kafka生产者有3个必选属性:

    bootstrap.servers   broker地址清单,格式为host:port   ,清单中不必包含所有broker,但至少2个

    key.serializer  = org.apache.kafka.common.serialization.Serializer 接口类,生产者使用这个类把键对象序列化为字节数组

        Kafka还提供了ByteArraySerializer,StringSerializer,IntegerSerializer 实现类

    value.serializer  与key.serializer可以将值序列化

  

发送消息有3种方式:

  发送并忘记(fire-and-forget)  发送消息给服务器,但是不关心是否正常到达

  同步发送  使用send()发送信息,返回一个future对象,调用get()方法进行等待,了解信息是否正常发送

  异步发送    调用send()函数,并指定一个回调函数,服务器在返回响应时调用该函数

生产者的可选属性:

  acks 指定有多少个分区副本收到消息,生产者才会认为消息写入是成功的

    ack=0  生产者在成功写入消息之前不会等待任何来自服务器的响应

    ack=1 只要集群的首领节点收到信息,生产者就会收到一个来自服务器的成功响应

    ack=all 所有参数与复制的节点全部收到消息,生产者才会收到一个来自服务器的响应,这种模式最安全

  buffer.memory  设置生产者内存缓冲区大小,生产者使用它缓冲要发送到服务器的消息

  compression.type  默认消息发送不使用压缩,该参数可以设置为snappy,gzip,lz4

  retries 生产者可以重发消息的次数,默认 每次重试之间等待100ms,可以使用参数 retry.backoff.ms参数改变这个时间间隔

  batch.size 指定一个批次可以使用的内存大小,按照字节数计算

  linger.ms 指定生产者在发送批次之前等待更多消息加入批次的时间。Kafkaproducer 会在批次填满或linger.ms达到上限时把批次发送出去

  client.id 该参数可以指定任意的字符串,服务器会用它识别信息的来源,还可以用在日志和配额指标里

  max.in.flight.requests.per.connection  指定生产者在收到服务器响应之前可以发送多少个消息,值越高越占用内存;设为1可以保证消息是按照发送顺序写入服务器的

  timeout.ms   reuqest.timeout.ms   metadata.fetch.timeout.ms

    request.timeout.ms 指定生产者在发送数据时等待服务器返回响应的时间

    metadata.timeout.ms 指定生产者在获取元数据时等待服务器返回响应时间

    tiemout.ms指定broker等待同步副本返回消息确认的时间,与acks的配置相匹配

  max.block.ms  指定调用send()或partitionsFor()方法获取元数据时生产者的阻塞时间,当阻塞时间到达max.block.ms时,生产者会抛出异常

  max.request.size 控制生产者发送的请求大小,指能发送的单个消息的最大值或单个请求里所有消息的总和

      broker对可接收的消息最大值也有自己的限制(message.max.bytes),两边配置最好匹配,避免生产者发送消息被拒绝

  receive.buffer.bytes 和 send.buffer.bytes

      这2个参数分别指定了TCP socket接收和发送数据包缓冲区大小,默认-1

  max.in.flight.requests.per.connection=1 保证了消息的顺序,如果大于1,第一批次写入失败后,重试成功可能会改变消息的顺序

序列化器:

  自定义序列化器

  Avro序列化

    Kafka使用Avro序列器是通过schema注册表来实现的,schema注册表不属于Kafka

    

  

最新文章

  1. failed to push some refs to 'git@github.com:***.git' hint: Updates were rejected b
  2. Linux命令行–更多bash shell命令(转)
  3. .net 时间戳和日期互转 【转】http://www.cnblogs.com/zhuiyi/p/5307540.html
  4. 如何在Maven官网下载到历史版本
  5. oracle for update和for update nowait(for update wait)的区别
  6. [LeetCode]Integer Break(Dp或胡搞或推公式)
  7. 什么是javascript中的静态方法?一个例子让你懂~!
  8. activity的启动模式和栈管理
  9. Java Elasticsearch新手入门教程
  10. docker环境 宿主机和容器之间复制文件
  11. NOIP2018 前流水账
  12. tensorflow 高级api使用分布式之配置
  13. 【TP3.2+onethink】radio+checkbox+select 空间 编辑页面选中,附录 js 返回上一页
  14. stm32 定时器TIM时钟步骤
  15. arcgis JavaScript 加载 mapbox地图
  16. logback.xml配置示例
  17. nginx访问日志出现大量的500状态信息,用postman返回 Internal Server Error,Too Many Attempts.错误的解决办法
  18. cmake 语法
  19. FDMemTable三层提交数据总是不成功的原因
  20. Unity3D手游开发日记(7) - 适合移动平台的天气效果

热门文章

  1. Kafka文件存储机制及offset存取
  2. 在github上搭建个人博客
  3. js对象与字符串的想到转换
  4. js 原型链和继承(转)
  5. 转 .NET4.5之初识async与await
  6. 【AI】神经网络基本词汇
  7. 记录一份Oracle 正确的监听配置文件listener.ora与tnsnames.ora
  8. Windows下安装配置Yaf框架的方法及创建典型合理的Demo目录结构
  9. 13 form表单
  10. 面向对象编程之Java多态