Hive版本:2.3.7
CliDriver類
image.png
image.png
main方法:
/**
 * CLI entry point: runs a fresh {@code CliDriver} with the given arguments and
 * terminates the JVM with the status returned by {@code run}.
 *
 * @param args command-line arguments, forwarded unchanged to {@code run}
 * @throws Exception whatever {@code run} propagates
 */
public static void main(String[] args) throws Exception {
  final CliDriver driver = new CliDriver();
  final int exitStatus = driver.run(args);
  System.exit(exitStatus);
}
run方法:主要是各種參數的初始化
/**
 * Initializes the CLI session from the command-line arguments and then hands
 * control to {@code executeDriver}.
 *
 * <p>Return codes produced directly by this method: 1 when stage-1 option
 * processing fails, 3 when the UTF-8 print streams cannot be created, and 2
 * when stage-2 option processing fails. Otherwise the value returned by
 * {@code executeDriver} is returned.
 *
 * @param args raw command-line arguments
 * @return exit status of the CLI invocation
 * @throws Exception propagated from session start-up or command execution
 */
public int run(String[] args) throws Exception {
  // Option processor that handles the command-line arguments.
  OptionsProcessor optionsProcessor = new OptionsProcessor();
  if (!optionsProcessor.process_stage1(args)) {
    // NOTE(review): the original annotation says a successful stage 1 means the
    // arguments (except the log level) have been stored in hiveVariables, a
    // Map<String, String>. That is not visible from this method; confirm in
    // OptionsProcessor.
    return 1;
  }

  // Log initialization.
  // NOTE: It is critical to do this here so that log4j is reinitialized
  // before any of the other core hive classes are loaded
  String logInitDetailMessage;
  boolean logInitFailed;
  try {
    logInitDetailMessage = LogUtils.initHiveLog4j();
    logInitFailed = false;
  } catch (LogInitializationException e) {
    logInitDetailMessage = e.getMessage();
    logInitFailed = true;
  }

  // Create the client session object and wire its streams to the process
  // streams, using UTF-8 for all output.
  CliSessionState session = new CliSessionState(new HiveConf(SessionState.class));
  session.in = System.in;
  try {
    session.out = new PrintStream(System.out, true, "UTF-8");
    session.info = new PrintStream(System.err, true, "UTF-8");
    session.err = new CachingPrintStream(System.err, true, "UTF-8");
  } catch (UnsupportedEncodingException e) {
    return 3;
  }

  // Stage 2 of option processing works on the session object.
  // NOTE(review): per the original annotation this validates the options
  // (e.g. -e and -f must not be given together) and copies them into the
  // session; confirm in OptionsProcessor.
  if (!optionsProcessor.process_stage2(session)) {
    return 2;
  }

  // Unless the session is silent, report the outcome of log initialization:
  // to stderr when it failed, through the session console otherwise.
  final boolean silent = session.getIsSilent();
  if (!silent && logInitFailed) {
    System.err.println(logInitDetailMessage);
  } else if (!silent) {
    SessionState.getConsole().printInfo(logInitDetailMessage);
  }

  // set all properties specified via command line
  HiveConf conf = session.getConf();
  for (Map.Entry<Object, Object> property : session.cmdProperties.entrySet()) {
    String key = (String) property.getKey();
    String value = (String) property.getValue();
    conf.set(key, value);
    // Also record the value among the session's overridden configurations.
    session.getOverriddenConfigurations().put(key, value);
  }

  // read prompt configuration and substitute variables.
  prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT);
  HiveVariableSource variableSource = new HiveVariableSource() {
    @Override
    public Map<String, String> getHiveVariable() {
      return SessionState.get().getHiveVariables();
    }
  };
  prompt = new VariableSubstitution(variableSource).substitute(conf, prompt);
  // NOTE(review): presumably a run of spaces as long as the prompt, used for
  // continuation lines; confirm in spacesForString.
  prompt2 = spacesForString(prompt);

  if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
    // Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
    // the session are needed, the corresponding getters and other methods will wait as needed.
    SessionState.beginStart(session, console);
  } else {
    SessionState.start(session);
  }

  session.updateThreadName();

  // execute cli driver work
  try {
    return executeDriver(session, conf, optionsProcessor);
  } finally {
    session.resetThreadName();
    session.close();
  }
}
executeDriver方法:
/**
 * Execute the cli work.
 *
 * <p>Handles, in this order: database selection, the {@code -i} init files, a
 * {@code -e} command string, a {@code -f} script file, and finally the
 * interactive loop. In the interactive loop, input lines are accumulated until
 * one ends with an unescaped {@code ';'}. The accumulated text, which may hold
 * one command or several, is then handed to {@code processLine}.
 *
 * @param ss CliSessionState of the CLI driver
 * @param conf HiveConf for the driver session
 * @param oproc Operation processor of the CLI invocation
 * @return status of the CLI command execution
 * @throws Exception
 */
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
    throws Exception {
  CliDriver cli = new CliDriver();
  cli.setHiveVariables(oproc.getHiveVariables());

  // use the specified database if specified
  cli.processSelectDatabase(ss);

  // Execute -i init files (always in silent mode)
  cli.processInitFiles(ss);

  // -e mode: run the given command string and return without entering the
  // interactive prompt.
  if (ss.execString != null) {
    return cli.processLine(ss.execString);
  }

  // -f mode: the counterpart of -e for a script file.
  if (ss.fileName != null) {
    try {
      return cli.processFile(ss.fileName);
    } catch (FileNotFoundException e) {
      System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
      return 3;
    }
  }

  // Print the deprecation warning when the configured execution engine is "mr".
  if ("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE))) {
    console.printInfo(HiveConf.generateMrDeprecationWarning());
  }

  // Interactive mode starts here: set up the console reader first.
  setupConsoleReader();

  int lastStatus = 0;
  String pending = "";
  String curDB = getFormattedDb(conf, ss);
  String curPrompt = prompt + curDB;
  // Padding used for the continuation prompt, derived from curDB.
  String dbSpaces = spacesForString(curDB);

  // Read input line by line; statements spanning several lines are stitched
  // together here.
  String line;
  while ((line = reader.readLine(curPrompt + "> ")) != null) {
    if (!pending.isEmpty()) {
      pending += '\n';
    }

    String trimmed = line.trim();

    // Lines starting with "--" are comments and are dropped.
    if (trimmed.startsWith("--")) {
      continue;
    }

    // A statement is complete when the line ends with ';' that is not escaped as "\;".
    boolean terminated = trimmed.endsWith(";") && !trimmed.endsWith("\\;");
    if (!terminated) {
      // Keep collecting and switch to the continuation prompt.
      pending = pending + line;
      curPrompt = prompt2 + dbSpaces;
      continue;
    }

    // Run everything collected so far plus the current line.
    lastStatus = cli.processLine(pending + line, true);
    pending = "";

    // The command may have changed the current database, so rebuild the prompt.
    curDB = getFormattedDb(conf, ss);
    curPrompt = prompt + curDB;
    if (dbSpaces.length() != curDB.length()) {
      dbSpaces = spacesForString(curDB);
    }
  }

  return lastStatus;
}
processLine方法:
/**
 * Processes a line of semicolon separated commands.
 *
 * <p>The line is split with {@code splitSemiColon} and each resulting command
 * is run through {@code processCmd}. Processing stops at the first command
 * that returns a non-zero status, unless {@code CLIIGNOREERRORS} is set in the
 * configuration.
 *
 * @param line
 *          The commands to process
 * @param allowInterrupting
 *          When true the function will handle SIG_INT (Ctrl+C) by interrupting the processing and
 *          returning -1
 * @return 0 if ok
 */
