Spark详解(07) - SparkStreaming

SparkStreaming概述

Spark Streaming用于流式数据的处理。

Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、HDFS等。

数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。

而结果也能保存在很多地方,如HDFS、数据库等。

Spark Streaming架构原理

什么是Dstream

SparkCore => RDD

SparkSQL => DataFrame、DataSet

Spark Streaming使用离散化流(Discretized Stream)作为抽象表示,叫作Dstream

DStream是随时间推移而收到的数据的序列。

在DStream内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD所组成的序列(因此得名"离散化")。

所以简单来讲,DStream就是对RDD在实时数据处理场景的一种封装。

架构图

整体`架构图

SparkStreaming架构图

背压机制

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数"spark.streaming.receiver.maxRate"的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure):根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。

通过属性"spark.streaming.backpressure.enabled"来控制是否启用背压机制,默认值false,即不启用。

Spark Streaming特点

易用

容错

易整合到Spark体系

DStream入门

WordCount案例实操

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

1)添加依赖

<dependencies>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_2.12</artifactId>

<version>3.0.0</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.12</artifactId>

<version>3.0.0</version>

</dependency>

</dependencies>

2)编写代码

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3.  
  4. object SparkStreaming01_WordCount {
  5.  
  6.     def main(args: Array[String]): Unit = {
  7.         //1.初始化Spark配置信息
  8.         val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
  9.         //2.初始化SparkStreamingContext
  10.         val ssc = new StreamingContext(sparkConf, Seconds(3))
  11.  
  12.         //3.通过监控端口创建DStream,读进来的数据为一行行
  13.         val lineDStream = ssc.socketTextStream("hadoop102", 9999)
  14.  
  15.         //3.1 将每一行数据做切分,形成一个个单词
  16.         val wordDStream = lineDStream.flatMap(_.split(" "))
  17.         //3.2 将单词映射成元组(word,1)
  18.         val wordToOneDStream = wordDStream.map((_, 1))
  19.         //3.3 将相同的单词次数做统计
  20.         val wordToSumDStream = wordToOneDStream.reduceByKey(_+_)
  21.         //3.4 打印
  22.         wordToSumDStream.print()
  23.  
  24.         //4 启动SparkStreamingContext
  25.         ssc.start()
  26.         // 将主线程阻塞,主线程不退出
  27.         ssc.awaitTermination()
  28.     }
  29. }

3)更改日志打印级别

将log4j.properties文件添加到resources里面,就能更改打印日志的级别为error

log4j.rootLogger=error, stdout,R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender

log4j.appender.R.File=../log/agent.log

log4j.appender.R.MaxFileSize=1024KB

log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout

log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

4)启动程序并通过netcat发送数据:

[hadoop@hadoop102 ~]$ nc -lk 9999

hello spark

5)在Idea控制台输出如下内容:

-------------------------------------------

Time: 1602731772000 ms

-------------------------------------------

(hello,1)

(spark,1)

注意:目前用的算子,只能处理本批次数据的累加,不能统计所有批次总的单词个数。

WordCount解析

DStream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。

在内部实现上,每一批次的数据封装成一个RDD,一系列连续的RDD组成了DStream。对这些RDD的转换是由Spark引擎来计算。

说明:DStream中批次与批次之间计算相互独立。如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源。通常情况,批次设置时间要大于计算时间。

DStream创建

RDD队列

测试方法:

使用ssc.queueStream(queueOfRDDs)来创建DStream

将每一个推送到这个队列中的RDD,都会作为一个DStream处理。

需求:循环创建几个RDD,将RDD放入队列。通过SparkStreaming创建DStream,计算WordCount

编写代码

结果展示(oneAtATime = false)

-------------------------------------------

Time: 1603347444000 ms

-------------------------------------------

15

-------------------------------------------

Time: 1603347448000 ms

-------------------------------------------

30

-------------------------------------------

Time: 1603347452000 ms

-------------------------------------------

30

说明:如果一个批次中有多个RDD进入队列,最终计算前都会合并到一个RDD计算

自定义数据源

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

3.2.2 案例实操

需求:自定义数据源,实现监控某个端口号,获取该端口号内容。

2)自定义数据源

3)测试

先启动nc服务,再启动SparkStreaming程序

[hadoop@hadoop102 ~]$ nc -lk 9999

hello spark

Kafka数据源(面试、开发重点)

ReceiverAPI需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题:接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。

DirectAPI是由计算的Executor来主动消费Kafka的数据,速度由自身控制。

注意:目前spark3.0.0以上版本只有Direct模式。

http://spark.apache.org/docs/2.4.7/streaming-kafka-integration.html

http://spark.apache.org/docs/3.0.0/streaming-kafka-0-10-integration.html

总结:不同版本的offset存储位置

0-8 ReceiverAPI offset默认存储在:Zookeeper中

0-8 DirectAPI offset默认存储在:CheckPoint

手动维护:MySQL等有事务的存储系统

0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题

手动维护:MySQL等有事务的存储系统

  • Kafka 0-10 Direct模式

1)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

2)导入依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>

<version>3.0.0</version>

</dependency>

3)编写代码

  1. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
  2. import org.apache.kafka.common.serialization.StringDeserializer
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  5. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}
  7.  
  8. object SparkStreaming04_DirectAuto {
  9.  
  10.     def main(args: Array[String]): Unit = {
  11.  
  12.         //1.创建SparkConf
  13.         val sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")
  14.  
  15.         //2.创建StreamingContext
  16.         val ssc = new StreamingContext(sparkConf, Seconds(3))
  17.  
  18.         //3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化
  19.         val kafkaPara: Map[String, Object] = Map[String, Object](
  20.             ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
  21.             ConsumerConfig.GROUP_ID_CONFIG -> "group_1",
  22.             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  23.             ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
  24.         )
  25.  
  26.         //4.读取Kafka数据创建DStream
  27.         val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  28.             ssc,
  29.             LocationStrategies.PreferConsistent, //优先位置
  30.             ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara)// 消费策略:(订阅多个主题,配置参数)
  31.         )
  32.  
  33.         //5.将每条消息的KV取出
  34.         val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
  35.  
  36.         //6.计算WordCount
  37.         valueDStream.flatMap(_.split(" "))
  38.             .map((_, 1))
  39.             .reduceByKey(_ + _)
  40.             .print()
  41.  
  42.         //7.开启任务
  43.         ssc.start()
  44.         ssc.awaitTermination()
  45.     }
  46. }

4)测试

(1)分别启动Zookeeper和Kafka集群

zk.sh start

kf.sh start

(2)创建一个Kafka的Topic主题testTopic,两个分区

bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 1 --partitions 2 --topic testTopic

(3)查看Topic列表

bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka -list

(4)查看Topic详情

bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --describe --topic testTopic

(5)创建Kafka生产者

bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic testTopic

Hello spark

Hello spark

(6)创建Kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic testTopic

5)查看_consumer_offsets主题中存储的offset

bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group group_1

GROUP TOPIC PARTITION CURRENT-OFFSET
LOG-END-OFFSET

Group_1 testTopic 0 13 13

在生产者中生产数据,再次观察offset变化

DStream转换

DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

无状态转化操作

无状态转化操作:就是把RDD转化操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。

常规无状态转化操作

DStream的部分无状态转化操作列在了下表中,都是DStream自己的API。

注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在Scala中使用,比如reduceByKey()。

函数名称

目的

Scala示例

函数签名

map()

对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream。

ds.map(x=>x + 1)

f: (T) -> U

flatMap()

对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStream。

ds.flatMap(x => x.split(" "))

f: T -> Iterable[U]

filter()

返回由给定DStream中通过筛选的元素组成的DStream

ds.filter(x => x != 1)

f: T -> Boolean

repartition()

改变DStream的分区数

ds.repartition(10)

N / A

reduceByKey()

将每个批次中键相同的记录规约。

ds.reduceByKey( (x, y) => x + y)

f: T, T -> T

groupByKey()

将每个批次中的记录根据键分组。

ds.groupByKey()

N / A

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD批次上的。

Transform

需求:通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作。

  1. 代码编写
    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    4. import org.apache.spark.streaming.{Seconds, StreamingContext}
    5.  
    6. object SparkStreaming05_Transform {
    7.  
    8.     def main(args: Array[String]): Unit = {
    9.         //1 创建SparkConf
    10.         val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
    11.         //2 创建StreamingContext
    12.         val ssc = new StreamingContext(sparkConf, Seconds(3))
    13.  
    14.         //3 创建DStream
    15.         val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    16.  
    17.         // Driver端执行,全局一次
    18.         println("111111111:" + Thread.currentThread().getName)
    19.  
    20.         //4 转换为RDD操作
    21.         val wordToSumDStream: DStream[(String, Int)] = lineDStream.transform(
    22.             rdd => {
    23.                 // Driver端执行(ctrl+n JobGenerator),一个批次一次
    24.                 println("222222:" + Thread.currentThread().getName)
    25.                 val words: RDD[String] = rdd.flatMap(_.split(" "))
    26.                 val wordToOne: RDD[(String, Int)] = words.map(x=>{
    27.                     // Executor端执行,和单词个数相同
    28.                     println("333333:" + Thread.currentThread().getName)
    29.                     (x, 1)
    30.                 })
    31.                 val value: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    32.                 value
    33.             }
    34.         )
    35.         //5 打印
    36.         wordToSumDStream.print
    37.  
    38.         //6 启动
    39.         ssc.start()
    40.         ssc.awaitTermination()
    41.     }
    42. }

有状态转化操作

有状态转化操作:计算当前批次RDD时,需要用到历史RDD的数据。

UpdateStateByKey

updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。例如可以实现累加WordCount。

updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。

注意:使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

checkpoint小文件过多

checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次

0)需求:更新版的WordCount

1)编写代码

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.dstream.ReceiverInputDStream
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4.  
  5. object sparkStreaming06_updateStateByKey {
  6.  
  7.     // 定义更新状态方法,参数seq为当前批次单词次数,state为以往批次单词次数
  8.     val updateFunc = (seq: Seq[Int], state: Option[Int]) => {
  9.         // 当前批次数据累加
  10.         val currentCount = seq.sum
  11.         // 历史批次数据累加结果
  12.         val previousCount = state.getOrElse(0)
  13.         // 总的数据累加
  14.         Some(currentCount + previousCount)
  15.     }
  16.  
  17.     def createSCC(): StreamingContext = {
  18.         //1 创建SparkConf
  19.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
  20.         //2 创建StreamingContext
  21.         val ssc = new StreamingContext(conf, Seconds(3))
  22.  
  23.         ssc.checkpoint("./ck")
  24.  
  25.         //3 获取一行数据
  26.         val lines = ssc.socketTextStream("hadoop102", 9999)
  27.         //4 切割
  28.         val words = lines.flatMap(_.split(" "))
  29.         //5 统计单词
  30.         val wordToOne = words.map(word => (word, 1))
  31.         //6 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
  32.         val stateDstream = wordToOne.updateStateByKey[Int](updateFunc)
  33.         stateDstream.print()
  34.  
  35.         ssc
  36.     }
  37.  
  38.     def main(args: Array[String]): Unit = {
  39.  
  40.         val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck",()=>createSCC())
  41.  
  42.         //7 开启任务
  43.         ssc.start()
  44.         ssc.awaitTermination()
  45.     }
  46. }

2)启动程序并向9999端口发送数据

nc -lk 9999

hello hadoop

hello hadoop

3)结果展示

-------------------------------------------

Time: 1603441344000 ms

-------------------------------------------

(hello,1)

(hadoop,1)

-------------------------------------------

Time: 1603441347000 ms

-------------------------------------------

(hello,2)

(hadoop,2)

4)原理说明

WindowOperations(窗口函数)

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

窗口时长:计算内容的时间范围;

滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集批次大小的整数倍。

如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。

Window

1)基本语法:window(windowLength, slideInterval): 基于对源DStream窗口的批次进行计算返回一个新的DStream。

2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。

3)代码编写:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.dstream.DStream
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4.  
  5. object SparkStreaming07_window {
  6.  
  7.     def main(args: Array[String]): Unit = {
  8.  
  9.         // 1 初始化SparkStreamingContext
  10.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
  11.         val ssc = new StreamingContext(conf, Seconds(3))
  12.  
  13.         // 2 通过监控端口创建DStream,读进来的数据为一行行
  14.         val lines = ssc.socketTextStream("hadoop102", 9999)
  15.  
  16.         // 3 切割=》变换
  17.         val wordToOneDStream = lines.flatMap(_.split(" "))
  18.             .map((_, 1))
  19.  
  20.         // 4 获取窗口返回数据
  21.         val wordToOneByWindow: DStream[(String, Int)] = wordToOneDStream.window(Seconds(12), Seconds(6))
  22.  
  23.         // 5 聚合窗口数据并打印
  24.         val wordToCountDStream: DStream[(String, Int)] = wordToOneByWindow.reduceByKey(_+_)
  25.         wordToCountDStream.print()
  26.  
  27.         // 6 启动=》阻塞
  28.         ssc.start()
  29.         ssc.awaitTermination()
  30.     }
  31. }

5)如果有多批数据进入窗口,最终也会通过window操作变成统一的RDD处理。

reduceByKeyAndWindow

1)基本语法:

  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。

2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。

3)代码编写:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3.  
  4. object SparkStreaming08_reduceByKeyAndWindow {
  5.  
  6.     def main(args: Array[String]): Unit = {
  7.  
  8.         // 1 初始化SparkStreamingContext
  9.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
  10.         val ssc = new StreamingContext(conf, Seconds(3))
  11.  
  12.         // 保存数据到检查点
  13.         ssc.checkpoint("./ck")
  14.  
  15.         // 2 通过监控端口创建DStream,读进来的数据为一行行
  16.         val lines = ssc.socketTextStream("hadoop102", 9999)
  17.  
  18.         // 3 切割=》变换
  19.         val wordToOne = lines.flatMap(_.split(" "))
  20.                          .map((_, 1))
  21.  
  22.         // 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒
  23.         val wordCounts = wordToOne.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))
  24.  
  25.         // 5 打印
  26.         wordCounts.print()
  27.  
  28.         // 6 启动=》阻塞
  29.         ssc.start()
  30.         ssc.awaitTermination()
  31.     }
  32. }

reduceByKeyAndWindow(反向Reduce)

1)基本语法:

  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并"反向reduce"离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的"加""减"计数。通过前边介绍可以想到,这个函数只适用于"可逆的reduce函数",也就是这些reduce函数有相应的"反reduce"函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。

2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。

3)代码编写:

  1. import org.apache.spark.{HashPartitioner, SparkConf}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3.  
  4. object SparkStreaming09_reduceByKeyAndWindow_reduce {
  5.  
  6.     def main(args: Array[String]): Unit = {
  7.  
  8.         // 1 初始化SparkStreamingContext
  9.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
  10.         val ssc = new StreamingContext(conf, Seconds(3))
  11.  
  12.         // 保存数据到检查点
  13.         ssc.checkpoint("./ck")
  14.  
  15.         // 2 通过监控端口创建DStream,读进来的数据为一行行
  16.         val lines = ssc.socketTextStream("hadoop102", 9999)
  17.  
  18.         // 3 切割=》变换
  19.         val wordToOne = lines.flatMap(_.split(" "))
  20.             .map((_, 1))
  21.  
  22.         // 4 窗口参数说明: 算法逻辑,窗口12秒,滑步6秒
  23.         /*
  24.         val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(
  25.             (a: Int, b: Int) => (a + b),
  26.             (x: Int, y: Int) => (x - y),
  27.             Seconds(12),
  28.             Seconds(6)
  29.         )*/
  30.  
  31.         // 处理单词统计次数为0的问题
  32.         val wordToSumDStream: DStream[(String, Int)]= wordToOne.reduceByKeyAndWindow(
  33.             (a: Int, b: Int) => (a + b),
  34.             (x: Int, y: Int) => (x - y),
  35.             Seconds(12),
  36.             Seconds(6),
  37.             new HashPartitioner(2),
  38.             (x:(String, Int)) => x._2 > 0
  39.         )
  40.  
  41.         // 5 打印
  42.         wordToSumDStream.print()
  43.  
  44.         // 6 启动=》阻塞
  45.         ssc.start()
  46.         ssc.awaitTermination()
  47.     }
  48. }

Window的其他操作

(1)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

(2)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

DStream输出

DStream通常将数据输出到,外部数据库或屏幕上。

DStream与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个Context就都不会启动。

1)输出操作API如下:

  1. saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。"prefix-Time_IN_MS[.suffix]"。
  2. saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将DStream中的数据保存为 SequenceFiles 。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。
  3. saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。

注意:以上操作都是每一批次写出一次,会产生大量小文件,在生产环境,很少使用。

  1. print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。
  2. foreachRDD(func):这是最通用的输出操作,即将函数func用于产生DStream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者写入数据库。

在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到如MySQL的外部数据库中。

  1. foreachRDD代码实操
    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3.  
    4. object SparkStreaming10_output {
    5.  
    6.     def main(args: Array[String]): Unit = {
    7.  
    8.         // 1 初始化SparkStreamingContext
    9.         val conf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
    10.         val ssc = new StreamingContext(conf, Seconds(3))
    11.  
    12.         // 2 通过监控端口创建DStream,读进来的数据为一行行
    13.         val lineDStream = ssc.socketTextStream("hadoop102", 9999)
    14.  
    15.         // 3 切割=》变换
    16.         val wordToOneDStream = lineDStream.flatMap(_.split(" "))
    17.             .map((_, 1))
    18.  
    19.         // 4 输出
    20.         wordToOneDStream.foreachRDD(
    21.             rdd=>{
    22.                 // Driver端执行(ctrl+n JobScheduler),一个批次一次
    23.                 // JobScheduler 中查找(ctrl + f)streaming-job-executor
    24.                 println("222222:" + Thread.currentThread().getName)
    25.  
    26.                 rdd.foreachPartition(
    27.                     //5.1 测试代码
    28.                     iter=>iter.foreach(println)
    29.  
    30.                     //5.2 企业代码
    31.                     //5.2.1 获取连接
    32.                     //5.2.2 操作数据,使用连接写库
    33.                     //5.2.3 关闭连接
    34.                 )
    35.             }
    36.         )
    37.  
    38.         // 5 启动=》阻塞
    39.         ssc.start()
    40.         ssc.awaitTermination()
    41.     }
    42. }

3)注意

(1)连接不能写在Driver层面(序列化)

(2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

(3)增加foreachPartition,在分区创建(获取)。

优雅关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。

关闭方式:使用外部文件系统来控制内部程序关闭。

  1. 主程序
    1. import java.net.URI
    2. import org.apache.hadoop.conf.Configuration
    3. import org.apache.hadoop.fs.{FileSystem, Path}
    4. import org.apache.spark.SparkConf
    5. import org.apache.spark.streaming.dstream.ReceiverInputDStream
    6. import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
    7.  
    8. object SparkStreaming11_stop {
    9.  
    10.     def main(args: Array[String]): Unit = {
    11.  
    12.         //1.初始化Spark配置信息
    13.         val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
    14.         //2.初始化SparkStreamingContext
    15.         val ssc: StreamingContext = new StreamingContext(sparkconf, Seconds(3))
    16.  
    17.         // 设置优雅的关闭
    18.         sparkconf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    19.         // 接收数据
    20.         val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    21.         // 执行业务逻辑
    22.         lineDStream.flatMap(_.split(" ")).map((_,1)).print()
    23.  
    24.         // 开启监控程序
    25.         new Thread(new MonitorStop(ssc)).start()
    26.  
    27.         //4 启动SparkStreamingContext
    28.         ssc.start()
    29.  
    30.         // 将主线程阻塞,主线程不退出
    31.         ssc.awaitTermination()
    32.     }
    33. }
    34.  
    35. // 监控程序
    36. class MonitorStop(ssc: StreamingContext) extends Runnable{
    37.  
    38.     override def run(): Unit = {
    39.         // 获取HDFS文件系统
    40.         val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration(),"hadoop")
    41.  
    42.         while (true){
    43.             Thread.sleep(5000)
    44.             // 获取/stopSpark路径是否存在
    45.             val result: Boolean = fs.exists(new Path("hdfs://hadoop102:8020/stopSpark"))
    46.  
    47.             if (result){
    48.                 val state: StreamingContextState = ssc.getState()
    49.                 // 获取当前任务是否正在运行
    50.                 if (state == StreamingContextState.ACTIVE){
    51.                     // 优雅关闭
    52.                     ssc.stop(stopSparkContext = true, stopGracefully = true)
    53.                     System.exit(0)
    54.                 }
    55.             }
    56.         }
    57.     }
    58. }

2)测试

(1)发送数据

nc -lk 9999

hello

(2)启动Hadoop集群

sbin/start-dfs.sh

hadoop fs -mkdir /stopSpark

最新文章

  1. jQuery UI dialog
  2. sql 中各种锁随记
  3. Gitlab仓库规范实践建议
  4. 63. Unique Paths II
  5. opencv学习笔记(02)——遍历图像(指针法)
  6. Ubuntu启动时直接进入命令行模式
  7. win7笔记本电脑实现wifi共享
  8. 关于bind函数和connect函数的测试结论
  9. swift(2)元祖(Tuple)
  10. 【Jquery系列】prop和attr区别
  11. c++---天梯赛---查验身份证
  12. 【嵌入式开发】gcc 学习笔记(一) - 编译C程序 及 编译过程
  13. Android Gradle 学习笔记(一):Gradle 入门
  14. css实现圆形倒计时效果
  15. 各种数据库连接字符串 -- c#
  16. SpringBoot 三种方式配置 Druid(包括纯配置文件配置)
  17. mysql 数据库表备份和还原
  18. 大数据-10-Spark入门之支持向量机SVM分类器
  19. web:频繁刷新浏览器的页面【小工具】
  20. 补充 3:Golang 一些特性

热门文章

  1. C++编程范式(函数)
  2. OCI runtime exec failed: exec failed: unable to start container process: exec: &quot;mongo&quot;: executable file not found in $PATH: unknown
  3. JavaScript基本语法(函数与对象)
  4. C语言编译环境中的 调试功能及常见错误提示
  5. 齐博x1如何开启自定义标签模板功能
  6. 《Java并发编程的艺术》读书笔记:二、Java并发机制的底层实现原理
  7. 《Java并发编程的艺术》读书笔记:一、并发编程的目的与挑战
  8. 一篇了解全MVCC
  9. 基于SqlSugar的开发框架循序渐进介绍(20)-- 在基于UniApp+Vue的移动端实现多条件查询的处理
  10. Python基础之模块:5、 第三方模块 requests模块 openpyxl模块