本文主要记录我使用Spark以来遇到的一些典型问题及其解决的方法,希望对遇到相同问题的同学们有所帮助。

1. Spark环境或配置相关

Q: Sparkclient配置文件spark-defaults.conf中,spark.executor.memory和spark.cores.max应该怎样合理配置?

A: 配置前,须要对spark集群中每一个节点机器的core和memory的配置有基本了解。比方由100台机器搭建的spark集群中。每一个节点的配置是core=32且memory=128GB,那么,向该集群提交应用时,应注意cores.max和executor.memory配置的”和谐”。详细而言,须要先预估应用涉及到的数据量和计算量。然后最大限度压榨单机core和memory,尽量避免core和memory配置比例失衡的情况。

举个样例,若某个应用配置了cores.max=1000且executor.memory=512m。则这个配置比例明显不合理,由于每一个节点仅仅有32个cores,1000个cores须要占用集群32台机器,而每一个executor(即集群中的每一个节点)仅仅申请512MB内存,也即该应用占用的32台机器的所有cores。但仅仅占用0.5G*32=16GB内存,这意味着剩余的(128G-0.5G)*32=4080GB内存被浪费了。

总之。合理的配置应该是在保证cores和memory都尽可能少的情况下。使得spark计算速度能满足业务需求。实际配置时,可将memory配置为机器最大阈值,cores的数目按实际计算量合理设定。尽量降低任务占用的节点数。



Q:
关于数据源与spark集群的问题,考虑这样的场景对spark性能的影响:数据源在位于南方的hdfs集群上,而spark集群位于北方机房。

A: 数据源集群与spark计算集群物理距离较远的情况下,spark读入数据时会由非常大网络开销,对作业执行速度影响非常明显。因此,搭建集群时,要尽量让spark集群靠近数据源。



Q:
提交任务后。执行报错"spark java.lang.OutOfMemoryError: Java heap space",怎样处理?

A: 这个错误的引发原因较多,比方代码中global变量太大(如载入了大字典)或rdd.collect()太大。Spark的调度逻辑对transformations operations是lazy evaluation策略,即仅仅有遇到action operations时才会计算整个job
chain上涉及到的transformations和action,而actions的输出假设不是写磁盘,就会输出到driver program,典型如sparkclient所在机器的终端。driver 的jvm默认heap space是数百MB,无法hold住由action返回的大数据集时。就会报OOM。

解决思路有2个:一个思路是改动代码逻辑。比方将超大变量拆分成多个变量后多步运行(如将大dict或set拆分成N个小的,每步运行完后,销毁当前的变量并构造新变量)。这个思路是典型的时间换空间。只是有时候业务逻辑不一定能做这样的拆分处理;还有一个思路是提交spark应用前。合理配置相关參数,如在spark-defaults.conf中添加配置项spark.driver.memory
12G。详细配置思路能够參考StackOverflow的这篇帖子



Q:
提交任务时sparkclient报错"No space left on device",例如以下图所看到的,怎样解决?

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc2x2aGVy/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" style="font-size:14px">

A: 这是因为任务执行过程中。Sparkclient可能会在本机写暂时文件,默认情况下。它会写到/tmp文件夹,非常easy写满/tmp。从而导致报错。

解决方法是在Sparkclientspark-defaults.conf中明白指定暂时文件的写入路径:

spark.local.dir /home/slvher/tools/spark-scratch



Q: Sparkclient提交任务后。spark job的日志默认是输出到console的。与用户print的调试日志混到一起,不方便调试。怎样配置使spark的内部日志单独打印到文件?

A: Spark借助log4j打印日志。而log4j的打印行为能够通过在conf文件夹创建log4j.properties并进行配置来控制(建议拷贝conf/log4j.properties.template为log4j.properties来配置log4j的打印行为),详细配置方法能够參考log4j的官网文档,这里不赘述。

2. Spark应用编程相关

