1.Spark Streaming功能介绍
1)定义
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams

2.NC服务安装并运行Spark Streaming
1)在线安装nc命令
yum install -y nc
2)运行Spark Streaming 的WordCount
bin/run-example streaming.NetworkWordCount localhost 9999
3)把文件通过管道作为nc的输入,然后观察spark Streaming计算结果
cat test.txt | nc -lk 9999
文件具体内容
hadoop storm spark
hbase spark flume
spark dajiangtai spark
hdfs mapreduce spark
hive hdfs solr
spark flink storm
hbase storm es
3.Spark Streaming工作原理
1)Spark Streaming数据流处理

2)接收器工作原理

3)综合工作原理

4.Spark Streaming编程模型
1)StreamingContext初始化的两种方式
#第一种
val ssc = new StreamingContext(sc, Seconds(5))
#第二种
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
2)Spark Streaming socket代码
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount ")
System.exit(1)
}

//创建StreamingContext,每秒钟计算一次
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

//监听网络端口,参数一:hostname 参数二:port 参数三:存储级别,创建了lines流
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
//flatMap运算
val words = lines.flatMap(_.split(" "))
//map reduce 计算
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
5.Spark Streaming读取Socket流数据
1)spark-shell运行Streaming程序,要么线程数大于1,要么基于集群。
bin/spark-shell --master local[2]
bin/spark-shell --master spark://bigdata-pro01.kfk.com:7077
2)spark 运行模式

3)Spark Streaming读取Socket流数据
a)编写测试代码,并本地运行
object TestStreaming {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount ")
System.exit(1)
}

val spark=SparkSession.builder().master("local[2]").setAppName("streaming").getOrCreate()
val sc = spark.SparkContext

val ssc = new StreamingContext(sc, Seconds(5))

//监听网络端口,参数一:hostname 参数二:port 参数三:存储级别,创建了lines流
val lines = ssc.socketTextStream("igdata-pro02.kfk.com", 9999, StorageLevel.MEMORY_AND_DISK_SER)
//flatMap运算
val words = lines.flatMap(_.split(" "))
//map reduce 计算
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
b)启动nc服务发送数据
nc -lk 9999
6.Spark Streaming保存数据到外部系统
1)保存到mysql数据库

2)保存到hdfs

7.Spark Streaming与Kafka集成
1)Maven引入相关依赖:spark-streaming-kafka
2)编写测试代码并启动运行
object StreamingKafka8 {

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder()
.master("local[2]")
.appName("streaming").getOrCreate()

val sc =spark.sparkContext;
val ssc = new StreamingContext(sc, Seconds(5))

// Create direct kafka stream with brokers and topics
val topicsSet =Set("weblogs")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata-pro01.kfk.com:9092")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

val lines = kafkaStream.map(x => x._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
}
}
3)启动Kafka服务并测试生成数据
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com --topic weblogs

最新文章

  1. 未能添加对***.dll的引用 问题解决方法
  2. Lind.DDD.Utils.HttpHelper里静态对象引出的Http超时问题
  3. 【iCore3 双核心板_ uC/OS-III】例程七:信号量——任务同步
  4. thinkphp开发规范
  5. 八 mybatis查询缓存(一级缓存,二级缓存)和ehcache整合
  6. 夺命雷公狗---DEDECMS----26dedecms面包屑导航的实现
  7. 利用Jmeter做接口测试
  8. [Gauss]POJ2065 SETI
  9. Code Forces Gym 100886J Sockets(二分)
  10. IOS 轻量级数据持久化 DataLite
  11. React设计思想
  12. SpringMVC的坑
  13. MEF 基础简介 二
  14. Vue(十)生命周期
  15. Cracking The Coding Interview 4.7_暂存
  16. 基于TensorFlow Serving的深度学习在线预估
  17. Server.Transfer和Response.Redirect的区别
  18. 20155301 2016-2017-2 《Java程序设计》第1周学习总结
  19. Linux的setup命令启动服务名称和功能
  20. 008-shiro与spring web项目整合【二】认证、授权、session管理

热门文章

  1. Java基础(三十四)String、StringBuffer类和数据缓冲区Buffer类
  2. ARM、X86和AI处理器的区别
  3. django-数据库之连接数据库
  4. Sping MVC不使用任何注解处理(jQuery)Ajax请求(基于XML配置)
  5. Element-ui-安装
  6. redis面试题及答案
  7. yii2中commands的简单应用
  8. [2018-01-13] 什么是Django
  9. IDEA复制多行及对多行代码上下左右移动
  10. NOIP原题 斗地主(20190804)