使用pyspark 建立spark 的累加器
2024-10-21 03:03:05
一,累加器特征
1,PySpark累加器是一个共享变量,与RDD和DataFrame一起使用,以执行与Map reduce计数器类似的求和和和计数器操作。
2,只有分布在各个节点上的task任务才能更新累加器的数值,并且只有driver 端可以读取数值。
二,累加器的创建和使用
sparkContext.accumulator() 可以定义累加器
add() function 增加或者更新累加器的值
value 属性(累加器中的)可以读取到值。
代码:
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("accumulator").getOrCreate() accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x)) #统计rdd内的数值和
print(accum.value) # driver program 中获取值 accuSum=spark.sparkContext.accumulator(0)
def countFun(x): # 功能同上,使用函数方法
global accuSum
accuSum+=x
rdd.foreach(countFun)
print(accuSum.value) accumCount=spark.sparkContext.accumulator(0)
rdd2=spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(lambda x:accumCount.add(1)) #作为计数器功能
print(accumCount.value)
最新文章
- 【leetcode】Count Primes(easy)
- Sqlstate解释
- javascript 的基本优化
- Swift - 04 - 浮点型
- 安全运维之:Linux系统账户和登录安全
- swipe方法
- 设计模式--工厂方法模式(Factory method pattern)及应用
- redis hashmap数据结构分析
- weex开发错误汇总
- C# MVC+EF—WebApi
- 【VSCode】Windows下VSCode编译调试c/c++【更新】
- Windows下利用TortoiseSVN搭建本地SVN服务器
- AWS邮件通知服务:实时监控邮件状态
- mysql导入导出数据中文乱码解决方法小结(1、navicat导入问题已解决,创建连接后修改连接属性,选择高级->;将使用Mysql字符集复选框去掉,下拉框选择GBK->;导入sql文件OK;2、phpmyadmin显示乱码的问题也解决,两步:1.将sql文件以utf8的字符集编码另存,2.将文件中sql语句中的字段字符集编码改成utf8,导入OK)
- SQL语句添加删除修改字段[sql server 2000/2005]
- codeforces 547A Mike and Frog
- ubuntu 14.04/14.10 iptables 防火墙设置
- 深入理解Solaris X64系统调用
- 计算机网络课设之基于UDP协议的简易聊天机器人
- Unity Json 之三
热门文章
- Dijkstra求最短路 I(朴素算法)
- 接口介绍以及postman的基本使用
- 解析sensor_msgs::PointCloud2 ROS点云数据
- Vue3中的响应式api
- vue axios请求中断的处理
- JavaWeb 之 Cookie
- [Unity]利用Mesh绘制简单的可被遮挡,可以探测的攻击指示器
- Vue 权限控制 使用自定义指令 代替v-if
- python ddt file_data
- Python中的__new__()方法