public int processLine(String line, boolean allowInterrupting) {
  SignalHandler previousHandler = null;
  Signal sigInt = null;

  if (allowInterrupting) {
    // Remember all threads that were running at the time we started line processing.
    // Hook up the custom Ctrl+C handler while processing this line
    sigInt = new Signal("INT");
    previousHandler = Signal.handle(sigInt, new SignalHandler() {
      // Set once the first Ctrl+C has been seen.
      private boolean interruptRequested;

      @Override
      public void handle(Signal signal) {
        final boolean repeatedRequest = interruptRequested;
        interruptRequested = true;

        // Kill the VM on second ctrl+c
        if (repeatedRequest) {
          console.printInfo("Exiting the JVM");
          System.exit(127);
        }

        // Interrupt the CLI thread to stop the current statement and return to prompt
        console.printInfo("Interrupting... Be patient, this might take some time.");
        console.printInfo("Press Ctrl+C again to kill JVM");

        // First, kill any running MR jobs
        HadoopJobExecHelper.killRunningJobs();
        // Then the running Tez jobs
        TezJobExecHelper.killRunningJobs();
        // Finally trigger the Hive interrupt hooks
        HiveInterruptUtils.interrupt();
      }
    });
  }

  try {
    int status = 0;

    // we can not use "split" function directly as ";" may be quoted
    List<String> commands = splitSemiColon(line);

    StringBuilder pending = new StringBuilder();
    for (String oneCmd : commands) {
      if (StringUtils.endsWith(oneCmd, "\\")) {
        // The fragment ends with a backslash: drop that last character, put the
        // ';' back and keep accumulating into the same command.
        pending.append(StringUtils.chop(oneCmd)).append(";");
        continue;
      }
      pending.append(oneCmd);

      String command = pending.toString();
      if (StringUtils.isBlank(command)) {
        // Nothing to run yet; the blank text stays in the buffer.
        continue;
      }

      status = processCmd(command);
      pending.setLength(0);

      // Read the flag after the command has run, on every iteration.
      boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
      if (status != 0 && !ignoreErrors) {
        break;
      }
    }

    CommandProcessorFactory.clean((HiveConf) conf);
    return status;
  } finally {
    // Once we are done processing the line, restore the old handler
    if (previousHandler != null && sigInt != null) {
      Signal.handle(sigInt, previousHandler);
    }
  }
}
processCmd方法:
/**
 * Processes exactly one command handed over by {@code processLine}.
 *
 * <p>The command is dispatched on its form:
 * <ul>
 *   <li>{@code quit} / {@code exit} in any letter case: closes the session and
 *       exits the JVM with status 0;</li>
 *   <li>{@code source <file>}: runs the file through {@code processFile};</li>
 *   <li>{@code !<shell command>}: runs the text after {@code !} with a
 *       {@code ShellCmdExecutor};</li>
 *   <li>anything else: obtains a {@code CommandProcessor} from
 *       {@code CommandProcessorFactory} and runs it via
 *       {@code processLocalCmd}.</li>
 * </ul>
 *
 * <p>The session thread name is updated on entry and reset on every exit path,
 * including when an unchecked exception escapes.
 *
 * @param cmd the command text to process
 * @return 0 on success; 1 for the failures handled in this method; otherwise
 *         the status reported by the file, shell or command processor
 */
