When Kerberos authentication is enabled on Kafka, how do you produce or consume data with Flink? Essentially, you add the authentication-related configuration (jaas.conf, the keytab, and so on) to the producer/consumer code. Let's go straight to the code.
Version information:
Flink 1.9.0
Kafka 0.10.0
A quick warning: if the dependency versions don't match, you will get an error, so make sure the versions correspond. A typical symptom of a mismatch is:
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
1. Connecting to a Kerberos-secured cluster is actually simple; you need the following three files:
1). The Kerberos server configuration file krb5.conf, which tells the program which KDC to go to for authentication (how to hand this file to the JVM is sketched right after this list);
[libdefaults]
  udp_preference_limit = 1
  renew_lifetime = 3650d
  forwardable = true
  default_realm = CHINAUNICOM
  ticket_lifetime = 3650d
  dns_lookup_realm = false
  dns_lookup_kdc = false
  default_ccache_name = /tmp/krb5cc_%{uid}
  #default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
  #default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5

[domain_realm]
  .CHINAUNICOM = CHINAUNICOM

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log

[realms]
  CHINAUNICOM = {
    admin_server = master98.hadoop.ljs
    kdc = master98.hadoop.ljs
  }
2). Authentication needs a login method to be specified, which requires a jaas.conf file; one is usually available under the cluster's conf directory;
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="D:\\kafkaSSL\\kafka.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka/salver32.hadoop.unicom@CHINAUNICOM"
  serviceName=kafka;
};
3). The user's login ticket and keytab file; I won't paste the ticket or keytab here.
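Before wiring the three files above into code, you can sanity-check the keytab and principal from the command line. A minimal check, assuming an MIT Kerberos client (kinit/klist) is installed, using the principal from the jaas.conf above:

kinit -kt D:\kafkaSSL\kafka.service.keytab kafka/salver32.hadoop.unicom@CHINAUNICOM
klist

The sample jobs below load krb5.conf and jaas.conf via System.setProperty; equivalently, as their comments note, you can pass the same paths as JVM options at launch:

-Djava.security.krb5.conf=D:\kafkaSSL\krb5.conf
-Djava.security.auth.login.config=D:\kafkaSSL\kafka_client_jaas.conf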
2. To save you from dependency errors, here is the pom.xml dependency list; it may contain some redundancy that you can trim yourself:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>${httpclient.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.9.0</version>
    <scope>compile</scope>
</dependency>
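The snippet references ${flink.version}, ${kafka.version}, ${hadoop.version} and ${httpclient.version} without showing their values. A sketch of a matching <properties> block, assuming the Flink/Kafka versions stated at the top of the post; the Hadoop and httpclient versions below are placeholders and must match your cluster:

<properties>
    <flink.version>1.9.0</flink.version>
    <kafka.version>0.10.0.0</kafka.version>
    <!-- placeholder values; align these with your cluster -->
    <hadoop.version>2.7.3</hadoop.version>
    <httpclient.version>4.5.6</httpclient.version>
</properties>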
3. Flink receives messages from a socket and sends them to Kafka, code example:
package com.hadoop.ljs.flink.streaming;

import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosProducer {
    public static final String topic = "topic1";
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String bootstrapServers = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
    public static final String hostname = "localhost";
    public static final int port = 9000;

    public static void main(String[] args) throws Exception {
        // Set the Kerberos/JAAS config here (on Windows); they can also be passed as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        /* Get the Flink streaming execution environment */
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket */
        DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
        /* The received data can go through whatever transformations you need here before being sent to Kafka */
        dataSource.addSink(new FlinkKafkaProducer010<>(topic, new CustomKeyedSerializationSchema(), getProducerProperties()));
        /* Start the job */
        senv.execute("FlinkKafkaProducer");
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Kerberos-specific settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
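The producer references com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema, whose source isn't included in the post. As a rough sketch of what such a class could look like (my assumption, not the author's actual implementation), here is a minimal KeyedSerializationSchema for plain strings in Flink 1.9:

package com.hadoop.ljs.flink.utils;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

/* Hypothetical sketch: a minimal KeyedSerializationSchema for String records */
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        return null; // no key: records are written unkeyed
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null falls back to the topic passed to the producer
    }
}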
4. Flink connects to Kafka and consumes messages, code example:
package com.hadoop.ljs.flink.streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic1";
    public static final String consumerGroup = "test_topic1";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        // Set the Kerberos/JAAS config here (on Windows); they can also be passed as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FlinkKafkaConsumer010<String> consumer010 = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), getConsumerProperties());
        consumer010.setStartFromEarliest();
        // Source: read from Kafka
        DataStream<String> dataStream = env.addSource(consumer010);
        dataStream.print();
        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        // Kerberos-specific settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
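To smoke-test the two jobs end to end: open a socket server for the producer to read from, start the producer job, then the consumer job, and type a few lines into the socket session; they should travel through topic1 and be printed by the consumer. A quick way to provide the socket, assuming netcat is available (Linux/macOS; any socket tool works on Windows):

nc -lk 9000
# type lines here after starting FlinkKafkaKerberosProducer;
# run FlinkKafkaKerberosConsumer to see them printed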