参考:jianshu.com/p/9d2d225c1951

监听socket获取数据,代码如下:
这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息

object SocketStream {
def main(args: Array[String]): Unit = {
//本地测试,设置4核
val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
//以10秒为一个批次
val ssc = new StreamingContext(conf,Seconds(10))
//接收消息
val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
//监测关键字error,出现则print
dstream.filter(_.contains("error")).foreachRDD(rdd=>{
rdd.foreach(println(_))
})
ssc.start()
ssc.awaitTermination()
}
}

  从kafka读取数据,比较常用

object KafkaStream {

  def main(args: Array[String]): Unit = {
//本地测试,设置4核
val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
//以10秒为一个批次
val ssc = new StreamingContext(conf,Seconds(10)) val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
val group_id = "realtime_data" //kafka相关参数
val kafka_param = Map[String,String](
"zookeeper.connect" ->zkQuorum,
"group.id" -> group_id,
"zookeeper.connection.timeout.ms" -> "10000",
"fetch.message.max.bytes" -> "10485760"
)
val topic = Map[String,Int]("test_topic" -> 16)
//接收消息
val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
//监测关键字error,出现则print
dstream.filter(_.contains("error")).foreachRDD(rdd=>{
rdd.foreach(println(_))
}) ssc.start()
ssc.awaitTermination()
}
}

  

最新文章

  1. PHP变量
  2. Easyui 让DataGrid适应浏览器宽度
  3. delphi的取整函数round、trunc、ceil和floor
  4. vue.js使用详解
  5. SQL镜像资料
  6. android-sdks/build-tools/17.0.0/aapt: error while loading shared libraries: libz.so.1: cannot open shared object file: No such file or directory
  7. mybatis中的mapper.xml
  8. 013--VS2013 C++ 地图贴图-其它格式图片
  9. FullCalendar应用——读取JSON数据
  10. SPOJ3267--D-query (主席树入门练习)
  11. 今天工作中遇到的根据用户id取得产品大类和相关小类的问题
  12. [js高手之路] html5 canvas动画教程 - 实时获取鼠标的当前坐标
  13. ShoneSharp语言(S#)的设计和使用介绍—数值Double
  14. web攻击
  15. h5与c3权威指南笔记--css3结构性伪类选择器root,not,empty,target
  16. bzoj千题计划321:bzoj5251: [2018多省省队联测]劈配(网络流 + 二分)
  17. 改变input标签中placeholder显示的颜色
  18. 【scrapy】关于xpath helper中能匹配,但是在程序里匹配为none的问题
  19. GENA
  20. [原]git的使用(五)---删除文件

热门文章

  1. 2016 Multi-University Training Contest 1 T3
  2. 记一次Maven发布Jar包中文乱码解决方法
  3. Journal of Proteomics Research | 利用混合蛋白质组模型对MBR算法中错误转移鉴定率的评估
  4. jdbc连接数据库三种方式
  5. Loadrunner 11安装和破解
  6. Functional mechanism: regression analysis under differential privacy_阅读报告
  7. python3正则提取字符串里的中文
  8. MySQL----多表操作
  9. 洛谷 P1438 无聊的数列 题解
  10. leetcode 签到 914. 卡牌分组