public int processCmd(String cmd) {
  CliSessionState ss = (CliSessionState) SessionState.get();
  ss.setLastCommand(cmd);

  ss.updateThreadName();
  try {
    // Flush the print stream, so it doesn't include output from the last command
    ss.err.flush();
    String cmd_trimmed = cmd.trim();
    // NOTE(review): presumably splits the command on whitespace; confirm in tokenizeCmd.
    String[] tokens = tokenizeCmd(cmd_trimmed);
    int ret = 0;

    // quit/exit must be the whole command, so the full trimmed text is compared.
    // equalsIgnoreCase is used instead of toLowerCase().equals(...) because
    // toLowerCase() applies the default locale; in a Turkish locale "QUIT"
    // would not lower-case to "quit". It also matches the "source" check below.
    if (cmd_trimmed.equalsIgnoreCase("quit") || cmd_trimmed.equalsIgnoreCase("exit")) {

      // if we have come this far - either the previous commands
      // are all successful or this is command line. in either case
      // this counts as a successful run
      ss.close();
      System.exit(0);

    } else if (tokens[0].equalsIgnoreCase("source")) {
      // "source" is similar to -f, but runs a script file from inside the CLI.
      String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
      // Substitute Hive variables in the file argument.
      cmd_1 = new VariableSubstitution(new HiveVariableSource() {
        @Override
        public Map<String, String> getHiveVariable() {
          return SessionState.get().getHiveVariables();
        }
      }).substitute(ss.getConf(), cmd_1);

      File sourceFile = new File(cmd_1);
      if (! sourceFile.isFile()){
        console.printError("File: "+ cmd_1 + " is not a file.");
        ret = 1;
      } else {
        try {
          // processFile reads the script; its content ends up in processLine.
          ret = processFile(cmd_1);
        } catch (IOException e) {
          console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
            stringifyException(e));
          ret = 1;
        }
      }
    } else if (cmd_trimmed.startsWith("!")) {
      // Shell command: everything after the leading '!'.
      //for shell commands, use unstripped command
      String shell_cmd = cmd_trimmed.substring(1);
      shell_cmd = new VariableSubstitution(new HiveVariableSource() {
        @Override
        public Map<String, String> getHiveVariable() {
          return SessionState.get().getHiveVariables();
        }
      }).substitute(ss.getConf(), shell_cmd);

      // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
      try {
        // Run the shell command, sending its output to the session streams.
        ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
        ret = executor.execute();
        if (ret != 0) {
          console.printError("Command failed with exit code = " + ret);
        }
      } catch (Exception e) {
        console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }
    }  else { // local mode
      // Everything else is a Hive command: not necessarily SQL, it may also be
      // SET, ADD and similar commands.
      try {
        //Let Driver strip comments using sql parser
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
        ret = processLocalCmd(cmd, proc, ss);
      } catch (SQLException e) {
        console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
    }

    return ret;
  } finally {
    // Restore the thread name even if command processing throws.
    ss.resetThreadName();
  }
}
CommandProcessorFactory.get方法(部分):
/**
 * Returns the processor that should handle {@code cmd}.
 *
 * <p>A processor returned by {@code getForHiveCommand} takes precedence.
 * Otherwise the command goes to a {@code Driver}: a fresh one when
 * {@code conf} is null, else the instance cached for {@code conf} in
 * {@code mapDrivers}, created on first use and state-reset on reuse.
 *
 * @param cmd the tokens of the command; {@code cmd[0]} is inspected
 * @param conf configuration keying the cached driver, may be null
 * @return the processor, or null when {@code cmd[0]} is blank
 * @throws SQLException propagated from {@code getForHiveCommand}
 */
