createDirectStream方式需要自己维护offset,使程序可以实现中断后从中断处继续消费数据。

KafkaManager.scala

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import scala.reflect.ClassTag /**
* Created by knowpigxia on 15-8-5.
*/
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable { private val kc = new KafkaCluster(kafkaParams) /**
* 创建数据流
* @param ssc
* @param kafkaParams
* @param topics
* @tparam K
* @tparam V
* @tparam KD
* @tparam VD
* @return
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]): InputDStream[(K, V)] = {
val groupId = kafkaParams.get("group.id").get
// 在zookeeper上读取offsets前先根据实际情况更新offsets
setOrUpdateOffsets(topics, groupId) //从zookeeper上读取offset开始消费message
val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
messages
} /**
* 创建数据流前,根据实际消费情况更新消费offsets
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
if (hasConsumed) {// 消费过
/**
* 如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException,
* 说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。
* 针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
* 如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时,
* 这时把consumerOffsets更新为earliestLeaderOffsets
*/
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get // 可能只是存在部分分区consumerOffsets过时,所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case(tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已经过时,更新为" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else {// 没有消费过
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else {
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
} /**
* 更新zookeeper上的消费offsets
* @param rdd
*/
def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
val groupId = kafkaParams.get("group.id").get
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
}

  主程序中

def initKafkaParams = {
    Map[String, String](
      "metadata.broker.list" -> Constants.KAFKA_BROKERS,
      "group.id " -> Constants.KAFKA_CONSUMER_GROUP,
      "fetch.message.max.bytes" -> "20971520",
      "auto.offset.reset" -> "smallest"
    )
  } // kafka参数
val kafkaParams = initKafkaParams
val manager = new KafkaManager(kafkaParams)
val messageDstream = manager.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)) // 更新offsets
manager.updateZKOffsets(rdd)

  

最新文章

  1. x01.Tetris: 俄罗斯方块
  2. logback 配置详解
  3. 不可错过的炒鸡棒的js迷你库
  4. EasyUI TreeGrid DataTable转换数据实现案例
  5. BZOJ 1051 &amp; 强联通分量
  6. sublime text 3 的在文件夹中查找的快捷键没有反应 的bug冲突
  7. oracle模糊查询效率可这样提高
  8. c#抽象工厂模式
  9. Jquery filter()方法简介
  10. shell 脚本阅读之二——ltp工具下的runltp
  11. Eclipse验证码
  12. Arcglobe三维信息系统开发常见问题
  13. Python中time和datetime模块的简单用法
  14. hp MSA50 5盘RAID5重建为4盘RAID5怎么恢复数据
  15. Ireport启动错误
  16. 构建SSH服务
  17. excel数据处理,公式
  18. VM_Centos7.3_X64_安装Oracle12C 总结笔记
  19. ionic1 添加百度地图插件 cordova-plugin-baidumaplocation
  20. iOS安装包瘦身的那些事儿

热门文章

  1. Chrome扩展及应用开发
  2. Selenium2+python自动化67-用例失败自动截图【转载】
  3. python算法:嵌套数组转变成一维数组
  4. brew安装mysql
  5. [DB2]Linux下安装db2 v9.7
  6. Vmware vSphere常见问题及解决办法
  7. Codeforces 189A. Cut Ribbon
  8. 集训day15 t1 poj3728
  9. uva10256
  10. 洛谷——P2758 编辑距离