最近在HDP2.6的環(huán)境里嘗試了Kerberos,在各組件運行正常的情況下最終成功運行spark-streaming應(yīng)用,總結(jié)一下就是一葉障目,不見泰山鬼譬,坑多梯子少。尤其在國內(nèi)逊脯,關(guān)于Kerberos的資料較少优质,但在生產(chǎn)環(huán)境中,Kerberos又是如鯁在喉男窟,無法忽視盆赤。
因此分享這篇文章贾富,希望能給還在苦苦爬坑的小伙伴們一點幫助歉眷。
- 我們的HDP為單用戶ocsp安裝,多用戶需要根據(jù)以下步驟進(jìn)行細(xì)微修改
確認(rèn)OCSP各組件的Kerberos工作正常
1. Kafka
使用kafka-topics.sh創(chuàng)建topic
使用kafka producer和consumer需要先kinit
kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/host-10-1-236-122@ASIAINFO.COM
-
使用producer發(fā)送消息颤枪,consumer消費消息
kafka producer
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --topic kerin --broker-list host-10-1-236-122:6667 --security-protocol PLAINTEXTSASL
kafka consumer
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-consumer.sh --topic kerin --security-protocol PLAINTEXTSASL --bootstrap-server host-10-1-236-122:6667
FAQ:
- 使用kafka producer和consumer需要先kinit kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/<hostname>@ASIAINFO.COM
- 否則:
-
kafka producer 報錯:
[2017-07-19 10:44:56,582] WARN Error while fetching metadata with correlation id 0 : {kertest=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
-
kafka consumer 報錯:
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
-
2. Hive
- kinit
- 使用beeline登錄
3. Phoenix
- 使用sqlline與principal汗捡,keytab登錄
進(jìn)行Spark,Kafka針對Kerberos相關(guān)配置
1. 先放上最后提交任務(wù)的命令
spark-submit --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal ocsp-yg@ASIAINFO.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar
--principal與--keytab這兩個參數(shù)為spark需要的Kerberos認(rèn)證信息
--driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf"為driver連接kafka用到的認(rèn)證信息,因此使用本地絕對路徑
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf"為executor連接kafka用到的Kerberos認(rèn)證信息扇住,因此使用container中的相對路徑./
jaas文件中定義了principal與keytab春缕,由于我們使用了yarn-client模式,driver需要的文件在本地文件系統(tǒng)艘蹋,executor需要的文件需要我們使用--files的方式上傳锄贼,即--files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab"
有的文檔中說--files中傳keytab文件會與spark本身的--keytab 沖突,其實是因為他們對spark和kafka使用了相同的principal和keytab女阀,在上述命令中我為了清晰起見宅荤,讓spark使用了principal ocsp-yg@ASIAINFO.COM,keytab hdfs.headless.keytab浸策,讓spark連接kafka時使用了principal ocsp/ASIAINFO.COM(principal其實是在jaas文件中指定的冯键,3中詳細(xì)講jaas文件) keytab ocsp.keytab,當(dāng)spark提交任務(wù)時庸汗,yarn會將--keytab后面的keytab文件與--files里的文件先后上傳惫确,即 hdfs.headless.keytab與ocsp.keytab均會被上傳,spark與kafka各取所需蚯舱,即可正常工作改化。當(dāng)spark與kafka要使用相同的keytab文件時,比如都用ocsp.keytab晓淀,那么yarn會先后上傳兩次ocsp.keytab所袁,在spark正使用的時候更新了keytab,造成異常退出
因此如果spark與kafka需要使用相同的keytab文件凶掰,我們只需要在--files里不要上傳keytab即可避免沖突
spark-submit --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal ocsp@ASIAINFO.COM --keytab /etc/security/keytabs/ocsp.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar
- 還有一個問題是本例中drvier和executor使用了相同的kafka_client_jaas.conf燥爷,這也會造成一些問題,3中會詳細(xì)說明
2. 生成keytab和principal
- 在KDC Server上執(zhí)行
kadmin -p admin/admin@ASIAINFO.COM
- 生成principal懦窘,principal最好使用ocsp的用戶名+domain
addprinc -randkey ocsp/ASIAINFO.COM
- 生成keytab
ktadd -k /data/ocsp.keytab ocsp/ASIAINFO.COM
- 將keytab文件copy到spark driver所在的機器(因為OCSP默認(rèn)使用yarn-client模式)
3. 創(chuàng)建spark讀取kafka的jaas配置文件
- 配置文件kafka_client_jaas.conf樣例如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
principal="ocsp@ASIAINFO.COM"
keyTab="./ocsp.keytab"
renewTicket=true
storeKey=true
serviceName="ocsp";
};
其中useTicketCache指從系統(tǒng)的cash中讀取credential信息前翎,useKeyTab指從指定的keyTab文件讀取credential
-
principal和keytab用第二步生成的principal與keytab,注意:k?eytab的路徑
- 如果這個conf文件是給driver讀取畅涂,則我們要用keytab文件在本地的絕對路徑
- 如果這個conf文件是executor讀取港华,則我們要用keytab文件在container中的相對路徑,即./ocsp.keytab
- 如果為了方便起見午衰,drvier與executor要使用相同的jaas文件立宜,路徑配置為./ocsp.keytab,我們需要將keytab文件copy到運行spark-submit的當(dāng)前路徑
- 如果driver和executor要使用不同的jaas文件臊岸,則driver的jaas文件中橙数,keytab應(yīng)為本地絕對路徑,executor的jaas文件中帅戒,keytab應(yīng)為相對路徑./
4. 配置spark1.6+kafka0.10 jar包
- 在我們的應(yīng)用中有兩部分需要修改灯帮,一個是處理之前從kafka讀取數(shù)據(jù),一個是處理結(jié)束后向kafka寫數(shù)據(jù)
- 由于kafka0.10版本后才支持Kerberos,而官方Spark2.* 之后才適配kafka0.10钟哥,但我們目前使用HDP2.6中spark1.6與2.* 雙版本迎献,Spark 1.6 + Kafka 0.10就需要使用HDP提供的spark-kafka-0-10-connector包,官網(wǎng)說明如下:https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.0/bk_spark-component-guide/content/using-spark-streaming.html#spark-streaming-jar
- 通過官方maven的方式?jīng)]有添加成功腻贰,可能是網(wǎng)絡(luò)原因吁恍,因此我是從https://github.com/hortonworks-spark/skc下載源碼,本地編譯播演,然后添加本地jar包進(jìn)我們的項目
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>spark-kafka-0-10-connector-main_2.10</artifactId>
<version>1.0.1</version>
<scope>system</scope>
<systemPath>${project.basedir}/../lib/spark-kafka-0-10-connector_2.10-1.0.1.jar</systemPath>
</dependency>
5. 修改Spark讀取Kafka部分
- 需要import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
- 我們使用的DirectApi讀取kafka
KafkaUtils.createDirectStream[String, String](
SSC,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](TopicsSet, KafkaParams))
KafkaParams配置如下:
val KafkaParams = Map[String, Object]("auto.offset.reset" -> "latest"
, "key.deserializer" -> classOf[StringDeserializer]
, "value.deserializer" -> classOf[StringDeserializer]
, "security.protocol" -> "SASL_PLAINTEXT"
, "bootstrap.servers" -> "kafka-server1:6667"
, "group.id" -> "test")
6. 修改Spark寫Kafka部分
- 寫kafka調(diào)用的是kafka官方的庫
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>
- 代碼中需要import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
val props = new Properties()
props.put("bootstrap.servers", dsConf.get("metadata.broker.list", ""))
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
if (MainFrameConf.KERBEROS_ENABLE == "true"){
props.put("security.protocol","SASL_PLAINTEXT")
}
new KafkaProducer[String, String](props)