updataStateByKeyApp.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext} object updataStateByKeyApp extends App { //配置入口点
val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
val ssc= new StreamingContext(conf, Seconds(1)) //设置checkpoint的目录
ssc.checkpoint(".") //输入数据流(DStream)
val lines = ssc.socketTextStream("localhost", 9999) //todo...
val pairs = lines.flatMap(_.split(" ")).map((_,1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
//输出打印到控制台
runningCounts.print() //启动StreamingContext,接收数据,然后处理数据
ssc.start()
ssc.awaitTermination() /**
* 把当前的数据去更新已有的或者是老的数据
* @param currentValues 当前的
* @param preValues 老的
* @return
*/
def updateFunction(currentValues: Seq[Int], preValues : Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0) Some(current + pre)
}
}

最新文章

  1. Unix NetWork Programming(unix环境编程)——环境搭建(解决unp.h等源码编译问题)
  2. deep learning...深入学习深度学习 --工具篇
  3. 15.6.2 Configuring the Merge Threshold for index pages[innodb]
  4. iOS-H5学习篇-02
  5. jsp\struts1.2\struts2 中文件上传(转)
  6. OceanBase架构浅析(一)
  7. 批量导入Excel存在的问题及解决方案
  8. go循环
  9. 修改linux终端命令行颜色
  10. ulua学习笔记(二):官方资料及问题解决方案
  11. How to add “Maven Managed Dependencies” library in build path eclipse
  12. 安装edX DevStack
  13. JAVA实现前几秒几分钟几天前几年源码
  14. IE6双倍margin间距解决方法
  15. 一个有趣的问题——MySQL中varchar的最大长度
  16. BZOJ 1324 Exca神剑 最小割
  17. MongoDB基础教程系列--第八篇 MongoDB 副本集实现复制功能
  18. 由form表单来说说前后台数据之间的交互
  19. bandit_pass
  20. ACM在线模板

热门文章

  1. 当时学习《鸟哥的Linux私房菜-基础学习篇》记录的点
  2. 5.JavaCC官方入门指南-概述
  3. [Go] tcp服务下的数据传递
  4. [Linux] deepin系统添加PHP仓库源出错Error: could not find a distribution template for Deepin/stable
  5. echarts使用简介
  6. Metrics、Tracing、Logging的融合
  7. 浅谈js的类数组对象arguments
  8. 第50 课C++对象模型分析——成员变量(上)
  9. Apache(基于端口号)
  10. Appium自动化WebView中元素的操作