IDEA配置本地開(kāi)發(fā)環(huán)境連接遠(yuǎn)程集群訪問(wèn)Hdfs累舷,Spark SQL訪問(wèn)Hive

摘要:IDEA蝇摸,Spark橘洞,Hive捌袜,Hdfs

IDEA配置訪問(wèn)hdfs

  • IDEA本地環(huán)境需要配置pom.xml依賴配置hadoop-client
  • resources集群hadoop配置文件hdfs-site.xmlcore-site.xml
        <!-- Hadoop  -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

hdfs-site.xml震檩,core-site.xml放入src/main/resources目錄下
hifs-site.xml配置詳解琢蛤,通過(guò)RPC通信地址進(jìn)行hdfs文件的讀寫(xiě)操作

<configuration>
        <!-- hdfs數(shù)據(jù)塊的復(fù)制份數(shù),默認(rèn)3抛虏,和三臺(tái)機(jī)器datanode節(jié)點(diǎn)對(duì)應(yīng) -->
        <property>
                <name>dfs.replication</name>
                <value>3</value>
        </property>
         <!-- 是否在HDFS中開(kāi)啟權(quán)限檢查 -->
        <property>
                <name>dfs.permissions.enabled</name>
                <value>false</value>
        </property>
        <!-- 為namenode集群定義一個(gè)services name -->
        <property>
                <name>dfs.nameservices</name>
                <value>ns1</value>
        </property>
        <!-- 文件按照128m進(jìn)行分割 -->
        <property>
                <name>dfs.blocksize</name>
                <value>134217728</value>
        </property>
        <!--services ns1包含那些namenode博其,為namenode起名  -->
        <property>
                <name>dfs.ha.namenodes.ns1</name>
                <value>nn1,nn2</value>
        </property>
        <!-- nn1的RPC通信地址,nn1所在地址 -->
        <property>
                <name>dfs.namenode.rpc-address.ns1.nn1</name>
                <value>cloudera01:8020</value>
        </property>
        <!-- nn1的http通信地址,外部訪問(wèn)地址 -->
        <property>
                <name>dfs.namenode.http-address.ns1.nn1</name>
                <value>cloudera01:50070</value>
        </property>
        <!-- nn2的RPC通信地址,nn2所在地址 -->
        <property>
                <name>dfs.namenode.rpc-address.ns1.nn2</name>
                <value>cloudera02:8020</value>
        </property>
        <!-- nn2的http通信地址,外部訪問(wèn)地址 -->
        <property>
                <name>dfs.namenode.http-address.ns1.nn2</name>
                <value>cloudera02:50070</value>
        </property>
        <!--客戶端通過(guò)代理訪問(wèn)namenode,訪問(wèn)文件系統(tǒng),HDFS 客戶端與Active 節(jié)點(diǎn)通信的Java 類(lèi),使用其確定Active 節(jié)點(diǎn)是否活躍 -->
        <property>
                <name>dfs.client.failover.proxy.provider.ns1</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
                </value>
        </property>
        <!-- 這個(gè)是開(kāi)啟自動(dòng)故障轉(zhuǎn)移,如果你沒(méi)有自動(dòng)故障轉(zhuǎn)移,這個(gè)可以先不配 -->
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>
</configuration>

core-site.xml配置詳解

<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://ns1</value>
        </property>
        <!-- zookeeper集群地址 -->
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>cloudera01:2181,cloudera02:2181,cloudera14:2181,cloudera03:2181,cloudera16:2181</value>
        </property>
        <property>
                <name>fs.hdfs.impl.disable.cache</name>
                <value>true</value>
        </property>
</configuration>

scala腳本訪問(wèn)hdfs工具類(lèi),使用Configuration加載core-site.xml迂猴, hdfs-site.xml配置文件獲得hdfs入口進(jìn)行讀寫(xiě)操作

import java.io._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, _}
import org.apache.hadoop.io.IOUtils

/**
 * hdfs文件夾操作類(lèi)
 */
object HdfsUtils {

  def getFS(): FileSystem = {
    this.synchronized {
      System.setProperty("HADOOP_USER_NAME", "hdfs")
      val conf = new Configuration()
      conf.addResource(getClass.getResourceAsStream("/core-site.xml"))
      conf.addResource(getClass.getResourceAsStream("/hdfs-site.xml"))
      conf.set("mapred.remote.os", "Linux")
      println(conf)
      FileSystem.get(conf)
    }
  }

