Protocol and DataType

大体结构,

定义protocol EventManager, 其实就是定义interface

函数event-manager, 主要做2件事
1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行
2. 返回实现EventManager的匿名record(reify部分, 实现protocol)

这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner

(ns backtype.storm.event
(:use [backtype.storm log util])
(:import [backtype.storm.utils Time Utils])
(:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
) (defprotocol EventManager
(add [this event-fn])
(waiting? [this])
(shutdown [this])) (defn event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
[daemon?]
(let [added (atom 0)
processed (atom 0)
^LinkedBlockingQueue queue (LinkedBlockingQueue.)
running (atom true)
runner (Thread.
(fn []
(try-cause
(while @running
(let [r (.take queue)]
(r)
(swap! processed inc)))
(catch InterruptedException t
(log-message "Event manager interrupted"))
(catch Throwable t
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event"))
)))]
(.setDaemon runner daemon?)
(.start runner)
(reify
EventManager
(add [this event-fn]
;; should keep track of total added and processed to know if this is finished yet
(when-not @running
(throw (RuntimeException. "Cannot add events to a shutdown event manager")))
(swap! added inc)
(.put queue event-fn)
)
(waiting? [this]
(or (Time/isThreadWaiting runner)
(= @processed @added)
))
(shutdown [this]
(reset! running false)
(.interrupt runner)
(.join runner)
)
)))

 

使用的时候很简单, 如下

let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
(.add processes-event-manager sync-processes)

可以直接调用add或其他的function

相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型

比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们

最新文章

  1. Node学习笔记(一):stream流操作
  2. java.lang.UnsatisfiedLinkError: Unable to load library 'xxx': Native library (win32-x86-64/ID_Fpr.dll)
  3. Hive UDF 实验1
  4. Java for LeetCode 063 Unique Paths II
  5. Android里面的命名规范
  6. Android 动画 setVisibility 后出错解决方法
  7. C# winform 中 TabControl 动态显示 TabPage
  8. java:装饰者模式,节点流和处理流
  9. 8.1.C++ AMP简介
  10. C# 根据Word模版生成Word文件
  11. python 标准库基础学习之开发工具部分1学习
  12. SQL SERVER 中 实现主表1行记录,子表多行记录 整合成一条虚拟列
  13. request.getParameterValues与request.getParameter的差别
  14. php+sqlite cms
  15. 学习笔记--jQuery基础
  16. Cordova的搭建
  17. Javascript DOM 编程艺术———总结-2
  18. spring cloud+docker 简单说一说
  19. Android BLE与终端通信(五)——Google API BLE4.0低功耗蓝牙文档解读之案例初探
  20. Oracle E-Business Suite Maintenance Guide Release 12.2(Patching Procedures)

热门文章

  1. CentOS6.2下安装配置MySql
  2. jdom 读取
  3. eclipse生成export生成jar详解
  4. nginx日志自动切割
  5. buildroot 修改root密码后无法登录ssh解决方法
  6. jquery post 同步异步总结
  7. svn & git 问题汇总
  8. 人脸验证算法Joint Bayesian详解及实现(Matlab)
  9. could not find com.android.support.appcompat-v7:23.4.0
  10. 控制反转(Inversion of Control,英文缩写为IoC),另外一个名字叫做依赖注入(Dependency Injection,简称DI)