azkaban是沒有創(chuàng)建作業(yè)流,創(chuàng)建任務,建立任務之間關系的操作的.
它本身的工作模式,在本地進行編輯.創(chuàng)建任務,創(chuàng)建作業(yè)流,建立依賴關系,然后打成zip上傳.
DirectoryFlowLoader解壓zip包,然后解析解壓出來的zip目錄.
首先需要兩個基礎類Node,Edge.
Node表示的是作業(yè)流中的節(jié)點信息,包含job文件名,properties屬性文件名,作業(yè)類型,
Edge表示的是節(jié)點之間邊信息.
loadProjectFlow
將project目錄解析成flow,properties json串,將一些無效的job,flow預先剔除掉.
無效的job:
- type屬性沒寫的
- 重復job(多次出現(xiàn)將會被剔除掉)
無效依賴:
- 只身依賴(自己依賴自己)
- 無效依賴(所依賴的job不存在)
- 依賴于job是個重復job
代碼
public void loadProjectFlow(Project project, File baseDirectory) {
propsList = new ArrayList<Props>();//整個工程的.properties配置列表
flowPropsList = new ArrayList<FlowProps>();//作業(yè)流.properties配置列表
jobPropsMap = new HashMap<String, Props>();//jobName->.job配置
nodeMap = new HashMap<String, Node>();//jobName->Node
flowMap = new HashMap<String, Flow>();//flowName->Flow
errors = new HashSet<String>();
duplicateJobs = new HashSet<String>();//重復任務名(jobName)
nodeDependencies = new HashMap<String, Map<String, Edge>>();//jobname->sourceJobName->依賴的邊
rootNodes = new HashSet<String>();//根節(jié)點,解釋一下,這里的根節(jié)點是flow中最后的節(jié)點
flowDependencies = new HashMap<String, Set<String>>();//flow于flow之間的依賴關系,解決內(nèi)嵌之間依賴關系
// Load all the props files and create the Node objects
loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
jobPropertiesCheck(project);
// Create edges and find missing dependencies
resolveDependencies();
// Create the flows.
buildFlowsFromDependencies();
// Resolve embedded flows
resolveEmbeddedFlows();
}
loadProjectFlow函數(shù)用于解析工作目錄,解析job,構建工作流.
第一步loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
目的有兩個,一將配置文件解析成類,二排除文件重復job
加載.properties,.job文件,將加載的配置放入
flowPropsList,propsList.(.properties)
jobPropsMap,duplicateJobs,nodeMap.(.job)
nodeMap存儲的是所有的節(jié)點信息
jobPropertiesCheck函數(shù),檢查job任務屬性是否合格.azkaban中會限制每個任務最大內(nèi)存和最小內(nèi)存.如果超過job的社會超過限制,就會放入error中.
private void jobPropertiesCheck(Project project) {
// if project is in the memory check whitelist, then we don't need to check
// its memory settings
if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
ProjectWhitelist.WhitelistType.MemoryCheck)) {
return;
}
String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
long sizeMaxXms = Utils.parseMemString(maxXms);
long sizeMaxXmx = Utils.parseMemString(maxXmx);
for (String jobName : jobPropsMap.keySet()) {
Props jobProps = jobPropsMap.get(jobName);
String xms = jobProps.getString(XMS, null);
if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
&& Utils.parseMemString(xms) > sizeMaxXms) {
errors.add(String.format(
"%s: Xms value has exceeded the allowed limit (max Xms = %s)",
jobName, maxXms));
}
String xmx = jobProps.getString(XMX, null);
if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
&& Utils.parseMemString(xmx) > sizeMaxXmx) {
errors.add(String.format(
"%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
jobName, maxXmx));
}
// job callback properties check
JobCallbackValidator.validate(jobName, props, jobProps, errors);
}
}
resolveDependencies回溯依賴關系,得到所有Edge,將無效依賴排除(依賴的job不存在,依賴的job是重復job)
buildFlowsFromDependencies利用之前解析好Nodes,Edges,回溯形成flow.
//這里所謂的根節(jié)點是末節(jié)點
private void buildFlowsFromDependencies() {
//找出所有的依賴節(jié)點
// Find all root nodes by finding ones without dependents.
HashSet<String> nonRootNodes = new HashSet<String>();
for (Map<String, Edge> edges : nodeDependencies.values()) {
for (String sourceId : edges.keySet()) {
nonRootNodes.add(sourceId);
}
}
// Now create flows. Bad flows are marked invalid
Set<String> visitedNodes = new HashSet<String>();
for (Node base : nodeMap.values()) {
// Root nodes can be discovered when parsing jobs
if (rootNodes.contains(base.getId())
|| !nonRootNodes.contains(base.getId())) {
rootNodes.add(base.getId());
Flow flow = new Flow(base.getId());
Props jobProp = jobPropsMap.get(base.getId());
// Dedup with sets
@SuppressWarnings("unchecked")
List<String> successEmailList =
jobProp.getStringList(CommonJobProperties.SUCCESS_EMAILS,
Collections.EMPTY_LIST);
Set<String> successEmail = new HashSet<String>();
for (String email : successEmailList) {
successEmail.add(email.toLowerCase());
}
@SuppressWarnings("unchecked")
List<String> failureEmailList =
jobProp.getStringList(CommonJobProperties.FAILURE_EMAILS,
Collections.EMPTY_LIST);
Set<String> failureEmail = new HashSet<String>();
for (String email : failureEmailList) {
failureEmail.add(email.toLowerCase());
}
@SuppressWarnings("unchecked")
List<String> notifyEmailList =
jobProp.getStringList(CommonJobProperties.NOTIFY_EMAILS,
Collections.EMPTY_LIST);
for (String email : notifyEmailList) {
email = email.toLowerCase();
successEmail.add(email);
failureEmail.add(email);
}
flow.addFailureEmails(failureEmail);
flow.addSuccessEmails(successEmail);
flow.addAllFlowProperties(flowPropsList);
constructFlow(flow, base, visitedNodes);//不斷的遞歸,直到依賴為null為止
flow.initialize();
flowMap.put(base.getId(), flow);
}
}
}