Background

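Kafka collects data in real time from the log collector Flume or from the real-time interfaces of business systems. It also serves as a message buffer that feeds reliable data to the downstream real-time computing framework. Since version 1.3, Spark has supported two ways of integrating with Kafka: the Receiver-based Approach and the Direct Approach. See the official Spark Streaming + Kafka Integration Guide for details. The results are stored in HBase.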
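In the Direct Approach there is no receiver. For each batch, the driver asks Kafka for the latest offset of every partition. It then hands the executors an exact offset range per partition, so each Kafka partition maps to one RDD partition. The sketch below models only that planning step. It makes several assumptions. `OffsetRange` here is a simplified stand-in modeled on Spark's class of the same name, not the Spark class itself. `DirectApproachSketch` and `planBatch` are hypothetical. Starting unseen partitions at offset 0 is a simplification: the real stream falls back to `auto.offset.reset`, which defaults to the latest offset.

```scala
// Simplified stand-in for one partition's slice of a batch: records in [fromOffset, untilOffset)
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object DirectApproachSketch {
  // committed: next offset to read per partition; latest: current end offsets reported by the broker
  def planBatch(topic: String, committed: Map[Int, Long], latest: Map[Int, Long]): Seq[OffsetRange] =
    latest.toSeq.sortBy(_._1).map { case (partition, until) =>
      // a partition with no committed offset starts at 0 here (simplification, see lead-in)
      OffsetRange(topic, partition, committed.getOrElse(partition, 0L), until)
    }

  def main(args: Array[String]): Unit = {
    val committed = Map(0 -> 100L, 1 -> 250L)
    val latest = Map(0 -> 130L, 1 -> 250L, 2 -> 10L)
    planBatch("user_events", committed, latest).foreach { r =>
      println(s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset}) -> ${r.count} records")
    }
  }
}
```

Because the ranges are fixed before the batch runs, a failed task can re-read exactly the same records. This is the property that lets the Direct Approach avoid a write-ahead log.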

Implementation approach

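  1. Implement a Kafka message producer simulator
  2. Have Spark Streaming consume data from Kafka in real time using the Direct Approach
  3. Have Spark Streaming apply the business computation to the data and store the results in HBase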
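The three steps can be traced end to end without a cluster. The sketch below is a hypothetical, in-memory stand-in. `Event`, `PipelineSketch` and its methods are invented for illustration. A fixed `Seq` replaces the Kafka producer. `groupBy` plus `sum` computes what `reduceByKey(_ + _)` computes for one batch. A mutable `Map` keyed by user ID plays the role of the HBase table.

```scala
import scala.collection.mutable

// Hypothetical record mirroring the JSON fields sent by the producer
final case class Event(uid: String, osType: String, clickCount: Int)

object PipelineSketch {
  // Step 1: a fixed "batch" of messages in place of the Kafka producer
  def generate(): Seq[Event] = Seq(
    Event("u1", "Android", 3),
    Event("u2", "IPhone OS", 1),
    Event("u1", "Android", 4)
  )

  // Step 2: per-user click totals for one batch (what reduceByKey(_ + _) computes)
  def aggregate(batch: Seq[Event]): Map[String, Int] =
    batch.groupBy(_.uid).map { case (uid, events) => uid -> events.map(_.clickCount).sum }

  // Step 3: write one value per rowkey; writing the same rowkey again overwrites it
  def store(table: mutable.Map[String, Int], totals: Map[String, Int]): Unit =
    totals.foreach { case (uid, clicks) => table(uid) = clicks }

  def main(args: Array[String]): Unit = {
    val table = mutable.Map.empty[String, Int]
    store(table, aggregate(generate()))
    println(table.toSeq.sortBy(_._1).mkString(", "))
  }
}
```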

Component versions

Spark 2.1.0, Kafka 0.9.0.1, HBase 1.2.0

Implementation

Kafka message simulator

import java.util.{Properties, Random, UUID}

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaMessageGenerator {

  private val random = new Random()
  // index of the last OS type handed out; starts at -1 so the first call returns os_type(0)
  private var pointer = -1
  private val os_type = Array(
    "Android", "IPhone OS",
    "None", "Windows Phone"
  )

  // random click count for one event (the upper bound of 10 is an assumed value)
  def click(): Int = random.nextInt(10)

  // cycle through os_type in order, wrapping back to the first entry
  def getOsType(): String = {
    pointer = pointer + 1
    if (pointer >= os_type.length) {
      pointer = 0
    }
    os_type(pointer)
  }

  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    val props = new Properties()
    props.put("bootstrap.servers", "10.3.71.154:9092")
    // key and value are both Strings, matching KafkaProducer[String, String]
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    while (true) {
      val event: JSONObject = new JSONObject()
      event.put("uid", UUID.randomUUID().toString) // random user ID
      event.put("event_time", System.currentTimeMillis.toString) // time the event occurred
      event.put("os_type", getOsType()) // device OS type
      event.put("click_count", click()) // number of clicks
      val record = new ProducerRecord[String, String](topic, event.toString)
      producer.send(record)
      println("Message sent: " + event)
      Thread.sleep(200) // pause between messages (200 ms is an assumed value)
    }
  }
}
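The rotation in `getOsType` can also be written with a modulo instead of an explicit reset. The sketch below shows that variant under a few assumptions. `RoundRobin` and `RoundRobinDemo` are hypothetical names, and the class is not thread-safe, just like the original mutable `pointer`.

```scala
// Round-robin over a fixed list: same order as getOsType, wrapping via modulo
final class RoundRobin[T](items: IndexedSeq[T]) {
  require(items.nonEmpty, "items must not be empty")
  private var pointer = -1

  def next(): T = {
    pointer = (pointer + 1) % items.length
    items(pointer)
  }
}

object RoundRobinDemo {
  def main(args: Array[String]): Unit = {
    val os = new RoundRobin(Vector("Android", "IPhone OS", "None", "Windows Phone"))
    // prints "Android, IPhone OS, None, Windows Phone, Android, IPhone OS"
    println((1 to 6).map(_ => os.next()).mkString(", "))
  }
}
```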

Spark Streaming main class

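import com.alibaba.fastjson.{JSON, JSONObject}
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object PageViewStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PageViewStream").setMaster("local[*]")
    // create the StreamingContext with a 5 s batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Kafka configuration
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "10.3.71.154:9092",
      "serializer.class" -> "kafka.serializer.StringEncoder"
    )
    // create a direct stream (spark-streaming-kafka-0-8 API); each element is a (key, value) pair
    val kafkaStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("user_events"))
    // parse each message value into a JSON object
    val events: DStream[JSONObject] = kafkaStream.map(line => JSON.parseObject(line._2))
    // sum the click counts per user within each batch
    val userClicks: DStream[(String, Int)] =
      events.map(x => (x.getString("uid"), x.getIntValue("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        // HBase configuration: one connection per partition, created on the executor
        val tableName = "PageViewStream2"
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "master66")
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") // ZooKeeper default port; adjust to your cluster
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val statTable = conn.getTable(TableName.valueOf(tableName))
        try {
          partitionOfRecords.foreach(pair => {
            val uid = pair._1   // user ID, used as the rowkey
            val click = pair._2 // click count for this batch
            // build a Put for the rowkey; column family "Stat2" must already exist in the table
            val put = new Put(Bytes.toBytes(uid))
            put.addColumn(Bytes.toBytes("Stat2"), Bytes.toBytes("ClickStat"), Bytes.toBytes(click))
            statTable.put(put)
          })
        } finally {
          // release HBase resources once the partition has been written
          statTable.close()
          conn.close()
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}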
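`reduceByKey` sums clicks within a single batch only. A Put on an existing rowkey and column also replaces the previous cell value. Writing these per-batch sums therefore does not give a running total per user.

One option in Spark Streaming is `updateStateByKey`, which needs a checkpoint directory set with `ssc.checkpoint`. Another is HBase's `Table.incrementColumnValue`. The sketch below models only the `updateStateByKey` semantics in plain Scala, under stated assumptions. `RunningTotalSketch`, `updateClicks` and `runBatches` are hypothetical names. `updateClicks` has the shape `(Seq[V], Option[S]) => Option[S]` that `updateStateByKey` expects. `runBatches` imitates how Spark calls it for every key that is already in the state or appears in the current batch.

```scala
object RunningTotalSketch {
  // (values for this key in the current batch, previous state) => new state
  def updateClicks(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + newValues.sum)

  // Fold the update function over successive batches, as the streaming job would do each interval
  def runBatches(batches: Seq[Seq[(String, Int)]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
      val grouped = batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
      val keys = state.keySet ++ grouped.keySet
      keys.flatMap { k =>
        updateClicks(grouped.getOrElse(k, Seq.empty), state.get(k)).map(total => k -> total)
      }.toMap
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq("u1" -> 3, "u2" -> 1),
      Seq("u1" -> 4),
      Seq("u2" -> 2, "u3" -> 5)
    )
    // running totals after three batches: u1 -> 7, u2 -> 3, u3 -> 5
    println(runBatches(batches).toSeq.sortBy(_._1).mkString(", "))
  }
}
```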
 
