简单介绍

combineByKey()是最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

其定义如下,我们可以根据这个形式来分别定义createCombiner、mergeValue和mergeCombiners三个函数:

def combineByKey[C](
  createCombiner: V => C, ##A
  mergeValue: (C, V) => C, ##B
  mergeCombiners: (C, C) => C,##C 
  partitioner: Partitioner,   
  mapSideCombine: Boolean = true,
  serializer: Serializer = null

)

自定义combineByKey

以实现一个计算平均值的功能为例来分别说明createCombiner、mergeValue和mergeCombiners三个函数的作用和定义方法。

##A createCombiner(value)

createCombiner: V => C ,这个函数把当前rdd中的值(value)作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作,分区内操作)

def createCombiner(value):

   (value, 1)

##B mergeValue(acc, value)

mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (每个分区内合并)

def mergeValue(acc, value):
# 注意,这里的acc即为createCombiner产生的C。
# 这里,用acc[0]表明为acc这个元组中的第一个元素,在scala中acc._1表示
  (acc[0]+value, acc[1]+1)
###C   mergeCombiners: (acc1, acc2)

mergeCombiners: (C, C) => C,该函数把2个元素C合并 (此函数作用范围在rdd的不同分区间内,跨分区合并)

def mergeCombiners(acc1, acc2):

# 注意,到这一步,表明这个rdd的每条数据都已经被###A和###B捕获匹配完毕

   (acc1[0]+acc2[0], acc1[1]+acc2[1])

案例:

如图,有两个分区,key-value(类别-数量)形式也清楚,我们想知道coffee的平均数量和panda的平均数量。以scala形式写法如下:

val init_data = Array(("coffee", 1), ("coffee", 2), ("panda", 3), ("coffee", 9))
val data = sc.parallelize(init_data) # 两个分区
type MVType = (Int, Int) //定义一个元组类型
data.combineByKey(
   score => (1, score), # createCombiner函数
   (c: MVType, newScore) => (c._1 + 1, c._2 + newScore), # mergeValue函数
   (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) # mergeCombiners函数
).map { case (key, value) => (key, value._2/ value._1) }.map(println(_))

分析:

Partition 1 trace:
(coffee, 1) => new key
accumulators[coffee] = createCombiner(1)
得到:(coffee, (1, 1))
(coffee, 2) => existing key
accumulators[coffee] = mergeValue(accumulators[coffee], 2)
得到:(coffee, (2, 3))
显然(panda, 3) => new key,调用createCombiner方法。
得到:(panda, (1, 3))

Partition 2 trace:
(coffee, 9) => new key
accumulators[coffee] = createCombiner(9)
得到:(coffee, (1, 9))

接下来,mergeCombiners来合并分区:

Merge Partitions
mergeCombiners(partition1.accumulators[coffee], partition2.accumulators[coffee])
得到:(coffee, (3,12))

---------------------------------------------细心看 反复看 不然是假懂--------------------------------

最新文章

  1. 【代码笔记】iOS-获得当前硬盘空间
  2. LR一个简单的流程
  3. [转]DB2时间类函数
  4. Android 定时器
  5. 完数c实现
  6. asp.net js调用后台方法
  7. BZOJ 3438: 小M的作物( 最小割 )
  8. Python-cookies
  9. 微信公众号Unauthorized API function
  10. JAVA的DES加密解密在windows上测试一切正常,在linux上异常
  11. 如何用ABP框架快速完成项目(9) - 用ABP一个人快速完成项目(5) - 不要执着于设计模式和DDD理论,避免原教旨主义
  12. [CSS] Frequently used method or solutions for issues
  13. iOS架构设计系列之解耦的尝试之变异的MVVM
  14. 在 Azure 中的 Windows 虚拟机上使用 SSL 证书保护 IIS Web 服务器
  15. struts工作原理不错的解释___
  16. 网络请求及各类错误代码含义总结(包含AFN错误码大全)
  17. Delphi 中的 RectTracker - 原创
  18. JNLP文件具体说明编辑
  19. 【原】Coursera—Andrew Ng机器学习—课程笔记 Lecture 8_Neural Networks Representation 神经网络的表述
  20. centOS如何设置时间同步

热门文章

  1. PS 滤镜— — sparkle 效果
  2. MySQL如何计算动销率_20161025
  3. [CTSC 2012] Cheat
  4. javaCV入门指南:调用FFmpeg原生API和JavaCV是如何封装了FFmpeg的音视频操作?
  5. AtCoder Grand Contest 028 A:Two Abbreviations
  6. C++多态的实现条件
  7. Scala学习——类,继承,接口(中)
  8. CCD与CMOS的区别?
  9. [hdu3530]Subsequence (单调队列)
  10. MySQL之创、增、删、改、查