简介

sparkStream官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html#overview

sparkStream是构建在spark core之上的实时流处理框架,它支持很多的数据源,如:

你可以从kafka等各种数据源中实时获取数据流,然后经过spark计算,持久化或者实时的dashBoard展示。

sparkStream的实时计算其实也可以称为微批处理计算,它将数据流按照一定的时间段分割成小批的数据,然后将对数据流的操作转换为对RDD的操作,整个流计算的中间结果进行叠加存储到内存或者外部设备,如图:

代码示例

下面将使用tcp socket作为数据源,每隔1秒钟发送字符数据。sparkstream将在启动以后,将收集10秒的数据作为一个批数据进行统计处理,代码如下:

import java.net.ServerSocket

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext} /**
* @Description sparkStream demo
* @Author lay
* @Date 2018/12/08 21:43
*/
object SparkStreamDemo {
var conf: SparkConf = _
var sc: SparkContext = _
var ssc: StreamingContext = _ def init(): Unit = {
conf = new SparkConf().setAppName("spark stream demo").setMaster("local[2]")
sc = new SparkContext(conf)
sc.setLogLevel("warn")
// 时间片为10秒钟
ssc = new StreamingContext(sc, Seconds(10))
} def main(args: Array[String]): Unit = {
// 初始化socket流
initSocketStream()
// 初始化SparkStream
init()
// 从socket获取DStream
val lines = ssc.socketTextStream("localhost", 8888)
// 统计字数
val wordCount = lines.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_)
// 打印结果
wordCount.print()
// 启动
ssc.start()
println("spark stream started")
} def initSocketStream(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
val serverSocket = new ServerSocket(8888)
val socket = serverSocket.accept()
println("accepted")
for (i <- 1 to 10) {
val text = "what is this\n"
socket.getOutputStream.write(text.getBytes("utf-8"))
Thread.sleep(1000)
}
println("waiting")
Thread.sleep(50000)
socket.close()
serverSocket.close()
println("closed")
}
}).start()
println("thread started")
}
}

注意:

1)这里的master设置为"local[2]",是因为spark起码需要两个线程,一个线程用来接收数据,另一个线程用来处理数据。

2)"what is this\n"这里加了一个'\n'字符,是因为字节流的接收将会以这个字符作为分隔符。

你会看到类似如下的打印:

-------------------------------------------
Time: 1544281700000 ms
-------------------------------------------
(this,10)
(is,10)
(what,10)

最新文章

  1. 用CIL写程序:从“call vs callvirt”看方法调用
  2. docker容器与容器云读书笔记1
  3. C#利用服务器实现客户端之间通信
  4. Quick-Cocos2d-x初学者游戏教程1
  5. 在C#中创建和读取XML文件
  6. Convert HTML Entities
  7. sublime text3 安装package
  8. js页面跳转整理(转载未整理)
  9. Easyui Combotree问题及其相关
  10. osg事件处理器osgGA::GUIEventHandler handle
  11. Upgrade Image&amp;ntext to varbinarymax&amp;nvarchar(max)
  12. CCNA 6.5
  13. thinksns消息提示的实现机制(转)
  14. 编译cwm-recovery(含部分修改步骤)[转]
  15. UIViewController的生命周期及iOS程序执行顺序
  16. 如何学习php之吐槽
  17. Best jQuery Plugins of the Month – May 2014
  18. LoadTest中内存和线程Troubleshooting实战
  19. 火币网现货API[Python3版]
  20. Another Eight Puzzle

热门文章

  1. Android Studio如何用真机调试
  2. http协议与https协议的区别
  3. 正则表达式的Wed验证应用(40)
  4. day05.2-一个文件的增删改查实例
  5. JAVA日期——java.util.date类的操作
  6. leecode刷题(18)-- 报数
  7. C++中重载、覆盖和隐藏
  8. Elasticsearch NEST – Examples for mapping between Query and C#
  9. jscover使用说明-总体说明
  10. delphi 10.2 ----简单的叠乘例子