zip (zipper) operation

def zip[U](other: org.apache.spark.rdd.RDD[U])(implicit evidence$10: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[(String, U)]

scala> val rdd1=sc.makeRDD(Array("apple","pear","grape","egg","elephant"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at <console>:24

scala> val rdd2=sc.makeRDD(List(20,5,8,6,3))

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at <console>:24

scala> rdd1.zip(rdd2).collect
res35: Array[(String, Int)] = Array((apple,20), (pear,5), (grape,8), (egg,6), (elephant,3))

scala> val rdd3=rdd1 zip rdd2

rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[27] at zip at <console>:28

scala> rdd3.collect

res36: Array[(String, Int)] = Array((apple,20), (pear,5), (grape,8), (egg,6), (elephant,3))
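Note that zip assumes the two RDDs have the same number of partitions and the same number of elements in each partition; otherwise the job fails at runtime. A minimal sketch (the small RDD below is made up for illustration):

val a = sc.makeRDD(Array("apple", "pear", "grape"), 2)
val b = a.map(_.length)   // derived from a by map, so partitioning and element counts match
a.zip(b).collect          // Array((apple,5), (pear,4), (grape,5))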

-------------------------

def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(String, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,numPartitions: Int): org.apache.spark.rdd.RDD[(String, C)]
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C,partitioner: org.apache.spark.Partitioner,mapSideCombine: Boolean,serializer: org.apache.spark.serializer.Serializer): org.apache.spark.rdd.RDD[(String, C)]

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    numPartitions: Int): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}
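
The three function arguments play different roles: createCombiner turns the first value seen for a key in a partition into an initial combiner of type C, mergeValue folds further values of that key into the partition-local combiner, and mergeCombiners merges combiners for the same key across partitions. A sketch computing an average per key (the scores RDD below is made up for illustration, not taken from the examples):

val scores = sc.makeRDD(List(("a", 90), ("b", 80), ("a", 70), ("b", 60)))
val avg = scores.combineByKey(
  (v: Int) => (v, 1),                                          // createCombiner: (sum, count) from the first value
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold another value into (sum, count)
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge (sum, count) across partitions
).mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect   // Array((a,80.0), (b,70.0)) -- order may vary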

Note: the rdd3 used below is a (String, Int) pair RDD of word counts built in a step not shown here, not the zipped rdd3 from the previous section.

scala> rdd3.collect
res53: Array[(String, Int)] = Array((apple,2), (pear,1), (grape,2), (egg,1), (elephant,1))

scala> val rdd4=rdd3.combineByKey(List(_),(x:List[Int],v:Int)=>x:+v,(m:List[Int],n:List[Int])=>m++n)
rdd4: org.apache.spark.rdd.RDD[(String, List[Int])] = ShuffledRDD[35] at combineByKey at <console>:30

scala> rdd4.collect

res51: Array[(String, List[Int])] = Array((egg,List(1)), (elephant,List(1)), (pear,List(1)), (apple,List(2)), (grape,List(2)))

scala> val rdd4=rdd3.map(x=>(x._2,x._1))

rdd4: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[33] at map at <console>:30

scala> val rdd5=rdd4.combineByKey(List(_),(x:List[String],v:String)=>x:+v,(m:List[String],n:List[String])=>m++n)
rdd5: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[37] at combineByKey at <console>:32

scala> rdd5.collect

res52: Array[(Int, List[String])] = Array((1,List(pear, egg, elephant)), (2,List(apple, grape)))

--------------------

scala> val rdd1=sc.makeRDD(Array("apple","apple","pear","egg","hellokitty","egg","apple"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at makeRDD at <console>:24

scala> rdd1.countByValue

res1: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)

scala> val map1=rdd1.countByValue
map1: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)

scala> val rdd2=sc.makeRDD(map1.toList)

rdd2: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[21] at makeRDD at <console>:28

scala> rdd2.collect

res5: Array[(String, Long)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

-------------------

scala> val rdd1=sc.makeRDD(Array("apple","apple","pear","egg","hellokitty","egg","apple"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at <console>:24

scala> val rdd2=rdd1.map(x=>(x,1))

rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26

scala> rdd2.collect

res33: Array[(String, Int)] = Array((apple,1), (apple,1), (pear,1), (egg,1), (hellokitty,1), (egg,1), (apple,1))

scala> rdd2.partitions.size
res34: Int = 4

scala> rdd2.reduceByKey(_+_).collect
res36: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

scala> rdd2.reduceByKey(_+_,2).partitions.size // the shuffle repartitions the data into 2 partitions
res37: Int = 2

-------------------------------

A shuffle can repartition the data, and the number of partitions can be specified.

Shuffle operations are expensive: the data has to be written to disk and transferred over the network, and sometimes sorted as well. Common transformations such as repartition, join, cogroup, and any *By or *ByKey transformation all require a shuffle.
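A small sketch of observing the shuffle that a *ByKey transformation introduces, assuming the rdd2 word-pair RDD from the section above:

val counted = rdd2.reduceByKey(_ + _, 2)   // numPartitions = 2 repartitions the data during the shuffle
counted.partitions.size                    // 2
println(counted.toDebugString)             // the lineage shows a ShuffledRDD stage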

--------------------------------------

scala> val rdd2=rdd1.map(x=>(x,1))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26

scala> rdd2.collect
res39: Array[(String, Int)] = Array((apple,1), (apple,1), (pear,1), (egg,1), (hellokitty,1), (egg,1), (apple,1))

scala> rdd2.combineByKey(x=>x,(c:Int,n:Int)=>c+n,(c1:Int,c2:Int)=>c1+c2).collect
res41: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))

scala> rdd1.countByValue()
res42: scala.collection.Map[String,Long] = Map(hellokitty -> 1, egg -> 2, pear -> 1, apple -> 3)

scala> rdd2.reduceByKey(_+_).collect
res44: Array[(String, Int)] = Array((hellokitty,1), (egg,2), (pear,1), (apple,3))
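
The three ways above give the same counts, but note the difference in return type (a sketch): countByValue is an action that brings a local Map back to the driver, while reduceByKey and combineByKey are transformations whose results stay distributed until an action such as collect is called.

val local: scala.collection.Map[String, Long] = rdd1.countByValue()          // driver-side Map
val dist: org.apache.spark.rdd.RDD[(String, Int)] = rdd2.reduceByKey(_ + _)  // still an RDD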

-------------------------------

scala> val rdd3=rdd1.map(x=>(1,x))

rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[40] at map at <console>:26

scala> rdd3.collect

res45: Array[(Int, String)] = Array((1,apple), (1,apple), (1,pear), (1,egg), (1,hellokitty), (1,egg), (1,apple))

scala> rdd3.combineByKey(x=>List(x),(c:List[String],y:String)=>c:+y,(c1:List[String],c2:List[String])=>c1++c2).collect
res49: Array[(Int, List[String])] = Array((1,List(apple, apple, pear, egg, hellokitty, egg, apple)))

---------------------------------------------

scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)),2)

rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[44] at makeRDD at <console>:24

scala> val rdd3=rdd00.map(x=>(x._2,x._1))

rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[45] at map at <console>:26

scala> rdd3.collect

res51: Array[(Int, String)] = Array((1,a), (1,b), (3,a), (3,ba), (1,b), (10,g))

scala> rdd3.groupByKey().collect

res53: Array[(Int, Iterable[String])] = Array((10,CompactBuffer(g)), (1,CompactBuffer(a, b, b)), (3,CompactBuffer(a, ba)))

scala> rdd3.combineByKey(x=>List(x),(c:List[String],y:String)=>c:+y,(c1:List[String],c2:List[String])=>c1++c2).collect

res54: Array[(Int, List[String])] = Array((10,List(g)), (1,List(a, b, b)), (3,List(a, ba)))
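For comparison, the same grouping can be written with aggregateByKey, which takes an explicit zero value instead of a createCombiner function (a sketch under the same rdd3 as above):

val grouped = rdd3.aggregateByKey(List.empty[String])(
  (acc, v) => acc :+ v,   // fold a value into the partition-local list
  (l1, l2) => l1 ++ l2)   // merge lists from different partitions
grouped.collect           // same (Int, List[String]) pairs as the combineByKey version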

-----------------------

distinct(numPartitions: Int) deduplicates and repartitions at the same time

scala> val bb=sc.makeRDD(Array(1,1,2,1,8,6,8,4,5,4),2)

bb: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[81] at makeRDD at <console>:25

scala> bb.distinct(1).partitions.size

res61: Int = 1

scala> bb.distinct(3).partitions.size

res62: Int = 3
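
distinct can itself be expressed with the pair-RDD operations shown earlier, which is roughly how Spark implements it internally (a sketch, not the exact source):

val dedup = bb.map(x => (x, null)).reduceByKey((x, _) => x, 3).map(_._1)
dedup.partitions.size   // 3
dedup.collect           // the same distinct elements as bb.distinct(3)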

----------------------

def randomSplit(weights: Array[Double],seed: Long): Array[org.apache.spark.rdd.RDD[Int]]

randomSplit splits one RDD into several RDDs according to the given weights: the higher a weight, the larger the share of elements its output RDD is likely to receive. The weights are expected to sum to 1; if they do not, they are normalized.

scala> val split=aa.randomSplit(Array(0.1,0.2,0.3,0.4))

split: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[165] at randomSplit at <console>:27, MapPartitionsRDD[166] at randomSplit at <console>:27, MapPartitionsRDD[167] at randomSplit at <console>:27, MapPartitionsRDD[168] at randomSplit at <console>:27)

scala> split(0).count

res94: Long = 11

scala> split(1).count

res95: Long = 19

scala> split(2).count

res96: Long = 34

scala> split(3).count

res97: Long = 36
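
A small sanity check (a sketch, assuming aa is the source RDD used above): the pieces together cover the original elements, and an explicit seed makes the split reproducible.

split.map(_.count).sum == aa.count   // true: the four pieces partition the original RDD
val fixed = aa.randomSplit(Array(0.1, 0.2, 0.3, 0.4), seed = 42L)   // same split on every run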

-----------------------------------------------------

def glom(): org.apache.spark.rdd.RDD[Array[Int]]

glom puts the elements of each partition into a single array, producing as many arrays as there are partitions.

scala> val bb=sc.makeRDD(1 to 10,3)

bb: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[203] at makeRDD at <console>:25

scala> bb.glom().collect

res127: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
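
A typical use of glom (a sketch): compute a per-partition statistic from each array, then combine the per-partition results.

val perPartitionMax = bb.glom().map(arr => arr.max)   // max of each (non-empty) partition
perPartitionMax.collect                               // Array(3, 6, 10)
perPartitionMax.max                                   // 10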
