Spark 框架安全認證實現(xiàn)

導(dǎo)言

隨著大數(shù)據(jù)集群的使用卡乾,大數(shù)據(jù)的安全受到越來越多的關(guān)注一個安全的大數(shù)據(jù)集群的使用夭苗,運維必普通的集群更為復(fù)雜信卡。
集群的安全通常基于kerberos集群完成安全認證题造。kerberos基本原理可參考:一張圖了解Kerberos訪問流程

Spark應(yīng)用(On Yarn模式下)在安全的hadoop集群下的訪問傍菇,需要訪問各種各樣的組件/進程,如ResourceManager界赔,NodeManager丢习,NameNode牵触,DataNode,Kafka,Hmaster,HregionServer,MetaStore等等咐低。尤其是在長時運行的應(yīng)用揽思,如sparkStreaming,StructedStreaming见擦,如何保證用戶認證后的長期有效性绰更,其安全/認證更為復(fù)雜。

一個Spark應(yīng)用提交用戶要先在kdc中完成用戶的認證锡宋,及拿到對應(yīng)service服務(wù)的票據(jù)之后才能訪問對應(yīng)的服務(wù)儡湾。由于Spark應(yīng)用運行時涉及yarnclient,driver执俩,applicationMaster徐钠,executor等多個服務(wù),這其中每個進程都應(yīng)當是同一個用戶啟動并運行役首,這就涉及到多個進程中使用同一個用戶的票據(jù)來對各種服務(wù)進行訪問尝丐,本文基于Spark2.3對此做簡要分析。

  • spark應(yīng)用包含進程
進程 功能 yarn-client模式 yarn-cluster模式
yarnclient Spark應(yīng)用提交app的模塊 yarn-client模式下生命周期與driver一致衡奥; yarn-cluster模式下可以設(shè)置為app提交后即退出爹袁,或者提交后一直監(jiān)控app運行狀態(tài)
driver spark應(yīng)用驅(qū)動器,調(diào)度應(yīng)用邏輯矮固,應(yīng)用的“大腦” yarn-client模式下運行在yarnclient的JVM中失息; yarn-cluster模式下運行在applicationMaster中
applicationMaster 基于yarn服務(wù)抽象出的app管理者 yarn-client模式下僅僅負責啟動/監(jiān)控container,匯報應(yīng)用狀態(tài)的功能档址; yarn-cluster模式下負責啟動/監(jiān)控container盹兢,匯報應(yīng)用狀態(tài)的功,同時包含driver功能
Executor spark應(yīng)用的執(zhí)行器守伸,yarn應(yīng)用的container實體绎秒,業(yè)務(wù)邏輯的實際執(zhí)行者

spark應(yīng)用的提交用戶認證之后才能提交應(yīng)用,所以在yarnclient/driver的邏輯中必然會執(zhí)行到kerberos認證相關(guān)的登錄認證尼摹。然而其他的進程如applicationMaster见芹,executor等均需要經(jīng)過認證,應(yīng)用提交后才由用戶啟動蠢涝,這些進程則可以不進行kerberos認證而是利用Hadoop的token機制完成認證玄呛,減小kerberos服務(wù)壓力,同時提高訪問效率惠赫。

  • Hadoop Token機制

Hadoop的token實現(xiàn)基類為org.apache.hadoop.security.token.Token把鉴,

/**
   * Construct a token from the components.
   * @param identifier the token identifier
   * @param password the token's password
   * @param kind the kind of token
   * @param service the service for this token
   */
  public Token(byte[] identifier, byte[] password, Text kind, Text service) {
    this.identifier = identifier;
    this.password = password;
    this.kind = kind;
    this.service = service;
  }

不同的服務(wù)也可hadoop的token來交互,只要使用不同的identifer來區(qū)分token即可。 如NMTokenIdentifier, AMRMTokenIdentifier,AuthenticationTokenIdentifier等不同的tokenIdentifier來區(qū)分不同的服務(wù)類型的token庭砍。

