1.图解

  

2.过程

  1.使用Kafka的High Level Consumer API 实现,消费者不能自己去维护消费者offset,而且kafka也不关心数据是否丢失。

  2.当向zookeeper中更新完offset后,Driver如果挂到,Driver下的Executors会被kill掉,会造成数据丢失。

  3.开启WAL【Write Ahead Log】预写日志机制,将数据备份到HDFS中一份,再去更新zookeeper中的offset,此时需调整spark存储基本,去掉备份两次【MEMORY_AND_DISK_SER_2中的_2】。开启WAL机制会加大application处理的时间。

3.特点

  1.receiver模式依赖zookeeper管理offset。

  2.receiver模式的并行度由spark.streaming.blockInterval决定,默认是200ms。

  3.receiver模式接收block.batch数据后会封装到RDD中,这里的block对应RDD中的partition。

  4.在batchInterval一定的情况下,减少spark.streaming.Interval参数值,会增大DStream中的partition个数,建议spark.streaming.Interval最低不能低于50ms。

4.代码实现

package big.data.analyse.streaming

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf} /**
* Created by zhen on 2019/5/11.
*/
object SparkStreamingReceiverKafka {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("SparkStreamingReceiverKafka")
conf.set("spark.streaming.kafka.maxRatePerPartition", "")
conf.setMaster("local[2]") val sc = new SparkContext(conf)
sc.setLogLevel("WARN") val ssc = new StreamingContext(sc, Seconds()) // 创建streamingcontext入口 val quorum = "master,worker1,worker2"
val groupId = "zhenGroup"
val map : Map[String, Int] = Map("zhenTopic" -> ) // topic名称为zhenTopic,每次使用1个线程读取数据 val dframe = KafkaUtils.createStream(ssc, quorum, groupId, map, StorageLevel.MEMORY_AND_DISK_SER_2) dframe.foreachRDD(rdd => { // 操作方式和rdd类似,必须使用action算子才会触发程序执行!
rdd.foreachPartition(partition =>{
partition.foreach(println)
})
})
}
}

最新文章

  1. Java-jdbc操作数据库
  2. POJ 3278 Catch That Cow
  3. docker网络配置方法总结
  4. 联想Phab2 Pro Tango手机测评
  5. 吐槽坑爹的微软win store app审核
  6. String-原型属性
  7. 信息安全系统设计基础exp_4
  8. Laravel 5 基础(六)- 数据库迁移(Migrations)
  9. jQuery 源码细读 -- $.Callbacks
  10. NPM 简单实用说明
  11. 窝上课不听,how to learn C language easily(1)
  12. 201521123075 《Java程序设计》第3周学习总结
  13. flag.xls
  14. Linux几个常用的目录结构
  15. Java基础笔记(1) 语言 JAVA的历史 Java的搭建环境
  16. Exp2 MAL_后门原理与实践 20155214
  17. mysql数据库外键删除更新规则
  18. Iframe中子窗体给父窗体传值
  19. (转)Inno Setup入门(二十)——Inno Setup类参考(6)
  20. 昊合数据整合平台HHDI常见问题

热门文章

  1. PowerMock框架讲解及使用
  2. Better ultra_simple for Slamtec RPLIDAR on Linux
  3. MySQL高效分页-mybatis插件PageHelper改进
  4. svg轻松实现文字水印
  5. (转) 解决django项目部署到nginx+uwsgi服务器后 admin页面样式消失的问题
  6. ThreadLocal源代码3
  7. java 字符串转json,json转实体对象、json字符串转换成List、List转String、以及List排序等等...
  8. Flask源码之:路由加载
  9. Python中NumPy的使用一
  10. Spark之RDD容错原理及四大核心要点