scala_spark实践2
2024-10-09 02:00:49
参考:jianshu.com/p/9d2d225c1951
监听socket获取数据,代码如下:
这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息
object SocketStream {
def main(args: Array[String]): Unit = {
//本地测试,设置4核
val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
//以10秒为一个批次
val ssc = new StreamingContext(conf,Seconds(10))
//接收消息
val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
//监测关键字error,出现则print
dstream.filter(_.contains("error")).foreachRDD(rdd=>{
rdd.foreach(println(_))
})
ssc.start()
ssc.awaitTermination()
}
}
从kafka读取数据,比较常用
object KafkaStream { def main(args: Array[String]): Unit = {
//本地测试,设置4核
val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
//以10秒为一个批次
val ssc = new StreamingContext(conf,Seconds(10)) val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
val group_id = "realtime_data" //kafka相关参数
val kafka_param = Map[String,String](
"zookeeper.connect" ->zkQuorum,
"group.id" -> group_id,
"zookeeper.connection.timeout.ms" -> "10000",
"fetch.message.max.bytes" -> "10485760"
)
val topic = Map[String,Int]("test_topic" -> 16)
//接收消息
val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
//监测关键字error,出现则print
dstream.filter(_.contains("error")).foreachRDD(rdd=>{
rdd.foreach(println(_))
}) ssc.start()
ssc.awaitTermination()
}
}
最新文章
- PHP变量
- Easyui 让DataGrid适应浏览器宽度
- delphi的取整函数round、trunc、ceil和floor
- vue.js使用详解
- SQL镜像资料
- android-sdks/build-tools/17.0.0/aapt: error while loading shared libraries: libz.so.1: cannot open shared object file: No such file or directory
- mybatis中的mapper.xml
- 013--VS2013 C++ 地图贴图-其它格式图片
- FullCalendar应用——读取JSON数据
- SPOJ3267--D-query (主席树入门练习)
- 今天工作中遇到的根据用户id取得产品大类和相关小类的问题
- [js高手之路] html5 canvas动画教程 - 实时获取鼠标的当前坐标
- ShoneSharp语言(S#)的设计和使用介绍—数值Double
- web攻击
- h5与c3权威指南笔记--css3结构性伪类选择器root,not,empty,target
- bzoj千题计划321:bzoj5251: [2018多省省队联测]劈配(网络流 + 二分)
- 改变input标签中placeholder显示的颜色
- 【scrapy】关于xpath helper中能匹配,但是在程序里匹配为none的问题
- GENA
- [原]git的使用(五)---删除文件
热门文章
- 2016 Multi-University Training Contest 1 T3
- 记一次Maven发布Jar包中文乱码解决方法
- Journal of Proteomics Research | 利用混合蛋白质组模型对MBR算法中错误转移鉴定率的评估
- jdbc连接数据库三种方式
- Loadrunner 11安装和破解
- Functional mechanism: regression analysis under differential privacy_阅读报告
- python3正则提取字符串里的中文
- MySQL----多表操作
- 洛谷 P1438 无聊的数列 题解
- leetcode 签到 914. 卡牌分组