Spark應(yīng)用各進程的安全實現(xiàn)

yarnclient的實現(xiàn)

此處yarnclient指的是向ResourceManager提交yarn應(yīng)用的客戶端场晶。在spark中,向yarn提交應(yīng)用有兩種應(yīng)用有yarn-client,yarn-cluster模式怠缸。在這兩種應(yīng)用模式下提交應(yīng)用诗轻,yarn client邏輯有些許不同。

安全hadoop場景下spark的用戶登錄認證機制

  • spark提交應(yīng)用時揭北,通過--principal, --keytab參數(shù)傳入認證所需文件扳炬。
    在sparkSubmit中prepareSubmitEnvironment時,完成認證

     // assure a keytab is available from any place in a JVM
     if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
       if (args.principal != null) {
         if (args.keytab != null) {
           require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
           // Add keytab and principal configurations in sysProps to make them available
           // for later use; e.g. in spark sql, the isolated class loader used to talk
           // to HiveMetastore will use these settings. They will be set as Java system
           // properties and then loaded by SparkConf
           sparkConf.set(KEYTAB, args.keytab)
           sparkConf.set(PRINCIPAL, args.principal)
           UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
         }
       }
     }
    
  • 在yarn-cluster模式下搔体,不會調(diào)用業(yè)務(wù)層代碼恨樟,即不會初始化SparkContext,其通過YarnClusterApplication的start方法調(diào)用client.submitApplication提交應(yīng)用

  • 在yarn-client模式下疚俱,會在yarnclient邏輯中調(diào)用業(yè)務(wù)代碼劝术,即會初始化并運行SparkContext,通過YarnClientSchedulerBackend其調(diào)度client.submitApplication提交應(yīng)用呆奕。

在client的submitApplication方法中提交app养晋,之后創(chuàng)建amContext,準備本地資源梁钾,此時會將本地的文件上傳至HDFS绳泉,其中就包括keytab文件,同時會生成spark_conf.properties配置文件以供am使用姆泻,該配置文件中會包含keytab的配置

 val props = new Properties()
  sparkConf.getAll.foreach { case (k, v) =>
    props.setProperty(k, v)
  }
  // Override spark.yarn.key to point to the location in distributed cache which will be used
  // by AM.
  Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }

其中的amKeytabFileName是在setUpCredentials時設(shè)置如下零酪,該值為指定的keytab文件加上隨機的字符串后綴,騎在am重點使用麦射,可參考下節(jié)的介紹蛾娶。

val f = new File(keytab)
  // Generate a file name that can be used for the keytab file, that does not conflict
  // with any user file.
  amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
  sparkConf.set(PRINCIPAL.key, principal)

獲取相關(guān)組件的token灯谣,注意:此處的token均非與yarn服務(wù)交互相關(guān)token潜秋,這里只有與HDFS,HBASE胎许,Hive服務(wù)交互的token峻呛。

 def obtainDelegationTokens(
  hadoopConf: Configuration,
  creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
  if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
  // 各provider的obtainDelegationTokens方法中,會獲取對應(yīng)組件的token辜窑,并放入credentials中
    provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
  } else {
    logDebug(s"Service ${provider.serviceName} does not require a token." +
      s" Check your configuration to see if security is disabled or not.")
    None
  }
}.foldLeft(Long.MaxValue)(math.min)

}

Spark中常訪問的服務(wù)使用token機制的有hive钩述,hbase,hdfs穆碎,對應(yīng)的tokenProvider如下:

服務(wù) tokenProvider token獲取類 token獲取方法
HDFS HadoopFSDelegationTokenProvider org.apache.hadoop.hbase.security.token.TokenUtil obtainToken
HIVE HiveDelegationTokenProvider org.apache.hadoop.hive.ql.metadata getDelegationToken
HBASE HBaseDelegationTokenProvider org.apache.hadoop.hdfs.DistributedFileSystem addDelegationTokens

