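Program entry point
org.apache.hadoop.hbase.master.HMaster defines MasterRpcServices, which provides the RPC services.
org.apache.hadoop.hbase.master.MasterRpcServices implements the interface MasterProtos.MasterService.BlockingInterface.
The communication uses Google's protobuf RPC (see my other article, "An analysis of the communication process between HBase and its clients"). MasterRpcServices implements the createTable interface as follows: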
```java
@Override
public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
    throws ServiceException {
  HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
  byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
  try {
    long procId =
        master.createTable(hTableDescriptor, splitKeys, req.getNonceGroup(), req.getNonce());
    return CreateTableResponse.newBuilder().setProcId(procId).build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
```
HBase's procedure framework
```java
@Override
public long createTable(
    final HTableDescriptor hTableDescriptor,
    final byte [][] splitKeys,
    final long nonceGroup,
    final long nonce) throws IOException {
  ...
  return MasterProcedureUtil.submitProcedure(
      new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
    @Override
    protected void run() throws IOException {
      ...
      ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
      submitProcedure(new CreateTableProcedure(
          procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch));
      latch.await();
      ...
    }
  });
}
```
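Two concepts appear here. The first is NonceProcedureRunnable: the (nonceGroup, nonce) pair identifies the request, and the enclosed code is executed only once for it, so the same create table operation is not executed twice. The second is Procedure: for operations that must behave transactionally, HBase implements a Procedure mechanism that makes rollback straightforward.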
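To make the nonce idea concrete, here is a minimal Java sketch, assuming deduplication only needs an in-memory map keyed by (nonceGroup, nonce). NonceDedupSketch, submitOnce and the "group:nonce" string key are hypothetical names for illustration, not HBase classes; HBase's own nonce bookkeeping is not reproduced here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: not an HBase API.
public class NonceDedupSketch {
    // Maps a "nonceGroup:nonce" key to the procId assigned on first submission.
    private final ConcurrentHashMap<String, Long> procIdByNonce = new ConcurrentHashMap<>();
    private final AtomicLong nextProcId = new AtomicLong(1);

    /**
     * Runs body only the first time a (nonceGroup, nonce) pair is seen.
     * A retry carrying the same pair gets the original procId back and body is not run again.
     */
    public long submitOnce(long nonceGroup, long nonce, Runnable body) {
        String key = nonceGroup + ":" + nonce;
        // computeIfAbsent is atomic per key on ConcurrentHashMap, so two concurrent
        // retries of the same request cannot both execute body.
        return procIdByNonce.computeIfAbsent(key, k -> {
            body.run();
            return nextProcId.getAndIncrement();
        });
    }

    public static void main(String[] args) {
        NonceDedupSketch sketch = new NonceDedupSketch();
        long first = sketch.submitOnce(7L, 42L, () -> System.out.println("creating table"));
        long retry = sketch.submitOnce(7L, 42L, () -> System.out.println("creating table"));
        // "creating table" is printed once, then:
        System.out.println(first == retry); // true
    }
}
```

Running the body inside computeIfAbsent keeps the sketch short; a real implementation would avoid doing long work while holding the map's per-key lock.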
org.apache.hadoop.hbase.procedure2.ProcedureExecutor
defines several core methods:
- public long submitProcedure(final Procedure proc, final NonceKey nonceKey)
  Submits a Procedure: writes it to the WAL, then adds it to the runnable and rollback queues.
- private void execProcedure(final RootProcedureState procStack, final Procedure procedure)
  Executes a procedure; if the procedure has sub-procedures to run next, execution continues with them.
- private boolean executeRollback(final Procedure proc)
  Rolls back a procedure.
- private void load(final boolean abortOnCorruption)
  Restores the in-flight state from the WAL after the process has terminated abnormally.
org.apache.hadoop.hbase.procedure2.Procedure
execute()
is called each time the procedure is executed. It may be called multiple times in case of failure and restart, so the code must be idempotent. The return is a set of sub-procedures, or null in case the procedure doesn't have sub-procedures. Once the sub-procedures are successfully completed, the execute() method is called again; you should think of it as a stack.
rollback()
is called when the procedure or one of its sub-procedures has failed. The rollback step is supposed to clean up the resources created during the execute() step. In case of failure and restart, rollback() may be called multiple times, so the code must be idempotent.
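The "think of it as a stack" contract can be illustrated with a small driver. In this sketch (Proc and run are hypothetical names, not HBase APIs), execute() returns child procedures or null, the parent is executed again once its children finish, and on failure every recorded execute() call is rolled back in reverse order. In this sketch a parent executed twice is also recorded twice, so its rollback() would run twice; that is only safe because rollback is required to be idempotent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the execute()/rollback() stack semantics.
public class ProcedureStackSketch {
    public interface Proc {
        // Returns sub-procedures to run first, or null when there are none left.
        List<Proc> execute() throws Exception;
        void rollback();
    }

    // Runs root to completion; on failure, rolls back every recorded execute() call in reverse.
    public static boolean run(Proc root) {
        Deque<Proc> stack = new ArrayDeque<>();
        Deque<Proc> executed = new ArrayDeque<>();
        stack.push(root);
        try {
            while (!stack.isEmpty()) {
                Proc current = stack.peek();
                executed.push(current);
                List<Proc> subs = current.execute();
                if (subs == null || subs.isEmpty()) {
                    stack.pop(); // done; its parent (if any) gets executed again
                } else {
                    // Push in reverse so the first sub-procedure runs first.
                    for (int i = subs.size() - 1; i >= 0; i--) stack.push(subs.get(i));
                }
            }
            return true;
        } catch (Exception e) {
            while (!executed.isEmpty()) executed.pop().rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Proc child = new Proc() {
            public List<Proc> execute() { trace.add("child"); return null; }
            public void rollback() { trace.add("undo child"); }
        };
        Proc parent = new Proc() {
            private boolean spawned = false;
            public List<Proc> execute() {
                trace.add("parent");
                if (spawned) return null; // second call, after the child has completed
                spawned = true;
                return List.of(child);
            }
            public void rollback() { trace.add("undo parent"); }
        };
        System.out.println(run(parent) + " " + trace); // true [parent, child, parent]
    }
}
```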
org.apache.hadoop.hbase.procedure2.StateMachineProcedure
implements a procedure that is executed step by step according to its state.
- Once the procedure is running, the procedure framework will call executeFromState(), using the 'state' provided by the user. The implementor can jump between states using setNextState(MyStateEnum.ordinal()).
- The rollback will call rollbackState() for each state that was executed, in reverse order.
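A stripped-down state machine shows how executeFromState(), setNextState() and rollbackState() fit together. MiniStateMachine, ExampleProcedure and the Step enum are hypothetical; Flow reuses the HAS_MORE_STATE / NO_MORE_STATE names from the CreateTableProcedure code in this article, but the loop is a simplification (no persistence, no retries) of what the framework does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a state-machine procedure with reverse-order rollback.
public class StateMachineSketch {
    enum Flow { HAS_MORE_STATE, NO_MORE_STATE }
    enum Step { PREPARE, WRITE, PUBLISH }

    static abstract class MiniStateMachine<S extends Enum<S>> {
        private S state;
        private final Deque<S> executedStates = new ArrayDeque<>();

        protected MiniStateMachine(S initial) { this.state = initial; }
        protected void setNextState(S next) { this.state = next; }
        protected abstract Flow executeFromState(S state) throws Exception;
        protected abstract void rollbackState(S state);

        // Runs states until NO_MORE_STATE; on failure, rolls back executed states in reverse order.
        public boolean run() {
            try {
                while (true) {
                    S current = state;
                    executedStates.push(current);
                    if (executeFromState(current) == Flow.NO_MORE_STATE) return true;
                }
            } catch (Exception e) {
                while (!executedStates.isEmpty()) rollbackState(executedStates.pop());
                return false;
            }
        }
    }

    // PREPARE -> WRITE -> PUBLISH, optionally failing in PUBLISH.
    static class ExampleProcedure extends MiniStateMachine<Step> {
        final List<String> log = new ArrayList<>();
        private final boolean failInPublish;

        ExampleProcedure(boolean failInPublish) {
            super(Step.PREPARE);
            this.failInPublish = failInPublish;
        }

        @Override
        protected Flow executeFromState(Step s) throws Exception {
            log.add("exec " + s);
            switch (s) {
                case PREPARE: setNextState(Step.WRITE); return Flow.HAS_MORE_STATE;
                case WRITE: setNextState(Step.PUBLISH); return Flow.HAS_MORE_STATE;
                case PUBLISH:
                    if (failInPublish) throw new Exception("publish failed");
                    return Flow.NO_MORE_STATE;
                default: throw new UnsupportedOperationException("unhandled state=" + s);
            }
        }

        @Override
        protected void rollbackState(Step s) { log.add("undo " + s); }
    }

    public static void main(String[] args) {
        ExampleProcedure ok = new ExampleProcedure(false);
        System.out.println(ok.run() + " " + ok.log);
        // true [exec PREPARE, exec WRITE, exec PUBLISH]
        ExampleProcedure bad = new ExampleProcedure(true);
        System.out.println(bad.run() + " " + bad.log);
        // false [exec PREPARE, exec WRITE, exec PUBLISH, undo PUBLISH, undo WRITE, undo PREPARE]
    }
}
```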
Using the procedure framework:
- Implement the procedure class: CreateTableProcedure extends StateMachineProcedure<MasterProcedureEnv, CreateTableState>
- Start a ProcedureExecutor in HMaster and submit the CreateTableProcedure to it.
The create table process in detail
```java
@Override
protected Flow executeFromState(final MasterProcedureEnv env, final CreateTableState state)
    throws InterruptedException {
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + " execute state=" + state);
  }
  try {
    switch (state) {
      case CREATE_TABLE_PRE_OPERATION:
        // Verify if we can create the table
        boolean exists = !prepareCreate(env);
        ProcedurePrepareLatch.releaseLatch(syncLatch, this);
        if (exists) {
          assert isFailed() : "the delete should have an exception here";
          return Flow.NO_MORE_STATE;
        }
        preCreate(env);
        setNextState(CreateTableState.CREATE_TABLE_WRITE_FS_LAYOUT);
        break;
      case CREATE_TABLE_WRITE_FS_LAYOUT:
        newRegions = createFsLayout(env, hTableDescriptor, newRegions);
        setNextState(CreateTableState.CREATE_TABLE_ADD_TO_META);
        break;
      case CREATE_TABLE_ADD_TO_META:
        newRegions = addTableToMeta(env, hTableDescriptor, newRegions);
        setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS);
        break;
      case CREATE_TABLE_ASSIGN_REGIONS:
        assignRegions(env, getTableName(), newRegions);
        setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE);
        break;
      case CREATE_TABLE_UPDATE_DESC_CACHE:
        updateTableDescCache(env, getTableName());
        setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION);
        break;
      case CREATE_TABLE_POST_OPERATION:
        postCreate(env);
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  } catch (HBaseException|IOException e) {
    LOG.error("Error trying to create table=" + getTableName() + " state=" + state, e);
    setFailure("master-create-table", e);
  }
  return Flow.HAS_MORE_STATE;
}
```
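As the code shows, the flow is as follows:
- CREATE_TABLE_PRE_OPERATION
  Checks whether the table already exists and invokes the MasterCoprocessor hooks.
  Note this line: ProcedurePrepareLatch.releaseLatch(syncLatch, this);
  It releases the latch as soon as the existence check is done, so the client request can return at that point. From the client's point of view, create table is therefore an asynchronous operation.
- CREATE_TABLE_WRITE_FS_LAYOUT
  Creates the corresponding directories and files on the filesystem.
- CREATE_TABLE_ADD_TO_META
  Adds the table to the meta table, but the regions are not yet available.
- CREATE_TABLE_ASSIGN_REGIONS
  Assigns the regions to RegionServers. This step marks every region as offline and writes it under the region-in-transition directory in ZooKeeper, where RegionServers can take it over; it also enables the table.
- CREATE_TABLE_UPDATE_DESC_CACHE
  Updates the in-memory cache.
- CREATE_TABLE_POST_OPERATION
  Invokes the MasterCoprocessor hooks.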
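The asynchronous behavior described for CREATE_TABLE_PRE_OPERATION can be reproduced with a plain java.util.concurrent.CountDownLatch standing in for ProcedurePrepareLatch. This is a sketch under assumptions: the procedure steps only append event names, PrepareLatchSketch and createTableAsync are hypothetical, and the second latch (clientReturned) is a demo device that makes the printed order deterministic, not part of the HBase flow.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the caller unblocks after the prepare step, not after the whole procedure.
public class PrepareLatchSketch {
    public static List<String> createTableAsync() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch prepared = new CountDownLatch(1);       // stands in for ProcedurePrepareLatch
        CountDownLatch clientReturned = new CountDownLatch(1); // demo only: fixes the output order
        ExecutorService procExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> procedure = procExecutor.submit(() -> {
                events.add("PRE_OPERATION");   // existence check done
                prepared.countDown();          // analogous to releaseLatch(syncLatch, this)
                clientReturned.await();        // demo only
                for (String step : List.of("WRITE_FS_LAYOUT", "ADD_TO_META", "ASSIGN_REGIONS",
                        "UPDATE_DESC_CACHE", "POST_OPERATION")) {
                    events.add(step);
                }
                return null;
            });
            prepared.await();                  // analogous to latch.await() in HMaster.createTable
            events.add("client returns");      // the RPC can now answer with the procId
            clientReturned.countDown();
            procedure.get(5, TimeUnit.SECONDS); // waited on only so the demo can show the full order
        } finally {
            procExecutor.shutdown();
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(createTableAsync());
        // [PRE_OPERATION, client returns, WRITE_FS_LAYOUT, ADD_TO_META, ASSIGN_REGIONS, UPDATE_DESC_CACHE, POST_OPERATION]
    }
}
```

The point of the design is that the client only pays for the existence check; the filesystem, meta and assignment work continues in the executor after the RPC has returned.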