将流数据输出到Mysql中
2024-09-08 07:05:39
outputMysqlApp.scala
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext }
object outputMysqlApp extends App {
//配置入口点
val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
val ssc= new StreamingContext(conf, Seconds(1))
//输入数据流(DStream)
val lines = ssc.socketTextStream("localhost", 9999)
//todo...
val words = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
// 方式三:
words.foreachRDD ( rdd => {
rdd.foreachPartition(partitionOfRecords => {
if (partitionOfRecords.size > 0) {
val connection = createNewConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) vlaues('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
}
})
})
//启动StreamingContext,接收数据,然后处理数据
ssc.start()
ssc.awaitTermination()
//创建Mysql数据库连接/**
/**
* 获取Mysql数据库连接
* @return 注意返回值,这块不能为空
*/
def createNewConnection()= {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.1.100:3306/streaming_mysql","root","root")
}
}
最新文章
- Java中读取properties资源文件
- CSS知识点补充
- linux打开文件数量的查看方法
- Financial Management 分类: POJ 2015-06-11 10:51 12人阅读 评论(0) 收藏
- 项目分析(PLUG)
- 【查找结构3】平衡二叉查找树 [AVL]
- Android输入输出系统之TouchEvent流程
- (转)从集中到分布,解读网络视频IT架构变迁
- The Tower of Babylon
- 通过request读取所有参数
- 数据结构之后缀数组suffix array
- 201521123047 《Java学习笔记》第二周学习总结
- HTML笔记<;note2>;
- Hive 本地调试方法
- ajax 小练习
- 062 SparkStream内部原理
- 解决eclipse修改后台代码ctrl+s总是【自动重启服务器】问题
- LeetCode141.环形链表
- [Winform]关于cefsharp触屏设备长按文本内容,崩溃问题的修复
- easyui的日期控件
热门文章
- 导入部署 hand
- [Go] gocron源码阅读-判断是否使用root用户执行
- 08 在设备树里描述platform_device【转】
- Android刷机
- CodeForces - 1228D (暴力+思维+乱搞)
- Java面试中遇到的坑【篇二面试干货】
- 2016年蓝桥杯B组C/C++决赛题目
- map 基本使用
- Dockerfil
- Physically Based Shader Development for Unity 2017 Develop Custom Lighting Systems (Claudia Doppioslash 著)