获取kafka最新offset-scala
2024-09-05 18:43:58
无论是在 Spark Streaming 中消费 Kafka,还是在监控 Kafka 的数据时,我们都经常需要知道 offset 的最新情况。
Kafka 的 topic 按分区(partition)存储数据,通过每个分区的 leader 副本所在的 broker 即可获取该分区 offset 的最新情况。
GetOffsetShellWrap
// Extension of the offset tool bundled with Kafka: instead of printing the offsets,
// the result lines are appended to a buffer supplied by the caller.
object GetOffsetShellWrap {

  /**
    * Parses GetOffsetShell-style options, queries the leader of every requested partition
    * and appends one `topic:partitionId:offset[,offset...]` line per partition to `map`.
    *
    * Recognised options: `--broker-list`, `--topic` and `--time` (all required), plus
    * `--partitions` (default: every partition of the topic), `--offsets` (default 1) and
    * `--max-wait-ms` (default 1000).
    *
    * NOTE(review): on a missing/unknown topic this method calls `System.exit(1)`, and on
    * empty `args` it calls `CommandLineUtils.printUsageAndDie` — presumably both terminate
    * the JVM of the embedding application; confirm this is acceptable for callers.
    *
    * @param args command-line style arguments, e.g. `--broker-list=host:9092`
    * @param map  output buffer; despite its name it is an `ArrayBuffer` that receives the
    *             formatted result lines (the extra parameter added on top of the Kafka tool)
    */
  def main(args: Array[String],map: ArrayBuffer[String]): Unit = {
    // Declare the accepted options.
    val parser = new OptionParser
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
      .withRequiredArg
      .describedAs("hostname:port,...,hostname:port")
      .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
      .withRequiredArg
      .describedAs("topic")
      .ofType(classOf[String])
    val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
      .withRequiredArg
      .describedAs("partition ids")
      .ofType(classOf[String])
      .defaultsTo("")
    val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
      .withRequiredArg
      .describedAs("timestamp/-1(latest)/-2(earliest)")
      .ofType(classOf[java.lang.Long])
    val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
      .withRequiredArg
      .describedAs("count")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1)
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
      .withRequiredArg
      .describedAs("ms")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1000)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")

    val options = parser.parse(args : _*)

    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)

    // Read the option values.
    val clientId = "GetOffsetShell"
    val brokerList = options.valueOf(brokerListOpt)
    ToolsUtils.validatePortOrDie(parser, brokerList)
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val topic = options.valueOf(topicOpt)
    val partitionList = options.valueOf(partitionOpt)
    val time = options.valueOf(timeOpt).longValue
    val nOffsets = options.valueOf(nOffsetsOpt).intValue
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
        "kafka-list-topic.sh to verify")
      System.exit(1)
    }

    // An empty --partitions value means "all partitions of the topic".
    val partitions =
      if (partitionList == "") {
        topicsMetadata.head.partitionsMetadata.map(_.partitionId)
      } else {
        partitionList.split(",").map(_.toInt).toSeq
      }

    // Ask the leader of each partition for its offsets.
    partitions.foreach { partitionId =>
      val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
      partitionMetadataOpt match {
        case Some(metadata) =>
          metadata.leader match {
            case Some(leader) =>
              val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
              // Close the per-partition connection even if the request fails; otherwise
              // every invocation from a host application leaks one consumer per partition.
              try {
                val topicAndPartition = TopicAndPartition(topic, partitionId)
                val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
                val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
                // Store the fetched offsets for the caller.
                map += "%s:%d:%s".format(topic, partitionId, offsets.mkString(","))
              } finally {
                consumer.close()
              }
            case None =>
              System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
          }
        case None =>
          System.err.println("Error: partition %d does not exist".format(partitionId))
      }
    }
  }
}
GetOffsetShellWrapScalaTest
// Small driver that calls GetOffsetShellWrap and prints every result line it returns.
object GetOffsetShellWrapScalaTest {

  /**
    * Builds a fixed argument list (broker `hadoop-01:9092`, topic `2017-11-6-test`,
    * time `-1`) and prints each line returned by [[getOffset]].
    *
    * @param args ignored; the arguments passed on are hard-coded below
    */
  def main(args: Array[String]): Unit = {
    val arr = ArrayBuffer[String]()
    arr += "--broker-list=hadoop-01:9092"
    // NOTE(review): single dash here, unlike the other two flags — kept exactly as in the
    // original; confirm the option parser accepts this form.
    arr += "-topic=2017-11-6-test"
    arr += "--time=-1"

    val result = getOffset(arr.toArray)
    for (line <- result) {
      println("我自己获取到的偏移量=> " + line)
    }
  }

  /**
    * Runs `GetOffsetShellWrap.main` with a fresh buffer and returns what it collected.
    *
    * @param args command-line style arguments forwarded unchanged to `GetOffsetShellWrap.main`
    * @return the collected result lines as an array
    */
  def getOffset(args: Array[String]): Array[String] = {
    val map = new ArrayBuffer[String]()
    GetOffsetShellWrap.main(args, map)
    map.toArray
  }
}
结果输出:
2017-11-6-test:2:16099 2017-11-6-test:1:15930 2017-11-6-test:0:16096
最新文章
- UIAppearance
- window resize的时候禁止频繁触发事件
- JavaScript Promise API
- jdbc基础 (二) 通过properties配置文件连接数据库
- ZABBIX作集中式NGINX性能监控的注意要点
- Hql没有limit,替换方案
- Windows 服务多语言化时读取配置文件失败的问题。
- HTTP协议------->资源和URL
- Android 深入理解Android中的自定义属性
- OO随笔
- lnamp环境搭建博客、论坛
- mac上安装memcache
- mysql设计表时注意事项
- VirtualBox 端口转发 SSH
- 域PC脱域
- Github安全开源工具集合
- Ubuntu 编译安装 Xdebug
- ng-深度学习-课程笔记-0: 概述
- C# 中的动态创建技术
- php编程知识点2018
热门文章
- python类库26[sqlite]
- [每日一讲] Python系列:字符串(下)
- CSS3基础——笔记+实战案例(CSS基本用法、CSS层叠性、CSS继承性)
- entry 遍历 Map 元素
- z-tree的使用
- eclipse在线安装ermaster插件
- 实战build-react(二)-------引入Ant Design(增加)
- [BZOJ3572][HNOI2014]世界树:虚树+倍增
- Spring Cloud云架构 - SSO单点登录之OAuth2.0 根据token获取用户信息(4)
- HTTP访问控制(CORS)踩坑小记