Yarn之狀態(tài)機分析
前言
hadoop2.x.x版本的底層實現(xiàn)中作了很多優(yōu)化:用狀態(tài)機對各種對象生命周期和狀態(tài)轉(zhuǎn)移進行管理顿膨;采用事件機制避免線程同步與阻塞。主要分析下yarn中狀態(tài)機的實現(xiàn)機制
一伍伤、事件
YARN中的很多組件之間進行通信抄邀,主要借助于事件。為了可讀性像云、可維護性及可擴展性锌雀,YARN中的事件由事件名稱和事件類型組成。比如JobImpl處理的事件名稱為JobEvent迅诬,而事件類型為JobEventType.
二腋逆、狀態(tài)
YARN中的很多組件之間進行通信,主要借助于事件侈贷。為了可讀性惩歉、可維護性及可擴展性,YARN中的事件由事件名稱和事件類型組成俏蛮。比如JobImpl處理的事件名稱為JobEvent撑蚌,而事件類型為JobEventType,如下所示
public enum JobStateInternal {
NEW,
SETUP,
INITED,
RUNNING,
COMMITTING,
SUCCEEDED,
FAIL_WAIT,
FAIL_ABORT,
FAILED,
KILL_WAIT,
KILL_ABORT,
KILLED,
ERROR,
REBOOT
}
我們看到JobImpl的內(nèi)部狀態(tài)包括新建(NEW)、初始化(INITED)搏屑、運行中(RUNNING)锨并、提交中(COMMITTING)、成功(SUCCEEDED)睬棚、失數谥蟆(FAILED)等
三、轉(zhuǎn)化
我們已經(jīng)了解了事件與狀態(tài)的基本實現(xiàn)與概念抑党,那么事件與狀態(tài)有什么關(guān)系?一個對象當前處于狀態(tài)state0,當對象接收到事件Event后,將引發(fā)轉(zhuǎn)換動作transition,最終當前對象的狀態(tài)過渡到state1.
1)Yarn中與狀態(tài)轉(zhuǎn)化相關(guān)的類圖如下
2)SingleArcTransition
@Public
@Evolving
public interface SingleArcTransition<OPERAND, EVENT> {
/**
* Transition hook.
*
* @param operand the entity attached to the FSM, whose internal
* state may change.
* @param event causal event
*/
public void transition(OPERAND operand, EVENT event);
}
由于SingleArcTransition的具體實現(xiàn)類只負責接收到事件后的具體操作或行為包警,并沒有包含狀態(tài)相關(guān)的信息,所以在狀態(tài)機執(zhí)行狀態(tài)過渡時底靠,并不是直接調(diào)用SingleArcTransition具體實現(xiàn)類的transition方法害晦,而是由接口Transition定義(見代碼清單3)真正的轉(zhuǎn)態(tài)過渡(包括行為和狀態(tài)改變)。
- Transition接口
private interface Transition<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType);
}
- SingleInternalArc接口
SingleInternalArc作為Transition接口的實現(xiàn)類暑中,在代理SingleArcTransition的同時壹瘟,負責狀態(tài)變換
private class SingleInternalArc
implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
private STATE postState;
private SingleArcTransition<OPERAND, EVENT> hook; // transition hook
SingleInternalArc(STATE postState,
SingleArcTransition<OPERAND, EVENT> hook) {
this.postState = postState;
this.hook = hook;
}
@Override
public STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType) {
if (hook != null) {
hook.transition(operand, event);
}
return postState;
}
}
3)MultipleArcTransition
@Public
@Evolving
public interface MultipleArcTransition
<OPERAND, EVENT, STATE extends Enum<STATE>> {
/**
* Transition hook.
* @return the postState. Post state must be one of the
* valid post states registered in StateMachine.
* @param operand the entity attached to the FSM, whose internal
* state may change.
* @param event causal event
*/
public STATE transition(OPERAND operand, EVENT event);
}
由于MultipleArcTransition的具體實現(xiàn)類只負責接收到事件后的具體操作或行為,并沒有包含狀態(tài)相關(guān)的信息鳄逾,所以在狀態(tài)機執(zhí)行狀態(tài)過渡時稻轨,并不是直接調(diào)用MultipleArcTransition具體實現(xiàn)類的transition方法,而是通過代理類MultipleInternalArc雕凹。MultipleInternalArc也實現(xiàn)了Transition接口殴俱,并在代理 MultipleArcTransition 的轉(zhuǎn)換行為的同時,負責狀態(tài)變換.
- MultipleInternalArc
private class MultipleInternalArc
implements Transition<OPERAND, STATE, EVENTTYPE, EVENT>{
// Fields
private Set<STATE> validPostStates;
private MultipleArcTransition<OPERAND, EVENT, STATE> hook; // transition hook
MultipleInternalArc(Set<STATE> postStates,
MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
this.validPostStates = postStates;
this.hook = hook;
}
@Override
public STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType)
throws InvalidStateTransitonException {
STATE postState = hook.transition(operand, event);
if (!validPostStates.contains(postState)) {
throw new InvalidStateTransitonException(oldState, eventType);
}
return postState;
}
}
4)ApplicableTransition
為了將所有狀態(tài)機中的狀態(tài)過渡與狀態(tài)建立起映射關(guān)系枚抵,YARN中提供了ApplicableTransition接口用于將SingleInternalArc和 MultipleInternalArc 添加到狀態(tài)機的拓撲表中线欲,提高在檢索狀態(tài)對應的過渡實現(xiàn)時的性能,ApplicableTransition的實現(xiàn)類為ApplicableSingleOrMultipleTransition類汽摹,其apply方法用于代理SingleInternalArc和MultipleInternalArc 李丰,將它們添加到狀態(tài)拓撲表中。
- ApplicableTransition接口定義
private interface ApplicableTransition
<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
}
- ApplicableSingleOrMultipleTransition
static private class ApplicableSingleOrMultipleTransition
<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
final STATE preState;
final EVENTTYPE eventType;
final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
ApplicableSingleOrMultipleTransition
(STATE preState, EVENTTYPE eventType,
Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
this.preState = preState;
this.eventType = eventType;
this.transition = transition;
}
@Override
public void apply
(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
= subject.stateMachineTable.get(preState);
if (transitionMap == null) {
// I use HashMap here because I would expect most EVENTTYPE's to not
// apply out of a particular state, so FSM sizes would be
// quadratic if I use EnumMap's here as I do at the top level.
transitionMap = new HashMap<EVENTTYPE,
Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
subject.stateMachineTable.put(preState, transitionMap);
}
transitionMap.put(eventType, transition);
}
}
可以看到ApplicableSingleOrMultipleTransition的apply方法就是為構(gòu)建狀態(tài)拓撲表
四逼泣、狀態(tài)機
YARN中狀態(tài)機的實現(xiàn)類是StateMachineFactory趴泌,它主要包含4個屬性信息:
- transitionsListNode:就是將狀態(tài)機的一個個過渡的ApplicableTransition實現(xiàn)串聯(lián)為一個列表舟舒,每個節(jié)點包含一個ApplicableTransition實現(xiàn)及指向下一個節(jié)點的引用
private class TransitionsListNode {
final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
final TransitionsListNode next;
TransitionsListNode
(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
TransitionsListNode next) {
this.transition = transition;
this.next = next;
}
}
transitionsListNode形成的過渡列表節(jié)點如下:
- stateMachineTable:狀態(tài)拓撲表,為了提高檢索狀態(tài)對應的過渡map而冗余的數(shù)據(jù)結(jié)構(gòu),此結(jié)構(gòu)在optimized為真時踱讨,通過對transitionsListNode鏈表進行處理產(chǎn)生
defaultInitialState:對象創(chuàng)建時魏蔗,內(nèi)部有限狀態(tài)機的默認初始狀態(tài)砍的。比如:JobImpl的內(nèi)部狀態(tài)機默認初始狀態(tài)是JobStateInternal.NEW痹筛。
optimized:布爾類型,用于標記當前狀態(tài)機是否需要優(yōu)化性能廓鞠,即構(gòu)建狀態(tài)拓撲表stateMachineTable帚稠。
-
公共構(gòu)造器
StateMachineFactory 的公有構(gòu)造器只有一個。
public StateMachineFactory(STATE defaultInitialState) { this.transitionsListNode = null; this.defaultInitialState = defaultInitialState; this.optimized = false; this.stateMachineTable = null; }
-
私有構(gòu)造器
StateMachineFactory 的 私有構(gòu)造器有兩個床佳,其中代碼清單11中的構(gòu)造器在addTransition方法中使用滋早。
從其實現(xiàn)看出,此構(gòu)造器的主要作用是構(gòu)建transitionsListNode鏈表砌们。private StateMachineFactory (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) { this.defaultInitialState = that.defaultInitialState; this.transitionsListNode = new TransitionsListNode(t, that.transitionsListNode); this.optimized = false; this.stateMachineTable = null; }
下面的構(gòu)造器則用于installTopology方法
private StateMachineFactory (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, boolean optimized) { this.defaultInitialState = that.defaultInitialState; this.transitionsListNode = that.transitionsListNode; this.optimized = optimized; if (optimized) { makeStateMachineTable(); } else { stateMachineTable = null; } }
構(gòu)造器當optimized參數(shù)為true時杆麸,調(diào)用了makeStateMachineTable方法,makeStateMachineTable的實現(xiàn)如下:
private void makeStateMachineTable() { Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack = new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>(); Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(); prototype.put(defaultInitialState, null); // I use EnumMap here because it'll be faster and denser. I would // expect most of the states to have at least one transition. stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype); for (TransitionsListNode cursor = transitionsListNode; cursor != null; cursor = cursor.next) { stack.push(cursor.transition); } while (!stack.isEmpty()) { stack.pop().apply(this); } }
1.創(chuàng)建堆棧stack浪感,用于將transitionsListNode鏈表中各個節(jié)點持有的ApplicableSingleOrMultipleTransition壓入棧中昔头;
2.創(chuàng)建狀態(tài)拓撲表stateMachineTable,并在此拓撲表中插入一個額外的默認初始狀態(tài)defaultInitialState與null的映射影兽;
3.迭代訪問transitionsListNode鏈表揭斧,并將各個節(jié)點持有的ApplicableSingleOrMultipleTransition壓入棧中;
4.依次彈出棧頂?shù)腁pplicableSingleOrMultipleTransition峻堰,并應用其apply方法(已在前面小節(jié)介紹)讹开,持續(xù)不斷的構(gòu)建狀態(tài)拓撲表stateMachineTable。
五捐名、狀態(tài)機的構(gòu)建
為了簡化敘述旦万,本節(jié)以JobImpl中狀態(tài)機的構(gòu)建為例。由于JobImpl的狀態(tài)機預設(shè)的(調(diào)用addTransition方法)加入的ApplicableSingleOrMultipleTransition非常多镶蹋,我們節(jié)選其中的2個作為典型進行分析纸型。最后還會分析installTopology方法的實現(xiàn)。
protected static final
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
stateMachineFactory
= new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
(JobStateInternal.NEW)
// Transitions from NEW state
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
JobEventType.JOB_INIT,
new InitTransition())
// 省略其它addTransition調(diào)用
// create the topology tables
.installTopology();
構(gòu)建JobImpl的狀態(tài)機的步驟如下:
1.調(diào)用 StateMachineFactory 構(gòu)造器創(chuàng)建一個初始的狀態(tài)機
2.調(diào)用addTransition(STATE preState, STATE postState, EVENTTYPE eventType, SingleArcTransition<OPERAND, EVENT> hook)方法添加單弧過渡梅忌。從其實現(xiàn)(見代碼清單15)可以知道addTransition方法將SingleArcTransition封裝為SingleInternalArc狰腌,然后將SingleInternalArc封裝為ApplicableSingleOrMultipleTransition,最后調(diào)用之前說的第一個私有構(gòu)造器構(gòu)建transitionsListNode鏈表
3.調(diào)用addTransition(STATE preState, Set<STATE> postStates, EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook)方法添加多弧過渡牧氮。從其實現(xiàn)(見代碼清單16)可以知道addTransition方法將MultipleArcTransition封裝為MultipleInternalArc琼腔,然后將MultipleInternalArc封裝為ApplicableSingleOrMultipleTransition,最后調(diào)用之前說的第一個私有構(gòu)造器構(gòu)建transitionsListNode鏈表踱葛;
4.最后調(diào)用installTopology方法丹莲,其實現(xiàn)見代碼清單17光坝。installTopology正是在使用之前說的第二個私有構(gòu)造器構(gòu)建狀態(tài)拓撲表stateMachineTable
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, STATE postState,
EVENTTYPE eventType,
SingleArcTransition<OPERAND, EVENT> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new SingleInternalArc(postState, hook)));
}
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, Set<STATE> postStates,
EVENTTYPE eventType,
MultipleArcTransition<OPERAND, EVENT, STATE> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this,
new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new MultipleInternalArc(postStates, hook)));
}
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
installTopology() {
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}
Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();
Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();
prototype.put(defaultInitialState, null);
// I use EnumMap here because it'll be faster and denser. I would
// expect most of the states to have at least one transition.
stateMachineTable
= new EnumMap<STATE, Map<EVENTTYPE,
Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);
for (TransitionsListNode cursor = transitionsListNode;
cursor != null;
cursor = cursor.next) {
stack.push(cursor.transition);
}
//最終在這里,狀態(tài)路由表都存放到一個StateMachineFactory中了
while (!stack.isEmpty()) {
stack.pop().apply(this);
}