Whether we are consuming Kafka from Spark Streaming or monitoring Kafka data, we often need to know the latest offsets.

A Kafka topic is divided into partitions, and the latest offset of each partition can be fetched from that partition's leader.

GetOffsetShellWrap
import joptsimple.OptionParser
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.utils.{CommandLineUtils, ToolsUtils}
import scala.collection.mutable.ArrayBuffer

// An extension of Kafka's bundled GetOffsetShell tool
object GetOffsetShellWrap {

  // main takes an extra parameter `map` that collects the results
  def main(args: Array[String], map: ArrayBuffer[String]): Unit = {
    // parse the command-line options
    val parser = new OptionParser
    val parser = new OptionParser
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
      .withRequiredArg
      .describedAs("hostname:port,...,hostname:port")
      .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
      .withRequiredArg
      .describedAs("topic")
      .ofType(classOf[String])
    val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
      .withRequiredArg
      .describedAs("partition ids")
      .ofType(classOf[String])
      .defaultsTo("")
    val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
      .withRequiredArg
      .describedAs("timestamp/-1(latest)/-2(earliest)")
      .ofType(classOf[java.lang.Long])
    val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
      .withRequiredArg
      .describedAs("count")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1)
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
      .withRequiredArg
      .describedAs("ms")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1000)

    if(args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")

    val options = parser.parse(args : _*)

    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
    // read the option values
    val clientId = "GetOffsetShell"
    val brokerList = options.valueOf(brokerListOpt)
    ToolsUtils.validatePortOrDie(parser, brokerList)
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val topic = options.valueOf(topicOpt)
    var partitionList = options.valueOf(partitionOpt)
    var time = options.valueOf(timeOpt).longValue
    val nOffsets = options.valueOf(nOffsetsOpt).intValue
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
        "kafka-list-topic.sh to verify")
      System.exit(1)
    }
    val partitions =
      if(partitionList == "") {
        topicsMetadata.head.partitionsMetadata.map(_.partitionId)
      } else {
        partitionList.split(",").map(_.toInt).toSeq
      }
    // iterate over each partition and ask its leader for offsets
    partitions.foreach { partitionId =>
      val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
      partitionMetadataOpt match {
        case Some(metadata) =>
          metadata.leader match {
            case Some(leader) =>
              val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
              val topicAndPartition = TopicAndPartition(topic, partitionId)
              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
              val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
              // store the fetched offsets instead of printing them
              map += "%s:%d:%s".format(topic, partitionId, offsets.mkString(","))
            case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
          }
        case None => System.err.println("Error: partition %d does not exist".format(partitionId))
      }
    }
  }
}
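Compared with the stock kafka.tools.GetOffsetShell, only two things change: main takes an extra ArrayBuffer[String] parameter, and each topic:partition:offsets line is appended to that buffer instead of being printed to stdout, so a caller can collect the results in-process. The test object below shows how to drive the wrapper.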
GetOffsetShellWrapScalaTest
import scala.collection.mutable.ArrayBuffer

object GetOffsetShellWrapScalaTest {
  def main(args: Array[String]): Unit = {
    val arr = ArrayBuffer[String]()
    arr += "--broker-list=hadoop-01:9092"
    arr += "--topic=2017-11-6-test"
    arr += "--time=-1"
    val result = getOffset(arr.toArray)
    for (i <- result) {
      println("offset fetched by the wrapper => " + i)
    }
  }

  def getOffset(args: Array[String]): Array[String] = {
    val map = new ArrayBuffer[String]()
    GetOffsetShellWrap.main(args, map)
    map.toArray
  }
}
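The strings that getOffset returns are easy to parse back into a typed map, for example to build the fromOffsets argument of Spark Streaming's direct Kafka stream. A minimal sketch (toOffsetMap is a hypothetical helper name, not part of the original code):

import kafka.common.TopicAndPartition

object OffsetParsing {
  // parse the "topic:partition:offsets" strings collected by GetOffsetShellWrap
  def toOffsetMap(lines: Array[String]): Map[TopicAndPartition, Long] =
    lines.map { line =>
      val Array(topic, partition, offsets) = line.split(":", 3)
      // with the default --offsets 1 there is exactly one offset in the list
      TopicAndPartition(topic, partition.toInt) -> offsets.split(",").head.toLong
    }.toMap
}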

Output:

2017-11-6-test:2:16099
2017-11-6-test:1:15930
2017-11-6-test:0:16096
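
For reference, the wrapper above is built on the legacy SimpleConsumer API (Kafka 0.8/0.9). On clients 0.10.1 and newer, the same latest-offset lookup is a single call on the new consumer; a minimal sketch, assuming kafka-clients 0.10.1+ and the broker and topic names from the test above:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object EndOffsetsExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop-01:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    try {
      val topic = "2017-11-6-test"
      val partitions = consumer.partitionsFor(topic).asScala
        .map(p => new TopicPartition(topic, p.partition))
      // endOffsets returns the next offset to be written for each partition,
      // i.e. the same "latest offset" the wrapper fetches with --time=-1
      consumer.endOffsets(partitions.asJava).asScala.foreach { case (tp, off) =>
        println(s"${tp.topic}:${tp.partition}:$off")
      }
    } finally {
      consumer.close()
    }
  }
}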