  /**
   * 關(guān)閉FileSystem
   *
   * @param fileSystem
   */
  def closeFS(fileSystem: FileSystem) {
    this.synchronized {
      if (fileSystem != null) {
        try {
          fileSystem.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
    }
  }

  /**
   * ls
   *
   * @param hdfsFilePath
   */
  def listFiles(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = getFS()
      try {
        val fstats = fileSystem.listStatus(new Path(hdfsFilePath))
        for (fstat <- fstats) {
          if (fstat.isDirectory()) {
            println("directory")
          } else {
            println("file")
          }
          println("Permission:" + fstat.getPermission())
          println("Owner:" + fstat.getOwner())
          println("Group:" + fstat.getGroup())
          println("Size:" + fstat.getLen())
          println("Replication:" + fstat.getReplication())
          println("Block Size:" + fstat.getBlockSize())
          println("Name:" + fstat.getPath())
          println("#############################")
        }

      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        if (fileSystem != null) {
          try {
            fileSystem.close()
          } catch {
            case e: IOException => e.printStackTrace()
          }
        }
      }
    }
  }

  def ls(fileSystem: FileSystem, path: String) = {
    println("list path:" + path)
    val fs = fileSystem.listStatus(new Path(path))
    val listPath = FileUtil.stat2Paths(fs)
    for (p <- listPath) {
      println(p)
    }
    println("----------------------------------------")
  }

  /**
   * 創(chuàng)建目錄
   *
   * @param hdfsFilePath
   */
  def mkdir(hdfsFilePath: String) = {
    this.synchronized {
      val fileSystem = getFS()

      try {
        val success = fileSystem.mkdirs(new Path(hdfsFilePath))
        if (success) {
          println("Create directory or file successfully")
        }
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * 刪除文件或目錄
   *
   * @param hdfsFilePath
   * @param recursive 遞歸
   */
  def rm(hdfsFilePath: String, recursive: Boolean): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        val path = new Path(hdfsFilePath)
        if (fileSystem.exists(path)) {
          val success = fileSystem.delete(path, recursive)
          if (success) {
            System.out.println("delete successfully")
          }
        }

      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * 上傳文件到HDFS
   *
   * @param localPath
   * @param hdfspath
   */
  def write(localPath: String, hdfspath: String) {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FileInputStream = null
      var outStream: FSDataOutputStream = null
      try {
        inStream = new FileInputStream(
          new File(localPath))
        val writePath = new Path(hdfspath)
        outStream = fileSystem.create(writePath)
        IOUtils.copyBytes(inStream, outStream, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        IOUtils.closeStream(outStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * 讀文本文件并返回行的列表
   *
   * @param hdfspath
   */
  def readAllLines(hdfspath: String): scala.collection.mutable.ListBuffer[String] = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStreamReader: InputStreamReader = null
      var isr: java.io.BufferedReader = null
      var allLines: scala.collection.mutable.ListBuffer[String] = scala.collection.mutable.ListBuffer()
      try {
        val readPath = new Path(hdfspath)
        inStreamReader = new InputStreamReader(fileSystem.open(readPath), "UTF-8")
        isr = new java.io.BufferedReader(inStreamReader)
        var line: String = null
        do {
          line = isr.readLine()
          if (line != null) {
            //println(line)
            allLines += line;
          }
        } while (line != null)
      } catch {
        case e: IOException => {
          e.printStackTrace()
        }
      } finally {
        isr.close
        inStreamReader.close
        this.closeFS(fileSystem)
      }
      allLines
    }
  }


  /**
   * 讀文本文件并返回行的列表
   *
   * @param hdfspath
   */
  def readContent(hdfspath: String): String = {
    this.synchronized {
      val fileSystem = this.getFS()
      var buf: Array[Byte] = null
      var inputStream: FSDataInputStream = null
      try {
        val readPath = new Path(hdfspath)
        buf = new Array[Byte](fileSystem.getFileStatus(readPath).getLen.toInt)
        inputStream = fileSystem.open(readPath)
        var toRead: Int = buf.length
        var off = 0
        while (toRead > 0) {
          val ret: Int = inputStream.read(buf, off, toRead)
          if (ret < 0) {
            throw new IOException("Premature EOF from inputStream")
          }
          toRead = toRead - ret
          off += ret
          Thread.sleep(10)
        }
        new String(buf, "UTF-8")
      } catch {
        case e: Exception => {
          e.printStackTrace()
        }
          ""
      } finally {
        inputStream.close
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * //    * 上傳文件到HDFS
   * //    *
   * //    * @param localFilePath
   * //    * @param hdfsFilePath
   * //
   */
  def put(localFilePath: String, hdfsFilePath: String) = {
    this.synchronized {
      val fileSystem = this.getFS()
      var fdos: FSDataOutputStream = null
      var fis: FileInputStream = null
      try {
        fdos = fileSystem.create(new Path(hdfsFilePath))
        fis = new FileInputStream(new File(localFilePath))
        IOUtils.copyBytes(fis, fdos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fdos)
        IOUtils.closeStream(fis)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * 打印hdfs上的文件內(nèi)容
   *
   * @param hdfsFilePath
   */
  def cat(hdfsFilePath: String) {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FSDataInputStream = null
      try {
        val readPath = new Path(hdfsFilePath)
        inStream = fileSystem.open(readPath)
        IOUtils.copyBytes(inStream, System.out, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * 打印hdfs上的文件內(nèi)容
   *
   * @param hdfsFilePath
   */
  def exist(hdfsFilePath: String): Boolean = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        fileSystem.exists(new Path(hdfsFilePath))
      } catch {
        case e: IOException =>
          e.printStackTrace()
          false
          false
      } finally {
        //      this.closeFS(fileSystem)
      }
    }
  }

  /**
   * 下載文件到本地
   *
   * @param localFilePath
   * @param hdfsFilePath
   */
  def get(localFilePath: String, hdfsFilePath: String) {
    this.synchronized {
      val fileSystem = this.getFS()
      var fsis: FSDataInputStream = null
      var fos: FileOutputStream = null
      try {
        fsis = fileSystem.open(new Path(hdfsFilePath))
        fos = new FileOutputStream(new File(localFilePath))
        IOUtils.copyBytes(fsis, fos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fsis)
        IOUtils.closeStream(fos)
        this.closeFS(fileSystem)
      }
    }
  }
}

IDEA配置Spark訪問(wèn)遠(yuǎn)程集群hive

  • IDEA本地環(huán)境需要配置pom.xml依賴配置spark-core慕淡,spark-sqlspark-hive依賴沸毁,其中spark依賴的依賴范圍使用默認(rèn)的compile
  • resources集群hive配置文件hive-site.xml指定metastore服務(wù)的機(jī)器地址和端口號(hào)
  • 遠(yuǎn)程集群hive開(kāi)啟metastore服務(wù)
hive --service metastore -p 9083 &

引入項(xiàng)目依賴

        <!--Spark  -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop  -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

hive-site.xml配置峰髓,其中hive.metastore.uris指定metastore服務(wù)運(yùn)行的機(jī)器ip和端口,并且需要單獨(dú)手動(dòng)啟動(dòng)metastore服務(wù)傻寂,客戶端連接metastore服務(wù),metastore再去連接MySQL數(shù)據(jù)庫(kù)來(lái)存取hive元數(shù)據(jù)携兵,元數(shù)據(jù)包含用Hive創(chuàng)建的database疾掰、table等的元信息。有了metastore服務(wù)徐紧,就可以有多個(gè)客戶端同時(shí)連接静檬,而且這些客戶端不需要知道MySQL數(shù)據(jù)庫(kù)的用戶名和密碼,只需要連接metastore 服務(wù)即可并级。

  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://cloudera01:9083</value>
  </property>

腳本測(cè)試使用Spark SQL連接遠(yuǎn)程hive拂檩,如果沒(méi)有權(quán)限訪問(wèn)hive表修改HADOOP_USER_NAME,例如為hdfs

import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")

    // hive
    val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate()
    spark.sql("show databases").show()
    val df = spark.sql("select * from test_gp.student_info")
    df.show()
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嘲碧,一起剝皮案震驚了整個(gè)濱河市稻励,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌愈涩,老刑警劉巖望抽,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異钠署,居然都是意外死亡糠聪,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén)谐鼎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)舰蟆,“玉大人,你說(shuō)我怎么就攤上這事狸棍∩砗Γ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵草戈,是天一觀的道長(zhǎng)塌鸯。 經(jīng)常有香客問(wèn)我,道長(zhǎng)唐片,這世上最難降的妖魔是什么丙猬? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮费韭,結(jié)果婚禮上茧球,老公的妹妹穿的比我還像新娘。我一直安慰自己星持,他們只是感情好抢埋,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著,像睡著了一般揪垄。 火紅的嫁衣襯著肌膚如雪穷吮。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,443評(píng)論 1 302
  • 那天饥努,我揣著相機(jī)與錄音捡鱼,去河邊找鬼。 笑死肪凛,一個(gè)胖子當(dāng)著我的面吹牛堰汉,可吹牛的內(nèi)容都是我干的辽社。 我是一名探鬼主播伟墙,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼滴铅!你這毒婦竟也來(lái)了戳葵?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤汉匙,失蹤者是張志新(化名)和其女友劉穎拱烁,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體噩翠,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡戏自,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伤锚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片擅笔。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖屯援,靈堂內(nèi)的尸體忽然破棺而出猛们,到底是詐尸還是另有隱情,我是刑警寧澤狞洋,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布弯淘,位于F島的核電站,受9級(jí)特大地震影響吉懊,放射性物質(zhì)發(fā)生泄漏庐橙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一借嗽、第九天 我趴在偏房一處隱蔽的房頂上張望态鳖。 院中可真熱鬧,春花似錦淹魄、人聲如沸郁惜。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)兆蕉。三九已至羽戒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間虎韵,已是汗流浹背易稠。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留包蓝,地道東北人驶社。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像测萎,于是被迫代替她去往敵國(guó)和親亡电。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容