由于特殊需求紧索,需要在一次
Spark
任務(wù)中切換HDFS
集群袁辈。
本文我將介紹如何在一次的spark
任務(wù)中操作不同的HDFS集群
我們以wordcount
為例,分析如何配置珠漂。我們的輸入數(shù)據(jù)源來自cluster1的HDFS晚缩,需要將分析結(jié)果輸出到cluster2的HDFS。
val conf = new SparkConf().setAppName("Spark Word Count")
val sc = new SparkContext()
// 在輸入數(shù)據(jù)之前先將hadoop config配置為cluster1集群
sc.hadoopConfiguration.addResource("cluster1/core-site.xml")
sc.hadoopConfiguration.addResource("cluster1/hdfs-site.xml")
// load data
val input = sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
// 再將hadoop config設(shè)為cluster2集群
sc.hadoopConfiguration.addResource("cluster2/core-site.xml")
sc.hadoopConfiguration.addResource("cluster2/hdfs-site.xml")
input.saveAsTextFile(args(1))
core-site.xml和hdfs-site.xml放在項目的
resources
目錄下
通過上述例子我們可以看到媳危,我們?nèi)绻枰?code>spark任務(wù)中想操作不同的hdfs
集群荞彼,我們需要在操作之前先將hadoop
的config
設(shè)置為我們需要操作的目標(biāo)HDFS集群即可。
向spark提交任務(wù):
bin/spark-submit --master yarn-client --class SparkWordcount run.jar /input /output
NOTE: 這里我們即可以寫成全路徑形式待笑,即:
hdfs://cluster1/input
hdfs://cluster2/output
鸣皂,也可以寫成上面相對路徑的形式。
上面我們通過hadoopConfiguration
的addResource
方法來添加相關(guān)配置,其實(shí)Spark在操作hdfs的時候签夭,只需hadoop的ha相關(guān)配置就可以了齐邦,所以我們也可以通過代碼來直接配置hadoop的相關(guān)配置椎侠。
val conf = new SparkConf().setAppName("Spark Word Count")
val sc = new SparkContext()
sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://cluster1");
sc.hadoopConfiguration.set("dfs.nameservices", "cluster1");
sc.hadoopConfiguration.set("dfs.ha.namenodes.cluster1", "nn1,nn2");
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster1.nn1", "namenode001:8020");
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster1.nn2", "namenode002:8020");
sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
val wc = sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://cluster2");
sc.hadoopConfiguration.set("dfs.nameservices", "cluster2");
sc.hadoopConfiguration.set("dfs.ha.namenodes.cluster2", "nn3,nn4");
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster2.nn3", "namenode003:8020");
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cluster2.nn4", "namenode004:8020");
sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.cluster2", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
wc.saveAsTextFile(args(1))
這樣我們就兩種不同的方式來配置hadoop
的config
第租,我們可以根據(jù)自己的需求來選擇需要用哪種方式來配置