以HbaseDelegationTokenProvider為例牙勘,主要是通過反射調(diào)用hbase的TokenUtil類的obtainTOken方法,對應(yīng)的obtainDelegationTokens方法如下:

override def obtainDelegationTokens(
  hadoopConf: Configuration,
  sparkConf: SparkConf,
  creds: Credentials): Option[Long] = {
try {
  val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
  val obtainToken = mirror.classLoader.
    loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
    getMethod("obtainToken", classOf[Configuration])
  logDebug("Attempting to fetch HBase security token.")
  val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
    .asInstanceOf[Token[_ <: TokenIdentifier]]
  logInfo(s"Get token from HBase: ${token.toString}")
  creds.addToken(token.getService, token)
} catch {
  case NonFatal(e) =>
    logDebug(s"Failed to get token from service $serviceName", e)
}
None
}

PS : HBase的token獲取的用戶需要具有hbase:meta表的exec權(quán)限,否則無法成功獲取token

在獲取token后方面,將token設(shè)置到amContainer中放钦,并放入appContext中

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
amContainer.setTokens(ByteBuffer.wrap(dob.getData))
}
//
appContext.setAMContainerSpec(containerContext)

driver的token更新

在yarn-client模式下,driver在yarnclient進程中啟動恭金,同樣需要訪問業(yè)務(wù)層及集群的相關(guān)組件如hdfs操禀。driver通過讀取am更新在hdfs路徑下的credentials文件來保證driver節(jié)點的token有效。

// SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
// reads the credentials from HDFS, just like the executors and updates its own credentials
// cache.
if (conf.contains("spark.yarn.credentials.file")) {
    YarnSparkHadoopUtil.startCredentialUpdater(conf)
} 

在yarn-cluster模式下横腿,driver運行在applicationMaster的JVM中颓屑,其安全相關(guān)由Am同一操作

ApplicationMaster 的安全認證

applicationMaster是Yarn進行應(yīng)用調(diào)度/管理的核心,需要與RM/NM等進行交互以便應(yīng)用運行耿焊。其中相關(guān)的交互均通過token完成認證揪惦,認證實現(xiàn)由Yarn內(nèi)部框架完成。查看am日志發(fā)現(xiàn)罗侯,即是在非安全(非kerberos)的場景下丹擎,同樣會使用到token。而與hdfs歇父,hbase等服務(wù)交互使用的token則需Spark框架來實現(xiàn)蒂培。

applicationMaster中與YARN相關(guān)的認證

  • AM與RM的認證

在ResourceManager接收到應(yīng)用提交的ApplicationSubmissionContext后,在其AmLauncher.java的run方法中為am設(shè)置生成“YARN_AM_RM_TOKEN榜苫,該token用于am于rm通信使用”

 public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
  ApplicationAttemptId appAttemptId) {
this.writeLock.lock();
try {
  LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
  AMRMTokenIdentifier identifier =
      new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
        .getKeyId());
  byte[] password = this.createPassword(identifier);
  appAttemptSet.add(appAttemptId);
  return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
    identifier.getKind(), new Text());
} finally {
  this.writeLock.unlock();
}
}
  • AM與NM的認證

Am在啟動之后护戳,會向ResourceManager申請container,并與對應(yīng)的NodeManager通信以啟動container垂睬。然而AM與NM通信的token是如何得到的呢媳荒?

查看AMRMClientImpl類可以看到,AM向RM發(fā)送分配請求驹饺,RM接收到請求后钳枕,將container要分配至的NM節(jié)點的Token放置response中返回給AM。Am接收到response后赏壹,會保存NMToken鱼炒,并判定是否需要更新YARN_AM_RM_TOKEN

