updataStateByKey算子的使用
2024-10-18 00:29:53
updataStateByKeyApp.scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object updataStateByKeyApp extends App {
//配置入口点
val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
val ssc= new StreamingContext(conf, Seconds(1))
//设置checkpoint的目录
ssc.checkpoint(".")
//输入数据流(DStream)
val lines = ssc.socketTextStream("localhost", 9999)
//todo...
val pairs = lines.flatMap(_.split(" ")).map((_,1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
//输出打印到控制台
runningCounts.print()
//启动StreamingContext,接收数据,然后处理数据
ssc.start()
ssc.awaitTermination()
/**
* 把当前的数据去更新已有的或者是老的数据
* @param currentValues 当前的
* @param preValues 老的
* @return
*/
def updateFunction(currentValues: Seq[Int], preValues : Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}
最新文章
- Unix NetWork Programming(unix环境编程)——环境搭建(解决unp.h等源码编译问题)
- deep learning...深入学习深度学习 --工具篇
- 15.6.2 Configuring the Merge Threshold for index pages[innodb]
- iOS-H5学习篇-02
- jsp\struts1.2\struts2 中文件上传(转)
- OceanBase架构浅析(一)
- 批量导入Excel存在的问题及解决方案
- go循环
- 修改linux终端命令行颜色
- ulua学习笔记(二):官方资料及问题解决方案
- How to add “Maven Managed Dependencies” library in build path eclipse
- 安装edX DevStack
- JAVA实现前几秒几分钟几天前几年源码
- IE6双倍margin间距解决方法
- 一个有趣的问题——MySQL中varchar的最大长度
- BZOJ 1324 Exca神剑 最小割
- MongoDB基础教程系列--第八篇 MongoDB 副本集实现复制功能
- 由form表单来说说前后台数据之间的交互
- bandit_pass
- ACM在线模板
热门文章
- 当时学习《鸟哥的Linux私房菜-基础学习篇》记录的点
- 5.JavaCC官方入门指南-概述
- [Go] tcp服务下的数据传递
- [Linux] deepin系统添加PHP仓库源出错Error: could not find a distribution template for Deepin/stable
- echarts使用简介
- Metrics、Tracing、Logging的融合
- 浅谈js的类数组对象arguments
- 第50 课C++对象模型分析——成员变量(上)
- Apache(基于端口号)
- Appium自动化WebView中元素的操作