Spark Streaming 是一个分布式数据流处理框架,它可以近乎实时的处理流数据,它易编程,可以处理大量数据,并且能把实时数据与历史数据结合起来处理。

Streaming 使得 spark 具有了流式处理的能力,它为数据流式处理提供了高层抽象,底层仍然是 spark,所以它具有 spark 的可扩展、可容错、高吞吐量的特点,而且它可以与 spark 的各种库结合使用,如 sparkSQL、MLib、ml 等

总体架构

Spark Streaming 是一个伪实时的流处理框架,它处理的是一个微批次的数据流,就是说他把数据流按照非常小的时间间隔切分成一批一批的数据,然后每次处理一批数据;

每一批数据仍然以 RDD 方式存储,然后使用 spark core 进行处理;

RDD 操作结果也是一批一批的输出;

数据流来源

Streaming 支持多种数据流来源,比较常用的有 TCP网络传输、Kafa、Flume 等,还有 Twitter、ZeroMQ、MQTT 等,

它也可以把一个文件当成流来处理,

也可以自定义数据流来源;

流数据经过 spark 处理后可以流向各种地方;

总结一下如下图

接收器

也叫数据采集器,接收器收到数据后存储在内存中;

Streaming 在每个 worker 上为每个数据流来源创建一个接收器;

一个 spark 应用可以同时接收多个数据流来源,然后统一处理;

API

Streaming API 有两个高度抽象 StreamingContext 和 离散流 DStream

StreamingContext

他是 Streaming 库的入口点,使得 Streaming 连接到 spark 上;

每个 Streaming 应用必须先创建一个 StreamingContext 实例;

创建 StreamingContext 实例

创建方法和 SparkContext 一样,创建 sc 的方法都能用来创建 StreamingContext;

不同的是多了一个参数,指定划分数据流的时间间隔;

from pyspark import SparkContext, StreamingContext, SparkConf
conf = SparkConf().setAppName('myapp1').setMaster('local[4]') # 设定 appname 和 master
ssc = StreamingContext(conf=conf, 10) # 10s 间隔 ## 或者这样
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10) # 直接传入 sc

StreamingContext 实例的方法

ssc.start()     # 启动 流式计算,在这之前,什么也不会发生
ssc.checkpoint('hdfs path') # 定期创建检查点数据,输入为 hdfs 的路径
ssc.stop() # 停止 流式计算
ssc.awaitTermination() # 等待流式计算结束

checkpoint

DStream     【内容比较多,故单独一章】

离散数据流;

他是 Streaming 处理数据流的一个高度抽象,也是一个抽象类,并且定义了一系列对该类的操作;

不同数据流来源有不同的 DStream 类;

DStream 实际上是一个 RDD 序列,Spark Streaming 把对 DStream 的操作转换成对 RDD 的操作;

因为他是 RDD 序列,所以具有 RDD 的特点:不可变、分区、容错

创建 DStream 实例

DStream 创建有两种方式,一种是从数据流来源直接创建,一种是从现有的 DStream 对象转换得到

socketTextStream:创建一个从 TCP 套接字连接 接收数据流的 DStream

3 个参数,host,port,第三个可选,指定接收数据的存储等级

def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2)

默认的存储等级为 StorageLevel.MEMORY_AND_DISK_2,顾名思义,表示接收到的数据先存储在内存中,如果内存放不下,多出来的数据会存放到硬盘上;

而且他会对接收到的数据 以 spark 序列化的方式 进行序列化操作;

所以这个存储等级会有序列化的开销,但是减少了 jvm 垃圾回收相关的问题;

接收到的数据会复制多份,提高容错;

选择合适的存储等级可以提高性能,比如 Streaming 采集周期很短,如几秒钟,也就是数据量很小,那么可以指定存储等级为只内存存储 StorageLevel.MEMORY_ONLY

textFileStream:创建一个 DStream 用于监控 hadoop 兼容的文件系统中是否有新文件创建

输入为被监控的目录;

如果有新文件创建,则将作为文本文件读出;

注意,写入被监控目录的文件必须是从同等文件系统中移动过来的,比如 linux 系统,新文件必须是用 mv 命令移动过来的

def textFileStream(self, directory)

actorStream:用户自己定义的 Akka actor 接收器的 DStream

python 好像没这个

KafkaUtils:创建从 Kafka 接收数据流的 DStream

class KafkaUtils(object):

    @staticmethod
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
storageLevel=StorageLevel.MEMORY_AND_DISK_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
Create an input stream that pulls messages from a Kafka Broker. :param ssc: StreamingContext object
:param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
:param groupId: The group id for this consumer.
:param topics: Dict of (topic_name -> numPartitions) to consume.
Each partition is consumed in its own thread.
:param kafkaParams: Additional params for Kafka
:param storageLevel: RDD storage level.
:param keyDecoder: A function used to decode key (default is utf8_decoder)
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object

还有一些其他的,参考 pyspark

DStream 操作

内容比较多,后面会专门写一篇博客

DStream 输出

Dstream 可以输出到各种地方,如文件、数据库或者其他应用程序

输出到文件

saveAsTextFiles:将 DStream 保存成文件,他为每个 RDD 创建一个目录,并在每个目录里创建多个副本;

目录名称为 用户定义的前缀-时间戳-可选后缀

DStream.saveAsTextFiles('/usr/lib/text')

