Flink Source Code Analysis Series: Table of Contents
Please see: Flink Source Code Analysis Series: Table of Contents
Preface
This article analyzes how Flink handles security authentication, focusing on Kerberos-related content. We start the analysis with the configuration options.
SecurityConfiguration
This class contains Flink's security-related configuration options. Their meanings are as follows:
- zookeeper.sasl.disable: whether to disable Zookeeper SASL.
- security.kerberos.login.keytab: path of the keytab file used for Kerberos authentication.
- security.kerberos.login.principal: the principal used for Kerberos authentication.
- security.kerberos.login.use-ticket-cache: whether Kerberos authentication uses the ticket cache.
- security.kerberos.login.contexts: Kerberos login context names, equivalent to the entry names of a JAAS file.
- zookeeper.sasl.service-name: Zookeeper SASL service name. Defaults to zookeeper.
- zookeeper.sasl.login-context-name: Zookeeper SASL login context name. Defaults to Client.
- security.context.factory.classes: which SecurityContextFactory implementations to include. Default value:
  - org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
  - org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory
- security.module.factory.classes: which SecurityModuleFactory implementations to include. Default value:
  - org.apache.flink.runtime.security.modules.HadoopModuleFactory
  - org.apache.flink.runtime.security.modules.JaasModuleFactory
  - org.apache.flink.runtime.security.modules.ZookeeperModuleFactory
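To make these options concrete, here is a minimal sketch that sets the Kerberos-related keys programmatically and wraps them in a SecurityConfiguration. The keytab path, principal, and context names below are placeholder values, not defaults:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;

Configuration flinkConf = new Configuration();
// Keytab credentials; path and principal are placeholders
flinkConf.setString("security.kerberos.login.keytab", "/path/to/flink.keytab");
flinkConf.setString("security.kerberos.login.principal", "flink@EXAMPLE.COM");
flinkConf.setString("security.kerberos.login.use-ticket-cache", "false");
// JAAS entry names that should receive the Kerberos entries
flinkConf.setString("security.kerberos.login.contexts", "Client,KafkaClient");

// SecurityConfiguration parses and validates these options
SecurityConfiguration securityConfig = new SecurityConfiguration(flinkConf);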
SecurityUtils
The SecurityUtils.install method is the entry point for security authentication when a Flink job is submitted; it installs the security configuration. Its code is shown below:
public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    // Install the security context
    installContext(config);
}
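For context, a caller drives installation and then funnels privileged work through the installed context, roughly as follows. This is a sketch assuming the flinkConf from the previous example; runClusterEntrypoint is a hypothetical stand-in for the caller's own logic:

SecurityUtils.install(new SecurityConfiguration(flinkConf));
// All privileged logic runs through the installed SecurityContext
int retCode = SecurityUtils.getInstalledContext().runSecured(() -> {
    // Executes as the authenticated user when Kerberos is configured
    return runClusterEntrypoint(); // hypothetical entry point
});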
The installModules method installs the security authentication modules; the modules themselves are analyzed later.
static void installModules(SecurityConfiguration config) throws Exception {
    // install the security module factories
    List<SecurityModule> modules = new ArrayList<>();
    // Iterate over all configured SecurityModuleFactory class names
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory = null;
        try {
            // Locate the module factory via ServiceLoader
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
            throw new IllegalArgumentException("Unable to find module factory class", ne);
        }
        // Create the SecurityModule with the factory
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if a SecurityModule is not supported in the current environment
        // Install the module and add it to the modules collection
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}
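As an aside, the ServiceLoader-based lookup inside SecurityFactoryServiceLoader can be pictured like this. The generic findFactory helper below is hypothetical and simplified; Flink's actual class exposes findModuleFactory and findContextFactory instead:

import java.util.ServiceLoader;

// Hypothetical helper illustrating the ServiceLoader lookup pattern
static <T> T findFactory(Class<T> factoryType, String factoryClassName) {
    // ServiceLoader discovers implementations declared in META-INF/services
    for (T factory : ServiceLoader.load(factoryType)) {
        // The configured class name selects one implementation
        if (factory.getClass().getCanonicalName().equals(factoryClassName)) {
            return factory;
        }
    }
    throw new IllegalArgumentException("No matching factory found: " + factoryClassName);
}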
The installContext method installs the security context; its purpose is likewise covered in a later section.
static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    // Iterate over the configured SecurityContextFactory class names
    // (configuration option: security.context.factory.classes)
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // Locate the SecurityContextFactory via ServiceLoader
            SecurityContextFactory contextFactory =
                    SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // Check whether the factory is compatible with the configuration (1)
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    // Create the first compatible SecurityContext
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining.
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error(
                            "Cannot instantiate security context with: " + contextFactoryClass,
                            e);
                } catch (LinkageError le) {
                    LOG.error(
                            "Error occur when instantiate security context with: "
                                    + contextFactoryClass,
                            le);
                }
            } else {
                LOG.debug("Unable to install security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}
Explanation of the numbered annotation:
- (1) Here we examine the isCompatibleWith logic. SecurityContextFactory has two implementations: HadoopSecurityContextFactory and NoOpSecurityContextFactory. HadoopSecurityContextFactory requires the security.module.factory.classes option to contain org.apache.flink.runtime.security.modules.HadoopModuleFactory, and requires org.apache.hadoop.security.UserGroupInformation to be on the classpath. NoOpSecurityContextFactory has no requirements. A sketch of the check follows.
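Under those two requirements, HadoopSecurityContextFactory.isCompatibleWith looks roughly like the following sketch (simplified; details may differ from the actual source):

@Override
public boolean isCompatibleWith(SecurityConfiguration securityConfig) {
    // Require HadoopModuleFactory among the configured module factories
    if (!securityConfig.getSecurityModuleFactories()
            .contains("org.apache.flink.runtime.security.modules.HadoopModuleFactory")) {
        return false;
    }
    // Require Hadoop's UserGroupInformation on the classpath
    try {
        Class.forName(
                "org.apache.hadoop.security.UserGroupInformation",
                false,
                HadoopSecurityContextFactory.class.getClassLoader());
        return true;
    } catch (ClassNotFoundException | LinkageError e) {
        return false;
    }
}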
SecurityModule
SecurityModule implementations provide security authentication for different types of services. There are three subclasses, all implementing the contract sketched after this list:
- HadoopModule: authenticates via UserGroupInformation.
- JaasModule: installs the JAAS configuration, effective process-wide.
- ZookeeperModule: provides the Zookeeper security configuration.
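A minimal sketch of the SecurityModule contract, reconstructed from how it is used in installModules above; the exact interface in Flink may differ:

public interface SecurityModule {
    // Apply the security configuration, e.g. log in or set system properties
    void install() throws SecurityInstallException;

    // Undo whatever install() changed, restoring the prior state
    void uninstall() throws SecurityInstallException;

    // Exception wrapping installation failures
    class SecurityInstallException extends Exception {
        public SecurityInstallException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}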
HadoopModule
HadoopModule holds Flink's SecurityConfiguration and the Hadoop configuration (read from the Hadoop configuration files; the reading logic lives in HadoopUtils.getHadoopConfiguration, analyzed later).
install method
The install method performs the authentication using Hadoop's UserGroupInformation.
@Override
public void install() throws SecurityInstallException {
    // Hand the Hadoop conf to UGI
    UserGroupInformation.setConfiguration(hadoopConfiguration);
    UserGroupInformation loginUser;
    try {
        // If Hadoop security is enabled and keytab/principal are configured
        if (UserGroupInformation.isSecurityEnabled()
                && !StringUtils.isBlank(securityConfig.getKeytab())
                && !StringUtils.isBlank(securityConfig.getPrincipal())) {
            // Resolve the keytab path
            String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
            // Log in via UGI with the keytab and principal from the Flink conf
            UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
            // Fetch the authenticated user
            loginUser = UserGroupInformation.getLoginUser();
            // supplement with any available tokens
            // Read the token cache file location from HADOOP_TOKEN_FILE_LOCATION
            String fileLocation =
                    System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
            // If a local token cache exists
            if (fileLocation != null) {
                Credentials credentialsFromTokenStorageFile =
                        Credentials.readTokenStorageFile(
                                new File(fileLocation), hadoopConfiguration);
                // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token
                // since the UGI would prefer the delegation token instead, which eventually
                // expires and does not fall back to using Kerberos tickets
                Credentials credentialsToBeAdded = new Credentials();
                final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                Collection<Token<? extends TokenIdentifier>> usrTok =
                        credentialsFromTokenStorageFile.getAllTokens();
                // If UGI uses a keytab for login, do not load the HDFS delegation token:
                // iterate over the tokens in the storage file and add every
                // non-delegation token to the credentials
                for (Token<? extends TokenIdentifier> token : usrTok) {
                    if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                        final Text id = new Text(token.getIdentifier());
                        credentialsToBeAdded.addToken(id, token);
                    }
                }
                // Attach the credentials to loginUser
                loginUser.addCredentials(credentialsToBeAdded);
            }
        } else {
            // Security is not enabled (or keytab/principal are missing):
            // login with current user credentials (e.g. ticket cache, OS login)
            // note that the stored tokens are read automatically
            try {
                // Use the reflection API to invoke
                // UserGroupInformation.loginUserFromSubject(null);
                Method loginUserFromSubjectMethod =
                        UserGroupInformation.class.getMethod(
                                "loginUserFromSubject", Subject.class);
                loginUserFromSubjectMethod.invoke(null, (Subject) null);
            } catch (NoSuchMethodException e) {
                LOG.warn("Could not find method implementations in the shaded jar.", e);
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }
            // Fetch the currently logged-in user
            loginUser = UserGroupInformation.getLoginUser();
        }
        LOG.info("Hadoop user set to {}", loginUser);
        if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
            boolean isCredentialsConfigured =
                    HadoopUtils.areKerberosCredentialsValid(
                            loginUser, securityConfig.useTicketCache());
            LOG.info(
                    "Kerberos security is enabled and credentials are {}.",
                    isCredentialsConfigured ? "valid" : "invalid");
        }
    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}
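Outside of Flink, the same UGI flow can be exercised directly. A minimal, self-contained sketch; the principal and keytab path are placeholders:

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiLoginExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Kerberos must be the configured auth method for isSecurityEnabled() to be true
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Log in from a keytab (placeholder principal and path)
        UserGroupInformation.loginUserFromKeytab(
                "flink@EXAMPLE.COM", "/etc/security/keytabs/flink.keytab");
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        // Privileged actions then run under this identity
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            System.out.println("Running as " + UserGroupInformation.getCurrentUser());
            return null;
        });
    }
}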
HadoopUtils.getHadoopConfiguration method
This method implements the logic for reading the Hadoop configuration files. It is fairly involved, so we go through it in detail.
public static Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {

    // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
    // from the classpath
    // Create an empty conf
    Configuration result = new HdfsConfiguration();
    // Tracks whether any Hadoop configuration file was found
    boolean foundHadoopConfiguration = false;

    // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path
    // and the hdfs configuration.
    // The properties of a newly added resource will override the ones in previous resources,
    // so a configuration file with higher priority should be added later.

    // Approach 1: HADOOP_HOME environment variable
    // Hold the two candidate hadoop conf paths
    String[] possibleHadoopConfPaths = new String[2];
    // Read the HADOOP_HOME environment variable
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
        // If HADOOP_HOME is set, try these locations in turn:
        // $HADOOP_HOME/conf
        // $HADOOP_HOME/etc/hadoop
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }
    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            // Try to read core-site.xml and hdfs-site.xml under each candidate path
            // into the hadoop conf
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    // Add the file referenced by the Flink option fs.hdfs.hdfsdefault to the hadoop conf
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
                "Using hdfs-default configuration-file path from Flink config: {}",
                hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    // Add the file referenced by the Flink option fs.hdfs.hdfssite to the hadoop conf
    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
                "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    // Add the files under the directory referenced by the Flink option fs.hdfs.hadoopconf
    final String hadoopConfigPath =
            flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    // Read the hadoop configuration files from the HADOOP_CONF_DIR directory
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration keys with prefix 'flink.hadoop.' in flink conf to hadoop conf
    // For each such key, strip the prefix to form the new key and store it, together with
    // the original value, as a configuration entry in the hadoop conf
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Hadoop config",
                        key,
                        newKey,
                        value);
                foundHadoopConfiguration = true;
            }
        }
    }

    // If none of the above approaches found a hadoop conf, log a warning
    if (!foundHadoopConfiguration) {
        LOG.warn(
                "Could not find Hadoop configuration via any of the supported methods "
                        + "(Flink configuration, environment variables).");
    }

    return result;
}
To summarize Flink's logic for locating Hadoop configuration files, in reading order from top to bottom (resources added later override earlier ones); a sketch of the addHadoopConfIfFound helper used in several of these steps follows the list:
- Read the HADOOP_HOME environment variable; if set, read core-site.xml and hdfs-site.xml from its conf and etc/hadoop subdirectories.
- Read the file referenced by the Flink option fs.hdfs.hdfsdefault.
- Read the file referenced by the Flink option fs.hdfs.hdfssite, then the directory referenced by fs.hdfs.hadoopconf.
- Read the configuration files in the directory given by the HADOOP_CONF_DIR environment variable.
- Read every key with the flink.hadoop. prefix from the Flink configuration, strip the prefix to form the new key, and store it together with the original value as an entry in the hadoop conf.
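addHadoopConfIfFound is not shown above. Reconstructed from its call sites, it looks roughly like the following sketch (not necessarily Flink's exact code): it adds core-site.xml and hdfs-site.xml under the given path, if present, and reports whether anything was found.

private static boolean addHadoopConfIfFound(
        Configuration configuration, String possibleHadoopConfPath) {
    boolean foundHadoopConfiguration = false;
    if (new File(possibleHadoopConfPath).exists()) {
        if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
            // Register core-site.xml as an additional resource
            configuration.addResource(
                    new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
            foundHadoopConfiguration = true;
        }
        if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
            // Register hdfs-site.xml as an additional resource
            configuration.addResource(
                    new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
            foundHadoopConfiguration = true;
        }
    }
    return foundHadoopConfiguration;
}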
JaasModule
The install method reads the JAAS configuration referenced by the java.security.auth.login.config system property, converts the relevant Flink configuration into JAAS entries, merges them into that configuration, and installs the result into the JVM. The code is shown below:
@Override
public void install() {
    // ensure that a config file is always defined, for compatibility with
    // ZK and Kafka which check for the system property and existence of the file
    // Save the current java.security.auth.login.config value so it can be restored
    // when the module is uninstalled
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // If the property is not set
    if (priorConfigFile == null) {
        // workingDir is the first directory of Flink's io.tmp.dirs option.
        // Write the default flink-jaas.conf content there as a temporary file
        // named jaas-xxx.conf, deleted when the JVM process exits
        File configFile = generateDefaultConfigFile(workingDir);
        // Set the java.security.auth.login.config system property so that it always
        // has a value. This is for compatibility with Zookeeper and Kafka, which
        // check whether the JAAS file exists
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the JAAS configuration file
    // Read the currently installed JAAS configuration
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic JAAS configuration
    // Wrap it in a DynamicConfiguration, which can be modified
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    // Read the Kerberos settings from the Flink configuration.
    // An AppConfigurationEntry is Java's representation of one entry of a JAAS
    // configuration file, i.e. the settings within one pair of braces
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        // Iterate over the Flink option security.kerberos.login.contexts;
        // each value is used as an entry name
        for (String app : securityConfig.getLoginContextNames()) {
            // Register the AppConfigurationEntries from krb5Entries under the
            // entry names taken from security.kerberos.login.contexts
            currentConfig.addAppConfigurationEntry(app, krb5Entries);
        }
    }

    // Install the new currentConfig
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}
The getAppConfigurationEntries method used above is fairly involved; it is analyzed below.
getAppConfigurationEntries reads the settings from Flink's securityConfig and converts them into JAAS entry form, stored as AppConfigurationEntry instances. If security.kerberos.login.use-ticket-cache is set, it generates an AppConfigurationEntry named userKerberosAce, equivalent to a JAAS file entry like the following:
EntryName {
com.sun.security.auth.module.Krb5LoginModule optional
doNotPrompt=true
useTicketCache=true
renewTGT=true;
};
If security.kerberos.login.keytab is configured, it generates an AppConfigurationEntry named keytabKerberosAce, equivalent to:
EntryName {
com.sun.security.auth.module.Krb5LoginModule required
keyTab=<path to keytab>
doNotPrompt=true
useKeyTab=true
storeKey=true
principal=<principal name>
refreshKrb5Config=true;
};
Finally, getAppConfigurationEntries returns the collection of these two AppConfigurationEntry instances; if either of them is null, only the other one is returned.
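For illustration, the two JAAS snippets above map onto Java's standard AppConfigurationEntry API roughly as follows. This is a sketch; Flink's actual code builds these entries through a KerberosUtils helper, and the option names follow the Krb5LoginModule documentation:

import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;

// Entry equivalent to the useTicketCache snippet (userKerberosAce)
static AppConfigurationEntry ticketCacheEntry() {
    Map<String, String> options = new HashMap<>();
    options.put("doNotPrompt", "true");
    options.put("useTicketCache", "true");
    options.put("renewTGT", "true");
    return new AppConfigurationEntry(
            "com.sun.security.auth.module.Krb5LoginModule",
            AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
            options);
}

// Entry equivalent to the keytab snippet (keytabKerberosAce)
static AppConfigurationEntry keytabEntry(String keytabPath, String principal) {
    Map<String, String> options = new HashMap<>();
    options.put("keyTab", keytabPath);
    options.put("doNotPrompt", "true");
    options.put("useKeyTab", "true");
    options.put("storeKey", "true");
    options.put("principal", principal);
    options.put("refreshKrb5Config", "true");
    return new AppConfigurationEntry(
            "com.sun.security.auth.module.Krb5LoginModule",
            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
            options);
}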
ZookeeperModule
The install method mainly sets several Zookeeper-related system properties based on the Flink configuration.
@Override
public void install() throws SecurityInstallException {
    // Save the zookeeper.sasl.client system property so it can be restored on uninstall
    priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
    // Read the Flink option zookeeper.sasl.disable and set zookeeper.sasl.client
    // to its negation (note the inverted semantics)
    System.setProperty(
            ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));

    // Save the zookeeper.sasl.client.username system property for restoration on uninstall
    priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
    // Read the Flink option zookeeper.sasl.service-name;
    // if it differs from the default "zookeeper", set zookeeper.sasl.client.username
    if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
        System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
    }

    // Save the zookeeper.sasl.clientconfig system property for restoration on uninstall
    priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
    // Read the Flink option zookeeper.sasl.login-context-name;
    // if it differs from the default "Client", set zookeeper.sasl.clientconfig
    if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
        System.setProperty(
                ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
    }
}
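Because install saves each prior property value, the matching uninstall can restore them. A sketch under that assumption; the restoreProperty helper below is hypothetical, not Flink's exact code:

@Override
public void uninstall() throws SecurityInstallException {
    restoreProperty(ZK_ENABLE_CLIENT_SASL, priorSaslEnable);
    restoreProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
    restoreProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
}

// Reset a system property to its prior value, or clear it if it was unset before
private static void restoreProperty(String key, String priorValue) {
    if (priorValue != null) {
        System.setProperty(key, priorValue);
    } else {
        System.clearProperty(key);
    }
}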
SecurityContext
As the name suggests, this is the security context, used to run logic that requires authorization under different authentication environments.
HadoopSecurityContext
HadoopSecurityContext runs logic (wrapped in a Callable) as the authenticated UserGroupInformation.
public class HadoopSecurityContext implements SecurityContext {

    private final UserGroupInformation ugi;

    public HadoopSecurityContext(UserGroupInformation ugi) {
        this.ugi = Preconditions.checkNotNull(ugi, "UGI passed cannot be null");
    }

    @Override
    public <T> T runSecured(final Callable<T> securedCallable) throws Exception {
        return ugi.doAs((PrivilegedExceptionAction<T>) securedCallable::call);
    }
}
NoOpSecurityContext
NoOpSecurityContext performs no authentication and simply runs the Callable.
public class NoOpSecurityContext implements SecurityContext {

    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }
}
This blog post is the author's original work. Discussion, criticism, and corrections are welcome. Please credit the source when reposting.