Hive Custom Function Types
1) Hive ships with a number of built-in functions, such as max/min, but they are limited; you can conveniently extend Hive with user-defined functions (UDFs).
2) When Hive's built-in functions cannot meet your business needs, consider writing a user-defined function (UDF: user-defined function).
3) User-defined functions fall into three categories:
(1) UDF (User-Defined Function)
One row in, one row out
(2) UDAF (User-Defined Aggregation Function)
Aggregate function: many rows in, one row out
Similar to: count/max/min
(3) UDTF (User-Defined Table-Generating Function)
One row in, many rows out
e.g. lateral view explode()
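As a quick illustration of the one-in-many-out behavior, the built-in explode() UDTF is typically combined with LATERAL VIEW (the table and column names here are hypothetical):

```sql
-- hypothetical table: orders(order_id string, item_ids array<string>)
SELECT o.order_id, t.item_id
FROM orders o
LATERAL VIEW explode(o.item_ids) t AS item_id;
-- each array element becomes its own output row
```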
4) Official documentation:
https://cwiki.apache.org/confluence/display/Hive/HivePlugins
5) Programming steps:
(1) Extend org.apache.hadoop.hive.ql.exec.UDF (note: this class is deprecated in newer Hive releases in favor of GenericUDF, which the examples below use)
(2) Implement an evaluate method; evaluate supports overloading
(3) Create the function in the Hive CLI
Add the jar:
add jar linux_jar_path
Create the function:
create [temporary] function [dbname.]function_name AS class_name;
(4) Drop the function in the Hive CLI
drop [temporary] function [if exists] [dbname.]function_name;
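The steps above can be sketched as a concrete session (the jar path, function name, and class name are hypothetical; substitute your own build output):

```sql
-- hypothetical jar path and class; adjust to your build output
add jar /opt/jars/hive-udf-1.0.jar;
create temporary function my_func as 'com.example.hive.udf.MyFunc';
-- remove it again when no longer needed
drop temporary function if exists my_func;
```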
6) Note: a UDF must have a return type; it may return null, but the return type cannot be void.
1) Have you written custom UDF/UDTF functions in a project? What problems did they solve, and what are the steps to define them?
(1) A UDF was used to parse the common fields; a UDTF was used to parse the event fields.
(2) Custom UDF: extend UDF and override the evaluate method.
(3) Custom UDTF: extend GenericUDTF and override three methods: initialize (define the output column names and types), process (emit results via forward(result)), and close.
2) Why write a custom UDF/UDTF?
Because in a custom function you can add your own logging; when something fails or the data is malformed, that makes debugging much easier.
Example:
1. Maven dependency
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
UDF example (one in, one out)
Performs a data-structure conversion: turns a JSON-array string into a Hive array of structs
jsonstr ==> array<struct<action_id:string,item:string,item_type:string,ts:bigint>>
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.List;
@Description(name = "json_array_to_struct_array", value = "-- convert json_array to struct_array")
public class JsonArrayToStructArray extends GenericUDF {

    /*
     * Validate the input and return the object inspector
     * describing the function's return value.
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1. Validate the input: 1 json string, n field names, n "name:type" specs
        if (arguments.length < 3) {
            throw new UDFArgumentException("json_array_to_struct_array requires at least 3 arguments");
        }
        for (ObjectInspector argument : arguments) {
            if (!"string".equals(argument.getTypeName())) {
                throw new UDFArgumentException("every argument of json_array_to_struct_array must be a string");
            }
        }
        // 2. Build the object inspector for the return value:
        //    array(struct(k:v, k:v), struct(...))
        List<String> fieldNames = new ArrayList<>();      // name of each struct field
        List<ObjectInspector> oiList = new ArrayList<>(); // inspector for each struct field's type
        int size = arguments.length;
        // The second half of the arguments holds the "name:type" specs
        for (int i = (size - 1) / 2 + 1; i < size; i++) {
            String[] spec = getConstantStringValue(arguments, i).split(":");
            fieldNames.add(spec[0]);
            // Pick the inspector matching the declared type
            switch (spec[1]) {
                case "string":
                    oiList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                    break;
                case "int":
                    oiList.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
                    break;
                case "bigint":
                    oiList.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
                    break;
                default:
                    throw new UDFArgumentException("unsupported field type: " + spec[1]);
            }
        }
        return ObjectInspectorFactory
                .getStandardListObjectInspector(
                        ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, oiList));
    }

    /*
     * Compute the result for one input row. Example call:
     *   json_array_to_struct_array(
     *       get_json_object(line, '$.actions'),
     *       'action_id', 'item', 'item_type', 'ts',
     *       'action_id:string', 'item:string', 'item_type:string', 'ts:bigint')
     * returns array(struct(..), struct(..)).
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        if (arguments[0].get() == null) {
            return null;
        }
        // 1. Get the json array string passed in
        String jsonArrayString = arguments[0].get().toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);
        // 2. Parse every element of the array
        List<List<Object>> result = new ArrayList<>();
        for (int i = 0; i < jsonArray.length(); i++) {
            List<Object> struct = new ArrayList<>();
            result.add(struct);
            JSONObject obj = jsonArray.getJSONObject(i);
            // The first half of the arguments holds the field names,
            // i.e. how many fields each struct should have
            for (int j = 1; j < (arguments.length - 1) / 2 + 1; j++) {
                String name = arguments[j].get().toString();
                if (obj.has(name)) {
                    struct.add(obj.get(name));
                } else {
                    struct.add(null);
                }
            }
        }
        return result;
    }

    /*
     * The string displayed for this call, e.g. in EXPLAIN output.
     */
    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("json_array_to_struct_array", children);
    }
}
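Once the class is packaged and registered, the UDF can be called as in the in-code comment above (the source table `ods_log` and its string column `line` are hypothetical):

```sql
create temporary function json_array_to_struct_array as 'JsonArrayToStructArray';
select json_array_to_struct_array(
         get_json_object(line, '$.actions'),
         'action_id', 'item', 'item_type', 'ts',
         'action_id:string', 'item:string', 'item_type:string', 'ts:bigint')
from ods_log;  -- hypothetical source table with a string column `line`
```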
/*
[
{
"displayType":"promotion",
"item":"3",
"item_type":"sku_id",
"order":1
},
{
"displayType":"promotion",
"item":"1",
"item_type":"sku_id",
"order":2
},
{
"displayType":"query",
"item":"7",
"item_type":"sku_id",
"order":3
},
{
"displayType":"promotion",
"item":"5",
"item_type":"sku_id",
"order":4
}
]
*/
UDTF example (one in, many out)
For example, explode the JSON array:
[{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610},
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}]
into multiple rows, one JSON object per row:
{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610}
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import java.util.ArrayList;
import java.util.List;
/**
 * UDTF: one row in, multiple rows out
 * in:  [{}, {}]
 * out: "{}", "{}"
 */
@Description(name = "explode_json_array", value = "explode a json array string into one row per element")
public class ExplodeJsonArray extends GenericUDTF {

    /*
     * Responsibilities:
     * 1. validate the input
     * 2. return the object inspector describing the output rows
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Validate the input
        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
        if (inputFields.size() != 1) {
            throw new UDFArgumentException("explode_json_array takes exactly 1 argument");
        }
        ObjectInspector inspector = inputFields.get(0).getFieldObjectInspector();
        if (inspector.getCategory() != ObjectInspector.Category.PRIMITIVE
                || !inspector.getTypeName().equals("string")) {
            throw new UDFArgumentException("the argument of explode_json_array must be a string");
        }
        // 2. The output is a single string column named "action"
        List<String> names = new ArrayList<>();
        names.add("action");
        List<ObjectInspector> inspectors = new ArrayList<>();
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
    }

    /*
     * Process one row of input: [{}, {}, ...]
     */
    @Override
    public void process(Object[] args) throws HiveException {
        if (args[0] == null) {
            return; // nothing to explode
        }
        String jsonArrayString = args[0].toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);
        for (int i = 0; i < jsonArray.length(); i++) {
            // forward takes an array because a UDTF may emit several columns per row;
            // here there is only the single "action" column
            String[] cols = new String[1];
            cols[0] = jsonArray.getString(i);
            forward(cols);
        }
    }

    /**
     * Release resources; nothing to clean up here.
     */
    @Override
    public void close() throws HiveException {
    }
}
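To produce the one-row-per-element output shown above, the UDTF is usually combined with LATERAL VIEW after registration (the table `ods_log` and its string column `line` are hypothetical):

```sql
create temporary function explode_json_array as 'ExplodeJsonArray';
select t.action
from ods_log
lateral view explode_json_array(get_json_object(line, '$.actions')) t as action;
-- each element of the JSON array becomes its own row in column `action`
```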