Storm-源码分析-EventManager (backtype.storm.event)
2024-08-24 09:51:45
大体结构,
定义protocol EventManager, 其实就是定义interface
函数event-manager, 主要做2件事
1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行
2. 返回实现EventManager的匿名record(reify部分, 实现protocol)
这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner
(ns backtype.storm.event
(:use [backtype.storm log util])
(:import [backtype.storm.utils Time Utils])
(:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
) (defprotocol EventManager
(add [this event-fn])
(waiting? [this])
(shutdown [this])) (defn event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
[daemon?]
(let [added (atom 0)
processed (atom 0)
^LinkedBlockingQueue queue (LinkedBlockingQueue.)
running (atom true)
runner (Thread.
(fn []
(try-cause
(while @running
(let [r (.take queue)]
(r)
(swap! processed inc)))
(catch InterruptedException t
(log-message "Event manager interrupted"))
(catch Throwable t
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event"))
)))]
(.setDaemon runner daemon?)
(.start runner)
(reify
EventManager
(add [this event-fn]
;; should keep track of total added and processed to know if this is finished yet
(when-not @running
(throw (RuntimeException. "Cannot add events to a shutdown event manager")))
(swap! added inc)
(.put queue event-fn)
)
(waiting? [this]
(or (Time/isThreadWaiting runner)
(= @processed @added)
))
(shutdown [this]
(reset! running false)
(.interrupt runner)
(.join runner)
)
)))
使用的时候很简单, 如下
let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
(.add processes-event-manager sync-processes)
可以直接调用add或其他的function
相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型
比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们
最新文章
- Node学习笔记(一):stream流操作
- java.lang.UnsatisfiedLinkError: Unable to load library 'xxx': Native library (win32-x86-64/ID_Fpr.dll)
- Hive UDF 实验1
- Java for LeetCode 063 Unique Paths II
- Android里面的命名规范
- Android 动画 setVisibility 后出错解决方法
- C# winform 中 TabControl 动态显示 TabPage
- java:装饰者模式,节点流和处理流
- 8.1.C++ AMP简介
- C# 根据Word模版生成Word文件
- python 标准库基础学习之开发工具部分1学习
- SQL SERVER 中 实现主表1行记录,子表多行记录 整合成一条虚拟列
- request.getParameterValues与request.getParameter的差别
- php+sqlite cms
- 学习笔记--jQuery基础
- Cordova的搭建
- Javascript DOM 编程艺术———总结-2
- spring cloud+docker 简单说一说
- Android BLE与终端通信(五)——Google API BLE4.0低功耗蓝牙文档解读之案例初探
- Oracle E-Business Suite Maintenance Guide Release 12.2(Patching Procedures)
热门文章
- CentOS6.2下安装配置MySql
- jdom 读取
- eclipse生成export生成jar详解
- nginx日志自动切割
- buildroot 修改root密码后无法登录ssh解决方法
- jquery post 同步异步总结
- svn &; git 问题汇总
- 人脸验证算法Joint Bayesian详解及实现(Matlab)
- could not find com.android.support.appcompat-v7:23.4.0
- 控制反转(Inversion of Control,英文缩写为IoC),另外一个名字叫做依赖注入(Dependency Injection,简称DI)