总的来说,需要考虑以下两点:

1. 有效地运用集群资源去减少每个批次处理的时间

2. 正确的设置batch size,以使得处理速度能跟上接收速度

一.  为了减少处理时间,主要有以下几个优化点:

1. 接收数据的并行度。

每个InputDStream只创建一个Receiver用于接收数据,如果接收数据是系统的瓶颈,可以创建多个InputDStream。配置不同的InputDStream读取数据源的不同分区。比如原先用一个InputDStream读取Kafka的两个topic的数据,可以拆分成两个InputDStream读取不同的Topic。处理时,把两个InputDStream收到的数据合并成一个。

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

2.  阻塞间隔

有一个配置参数:spark.streaming.blockInterval,它的意思是每间隔多少秒后,Spark才把接收到的数据组成数据块存到Spark中。官网推荐的最小值是50ms,默认值200ms。假设实例化InputDStream时设置的Duration(batch interval)为1秒(1000ms),那么任务执行时,总共有 1000 / 200 = 5 个block,每个block将对应一个task。如果task的数量少于每台机器配置的core的数量,则说明资源没有被很好的利用。应减少 spark.streaming.blockInterval 或增加batch interval。

注意:官网并没有说要使得task的数量和core的数量一致。能想到两个的原因:(1)receiver也会占用core (2)driver也会占用core

3. 相对于1中所说的增加receiver,一个可供选择的方案是通过调用 inputStream.repartition(<number of partitions>) 对inputDStream的数据进行repartition。这将使接收到的数据分布到指定数量的其它机器上,以供进一步处理。

4. 处理数据的并行度

对于reduceByKey, reduceByKeyAndWindow,Join 等shuffle操作,默认的并行度由 spark.default.parallelism 控制。可以在shuffle操作中设置partition的数量来覆盖默认值。

5. 数据序列化

在Spark Streaming中,有两类数据会被序列化:

5.1 输入数据。默认情况下,输入数据会以StorageLevel.MEMORY_AND_DISK_SER_2 的方式存储在 executor 的内存中。Receiver会反序列化接收到数据,然后再把它序列化成Spark的序列化格式。这显然会有花销。

5.2 持久化Streaming操作产生的RDD。某些windows操作会持久化需要进行多次处理的数据到内存中,默认的存储方式是StorageLevel.MEMORY_ONLY_SER

以上两种情况,使用Kyro序列化机制来减少CPU和内存的花销。

6. 启动Task的开销

如果每秒启动的task较多(比如:50个或更高),那么发送task去workers的开销会较大。通过以下方式减少这个开销:以Standalone模式或 coarse-grained Mesos 模式运行Spark程序。详见: Running on Mesos guide

7. 设置正确的Batch Interval

每个时间间隔提交的job应该能处理完这个时间间隔内收到的数据。可以通过Web UI 去查看批处理的时间是否小于interval

二. 内存调优

Spark Streaming应用程序需要的内存依赖于程序中用到的transformation。比如,你使用了window操作,想要处理最后10分钟的数据,这就要求内存能保存这10分钟的所有数据。虽然默认情况下是,内存存不下后会存到磁盘,但是这样的会比较慢。建议尽量加大内存。

垃圾回收也是需要考虑的一方面。可以考虑以下几点来减少GC的开销:

1. 接收的数据和RDD会默认序列化并持久化。开启Kyro序列化机制可减少内存的使用。 设置spark.rdd.compress为TRUE,可减少CPU时间

2. 清除旧的数据。Spark Streaming默认会做这件事。

3. 使用CMS 垃圾收集器。driver端使用 --driver-java-options ( spark-submit的时候)。executor端, 使用spark.executor.extraJavaOptions 这个配置

4. 尝试以下2点:1. 使用OFF_HEAP 存储级别  2. 增加executor,减少heap size

总的来说,需要记住以下几点:

1. 一个InputDSteam对应一个receiver, receiver运行在executor上,因此会占用一个core。Receivers以轮询的方式分配到executors中。

2. 每隔block interval的时间就会生成一个block,所以每个batch interval的时间会生成 batch interval / block interval 个block,每个block对应一个task。block会被BlockManager分发到不同的executor上

3. 在batchInterval内,在driver端生成一个RDD,在此期间内生成的blocks是这个RDD的partitions。每个partition是一个task。

4. 较大的blockInterval 意味着较大的data block。较大的 spark.locality.wait 将增加 block 在本地处理的机会。寻找这两个值的平衡点,以使较大的block在本地处理。

5. 可以使用 inputDstream.repartition(n) 来替代设置 batchInterval 和 blockInterval 来获取较好的并行度。但这会产生shuffle的花销。

6. 一次只能处理一个Job。所以当有多个 InputDStream 时,需要先Union两个 InputDStream。

7. Receiver暂时没有暂停的方法。因此当job的处理时间大于 batchInterval 时,receiver 的内存使用将会持续增加,最终导致 BlockNotFoundException 。使用spark.streaming.receiver.maxRate 可以限制 receiver的接收速度

最新文章

  1. 在Github上搭建自己的博客(Windows平台)
  2. STL中容器的push()或者push_back()函数的一点说明
  3. run loop 输入源
  4. 插值和空间分析(二)_变异函数分析(R语言)
  5. C#常用的字符串操作, 包括截取
  6. c#调用c++ dll的几种类型(转)
  7. JavaScript,通过分析Array.prototype.push重新认识Array
  8. Android开发之使用意图调用内置应用程序
  9. SRM 441(1-250pt, 1-500pt)
  10. 详解C#中System.IO.File类和System.IO.FileInfo类的用法
  11. 【转】Everything中文绿色版在Win7/8用不了?
  12. Delphi/C#之父首次访华:55岁了 每天都写代码
  13. linux cat more less head tail
  14. Hibernate基础学习(五)&mdash;对象-关系映射(下)
  15. C#、Python中分别是怎么实现通过字符串获取实体类的值以及给实体类赋值
  16. scala模式匹配详细解析
  17. python获取两个日期间的工作日
  18. JVM(一)—— 内存管理
  19. Code Chef April Cook-Off 2019题解
  20. Centos 设置zookeeper开机自启动

热门文章

  1. Linux课程---4、Linux目录结构及常用命令(目录结构)
  2. Java 学习摘要
  3. 单机版 RedisUtils({基本操作封装工具类})【三】
  4. 十九 Django框架,发送邮件
  5. PHP如何得到数组最后元素的key
  6. docker 基本概念
  7. 关于linux 安装 python pymssql模块
  8. DNSmasq
  9. myeclipse保存时弹出Building workspace
  10. c++对象导出到lua