saveAsObjectFiles:将 DStream 以序列化对象的形式保存成二进制 SequenceFile 文件,用法同 saveAsTextFiles

saveAsHadoopFiles:只有由键值对组成的 DStream 才能使用该方法

saveAsNewAPIHadoopFiles:自己试试

输出到控制台

pprint(n):打印指定输出元素个数

输出到数据库

foreachRDD:输入一个 func 逐个处理 DStream 中的每一个 RDD;

该方法无需返回任何东西;

    def foreachRDD(self, func):
"""
Apply a function to each RDD in this DStream.
"""
if func.__code__.co_argcount == 1:
old_func = func
func = lambda t, rdd: old_func(rdd)
jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
api = self._ssc._jvm.PythonDStream
api.callForeachRDD(self._jdstream, jfunc)

对于 foreachRDD 需要理解的是:

1. func 是把 RDD 作为输入,并且可以使用 RDD 的所有操作;

2. foreachRDD 执行在 Driver 中,而 func 执行在 Executor 中;

3. foreachRDD 不仅仅用于存到数据库

存储到数据库需要注意的是:

1. 数据库连接比较耗时,不要频繁的连接、关闭

2. 数据库连接无法序列化,也就是无法从 Driver 发送给 Executor,故数据库连接只能在 worker 上创建,并且复用

3. RDD 的 foreachPartition 操作可以使用同一个数据库连接保存多个 DStream 中的元素,可在 func 中使用该方法

4. 利用数据库连接池进行优化

5. 也可以采用批量写入的方法来优化数据库存储

示例代码

数据流来源是 TCP 套接字

from __future__ import print_function
import sys from pyspark import SparkContext
from pyspark.streaming import StreamingContext if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
sys.exit(-1) sc = SparkContext(appName="PythonStreamingNetworkWordCount")
# 以指定时间为周期采集实时数据
ssc = StreamingContext(sc, 30) # 采集周期为 x s ### 连接已经开启的 socket 服务器,注意必须事先开启 socket server
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) # 采集数据
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint() # 打印 DStream
## 如
# (u'a', 1)
# (u'e', 1)
# (u'd', 1)
counts.pprint(2) # 打印 DStream,并指定输出元素个数
## 如果 pprint 输出如上,pprint(2)输出如下
# (u'a', 1)
# (u'e', 1)
counts.saveAsTextFiles('/usr/lib/text') # DStream 保存至文件系统 ssc.checkpoint('/spark') ssc.start() # 启动采集器
ssc.awaitTermination() # 等待 socket ser 终止

TCP 服务器要事先存在,在 linux 上使用如下命令建立 TCP 服务器

nc -lk 9999

注意 nc 命令需要手动安装

数据流来源是 Kafka

import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add sc = SparkContext(master="yarn",appName="PythonSparkStreamingRokidDtSnCount")
ssc = StreamingContext(sc, 2)
zkQuorum = '172.16.89.80:2181' # broker list
topic = {'':1} # topic name : partition
groupid = "test-consumer-group" # consumer group
lines = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
lines1 = lines.flatMap(lambda x: x.split("\n"))
valuestr = lines1.map(lambda x: x.value.decode())
valuedict = valuestr.map(lambda x:eval(x))
message = valuedict.map(lambda x: x["message"])
rdd2 = message.map(lambda x: (time.strftime("%Y-%m-%d",time.localtime(float(x.split("\u0001")[0].split("\u0002")[1])/1000))+"|"+x.split("\u0001")[1].split("\u0002")[1],1)).map(lambda x: (x[0],x[1]))
rdd3 = rdd2.reduceByKey(add)
rdd3.saveAsTextFiles("/tmp/wordcount")
rdd3.pprint()
ssc.start()
ssc.awaitTermination()

其他场景参考 spark 自带的样例

参考资料:

《Spark大数据分析核心概念技术及实践OCR-2017》  电子书

最新文章

  1. Cordova+Ionic之坑
  2. PAT 1017. A除以B (20)
  3. django数据库动态添加列
  4. PHP类与面向对象(二)
  5. lvs keepalived 安装配置详解
  6. 关于Android模拟器键盘不能使用的解决方法
  7. 快速、冒泡排序算法(PHP版)
  8. 阿里云OSS Multipart Upload上传实例
  9. HTML5图形绘制
  10. WPF 16进制byte输入框
  11. Python eval 函数妙用
  12. Android四大组件之Service --- 如何启动和停止Service?
  13. Centos Linux 下Pycharm 安装
  14. eclipse打包jar包
  15. python 列表排序方法sort、sorted技巧篇
  16. linux上单网卡配置使用多个IP地址
  17. 事务 TRANSACTION
  18. 那些年的 网络通信之 TCP/IP 传输控制协议 ip 加 端口 ---
  19. asp.net core 发布到docker 极简步骤
  20. Django内置过滤器详解附代码附效果图--附全部内置过滤器帮助文档

热门文章

  1. POJ1990--POJ 1990 MooFest(树状数组)
  2. __new()__与__init__()
  3. NullPointerException 没有堆栈
  4. JS判定数据类型
  5. Cortex-M3 操作模式与特权等级
  6. LC 676. Implement Magic Dictionary
  7. CSS中盒子模型
  8. SAX解析示例代码和原理
  9. Spring学习之==&gt;IoC
  10. Python排序搜索基本算法之归并排序实例分析