Multi-user support in a Spark management platform

Background

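My team is part of the company's big data architecture department and currently works mainly on rolling out the company's stream computing platform. I am responsible for maintaining the Spark platform, customizing features, and fixing production issues. To make it easier for business users to submit Spark applications, we built a real-time computing management platform: users fill in the application parameters on a web page, and clicking the submit button triggers a backend service that submits the application to the YARN cluster. The flow is shown in the figure below: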

[Figure: application submission flow]

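As the figure shows, no matter which user is operating the front end, it is always the WebServer that submits the application. By default in Hadoop, the user name under which a Spark application is submitted is the user that started the WebServer process. On the ResourceManager side, however, different users are assigned different queues for security isolation and cost accounting, so the WebServer needs to support submission on behalf of multiple users.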

Analysis

Both the YARN and HDFS interfaces use the same underlying RPC model. Taking ClientRMService as an example, the full call sequence looks like this:

[Figure: job submission]

Before any RPC call is made, the client must establish a socket connection to the server, and the client is authenticated while that connection is being set up. This raises an obvious question: at what point is the client's user name sent to the server? To answer it, let's look at the org.apache.hadoop.ipc.Client class.

 /** Connect to the server and set up the I/O streams. It then sends
     * a header to the server and starts
     * the connection thread that waits for responses.
     */
    private synchronized void setupIOstreams(
        AtomicBoolean fallbackToSimpleAuth) {
      if (socket != null || shouldCloseConnection.get()) {
        return;
      } 
      try {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to "+server);
        }
        if (Trace.isTracing()) {
          Trace.addTimelineAnnotation("IPC client connecting to " + server);
        }
        short numRetries = 0;
        Random rand = null;
        while (true) {
          // Establish the socket connection to the remote server
          setupConnection();
          InputStream inStream = NetUtils.getInputStream(socket);
          OutputStream outStream = NetUtils.getOutputStream(socket);
          writeConnectionHeader(outStream);
          if (authProtocol == AuthProtocol.SASL) {
            final InputStream in2 = inStream;
            final OutputStream out2 = outStream;
            UserGroupInformation ticket = remoteId.getTicket();
            if (ticket.getRealUser() != null) {
              ticket = ticket.getRealUser();
            }
            try {
              authMethod = ticket
                  .doAs(new PrivilegedExceptionAction<AuthMethod>() {
                    @Override
                    public AuthMethod run()
                        throws IOException, InterruptedException {
                      return setupSaslConnection(in2, out2);
                    }
                  });
            } catch (Exception ex) {
              authMethod = saslRpcClient.getAuthMethod();
              if (rand == null) {
                rand = new Random();
              }
              handleSaslConnectionFailure(numRetries++, maxRetriesOnSasl, ex,
                  rand, ticket);
              continue;
            }
            if (authMethod != AuthMethod.SIMPLE) {
              // Sasl connect is successful. Let's set up Sasl i/o streams.
              inStream = saslRpcClient.getInputStream(inStream);
              outStream = saslRpcClient.getOutputStream(outStream);
              // for testing
              remoteId.saslQop =
                  (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
              LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
              if (fallbackToSimpleAuth != null) {
                fallbackToSimpleAuth.set(false);
              }
            } else if (UserGroupInformation.isSecurityEnabled()) {
              if (!fallbackAllowed) {
                throw new IOException("Server asks us to fall back to SIMPLE " +
                    "auth, but this client is configured to only allow secure " +
                    "connections.");
              }
              if (fallbackToSimpleAuth != null) {
                fallbackToSimpleAuth.set(true);
              }
            }
          }
        
          if (doPing) {
            inStream = new PingInputStream(inStream);
          }
          this.in = new DataInputStream(new BufferedInputStream(inStream));

          // SASL may have already buffered the stream
          if (!(outStream instanceof BufferedOutputStream)) {
            outStream = new BufferedOutputStream(outStream);
          }
          this.out = new DataOutputStream(outStream);
          // Write the connection context, including UGI details such as the user name
          writeConnectionContext(remoteId, authMethod);

          // update last activity time
          touch();

          if (Trace.isTracing()) {
            Trace.addTimelineAnnotation("IPC client connected to " + server);
          }

          // start the receiver thread after the socket connection has been set
          // up
          // Start the thread that receives response messages from the server
          start();
          return;
        }
      } catch (Throwable t) {
        if (t instanceof IOException) {
          markClosed((IOException)t);
        } else {
          markClosed(new IOException("Couldn't set up IO streams", t));
        }
        close();
      }
    }

The connection context is written as follows:

 /* Write the connection context header for each connection
     * Out is not synchronized because only the first thread does this.
     */
    private void writeConnectionContext(ConnectionId remoteId,
                                        AuthMethod authMethod)
                                            throws IOException {
      // Write out the ConnectionHeader
      IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
          RPC.getProtocolName(remoteId.getProtocol()),
          // ticket is the UGI of the current thread
          remoteId.getTicket(),
          authMethod);
      RpcRequestHeaderProto connectionContextHeader = ProtoUtil
          .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
              OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
              RpcConstants.INVALID_RETRY_COUNT, clientId);
      RpcRequestMessageWrapper request =
          new RpcRequestMessageWrapper(connectionContextHeader, message);
      
      // Write out the packet length
      out.writeInt(request.getLength());
      request.write(out);
    }
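
The last two statements of writeConnectionContext write a length prefix followed by the message body. The sketch below illustrates only that framing idea with JDK classes. It is not Hadoop's wire format (which serializes protobuf messages), and the class FramedContextSketch and its methods are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Illustrative only: a length-prefixed frame that carries a user name. */
final class FramedContextSketch {

    /** Writes a 4-byte big-endian length followed by the UTF-8 payload. */
    static byte[] writeFrame(String userName) {
        byte[] payload = userName.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(payload.length); // mirrors out.writeInt(request.getLength())
            out.write(payload);           // mirrors request.write(out)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the length prefix, then exactly that many payload bytes. */
    static String readFrame(byte[] frame) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame))) {
            int length = in.readInt();
            byte[] payload = new byte[length];
            in.readFully(payload);
            return new String(payload, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = writeFrame("xielijuan");
        System.out.println(frame.length + " bytes, user = " + readFrame(frame));
    }
}
```

The point of the prefix is that the receiver knows how many bytes belong to the context message before it starts parsing, which is why the user name can travel once per connection rather than with every call.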

The ticket used in writeConnectionContext is obtained through UserGroupInformation.getCurrentUser, whose implementation is shown below:

  public synchronized
  static UserGroupInformation getCurrentUser() throws IOException {
    // If a Subject is set in the thread's access control context, use its user information directly
    AccessControlContext context = AccessController.getContext();
    Subject subject = Subject.getSubject(context);
    if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
      return getLoginUser();
    } else {
      return new UserGroupInformation(subject);
    }
  }

  public synchronized 
  static UserGroupInformation getLoginUser() throws IOException {
    if (loginUser == null) {
      loginUserFromSubject(null);
    }
    return loginUser;
  }

 public synchronized 
  static void loginUserFromSubject(Subject subject) throws IOException {
    ensureInitialized();
    try {
      if (subject == null) {
        subject = new Subject();
      }
      LoginContext login =
          newLoginContext(authenticationMethod.getLoginAppName(), 
                          subject, new HadoopConfiguration());
      login.login();
      UserGroupInformation realUser = new UserGroupInformation(subject);
      realUser.setLogin(login);
      realUser.setAuthenticationMethod(authenticationMethod);
      realUser = new UserGroupInformation(login.getSubject());
      // If the HADOOP_PROXY_USER environment variable or property
      // is specified, create a proxy user as the logged in user.
      String proxyUser = System.getenv(HADOOP_PROXY_USER);
      if (proxyUser == null) {
        proxyUser = System.getProperty(HADOOP_PROXY_USER);
      }
      loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);

      String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
      if (fileLocation != null) {
        // Load the token storage file and put all of the tokens into the
        // user. Don't use the FileSystem API for reading since it has a lock
        // cycle (HADOOP-9212).
        Credentials cred = Credentials.readTokenStorageFile(
            new File(fileLocation), conf);
        loginUser.addCredentials(cred);
      }
      loginUser.spawnAutoRenewalThreadForUserCreds();
    } catch (LoginException le) {
      LOG.debug("failure to login", le);
      throw new IOException("failure to login", le);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("UGI loginUser:"+loginUser);
    } 
  }


How the user name is resolved:
 @InterfaceAudience.Private
  public static class HadoopLoginModule implements LoginModule {
    private Subject subject;

    @Override
    public boolean abort() throws LoginException {
      return true;
    }

    private <T extends Principal> T getCanonicalUser(Class<T> cls) {
      for(T user: subject.getPrincipals(cls)) {
        return user;
      }
      return null;
    }

    @Override
    public boolean commit() throws LoginException {
      if (LOG.isDebugEnabled()) {
        LOG.debug("hadoop login commit");
      }
      // if we already have a user, we are done.
      if (!subject.getPrincipals(User.class).isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("using existing subject:"+subject.getPrincipals());
        }
        return true;
      }
      Principal user = null;
      // if we are using kerberos, try it out
      if (isAuthenticationMethodEnabled(AuthenticationMethod.KERBEROS)) {
        user = getCanonicalUser(KerberosPrincipal.class);
        if (LOG.isDebugEnabled()) {
          LOG.debug("using kerberos user:"+user);
        }
      }
      //If we don't have a kerberos user and security is disabled, check
      //if user is specified in the environment or properties
      // Read the user name from the environment variable or the system property
      if (!isSecurityEnabled() && (user == null)) {
        String envUser = System.getenv(HADOOP_USER_NAME);
        if (envUser == null) {
          envUser = System.getProperty(HADOOP_USER_NAME);
        }
        user = envUser == null ? null : new User(envUser);
      }
      // use the OS user
      if (user == null) {
        user = getCanonicalUser(OS_PRINCIPAL_CLASS);
        if (LOG.isDebugEnabled()) {
          LOG.debug("using local user:"+user);
        }
      }
      // if we found the user, add our principal
      if (user != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Using user: \"" + user + "\" with name " + user.getName());
        }

        User userEntry = null;
        try {
          userEntry = new User(user.getName());
        } catch (Exception e) {
          throw (LoginException)(new LoginException(e.toString()).initCause(e));
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("User entry: \"" + userEntry.toString() + "\"" );
        }

        subject.getPrincipals().add(userEntry);
        return true;
      }
      LOG.error("Can't find user in " + subject);
      throw new LoginException("Can't find user name");
    }

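The code above shows that if a Subject is set in the current thread's AccessControlContext, the UGI object is built directly from that Subject and the user name is taken from it. Otherwise, when security is disabled, the user name is taken from HADOOP_USER_NAME in the environment variables or in the system properties, and it falls back to the operating system user name by default.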
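
As a rough summary of that precedence, here is a minimal sketch written as a pure function. It deliberately leaves out the Kerberos branch, and UserNameResolutionSketch with its resolve method is a hypothetical helper for illustration, not part of Hadoop.

```java
import java.util.Map;

/** Illustrative only: the user-name precedence described above, without Kerberos. */
final class UserNameResolutionSketch {

    static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";

    /**
     * @param subjectUser     user name found in the thread's Subject, or null if none is set
     * @param securityEnabled whether Hadoop security is enabled
     * @param env             stand-in for the process environment variables
     * @param props           stand-in for the JVM system properties
     * @param osUser          operating system user that started the process
     */
    static String resolve(String subjectUser, boolean securityEnabled,
                          Map<String, String> env, Map<String, String> props,
                          String osUser) {
        // 1. A Subject set on the calling thread wins.
        if (subjectUser != null) {
            return subjectUser;
        }
        // 2. With security disabled, HADOOP_USER_NAME is consulted:
        //    environment variable first, then system property.
        if (!securityEnabled) {
            String name = env.get(HADOOP_USER_NAME);
            if (name == null) {
                name = props.get(HADOOP_USER_NAME);
            }
            if (name != null) {
                return name;
            }
        }
        // 3. Otherwise fall back to the OS user.
        return osUser;
    }
}
```

Only the first step depends on the calling thread. The other two inputs are the same for every thread in the process.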
Because the environment variables and the system properties are both process-wide, they cannot make a single process submit as different users. The only remaining option is to set the Subject for the thread in advance, which is done with the Subject.doAs method. Let's look at a small example.

import java.io.IOException;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;
import org.apache.hadoop.security.UserGroupInformation;

// User is Hadoop's org.apache.hadoop.security.User principal. Depending on the
// Hadoop version it may not be visible outside its package, in which case this
// class has to be placed in the org.apache.hadoop.security package.
public class UgiTest {
    public static void main(String[] args) throws IOException {
        // Build a Subject that carries the target user
        Subject subject = new Subject();
        subject.getPrincipals().add(new User("xielijuan"));

        // Print the current UGI before doAs
        System.out.println("main before: " + UserGroupInformation.getCurrentUser().getShortUserName());
        Subject.doAs(subject, new PrivilegedAction<Object>() {
            @Override
            public Object run() {

                // Start a new thread and check which UGI it sees
                Thread thread = new Thread() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("thread: " + UserGroupInformation.getCurrentUser().getShortUserName());
                            Thread.sleep(5000);
                        } catch (IOException | InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                thread.start();

                // Check the UGI seen by the code running inside doAs
                try {
                    UserGroupInformation xielijuan = UserGroupInformation.getCurrentUser();
                    System.out.println(xielijuan.getShortUserName());
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            }
        });

        // Check the UGI of the main thread after doAs has returned
        System.out.println("main after: " + UserGroupInformation.getCurrentUser().getShortUserName());

        System.in.read();
    }
}
Output:
main before: liujianhui
xielijuan
thread: xielijuan
main after: liujianhui

The example shows that building a Subject does change the user name returned by UserGroupInformation.getCurrentUser, and that the Subject only affects the code running inside doAs. In addition, a thread created inside doAs inherits the parent thread's AccessControlContext, so its Subject is the one the parent thread had at the moment the thread was created. Outside the doAs block, UserGroupInformation.getCurrentUser is unaffected.

Solution

When the WebServer submits an application on behalf of a user, it first builds a Subject from the target user name and then performs the submission inside Subject.doAs. For example:

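    Subject subject = new Subject();
    subject.getPrincipals().add(new User("xielijuan"));
    // The cast selects the PrivilegedAction overload; a bare lambda is ambiguous
    // between PrivilegedAction and PrivilegedExceptionAction.
    Subject.doAs(subject, (PrivilegedAction<Void>) () -> {
        Client.submitApplication(xxx);
        return null;
    });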
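
The same pattern can be wrapped in a small per-request helper. The sketch below uses only JDK classes so that it runs without Hadoop: NamedUser is a hypothetical stand-in for Hadoop's User principal, and runAs and currentUserName are illustrative names, not platform or Hadoop APIs. It assumes a JDK on which Subject.doAs and Subject.getSubject behave as in the article; recent JDKs deprecate these APIs and the newest releases may restrict them.

```java
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;

/** Illustrative only: run a piece of work under a per-request Subject. */
@SuppressWarnings({"removal", "deprecation"})
final class RunAsSketch {

    /** Hypothetical stand-in for Hadoop's User principal. */
    static final class NamedUser implements Principal {
        private final String name;

        NamedUser(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof NamedUser && ((NamedUser) other).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    /** Builds a Subject for userName and runs the action inside Subject.doAs. */
    static <T> T runAs(String userName, PrivilegedAction<T> action) {
        Subject subject = new Subject();
        subject.getPrincipals().add(new NamedUser(userName));
        return Subject.doAs(subject, action);
    }

    /** Returns the NamedUser of the calling thread's Subject, or null if none is set. */
    static String currentUserName() {
        Subject subject = Subject.getSubject(AccessController.getContext());
        if (subject == null) {
            return null;
        }
        for (NamedUser user : subject.getPrincipals(NamedUser.class)) {
            return user.getName();
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("outside: " + currentUserName());
        System.out.println("inside:  " + runAs("xielijuan", RunAsSketch::currentUserName));
        System.out.println("after:   " + currentUserName());
    }
}
```

Because runAs has a single overload that takes PrivilegedAction, callers can pass a lambda or method reference without the cast that a direct call to Subject.doAs needs.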