public static CommandProcessor get(String[] cmd, HiveConf conf)
    throws SQLException {
  CommandProcessor hiveCommandProcessor = getForHiveCommand(cmd, conf);
  if (hiveCommandProcessor != null) {
    return hiveCommandProcessor;
  }

  if (isBlank(cmd[0])) {
    return null;
  }

  // Without a configuration there is nothing to key the cache on: hand out a
  // brand-new Driver.
  if (conf == null) {
    return new Driver();
  }

  Driver driver = mapDrivers.get(conf);
  if (driver != null) {
    // Reuse the cached driver after resetting its query state.
    driver.resetQueryState();
  } else {
    // First request for this conf: create the driver and remember the
    // conf-to-driver association in mapDrivers.
    driver = new Driver();
    mapDrivers.put(conf, driver);
  }

  driver.init();
  return driver;
}
getForHiveCommand方法:
/**
 * Looks up the processor for a Hive command by delegating to
 * {@code getForHiveCommandInternal} with {@code testOnly} set to false.
 *
 * @param cmd the tokens of the command
 * @param conf configuration passed through to the internal lookup
 * @return whatever {@code getForHiveCommandInternal} returns
 * @throws SQLException propagated from {@code getForHiveCommandInternal}
 */
public static CommandProcessor getForHiveCommand(String[] cmd, HiveConf conf)
    throws SQLException {
  final boolean testOnly = false;
  return getForHiveCommandInternal(cmd, conf, testOnly);
}
getForHiveCommandInternal方法:
/**
 * Resolves the dedicated processor for a Hive command such as SET, DFS or ADD.
 *
 * <p>The command is first resolved with {@code HiveCommand.find}. It must then
 * appear in the comma-separated whitelist configured under
 * {@code HIVE_SECURITY_COMMAND_WHITELIST}. The whitelist comparison ignores
 * case and surrounding whitespace and uses {@code Locale.ROOT}, so the result
 * does not depend on the JVM default locale. With the default locale an
 * upper-case command such as {@code LIST} would fail the check in a Turkish
 * locale.
 *
 * @param cmd the tokens of the command; {@code cmd[0]} is the command word
 * @param conf configuration holding the whitelist; a new {@code HiveConf} is
 *        created when null
 * @param testOnly passed through to {@code HiveCommand.find}
 * @return the processor for the command, or null when the command is not a
 *         recognised Hive command, {@code cmd[0]} is blank, or the command is
 *         {@code reload function}
 * @throws SQLException with SQL state 42000 when the command is not in the
 *         whitelist, or when the CRYPTO processor cannot be created
 */
