Background
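I work in our company's big-data architecture department, where we currently focus on rolling out the company's stream-processing platform. I am personally responsible for maintaining our Spark platform, customizing features, and fixing production issues. To make it easy for business users to submit Spark applications, we built a real-time computing management platform: a user fills in the application's parameters on a web page and clicks Submit, which triggers a backend service to submit the application to the YARN cluster. The flow is shown in the figure below: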
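As the figure shows, no matter which user is operating the front end, the application is always submitted by the WebServer. By default, Hadoop uses the user that started the WebServer process as the submitting user for every Spark application. On the ResourceManager side, however, different users are assigned different queues for security isolation and cost accounting, so the WebServer needs to be able to submit applications on behalf of multiple users.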
Analysis
Both the YARN and the HDFS interfaces are built on the same underlying RPC framework. Taking ClientRMService as an example, let's walk through the whole call interaction:
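Before any RPC method can be invoked, the client must establish a socket connection with the server, and the client is authenticated while that connection is being set up. A natural question is: when is the client's user name actually sent to the server? To answer it, let's look at the org.apache.hadoop.ipc.Client class (the connection setup lives in its inner Connection class):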
/** Connect to the server and set up the I/O streams. It then sends
 * a header to the server and starts
 * the connection thread that waits for responses.
 */
private synchronized void setupIOstreams(
    AtomicBoolean fallbackToSimpleAuth) {
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }
  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to "+server);
    }
    if (Trace.isTracing()) {
      Trace.addTimelineAnnotation("IPC client connecting to " + server);
    }
    short numRetries = 0;
    Random rand = null;
    while (true) {
      // Establish the socket connection to the remote server
      setupConnection();
      InputStream inStream = NetUtils.getInputStream(socket);
      OutputStream outStream = NetUtils.getOutputStream(socket);
      writeConnectionHeader(outStream);
      if (authProtocol == AuthProtocol.SASL) {
        final InputStream in2 = inStream;
        final OutputStream out2 = outStream;
        UserGroupInformation ticket = remoteId.getTicket();
        if (ticket.getRealUser() != null) {
          ticket = ticket.getRealUser();
        }
        try {
          authMethod = ticket
              .doAs(new PrivilegedExceptionAction<AuthMethod>() {
                @Override
                public AuthMethod run()
                    throws IOException, InterruptedException {
                  return setupSaslConnection(in2, out2);
                }
              });
        } catch (Exception ex) {
          authMethod = saslRpcClient.getAuthMethod();
          if (rand == null) {
            rand = new Random();
          }
          handleSaslConnectionFailure(numRetries++, maxRetriesOnSasl, ex,
              rand, ticket);
          continue;
        }
        if (authMethod != AuthMethod.SIMPLE) {
          // Sasl connect is successful. Let's set up Sasl i/o streams.
          inStream = saslRpcClient.getInputStream(inStream);
          outStream = saslRpcClient.getOutputStream(outStream);
          // for testing
          remoteId.saslQop =
              (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
          LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
          if (fallbackToSimpleAuth != null) {
            fallbackToSimpleAuth.set(false);
          }
        } else if (UserGroupInformation.isSecurityEnabled()) {
          if (!fallbackAllowed) {
            throw new IOException("Server asks us to fall back to SIMPLE " +
                "auth, but this client is configured to only allow secure " +
                "connections.");
          }
          if (fallbackToSimpleAuth != null) {
            fallbackToSimpleAuth.set(true);
          }
        }
      }
      if (doPing) {
        inStream = new PingInputStream(inStream);
      }
      this.in = new DataInputStream(new BufferedInputStream(inStream));
      // SASL may have already buffered the stream
      if (!(outStream instanceof BufferedOutputStream)) {
        outStream = new BufferedOutputStream(outStream);
      }
      this.out = new DataOutputStream(outStream);
      // Write the connection context, including UGI information such as the user name
      writeConnectionContext(remoteId, authMethod);
      // update last activity time
      touch();
      if (Trace.isTracing()) {
        Trace.addTimelineAnnotation("IPC client connected to " + server);
      }
      // start the receiver thread after the socket connection has been set
      // up (this thread receives the server's responses)
      start();
      return;
    }
  } catch (Throwable t) {
    if (t instanceof IOException) {
      markClosed((IOException)t);
    } else {
      markClosed(new IOException("Couldn't set up IO streams", t));
    }
    close();
  }
}
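So the user name travels in the connection context, which setupIOstreams writes exactly once per connection: after the connection header and (when enabled) SASL authentication, and before any call. To our understanding, Hadoop's IPC client therefore caches connections under a ConnectionId that includes the ticket (the UGI) together with the server address and protocol, so calls made as different users use different connections. The toy cache below sketches that idea by keying connections on server address plus user name; ToyConnectionCache, Key and Conn are hypothetical names, not Hadoop classes.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Toy model (not Hadoop code): connections are cached per (server, user),
// because the user is fixed when the connection context is written.
final class ToyConnectionCache {

    /** Cache key: the remote server plus the user the connection was set up for. */
    static final class Key {
        final String server;
        final String user;

        Key(String server, String user) {
            this.server = server;
            this.user = user;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return server.equals(k.server) && user.equals(k.user);
        }

        @Override
        public int hashCode() {
            return Objects.hash(server, user);
        }
    }

    /** A "connection" only remembers the user it announced during setup. */
    static final class Conn {
        final String user;

        Conn(String user) {
            this.user = user; // set once, like writeConnectionContext
        }
    }

    private final Map<Key, Conn> connections = new ConcurrentHashMap<>();

    /** Reuses a connection only if both the server and the user match. */
    Conn get(String server, String user) {
        return connections.computeIfAbsent(new Key(server, user), k -> new Conn(k.user));
    }

    int size() {
        return connections.size();
    }
}
```

With this design, one WebServer process submitting as several users simply holds one connection per user instead of trying to switch the user on an existing connection.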
How the connection context is written:
/* Write the connection context header for each connection
 * Out is not synchronized because only the first thread does this.
 */
private void writeConnectionContext(ConnectionId remoteId,
                                    AuthMethod authMethod)
                                        throws IOException {
  // Write out the ConnectionHeader
  IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
      RPC.getProtocolName(remoteId.getProtocol()),
      // the ticket is the UGI of the calling thread, captured in the ConnectionId
      remoteId.getTicket(),
      authMethod);
  RpcRequestHeaderProto connectionContextHeader = ProtoUtil
      .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
          OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
          RpcConstants.INVALID_RETRY_COUNT, clientId);
  RpcRequestMessageWrapper request =
      new RpcRequestMessageWrapper(connectionContextHeader, message);

  // Write out the packet length
  out.writeInt(request.getLength());
  request.write(out);
}
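The last two statements frame the context as a 4-byte length followed by the serialized message. A minimal sketch of that length-prefixed framing with plain java.io, assuming a simple string payload instead of Hadoop's protobuf messages; LengthPrefixedFrame and its methods are hypothetical names, not Hadoop's wire-format code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy length-prefixed framing: [4-byte big-endian length][payload bytes].
final class LengthPrefixedFrame {

    static void write(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length); // length first, so the reader knows how much follows
        out.write(payload);
        out.flush();
    }

    static byte[] read(DataInputStream in) throws IOException {
        int len = in.readInt();
        byte[] buf = new byte[len];
        in.readFully(buf); // wait until the whole frame has been read
        return buf;
    }

    /** Convenience: frame a payload into a byte array. */
    static byte[] encode(byte[] payload) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            write(new DataOutputStream(bos), payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Convenience: read one frame back from a byte array. */
    static byte[] decode(byte[] frame) {
        try {
            return read(new DataInputStream(new ByteArrayInputStream(frame)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

DataOutputStream.writeInt is big-endian, so a 14-byte payload is preceded by the bytes 0, 0, 0, 14.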
The ticket is obtained via UserGroupInformation.getCurrentUser(), which looks like this:
public synchronized
static UserGroupInformation getCurrentUser() throws IOException {
  // If the thread's access control context carries a Subject with a User principal,
  // build the UGI directly from that Subject
  AccessControlContext context = AccessController.getContext();
  Subject subject = Subject.getSubject(context);
  if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
    return getLoginUser();
  } else {
    return new UserGroupInformation(subject);
  }
}

public synchronized
static UserGroupInformation getLoginUser() throws IOException {
  if (loginUser == null) {
    loginUserFromSubject(null);
  }
  return loginUser;
}

public synchronized
static void loginUserFromSubject(Subject subject) throws IOException {
  ensureInitialized();
  try {
    if (subject == null) {
      subject = new Subject();
    }
    LoginContext login =
        newLoginContext(authenticationMethod.getLoginAppName(),
                        subject, new HadoopConfiguration());
    login.login();
    UserGroupInformation realUser = new UserGroupInformation(subject);
    realUser.setLogin(login);
    realUser.setAuthenticationMethod(authenticationMethod);
    realUser = new UserGroupInformation(login.getSubject());
    // If the HADOOP_PROXY_USER environment variable or property
    // is specified, create a proxy user as the logged in user.
    String proxyUser = System.getenv(HADOOP_PROXY_USER);
    if (proxyUser == null) {
      proxyUser = System.getProperty(HADOOP_PROXY_USER);
    }
    loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);

    String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
    if (fileLocation != null) {
      // Load the token storage file and put all of the tokens into the
      // user. Don't use the FileSystem API for reading since it has a lock
      // cycle (HADOOP-9212).
      Credentials cred = Credentials.readTokenStorageFile(
          new File(fileLocation), conf);
      loginUser.addCredentials(cred);
    }
    loginUser.spawnAutoRenewalThreadForUserCreds();
  } catch (LoginException le) {
    LOG.debug("failure to login", le);
    throw new IOException("failure to login", le);
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("UGI loginUser:"+loginUser);
  }
}
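The lookup chain above can be modelled in a few lines. In this toy, ToyUgi and its methods are hypothetical and users are plain strings rather than UGI objects: a user taken from the thread's Subject wins; otherwise the process-wide login user is computed once and cached, and the HADOOP_PROXY_USER step is modelled separately, with env and props standing in for System.getenv() and System.getProperties().

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

// Toy model (not Hadoop code) of getCurrentUser() -> getLoginUser() -> loginUserFromSubject().
final class ToyUgi {
    private static String loginUser; // cached once per process, like UGI's static loginUser
    private static int loginCount;   // how many times the "login" actually ran

    /** threadSubjectUser: user principal in the calling thread's Subject, or null if none. */
    static synchronized String getCurrentUser(String threadSubjectUser, Supplier<String> login) {
        if (threadSubjectUser != null) {
            return threadSubjectUser; // a Subject carrying a user wins
        }
        return getLoginUser(login);
    }

    static synchronized String getLoginUser(Supplier<String> login) {
        if (loginUser == null) {
            loginUser = login.get(); // only the first call performs the login
            loginCount++;
        }
        return loginUser;
    }

    static synchronized int loginCount() {
        return loginCount;
    }

    /** Proxy-user step of the login: environment variable first, then system property. */
    static String applyProxyUser(String realUser, Map<String, String> env, Properties props) {
        String proxyUser = env.get("HADOOP_PROXY_USER");
        if (proxyUser == null) {
            proxyUser = props.getProperty("HADOOP_PROXY_USER");
        }
        // "proxy via real" is only an illustrative rendering of createProxyUser(proxyUser, realUser)
        return proxyUser == null ? realUser : proxyUser + " via " + realUser;
    }
}
```

Because both the cached login user and HADOOP_PROXY_USER are fixed for the whole process, neither can vary per request; only the Subject branch can.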
How the user name is determined (HadoopLoginModule):
@InterfaceAudience.Private
public static class HadoopLoginModule implements LoginModule {
  private Subject subject;

  @Override
  public boolean abort() throws LoginException {
    return true;
  }

  private <T extends Principal> T getCanonicalUser(Class<T> cls) {
    for(T user: subject.getPrincipals(cls)) {
      return user;
    }
    return null;
  }

  @Override
  public boolean commit() throws LoginException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("hadoop login commit");
    }
    // if we already have a user, we are done.
    if (!subject.getPrincipals(User.class).isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("using existing subject:"+subject.getPrincipals());
      }
      return true;
    }
    Principal user = null;
    // if we are using kerberos, try it out
    if (isAuthenticationMethodEnabled(AuthenticationMethod.KERBEROS)) {
      user = getCanonicalUser(KerberosPrincipal.class);
      if (LOG.isDebugEnabled()) {
        LOG.debug("using kerberos user:"+user);
      }
    }
    //If we don't have a kerberos user and security is disabled, check
    //if user is specified in the environment or properties
    //(HADOOP_USER_NAME from the environment first, then from System properties)
    if (!isSecurityEnabled() && (user == null)) {
      String envUser = System.getenv(HADOOP_USER_NAME);
      if (envUser == null) {
        envUser = System.getProperty(HADOOP_USER_NAME);
      }
      user = envUser == null ? null : new User(envUser);
    }
    // use the OS user
    if (user == null) {
      user = getCanonicalUser(OS_PRINCIPAL_CLASS);
      if (LOG.isDebugEnabled()) {
        LOG.debug("using local user:"+user);
      }
    }
    // if we found the user, add our principal
    if (user != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Using user: \"" + user + "\" with name " + user.getName());
      }

      User userEntry = null;
      try {
        userEntry = new User(user.getName());
      } catch (Exception e) {
        throw (LoginException)(new LoginException(e.toString()).initCause(e));
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("User entry: \"" + userEntry.toString() + "\"" );
      }

      subject.getPrincipals().add(userEntry);
      return true;
    }
    LOG.error("Can't find user in " + subject);
    throw new LoginException("Can't find user name");
  }

  // ... initialize(), login() and logout() omitted ...
}
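From the code above we can see that if the current thread's AccessControlContext carries a Subject with a User principal, the UGI is built directly from that Subject and the user name is taken from it. Otherwise the login user is used: when Kerberos is enabled, that is the Kerberos principal; when it is not, the name comes from HADOOP_USER_NAME in the environment variables or, failing that, in the System properties; and if neither is set, it defaults to the operating-system user.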
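The fallback order in commit() can be written as a pure function. A sketch, where LoginUserResolver and its parameters are hypothetical, the two Hadoop checks (Kerberos enabled, security enabled) are collapsed into a single kerberos flag, and env/props stand in for System.getenv() and System.getProperties() so the order can be tested:

```java
import java.util.Map;
import java.util.Properties;

// Toy version (not Hadoop code) of the user-name fallback order in HadoopLoginModule.commit().
final class LoginUserResolver {

    static String resolve(boolean kerberos, String kerberosPrincipal,
                          Map<String, String> env, Properties props, String osUser) {
        String user = null;
        if (kerberos) {
            user = kerberosPrincipal;                          // 1. Kerberos principal, if any
        }
        if (!kerberos && user == null) {
            user = env.get("HADOOP_USER_NAME");                // 2. env var (security off only)
            if (user == null) {
                user = props.getProperty("HADOOP_USER_NAME");  // 3. JVM system property
            }
        }
        if (user == null) {
            user = osUser;                                     // 4. OS user as the last resort
        }
        return user;
    }
}
```

Every input here is process-wide, which is exactly why this path cannot give one WebServer process several identities.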
Environment variables and System properties are both process-wide, and the login user is cached after the first login, so neither can make one process submit as different users. The only option is to set the calling thread's Subject beforehand, which is done with Subject.doAs. Here is a small example:
import java.io.IOException;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;
import org.apache.hadoop.security.UserGroupInformation;

// Note: org.apache.hadoop.security.User is not a public Hadoop API; depending on the Hadoop
// version it may only be accessible from inside package org.apache.hadoop.security.
public class UgiTest {
  public static void main(String[] args) throws IOException {
    // Build a Subject that carries the user "xielijuan"
    Subject subject = new Subject();
    subject.getPrincipals().add(new User("xielijuan"));
    // Print the current UGI before doAs
    System.out.println("main before: " + UserGroupInformation.getCurrentUser().getShortUserName());
    Subject.doAs(subject, new PrivilegedAction<Object>() {
      @Override
      public Object run() {
        // Start a new thread and print its UGI
        Thread thread = new Thread() {
          @Override
          public void run() {
            try {
              System.out.println("thread: " + UserGroupInformation.getCurrentUser().getShortUserName());
              Thread.sleep(5000);
            } catch (IOException e) {
              e.printStackTrace();
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        };
        thread.start();
        UserGroupInformation xielijuan = null;
        try {
          xielijuan = UserGroupInformation.getCurrentUser();
        } catch (IOException e) {
          e.printStackTrace();
        }
        System.out.println(xielijuan.getShortUserName());
        return null;
      }
    });
    // Print the main thread's UGI after doAs has returned
    System.out.println("main after: " + UserGroupInformation.getCurrentUser().getShortUserName());
    // Keep the process alive (press Enter to exit)
    System.in.read();
  }
}
Output:
main before: liujianhui
xielijuan
thread: xielijuan
main after: liujianhui
The example confirms that building a Subject does change the user returned by UserGroupInformation.getCurrentUser, and that the Subject only affects the code inside doAs: outside the doAs block, getCurrentUser is unaffected. In addition, a thread created inside doAs inherits its parent thread's AccessControlContext, so its Subject is the one the parent had at the moment the thread was created.
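The same before/inside/after scoping can be reproduced with the JDK alone, which is handy for checking the behaviour on a given JVM without Hadoop. A sketch under these assumptions: NamedPrincipal is a hypothetical stand-in for Hadoop's User principal, currentUser mimics the shape of getCurrentUser, and currentSubject uses Subject.current() when it exists (JDK 18+), because on recent JDKs the Subject.getSubject(AccessControlContext) path is deprecated and may be disabled. Thread inheritance is deliberately not exercised here, since it relies on the AccessControlContext mechanism.

```java
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;

// JDK-only demo (no Hadoop): a Subject bound by Subject.doAs is visible only inside the action.
final class SubjectScopeDemo {

    /** Hypothetical stand-in for Hadoop's User principal. */
    static final class NamedPrincipal implements Principal {
        private final String name;

        NamedPrincipal(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }
    }

    /** The calling thread's Subject: Subject.current() on JDK 18+, else the JDK 8-17 lookup. */
    static Subject currentSubject() {
        try {
            Method current = Subject.class.getMethod("current");
            return (Subject) current.invoke(null);
        } catch (NoSuchMethodException e) {
            return Subject.getSubject(AccessController.getContext());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Same shape as getCurrentUser(): principal from the Subject, else the fallback login user. */
    static String currentUser(String loginUser) {
        Subject subject = currentSubject();
        if (subject == null || subject.getPrincipals(NamedPrincipal.class).isEmpty()) {
            return loginUser;
        }
        return subject.getPrincipals(NamedPrincipal.class).iterator().next().getName();
    }

    /** Runs currentUser() inside Subject.doAs for a Subject carrying the given user. */
    static String runAs(String user, String loginUser) {
        Subject subject = new Subject();
        subject.getPrincipals().add(new NamedPrincipal(user));
        // A typed variable avoids the lambda overload ambiguity with PrivilegedExceptionAction.
        PrivilegedAction<String> action = () -> currentUser(loginUser);
        return Subject.doAs(subject, action);
    }

    public static void main(String[] args) {
        System.out.println("before: " + currentUser("liujianhui"));
        System.out.println("inside: " + runAs("xielijuan", "liujianhui"));
        System.out.println("after:  " + currentUser("liujianhui"));
    }
}
```

One consequence worth keeping in mind for a server: where the Subject is inherited at thread-creation time, a pooled worker thread that happens to be created inside one user's doAs may keep that user's context for later tasks, so it is safer to wrap each task in its own doAs than to rely on inheritance.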
Solution
When the WebServer submits an application, it first builds a Subject for the target user name and then performs the submission inside Subject.doAs, for example:
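Subject subject = new Subject();
subject.getPrincipals().add(new User("xielijuan"));
// The cast selects the PrivilegedExceptionAction overload: a bare lambda matches both doAs
// overloads and does not compile. doAs then throws PrivilegedActionException.
Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
  Client.submitApplication(xxx);
  return null;
});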
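In a long-running WebServer this pattern is usually wrapped in a small helper. The sketch below is hypothetical (UserSubjects, submitAs, asTask and NamedPrincipal are not part of Hadoop or of our platform) and uses only the JDK. It keeps one Subject per user name because, as far as we know, in Hadoop 2.x UserGroupInformation.equals compares the wrapped Subject by identity and the IPC client caches connections per UGI, so a brand-new Subject for every request could mean a new RPC connection per request. It also unwraps PrivilegedActionException so callers see the submission's own exception, and offers asTask for running submissions on a thread pool. Hadoop additionally exposes a public route, UserGroupInformation.createRemoteUser(name).doAs(...), which avoids depending on the non-public User class.

```java
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.security.auth.Subject;

// Hypothetical helper: run submissions as a given user via Subject.doAs.
final class UserSubjects {

    /** Hypothetical stand-in for Hadoop's User principal. */
    static final class NamedPrincipal implements Principal {
        private final String name;
        NamedPrincipal(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    private final ConcurrentMap<String, Subject> subjects = new ConcurrentHashMap<>();

    /** One read-only Subject per user, reused across requests. */
    Subject subjectFor(String user) {
        return subjects.computeIfAbsent(user, u -> {
            Subject s = new Subject();
            s.getPrincipals().add(new NamedPrincipal(u));
            s.setReadOnly(); // shared across threads, so prevent later mutation
            return s;
        });
    }

    /** Runs the submission inside Subject.doAs and rethrows its own exception. */
    <T> T submitAs(String user, Callable<T> submission) throws Exception {
        PrivilegedExceptionAction<T> action = submission::call;
        try {
            return Subject.doAs(subjectFor(user), action);
        } catch (PrivilegedActionException e) {
            throw e.getException();
        }
    }

    /** Wraps a task so it runs as the given user on whichever pool thread executes it. */
    <T> Callable<T> asTask(String user, Callable<T> task) {
        return () -> submitAs(user, task);
    }

    /** User name from the current Subject (JDK 18+ Subject.current(), else the JDK 8-17 lookup). */
    static String currentUserOr(String fallback) {
        Subject s;
        try {
            Method current = Subject.class.getMethod("current");
            s = (Subject) current.invoke(null);
        } catch (NoSuchMethodException e) {
            s = Subject.getSubject(AccessController.getContext());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        if (s == null || s.getPrincipals(NamedPrincipal.class).isEmpty()) {
            return fallback;
        }
        return s.getPrincipals(NamedPrincipal.class).iterator().next().getName();
    }
}
```

In the WebServer, the body passed to submitAs would be the actual submission call (the Client.submitApplication(xxx) step above); the single-thread pool in a quick test shows that per-task wrapping yields the right user even when one thread is reused for different users.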