1 mk-worker

和其他的daemon一样, 都是通过defserverfn macro来创建worker

(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
(log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
" and conf " conf)
(if-not (local-mode? conf)
(redirect-stdio-to-slf4j!))
;; because in local mode, its not a separate
;; process. supervisor will register it in this case
(when (= :distributed (cluster-mode conf))
(touch (worker-pid-path conf worker-id (process-pid))))
(let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id) ;;1.1 生成work-data
;;1.2 生成worker的hb
        heartbeat-fn #(do-heartbeat worker)
;; do this here so that the worker process dies if this fails
;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
_ (heartbeat-fn) ;; heartbeat immediately to nimbus so that it knows that the worker has been started
_ (do-executor-heartbeats worker) executors (atom nil)
;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
;; to the supervisor
_ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
_ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors)) ;;1.3 更新发送connections
refresh-connections (mk-refresh-connections worker)
_ (refresh-connections nil)
_ (refresh-storm-active worker nil)
        ;;1.4 创建executors
_ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)))
        ;;1.5 launch接收线程,将数据从server的侦听端口, 不停的放到task对应的接收队列
         receive-thread-shutdown (launch-receive-thread worker) ;;返回值是thread的close function
        
        ;;1.6 定义event handler来处理transfer queue里面的数据, 并创建transfer-thread
transfer-tuples (mk-transfer-tuples-handler worker)
transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)
               
        ;;1.7 定义worker shutdown函数, 以及worker的操作接口实现
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " assignment-id " " port)
(doseq [[_ socket] @(:cached-node+port->socket worker)]
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket))
(log-message "Shutting down receive thread")
(receive-thread-shutdown)
(log-message "Shut down receive thread")
(log-message "Terminating messaging context")
(log-message "Shutting down executors")
(doseq [executor @executors] (.shutdown executor))
(log-message "Shut down executors") ;;this is fine because the only time this is shared is when it's a local context,
;;in which case it's a noop
(.term ^IContext (:mq-context worker))
(log-message "Shutting down transfer thread")
(disruptor/halt-with-interrupt! (:transfer-queue worker)) (.interrupt transfer-thread)
(.join transfer-thread)
(log-message "Shut down transfer thread")
(cancel-timer (:heartbeat-timer worker))
(cancel-timer (:refresh-connections-timer worker))
(cancel-timer (:refresh-active-timer worker))
(cancel-timer (:executor-heartbeat-timer worker))
(cancel-timer (:user-timer worker)) (close-resources worker) ;; TODO: here need to invoke the "shutdown" method of WorkerHook (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port)
(log-message "Disconnecting from storm cluster state context")
(.disconnect (:storm-cluster-state worker))
(.close (:cluster-state worker))
(log-message "Shut down worker " storm-id " " assignment-id " " port))
ret (reify
Shutdownable
(shutdown
[this]
(shutdown*))
DaemonCommon
(waiting? [this]
(and
(timer-waiting? (:heartbeat-timer worker))
(timer-waiting? (:refresh-connections-timer worker))
(timer-waiting? (:refresh-active-timer worker))
(timer-waiting? (:executor-heartbeat-timer worker))
(timer-waiting? (:user-timer worker))
))
)] (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
(schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker)) (log-message "Worker has topology config " (:storm-conf worker))
(log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
ret
))

 

1.1 生成worker-data