//通過rmClient向RM發(fā)送分配請求
allocateResponse = rmClient.allocate(allocateRequest);
//拿到response后,保存NMToken并根據(jù)response判定是否需要更新AMRM通信的TOken
if (!allocateResponse.getNMTokens().isEmpty()) {
      populateNMTokens(allocateResponse.getNMTokens());
    }
    if (allocateResponse.getAMRMToken() != null) {
      updateAMRMToken(allocateResponse.getAMRMToken());
    }

RM通過ApplicationMasterService響應(yīng)allocation請求

// 通過調(diào)度器為cotnainer分配NodeManager并生成該NodeManager的Token放入allcation中
 Allocation allocation =
      this.rScheduler.allocate(appAttemptId, ask, release, 
          blacklistAdditions, blacklistRemovals);
 ......
  if (!allocation.getContainers().isEmpty()) {
    allocateResponse.setNMTokens(allocation.getNMTokens());
  }

AM在準備啟動container時蝌借,將當前用戶的token都設(shè)置進ContainerLaunchContext中

def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  .asInstanceOf[ContainerLaunchContext]
val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))

ApplicationMaster業(yè)務(wù)相關(guān)的服務(wù)的token更新

Am啟動的資源情況

查看Am啟動命令大致如下昔瞧,可以發(fā)現(xiàn)有指定配置文件,而該配置文件即為yarnclient生成上傳至hdfs菩佑,在am啟動前由NodeManager從hdfs中copy至本地路徑自晰,供container使用:

 /usr/jdk64/jdk1.8.0_77//bin/java -server -Xmx512m -Djava.io.tmpdir=/localpath/*/tmp -Dspark.yarn.app.container.log.dir=/localpath/*/ org.apache.spark.deploy.yarn.ExecutorLauncher --arg host:port --properties-file /localpath/*/__spark_conf__/__spark_conf__.properties

查看此配置文件可以看到有如下配置項:

spark.yarn.principal=ocsp-ygcluster@ASIAINFO.COM
spark.yarn.keytab=hbase.headless.keytab-18f29b79-b7a6-4cb2-b79d-4305432a5e9a

下圖為am進程使用到的資源文件


am進程資源.jpg

如上可以看出,am雖然運行在集群中稍坯,但運行時認證相關(guān)的資源已經(jīng)準備就緒酬荞。下面分析其運行中關(guān)于安全的邏輯

Am安全認證及token更新邏輯

在applicationMaster中,定期更新token,并寫入文件到hdfs的相關(guān)目錄混巧,并清理舊文件以供各executor使用糟把。
在ApplicationMaster啟動后,進行l(wèi)ogin登錄并啟動名為am-kerberos-renewer的dameon線程定期登錄牲剃,保證用戶認證的有效性

private val ugi = {
val original = UserGroupInformation.getCurrentUser()

// If a principal and keytab were provided, log in to kerberos, and set up a thread to
// renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
// of the TGT, use a configuration to define how often to check that a relogin is necessary.
// checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
val principal = sparkConf.get(PRINCIPAL).orNull
val keytab = sparkConf.get(KEYTAB).orNull
if (principal != null && keytab != null) {
  UserGroupInformation.loginUserFromKeytab(principal, keytab)

  val renewer = new Thread() {
    override def run(): Unit = Utils.tryLogNonFatalError {
      while (true) {
        TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
        UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
      }
    }
  }
  renewer.setName("am-kerberos-renewer")
  renewer.setDaemon(true)
  renewer.start()

  // Transfer the original user's tokens to the new user, since that's needed to connect to
  // YARN. It also copies over any delegation tokens that might have been created by the
  // client, which will then be transferred over when starting executors (until new ones
  // are created by the periodic task).
  val newUser = UserGroupInformation.getCurrentUser()
  SparkHadoopUtil.get.transferCredentials(original, newUser)
  newUser
} else {
  SparkHadoopUtil.get.createSparkUser()
}
}

在am中啟動AMCredentialRenewerStarter線程,調(diào)度認證登錄及token renew邏輯

