Spark优化一则 - 减少Shuffle

看了Spark Summit 2014的A Deeper Understanding of Spark Internals,视频(要科学上网)详细讲解了Spark的工作原理,Slides的45页给原始算法和优化算法。

破砂锅用自己3节点的Spark集群试验了这个优化算法,并进一步找到更快的算法。测试数据是Sogou实验室的日志文件前10000000条数据。目标是对日志第2列数据,按照第一个字母合并,得到每个首字母有几条记录。

所有的方案都重新启动Spark shell,先用以下代码把日志第2列数据cache到内存里,Spark GUI显示cache有8个partition,约1GB内存。

val rdd = sc.textFile("hdfs://hadoop1:8000/input/SogouQ3.txt").map(_.split("\t")).map(_())
rdd.cache()
rdd.count()
// res1: Long = 10000000

Spark GUI

RDD Name

Storage Level

Cached Partitions

Fraction Cached

Size in Memory

Size in Tachyon

Size on Disk

3

Memory Deserialized 1x Replicated

8

100%

1089.4 MB

0.0 B

0.0 B

Slides原始方案

rdd.map(x => (x.charAt(), x)).groupByKey().mapValues({x => x.toSet.size}).collect()
// res2: Array[(Char, Int)] = Array((8,168189), (0,168338), (a,168228), (9,168018), (1,167647), (b,168404), (2,168731), (3,168206), (c,168991), (d,168095), (4,167523), (e,168179), (5,167967), (6,167907), (f,168174), (7,168718))

Spark stage GUI显示有关stage Id是1-2,累计耗时5s,产生140MB shuffle read和208MB shuffle write。

Stage Id

Description

Submitted

Duration

Tasks: Succeeded/Total

Shuffle Read

Shuffle Write

1

collect at <console>:15

2014/09/03 20:51:58

3 s

8/8

140.2 MB

 

2

map at <console>:15

2014/09/03 20:51:55

2 s

8/8

 

208.4 MB

0

count at <console>:15

2014/09/03 20:51:46

8 s

8/8

   

Slides优化方案

rdd.distinct(numPartitions = ).map(x => (x.charAt(), )).reduceByKey(_+_).collect()
// res2: Array[(Char, Int)] = Array((6,167907), (0,168338), (f,168174), (7,168718), (a,168228), (1,167647), (8,168189), (b,168404), (2,168731), (9,168018), (3,168206), (c,168991), (d,168095), (4,167523), (e,168179), (5,167967))

Spark stage GUI显示有关stage Id是1-3,累计耗时4.2s,生成50MB shuffle read和75MB shuffle write。虽然多了1个stage,shuffle read/write比原始方案减少超过60%,从而速度加快16%。

Stage Id

Description

Submitted

Duration

Tasks: Succeeded/Total

Shuffle Read

Shuffle Write

1

collect at <console>:15

2014/09/03 20:24:17

0.2 s

6/6

4.9 KB

 

2

reduceByKey at <console>:15

2014/09/03 20:24:15

2 s

6/6

50.4 MB

7.4 KB

3

distinct at <console>:15

2014/09/03 20:24:13

2 s

8/8

 

75.6 MB

0

count at <console>:15

2014/09/03 20:23:55

7 s

8/8

   

Zero Shuffle优化方案

既然减少shuffle可以加快速度,破砂锅想出以下的Zero Shuffle方案来。

rdd.map(x => (x.charAt(), x)).countByKey()
// res2: scala.collection.Map[Char,Long] = Map(e -> 623689, 2 -> 623914, 5 -> 619840, b -> 626111, 8 -> 620738, d -> 623515, 7 -> 620222, 1 -> 616184, 4 -> 616628, a -> 641623, c -> 630514, 6 -> 621346, f -> 624447, 0 -> 632735, 9 -> 637770, 3 -> 620724)

Spark stage GUI显示有关stage Id是1,累计耗时只有0.3s,没有shuffle read/write。这个方案有关的RDD只有narrow dependency,所以只有1个stage。

Stage Id

Description

Submitted

Duration

Tasks: Succeeded/Total

Shuffle Read

Shuffle Write

1

countByKey at <console>:15

2014/09/03 20:45:02

0.3 s

8/8

   

0

count at <console>:15

2014/09/03 20:44:32

8 s

     

小结

比较3种方案

方案

Shuffle Read

Shuffle Write

Time

Slides原始方案

140.2 MB

208.4 MB

5s

Slides优化方案

50.4 MB

75.6 MB

4.2s

Zero Shuffle优化方案

0

0

0.3s

Spark的优化之一是尽可能减少shuffle从而大幅减少缓慢的网络传输。熟悉RDD的函数对Spark优化有很大帮助。

最新文章

  1. 根据第三方库spire.pdf使用指定打印机打印pdf文件
  2. JavaScript数独求解器
  3. phpcms get标签用法
  4. ACM/ICPC 之 四道MST-Prim解法(POJ1258-POJ1751-POJ2349-POJ3026)
  5. php的错误级别
  6. POJ - 1666 Candy Sharing Game
  7. mysqli_stmt预处理类
  8. iOS开发之格式化日期时间(转)
  9. 了解shell
  10. delphi7 开发布局
  11. JAVA学习 分析Servlet
  12. CSS布局之-水平垂直居中
  13. Linux基础教程
  14. 关于Java里面File类创建txt文件重复???
  15. C#多线程和线程同步总结
  16. 基于服务器AAA实验
  17. &lt;Convolutional Neural Network for Paraphrase Identification&gt;
  18. bzoj 4449: [Neerc2015]Distance on Triangulation
  19. java二维码生成工具
  20. env命令详解

热门文章

  1. 【node.js】Buffer(缓冲区)
  2. 1、Android-活动(上)
  3. docker-5-容器数据卷
  4. 关于IntelliJ IDEA 文档无法编辑的解决办法
  5. Dubbo实践(十四)生产者发布服务
  6. 《AngularJS即学即用》读书笔记(一)
  7. docker 容器不能访问宿主端口原因
  8. PE下安装官方WIN7
  9. Mac电脑用终端生成SSH key 访问自己的Github
  10. js怎样得出数组中某个数据最大连续出现的次数