BlockGenerator和RateLimiter其实很简单,但是它包含了几个很重要的属性配置的处理,所以记录一下。
/**
* Generates batches of objects received by a
* [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
* named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch of as a block,
* the other to push the blocks into the block manager.
*/
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf
) extends RateLimiter(conf) with Logging {

private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

private val clock = new SystemClock()
private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
private val blockIntervalTimer =
new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

@volatile private var currentBuffer = new ArrayBuffer[Any]
@volatile private var stopped = false

/** Provides waitToPush() method to limit the rate at which receivers consume data.
*
* waitToPush method will block the thread if too many messages have been pushed too quickly,
* and only return when a new message has been pushed. It assumes that only one message is
* pushed at a time.
*
* The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
* per second that each receiver will accept.
*
* @param conf spark configuration
*/
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

private var lastSyncTime = System.nanoTime
private var messagesWrittenSinceSync = 0L
private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)












最新文章

  1. jQuery中iframe的操作
  2. apache2.4 windows764 python cgi
  3. 【Swift学习】Swift编程之旅(四)基本运算符
  4. iOS运用fabric记录crash日志过程
  5. 光流算法:Brox算法
  6. QT5.6 编译SQLServer驱动
  7. 在Windows 10 Anniversary下配置Caffe
  8. 撸一个Android高性能日历控件,高仿魅族
  9. BZOJ 2073: [POI2004]PRZ [DP 状压]
  10. activiti实战系列 并行网关(parallelGateWay)
  11. 使用CSS3的clip-path(裁剪路径)实现剪贴区域的显示以及实例实现图片渐变
  12. POM文件详解(1)
  13. python(Django之Logging、API认证)
  14. MySQL数据查询子查询语句
  15. svn代码发版的脚本分享
  16. 支付宝app支付流程
  17. 转载 logback的使用和logback.xml详解 http://www.cnblogs.com/warking/p/5710303.html
  18. xcode修改默认头部注释(__MyCompanyName__) (转)
  19. A River Runs Through It
  20. Java中日期类型和mysql中日期类型进行整合

热门文章

  1. Java高并发程序设计学习笔记(三):Java内存模型和线程安全
  2. Oracle笔记(四) 简单查询、限定查询、数据的排序
  3. 日常系统维护之修复linux的grub引导
  4. CH5105 Cookies饼干(线性DP)
  5. 【未知来源】Randomized Binary Search Tree
  6. adb常见命令
  7. CentOS5、CentOS6启动流程
  8. 简单理解TCP/IP协议
  9. 基于Kibana的可视化监控报警插件sentinl入门
  10. BZOJ 3881[COCI2015]Divljak (AC自动机+dfs序+lca+BIT)