SparkStreaming+kafka Receiver模式
2024-09-19 22:45:40
1.图解
2.过程
1.使用Kafka的High Level Consumer API 实现,消费者不能自己去维护消费者offset,而且kafka也不关心数据是否丢失。
2.当向zookeeper中更新完offset后,Driver如果挂到,Driver下的Executors会被kill掉,会造成数据丢失。
3.开启WAL【Write Ahead Log】预写日志机制,将数据备份到HDFS中一份,再去更新zookeeper中的offset,此时需调整spark存储基本,去掉备份两次【MEMORY_AND_DISK_SER_2中的_2】。开启WAL机制会加大application处理的时间。
3.特点
1.receiver模式依赖zookeeper管理offset。
2.receiver模式的并行度由spark.streaming.blockInterval决定,默认是200ms。
3.receiver模式接收block.batch数据后会封装到RDD中,这里的block对应RDD中的partition。
4.在batchInterval一定的情况下,减少spark.streaming.Interval参数值,会增大DStream中的partition个数,建议spark.streaming.Interval最低不能低于50ms。
4.代码实现
package big.data.analyse.streaming import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf} /**
* Created by zhen on 2019/5/11.
*/
object SparkStreamingReceiverKafka {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("SparkStreamingReceiverKafka")
conf.set("spark.streaming.kafka.maxRatePerPartition", "")
conf.setMaster("local[2]") val sc = new SparkContext(conf)
sc.setLogLevel("WARN") val ssc = new StreamingContext(sc, Seconds()) // 创建streamingcontext入口 val quorum = "master,worker1,worker2"
val groupId = "zhenGroup"
val map : Map[String, Int] = Map("zhenTopic" -> ) // topic名称为zhenTopic,每次使用1个线程读取数据 val dframe = KafkaUtils.createStream(ssc, quorum, groupId, map, StorageLevel.MEMORY_AND_DISK_SER_2) dframe.foreachRDD(rdd => { // 操作方式和rdd类似,必须使用action算子才会触发程序执行!
rdd.foreachPartition(partition =>{
partition.foreach(println)
})
})
}
}
最新文章
- Java-jdbc操作数据库
- POJ 3278 Catch That Cow
- docker网络配置方法总结
- 联想Phab2 Pro Tango手机测评
- 吐槽坑爹的微软win store app审核
- String-原型属性
- 信息安全系统设计基础exp_4
- Laravel 5 基础(六)- 数据库迁移(Migrations)
- jQuery 源码细读 -- $.Callbacks
- NPM 简单实用说明
- 窝上课不听,how to learn C language easily(1)
- 201521123075 《Java程序设计》第3周学习总结
- flag.xls
- Linux几个常用的目录结构
- Java基础笔记(1) 语言 JAVA的历史 Java的搭建环境
- Exp2 MAL_后门原理与实践 20155214
- mysql数据库外键删除更新规则
- Iframe中子窗体给父窗体传值
- (转)Inno Setup入门(二十)——Inno Setup类参考(6)
- 昊合数据整合平台HHDI常见问题
热门文章
- PowerMock框架讲解及使用
- Better ultra_simple for Slamtec RPLIDAR on Linux
- MySQL高效分页-mybatis插件PageHelper改进
- svg轻松实现文字水印
- (转) 解决django项目部署到nginx+uwsgi服务器后 admin页面样式消失的问题
- ThreadLocal源代码3
- java 字符串转json,json转实体对象、json字符串转换成List、List转String、以及List排序等等...
- Flask源码之:路由加载
- Python中NumPy的使用一
- Spark之RDD容错原理及四大核心要点