(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
storm-conf (read-supervisor-storm-conf conf storm-id)
        ;;从assignments里面找出分配给这个worker的executors, 另外加上个SYSTEM_EXECUTOR
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
;;基于disruptor创建worker用于接收和发送messgae的buffer queue
        ;;创建基于disruptor的transfer-queue
        transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
;;对于每个executors创建receive-queue(基于disruptor-queue),并生成{e,queue}的map返回
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
;;executor可能有多个tasks,相同executor的tasks公用一个queue, 将{e,queue}转化为{t,queue}
receive-queue-map (->> executor-receive-queue-map
(mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
(into {}))
;;读取supervisor机器上存储的stormcode.ser (topology对象的序列化文件)
topology (read-supervisor-topology conf storm-id)]
     ;;recursive-map,会将底下value都执行一遍, 用返回值和key生成新的map
     (recursive-map
:conf conf
:mq-context (if mq-context
mq-context
(TransportFactory/makeContext storm-conf)) ;;已经prepare的具有IContext接口的对象
:storm-id storm-id
:assignment-id assignment-id
:port port
:worker-id worker-id
:cluster-state cluster-state
:storm-cluster-state storm-cluster-state
:storm-active-atom (atom false)
:executors executors
:task-ids (->> receive-queue-map keys (map int) sort)
:storm-conf storm-conf
:topology topology
:system-topology (system-topology! storm-conf topology)
:heartbeat-timer (mk-halting-timer)
:refresh-connections-timer (mk-halting-timer)
:refresh-active-timer (mk-halting-timer)
:executor-heartbeat-timer (mk-halting-timer)
:user-timer (mk-halting-timer)
:task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>)) ;;从ComponentCommon中读出steams的fields信息
:component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
:endpoint-socket-lock (mk-rw-lock)
:cached-node+port->socket (atom {})
:cached-task->node+port (atom {})
:transfer-queue transfer-queue
:executor-receive-queue-map executor-receive-queue-map
:short-executor-receive-queue-map (map-key first executor-receive-queue-map) ;;单纯为了简化executor的表示, 由[first-task,last-task]变为first-task
:task->short-executor (->> executors ;;列出task和简化后的short-executor的对应关系
(mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
(into {})
(HashMap.))
:suicide-fn (mk-suicide-fn conf)
:uptime (uptime-computer)
:default-shared-resources (mk-default-resources <>)
:user-shared-resources (mk-user-resources <>)
:transfer-local-fn (mk-transfer-local-fn <>) ;;接收messages并发到task对应的接收队列
:transfer-fn (mk-transfer-fn <>) ;;将处理过的message放到发送队列transfer-queue
)))

1.2 Worker Heartbeat

1.2.1. 建立worker本地的hb

调用do-heartbeat, 将worker的hb写到本地的localState数据库中, (.put state LS-WORKER-HEARTBEAT hb false)

1.2.2. 将worker hb同步到zk, 以便nimbus可以立刻知道worker已经启动

调用do-executor-heartbeats, 通过worker-heartbeat!将worker hb写入zk的workerbeats目录

1.2.3. 设定timer定期更新本地hb和zk hb

(schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)

(schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))

 

1.3 维护和更新worker的发送connection

mk-refresh-connections定义并返回一个匿名函数, 但是这个匿名函数, 定义了函数名this, 这个情况前面也看到, 是因为这个函数本身要在函数体内被使用.

并且refresh-connections是需要反复被执行的, 即当每次assignment-info发生变化的时候, 就需要refresh一次

所以这里使用timer.schedule-recurring就不合适, 因为不是以时间触发

这里使用的是zk的callback触发机制

Supervisor的mk-synchronize-supervisor, 以及worker的mk-refresh-connections, 都采用类似的机制

a. 首先需要在每次assignment改变的时候被触发, 所以都利用zk的watcher

b. 都需要将自己作为callback, 并在获取assignment时进行注册, 都使用(fn this [])

c. 因为比较耗时, 都选择后台执行callback, 但是mk-synchronize-supervisor使用的是eventmanager, mk-refresh-connections使用的是timer

两者不同, timer是基于优先级队列, 所以更灵活, 可以设置延时时间, 而eventmanager, 就是普通队列实现, FIFO

另外, eventmanager利用reify来封装接口, 返回的是record, 比timer的实现要优雅些

首先, 如果没有指定callback, 以(schedule (:refresh-connections-timer worker) 0 this)为callback

接着, (.assignment-info storm-cluster-state storm-id callback) 在获取assignment信息的时候, 设置callback, 也就是说当assignment发生变化时, 就会向refresh-connections-timer中发送一个'立即执行this’的event

这样就可以保证, 每次assignment发生变化, timer都会在后台做refresh-connections的操作

