首页 技术 正文
技术 2022年11月12日
0 收藏 897 点赞 2,855 浏览 3927 个字

1. ConnectorParams (interface): 定义了各种常量参数,没有声明任何方法。2. ExecutorServlet.java类  2.1 继承类HttpServlet和接口ConnectorParams,用于处理Http请求,主要是Get请求,处理方式都写在doGet方法中。  2.2 init()方法:创建AzkabanExecutorServer实例,通过该executor server实例获取flowRunnerManager,以及jobRunnerManager。  2.3 doGet(HttpServletRequest req, HttpServletResponse resp)方法:处理具体的请求,并返回resp。 针对不同的action,进行不同的处理。        action可以分为两类:        第一类不需要获取execid和user,有三个action,分别是:update,ping,reloadJobTypePlugins;        第二类action,会先获取execid和user,包含:metadata,metadata_jobRunnerMgr,log,attachments,execute,status,cancel,pause,resume,modifyExecution,job_execute,job_cancel         action=execute时,ExecutorServlet类调handleAjaxExecute()方法去调flowRunnerManager.submitFlow(execId)来执行该工作流。  3. FlowRunnerManager.java类  实现EventListener接口    private Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<Future<?>, Integer>();    private Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();   记录当前正在执行的Flows,key是execId    private Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();    void submitFlow(int execId) 方法:    先判断runningFlows是不是已经包含该execId对应的作业流    如果已经包含:抛异常    如果不包含-》获取execId对应的executableFlow实例flow-》然后执行setupFlow(flow)配置flow(创建项目和执行的目录等)-》获取执行配置(ExecutionOptions)-》判断pipelineExecId是否为null,如果不为null-》判断该pipelineExecId对应的flowRunner在不在runningFlows中     如果在runningFlows中:起一个LocalFlowWatcher检测该pipeline流的执行情况     如果不在runningFlows中:起一个RemoteFlowWatcher检测该pipeline流的执行情况。实际是起了一个RemoteUpdaterThread来每隔一定时间(默认为60秒)通过读取数据库的记录监控流的状态。     -》判断流执行设置里是否包含参数flow.num.job.threads,如果存在该参数,且小于默认的值10,则将该流并行执行的job线程数设置为该参数值。-》创建新的FlowRunner实例runner-》对线程做一些配置,configureFlowLevelMetrics(runner); -》再次check runningflows是否包含execId对应的线程-》将runner加入runningflows-》将这个执行流对应的future加入到submittedFlows里用于跟踪流的执行,修改最后提交时间     void handleEvent方法:当流Finish时,在recentlyFinishedFlows 里加入该流,将流从runningFlows里去除 4. EventListener接口,声明了一个方法:void handleEvent(Event event)5. EventHandler.java类:包含一个HashSet<EventListener>,包含方法addListener,fireEventListener(该方法调用每个listener.handleEvent()),removeListener。6. ExecutableFlow.java: 包含可执行流的相关信息和设置信息的方法。 7.pipelineExecId:pipeline就是并发策略里的流水线,该execId对应的flow正在执行的执行流中最后次提交的执行流execId8. ExecutorManager.java类:private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();    private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();     8.1 public String submitExecutableFlow(ExecutableFlow exflow, String userId)方法:将作业流提交到执行队列          方法过程:          根据exflow获取flowId -》判断queuedFlows是否满-》            queuedFlows满了-》提交失败,打出log提示error            queuedFlows未满-》获取该次执行流对应的作业流所有正在跑的实例running,获取流执行设置(ExecutionOptions)-》获取流的执行参数(是否enable,如果enable则将参数生效)-》判断runningflows是否为空,如果不为空-》获取并发设置            并发设置:流水线(pipeline)-》设置流水线执行Id(PipelineExecutionId)为正在执行的最后次提交的执行流id,获取pipeline level            并发设置:忽略本次执行(skip)-》抛出异常ExecutorManagerException,给出提示该流已经有实例已经在执行,本次执行被skip了            并发设置:并行执行-》仅修改message提示            -》白名单设置?options.setMemoryCheck(memoryCheck);-》判断是否多节点模式(isMultiExecutorMode)            多节点模式:是-》将该flow记录为正在执行的flow(executorLoader.addActiveExecutableReference(reference);),将作业流放入队列queuedFlows.enqueue(exflow, reference);            多节点模式:否-》将该flow记录为正在执行的flow,选择本地executor,下发作业流dispatch(reference, exflow, choosenExecutor);             9. ExecutableFlow.java类:一个可执行流的相关信息。                  10.ExecutionReference.java类:存储execId,executor,updateTime,nextCheckTime,numErrors信息;一个具体的执行实例11. FlowWatcher.java类:检测某个execId的各个作业的执行状态。    private int execId;    private ExecutableFlow flow;    private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();   该map用于存储各个job的执行状态,key为jobId,value为job的状态12. BlockingStatus.java类:管理特定作业的状态,以同步的方式改变作业的状态。当状态处于block状态时,线程会处于等待状态,等待其他线程的通知(notify),最多等待时长为5分钟。      private static final long WAIT_TIME = 5 * 60 * 1000;      private final int execId;      private final String jobId;      private Status status; 13. FlowRunner.java 继承EventHandler,实现Runnable接口。      public void run(): 

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,985
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,501
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,345
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,128
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,763
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,840