if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
        val credentialRenewerThread = new Thread {
          setName("AMCredentialRenewerStarter")
          setContextClassLoader(userClassLoader)
      override def run(): Unit = {
        val credentialManager = new YARNHadoopDelegationTokenManager(
          sparkConf,
          yarnConf,
          conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
        val credentialRenewer =
          new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
        credentialRenewer.scheduleLoginFromKeytab()
      }
    }
    credentialRenewerThread.start()
    credentialRenewerThread.join()
  }

在scheduleLoginFromKeytab中遣疯,會周期調(diào)度登錄,token獲取更新寫入hdfs文件等操作凿傅。
其核心邏輯如下

調(diào)度周期:

各種組件的token更新周期如hdfs的更新周期dfs.namenode.delegation.token.renew-interval默認為1天缠犀,hbase的token更新周期hbase.auth.key.update.interval默認為1天;調(diào)度更新的周期為如上各組件最小值的75%聪舒,

調(diào)度流程:

//將生成的token寫入hdfs目錄${spark.yarn.credentials.file}-${timeStamp}-${nextSuffix}
writeNewCredentialsToHDFS(principal, keytab)
//刪除邏輯為保留五個(${spark.yarn.credentials.file.retention.count})文件辨液,文件更新時間早于五天(${spark.yarn.credentials.file.retention.days})的全部清理
cleanupOldFiles()

Executor的認證機制

executor的認證同樣使用的是token機制。executor啟動之后箱残,根據(jù)driver啟動設(shè)置的${spark.yarn.credentials.file}啟動token更新:

if (driverConf.contains("spark.yarn.credentials.file")) {
    logInfo("Will periodically update credentials from: " +
      driverConf.get("spark.yarn.credentials.file"))
    Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
      .getMethod("startCredentialUpdater", classOf[SparkConf])
      .invoke(null, driverConf)
  }

Executor中的token更新是讀取hdfs目錄{spark.yarn.credentials.file}-{timeStamp}-${nextSuffix}目錄下的文件滔迈,讀取到緩存中,以便保證讀取到的是更新后的token使用被辑。

安全Spark的使用

Spark框架完成的kerberos認證及使用token與其他服務(wù)交互的機制使用較為簡單燎悍,只需要在提交應(yīng)用時的spark-submit命令行中加入--principal appuserName --keytab /path/to/user.keytab即可

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市盼理,隨后出現(xiàn)的幾起案子谈山,更是在濱河造成了極大的恐慌,老刑警劉巖宏怔,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件奏路,死亡現(xiàn)場離奇詭異,居然都是意外死亡臊诊,警方通過查閱死者的電腦和手機鸽粉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抓艳,“玉大人触机,你說我怎么就攤上這事『瑁” “怎么了威兜?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長庐椒。 經(jīng)常有香客問我,道長蚂踊,這世上最難降的妖魔是什么约谈? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上棱诱,老公的妹妹穿的比我還像新娘泼橘。我一直安慰自己,他們只是感情好迈勋,可當我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布炬灭。 她就那樣靜靜地躺著,像睡著了一般靡菇。 火紅的嫁衣襯著肌膚如雪重归。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天厦凤,我揣著相機與錄音鼻吮,去河邊找鬼错妖。 笑死鼻种,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的朝卒。 我是一名探鬼主播博烂,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼香椎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了禽篱?” 一聲冷哼從身側(cè)響起士鸥,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎谆级,沒想到半個月后烤礁,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡肥照,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年脚仔,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片舆绎。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡鲤脏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出吕朵,到底是詐尸還是另有隱情猎醇,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布努溃,位于F島的核電站硫嘶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏梧税。R本人自食惡果不足惜沦疾,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一称近、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧哮塞,春花似錦刨秆、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至家凯,卻和暖如春缓醋,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背肆饶。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工改衩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人驯镊。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓葫督,卻偏偏與公主長得像,于是被迫代替她去往敵國和親板惑。 傳聞我的和親對象是個殘疾皇子橄镜,可洞房花燭夜當晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容