(defn mk-refresh-connections [worker]
(let [outbound-tasks (worker-outbound-tasks worker) ;;a.找出该woker需要向哪些component tasks发送数据,to-tasks
conf (:conf worker)
storm-cluster-state (:storm-cluster-state worker)
storm-id (:storm-id worker)]
(fn this
([]
(this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) ;;schedule往timer里面加event
([callback]
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
my-assignment (-> assignment ;;b.得到to-tasks的node+port
:executor->node+port
to-task->node+port
(select-keys outbound-tasks)
(#(map-val endpoint->string %)))
;; we dont need a connection for the local tasks anymore
needed-assignment (->> my-assignment ;;c.排除local tasks
(filter-key (complement (-> worker :task-ids set))))
needed-connections (-> needed-assignment vals set)
needed-tasks (-> needed-assignment keys) current-connections (set (keys @(:cached-node+port->socket worker)))
new-connections (set/difference needed-connections current-connections) ;;d.需要add的和remove的connections
remove-connections (set/difference current-connections needed-connections)]
(swap! (:cached-node+port->socket worker) ;;e.创建新的connections
#(HashMap. (merge (into {} %1) %2))
(into {}
(dofor [endpoint-str new-connections
:let [[node port] (string->endpoint endpoint-str)]]
[endpoint-str
(.connect
^IContext (:mq-context worker)
storm-id
((:node->host assignment) node)
port)
]
)))
(write-locked (:endpoint-socket-lock worker)
(reset! (:cached-task->node+port worker)
(HashMap. my-assignment)))
(doseq [endpoint remove-connections]
(.close (get @(:cached-node+port->socket worker) endpoint)))
(apply swap!
(:cached-node+port->socket worker)
#(HashMap. (apply dissoc (into {} %1) %&))
remove-connections) (let [missing-tasks (->> needed-tasks
(filter (complement my-assignment)))]
(when-not (empty? missing-tasks)
(log-warn "Missing assignment for following tasks: " (pr-str missing-tasks))
)))))))

refresh-connections的步骤

a. 找出该worker下需要往其他task发送数据的task, outbound-tasks

    worker-outbound-tasks, 找出当前work中的task属于的component, 并找出该component的目标component

    最终找出目标compoennt所对应的所有task, 作为返回   

b. 找出outbound-tasks对应的tasks->node+port, my-assignment

c. 如果outbound-tasks在同一个worker进程中, 不需要建connection, 所以排除掉, 剩下needed-assignment 

   :value –> needed-connections , :key –> needed-tasks

d. 和当前已经创建并cache的connection集合对比一下, 找出new-connections和remove-connections

e. 调用Icontext.connect, (.connect ^IContext (:mq-context worker) storm-id ((:node->host assignment) node) port), 创建新的connection, 并merge到:cached-node+port->socket中

f. 使用my-assignment更新:cached-task->node+port (结合:cached-node+port->socket, 就可以得到task->socket) 

g. close所有remove-connections, 并从:cached-node+port->socket中删除

 

1.4 创建worker中的executors

executor/mk-executor worker e, Storm-源码分析-Topology Submit-Executor

 

1.5 launch-receive-thread

launch接收线程,将数据从server的侦听端口, 不停的放到task对应的接收队列

(defn launch-receive-thread [worker]
(log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
(msg-loader/launch-receive-thread!
(:mq-context worker)
(:storm-id worker)
(:port worker)
(:transfer-local-fn worker)
(-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
:kill-fn (fn [t] (halt-process! 11))))

1.5.1 mq-context

调用TransportFactory/makeContext来创建context对象, 根据配置不同, 分别创建local或ZMQ的context

1.5.2 transfer-local-fn

返回fn, 该fn会将tuple-batch里面的tuples, 按task所对应的executor发送到对应的接收队列

(defn mk-transfer-local-fn [worker]
(let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
task->short-executor (:task->short-executor worker)
task-getter (comp #(get task->short-executor %) fast-first)]
(fn [tuple-batch]
(let [grouped (fast-group-by task-getter tuple-batch)] ;;将tuple-batch按executor进行分组
(fast-map-iter [[short-executor pairs] grouped] ;;对应grouped里面每个entry执行下面的逻辑
(let [q (short-executor-receive-queue-map short-executor)]
(if q
(disruptor/publish q pairs) ;;将tuple pairs发送到executor所对应的接收queue里面
(log-warn "Received invalid messages for unknown tasks. Dropping... ")
)))))))

 

(defn fast-group-by [afn alist]
(let [ret (HashMap.)]
(fast-list-iter [e alist] ;;宏, e表示list里面的elem, 下面的逻辑会在每个elem上执行
(let [key (afn e) ;;将afn(e)作为key
^List curr (get-with-default ret key (ArrayList.))] ;;value是arraylist, 如果第一次就创建
(.add curr e))) ;;把e加到对应key的arraylist里面
ret ))

作用就是将alist里面的elem, 按afn(elem)作为key, 经行group, 最终返回hashmap, 以便通过key可以取出所有的elem

 

(defmacro fast-map-iter [[bind amap] & body]
`(let [iter# (map-iter ~amap)] ;;把map转化为entryset, 并返回iterator
(while (iter-has-next? iter#)
(let [entry# (iter-next iter#)
~bind (convert-entry entry#)]
~@body
))))

对上面的例子,

bind = [short-executor pairs]

amap = grouped

grouped的一个entry是, {: short-executor pairs}

一个简化的iter map的宏, 比较难于理解

1.5.3 msg-loader/launch-receive-thread!

a, 使用async-loop, 创建异步执行loop的线程, 并start thread

   主要的逻辑是, bind到socket端口, 不停的recv messages

   当接收完一批, 通过transfer-local-fn放到接收队列

b, 在async-loop中已经start thread, 完成let的时候thread已经在工作了

   这个function的返回值, 很有意思, 其实是这个thread的close function, 并且由于闭包了该thread, 使得这个thread在close前一直存在

(defnk launch-receive-thread!
[context storm-id port transfer-local-fn max-buffer-size
:daemon true
:kill-fn (fn [t] (System/exit 1))
:priority Thread/NORM_PRIORITY]
(let [max-buffer-size (int max-buffer-size)
vthread (async-loop
(fn []
(let [socket (.bind ^IContext context storm-id port)]
(fn []
(let [batched (ArrayList.)
init (.recv ^IConnection socket 0)] ;;block方式
(loop [packet init]
(let [task (if packet (.task ^TaskMessage packet))
message (if packet (.message ^TaskMessage packet))]
(if (= task -1) ;;收到结束命令
(do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
(.close socket)
nil )
(do
(when packet (.add batched [task message]))
(if (and packet (< (.size batched) max-buffer-size))
(recur (.recv ^IConnection socket 1)) ;;non-block方式, 无数据则loop结束
(do (transfer-local-fn batched) ;;将batched数据放到各个task对应的接收队列
0 ))))))))))
:factory? true
:daemon daemon
:kill-fn kill-fn
:priority priority)]
(fn [] ;;该thread的close function
(let [kill-socket (.connect ^IContext context storm-id "localhost" port)] ;;本地创建client socket用于发送kill cmd
(log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
(.send ^IConnection kill-socket ;;发送kill cmd, -1
-1
(byte-array []))
(log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
(.join vthread) ;;等待thread结束
(.close ^IConnection kill-socket)
(log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
))))

1.6 生成mk-transfer-tuples-handler, 并创建transfer-thread

生成disrputor的event handler,

将packets不停的放到drainer里面, 当batch结束时, 将drainer里面的每条message发送到对应task的connection

(defn mk-transfer-tuples-handler [worker]
(let [^DisruptorQueue transfer-queue (:transfer-queue worker)
drainer (ArrayList.)
node+port->socket (:cached-node+port->socket worker)
task->node+port (:cached-task->node+port worker)
endpoint-socket-lock (:endpoint-socket-lock worker)
]
(disruptor/clojure-handler
(fn [packets _ batch-end?]
(.addAll drainer packets)
(when batch-end?
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering) (fast-list-iter [[task ser-tuple] drainer]
;; TODO: consider write a batch of tuples here to every target worker
;; group by node+port, do multipart send
(let [node-port (get task->node+port task)]
(when node-port
(.send ^IConnection (get node+port->socket node-port) task ser-tuple))
))))
(.clear drainer))))))

 

总结,

从下图比较清晰的可以看出worker做了哪些事情,

1. 根据assignment变化, 调整或创建send-connection

2. 创建executors的输入和输出queue

3. 创建worker的接收和发送线程, receive thread和tansfer thread

4. 根据assignments关系, 创建executors

其中线程间通信使用的是, disruptor

而进程间通信使用的是, ZMQ

最新文章

  1. [LeetCode] Recover Binary Search Tree 复原二叉搜索树
  2. 深入了解try catch
  3. dom解析xml
  4. 详谈OC(object-c)深浅复制/拷贝-什么情况下用retain和copy
  5. mysql概要(八)视图
  6. 关于javascript的误区
  7. 转:Apache和Nginx运行原理解析
  8. Java笔记(十六)&hellip;&hellip;内部类
  9. EF有外键的查询
  10. 对$NOMOD51的理解
  11. 从零开始Unity3D游戏开发【1 常用快捷键】
  12. C++关于数字逆序输出的两种思路,及字符串逆序输出
  13. 如何将编译出来的images拷贝到windows下面刷机
  14. GitHub 入门教程
  15. img alt与title的区别
  16. 为OPENCV添加freetype支持并显示中文字符(在mac上编译opencv及contrib库)
  17. 使用FormData格式在前后端传递数据
  18. 数据库部分(MySql)_4
  19. 学习笔记之1001 Inventions That Changed the World
  20. Java基础-Java中的内存分配与回收机制

热门文章

  1. 每日英语:Stalled Project Shows Why China&#39;s Economy Is Wobbling
  2. 总结iOS9中的新的方法
  3. vue2.0项目的环境配置以及有哪些的坑
  4. jquery 获取各种高宽
  5. 利用MapReduce实现数据去重
  6. PHP——连接数据库初
  7. 1.phpmyadmin的配置
  8. 修改多渠道打包的App名
  9. TensorFlow基础笔记(2) minist分类学习
  10. Ubuntu16.04+cuda8.0+cuDNNV5.1 + Tensorflow+ GT 840M安装小结