title: Azkaban Learning
date: 2017-01-11 11:54:03
tags: [Azkaban, Scheduling System, Big Data Components]
categories: "Scheduling System"
Azkaban
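Keywords: Azkaban introduction, big data job scheduling system
This article is intended for readers who already have some familiarity with Azkaban. It is worth skimming the following first:
Azkaban documentation: http://azkaban.github.io/azkaban/docs/latest/#overview
Qiangzige's source code analysis: https://my.oschina.net/qiangzigege/blog/653198
(Parts of the content below are excerpted from the two links above.)
Azkaban source code: git clone https://github.com/azkaban/azkaban.git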
Introduction to Azkaban
Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.
Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.
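Azkaban's architecture consists of three main components:
- WebServer: exposes a RESTful API and handles job dispatching and scheduling;
- ExecServer: exposes an API to the WebServer and executes jobs;
- MySQL: data storage, used to share data and synchronize part of the state between the WebServer and the ExecServer.
In multi-executor mode, a more detailed architecture diagram looks like the following (the MySQL database is omitted):
It is very simple and intuitive.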
WebServer
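Exposing a RESTful API
The azkaban-webserver project clearly shows which servlets are exposed. The most important ones are:
- ExecutorServlet: endpoints for executing a flow immediately, cancelling or pausing a flow, fetching flow or node logs, etc.
- ScheduleServlet: endpoints for setting schedules and SLA alert rules, fetching schedule information, etc.
- HistoryServlet: endpoints for viewing flow execution history
- ProjectManagerServlet: endpoints for uploading and downloading project zip files, deleting projects, fetching a flow's DAG, etc.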
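These servlets are reachable over plain HTTP. The sketch below builds request URLs for two ExecutorServlet actions as a rough illustration. It assumes the `/executor` path, the `ajax=executeFlow` / `ajax=cancelFlow` actions, and the `session.id`, `project`, `flow`, and `execid` parameters described in Azkaban's AJAX API documentation. It also assumes the session ID came from a prior login request. Check all of these against the Azkaban version you run. `AzkabanUrls` is a hypothetical helper and is not part of Azkaban.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds Azkaban AJAX API URLs.
// Paths and parameter names are assumed from the AJAX API docs; verify for your version.
final class AzkabanUrls {
    private final String baseUrl;   // e.g. "http://localhost:8081"
    private final String sessionId; // session.id returned by a prior login request

    AzkabanUrls(String baseUrl, String sessionId) {
        this.baseUrl = baseUrl;
        this.sessionId = sessionId;
    }

    // Run a flow immediately (served by ExecutorServlet).
    String executeFlow(String project, String flow) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("ajax", "executeFlow");
        params.put("session.id", sessionId);
        params.put("project", project);
        params.put("flow", flow);
        return build("/executor", params);
    }

    // Cancel a running execution (also served by ExecutorServlet).
    String cancelFlow(int execId) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("ajax", "cancelFlow");
        params.put("session.id", sessionId);
        params.put("execid", String.valueOf(execId));
        return build("/executor", params);
    }

    // Join URL-encoded key=value pairs, preserving insertion order.
    private String build(String path, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return baseUrl + path + "?" + query;
    }

    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

For example, `new AzkabanUrls("http://localhost:8081", "abc").cancelFlow(42)` yields `http://localhost:8081/executor?ajax=cancelFlow&session.id=abc&execid=42`.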
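Dispatching jobs
ExecutorManager is mainly responsible for this part. Every job is submitted through submitExecutableFlow(ExecutableFlow exflow, String userId), whether it runs immediately or on a schedule.
Inside that method, the path depends on the deployment mode. In multi-executor mode, the execution instance is first put into a dispatch queue. In single-executor mode, dispatch is called immediately.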
if (isMultiExecutorMode()) {
//Take MultiExecutor route
executorLoader.addActiveExecutableReference(reference);
queuedFlows.enqueue(exflow, reference);
} else {
// assign only local executor we have
Executor choosenExecutor = activeExecutors.iterator().next();
executorLoader.addActiveExecutableReference(reference);
try {
dispatch(reference, exflow, choosenExecutor);
} catch (ExecutorManagerException e) {
executorLoader.removeActiveExecutableReference(reference.getExecId());
throw e;
}
}
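The branch above boils down to a routing decision. Below is a simplified, self-contained model of it. In multi-executor mode the flow is queued for a background processor. In single-executor mode it goes straight to the only executor. If that dispatch fails, the active reference is rolled back before the error is rethrown. `FlowRouter` and its in-memory collections are hypothetical stand-ins for ExecutorManager, the executorLoader tables, and the real HTTP dispatch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of the routing in ExecutorManager.submitExecutableFlow.
final class FlowRouter {
    private final boolean multiExecutorMode;
    private final List<String> activeExecutors;
    final Set<Integer> activeReferences = new HashSet<>(); // stands in for the active-executions table
    final Queue<Integer> queuedFlows = new ArrayDeque<>();  // stands in for the dispatch queue
    final List<String> dispatched = new ArrayList<>();      // records "execId->executor"

    FlowRouter(boolean multiExecutorMode, List<String> activeExecutors) {
        this.multiExecutorMode = multiExecutorMode;
        this.activeExecutors = activeExecutors;
    }

    void submit(int execId) {
        activeReferences.add(execId);
        if (multiExecutorMode) {
            // Picked up later by a background queue processor.
            queuedFlows.add(execId);
            return;
        }
        // Single-executor mode: use the only executor right away.
        String onlyExecutor = activeExecutors.get(0);
        try {
            dispatch(execId, onlyExecutor);
        } catch (IllegalStateException e) {
            // Undo the bookkeeping so the execution is not left dangling, then rethrow.
            activeReferences.remove(execId);
            throw e;
        }
    }

    // Stand-in for the HTTP call to an executor; an empty address simulates a failure.
    private void dispatch(int execId, String executor) {
        if (executor.isEmpty()) {
            throw new IllegalStateException("Cannot reach executor for exec " + execId);
        }
        dispatched.add(execId + "->" + executor);
    }
}
```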
In multi-executor mode, the execution instance is put into the dispatch queue, which the QueueProcessorThread processes periodically.
/* Method responsible for processing the non-dispatched flows */
private void processQueuedFlows(long activeExecutorsRefreshWindow,
int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
long lastExecutorRefreshTime = 0;
Pair<ExecutionReference, ExecutableFlow> runningCandidate;
int currentContinuousFlowProcessed = 0;
while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
ExecutionReference reference = runningCandidate.getFirst();
ExecutableFlow exflow = runningCandidate.getSecond();
long currentTime = System.currentTimeMillis();
// if we have dispatched more than maxContinuousFlowProcessed or
// It has been more then activeExecutorsRefreshWindow millisec since we
// refreshed
if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
// Refresh executorInfo for all activeExecutors
refreshExecutors();
lastExecutorRefreshTime = currentTime;
currentContinuousFlowProcessed = 0;
}
/**
* <pre>
* TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
* Currently we try each queued flow once to infer a global busy state
* Possible improvements:-
* 1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
* 2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
* taking out all the filters which do not depend on the flow but are still being part of Selector.
* Assumptions:-
* 1. no one else except QueueProcessor is updating ExecutableFlow update time
* 2. re-attempting a flow (which has been tried before) is considered as all executors are busy
* </pre>
*/
if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
// put back in the queue
queuedFlows.enqueue(exflow, reference);
long sleepInterval =
activeExecutorsRefreshWindow
- (currentTime - lastExecutorRefreshTime);
// wait till next executor refresh
sleep(sleepInterval);
} else {
exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
}
// do not count failed flow processsing (flows still in queue)
if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
currentContinuousFlowProcessed++;
}
}
}
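processQueuedFlows relies on three small decisions:
- when to refresh executor statistics;
- whether a flow has already been tried against the current snapshot (meaning every executor is considered busy);
- how long to sleep before the next refresh.

They are pulled out below as pure functions so they can be checked in isolation. `QueuePolicy` is a hypothetical name. The clamp at zero in `sleepBeforeRetry` is a defensive addition: the original relies on the refresh at the top of the loop to keep the interval non-negative.

```java
// Hypothetical helpers distilled from processQueuedFlows; not Azkaban API.
final class QueuePolicy {
    // Refresh executor info when the refresh window has elapsed or
    // too many flows were dispatched since the last refresh.
    static boolean shouldRefresh(long now, long lastRefresh, long refreshWindowMs,
                                 int processedSinceRefresh, int maxContinuous) {
        return now - lastRefresh > refreshWindowMs
            || processedSinceRefresh >= maxContinuous;
    }

    // A flow updated after the last refresh was already tried against the current
    // executor snapshot, so all executors are assumed busy: re-queue it and wait.
    static boolean alreadyTriedSinceRefresh(long flowUpdateTime, long lastRefresh) {
        return flowUpdateTime > lastRefresh;
    }

    // Time left until the next refresh, clamped at zero (Thread.sleep rejects negatives).
    static long sleepBeforeRetry(long now, long lastRefresh, long refreshWindowMs) {
        return Math.max(0, refreshWindowMs - (now - lastRefresh));
    }
}
```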
selectExecutorAndDispatchFlow first selects an executor (the selection logic is rather interesting). Once one is chosen, it also ends up calling dispatch to send the job.
/* process flow with a snapshot of available Executors */
private void selectExecutorAndDispatchFlow(ExecutionReference reference,
ExecutableFlow exflow, Set<Executor> availableExecutors)
throws ExecutorManagerException {
synchronized (exflow) {
Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
if (selectedExecutor != null) {
try {
dispatch(reference, exflow, selectedExecutor);
} catch (ExecutorManagerException e) {
logger.warn(String.format(
"Executor %s responded with exception for exec: %d",
selectedExecutor, exflow.getExecutionId()), e);
handleDispatchExceptionCase(reference, exflow, selectedExecutor,
availableExecutors);
}
} else {
handleNoExecutorSelectedCase(reference, exflow);
}
}
}
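selectExecutor itself is not shown above. Azkaban's selector is configurable: filters rule executors out, then comparators rank the remaining ones. The sketch below follows that general filter-then-rank shape with one concrete choice of rules:
- Filters: drop executors with no remaining flow capacity or with less free memory than an assumed threshold.
- Ranking: prefer the fewest assigned flows, then the most free memory.

The `ExecutorStats` fields, the 1024 MB threshold, and this ordering are illustrative assumptions, not Azkaban's defaults. An empty result corresponds to the `handleNoExecutorSelectedCase` branch.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical snapshot of one executor's load; field names are illustrative.
final class ExecutorStats {
    final String host;
    final int remainingFlowCapacity;
    final long freeMemoryMb;
    final int assignedFlows;

    ExecutorStats(String host, int remainingFlowCapacity, long freeMemoryMb, int assignedFlows) {
        this.host = host;
        this.remainingFlowCapacity = remainingFlowCapacity;
        this.freeMemoryMb = freeMemoryMb;
        this.assignedFlows = assignedFlows;
    }
}

final class SimpleExecutorSelector {
    static final long MIN_FREE_MEMORY_MB = 1024; // assumed threshold

    // Filter out executors that cannot take a flow, then pick the best-ranked one.
    static Optional<ExecutorStats> select(List<ExecutorStats> candidates) {
        Comparator<ExecutorStats> ranking =
            Comparator.comparingInt((ExecutorStats e) -> e.assignedFlows)   // fewest assigned flows first
                .thenComparing(Comparator.comparingLong((ExecutorStats e) -> e.freeMemoryMb)
                    .reversed());                                          // then most free memory
        return candidates.stream()
            .filter(e -> e.remainingFlowCapacity > 0)
            .filter(e -> e.freeMemoryMb >= MIN_FREE_MEMORY_MB)
            .min(ranking);
    }
}
```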
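Because the WebServer and the ExecServer share data through MySQL, the dispatch logic is very simple. It only passes the execId and a few other parameters in an HTTP request; all other data the executor needs is read from and written to the database.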
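Since only identifiers travel over HTTP, a dispatch call is little more than building a URL and checking the response. The sketch below uses the JDK's `java.net.http.HttpClient` (Java 11+). The `/executor` path and the `action=execute`, `execid`, and `user` parameter names are assumptions modeled on Azkaban's executor connector parameters. `DispatchClient` is hypothetical, and the real dispatch also records the executor assignment in MySQL.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical dispatch client: sends only the execution id; the executor
// loads everything else (flow definition, options) from the shared database.
final class DispatchClient {
    private final HttpClient http = HttpClient.newHttpClient();

    // Parameter names are assumptions, not a confirmed Azkaban contract.
    static URI dispatchUri(String host, int port, int execId, String user) {
        return URI.create("http://" + host + ":" + port
            + "/executor?action=execute&execid=" + execId
            + "&user=" + URLEncoder.encode(user, StandardCharsets.UTF_8));
    }

    // Sends the request and treats any non-2xx status as a failed dispatch.
    String dispatch(String host, int port, int execId, String user)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(dispatchUri(host, port, execId, user))
            .GET()
            .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() / 100 != 2) {
            throw new IOException("Executor returned HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```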
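Scheduling jobs
Scheduling is one of the most important features of any scheduling system and one of the more complex modules in Azkaban. Externally, scheduling is exposed through ScheduleManager, whose data structure is Schedule. Internally, it is implemented by TriggerManager, whose data structure is Trigger.
All schedule information comes in through ScheduleManager.scheduleFlow. Its parameters include the project ID, project name, flow name, first schedule timestamp, time zone, schedule period, next execution time, submit time, and submitting user. For a schedule, the key pieces of information are simply the first schedule time and the period.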
public Schedule scheduleFlow(final int scheduleId, final int projectId,
final String projectName, final String flowName, final String status,
final long firstSchedTime, final DateTimeZone timezone,
final ReadablePeriod period, final long lastModifyTime,
final long nextExecTime, final long submitTime, final String submitUser)
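Given the first schedule time and a period, the next run is found by stepping forward one period at a time until the result lies in the future. The sketch below does this with `java.time`. It stands in for the Joda-Time `DateTimeZone` and `ReadablePeriod` types in the signature above. `nextExecutionAfter` is a hypothetical helper, not Azkaban's own method. Adding a `Period` in the schedule's time zone keeps daily schedules at the same wall-clock time across DST changes.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.Period;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;

final class ScheduleMath {
    // Steps from firstSchedTime one period at a time and returns the first
    // instant strictly after `now` (firstSchedTime itself if it is still in the future).
    static Instant nextExecutionAfter(Instant firstSchedTime, TemporalAmount period,
                                      ZoneId zone, Instant now) {
        ZonedDateTime next = firstSchedTime.atZone(zone);
        while (!next.toInstant().isAfter(now)) {
            ZonedDateTime stepped = next.plus(period); // step in the schedule's own time zone
            if (!stepped.isAfter(next)) {
                throw new IllegalArgumentException("period must be positive: " + period);
            }
            next = stepped;
        }
        return next.toInstant();
    }
}
```

For a daily schedule first set for 2017-01-01T02:00Z, evaluated at 2017-01-11T11:54:03Z, this returns 2017-01-12T02:00Z.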
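Following scheduleFlow further down, we see that it calls TriggerBasedScheduleLoader.insertSchedule. This method first converts the Schedule into a Trigger and then hands the Trigger to TriggerManager. The scheduleToTrigger method is concise and clever; it is left for the reader to study and is not analyzed in detail here.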
@Override
public void insertSchedule(Schedule s) throws ScheduleManagerException {
Trigger t = scheduleToTrigger(s);
try {
triggerManager.insertTrigger(t, t.getSubmitUser());
s.setScheduleId(t.getTriggerId());
} catch (TriggerManagerException e) {
throw new ScheduleManagerException("Failed to insert new schedule!", e);
}
}
Next, let's see what TriggerManager does with the Trigger. As shown below, it first writes the Trigger to the database through triggerLoader and then hands it to the runnerThread thread.
public void insertTrigger(Trigger t) throws TriggerManagerException {
synchronized (syncObj) {
try {
triggerLoader.addTrigger(t);
} catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
}
From here it is straightforward. runnerThread, a TriggerScannerThread, periodically checks whether each Trigger should fire (onTriggerTrigger) or expire (onTriggerExpire).
private void checkAllTriggers() throws TriggerManagerException {
long now = System.currentTimeMillis();
// sweep through the rest of them
for (Trigger t : triggers) {
try {
scannerStage = "Checking for trigger " + t.getTriggerId();
boolean shouldSkip = true;
if (shouldSkip && t.getInfo() != null && t.getInfo().containsKey("monitored.finished.execution")) {
int execId = Integer.valueOf((String) t.getInfo().get("monitored.finished.execution"));
if (justFinishedFlows.containsKey(execId)) {
logger.info("Monitored execution has finished. Checking trigger earlier " + t.getTriggerId());
shouldSkip = false;
}
}
if (shouldSkip && t.getNextCheckTime() > now) {
shouldSkip = false;
}
if (shouldSkip) {
logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
}
if (logger.isDebugEnabled()) {
logger.info("Checking trigger " + t.getTriggerId());
}
if (t.getStatus().equals(TriggerStatus.READY)) {
if (t.triggerConditionMet()) {
onTriggerTrigger(t);
} else if (t.expireConditionMet()) {
onTriggerExpire(t);
}
}
if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
removeTrigger(t);
} else {
t.updateNextCheckTime();
}
} catch (Throwable th) {
//skip this trigger, moving on to the next one
logger.error("Failed to process trigger with id : " + t.getTriggerId(), th);
}
}
}
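Set aside the logging, the early re-check for monitored executions, and updateNextCheckTime. One pass of checkAllTriggers is then a small state machine:
- A READY trigger either fires or expires.
- Expired triggers are removed.
- A failure in one trigger does not stop the scan.

The sketch below models one pass with a hypothetical `ScannedTrigger` whose conditions are plain predicates over the current time. It is not Azkaban's Trigger API. The `resetOnTrigger` flag only loosely mirrors the `setResetOnTrigger(false)` used for the SLA trigger later in this article. Here a trigger that does not reset is treated as expired after firing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongPredicate;

enum TriggerState { READY, EXPIRED }

// Hypothetical trigger: conditions are predicates over "now" in epoch millis.
final class ScannedTrigger {
    final String id;
    final LongPredicate triggerCondition;
    final LongPredicate expireCondition;
    final Runnable action;
    final boolean resetOnTrigger; // true: stays READY after firing (schedules)
    TriggerState state = TriggerState.READY;

    ScannedTrigger(String id, LongPredicate triggerCondition, LongPredicate expireCondition,
                   Runnable action, boolean resetOnTrigger) {
        this.id = id;
        this.triggerCondition = triggerCondition;
        this.expireCondition = expireCondition;
        this.action = action;
        this.resetOnTrigger = resetOnTrigger;
    }
}

final class TriggerScanner {
    final List<ScannedTrigger> triggers = new ArrayList<>();

    // One pass over all triggers, loosely modeled on checkAllTriggers.
    void checkAll(long now) {
        Iterator<ScannedTrigger> it = triggers.iterator();
        while (it.hasNext()) {
            ScannedTrigger t = it.next();
            try {
                if (t.state == TriggerState.READY) {
                    if (t.triggerCondition.test(now)) {
                        t.action.run();
                        if (!t.resetOnTrigger) {
                            t.state = TriggerState.EXPIRED; // one-shot trigger is done
                        }
                    } else if (t.expireCondition.test(now)) {
                        t.state = TriggerState.EXPIRED;
                    }
                }
                if (t.state == TriggerState.EXPIRED) {
                    it.remove(); // drop finished triggers
                }
            } catch (RuntimeException e) {
                // Skip this trigger and keep scanning the others.
                System.err.println("Failed to process trigger " + t.id + ": " + e);
            }
        }
    }
}
```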
When a Trigger fires, it calls its own action.doAction(). For a scheduled job, the Trigger's action is usually an ExecuteFlowAction, whose doAction method is shown below. It does two things. First, it builds the execution instance ExecutableFlow. Second, if the schedule has SLA alert rules, it builds an SLA Trigger.
Once the execution instance is built, executorManager.submitExecutableFlow(exflow, submitUser) is called to dispatch it. Scheduled runs therefore end up on the same dispatch path described above, which is not analyzed again here.
@Override
public void doAction() throws Exception {
if (projectManager == null || executorManager == null) {
throw new Exception("ExecuteFlowAction not properly initialized!");
}
Project project = projectManager.getProject(projectId);
if (project == null) {
logger.error("Project to execute " + projectId + " does not exist!");
throw new RuntimeException("Error finding the project to execute "
+ projectId);
}
Flow flow = project.getFlow(flowName);
if (flow == null) {
logger.error("Flow " + flowName + " cannot be found in project "
+ project.getName());
throw new RuntimeException("Error finding the flow to execute "
+ flowName);
}
ExecutableFlow exflow = new ExecutableFlow(project, flow);
exflow.setSubmitUser(submitUser);
exflow.addAllProxyUsers(project.getProxyUsers());
if (executionOptions == null) {
executionOptions = new ExecutionOptions();
}
if (!executionOptions.isFailureEmailsOverridden()) {
executionOptions.setFailureEmails(flow.getFailureEmails());
}
if (!executionOptions.isSuccessEmailsOverridden()) {
executionOptions.setSuccessEmails(flow.getSuccessEmails());
}
exflow.setExecutionOptions(executionOptions);
try {
executorManager.submitExecutableFlow(exflow, submitUser);
logger.info("Invoked flow " + project.getName() + "." + flowName);
} catch (ExecutorManagerException e) {
throw new RuntimeException(e);
}
// deal with sla
if (slaOptions != null && slaOptions.size() > 0) {
int execId = exflow.getExecutionId();
for (SlaOption sla : slaOptions) {
logger.info("Adding sla trigger " + sla.toString() + " to execution "
+ execId);
SlaChecker slaFailChecker =
new SlaChecker("slaFailChecker", sla, execId);
Map<String, ConditionChecker> slaCheckers =
new HashMap<String, ConditionChecker>();
slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
Condition triggerCond =
new Condition(slaCheckers, slaFailChecker.getId()
+ ".isSlaFailed()");
// if whole flow finish before violate sla, just expire
SlaChecker slaPassChecker =
new SlaChecker("slaPassChecker", sla, execId);
Map<String, ConditionChecker> expireCheckers =
new HashMap<String, ConditionChecker>();
expireCheckers.put(slaPassChecker.getId(), slaPassChecker);
Condition expireCond =
new Condition(expireCheckers, slaPassChecker.getId()
+ ".isSlaPassed()");
List<TriggerAction> actions = new ArrayList<TriggerAction>();
List<String> slaActions = sla.getActions();
for (String act : slaActions) {
if (act.equals(SlaOption.ACTION_ALERT)) {
SlaAlertAction slaAlert =
new SlaAlertAction("slaAlert", sla, execId);
actions.add(slaAlert);
} else if (act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
KillExecutionAction killAct =
new KillExecutionAction("killExecution", execId);
actions.add(killAct);
}
}
Trigger slaTrigger =
new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond,
actions);
slaTrigger.getInfo().put("monitored.finished.execution",
String.valueOf(execId));
slaTrigger.setResetOnTrigger(false);
slaTrigger.setResetOnExpire(false);
logger.info("Ready to put in the sla trigger");
triggerManager.insertTrigger(slaTrigger);
logger.info("Sla inserted.");
}
}
}
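The two SlaCheckers built above share one SlaOption. isSlaFailed() drives the trigger condition, which alerts or kills the execution. isSlaPassed() drives the expire condition: the flow finished in time, so the SLA trigger is simply dropped. The sketch below shows that split for a single rule: "the flow must finish within a duration of its start". `SlaRule` and this finish-time-only rule are illustrative assumptions. Azkaban's SlaOption supports several SLA types, and its checkers read execution status from the database.

```java
import java.time.Duration;

// Hypothetical SLA rule: the flow must finish within `maxDuration` of its start.
final class SlaRule {
    final Duration maxDuration;

    SlaRule(Duration maxDuration) {
        this.maxDuration = maxDuration;
    }

    // Trigger condition: the deadline has passed without the flow finishing in time.
    // finishMs is null while the flow is still running.
    boolean isSlaFailed(long startMs, Long finishMs, long nowMs) {
        long deadline = startMs + maxDuration.toMillis();
        return finishMs == null ? nowMs > deadline : finishMs > deadline;
    }

    // Expire condition: the flow finished on or before the deadline.
    boolean isSlaPassed(long startMs, Long finishMs) {
        return finishMs != null && finishMs <= startMs + maxDuration.toMillis();
    }
}
```

While the flow is still running and the deadline has not passed, both methods return false, so the SLA trigger simply stays in the scanner.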
WebServer summary
The following diagram gives a brief summary:
ExecServer
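Exposing a RESTful API
Multi-executor deployment has been supported since Azkaban 3.0. A single executor is relatively simple and exposes only a few APIs to the WebServer, mainly:
- ExecutorServlet: endpoints for executing, cancelling, and pausing flows, querying logs, etc.
Executing jobs
Let's briefly walk through how an executor runs a job. In ExecutorServlet, every execution request goes through handleAjaxExecute, which simply passes the execution ID on to FlowRunnerManager: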
private void handleAjaxExecute(HttpServletRequest req,
Map<String, Object> respMap, int execId) throws ServletException {
try {
flowRunnerManager.submitFlow(execId);
} catch (ExecutorManagerException e) {
e.printStackTrace();
logger.error(e);
respMap.put(RESPONSE_ERROR, e.getMessage());
}
}
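FlowRunnerManager submits a flow for execution through submitFlow, in four steps:
- load the execution instance ExecutableFlow from the database;
- prepare the execution directory with setupFlow(flow);
- create a FlowRunner;
- submit the runner to the thread pool with executorService.submit(runner).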
public void submitFlow(int execId) throws ExecutorManagerException {
// Load file and submit
if (runningFlows.containsKey(execId)) {
throw new ExecutorManagerException("Execution " + execId
+ " is already running.");
}
ExecutableFlow flow = null;
flow = executorLoader.fetchExecutableFlow(execId);
if (flow == null) {
throw new ExecutorManagerException("Error loading flow with exec "
+ execId);
}
// Sets up the project files and execution directory.
setupFlow(flow);
// Setup flow runner
FlowWatcher watcher = null;
ExecutionOptions options = flow.getExecutionOptions();
if (options.getPipelineExecutionId() != null) {
Integer pipelineExecId = options.getPipelineExecutionId();
FlowRunner runner = runningFlows.get(pipelineExecId);
if (runner != null) {
watcher = new LocalFlowWatcher(runner);
} else {
watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
}
}
int numJobThreads = numJobThreadPerFlow;
if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
try {
int numJobs =
Integer.valueOf(options.getFlowParameters().get(
FLOW_NUM_JOB_THREADS));
if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
.isProjectWhitelisted(flow.getProjectId(),
WhitelistType.NumJobPerFlow))) {
numJobThreads = numJobs;
}
} catch (Exception e) {
throw new ExecutorManagerException(
"Failed to set the number of job threads "
+ options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
+ " for flow " + execId, e);
}
}
FlowRunner runner =
new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
runner.setFlowWatcher(watcher)
.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
.setValidateProxyUser(validateProxyUser)
.setNumJobThreads(numJobThreads).addListener(this);
configureFlowLevelMetrics(runner);
// Check again.
if (runningFlows.containsKey(execId)) {
throw new ExecutorManagerException("Execution " + execId
+ " is already running.");
}
// Finally, queue the sucker.
runningFlows.put(execId, runner);
try {
// The executorService already has a queue.
// The submit method below actually returns an instance of FutureTask,
// which implements interface RunnableFuture, which extends both
// Runnable and Future interfaces
Future<?> future = executorService.submit(runner);
// keep track of this future
submittedFlows.put(future, runner.getExecutionId());
// update the last submitted time.
this.lastFlowSubmittedDate = System.currentTimeMillis();
} catch (RejectedExecutionException re) {
throw new ExecutorManagerException(
"Azkaban server can't execute any more flows. "
+ "The number of running flows has reached the system configured limit."
+ "Please notify Azkaban administrators");
}
}
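The thread-count override in submitFlow is easy to miss. A per-flow value from the FLOW_NUM_JOB_THREADS flow parameter is accepted only if both of these hold:
- it is positive;
- it does not exceed the executor's default, or the project is whitelisted.

Otherwise the default is kept silently. The rule is pulled out below as a pure function. `resolveJobThreads` is a hypothetical name. Using IllegalArgumentException for a non-numeric value stands in for the original's ExecutorManagerException.

```java
// Hypothetical extraction of the job-thread rule in FlowRunnerManager.submitFlow.
final class JobThreadPolicy {
    static int resolveJobThreads(int defaultThreads, String requested, boolean projectWhitelisted) {
        if (requested == null) {
            return defaultThreads; // no override in the flow parameters
        }
        int numJobs;
        try {
            numJobs = Integer.parseInt(requested);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Failed to set the number of job threads " + requested, e);
        }
        // Positive, and either within the default or explicitly whitelisted.
        if (numJobs > 0 && (numJobs <= defaultThreads || projectWhitelisted)) {
            return numJobs;
        }
        return defaultThreads; // out-of-range requests are ignored, not rejected
    }
}
```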
FlowRunner itself implements Runnable, and its run method calls runFlow, shown below. runFlow walks the DAG level by level and submits each job for execution in turn.
private void runFlow() throws Exception {
logger.info("Starting flows");
runReadyJob(this.flow);
updateFlow();
while (!flowFinished) {
synchronized (mainSyncObj) {
if (flowPaused) {
try {
mainSyncObj.wait(CHECK_WAIT_MS);
} catch (InterruptedException e) {
}
continue;
} else {
if (retryFailedJobs) {
retryAllFailures();
} else if (!progressGraph()) {
try {
mainSyncObj.wait(CHECK_WAIT_MS);
} catch (InterruptedException e) {
}
}
}
}
}
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
updateFlow();
logger.info("Finished Flow");
}
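The level-by-level progress of runFlow can be pictured as a loop: start every job whose dependencies have all finished, then repeat until nothing is left. The sketch below simulates this on an in-memory DAG and returns each "wave" of jobs that become ready together. It is a hypothetical simplification with no threads, failures, pausing, or retries, and it is not FlowRunner's actual progressGraph.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DagWaves {
    // deps maps each job to the jobs it depends on. Returns the jobs started in each wave.
    static List<List<String>> waves(Map<String, List<String>> deps) {
        Set<String> done = new HashSet<>();
        List<List<String>> result = new ArrayList<>();
        while (done.size() < deps.size()) {
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : deps.entrySet()) {
                // Ready = not yet run and every dependency already finished.
                if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                    ready.add(e.getKey());
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalArgumentException("Cycle or missing dependency among " + deps.keySet());
            }
            done.addAll(ready); // a real runner would submit these to a thread pool
            result.add(ready);
        }
        return result;
    }
}
```

With a `LinkedHashMap` where `clean` and `stats` both depend on `extract`, and `report` depends on both, the waves are `[[extract], [clean, stats], [report]]`.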
For each individual job, a JobRunner is finally created to execute it.
private void runExecutableNode(ExecutableNode node) throws IOException {
// Collect output props from the job's dependencies.
prepareJobProperties(node);
node.setStatus(Status.QUEUED);
JobRunner runner = createJobRunner(node);
logger.info("Submitting job '" + node.getNestedId() + "' to run.");
try {
executorService.submit(runner);
activeJobRunners.add(runner);
} catch (RejectedExecutionException e) {
logger.error(e);
}
;
}
private JobRunner createJobRunner(ExecutableNode node) {
// Load job file.
File path = new File(execDir, node.getJobSource());
JobRunner jobRunner =
new JobRunner(node, path.getParentFile(), executorLoader,
jobtypeManager);
if (watcher != null) {
jobRunner.setPipeline(watcher, pipelineLevel);
}
if (validateUserProxy) {
jobRunner.setValidatedProxyUsers(proxyUsers);
}
jobRunner.setDelayStart(node.getDelayedExecution());
jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
jobRunner.addListener(listener);
if (JobCallbackManager.isInitialized()) {
jobRunner.addListener(JobCallbackManager.getInstance());
}
configureJobLevelMetrics(jobRunner);
return jobRunner;
}
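When a JobRunner executes, it looks up the matching plugin in the plugin module to load the job's type. Each job type has its own run method, which is ultimately called to execute the job. For the various job types, see Azkaban's built-in job types and the Hadoop-related job types implemented in the azkaban-plugins project.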
try {
job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
} catch (JobTypeManagerException e) {
logger.error("Failed to build job type", e);
return false;
}
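buildJobExecutor resolves the job's `type` property (for example `type=command` in a .job file) to a registered job type and instantiates it. Below is a minimal registry with the same shape, mapping type names to factories. `JobTypeRegistry` and `SimpleJob` are hypothetical stand-ins for JobTypeManager and Azkaban's Job interface. The plain `Map<String, String>` stands in for Azkaban's Props.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical job abstraction: run() returns a description of what ran.
interface SimpleJob {
    String run();
}

// Hypothetical registry mapping job type names to factories.
final class JobTypeRegistry {
    private final Map<String, Function<Map<String, String>, SimpleJob>> factories = new HashMap<>();

    void register(String type, Function<Map<String, String>, SimpleJob> factory) {
        factories.put(type, factory);
    }

    // Look up the job's "type" property and build an executor for it.
    SimpleJob build(String jobId, Map<String, String> props) {
        String type = props.get("type");
        if (type == null) {
            throw new IllegalArgumentException("Job " + jobId + " has no 'type' property");
        }
        Function<Map<String, String>, SimpleJob> factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown job type '" + type + "' for job " + jobId);
        }
        return factory.apply(props);
    }
}
```

New job types are added by registering another factory, which is the same extension point the plugin mechanism below provides.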
Azkaban Plugin
Azkaban's plugin mechanism makes it easy to add new plugin types and thereby support more kinds of jobs. Azkaban's Hadoop plugins can be found in the following repository:
git clone https://github.com/azkaban/azkaban-plugins.git
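Plugin implementation
The plugin class hierarchy is shown below. Each plugin job runs in its own separate process:
- ProcessJob is the class responsible for starting that process.
- JavaProcessJob extends it and specializes it for Java processes.
- The other Hadoop plugins each extend JavaProcessJob in turn.

To implement your own plugin type, you only need to extend JavaProcessJob and invoke the plugin's Wrapper class from the subclass. See the code for the details.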
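The hierarchy can be mirrored in three classes:
- a base that runs a command line as a separate OS process;
- a Java specialization that assembles a `java ... -cp <classpath> <main class>` command;
- a plugin that only names its wrapper class and classpath.

All class names here (`ProcessJobSketch`, `JavaProcessJobSketch`, `MyWrapperJob`, `com.example.MyJobWrapper`) and the `-Xmx1g` default are hypothetical. Azkaban's real ProcessJob and JavaProcessJob are configured through Props and also handle logging, proxy users, and more.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Base class: runs a command line as a separate OS process.
abstract class ProcessJobSketch {
    abstract List<String> commandLine();

    int run() throws IOException, InterruptedException {
        Process p = new ProcessBuilder(commandLine()).inheritIO().start();
        return p.waitFor(); // exit code of the child process
    }
}

// Java specialization: builds "java <jvm args> -cp <classpath> <main class> <args>".
abstract class JavaProcessJobSketch extends ProcessJobSketch {
    abstract String mainClass();

    abstract List<String> classPath();

    List<String> jvmArgs() {
        return List.of("-Xmx1g"); // assumed default heap size
    }

    List<String> mainArgs() {
        return List.of();
    }

    @Override
    List<String> commandLine() {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.addAll(jvmArgs());
        cmd.add("-cp");
        cmd.add(String.join(File.pathSeparator, classPath()));
        cmd.add(mainClass());
        cmd.addAll(mainArgs());
        return cmd;
    }
}

// A custom plugin only needs to point at its wrapper class and classpath.
final class MyWrapperJob extends JavaProcessJobSketch {
    @Override
    String mainClass() {
        return "com.example.MyJobWrapper";
    }

    @Override
    List<String> classPath() {
        return List.of("lib/my-plugin.jar");
    }
}
```

Keeping process launching in the base class means a new plugin only describes *what* to run. This is the same division of labour the article describes for Azkaban's Hadoop plugins.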