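Overview

StreamingListener is Spark Streaming's event-listening mechanism. It exposes callbacks for each stage of a streaming application: startup, receiver lifecycle, batch scheduling, and output operations.

The StreamingListener interface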

// To listen for events at any stage of Spark Streaming, implement the corresponding callback in this trait.
// The Spark source already documents each callback.
trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
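
Because every callback in the trait has an empty default body, a listener only has to override the events it cares about. The sketch below illustrates that pattern with simplified stand-in types so that it runs without Spark on the classpath. `BatchEvent`, `SimpleListener`, and `CountingListener` are hypothetical names for illustration, not Spark classes.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark classes.
final case class BatchEvent(batchTimeMs: Long, totalDelayMs: Option[Long])

trait SimpleListener {
  // No-op defaults, mirroring how StreamingListener declares its callbacks
  def onBatchStarted(event: BatchEvent): Unit = {}
  def onBatchCompleted(event: BatchEvent): Unit = {}
}

// Overrides only the callback it needs; onBatchStarted keeps the no-op default
class CountingListener extends SimpleListener {
  var completed: Int = 0
  override def onBatchCompleted(event: BatchEvent): Unit = completed += 1
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val listener = new CountingListener
    listener.onBatchStarted(BatchEvent(0L, None))          // handled by the no-op default
    listener.onBatchCompleted(BatchEvent(0L, Some(1500L))) // counted by the override
    println(s"completed batches: ${listener.completed}")
  }
}
```

With the real trait the shape is the same: extend `StreamingListener` and override, for example, only `onBatchCompleted`.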

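A custom StreamingListener

Purpose: monitor batch processing time and raise an alert when the delay exceeds a threshold, leaving more than 2 minutes between consecutive alerts.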

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.slf4j.LoggerFactory

// RedisClusterClient and HttpClient are the project's own helper objects (not shown here).
// duration: batch interval in seconds; times: alert once totalDelay reaches times * duration.
class SparkStreamingDelayListener(private val appName: String, private val duration: Int, private val times: Int) extends StreamingListener {

  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

  // Runs each time a batch completes
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val numRecords = batchInfo.numRecords
    // Both delays are Option[Long] in milliseconds; fall back to 0 if they are not set
    val processingDelay = batchInfo.processingDelay.getOrElse(0L)
    val totalDelay = batchInfo.totalDelay.getOrElse(0L)

    // Alert only if the total delay reaches the given multiple of the batch interval ...
    if (totalDelay >= times.toLong * duration * 1000) {
      // ... and the last alert, whose time is stored in Redis, is more than 2 minutes old
      val jedis = RedisClusterClient.getJedisClusterClient()
      val currentTime = System.currentTimeMillis / 1000
      val redisTime = jedis.get(appName)
      if (redisTime == null || currentTime - redisTime.toLong > 120) {
        // Record the alert time only when an alert is actually sent
        jedis.set(appName, currentTime.toString)
        val monitorContent = appName + ": numRecords ->" + numRecords + ",processingDelay ->" + processingDelay / 1000 + " s,totalDelay -> " + totalDelay / 1000 + "s"
        logger.warn(monitorContent)
        val msg = "Streaming_" + appName + "_DelayTime:" + totalDelay / 1000 + "S"
        val getURL = "http://node1:8002/message/weixin?msg=" + msg
        HttpClient.doGet(getURL)
      }
    }
  }
}
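
The alert decision combines two checks: the total delay must reach `times * duration * 1000` milliseconds, and more than 120 seconds must have passed since the previous alert. A minimal sketch of that decision as a standalone class follows, assuming an in-memory map in place of Redis. `DelayAlertPolicy` and its parameter names are hypothetical and not part of the original code. In this sketch the alert time is recorded only when an alert is actually raised.

```scala
// Hypothetical helper for illustration; an in-memory map stands in for Redis.
import scala.collection.mutable

class DelayAlertPolicy(batchSeconds: Int, times: Int, minGapSeconds: Long = 120L) {
  // Time of the last alert per application, in epoch seconds
  private val lastAlert = mutable.Map.empty[String, Long]

  // Threshold in milliseconds, computed as Long to avoid Int overflow
  val thresholdMs: Long = times.toLong * batchSeconds * 1000L

  /** True when the delay reaches the threshold and the previous alert is more than minGapSeconds old. */
  def shouldAlert(appName: String, totalDelayMs: Option[Long], nowSeconds: Long): Boolean = {
    val delayed = totalDelayMs.exists(_ >= thresholdMs)
    val gapOk = lastAlert.get(appName).forall(nowSeconds - _ > minGapSeconds)
    if (delayed && gapOk) {
      lastAlert(appName) = nowSeconds // remember when this alert was raised
      true
    } else false
  }
}

object DelayAlertPolicyDemo {
  def main(args: Array[String]): Unit = {
    // 20-second batches, alert at 3x the interval: threshold = 60000 ms
    val policy = new DelayAlertPolicy(batchSeconds = 20, times = 3)
    println(policy.shouldAlert("Userid2Redis", Some(61000L), nowSeconds = 1000L)) // first alert
    println(policy.shouldAlert("Userid2Redis", Some(90000L), nowSeconds = 1060L)) // only 60 s later
    println(policy.shouldAlert("Userid2Redis", Some(90000L), nowSeconds = 1121L)) // 121 s later
    println(policy.shouldAlert("Userid2Redis", Some(5000L), nowSeconds = 2000L))  // below threshold
  }
}
```

Keeping the decision separate from the Redis and HTTP calls makes the threshold and the alert interval easy to unit-test without a running cluster.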

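Usage

// A StreamingListener does not have to be set in the configuration; it can be added directly to the StreamingContext.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object My {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    // duration (batch interval in seconds) and times (delay multiple) are assumed to be defined elsewhere in the application
    ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration, times))
    // ....
  }
}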