Q: 考虑这样的场景:待提交的Application包括多个python files,当中一个是main入口。其他都是自己定义的module files且它们之间有依赖关系,通过spark-submit(已用--py-files參数指定要上传的文件)提交任务时。报错"ImportError: No module named xxx",怎样解决?

A: 通过--py-files參数上传的.py文件仅仅是上传而已,默认不会在集群节点python环境中将在这些文件里定义的module(s)增加解释器的search path。Spark文档Submiiting
Applications
事实上对这样的场景下提交任务的方式做了说明:

For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a .zip or .egg.

所以,正确的提交方法是,将除main入口以外的.py文件做成package(将这些文件放到某个文件夹下,并在该文件夹创建名为__init__.py的空文件)并打包成zip archives,然后通过--py-files上传这个zip包。

由于python解释器默认能够处理zip archives的import场景,且由于上传的zip是个包括__init__.py的package。故集群节点机器上的python解释器会自己主动把它们增加对module的搜索路径中,这样就攻克了Import
Error的问题。

Q: 怎样查看函数中通过print打印的调试信息?

A: Spark应用在传给spark operations api的函数中print的调试信息在本地driver program端是看不到的,由于这些函数是在集群节点上运行的。故这些print信息被打印到了为作业分配的节点机器上,须要从spartk master的http查看接口找到提交的应用,然后去应用运行节点的stderr中看看。



Q: PySpark的API中。map和flatMap有何差别?

A: 从函数行为来看。它们都是接受一个自己定义函数f,然后对RDD中的每一个元素调用这个函数。关键差别在于自己定义函数f的类型:map接受的函数返回一个普通value;而flatMap接受的函数必须返回一个iterable value,即返回值必须可用于迭代,然后flatMap会对这个iterable value做flat操作(迭代这个value并flat成list)。能够借助以下的demo来理解。



在spark集群上的运行结果例如以下:



以下对输出的每行结果稍做解释:

a. 代码第18行,rdd.flatMap接受的函数參数是test_flatmap_v1,而后者返回值是一个可迭代的generator object。故flatMap对返回值做flat操作后。generator object的每一个element作为终于flattened结果的element。

b. 代码第19行,rdd.flatMap接受的函数參数是test_flatmap_v2,而后者返回值一个set。因为这个set本身是iterable的,故flatMap对set做flat操作后,set中的每一个element做完终于flattened结果的element。

特别注意:若return的value不支持iterate(如int型),则flatMap会不错。

感兴趣的话。能够亲自试验下。

c. 代码第20行,因为map不要求其函数參数的返回值是否iterable,也不会对iterable的value做flat操作。它仅仅是将return value本身作为终于结果的一个element,因此它的输出结果也就非常easy理解了。

Q: 对rdd运行cache有何注意事项?

A: 关于rdd persist对性能优化的原理。能够查看这里Persisting RDD in Spark。但并非全部的persist/cache操作都与spark性能正相关。在persist前,最好遵循以下的原则:

a) rdd会被多次使用时再考虑cache

b) rdd需cache时。尽量对"靠近"算法的rdd做cache,而不要cache读入的raw数据

c) cache的rdd不再使用时。尽快调用unpersist释放其占用的集群资源(主要是memory) 



Q: 怎样在传给spark transformations操作的函数中訪问共享变量?

A: 依据官网Programming Guide文档说明(參见这里),当作为參数传给spark操作(如map或reduce)的函数在远程机器的节点上运行时,函数中使用到的每一个变量的副本(separate
copies)也会被复制到这些节点上以便函数訪问。假设这些变量在节点上被改动,那这些改动不会被反传回spark driver program,即在实现业务代码时。应由实现者保证这些变量的仅仅读特性。由于在不同任务间维护通用的、支持读/写的共享变量会减少spark效率。

举个样例,以下的代码说明了怎样在传给spark操作的函数中借助全局变量实现共享訪问:





Q: 除通过global variable共享变量外。spark还支持什么方式共享变量?