public static CommandProcessor getForHiveCommandInternal(String[] cmd, HiveConf conf,
    boolean testOnly)
    throws SQLException {
  HiveCommand hiveCommand = HiveCommand.find(cmd, testOnly);
  if (hiveCommand == null || isBlank(cmd[0])) {
    return null;
  }
  if (conf == null) {
    conf = new HiveConf();
  }

  // Build the set of whitelisted command words, normalised to lower case.
  Set<String> availableCommands = new HashSet<String>();
  for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)
      .split(",")) {
    availableCommands.add(availableCommand.toLowerCase(java.util.Locale.ROOT).trim());
  }

  // Normalise the command word the same way before the lookup.
  String commandWord = cmd[0].trim().toLowerCase(java.util.Locale.ROOT);
  if (!availableCommands.contains(commandWord)) {
    throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000");
  }

  if (cmd.length > 1 && "reload".equalsIgnoreCase(cmd[0])
      && "function".equalsIgnoreCase(cmd[1])) {
    // special handling for SQL "reload function"
    return null;
  }

  switch (hiveCommand) {
    case SET:
      return new SetProcessor();
    case RESET:
      return new ResetProcessor();
    case DFS:
      // Uses the configuration of the current session, not the conf argument.
      SessionState ss = SessionState.get();
      return new DfsProcessor(ss.getConf());
    case ADD:
      return new AddResourceProcessor();
    case LIST:
      return new ListResourceProcessor();
    case DELETE:
      return new DeleteResourceProcessor();
    case COMPILE:
      return new CompileProcessor();
    case RELOAD:
      return new ReloadProcessor();
    case CRYPTO:
      try {
        return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
      } catch (HiveException e) {
        throw new SQLException("Fail to start the command processor due to the exception: ", e);
      }
    default:
      throw new AssertionError("Unknown HiveCommand " + hiveCommand);
  }
}
processLocalCmd方法:
/**
 * Runs one command with the given processor, repeating the whole attempt for
 * as long as a {@code CommandNeedRetryException} is thrown.
 *
 * <p>When the processor is a {@code Driver} the command is run as a query:
 * the result rows are printed to {@code ss.out}, followed by a timing line.
 * Any other non-null processor receives the command text that follows its
 * first token. A null processor does nothing and yields 0.
 *
 * @param cmd the command text
 * @param proc processor chosen for the command, may be null
 * @param ss session whose streams and verbosity setting are used
 * @return the response code of the processor, or 1 when fetching the results
 *         fails with an {@code IOException}
 */
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
  int attempts = 0;
  int status = 0;
  boolean retry;

  do {
    retry = false;
    try {
      if (proc instanceof Driver) {
        // Query path (instanceof is false for null, so no separate null check).
        Driver driver = (Driver) proc;
        PrintStream out = ss.out;
        long startMillis = System.currentTimeMillis();

        // Echo the command when the session is verbose.
        if (ss.getIsVerbose()) {
          out.println(cmd);
        }

        driver.setTryCount(attempts);
        status = driver.run(cmd).getResponseCode();
        if (status != 0) {
          driver.close();
          return status;
        }

        // query has run capture the time
        double timeTaken = (System.currentTimeMillis() - startMillis) / 1000.0;

        ArrayList<String> rows = new ArrayList<String>();

        printHeader(driver, out);

        // print the results
        int fetched = 0;
        try {
          if (out instanceof FetchConverter) {
            ((FetchConverter) out).fetchStarted();
          }
          // Each getResults call fills the list with the next batch of rows.
          while (driver.getResults(rows)) {
            for (String row : rows) {
              out.println(row);
            }
            fetched += rows.size();
            rows.clear();
            // Stop fetching once the output stream reports an error.
            if (out.checkError()) {
              break;
            }
          }
        } catch (IOException e) {
          console.printError("Failed with exception " + e.getClass().getName() + ":"
              + e.getMessage(), "\n"
              + org.apache.hadoop.util.StringUtils.stringifyException(e));
          status = 1;
        }

        // Close the driver; its status only counts if nothing failed before.
        int closeStatus = driver.close();
        if (status == 0) {
          status = closeStatus;
        }

        if (out instanceof FetchConverter) {
          ((FetchConverter) out).fetchFinished();
        }

        String fetchedSuffix = (fetched == 0) ? "" : ", Fetched: " + fetched + " row(s)";
        console.printInfo("Time taken: " + timeTaken + " seconds" + fetchedSuffix);
      } else if (proc != null) {
        // Non-query path: the processor receives the text after the first token.
        String trimmedCmd = cmd.trim();
        String firstToken = tokenizeCmd(trimmedCmd)[0];
        String cmd_1 = getFirstCmd(trimmedCmd, firstToken.length());

        if (ss.getIsVerbose()) {
          ss.out.println(firstToken + " " + cmd_1);
        }

        CommandProcessorResponse res = proc.run(cmd_1);
        if (res.getResponseCode() != 0) {
          ss.out.println("Query returned non-zero code: " + res.getResponseCode() +
              ", cause: " + res.getErrorMessage());
        }
        if (res.getConsoleMessages() != null) {
          for (String consoleMsg : res.getConsoleMessages()) {
            console.printInfo(consoleMsg);
          }
        }
        status = res.getResponseCode();
      }
    } catch (CommandNeedRetryException e) {
      console.printInfo("Retry query with a different approach...");
      attempts++;
      retry = true;
    }
  } while (retry);

  return status;
}
CliSessionState對象:持有了很多屬性信息
/**
 * SessionState for hive cli.
 *
 * <p>Extends {@code SessionState} with public fields holding the values of the
 * command-line options the CLI session was invoked with.
 */
