outputMysqlApp.scala

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext } object outputMysqlApp extends App { //配置入口点
val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
val ssc= new StreamingContext(conf, Seconds(1)) //输入数据流(DStream)
val lines = ssc.socketTextStream("localhost", 9999) //todo...
val words = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) // 方式三:
words.foreachRDD ( rdd => {
rdd.foreachPartition(partitionOfRecords => { if (partitionOfRecords.size > 0) {
val connection = createNewConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) vlaues('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
}) connection.close()
}
})
}) //启动StreamingContext,接收数据,然后处理数据
ssc.start()
ssc.awaitTermination() //创建Mysql数据库连接/**
/**
* 获取Mysql数据库连接
* @return 注意返回值,这块不能为空
*/
def createNewConnection()= {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.1.100:3306/streaming_mysql","root","root")
}
}

最新文章

  1. Java中读取properties资源文件
  2. CSS知识点补充
  3. linux打开文件数量的查看方法
  4. Financial Management 分类: POJ 2015-06-11 10:51 12人阅读 评论(0) 收藏
  5. 项目分析(PLUG)
  6. 【查找结构3】平衡二叉查找树 [AVL]
  7. Android输入输出系统之TouchEvent流程
  8. (转)从集中到分布,解读网络视频IT架构变迁
  9. The Tower of Babylon
  10. 通过request读取所有参数
  11. 数据结构之后缀数组suffix array
  12. 201521123047 《Java学习笔记》第二周学习总结
  13. HTML笔记<note2>
  14. Hive 本地调试方法
  15. ajax 小练习
  16. 062 SparkStream内部原理
  17. 解决eclipse修改后台代码ctrl+s总是【自动重启服务器】问题
  18. LeetCode141.环形链表
  19. [Winform]关于cefsharp触屏设备长按文本内容,崩溃问题的修复
  20. easyui的日期控件

热门文章

  1. 导入部署 hand
  2. [Go] gocron源码阅读-判断是否使用root用户执行
  3. 08 在设备树里描述platform_device【转】
  4. Android刷机
  5. CodeForces - 1228D (暴力+思维+乱搞)
  6. Java面试中遇到的坑【篇二面试干货】
  7. 2016年蓝桥杯B组C/C++决赛题目
  8. map 基本使用
  9. Dockerfil
  10. Physically Based Shader Development for Unity 2017 Develop Custom Lighting Systems (Claudia Doppioslash 著)