[TOC]
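## 1. Introduction to UDFs

UDFs (User-Defined Functions) are Hive functions written by the user. When Hive's built-in functions cannot fully meet a business requirement, you can write a custom function for that specific need. A UDF can be called directly in a SELECT statement to transform the query results before they are output.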
Hive supports three kinds of custom functions:
- UDF: one to one; one row in, one row out (row mapping), e.g. `upper` and `substr`;
- UDAF (A: aggregation): many to one; many rows in, one row out, e.g. `sum` and `min`;
- UDTF (T: table-generating): one to many; one row in, multiple rows out, e.g. `explode`, which is usually combined with `LATERAL VIEW`.
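As a plain-Java analogy (no Hive classes are involved; `FunctionShapes` and its methods are hypothetical names for illustration), the three kinds correspond to a per-value function, a reduction over many values, and a function that turns one value into zero or more rows:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FunctionShapes {
    // UDF shape (like upper): one value in, one value out
    static String upper(String s) {
        return s == null ? null : s.toUpperCase();
    }

    // UDAF shape (like sum): many values in, one value out; nulls are skipped
    static long sum(List<Long> column) {
        long total = 0;
        for (Long v : column) {
            if (v != null) {
                total += v;
            }
        }
        return total;
    }

    // UDTF shape (like explode): one array value in, zero or more rows out
    static List<String> explode(List<String> array) {
        return array == null ? new ArrayList<>() : new ArrayList<>(array);
    }

    public static void main(String[] args) {
        System.out.println(upper("hive"));                        // HIVE
        System.out.println(sum(Arrays.asList(1L, null, 2L, 3L))); // 6
        System.out.println(explode(Arrays.asList("a", "b")));     // [a, b]
    }
}
```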
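Using annotations:
The `@Description` annotation is optional and documents the function. In its strings, `_FUNC_` stands for the function name and is replaced with the actual name when `DESCRIBE FUNCTION` is run. `@Description` has three attributes:
- name: the function's name in Hive.
- value: a short description of the function and its parameters.
- extended: additional notes such as usage examples, printed by `DESCRIBE FUNCTION EXTENDED name`.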
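The `_FUNC_` substitution can be imitated with plain Java reflection. This is only a sketch: `Desc` is a hypothetical stand-in for Hive's `@Description` with the same three attributes, and `describe()` approximates, rather than reproduces, what Hive prints.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class DescribeFunctionDemo {
    // Hypothetical stand-in for Hive's @Description, with the same three attributes
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Desc {
        String name() default "";
        String value() default "";
        String extended() default "";
    }

    @Desc(name = "zodiac",
          value = "_FUNC_(date) - returns the sign of the Zodiac",
          extended = "Example:\n> SELECT _FUNC_(date_string) FROM src;")
    static class ZodiacLike { }

    // Approximates DESCRIBE FUNCTION [EXTENDED]: _FUNC_ is replaced by the name being described
    static String describe(Class<?> udfClass, String functionName, boolean extended) {
        Desc d = udfClass.getAnnotation(Desc.class);
        if (d == null) {
            return "no description for " + functionName;
        }
        String text = d.value().replace("_FUNC_", functionName);
        if (extended && !d.extended().isEmpty()) {
            text += "\n" + d.extended().replace("_FUNC_", functionName);
        }
        return text;
    }

    public static void main(String[] args) {
        System.out.println(describe(ZodiacLike.class, "zodiac", true));
    }
}
```

Passing a different name, such as `myZodiac`, changes every `_FUNC_` in the output, which is why the annotation uses the placeholder instead of hard-coding the name.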
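## 2. UDF

There are two ways to write a custom UDF:
- If the function reads and returns basic types, i.e. basic Hadoop/Hive types such as Text, IntWritable, LongWritable, and DoubleWritable (plain Java types such as String and Integer also work, as the example below shows), extend `org.apache.hadoop.hive.ql.exec.UDF`. Recent Hive releases mark this class as deprecated in favor of GenericUDF.
- If the function operates on nested data structures such as Map, List, and Set, extend `org.apache.hadoop.hive.ql.udf.generic.GenericUDF`.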
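### 2.1 Simple UDF

Building a UDF with the simple UDF API only involves writing a class that extends UDF and implements one method, `evaluate`, which may be overloaded. The example below, taken from *Programming Hive*, converts a birthday field in a table into a zodiac sign.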
```java
import java.util.Date;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.joda.time.DateTime;

@UDFType(deterministic = true)
@Description(
        name = "zodiac",
        value = "_FUNC_(date) - "
                + "from the input date string "
                + "or separate month and day arguments, \n"
                + "returns the sign of the Zodiac.",
        extended = "Example:\n"
                + "> SELECT _FUNC_(date_string) FROM src;\n"
                + "> SELECT _FUNC_(month, day) FROM src;")
public class UDFZodiacSign extends UDF {
    private static final String ERROR_DATE_OF_MONTH = "invalid day for the specified month";
    private static final String ERROR_MONTH_ARGS = "invalid month argument";
    private static final String ERROR_DATE_STRING = "invalid date format";

    public String evaluate(Date bday) {
        if (bday == null) {
            return null; // SQL NULL arrives as a Java null
        }
        // java.util.Date months are 0-based, hence the + 1
        return this.evaluate(bday.getMonth() + 1, bday.getDate());
    }

    public String evaluate(String dateString) {
        if (dateString == null) {
            return null;
        }
        DateTime dateTime;
        try {
            // Joda-Time parses ISO-8601 strings such as "2019-01-01"
            dateTime = new DateTime(dateString);
        } catch (IllegalArgumentException e) {
            return ERROR_DATE_STRING;
        }
        return this.evaluate(dateTime.getMonthOfYear(), dateTime.getDayOfMonth());
    }

    public String evaluate(Integer month, Integer day) {
        if (month == null || day == null) {
            return null;
        }
        // Each month is split at the day on which the next sign begins
        switch (month) {
            case 1:  return pick(day, 20, 31, "Capricorn", "Aquarius");
            case 2:  return pick(day, 19, 29, "Aquarius", "Pisces"); // 29 allows leap days
            case 3:  return pick(day, 21, 31, "Pisces", "Aries");
            case 4:  return pick(day, 20, 30, "Aries", "Taurus");
            case 5:  return pick(day, 21, 31, "Taurus", "Gemini");
            case 6:  return pick(day, 22, 30, "Gemini", "Cancer");
            case 7:  return pick(day, 23, 31, "Cancer", "Leo");
            case 8:  return pick(day, 23, 31, "Leo", "Virgo");
            case 9:  return pick(day, 23, 30, "Virgo", "Libra");
            case 10: return pick(day, 24, 31, "Libra", "Scorpio");
            case 11: return pick(day, 23, 30, "Scorpio", "Sagittarius");
            case 12: return pick(day, 22, 31, "Sagittarius", "Capricorn");
            default: return ERROR_MONTH_ARGS;
        }
    }

    /**
     * Returns firstSign for days before boundary, secondSign from boundary to lastDay,
     * and an error for any day outside 1..lastDay.
     */
    private static String pick(int day, int boundary, int lastDay, String firstSign, String secondSign) {
        if (day < 1 || day > lastDay) {
            return ERROR_DATE_OF_MONTH;
        }
        return day < boundary ? firstSign : secondSign;
    }
}
```
A quick test:

```java
import org.junit.Assert;
import org.junit.Test;

public class UDFZodiacSignTest {
    @Test
    public void testUDFZodiacSign() {
        UDFZodiacSign example = new UDFZodiacSign();
        Assert.assertEquals("Capricorn", example.evaluate(1, 1));
        Assert.assertEquals("Capricorn", example.evaluate("2019-01-01"));
    }
}
```
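The twelve near-identical branches of `evaluate(Integer, Integer)` really encode just two numbers per month: the day the next sign starts and the last valid day. A minimal plain-Java sketch (hypothetical class `ZodiacTable`, no Hive dependency) drives the same boundaries from arrays instead of a switch:

```java
public class ZodiacTable {
    // SIGNS[m - 1] covers the start of month m, SIGNS[m] covers the rest of it (1-based months)
    private static final String[] SIGNS = {
        "Capricorn", "Aquarius", "Pisces", "Aries", "Taurus", "Gemini",
        "Cancer", "Leo", "Virgo", "Libra", "Scorpio", "Sagittarius", "Capricorn"
    };
    // FIRST_DAY_OF_NEXT_SIGN[m]: the day in month m on which SIGNS[m] begins (index 0 unused)
    private static final int[] FIRST_DAY_OF_NEXT_SIGN = {0, 20, 19, 21, 20, 21, 22, 23, 23, 23, 24, 23, 22};
    // LAST_DAY[m]: the largest valid day in month m, allowing Feb 29 (index 0 unused)
    private static final int[] LAST_DAY = {0, 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};

    static String sign(int month, int day) {
        if (month < 1 || month > 12) {
            return "invalid month argument";
        }
        if (day < 1 || day > LAST_DAY[month]) {
            return "invalid day for the specified month";
        }
        return day < FIRST_DAY_OF_NEXT_SIGN[month] ? SIGNS[month - 1] : SIGNS[month];
    }

    public static void main(String[] args) {
        System.out.println(sign(1, 1));   // Capricorn
        System.out.println(sign(8, 23));  // Virgo
        System.out.println(sign(2, 30));  // invalid day for the specified month
    }
}
```

Whether a table or a switch reads better is a matter of taste; the table makes the boundaries easy to audit in one place.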
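### 2.2 Complex UDFs: GenericUDF

The GenericUDF API provides a way to handle objects that are not Writable types, such as struct, map, and array values.
With this API you manage the storage format of the function's arguments yourself (through ObjectInspectors) and validate the number and types of the arguments the function receives.
The API requires the following methods to be implemented:

```java
// Similar to evaluate() in the simple API: reads the input data and returns the result
public abstract Object evaluate(GenericUDF.DeferredObject[] arguments) throws HiveException;

// Returns a string describing this call of the UDF, used when Hive displays the expression (e.g. in EXPLAIN output)
public abstract String getDisplayString(String[] children);

// Called once, before any evaluate() call, with an array of ObjectInspectors describing the argument types.
// This is where the function checks that it received the right number and types of arguments;
// the returned ObjectInspector describes the result type.
public abstract ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException;
```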
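This example also comes from *Programming Hive*: a user-defined function called `nvl()` that returns a default value when the input value is null.
`nvl()` takes exactly 2 arguments. If the first argument is non-null, it returns that value; if the first argument is null, it returns the value of the second argument.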
```java
package me.w1992wishes.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@Description(name = "nvl",
        value = "_FUNC_(value, default_value) - Returns default value if value is null else returns value",
        extended = "Example:\n"
                + "  > SELECT _FUNC_(null, 'bla') FROM src limit 1;\n")
public class GenericUDFNvl extends GenericUDF {
    private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
    private ObjectInspector[] argumentOIs;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        argumentOIs = arguments;
        // 1. Check the number of arguments
        if (arguments.length != 2) {
            throw new UDFArgumentException("The operator 'NVL' accepts 2 arguments.");
        }
        // 2. Check the argument types: both must resolve to a common type
        //    (true = allow conversion between compatible types)
        returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
        if (!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
            // The index is the 0-based position of the argument that failed to match
            throw new UDFArgumentTypeException(1, "The 1st and 2nd args of function NVL should have the same type, "
                    + "but they are different: \"" + arguments[0].getTypeName() + "\" and \"" + arguments[1].getTypeName() + "\"");
        }
        // 3. The return type is the common type of the two arguments
        return returnOIResolver.get();
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Convert the first argument to the common return type if needed
        Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
        if (retVal == null) {
            // The second argument is only evaluated when the first one is null
            retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
        }
        return retVal;
    }

    @Override
    public String getDisplayString(String[] children) {
        // e.g. "if name is null returns 'a'"
        return "if " + children[0] + " is null returns " + children[1];
    }
}
```
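A quick test:

```java
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.junit.Assert;
import org.junit.Test;

public class GenericUDFNvlTest {
    @Test
    public void testGenericUDFNvl() throws HiveException {
        // Set up: both arguments are Java strings
        GenericUDFNvl example = new GenericUDFNvl();
        ObjectInspector stringOI1 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        ObjectInspector stringOI2 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        StringObjectInspector resultInspector =
                (StringObjectInspector) example.initialize(new ObjectInspector[]{stringOI1, stringOI2});

        // First argument is null: the default value is returned
        Object result1 = example.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject(null), new GenericUDF.DeferredJavaObject("a")});
        Assert.assertEquals("a", resultInspector.getPrimitiveJavaObject(result1));

        // First argument is non-null: it is returned unchanged
        Object result2 = example.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject("dd"), new GenericUDF.DeferredJavaObject("a")});
        Assert.assertEquals("dd", resultInspector.getPrimitiveJavaObject(result2));
    }
}
```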
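Arguments arrive as `DeferredObject`s so that a value is only computed when `get()` is called; in `GenericUDFNvl.evaluate()` the second argument is touched only when the first is null. The contract can be sketched in plain Java, where `MiniGenericUdf` and `MiniNvl` are hypothetical names, `Class<?>` stands in for ObjectInspector and `Supplier` for DeferredObject. This is not Hive's API, just the same initialize-once, evaluate-per-row shape:

```java
import java.util.function.Supplier;

public class DeferredNvlSketch {
    // Hypothetical mini-contract mirroring GenericUDF: initialize() once, evaluate() per row
    interface MiniGenericUdf {
        // Checks the argument types once and returns the result type (stand-in for an ObjectInspector)
        Class<?> initialize(Class<?>[] argTypes);

        // Arguments are deferred: a value is only computed when get() is called
        Object evaluate(Supplier<?>[] args);
    }

    static class MiniNvl implements MiniGenericUdf {
        @Override
        public Class<?> initialize(Class<?>[] argTypes) {
            if (argTypes.length != 2) {
                throw new IllegalArgumentException("nvl accepts exactly 2 arguments");
            }
            if (argTypes[0] != argTypes[1]) {
                throw new IllegalArgumentException("both arguments must have the same type");
            }
            return argTypes[0];
        }

        @Override
        public Object evaluate(Supplier<?>[] args) {
            Object first = args[0].get();
            // The default value is only computed when the first argument is null
            return first != null ? first : args[1].get();
        }
    }

    public static void main(String[] args) {
        MiniNvl nvl = new MiniNvl();
        nvl.initialize(new Class<?>[]{String.class, String.class}); // once, before any row
        Supplier<?>[] row = {() -> "dd", () -> { throw new IllegalStateException("not needed"); }};
        System.out.println(nvl.evaluate(row)); // the second supplier is never called
    }
}
```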
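## 3. UDAF

Note: parts of this section are adapted from the article *Hive UDAF開發詳解* (a detailed guide to Hive UDAF development).
UDAF development mainly involves the following two abstract classes:
- `org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver`
- `org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator`

Roughly speaking, a UDAF reads the data (mapper), aggregates the mapper output into partial aggregation results (combiner), and finally produces the final aggregation result (reducer). Because the outputs of several combiners have to be aggregated, the partial aggregation results must be kept.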
### 3.1 AbstractGenericUDAFResolver

The resolver must override `getEvaluator`, which decides, based on the types of the arguments passed in from SQL, which Evaluator will process them. The base class's default implementation simply throws an exception:

```java
public GenericUDAFEvaluator getEvaluator(TypeInfo[] info)
        throws SemanticException {
    throw new SemanticException(
            "This UDAF does not support the deprecated getEvaluator() method.");
}
```
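The dispatch idea (inspect the argument types once, then hand back an evaluator suited to them) can be sketched without Hive. In this plain-Java sketch, type names stand in for `TypeInfo`, and `MiniEvaluator`, `getEvaluator` and `aggregate` are hypothetical names rather than Hive's API:

```java
import java.util.List;
import java.util.function.ToLongFunction;

public class ResolverSketch {
    // Hypothetical evaluator: maps one input value to its contribution to a running total
    interface MiniEvaluator extends ToLongFunction<Object> { }

    // Stand-in for getEvaluator(TypeInfo[]): type names replace TypeInfo objects
    static MiniEvaluator getEvaluator(String[] typeNames) {
        if (typeNames.length != 1) {
            throw new IllegalArgumentException("Exactly one argument is expected.");
        }
        switch (typeNames[0]) {
            case "string":
                return v -> v == null ? 0 : v.toString().length(); // count characters
            case "int":
            case "bigint":
                return v -> v == null ? 0 : ((Number) v).longValue(); // add up numbers
            default:
                throw new IllegalArgumentException("Unsupported type: " + typeNames[0]);
        }
    }

    // Resolves once, then feeds every value of the column to the chosen evaluator
    static long aggregate(String typeName, List<?> column) {
        MiniEvaluator evaluator = getEvaluator(new String[]{typeName});
        long total = 0;
        for (Object v : column) {
            total += evaluator.applyAsLong(v);
        }
        return total;
    }
}
```

For example, `aggregate("string", Arrays.asList("tom", null, "wu kong"))` resolves the string evaluator and sums 3 + 7; an unsupported type such as `boolean` is rejected before any data is read, just as the resolver in section 3.5 does.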
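### 3.2 GenericUDAFEvaluator

The UDAF's processing logic lives mainly in the Evaluator, which must implement several methods of this abstract class. Before looking at the Evaluator, it helps to introduce the ObjectInspector interface and `Mode`, an enum nested inside GenericUDAFEvaluator.
- ObjectInspector: decouples how data is used from how it is formatted, so that the data flow can switch between different input and output formats and different Operators can use different formats.
- Mode: represents the MapReduce stage in which the UDAF is running.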
```java
public static enum Mode {
    /**
     * PARTIAL1: the map phase of MapReduce, from raw data to a partial aggregation.
     * iterate() and terminatePartial() are called.
     */
    PARTIAL1,
    /**
     * PARTIAL2: the map-side combiner phase, which merges map output on the map side,
     * from partial aggregations to a partial aggregation.
     * merge() and terminatePartial() are called.
     */
    PARTIAL2,
    /**
     * FINAL: the reduce phase of MapReduce, from partial aggregations to a full aggregation.
     * merge() and terminate() are called.
     */
    FINAL,
    /**
     * COMPLETE: when this stage occurs, the job is map-only with no reduce phase, so the map side
     * produces the result directly, from raw data to a full aggregation.
     * iterate() and terminate() are called.
     */
    COMPLETE
}
```

In general, a complete UDAF runs as a MapReduce job. With a mapper and a reducer it goes through PARTIAL1 (mapper) and FINAL (reducer); if there is also a combiner, it goes through PARTIAL1 (mapper), PARTIAL2 (combiner), and FINAL (reducer).
In some cases the job has only mappers and no reducer, so there is only the COMPLETE stage, which takes the raw data as input and produces the result directly.
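### 3.3 GenericUDAFEvaluator methods

```java
// Sets up the ObjectInspectors for each stage's input and output; usually also initializes internal
// fields, typically the variable that holds the final result. In PARTIAL1/COMPLETE the parameters
// describe the original columns; in PARTIAL2/FINAL parameters[0] describes the partial result.
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// Creates the object that holds the aggregation state
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// Resets the aggregation state
public abstract void reset(AggregationBuffer agg) throws HiveException;

// Map phase (PARTIAL1/COMPLETE): processes the column values passed in from SQL, one row at a time
public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// Called when a mapper or combiner finishes: returns the partial aggregation result
public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;

// The combiner merges map output; the reducer merges mapper or combiner output
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

// Reduce phase (or COMPLETE): returns the final result
public abstract Object terminate(AggregationBuffer agg) throws HiveException;
```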
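Putting the Mode list and these methods together, the following plain-Java simulation shows which methods each Mode calls. It uses no Hive classes; all names are hypothetical, and the logic mirrors the letter-count UDAF of section 3.5 with the same input strings as its unit test:

```java
import java.util.Arrays;
import java.util.List;

public class UdafModesSimulation {
    // Plain-Java mirror of the letter-count evaluator; the aggregation buffer is a mutable int
    static final class SumBuffer {
        int sum;
    }

    static void iterate(SumBuffer b, String value) {   // raw row in
        if (value != null) b.sum += value.length();
    }

    static void merge(SumBuffer b, Integer partial) {  // partial result in
        if (partial != null) b.sum += partial;
    }

    static int terminatePartial(SumBuffer b) { return b.sum; }

    static int terminate(SumBuffer b) { return b.sum; }

    // PARTIAL1 (mapper): iterate() over raw rows, then terminatePartial()
    static int partial1(List<String> rows) {
        SumBuffer b = new SumBuffer();
        for (String r : rows) iterate(b, r);
        return terminatePartial(b);
    }

    // PARTIAL2 (combiner): merge() partial results, then terminatePartial()
    static int partial2(List<Integer> partials) {
        SumBuffer b = new SumBuffer();
        for (Integer p : partials) merge(b, p);
        return terminatePartial(b);
    }

    // FINAL (reducer): merge() partial results, then terminate()
    static int finalStage(List<Integer> partials) {
        SumBuffer b = new SumBuffer();
        for (Integer p : partials) merge(b, p);
        return terminate(b);
    }

    // COMPLETE (map-only job): iterate() over raw rows, then terminate()
    static int complete(List<String> rows) {
        SumBuffer b = new SumBuffer();
        for (String r : rows) iterate(b, r);
        return terminate(b);
    }

    public static void main(String[] args) {
        int mapper1 = partial1(Arrays.asList("tom", "tomT"));      // 3 + 4 = 7
        int mapper2 = partial1(Arrays.asList("wu kong"));          // 7
        int mapper3 = partial1(Arrays.asList("wu le"));            // 5
        int combined = partial2(Arrays.asList(mapper1, mapper2));  // 14
        System.out.println(finalStage(Arrays.asList(combined, mapper3)));               // 19
        System.out.println(complete(Arrays.asList("tom", "tomT", "wu kong", "wu le"))); // 19
    }
}
```

Both paths give the same total, which is the property a UDAF needs: however Hive splits the work across mappers, combiners, and reducers, merging partial results must match aggregating the raw rows in one pass.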
### 3.4 Mode and Evaluator, illustrated

[Figure: the Evaluator methods called in each Mode]
[Figure: how the Evaluator processes each MapReduce stage]
### 3.5 Example

The function below counts the total number of characters (spaces included) across all values of the given column:
```java
package me.w1992wishes.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.IntWritable;

@Description(
        name = "letters",
        value = "_FUNC_(expr) - returns the total number of characters of all strings in the column")
public class GenericUDAFTotalNumOfLetters extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        // Exactly one argument...
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }
        // ...of a primitive type...
        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be PRIMITIVE, but "
                            + oi.getCategory().name()
                            + " was passed.");
        }
        // ...and specifically a string
        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;
        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be String, but "
                            + inputOI.getPrimitiveCategory().name()
                            + " was passed.");
        }
        return new TotalNumOfLettersEvaluator();
    }

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {
        // Inspector for the raw column value (PARTIAL1 / COMPLETE)
        PrimitiveObjectInspector inputOI;
        // Inspector for the partial results, which this evaluator always emits as IntWritable
        PrimitiveObjectInspector integerOI;
        private IntWritable result;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);
            result = new IntWritable(0);
            inputOI = (PrimitiveObjectInspector) parameters[0];
            integerOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
            // Every stage outputs an int (IntWritable)
            return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
        }

        /**
         * Holds the running character count.
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;

            void add(int num) {
                sum += num;
            }
        }

        /**
         * Allocates the buffer that holds the running total for the mapper, combiner, or reducer.
         */
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            LetterSumAgg sum = new LetterSumAgg();
            reset(sum);
            return sum;
        }

        /**
         * MapReduce may reuse mappers and reducers, so the buffer must be reusable as well.
         */
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((LetterSumAgg) agg).sum = 0;
        }

        /**
         * Called in the map phase: adds the length of the incoming value to the running total in agg.
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        /**
         * Result returned when a mapper or a combiner finishes.
         */
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        /**
         * The combiner merges map output; the reducer merges mapper or combiner output.
         */
        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                myagg.sum += PrimitiveObjectInspectorUtils.getInt(partial, integerOI);
            }
        }

        /**
         * Returns the final result from the reducer, or from the mapper when there is no reducer.
         */
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            result.set(myagg.sum);
            return result;
        }
    }
}
```
Test:

```java
package me.w1992wishes.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.IntWritable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class GenericUDAFTotalNumOfLettersTest {
    private GenericUDAFTotalNumOfLetters example;
    private GenericUDAFEvaluator evaluator;
    private ObjectInspector[] output;
    private PrimitiveObjectInspector[] poi;
    private GenericUDAFTotalNumOfLetters.TotalNumOfLettersEvaluator.LetterSumAgg agg;

    private final Object[] param1 = {"tom"};      // 3 characters
    private final Object[] param2 = {"tomT"};     // 4 characters
    private final Object[] param3 = {"wu kong"};  // 7 characters (the space counts)
    private final Object[] param4 = {"wu le"};    // 5 characters

    @Before
    public void setUp() throws Exception {
        example = new GenericUDAFTotalNumOfLetters();
        // The single argument is a string
        TypeInfo[] types = makePrimitiveTypeInfoArray(new String[]{"string"});
        evaluator = example.getEvaluator(types);

        // Inspector for the raw input column (PARTIAL1 / COMPLETE)
        poi = new PrimitiveObjectInspector[1];
        poi[0] = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                PrimitiveObjectInspector.PrimitiveCategory.STRING);

        // Inspector for the partial results (PARTIAL2 / FINAL); terminatePartial() returns IntWritable
        output = new ObjectInspector[1];
        output[0] = PrimitiveObjectInspectorFactory.writableIntObjectInspector;

        agg = (GenericUDAFTotalNumOfLetters.TotalNumOfLettersEvaluator.LetterSumAgg)
                evaluator.getNewAggregationBuffer();
    }

    @Test(expected = UDFArgumentTypeException.class)
    public void testGetEvaluatorWithComplexTypes() throws Exception {
        TypeInfo[] types = new TypeInfo[1];
        types[0] = TypeInfoFactory.getListTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"));
        example.getEvaluator(types);
    }

    @Test(expected = UDFArgumentTypeException.class)
    public void testGetEvaluatorWithNotSupportedTypes() throws Exception {
        TypeInfo[] types = new TypeInfo[1];
        types[0] = TypeInfoFactory.getPrimitiveTypeInfo("boolean");
        example.getEvaluator(types);
    }

    @Test(expected = UDFArgumentTypeException.class)
    public void testGetEvaluatorWithMultiParams() throws Exception {
        TypeInfo[] types3 = makePrimitiveTypeInfoArray(new String[]{"double", "int", "string"});
        example.getEvaluator(types3);
    }

    @Test
    public void testIterate() throws HiveException {
        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param1);
        Assert.assertEquals(3, agg.sum);
        evaluator.iterate(agg, param2);
        Assert.assertEquals(7, agg.sum);
        evaluator.iterate(agg, param3);
        Assert.assertEquals(14, agg.sum);
    }

    @Test
    public void testTerminatePartial() throws Exception {
        testIterate();
        Object partial = evaluator.terminatePartial(agg);
        Assert.assertTrue(partial instanceof IntWritable);
        Assert.assertEquals(new IntWritable(14), partial);
    }

    @Test
    public void testMerge() throws Exception {
        // Three "mappers" (PARTIAL1), each producing a partial result
        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param1);
        evaluator.iterate(agg, param2);
        Object partial1 = evaluator.terminatePartial(agg);

        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param3);
        Object partial2 = evaluator.terminatePartial(agg);

        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param4);
        Object partial3 = evaluator.terminatePartial(agg);

        // One "combiner" (PARTIAL2) merging them
        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, output);
        evaluator.reset(agg);
        evaluator.merge(agg, partial1);
        Assert.assertEquals(7, agg.sum);
        evaluator.merge(agg, partial2);
        Assert.assertEquals(14, agg.sum);
        evaluator.merge(agg, partial3);
        Assert.assertEquals(19, agg.sum);
    }

    @Test
    public void testTerminate() throws Exception {
        // Map-only job (COMPLETE): raw rows straight to the final result
        evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param1);
        evaluator.iterate(agg, param2);
        evaluator.iterate(agg, param3);
        evaluator.iterate(agg, param4);
        Object term = evaluator.terminate(agg);
        Assert.assertTrue(term instanceof IntWritable);
        Assert.assertEquals(new IntWritable(19), term);
    }

    /**
     * Builds a TypeInfo array from type names such as "string" or "int".
     */
    private TypeInfo[] makePrimitiveTypeInfoArray(String[] typeStrs) {
        TypeInfo[] types = new TypeInfo[typeStrs.length];
        for (int i = 0; i < typeStrs.length; i++) {
            types[i] = TypeInfoFactory.getPrimitiveTypeInfo(typeStrs[i]);
        }
        return types;
    }
}
```
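## 4. UDTF

In Hive, a UDTF can turn one row into one row with several columns, or into several rows with several columns; UDTFs are used quite often.
A UDTF must extend the abstract class GenericUDTF and implement its initialize, process, and close methods.
- initialize: checks the types of the arguments passed in and determines the data type of each column of the table the UDTF generates (i.e. the input and output types).
- process: after initialize() has been called, Hive passes the UDTF's arguments to process(), which handles one input record and emits any number of result records. Each call to forward() in this method produces one row; to produce several columns, put the column values into an array and pass that array to forward().
- close: called once after all process() calls have finished, for any extra cleanup work.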
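The lifecycle can be sketched in plain Java with a hypothetical miniature of GenericUDTF (`MiniUdtf`, `SplitWithIndex` and `run` are illustrative names, not Hive classes). Here `forward()` just collects rows so they can be inspected, whereas Hive's `forward()` sends them downstream:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UdtfLifecycleSketch {
    // Hypothetical miniature of GenericUDTF: process() may call forward() any number of times
    abstract static class MiniUdtf {
        private final List<Object[]> emitted = new ArrayList<>();

        // Validates the arguments and returns the output column names
        abstract List<String> initialize(int argCount);

        // Handles one input record
        abstract void process(Object[] record);

        // Called once after the last record; nothing to do by default
        void close() { }

        // Stand-in for GenericUDTF.forward(): every call emits one output row
        protected final void forward(Object[] row) {
            emitted.add(row);
        }

        List<Object[]> emittedRows() {
            return emitted;
        }
    }

    // Example: "a,b" -> rows (0, a) and (1, b), similar in spirit to Hive's posexplode
    static class SplitWithIndex extends MiniUdtf {
        @Override
        List<String> initialize(int argCount) {
            if (argCount != 1) {
                throw new IllegalArgumentException("takes exactly one argument");
            }
            return Arrays.asList("pos", "item");
        }

        @Override
        void process(Object[] record) {
            if (record[0] == null) {
                return; // a NULL input produces no rows
            }
            String[] items = record[0].toString().split(",");
            for (int i = 0; i < items.length; i++) {
                forward(new Object[]{i, items[i]}); // two columns per row
            }
        }
    }

    // Drives the lifecycle: initialize once, process every record, close once
    static List<Object[]> run(MiniUdtf udtf, List<Object[]> input) {
        udtf.initialize(1);
        for (Object[] record : input) {
            udtf.process(record);
        }
        udtf.close();
        return udtf.emittedRows();
    }
}
```

Feeding it the records `"a,b"`, `null`, and `"c"` yields three rows, `(0, a)`, `(1, b)`, `(0, c)`: the number of output rows is decided inside `process()`, not by the number of input rows.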
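```java
package me.w1992wishes.hive.udf;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class GenericUDTFNameParserGeneric extends GenericUDTF {
    private PrimitiveObjectInspector stringOI = null;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentException("GenericUDTFNameParserGeneric() takes exactly one argument");
        }
        // Reject anything that is not a primitive string; `||` short-circuits, so the cast
        // only runs once the category is known to be PRIMITIVE
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("GenericUDTFNameParserGeneric() takes a string as a parameter");
        }
        // Input format (inspector)
        stringOI = (PrimitiveObjectInspector) args[0];
        // Output format (inspectors): a struct with two string fields
        List<String> fieldNames = new ArrayList<>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<>(2);
        fieldNames.add("name");
        fieldNames.add("surname");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    private List<Object[]> processInputRecord(String name) {
        List<Object[]> result = new ArrayList<>();
        // Ignore null and empty values
        if (name == null || name.isEmpty()) {
            return result;
        }
        String[] tokens = name.split("\\s+");
        if (tokens.length == 2) {
            // "John Smith" -> (John, Smith)
            result.add(new Object[]{tokens[0], tokens[1]});
        } else if (tokens.length == 4 && tokens[1].equals("and")) {
            // "John and Ann White" -> (John, White), (Ann, White)
            result.add(new Object[]{tokens[0], tokens[3]});
            result.add(new Object[]{tokens[2], tokens[3]});
        }
        return result;
    }

    @Override
    public void process(Object[] record) throws HiveException {
        // A NULL column can come back as null here, so avoid calling toString() on it
        Object raw = stringOI.getPrimitiveJavaObject(record[0]);
        String name = raw == null ? null : raw.toString();
        for (Object[] r : processInputRecord(name)) {
            forward(r); // each forward() call emits one output row
        }
    }

    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }
}
```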
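Because `processInputRecord()` has no Hive dependency, its rules can be checked on their own against the sample data used in section 5. The sketch below copies them into a hypothetical `NameParserCheck` class and runs them over the four lines of `people.txt`:

```java
import java.util.ArrayList;
import java.util.List;

public class NameParserCheck {
    // Same splitting rules as processInputRecord() above, without any Hive classes
    static List<String[]> parse(String name) {
        List<String[]> result = new ArrayList<>();
        if (name == null || name.isEmpty()) {
            return result;
        }
        String[] tokens = name.split("\\s+");
        if (tokens.length == 2) {
            result.add(new String[]{tokens[0], tokens[1]});
        } else if (tokens.length == 4 && tokens[1].equals("and")) {
            result.add(new String[]{tokens[0], tokens[3]});
            result.add(new String[]{tokens[2], tokens[3]});
        }
        return result; // any other shape, e.g. a single token, yields no rows
    }

    public static void main(String[] args) {
        String[] people = {"John Smith", "John and Ann White", "Ted Green", "Dorothy"};
        for (String p : people) {
            for (String[] row : parse(p)) {
                System.out.println(row[0] + " " + row[1]);
            }
        }
    }
}
```

This prints John Smith, John White, Ann White, and Ted Green: four rows from four input lines, because "John and Ann White" expands into two rows while the single-token "Dorothy" produces none.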
## 5. Using the UDFs

### 5.1 Preparation

Prepare the data:

```shell
cat ./people.txt
John Smith
John and Ann White
Ted Green
Dorothy
```

Upload the file to the HDFS directory /user/wqf/people:

```shell
hadoop fs -mkdir -p /user/wqf/people
hadoop fs -put ./people.txt /user/wqf/people
```

Then create an external Hive table by running the following in the Hive shell:

```sql
CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
ESCAPED BY ''
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/wqf/people';
```
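Add the following configuration to the Maven pom, then build a jar that bundles the dependencies with `mvn package assembly:single` (the older `mvn assembly:assembly` goal only exists in maven-assembly-plugin versions before 3.0):

```xml
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
```

Upload the jar to the Hive server.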
### 5.2 Adding a UDF temporarily

Start Hive:

```
hive> add jar /home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar;
Added [/home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar] to class path
Added resources: [/home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar]
hive> CREATE TEMPORARY FUNCTION myNvl as 'me.w1992wishes.hive.udf.GenericUDFNvl';
hive> select myNvl(name, 'a') from people limit 1;
OK
John Smith
hive> CREATE TEMPORARY FUNCTION myCount as 'me.w1992wishes.hive.udf.GenericUDAFTotalNumOfLetters';
hive> select myCount(name) from people;
OK
44
hive> CREATE TEMPORARY FUNCTION myParser as 'me.w1992wishes.hive.udf.GenericUDTFNameParserGeneric';
hive> select myParser(name) from people;
OK
John Smith
John White
Ann White
Ted Green
Time taken: 0.18 seconds, Fetched: 4 row(s)
```

A function created this way is dropped automatically when the session ends, so every new session has to run `add jar` and `CREATE TEMPORARY FUNCTION` again.
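The 44 returned by `myCount(name)` can be checked by hand: the four lines of `people.txt` have 10, 18, 9, and 7 characters, spaces included. A tiny plain-Java check (hypothetical class `LetterCountCheck`) applying the same rule as `GenericUDAFTotalNumOfLetters`:

```java
public class LetterCountCheck {
    // Same rule as GenericUDAFTotalNumOfLetters: add up String.length() of each non-null value
    static int totalLetters(String... rows) {
        int total = 0;
        for (String row : rows) {
            if (row != null) {
                total += row.length(); // spaces count as characters
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // The four lines of people.txt: 10 + 18 + 9 + 7
        System.out.println(totalLetters("John Smith", "John and Ann White", "Ted Green", "Dorothy")); // 44
    }
}
```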
### 5.3 Adding a UDF permanently

Here the jar must not be a local file; upload it to an HDFS directory first:

```shell
hadoop fs -put hive-udf-1.0-SNAPSHOT.jar /user/hive/jars
```

Then start Hive and create the function:

```
hive> create function myCount as 'me.w1992wishes.hive.udf.GenericUDAFTotalNumOfLetters' using jar 'hdfs:/user/hive/jars/hive-udf-1.0-SNAPSHOT.jar';
OK
```
## 6. References

1. *Programming Hive* (Chinese edition: 《Hive編程指南》)
2. *Hive UDAF開發詳解* (a detailed guide to Hive UDAF development)