public class CliSessionState extends SessionState {

  /** Value of the {@code -database} option, if the session was invoked with one. */
  public String database;

  /** Value of the {@code -e} option, if the session was invoked with one. */
  public String execString;

  /** Value of the {@code -f} option, if the session was invoked with one. */
  public String fileName;

  /** Properties set from {@code -hiveconf} via the command line; empty by default. */
  public Properties cmdProperties = new Properties();

  /** Values of the {@code -i} option, if the session was invoked with any; empty by default. */
  public List<String> initFiles = new ArrayList<String>();

  /**
   * Creates the CLI session state on top of the given configuration.
   *
   * @param conf configuration handed to the {@code SessionState} constructor
   */
  public CliSessionState(HiveConf conf) {
    super(conf);
  }

  /**
   * Closes the session through {@code SessionState.close()}. An
   * {@code IOException} raised while closing is not propagated; its stack
   * trace is printed instead.
   */
  @Override
  public void close() {
    try {
      super.close();
    } catch (IOException closeFailure) {
      closeFailure.printStackTrace();
    }
  }
}
總結:
CliDriver是我們命令行執行SQL的入口,主要功能有:
1.從客戶端獲取輸入,進行簡單解析
2.根據解析的結果,創建不同的CommandProcessor,然后執行命令
3.最后返回執行的結果
4.執行SQL時的順序:
main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd---->Driver類run方法
至此,CliDriver類的簡單分析完成。
參考:
https://blog.csdn.net/king14bhhb/article/details/118417214