Azkaban 3.45

I Introduction

1 Official website

https://azkaban.github.io/

Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.

Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.

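In short, Azkaban runs Hadoop jobs that must execute in a fixed order, such as ETL jobs and data analysis jobs.

It has grown beyond its original single-server design, but in the current version the WebServer is unfortunately still a single point of failure.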

Azkaban consists of 3 key components:

  • Relational Database (MySQL)
  • AzkabanWebServer
  • AzkabanExecutorServer

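In the code walkthrough below, MySQL stores projects, flows, executions, and schedules; the WebServer handles uploads, scheduling, and dispatching flows to executors; and the ExecutorServer actually runs the flows.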

2 Deployment

3 Database tables

  • projects: projects
  • project_flows: flow definitions
  • execution_flows: flow instances (one row per execution)
  • execution_jobs: job instances
  • triggers: schedule definitions

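PS: Many columns in these tables are stored encoded. enc_type gives the encoding type (it maps to the EncodingType enum): 2 means gzip, and any other value means no encoding. Rows with enc_type 2 must be decoded with GZIPUtils.transformBytesToObject to recover the original string.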
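A minimal sketch of decoding such a column using only the JDK, assuming the gzip payload is UTF-8 text (for example flow_data read from execution_flows). It does not call Azkaban's GZIPUtils, which may additionally parse the result as JSON; the class and method names (EncTypeDecoder, decode, gzip) are hypothetical, and readAllBytes needs Java 9 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class EncTypeDecoder {
    /** enc_type value for gzip, per the note above; every other value is treated as plain. */
    static final int ENC_TYPE_GZIP = 2;

    /** Turns a blob column (e.g. execution_flows.flow_data) back into its original string. */
    static String decode(int encType, byte[] data) {
        if (encType != ENC_TYPE_GZIP) {
            return new String(data, StandardCharsets.UTF_8);  // assumed UTF-8 plain text
        }
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("not valid gzip data", e);
        }
    }

    /** Test helper: gzip a string, as a gzip-encoded row is assumed to store it. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();  // stream is closed (gzip trailer written) before this line
    }
}
```

Wrapping IOException in UncheckedIOException keeps callers simple when scanning many rows; a corrupt row then surfaces as a runtime error instead of being silently skipped.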

4 Concepts

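  • Job: the smallest unit of execution and a single node in the DAG, i.e. a task
  • Flow: made up of multiple Jobs whose dependencies on one another are declared in their configuration (the dependencies property of a .job file), i.e. a workflow
  • Trigger: fires a Flow according to a given cron expression, i.e. a schedule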
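To make the DAG idea concrete, here is a small sketch that orders the Jobs of a Flow with Kahn's topological sort and rejects cyclic dependencies. FlowDag, addJob, and executionOrder are hypothetical names; this illustrates why a Flow must be acyclic, not how Azkaban itself validates flows.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlowDag {
    // jobId -> ids of the jobs it depends on (hypothetical in-memory model of a Flow)
    private final Map<String, List<String>> deps = new LinkedHashMap<>();

    public FlowDag addJob(String jobId, String... dependsOn) {
        deps.put(jobId, Arrays.asList(dependsOn));
        return this;
    }

    /** Returns one valid execution order; throws if the dependencies contain a cycle. */
    public List<String> executionOrder() {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> children = new HashMap<>();
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String parent : e.getValue()) {
                if (!deps.containsKey(parent)) {
                    throw new IllegalArgumentException("unknown job: " + parent);
                }
                inDegree.merge(e.getKey(), 1, Integer::sum);
                children.computeIfAbsent(parent, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Start from jobs with no unfinished parents, in declaration order.
        Deque<String> ready = new ArrayDeque<>();
        for (String job : deps.keySet()) {
            if (inDegree.get(job) == 0) ready.add(job);
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String job = ready.poll();
            order.add(job);
            for (String child : children.getOrDefault(job, List.of())) {
                if (inDegree.merge(child, -1, Integer::sum) == 0) ready.add(child);
            }
        }
        if (order.size() != deps.size()) throw new IllegalStateException("cycle in flow");
        return order;
    }
}
```

For jobs a, b(a), c(a), d(b, c) this returns [a, b, c, d]; if two jobs depend on each other, executionOrder throws.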

II Code analysis

1 Startup

Web Server

AzkabanWebServer.main

         launch

                  prepareAndStartServer

                          configureRoutes

                                   TriggerManager.start

                          FlowTriggerService.start

                                   recoverIncompleteTriggerInstances

                                            SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))

                          FlowTriggerScheduler.start

ExecutorManager

         setupExecutors

         loadRunningFlows

QueueProcessorThread.run

ExecutingManagerUpdaterThread.run

Executor Server

AzkabanExecutorServer.main

         launch

                  AzkabanExecutorServer.start

                          insertExecutorEntryIntoDB

2 Flow execution

The Web Server has two entry points:

ExecuteFlowAction.doAction

ExecutorServlet.ajaxExecuteFlow

The Web Server dispatches the flow:

ExecutorManager.submitExecutableFlow

         JdbcExecutorLoader.uploadExecutableFlow

                  INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)

         ExecutorLoader.addActiveExecutableReference

                  INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)

         queuedFlows.enqueue

QueueProcessorThread.run

         processQueuedFlows

                  ExecutorManager.selectExecutorAndDispatchFlow (get from queuedFlows)

                          selectExecutor

                          dispatch

                                   JdbcExecutorLoader.assignExecutor

                                            UPDATE execution_flows SET executor_id=? where exec_id=?

                                   ExecutorApiGateway.callWithExecutable (calls the Executor Server)
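The dispatch path above is essentially a producer/consumer queue. The sketch below models it in memory under loud assumptions: exec ids come from a counter instead of the execution_flows INSERT, a map stands in for the executor_id UPDATE, the HTTP call is only a comment, and the executor with the fewest running flows is chosen, which is a simplification (Azkaban's real selector can weigh several executor statistics). DispatchSketch and all of its members are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DispatchSketch {
    /** Hypothetical executor record with a mutable load counter. */
    static final class Executor {
        final int id;
        int runningFlows;
        Executor(int id, int runningFlows) { this.id = id; this.runningFlows = runningFlows; }
    }

    private final BlockingQueue<Integer> queuedFlows = new LinkedBlockingQueue<>();
    private final AtomicInteger nextExecId = new AtomicInteger(1);
    // Stands in for: UPDATE execution_flows SET executor_id=? where exec_id=?
    final Map<Integer, Integer> executorOfExec = new ConcurrentHashMap<>();
    private final List<Executor> executors;

    DispatchSketch(List<Executor> executors) { this.executors = executors; }

    /** Like submitExecutableFlow: "persist" (a counter here), then enqueue. */
    int submit() {
        int execId = nextExecId.getAndIncrement();
        queuedFlows.add(execId);
        return execId;
    }

    /** Simplified selectExecutor: the executor with the fewest running flows wins. */
    Executor selectExecutor() {
        return executors.stream()
                .min(Comparator.comparingInt(e -> e.runningFlows))
                .orElseThrow(() -> new IllegalStateException("no active executors"));
    }

    /**
     * One iteration of a QueueProcessorThread-style loop. A real thread would block
     * on take(); poll() keeps this sketch free of checked exceptions.
     */
    boolean processOne() {
        Integer execId = queuedFlows.poll();
        if (execId == null) return false;
        Executor chosen = selectExecutor();
        executorOfExec.put(execId, chosen.id);  // assignExecutor
        chosen.runningFlows++;
        // callWithExecutable would now send the "execute" request to the chosen executor.
        return true;
    }
}
```

Keeping submission and dispatch on opposite sides of a queue is what lets the submit call return quickly even when no executor is immediately available.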

The Executor Server runs the flow:

ExecutorServlet.doGet

         handleAjaxExecute

                  FlowRunnerManager.submitFlow

                          JdbcExecutorLoader.fetchExecutableFlow

                                 SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?

                          FlowPreparer.setup

                          FlowRunner.run

                                   setupFlowExecution

                                   updateFlow

                                            UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?

                                   runFlow

                                            progressGraph

                                                     runReadyJob

                                                             runExecutableNode

                                                                      JobRunner.run

                                                                               uploadExecutableNode

                                                                                        INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)

                                                                               prepareJob

                                                                               runJob

                                                                                        Job.run (ProcessJob, JavaJob)
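The core rule behind progressGraph and runReadyJob is that a job becomes runnable once all of its parents have succeeded. Below is a single-threaded sketch of that rule under stated assumptions: jobs run synchronously through a caller-supplied predicate (standing in for JobRunner), and a job whose parent failed is simply marked CANCELLED, which simplifies Azkaban's configurable failure handling. ProgressGraphSketch, Status, and runFlow are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ProgressGraphSketch {
    enum Status { READY, SUCCEEDED, FAILED, CANCELLED }

    /**
     * jobs maps each job id to its parent ids; runJob returns true on success.
     * Repeats passes over the graph until no job changes state.
     */
    static Map<String, Status> runFlow(Map<String, List<String>> jobs, Predicate<String> runJob) {
        Map<String, Status> status = new LinkedHashMap<>();
        jobs.keySet().forEach(job -> status.put(job, Status.READY));
        boolean progressed = true;
        while (progressed) {  // each pass plays the role of one progressGraph call
            progressed = false;
            for (Map.Entry<String, List<String>> e : jobs.entrySet()) {
                String job = e.getKey();
                if (status.get(job) != Status.READY) continue;
                boolean parentBroken = e.getValue().stream().anyMatch(
                        p -> status.get(p) == Status.FAILED || status.get(p) == Status.CANCELLED);
                boolean parentsSucceeded = e.getValue().stream()
                        .allMatch(p -> status.get(p) == Status.SUCCEEDED);
                if (parentBroken) {
                    status.put(job, Status.CANCELLED);  // never start children of a failed job
                    progressed = true;
                } else if (parentsSucceeded) {
                    // runReadyJob: run it and record the outcome
                    status.put(job, runJob.test(job) ? Status.SUCCEEDED : Status.FAILED);
                    progressed = true;
                }
            }
        }
        return status;
    }
}
```

With jobs a, b(a), c(a), d(b, c) and a runJob that fails only b, the result is a SUCCEEDED, b FAILED, c SUCCEEDED, d CANCELLED: c still runs because it does not depend on b.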

The Web Server polls flow status:

ExecutingManagerUpdaterThread.run

         getFlowToExecutorMap

         ExecutorApiGateway.callWithExecutionId

         updateExecution
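One polling round can be pictured as: group the running executions by executor (getFlowToExecutorMap), send one status request per executor (callWithExecutionId), and merge the answers (updateExecution). A sketch with hypothetical names, where the HTTP call is replaced by a caller-supplied function and status values are plain strings:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

public class UpdaterSketch {
    /** Like getFlowToExecutorMap: executor id -> sorted exec ids it is running. */
    static Map<Integer, List<Integer>> groupByExecutor(Map<Integer, Integer> executorOfExec) {
        Map<Integer, List<Integer>> byExecutor = new TreeMap<>();
        executorOfExec.forEach((execId, executorId) ->
                byExecutor.computeIfAbsent(executorId, k -> new ArrayList<>()).add(execId));
        for (List<Integer> ids : byExecutor.values()) {
            Collections.sort(ids);
        }
        return byExecutor;
    }

    /**
     * One polling round. callExecutor stands in for callWithExecutionId and returns
     * exec id -> status for the ids it was asked about; the merged map is what an
     * updateExecution step would persist.
     */
    static Map<Integer, String> pollOnce(Map<Integer, Integer> executorOfExec,
            BiFunction<Integer, List<Integer>, Map<Integer, String>> callExecutor) {
        Map<Integer, String> latest = new TreeMap<>();
        groupByExecutor(executorOfExec).forEach((executorId, execIds) ->
                latest.putAll(callExecutor.apply(executorId, execIds)));  // one request per executor
        return latest;
    }
}
```

Grouping first means the number of requests per round grows with the number of executors rather than the number of running flows.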

3 Scheduled execution

TriggerManager.start

         loadTriggers

                  SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers

         TriggerScannerThread.start

                  checkAllTriggers

                          onTriggerTrigger

                                   TriggerAction.doAction

                                            ExecuteFlowAction.doAction
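The scanner loop amounts to: on each pass, check every trigger's condition and run the actions of those that are due. The sketch below makes that concrete with one assumption called out: each trigger fires on a fixed period in milliseconds instead of a cron expression, since the JDK has no cron parser. TriggerScannerSketch and its members are hypothetical, and firing only once after several missed periods is a choice of this sketch, not a claim about Azkaban's behaviour.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerScannerSketch {
    /** Hypothetical trigger: fires at nextCheckTime, then every periodMillis. */
    static final class Trigger {
        final String flowId;
        final long periodMillis;
        long nextCheckTime;

        Trigger(String flowId, long firstFireTime, long periodMillis) {
            if (periodMillis <= 0) throw new IllegalArgumentException("period must be positive");
            this.flowId = flowId;
            this.nextCheckTime = firstFireTime;
            this.periodMillis = periodMillis;
        }
    }

    final List<Trigger> triggers = new ArrayList<>();
    final List<String> submitted = new ArrayList<>();  // flows handed to the "execute flow" action

    /** One pass of a checkAllTriggers-style scan at time now (epoch millis). */
    void checkAllTriggers(long now) {
        for (Trigger t : triggers) {
            if (now < t.nextCheckTime) continue;  // condition not met yet
            submitted.add(t.flowId);              // onTriggerTrigger -> ExecuteFlowAction
            while (t.nextCheckTime <= now) {      // skip missed periods: fire once, then reschedule
                t.nextCheckTime += t.periodMillis;
            }
        }
    }
}
```

A trigger first due at 100 with a 50 ms period fires at a scan at 100, stays quiet at 149, and fires once more at 260, after which its next check time is 300.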

PS: There is also a second, completely independent scheduling mechanism, controlled by azkaban.server.schedule.enable_quartz (false by default). Registering a job with Quartz goes through:

ProjectManagerServlet.ajaxHandleUpload

         SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true

         ProjectManager.loadAllProjectFlows

                  SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?

         FlowTriggerScheduler.scheduleAll

                  SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?

                  SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?

                  registerJob

Executing the Quartz job:

FlowTriggerQuartzJob.execute

         FlowTriggerService.startTrigger

                  TriggerInstanceProcessor.processSucceed

                          TriggerInstanceProcessor.executeFlowAndUpdateExecID

                                   ExecutorManager.submitExecutableFlow

4 Job execution

Job is the core interface for tasks; every concrete job type implements it, directly or through the classes below:

Job

         AbstractJob

                  AbstractProcessJob

                          ProcessJob (shell job)

                                   JavaProcessJob (Java job)

                                            JavaJob
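A stripped-down mirror of this hierarchy, to show where each level adds behaviour. It is a hypothetical sketch, not Azkaban's real signatures: Job only exposes an id and run(), AbstractProcessJob holds the job properties, ProcessJob runs the command property through sh -c (a Unix shell is assumed), and JavaProcessJob only overrides how the command line is built. The property names command, java.class, and classpath are assumptions modelled on Azkaban's job types; JavaJob is omitted.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JobHierarchySketch {
    /** Core contract: every job has an id and can be run; run() returns the exit code. */
    interface Job {
        String getId();
        int run() throws IOException, InterruptedException;
    }

    /** Shared plumbing; in a fuller version, concerns such as logging would live here. */
    abstract static class AbstractJob implements Job {
        private final String id;
        protected AbstractJob(String id) { this.id = id; }
        @Override public String getId() { return id; }
    }

    /** Adds the job's properties (assumed to come from the job definition). */
    abstract static class AbstractProcessJob extends AbstractJob {
        protected final Map<String, String> props;
        protected AbstractProcessJob(String id, Map<String, String> props) {
            super(id);
            this.props = props;
        }
    }

    /** Shell job: launches an OS process whose command line comes from buildCommand(). */
    static class ProcessJob extends AbstractProcessJob {
        ProcessJob(String id, Map<String, String> props) { super(id, props); }

        List<String> buildCommand() {
            return List.of("sh", "-c", props.get("command"));
        }

        @Override public int run() throws IOException, InterruptedException {
            Process process = new ProcessBuilder(buildCommand()).inheritIO().start();
            return process.waitFor();
        }
    }

    /** Java job: same process machinery, but the command starts a new JVM. */
    static class JavaProcessJob extends ProcessJob {
        JavaProcessJob(String id, Map<String, String> props) { super(id, props); }

        @Override List<String> buildCommand() {
            List<String> cmd = new ArrayList<>();
            cmd.add("java");
            cmd.add("-cp");
            cmd.add(props.getOrDefault("classpath", "."));
            cmd.add(props.get("java.class"));
            return cmd;
        }
    }
}
```

Because JavaProcessJob reuses ProcessJob.run(), a Java task is just a shell task with a generated command line, which matches its position under ProcessJob in the tree above.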
