Spark Streaming 6: BlockGenerator and RateLimiter
2024-09-04 12:36:46
BlockGenerator and RateLimiter are both quite simple, but they handle several important configuration properties, so they are worth writing down.
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch as a block,
 * the other to push the blocks into the block manager.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  private val clock = new SystemClock()
  // spark.streaming.blockInterval: how often (in ms) the current buffer is cut into a block; default 200
  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
  // spark.streaming.blockQueueSize: capacity of the queue of blocks waiting to be pushed; default 10
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false
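The fields above suggest a producer/consumer design: a timer cuts the current buffer into a block every blockInterval, and a separate thread drains a bounded queue of blocks. The following is only a minimal sketch of that buffer-swap idea, not the Spark implementation. MiniBlockGenerator, addData, pollBlock and queued are hypothetical names chosen for illustration, and the timer and the pushing thread are left out so that the swap can be driven by hand.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for BlockGenerator; not the Spark class.
class MiniBlockGenerator(queueSize: Int) {
  // Bounded queue, playing the role of blocksForPushing.
  private val blocksForPushing = new ArrayBlockingQueue[ArrayBuffer[Any]](queueSize)
  private var currentBuffer = new ArrayBuffer[Any]

  // Called for every received record: append to the buffer being filled.
  def addData(data: Any): Unit = synchronized {
    currentBuffer += data
  }

  // Would be called by a timer once per block interval:
  // swap in a fresh buffer and enqueue the old one as a block.
  def updateCurrentBuffer(): Unit = {
    val full = synchronized {
      val old = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      old
    }
    // put() blocks when the queue already holds queueSize blocks.
    if (full.nonEmpty) blocksForPushing.put(full)
  }

  // Would be called by the pushing thread; returns null when no block is ready.
  def pollBlock(): ArrayBuffer[Any] = blocksForPushing.poll()

  // Number of blocks currently waiting to be pushed.
  def queued: Int = blocksForPushing.size()
}
```

In this sketch, a full queue makes updateCurrentBuffer() block, so a slow consumer eventually stalls block creation instead of letting memory grow without bound. That is presumably why the queue size is configurable.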
/** Provides waitToPush() method to limit the rate at which receivers consume data.
 *
 * waitToPush method will block the thread if too many messages have been pushed too quickly,
 * and only return when a new message has been pushed. It assumes that only one message is
 * pushed at a time.
 *
 * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
 * per second that each receiver will accept.
 *
 * @param conf spark configuration
 */
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

  private var lastSyncTime = System.nanoTime
  private var messagesWrittenSinceSync = 0L
  // spark.streaming.receiver.maxRate: maximum messages per second per receiver; default 0
  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
  // 10 seconds, expressed in nanoseconds
  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
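The excerpt stops before the body of waitToPush(), so the following is only a sketch of how a limiter could use these fields, not the Spark code. It assumes that a maxRate of 0 (the default above) means "no limit". RateSketch and sleepMillis are hypothetical names, and the function returns the sleep time instead of sleeping so that it is easy to check by hand.

```scala
object RateSketch {
  // Returns how many milliseconds the caller should sleep before pushing
  // the next message; 0 means "push now".
  // Assumption: maxRate <= 0 disables rate limiting.
  def sleepMillis(messagesSinceSync: Long, elapsedNanos: Long, maxRate: Int): Long = {
    if (maxRate <= 0) {
      0L
    } else {
      val elapsed = math.max(elapsedNanos, 1L) // avoid division by zero
      // Observed rate in messages per second since the last sync point.
      val rate = messagesSinceSync.toDouble * 1e9 / elapsed
      if (rate < maxRate) {
        0L
      } else {
        // Time that should have passed for this many messages at maxRate.
        val targetMillis = messagesSinceSync * 1000L / maxRate
        val elapsedMillis = elapsed / 1000000L
        math.max(targetMillis - elapsedMillis, 0L)
      }
    }
  }
}
```

For example, with maxRate = 100 and 100 messages written in the first 0.5 s, the observed rate is 200 messages per second. Those 100 messages should have taken 1000 ms, so the sketch asks for a 500 ms sleep. A field like SYNC_INTERVAL would then be used to reset the counters periodically so that old history stops influencing the measured rate.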