Summary: IDEA, Spark, Hive, HDFS
Configuring IDEA to access HDFS
- The local IDEA environment needs a pom.xml dependency on hadoop-client.
- resources: the cluster Hadoop configuration files hdfs-site.xml and core-site.xml.
<!-- Hadoop -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
Place hdfs-site.xml and core-site.xml in the src/main/resources directory.
hdfs-site.xml explained: HDFS file reads and writes go through the NameNode RPC addresses configured here.
<configuration>
    <!-- Number of replicas per HDFS block; default is 3, matching the three DataNode machines -->
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <!-- Whether to enable permission checking in HDFS -->
    <property>
        <name>dfs.permissions.enabled</name>
        <value>false</value>
    </property>
    <!-- Define a nameservice name for the NameNode cluster -->
    <property>
        <name>dfs.nameservices</name>
        <value>ns1</value>
    </property>
    <!-- Split files into 128 MB blocks -->
    <property>
        <name>dfs.blocksize</name>
        <value>134217728</value>
    </property>
    <!-- Which NameNodes belong to nameservice ns1; gives each NameNode an id -->
    <property>
        <name>dfs.ha.namenodes.ns1</name>
        <value>nn1,nn2</value>
    </property>
    <!-- RPC address of nn1 (the host nn1 runs on) -->
    <property>
        <name>dfs.namenode.rpc-address.ns1.nn1</name>
        <value>cloudera01:8020</value>
    </property>
    <!-- HTTP address of nn1 (externally accessible web UI) -->
    <property>
        <name>dfs.namenode.http-address.ns1.nn1</name>
        <value>cloudera01:50070</value>
    </property>
    <!-- RPC address of nn2 (the host nn2 runs on) -->
    <property>
        <name>dfs.namenode.rpc-address.ns1.nn2</name>
        <value>cloudera02:8020</value>
    </property>
    <!-- HTTP address of nn2 (externally accessible web UI) -->
    <property>
        <name>dfs.namenode.http-address.ns1.nn2</name>
        <value>cloudera02:50070</value>
    </property>
    <!-- Clients reach the NameNodes through this proxy provider: the Java class the HDFS
         client uses to determine which NameNode is currently Active -->
    <property>
        <name>dfs.client.failover.proxy.provider.ns1</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <!-- Enable automatic failover; can be omitted if automatic failover is not set up -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
</configuration>
core-site.xml explained
<configuration>
    <!-- Default filesystem: the HA nameservice defined in hdfs-site.xml -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://ns1</value>
    </property>
    <!-- ZooKeeper quorum addresses -->
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>cloudera01:2181,cloudera02:2181,cloudera14:2181,cloudera03:2181,cloudera16:2181</value>
    </property>
    <!-- Disable the FileSystem object cache so each get() returns a fresh instance -->
    <property>
        <name>fs.hdfs.impl.disable.cache</name>
        <value>true</value>
    </property>
</configuration>
Scala utility object for accessing HDFS: it loads core-site.xml and hdfs-site.xml through a Configuration to obtain a FileSystem handle, and then performs the read and write operations.
import java.io._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, _}
import org.apache.hadoop.io.IOUtils

/**
 * HDFS file and directory utility object.
 */
object HdfsUtils {

  def getFS(): FileSystem = {
    this.synchronized {
      // Act as the hdfs user; adjust if your cluster enforces permissions.
      System.setProperty("HADOOP_USER_NAME", "hdfs")
      val conf = new Configuration()
      // Load the cluster configuration shipped in src/main/resources.
      conf.addResource(getClass.getResourceAsStream("/core-site.xml"))
      conf.addResource(getClass.getResourceAsStream("/hdfs-site.xml"))
      conf.set("mapred.remote.os", "Linux")
      println(conf)
      FileSystem.get(conf)
    }
  }

  /**
   * Close a FileSystem instance.
   *
   * @param fileSystem the FileSystem to close (may be null)
   */
  def closeFS(fileSystem: FileSystem): Unit = {
    this.synchronized {
      if (fileSystem != null) {
        try {
          fileSystem.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
    }
  }

  /**
   * List the entries under a path and print their status (similar to hdfs dfs -ls).
   *
   * @param hdfsFilePath
   */
  def listFiles(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = getFS()
      try {
        val fstats = fileSystem.listStatus(new Path(hdfsFilePath))
        for (fstat <- fstats) {
          if (fstat.isDirectory()) {
            println("directory")
          } else {
            println("file")
          }
          println("Permission:" + fstat.getPermission())
          println("Owner:" + fstat.getOwner())
          println("Group:" + fstat.getGroup())
          println("Size:" + fstat.getLen())
          println("Replication:" + fstat.getReplication())
          println("Block Size:" + fstat.getBlockSize())
          println("Name:" + fstat.getPath())
          println("#############################")
        }
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  def ls(fileSystem: FileSystem, path: String): Unit = {
    println("list path:" + path)
    val fs = fileSystem.listStatus(new Path(path))
    val listPath = FileUtil.stat2Paths(fs)
    for (p <- listPath) {
      println(p)
    }
    println("----------------------------------------")
  }

  /**
   * Create a directory.
   *
   * @param hdfsFilePath
   */
  def mkdir(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = getFS()
      try {
        val success = fileSystem.mkdirs(new Path(hdfsFilePath))
        if (success) {
          println("Create directory or file successfully")
        }
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Delete a file or directory.
   *
   * @param hdfsFilePath
   * @param recursive whether to delete directory contents recursively
   */
  def rm(hdfsFilePath: String, recursive: Boolean): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        val path = new Path(hdfsFilePath)
        if (fileSystem.exists(path)) {
          val success = fileSystem.delete(path, recursive)
          if (success) {
            System.out.println("delete successfully")
          }
        }
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Upload a local file to HDFS.
   *
   * @param localPath
   * @param hdfspath
   */
  def write(localPath: String, hdfspath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FileInputStream = null
      var outStream: FSDataOutputStream = null
      try {
        inStream = new FileInputStream(new File(localPath))
        val writePath = new Path(hdfspath)
        outStream = fileSystem.create(writePath)
        IOUtils.copyBytes(inStream, outStream, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        IOUtils.closeStream(outStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Read a text file and return its lines as a list.
   *
   * @param hdfspath
   */
  def readAllLines(hdfspath: String): scala.collection.mutable.ListBuffer[String] = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStreamReader: InputStreamReader = null
      var isr: java.io.BufferedReader = null
      val allLines: scala.collection.mutable.ListBuffer[String] = scala.collection.mutable.ListBuffer()
      try {
        val readPath = new Path(hdfspath)
        inStreamReader = new InputStreamReader(fileSystem.open(readPath), "UTF-8")
        isr = new java.io.BufferedReader(inStreamReader)
        var line = isr.readLine()
        while (line != null) {
          allLines += line
          line = isr.readLine()
        }
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        if (isr != null) isr.close()
        if (inStreamReader != null) inStreamReader.close()
        this.closeFS(fileSystem)
      }
      allLines
    }
  }

  /**
   * Read a file fully and return its content as a UTF-8 string.
   *
   * @param hdfspath
   */
  def readContent(hdfspath: String): String = {
    this.synchronized {
      val fileSystem = this.getFS()
      var buf: Array[Byte] = null
      var inputStream: FSDataInputStream = null
      try {
        val readPath = new Path(hdfspath)
        // Allocate a buffer the size of the file and read until it is full.
        buf = new Array[Byte](fileSystem.getFileStatus(readPath).getLen.toInt)
        inputStream = fileSystem.open(readPath)
        var toRead: Int = buf.length
        var off = 0
        while (toRead > 0) {
          val ret: Int = inputStream.read(buf, off, toRead)
          if (ret < 0) {
            throw new IOException("Premature EOF from inputStream")
          }
          toRead = toRead - ret
          off += ret
        }
        new String(buf, "UTF-8")
      } catch {
        case e: Exception =>
          e.printStackTrace()
          ""
      } finally {
        if (inputStream != null) inputStream.close()
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Upload a local file to HDFS.
   *
   * @param localFilePath
   * @param hdfsFilePath
   */
  def put(localFilePath: String, hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var fdos: FSDataOutputStream = null
      var fis: FileInputStream = null
      try {
        fdos = fileSystem.create(new Path(hdfsFilePath))
        fis = new FileInputStream(new File(localFilePath))
        IOUtils.copyBytes(fis, fdos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fdos)
        IOUtils.closeStream(fis)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Print the contents of an HDFS file to stdout.
   *
   * @param hdfsFilePath
   */
  def cat(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FSDataInputStream = null
      try {
        val readPath = new Path(hdfsFilePath)
        inStream = fileSystem.open(readPath)
        IOUtils.copyBytes(inStream, System.out, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Check whether a path exists on HDFS.
   *
   * @param hdfsFilePath
   */
  def exist(hdfsFilePath: String): Boolean = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        fileSystem.exists(new Path(hdfsFilePath))
      } catch {
        case e: IOException =>
          e.printStackTrace()
          false
      } finally {
        // this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Download an HDFS file to the local filesystem.
   *
   * @param localFilePath
   * @param hdfsFilePath
   */
  def get(localFilePath: String, hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var fsis: FSDataInputStream = null
      var fos: FileOutputStream = null
      try {
        fsis = fileSystem.open(new Path(hdfsFilePath))
        fos = new FileOutputStream(new File(localFilePath))
        IOUtils.copyBytes(fsis, fos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fsis)
        IOUtils.closeStream(fos)
        this.closeFS(fileSystem)
      }
    }
  }
}
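A minimal usage sketch of the utility above; the local and HDFS paths are hypothetical examples:

object HdfsUtilsDemo {
  def main(args: Array[String]): Unit = {
    HdfsUtils.mkdir("/tmp/hdfsutils-demo")                          // create a directory
    HdfsUtils.put("data/local.txt", "/tmp/hdfsutils-demo/a.txt")    // upload a local file
    HdfsUtils.listFiles("/tmp/hdfsutils-demo")                      // print status of entries
    HdfsUtils.cat("/tmp/hdfsutils-demo/a.txt")                      // print file contents
    val lines = HdfsUtils.readAllLines("/tmp/hdfsutils-demo/a.txt") // read all lines
    println(s"read ${lines.size} lines")
    HdfsUtils.rm("/tmp/hdfsutils-demo", recursive = true)           // clean up
  }
}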
Configuring Spark in IDEA to access a remote cluster's Hive
- The local IDEA environment needs pom.xml dependencies on spark-core, spark-sql and spark-hive; the Spark dependencies use the default compile scope.
- resources: the cluster Hive configuration file hive-site.xml, which specifies the host and port of the metastore service.
- Start the metastore service on the remote Hive cluster:
hive --service metastore -p 9083 &
Project dependencies
<!-- Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- Hadoop -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<!-- Hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
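The ${spark.version} and ${hadoop.version} placeholders above are assumed to be defined in the POM's <properties> section. A minimal sketch; the versions below are illustrative placeholders and must be aligned with your cluster:

<properties>
    <!-- Placeholder versions for illustration only; match them to the cluster. -->
    <spark.version>2.4.0</spark.version>
    <hadoop.version>2.6.0</hadoop.version>
</properties>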
hive-site.xml configuration: hive.metastore.uris specifies the IP and port where the metastore service runs, and that service has to be started manually. Clients connect to the metastore service, and the metastore in turn connects to the MySQL database to read and write Hive metadata, that is, the metadata of the databases, tables and so on created with Hive. With a metastore service in place, multiple clients can connect at the same time, and these clients do not need to know the MySQL username and password; they only need to reach the metastore service.
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://cloudera01:9083</value>
</property>
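As a side note, any metastore client works this way, not just Spark. A minimal sketch using HiveMetaStoreClient (pulled in transitively by spark-hive; the explicit URI is an assumption mirroring the config above) that lists databases without ever touching the MySQL credentials:

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
import scala.collection.JavaConverters._

object MetastoreCheck {
  def main(args: Array[String]): Unit = {
    val conf = new HiveConf() // picks up hive-site.xml from the classpath
    conf.set("hive.metastore.uris", "thrift://cloudera01:9083") // explicit fallback (assumed host/port)
    val client = new HiveMetaStoreClient(conf)
    try {
      // Talks only to the thrift metastore service; MySQL stays hidden behind it.
      client.getAllDatabases.asScala.foreach(println)
    } finally {
      client.close()
    }
  }
}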
Test script using Spark SQL to connect to the remote Hive. If you do not have permission to access the Hive tables, change HADOOP_USER_NAME, for example to hdfs.
import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    // enableHiveSupport makes Spark SQL use the metastore configured in hive-site.xml
    val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate()
    spark.sql("show databases").show()
    val df = spark.sql("select * from test_gp.student_info")
    df.show()
  }
}