A: Spark还支持broadcast变量和accumulators这两种共享变量的方式。当中。broadcast同意开发人员在spark集群的每一个节点上保持变量的一份仅仅读cache,本质上,broadcast变量也是global变量,仅仅只是它是由开发人员显式分发到集群节点上的,而非spark依据每一个task调用的函数对变量的訪问情况自己主动拷贝。至于accumulators。顾名思义,它仅仅支持add操作,详细语法可參考spark
programming guide关于accumulators部分的说明。



Q: broadcast变量与普通global变量有何关系?各自的适用场合?

A: 实际上。broadcast变量是一种global变量,它们均能够实如今分布式节点中运行函数时共享变量。当中,普通global变量是随着spark对task的调度依据实际情况由spark调度器负责拷贝至集群节点的,这意味着若有需訪问某global变量的多个task运行时,每一个task的运行均有变量拷贝过程。而broadcast变量则是由开发人员主动拷贝至集群节点且会一直cache直至用户主动调用unpersist或整个spark作业结束。

PS: 实际上。即使调用unpersist也不会马上释放资源。它仅仅是告诉spark调度器资源能够释放。至于何时真正释放由spark调度器决定。參见SPARK-4030

结论:若共享变量仅仅会被某个task使用1次,则使用普通global变量共享方式就可以。若共享变量会被先后运行的多个tasks訪问,则broadcast方式会节省拷贝开销。

再次提醒:若使用了broadcast方式共享变量,则开发人员应在确定该变量不再须要共享时主动调用unpersist来释放集群资源。

3. 其他注意事项

Q:
还有其它注意事项吗?

A: 上面提到的仅仅是最常见的问题。实际编写复杂Spark应用时,怎样高效利用spark集群的其他注意事项。强烈推荐參考Notes on Writing Complex Spark
Applications
这篇文章(Google Docs需FQ訪问)。

【參考资料】

1. StackOverflow: spark java.lang.OutOfMemoryError: Java heap space

2. Spark Doc: Submitting Applications

3. Spark Programming Guide: Shared Variables

4. Spark Issues: [SPARK-4030] "destroy" method in Broadcast should be public

5. [GoogleDoc] Notes on Writing Complex Spark Applications

========================= EOF =======================

最新文章

  1. 本周博客--WinForm线程初步 2014-10-31 09:15 54人阅读 评论(0) 收藏
  2. 用mixin引入模块后, 方法重名的解析方法
  3. Struts2与Struts1的区别
  4. 关于电信cdma基站nid,sid,bid的解释
  5. Linux环境下段错误的产生原因及调试方法小结
  6. C#MongoDB使用实践
  7. sphinx配置文件sphinx.conf参数详细说明
  8. Java设计模式-工厂方法模式(Factory Method)
  9. 怎么提高OCR文字识别软件的识别正确率
  10. ngnix 一 入门指南
  11. iOS中构造函数与析构函数
  12. QT5.6,5.7,5.8的新特征以及展望
  13. mysql5.6.16绿色版配置、运行
  14. 【Spring】JDBC事务管理XML配置
  15. 软件安装配置笔记(三)——ArcGIS系列产品安装与配置(补档)(附数据库连接及数据导入)
  16. ceph学习
  17. Luogu 1941 【NOIP2014】飞扬的小鸟 (动态规划)
  18. sql分割字符串
  19. javascript实现unicode与字符互相转换
  20. 使用IAR编译STM8S 怎样生产烧录文件

热门文章

  1. 洛谷 P3371 【模板】单源最短路径 【链式前向星+SPFA】
  2. CF 115 A 【求树最大深度/DFS/并查集】
  3. DFS之奇偶剪枝
  4. [CF441E]Valera and Number
  5. 通过python的logging模块输出日志文件
  6. 显示图案 Exercise06_06
  7. cocos2d-x 扩展 修改 备注
  8. Xcode升级后导致插件不能用, 一句代码更新UUID OK~
  9. 重写规则为什么要options +followsymlinks
  10. access日志配置