【慕课网实战】Spark Streaming实时流处理项目实战笔记十之铭文升级版
铭文一级:
第八章:Spark Streaming进阶与案例实战
updateStateByKey算子
需求:统计到目前为止累积出现的单词的个数(需要保持住以前的状态)
java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
需求:将统计结果写入到MySQL
create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);
通过该sql将统计结果写入到MySQL
insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
存在的问题:
1) 对于已有的数据做更新,而是所有的数据均为insert
改进思路:
a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert
b) 工作中:HBase/Redis
2) 每个rdd的partition创建connection,建议大家改成连接池
window:定时的进行一个时间段内的数据处理
window length : 窗口的长度
sliding interval: 窗口的间隔
这2个参数和我们的batch size有关系:倍数
每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc
==> 每隔sliding interval统计前window length的值
铭文二级:
第七章:Spark Streaming核心概念与编程
实战:Spark Streaming处理文件系统数据=>
与处理socket数据类似
1.建FileWordCount类
2.建监控的路径,本次为:/Users/rocky/data/imooc/ss
3.只需修改SocketTextStream成textFileStream
参数设置为file:///Users/rocky/data/imooc/ss/ /* 前面的“///”、最后的“/” */
4.vi test.log //里面有内容,然后cp到监控的路径
nc监控6789端口即可
注意事项:
官网Basic Sources
1、必须每次相同的文件格式
2、必须使用移动的方式将内容move到路径
3、一旦移动,无法再修改里面的内容
第八章:Spark Streaming进阶与案例实战
实战:使用UpdateStateByKey算子统计到目前为止累计出现的单词个数
copy一个NetworkWordCount类改成StatefulWordCount
步骤一、将reduceBykey改成UpdateStateByKey
官网代码(两个重要参数:newValues、running):
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
步骤二、自定义代码:
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
步骤三、修改代码:
ssc.checkpoint(".") //一定要设置,运行后文件夹根目录会出现receivedBlockMetadata文件夹
ps:checkpoint一般生产上设置到HDFS的某个文件夹
val result = lines.flatMap(_.split(" ")).map((_,1))
val state = result.updateStateByKey[Int](updateFunction _)
state.print()
实战:计算到目前为止累计出现的单词个数写到mysql中:
ps:mysql知识复习
mysql -uroot -proot //登录mysql
create database imooc_spark; //建立imooc_spark数据库
use imooc_spark; //使用数据库
show tables; //查看表
select * from wordcount; //查看表内容
复制一个类文件(删掉UpdateStateByKey算子的相关内容)
步骤一、copy一个StatefulWordCount类改成ForeachRDDApp类
停掉之前运行的程序,删掉receivedBlockMetadata的文件内容
步骤二、在mysq建表wordcount
word varchar(50) default null,
wordcount int(10) default null
步骤三、提供的自定义代码:
package com.imooc.spark
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中
*/
object ForeachRDDApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//result.print() //此处仅仅是将统计结果输出到控制台
//TODO... 将结果写入到MySQL
// result.foreachRDD(rdd =>{
// val connection = createConnection() // executed at the driver
// rdd.foreach { record =>
// val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"
// connection.createStatement().execute(sql)
// }
// })
result.print()
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
/**
* 获取MySQL的连接
*/
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark", "root", "root")
}
}
报错分析:
1、connection.createStatement().execute(sql)//没有驱动包,自己引入
2、第一种官网连接会报序列化错误,自己改成partition式连接,如上面代码
3、重复执行,mysql数据库的列名会重复出现,自行使用Hbase或redis等数据库
4、改成连接池的方式
官网代码参考:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
实战:窗口函数的使用(摘自官网)
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
最新文章
- DateEdit控件的使用
- 75篇关于Tomcat源码和机制的文章
- Java for LeetCode 213 House Robber II
- Beta阶段站立会议-02
- 深入了解iPad上的MouseEvent【转】
- jquery生产和开发的区别
- HDU 4870 Rating (2014 多校联合第一场 J)(概率)
- C 一个字符串有三段,第一段原样输出,第二段为要输出字符串的长度,第三段为依据第二段长度补齐第一段
- Leetcode 136 137 260 SingleNumber I II III
- java基础之路(二)上
- 浮点数的陷阱--double i != 10 基本都是对的,不管怎么赋值
- Luogu P3381 (模板题) 最小费用最大流
- Mesos源码分析(3): Mesos Master的启动之二
- [WC2018]通道——边分治+虚树+树形DP
- [UOJ50]链式反应
- Appium 测试APK
- Python基础部分
- 自己总结的,输出到前端JSON的几种方法
- Qt 之 模仿 QQ登陆界面——样式篇
- Linux UDP通信例子
热门文章
- es数组去重的简写
- ROS安装
- django rest framework restful 规范
- Java并发-ConcurrentModificationException原因源码分析与解决办法
- Mac OS 10.12 - 在VMwear Workstation12.5.2中以两种方式进入恢复模式(Recovery)!!!
- POJ 2014.K-th Number 区间第k小 (归并树)
- JSP属性的四种保存范围(page request session application)
- redis 和 kookeeper 连用 构建 redis集群
- Spring Boot学习笔记:JavaMailSender发送邮件
- JWT-Token登陆校验