auto.offset.reset关乎kafka数据的读取。常用的二个值是latest和earliest,默认是latest。

如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以。

earliest

当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest

当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none

topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

1.latest和earliest区别

  1. earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  2. latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

2.创建topic

# bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank
Created topic "tank". # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank
Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:
Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1

3.生产数据和接收生产数据

[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank
>1
>2
>3
。。。。。。。。。省略。。。。。。。。。
[root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning
1
2
3

4.测试代码

object tank {
def main(args: Array[String]): Unit = {
val pros: Properties = new Properties
pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")
/*分组由消费者决定,完全自定义,没有要求*/
pros.put("group.id", "tank")
//设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic
pros.put("enable.auto.commit", "false")
pros.put("auto.commit.interval.ms", "1000")
pros.put("max.poll.records", "5")
pros.put("session.timeout.ms", "30000")
//只有当offset不存在的时候,才用latest或者earliest
pros.put("auto.offset.reset", "latest") pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros) /*这里填写主题名称*/
consumer.subscribe(util.Arrays.asList("tank")) val system = akka.actor.ActorSystem("system")
system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer)) } object tankTest {
def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {
val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))
if (!records.isEmpty) {
for (record <- records) {
if (record.value != null && !record.value.equals("")) {
myLog.syncLog(record.value + "\t准备开启消费者出列数据", "kafka", "get")
}
}
consumer.commitSync() } }
}
}

最新文章

  1. 解析大型.NET ERP系统 版本控制
  2. Android开发案例 - 注册登录
  3. 【09-14】eclipse学习笔记
  4. CentOS linux下安装和配置Apache+SVN(用浏览器http方式访问SVN目录)
  5. mysql 免安装版本 命令安装
  6. python之量的概念
  7. 编程之美 3.1 字符串移位包含问 复杂度(O(N*K)
  8. [hadoop转载]tearsort
  9. Java内存区域 - 深入Java虚拟机读后总结
  10. 阿里云ECS试用
  11. Android Studio 工程.GitIgnore应该忽略的文件
  12. W5500 keep-alive的用途及使用
  13. JavaScript编写连连看
  14. SpringAOP简单入门
  15. Python(Django)项目与Apache的管理
  16. 平滑升级nginx到新版本
  17. vue - 新建一个项目
  18. 自学提高:JVM点滴
  19. 大兄dei,早点看清this吧
  20. Python Json &amp; Pickle模块

热门文章

  1. yum被系统升级锁定
  2. 【linux】系统编程-5-线程
  3. 2.1 关系型数据的收集--Sqoop
  4. 使用IDEA构建Spring Boot项目简单实例
  5. Java 使用线程池执行若干任务
  6. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  7. Openstack dashboard 仪表盘服务 (八)
  8. Linux面试必备
  9. 【计算机基础】常用的快捷键和DOS命令
  10. 国人之光:大数据分析神器Apache Kylin