1.Hive數(shù)據(jù)導(dǎo)入的六種類型:
以下面兩個表來實驗:
create table emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
partitioned by (dt string)
row format delimited
fields terminated by '\t';
create external table dept (
deptno int,
dname string,
loc string)
partitioned by (dt string)
row format delimited
fields terminated by '\t';
1.從本地文件系統(tǒng)導(dǎo)入到hive表中:
使用load data語句可以直接導(dǎo)入本地文件到hive表中,加關(guān)鍵字local表示從本地系統(tǒng)上傳文件到hive表:
hive> load data local inpath '/home/natty.ma/bigdata/hadoop/files/emp.txt' overwrite into table testdb.emp1;
2.從HDFS導(dǎo)入文件到hive表中:
如果不加關(guān)鍵字local表示從hdfs加載文件到hive表。下面先上傳本地文件到hdfs,再加載到hive表:
$bin/hdfs dfs -put /home/natty.ma/bigdata/hadoop/files/emp.txt /user/natty.ma
hive> load data inpath '/user/natty.ma/emp.txt' overwrite into table testdb.emp partition (dt='20170228');
load語句實際上是 移動文件
3.加載數(shù)據(jù)覆蓋表中已有的數(shù)據(jù):
上邊2個語句中么夫,load data語句,OVERWRITE參數(shù)決定加載文件時是否覆蓋。
4.創(chuàng)建表時通過select語句加載:
hive> create table testdb.emp2 as select * from testdb.emp1;
create table ... as 語句會走mapreduce哺窄。
5.創(chuàng)建表,通過insert語句加載:
hive> insert into table testdb.emp3 select * from testdb.emp2;
可以增加overwrite選項账锹,來選擇insert時萌业,是否覆蓋原表的數(shù)據(jù)。如果不加overwrite參數(shù)再執(zhí)行一次該語句奸柬,那么emp3表的數(shù)據(jù)將會翻倍生年,查看該表hdfs的目錄,會發(fā)現(xiàn)有2個文件(而不是一個文件):
$bin/hdfs dfs -ls /user/hive/warehouse/testdb.db/emp3;
Found 2 items
-rwxrwxrwx 2 root supergroup 661 2017-02-28 10:55 /user/hive/warehouse/testdb.db/emp3/000000_0
-rwxrwxrwx 2 root supergroup 661 2017-02-28 10:57 /user/hive/warehouse/testdb.db/emp3/000000_0_copy_1
6.創(chuàng)建表的時候通過Location指定:
先創(chuàng)建一張表鸟缕,并指定該表的LOCATION晶框,之后往LOCATON目錄(HDFS上的路徑)上傳數(shù)據(jù)文件,再查詢表時懂从,就可以看到數(shù)據(jù)了授段。
create table emp4(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited
fields terminated by '\t'
LOCATION '/user/natty.ma/hive_test/emp';
$bin/hdfs dfs -put /home/natty.ma/bigdata/hadoop/files/emp.txt /user/natty.ma/hive_test/emp
PS: 我在實驗時發(fā)現(xiàn),如果先上傳文件番甩,再創(chuàng)建一個表侵贵,LOCATION目錄是上傳的HDFS目錄的話,上傳的目錄會被覆蓋清空缘薛。
2.Hive導(dǎo)出數(shù)據(jù)的幾種方式:
1.使用Insert語句導(dǎo)出到本地(或HDFS):
通過LOCAL關(guān)鍵字來確定輸出到本地或hdfs窍育。
hive> INSERT OVERWRITE LOCAL DIRECTORY '/home/natty.ma/bigdata/hadoop/output' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select * from testdb.emp4;
hive> insert overwrite directory '/user/natty.ma/hive_test/output' select * from testdb.emp4;
需要注意的是,在導(dǎo)出到Linux本地目錄后宴胧,需要cd到上一級目錄漱抓,再進入該目錄才能看到導(dǎo)出的文件。
2.hadoop命令導(dǎo)出:
使用get命令恕齐,直接將HDFS上的文件導(dǎo)出到Linux本地系統(tǒng)中乞娄。
hive> dfs -get /user/natty.ma/hive_test/emp/* /home/natty.ma/bigdata/hadoop/output/;
3.使用 hive -e "SQL" > xx.txt方式導(dǎo)出文件:
hive -e可以執(zhí)行執(zhí)行Hive SQL,將結(jié)果導(dǎo)出到文件即可显歧。
hive -e "select * from testdb.emp4" > /home/natty.ma/bigdata/hadoop/output/output.txt
4.使用 sqoop導(dǎo)出文件:
sqoop的使用在后邊會介紹仪或。
3.設(shè)置 reduce task數(shù)目和排序分析:
hive中有4種主要的排序方式:
- order by : 全局排序,對所有數(shù)據(jù)進行排序士骤。
- sort by :局部排序范删,對每個reduce中的數(shù)據(jù)進行排序,但是全局不排序拷肌。
- distribute by:按照某個字段來分區(qū)到旦,同一個分區(qū)的數(shù)據(jù)交給一個reduce來處理。
- cluster by:是distribute by ... sort by ...的組合巨缘,但是不同于上邊兩項,distribute by 和sort by必須是同一項带猴。
在hive中昔汉,可以設(shè)置執(zhí)行作業(yè)的reduce的數(shù)量:
set mapreduce.job.reduces = <NUMBER>
同時,reduce的數(shù)量拴清,也決定了結(jié)果輸出的文件的個數(shù)靶病。
hive> set mapreduce.job.reduces = 3;
hive> insert overwrite local directory '/home/natty.ma/bigdata/hadoop/output/' select * from testdb.emp4 distribute by deptno sort by empno;
上邊語句會直接輸出3個文件。如果set成5的話口予,當然會輸出5個文件娄周。
我在實驗過程中,也發(fā)現(xiàn)了一個文件沪停,如果執(zhí)行的select語句煤辨,不需要進行reduce階段裳涛。例如select * 或者select 部分字段,就只會生成一個文件了众辨。
4.設(shè)置哪些HQL執(zhí)行MapReduce:
在執(zhí)行HQL語句時端三,我們注意到,有些HQL需要執(zhí)行MapReduce鹃彻,而有些則不需要郊闯。對于是否執(zhí)行MapReduce可以進行配置≈胫辏可以在hive-site.xml或者通過set設(shè)置团赁。
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
<description>
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have
any aggregations or distincts (which incurs RS), lateral views and joins.
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
</description>
</property>
可以有minimal和more來進行選擇,首先谨履,可以不進行MR的查詢必須是單庫查詢欢摄、不含有任何子查詢、不包含聚合函數(shù)笋粟、distinct剧浸、視圖、關(guān)聯(lián)等矗钟。
1. minimal:
SELECT * 唆香, 只篩選partition字段, LIMIT子句吨艇。
那么類似的語句不會執(zhí)行MapReduce:
hive> select * from testdb.emp where dt>='20170227' limit 10;
但是 select empno from testdb.emp where dt>='20170227' limit 10; 會執(zhí)行MapReduce躬它。
PS: 在執(zhí)行此sql時,報了錯誤: org.apache.hadoop.hive.ql.ppd.ExprWalkerInfo.getConvertedNode
經(jīng)查證hive jira东涡,這是hive的一個bug冯吓,需要做如下設(shè)置,可以避免錯誤:
hive> set hive.optimize.ppd=false;
2. more :
如果配置的是more的模式疮跑,相比于minimal组贺,如果只select部分字段也不會走MR。
5.Hiveserver2的使用:
Hiverserver(Thrift Hive Server)是老版本的祖娘,為了解決同時響應(yīng)多客戶端請求的問題失尖,提供了升級的Hiveserver2版本。
首先渐苏,Hive是一個訪問HDFS的客戶端掀潮。但是Hive也是可以實現(xiàn)C/S結(jié)構(gòu)的,Hiveserver2服務(wù)開啟后琼富,就可以實現(xiàn)Server的功能仪吧,可以響應(yīng)其他(Hive Client端)的請求。那么其他客戶端可以連接Hiveserver2服務(wù)的Server鞠眉。
[官方 hiveserver2介紹][1]
[1]:https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2
1. 配置hive-site.xml:
啟動hiveserver2 服務(wù)薯鼠,需要配置這兩項(其余兩項默認值即可):
- hive.server2.thrift.port
- hive.server2.thrift.bind.host
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
<description>Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'.</description>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop-senior01.pmpa.com </value>
<description>Bind host on which to run the HiveServer2 Thrift service. </description>
</property>
2. 啟動hiveserver2服務(wù):
注意以后臺運行方式啟動hiveserver2择诈,啟動后監(jiān)控10000端口是否在監(jiān)聽來判斷是否啟動成功了。
$ hive --service hiveserver2 &
$ netstat -antp | grep 10000
3. 客戶端連接hiveserver:
可以以兩種方式連接hiveserver: JDBC和Beeline 出皇。
(1)Beeline方式(基于 SQLLine CLI的JDBC客戶端):
下面是以Beeline命令行方式登錄:
$ bin/beeline
beeline> help
beeline> !connect jdbc:hive2://hd-master:10000 root 123456
0: jdbc:hive2://hd-master:10000> show databases;
0: jdbc:hive2://hd-master:10000> select * from testdb.emp4;
beeline的命令在執(zhí)行時羞芍,需要在前邊加上!,help命令可以查看具體詳細用法恶迈。connect連接串的格式 jdbc:hive2://主機名(ip):端口號 用戶名 密碼
用戶名和密碼是搭建hadoop hive環(huán)境的用戶名和其密碼涩金。
(2)JDBC方式訪問:
使用JDBC方式訪問hive谱醇,classpath需要包含以下jar包:
- hadoop-common*.jar
- $HIVE_HOME/lib/*.jar
下面一個例子暇仲,訪問hive中的dept表,并提供了執(zhí)行更新語句的方式:
package com.pmpa.hiveserver.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;;
public class HiveJDBC {
private static String jdbcdriver = "org.apache.hive.jdbc.HiveDriver";
public static void main(String[] args) {
ArrayList<Dept> depts = new ArrayList<Dept>();
try {
Class.forName(jdbcdriver);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.exit(1);
}
try {
Connection con =
DriverManager.getConnection("jdbc:hive2://localhost:10000/testdb","root", "123456");
Statement stmt = con.createStatement();
//無返回結(jié)果的更新元數(shù)據(jù)的語句副渴。
String drop_str = "drop table if exists jdbc_test";
stmt.execute(drop_str);
String create_str = "create table jdbc_test (id int, name string, url string)";
stmt.execute(create_str);
//有返回結(jié)果集的查詢語句
String query_str = "select * from dept";
System.out.println("Running: " + query_str);
ResultSet rs = stmt.executeQuery(query_str);
while(rs.next())
{
depts.add(new Dept(rs.getInt(1),rs.getString(2)
,rs.getString(3),rs.getString(4)));
}
System.out.println("Complete Running: " + query_str);
System.out.print(depts);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
其中奈附,Dept是hive表testdb.dept的POJO類。
為了加載classpath煮剧,使用下面的shell來配置classpath:
#!/bin/bash
for i in `ls ${HADOOP_HOME}/share/hadoop/common/*.jar`
do
CLASSPATH=${i}:${CLASSPATH}
done
for i in `ls ${HIVE_HOME}/lib/*.jar`; do
CLASSPATH=${i}:${CLASSPATH}
done
java -cp ${CLASSPATH} com.pmpa.hiveserver.jdbc.HiveJDBC
PS:需要注意如果打成jar包斥滤,不能使用 java -cp ${classpath} -jar xx.jar xxx 方式,這樣的話無法加載classpath勉盅。
java -classpath some.jar -jar test.jar
這種方式是不行的佑颇,因為使用classpath指定的jar是由AppClassloader來加載,java 命令 加了-jar 參數(shù)以后草娜,AppClassloader就只關(guān)注test.jar范圍內(nèi)的class了挑胸,classpath參數(shù)失效。
6. Hive UDF開發(fā):
1. UDF介紹
除了Hive中提供的一些基本函數(shù)min宰闰、max等等外茬贵。還可以自行開發(fā)定義函數(shù)UDF。UDF函數(shù)可以直接應(yīng)用于select語句移袍。需要特別注意以下幾點:
- 自定義UDF需要繼承類org.apache.hadoop.hive.ql.exec.UDF
- 需要實現(xiàn)evaluate()方法解藻,該方法可以重載,不同的重載方法葡盗,對應(yīng)著該函數(shù)不同參數(shù)用法螟左。
- UDF必須要有返回類型,也就是evaluate()方法觅够,可以返回null值路狮,但是該方法不可以是void的。
- 建議使用hadoop中定義的數(shù)據(jù)類型蔚约,例如Text奄妨、LongWritable等,不建議使用java數(shù)據(jù)類型苹祟。
2. UDF開發(fā)
下面以一個簡單的函數(shù)為例砸抛,展示Hive UDF的開發(fā)過程评雌。
函數(shù)功能: 輸入20170312 返回值 '2017-03-12'。下面是詳細步驟:
(1)開發(fā)GeneralTest類:
package com.pmpa.hiveserver.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
public class GeneralTest extends UDF {
private Text out = new Text();
public Text evaluate(LongWritable date_int){
String input = date_int.toString();
String output = input.substring(0,4) + "-" + input.substring(4,6) + "-"
+ input.substring(6,8) ;
out.set(output);
return out;
}
}
(2)java程序打成jar包直焙,并發(fā)布到目標機器上景东。
上傳到目錄:/home/natty.ma/bigdata/hadoop/javaDev/GeneralTest.jar
(3)在hive客戶端添加jar包:
hive> add jar /home/natty.ma/bigdata/hadoop/javaDev/GeneralTest.jar;
(4)創(chuàng)建臨時函數(shù):
hive> CREATE TEMPORARY FUNCTION general_test_mn AS 'com.pmpa.hiveserver.udf.GeneralTest';
(5)函數(shù)測試:
hive> SELECT general_test_mn(20160203);
OK
2016-02-03
3. UDF類型
UDF只能實現(xiàn)“一進一出”操作,如果要“多進一出”需要實現(xiàn)UDAF奔誓。 所謂一進一出斤吐,是輸入一行,輸出一行厨喂,例如上邊的例子和措。多進一出就是所謂的聚合函數(shù)。將多行輸入記錄蜕煌,聚合輸出一行派阱。類似于sum()。
UDF包含以下類型:
- UDF: User Defined Function 斜纪, 一進一出贫母。
- UDAF: User Defined Aggregation Function , 多進一出盒刚,聚合函數(shù)腺劣。 例如sum,count因块。
- UDTF: User Defined Table Function橘原, 一進多出,例如lateral view explore()
下面的文章比較清晰地介紹了這幾種UDF的開發(fā)方法:
4. UDF開發(fā)(繼承GenericUDF)
5. UDTF開發(fā):
開發(fā)udtf贮聂,需要繼承類:org.apache.hadoop.hive.ql.udf.generic.GenericUDTF靠柑。
為了更好地理解UDTF,可以參考hive內(nèi)置的一個UDTF吓懈,即explode歼冰、json_tuple函數(shù)等。
需要重載3個方法
// in this method we specify input and output parameters: input ObjectInspector and an output struct
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
// here we process an input record and write out any resulting records
abstract void process(Object[] record) throws HiveException;
// this function is Called to notify the UDTF that there are no more rows to process. Clean up code or additional output can be produced here.
abstract void close() throws HiveException;
initialize()方法指定輸入和輸出參數(shù):輸入?yún)?shù)耻警,ObjectInspector隔嫡;輸出參數(shù):struct。
process()方法處理輸入記錄甘穿,并生成結(jié)果記錄腮恩。
close()方法表示沒有記錄需要處理。