package test
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import scala.util.Try

/**
 * Spark Streaming demo that reads space-separated log lines from a socket,
 * turns every micro-batch into a DataFrame, enriches it with a windowed SQL
 * query and appends the result to a MySQL table over JDBC.
 */
object demo9 {

  /** Minimum number of space-separated fields a log line must contain. */
  private val ExpectedFieldCount = 5

  /**
   * Converts the fields of one split log line into a [[DapLog]].
   *
   * Field 0 is the numeric login id, fields 1 and 2 are the user and the event
   * name, and fields 3 and 4 are joined with a single space to form the event
   * time string.
   *
   * @param fields the log line split on single spaces
   * @return the parsed record, or `None` when the line has fewer than five
   *         fields or its first field is not a valid integer
   */
  private def parseLog(fields: Array[String]): Option[DapLog] =
    if (fields.length < ExpectedFieldCount) None
    else
      Try(fields(0).toInt).toOption.map { loginId =>
        DapLog(loginId, fields(1), fields(2), fields(3) + " " + fields(4))
      }

  /**
   * Entry point: starts a local streaming context with 10-second batches,
   * listens on localhost:9999 and processes each batch until terminated.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("logapp")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999).map(x => x.split(" "))

    lines.foreachRDD { rdd =>
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Build DapLog records from the log fields. Malformed lines are skipped
      // so that a single bad line cannot fail the whole micro-batch.
      val logDataFrame = rdd.flatMap(fields => parseLog(fields).toList).toDF()

      // Register the batch as a temporary view.
      //logDataFrame.registerTempTable("log")
      logDataFrame.createOrReplaceTempView("log")

      // Query the fields of this batch.
      // Alternative format considered: to_timestamp($"event_time", "MM/dd/yyyy HH:mm:ss")
      // The window is ordered by event_time: ordering by the partition key
      // (login_id) would leave the row order inside a partition undefined and
      // make lead() return an arbitrary "next" row.
      val logCountsDataFrame = sqlContext.sql(
        "SELECT login_id,user,event_name," +
          "to_timestamp(event_time, 'yyyy-MM-dd HH:mm:ss') as event_time," +
          "lead(event_time,1,null) over(partition by login_id order by event_time ASC) as nextline_time " +
          "FROM log")

      // Print the query result and persist it, but only for non-empty batches.
      val countN = logCountsDataFrame.count()
      if (countN != 0) {
        logCountsDataFrame.show()

        // The file-based saves below failed for the original author and were
        // therefore commented out (target directory: /Users/huiliyang/streaming).
        //logCountsDataFrame.write.json("/Users/huiliyang/streaming/cc")
        //logCountsDataFrame.write.parquet("/Users/huiliyang/streaming/bb")

        // NOTE(review): the database credentials are hard-coded here; consider
        // moving them to external configuration before sharing this code.
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "yh200888")
        prop.put("driver", "com.mysql.jdbc.Driver")

        logCountsDataFrame.write
          .mode(SaveMode.Append)
          .jdbc("jdbc:mysql://localhost:3306/school?useUnicode=true&characterEncoding=utf8", "log", prop)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

/**
 * One parsed log record; the field names become the DataFrame column names.
 *
 * @param login_id   numeric login id (first field of the log line)
 * @param user       user field of the log line
 * @param event_name event name field of the log line
 * @param event_time event time string, built from the 4th and 5th fields joined by a space
 */
case class DapLog(login_id: Int, user: String, event_name: String, event_time: String)

/** Lazily creates a single SQLContext and hands the same instance to every batch. */
object SQLContextSingleton {

  // Stays null until the first call to getInstance.
  @transient private var instance: SQLContext = _

  /**
   * Returns the shared SQLContext, creating it on first use.
   *
   * @param sparkContext the SparkContext used to build the SQLContext on the first call
   * @return the shared SQLContext instance
   */
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

最新文章

  1. winform常用的属性(listview),常用容器(二者结合)
  2. [转]C#常用开源类库收集
  3. [IOS]edgesForExtendedLayout、automaticallyAdjustsScrollViewInsets
  4. C#读取数据库字节流生成图片
  5. 【原创】No matching distribution found for Twisted>=10.0.0 (from scrapy)
  6. 集线器hub、交换机switch、路由器router 的区别
  7. "LC.exe" exited with code -1 错误
  8. 冲突--ScrollView嵌套ListView冲突问题的最优解决方案
  9. UNITY打包问题
  10. vim高级编辑(一)
  11. flume从kafka读取数据到hdfs中的配置
  12. mongodb 参数优化
  13. 数据库 'xxxx' 的事务日志已满。若要查明无法重用日志中的空间的原因
  14. Ubuntu install flash
  15. 利用百度OCR实现验证码自动识别
  16. 030.Zabbix分布式部署
  17. Abp.AutoMapper扩展(1) --static class AutoMapExtensions
  18. 2.python知识点总结
  19. HttpClient-----待补充
  20. 7.Reverse Integer (INT; Overflow)

热门文章

  1. js基础用法 ,基础语法
  2. NX二次开发-UFUN读取工程图注释UF_DRF_ask_text_data
  3. sqlserver 获取实例上用户数据库的数据字典
  4. 2019 IEEEXtreme 13.0 Impact Factor 影响因子
  5. WebApi 如何 优雅的 对 输入输出 解密加密
  6. Winform 获取桌面设备上下文
  7. list集合排序2
  8. react添加多个域名proxy代理,跨域
  9. ReentrantLock与synchronized的区别
  10. 一个切图仔的HTML笔记