【Hive】Hive UDF

[TOC]

一柴罐、UDF 介紹

UDF(User-Defined Functions)即是用戶自定義的hive函數(shù)讯嫂。當 Hive 自帶的函數(shù)并不能完全滿足業(yè)務的需求耕漱,這時可以根據(jù)具體需求自定義函數(shù)。UDF 函數(shù)可以直接應用于 select 語句村刨,對查詢結構做格式化處理后淹冰,再輸出內容。

Hive 自定義函數(shù)包括三種:

  • UDF: one to one ,進來一個出去一個谨胞,row mapping, 如:upper蒜鸡、substr函數(shù)胯努;
  • UDAF(A:aggregation): many to one牢裳,進來多個出去一個,row mapping叶沛,如sum/min蒲讯;
  • UDTF(T:table-generating):one to mang,進來一個出去多行灰署,如 lateral view 與 explode 判帮。

注解使用:

@Describtion 注解是可選的,用于對函數(shù)進行說明溉箕,其中的 FUNC 字符串表示函數(shù)名晦墙,當使用 DESCRIBE FUNCTION 命令時,替換成函數(shù)名肴茄。@Describtion包含三個屬性:

  • name:用于指定 Hive 中的函數(shù)名晌畅。
  • value:用于描述函數(shù)的參數(shù)。
  • extended:額外的說明寡痰,如抗楔,給出示例,當使用 DESCRIBE FUNCTION EXTENDED name 的時候打印拦坠。

二连躏、UDF

開發(fā)自定義 UDF 函數(shù)有兩種方式:

  • 如果函數(shù)讀和返回都是基礎數(shù)據(jù)類型,即 Hadoop 和 Hive 的基本類型贞滨,如入热,Text、IntWritable晓铆、LongWritable才顿、DoubleWritable 等,那么繼承 org.apache.hadoop.hive.ql.exec.UDF 尤蒿;
  • 如果用來操作內嵌數(shù)據(jù)結構郑气,如 Map,List 和 Set腰池,則繼承 org.apache.hadoop.hive.ql.udf.generic.GenericUDF尾组;

2.1、簡單 UDF

用簡單 UDF API 來構建一個 UDF 只涉及到編寫一個類繼承實現(xiàn)一個方法(evaluate)示弓,下面的例子來自 《Hive 編程指南》讳侨,將表中的生日字段轉換為星座。

@UDFType
@Description(
        name = "zodiac",
        value = "_FUNC_ (date) - " +
                " from the input date string " +
                " or separate month and day arguments, \n" +
                " returns the sign of the Zodiac.",
        extended = "Example :\n" +
                "> SELECT _FUNC_ (date_string) FROM src;\n" +
                "> SELECT _FUNC_ (month, day) FROM src;")
public class UDFZodiacSign extends UDF {

    private static final String ERROR_DATE_OF_MONTH = "invalid date of specify month";

    private static final String ERROR_MONTH_ARGS = "invalid argument of month";

    private static final String ERROR_DATE_STRING = "invalid date format";

    public String evaluate(Date bday) {
        return this.evaluate(bday.getMonth() + 1, bday.getDate());
    }

    public String evaluate(String dateString) {
        DateTime dateTime;
        try {
            dateTime = new DateTime(dateString);
        } catch (Exception e) {
            return ERROR_DATE_STRING;
        }
        return this.evaluate(dateTime.getMonthOfYear(), dateTime.getDayOfMonth());
    }

    public String evaluate(Integer month, Integer day) {

        switch (month) {
            //判斷是幾月
            case 1:
                //判斷是當前月的哪一段時間奏属;然后就可以得到星座了跨跨;下面代碼都一樣的
                if (day > 0 && day < 20) {
                    return "魔蝎座";
                } else if (day < 32) {
                    return "水瓶座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 2:
                if (day > 0 && day < 19) {
                    return "水瓶座";
                } else if (day < 29) {
                    return "雙魚座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 3:
                if (day > 0 && day < 21) {
                    return "雙魚座";
                } else if (day < 32) {
                    return "白羊座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 4:
                if (day > 0 && day < 20) {
                    return "白羊座";
                } else if (day < 31) {
                    return "金牛座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 5:
                if (day > 0 && day < 21) {
                    return "金牛座";
                } else if (day < 32) {
                    return "雙子座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 6:
                if (day > 0 && day < 22) {
                    return "雙子座";
                } else if (day < 31) {
                    return "巨蟹座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 7:
                if (day > 0 && day < 23) {
                    return "巨蟹座";
                } else if (day < 32) {
                    return "獅子座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 8:
                if (day > 0 && day < 23) {
                    return "獅子座";
                } else if (day < 32) {
                    return "處女座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 9:
                if (day > 0 && day < 23) {
                    return "處女座";
                } else if (day < 31) {
                    return "天平座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 10:
                if (day > 0 && day < 24) {
                    return "天平座";
                } else if (day < 32) {
                    return "天蝎座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 11:
                if (day > 0 && day < 23) {
                    return "天蝎座";
                } else if (day < 31) {
                    return "射手座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            case 12:
                if (day > 0 && day < 22) {
                    return "射手座";
                } else if (day < 32) {
                    return "摩羯座";
                } else {
                    return ERROR_DATE_OF_MONTH;
                }
            default:
                return ERROR_MONTH_ARGS;
        }

    }

}

測試一下:

public class UDFZodiacSignTest {

    @Test
    public void testUDFZodiacSign() {
        UDFZodiacSign example = new UDFZodiacSign();
        Assert.assertEquals("魔蝎座", example.evaluate(1, 1));
        Assert.assertEquals("魔蝎座", example.evaluate("2019-01-01"));
    }

}

2.2、復雜 GenericUDF

GenericUDF API 提供了一種方法去處理那些不是可寫類型的對象,例如:struct勇婴,map 和 array 類型忱嘹。

這個 API 需要用戶親自為函數(shù)的參數(shù)管理對象存儲格式,驗證接收的參數(shù)的數(shù)量與類型耕渴。

這個 API 要求實現(xiàn)以下方法:

// 這個類似于簡單 API 的 evaluate 方法拘悦,它可以讀取輸入數(shù)據(jù)和返回結果
abstract Object evaluate(GenericUDF.DeferredObject[] arguments);  
  
// 該方法應當是描述該 UDF 的字符串,顯示函數(shù)的提示信息
abstract String getDisplayString(String[] children);  
  
// 只調用一次橱脸,在任何 evaluate() 調用之前础米,可以接收到一個可以表示函數(shù)輸入參數(shù)類型的 object inspectors 數(shù)組
// 是用來驗證該函數(shù)是否接收正確的參數(shù)類型和參數(shù)個數(shù)的地方
abstract ObjectInspector initialize(ObjectInspector[] arguments);  

例子同樣來自 《Hive 編程指南》,編寫一個用戶自定義函數(shù)添诉,稱之為nvl()屁桑,這個函數(shù)傳入的值如果是 null,那么就返回一個默認值栏赴。

函數(shù) nvl() 要求有 2 個參數(shù)蘑斧。如果第 1 個參數(shù)是非null值,那么就返回這個值艾帐;如果第 1 個參數(shù)是 null,那么就返回第 2 個參數(shù)的值盆偿。

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@Description(name = "nvl",
        value = "_FUNC_(value, default_value) - Returns default value if value is null else returns value",
        extended = "Example:\n"
                + " > SELECT _FUNC_(null, 'bla') FROM src limit 1; \n")
public class GenericUDFNvl extends GenericUDF {

    private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
    private ObjectInspector[] argumentOIs;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        argumentOIs = arguments;
        // 1.檢驗參數(shù)個數(shù)
        if (arguments.length != 2) {
            throw new UDFArgumentException("The operator 'NVL' accepts 2 arguments.");
        }

        // 2.檢驗參數(shù)類型
        returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
        if (!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
            throw new UDFArgumentTypeException(2, "The 1st and 2nd args of function NLV should have the same type, "
                    + "but they are different: \"" + arguments[0].getTypeName() + "\" and \"" + arguments[1].getTypeName() + "\"");
        }

        // 3.返回類型柒爸,和傳入的參數(shù)類型一致
        return returnOIResolver.get();
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
        if (retVal == null) {
            retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
        }
        return retVal;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("if ");
        sb.append(children[0]);
        sb.append(" is null ");
        sb.append("returns ");
        sb.append(children[1]);
        return sb.toString();
    }

}

測試一下:

public class GenericUDFNvlTest {

    @Test
    public void testGenericUDFNvl() throws HiveException {
        // 建立需要的模型
        GenericUDFNvl example = new GenericUDFNvl();
        ObjectInspector stringOI1 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        ObjectInspector stringOI2 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        StringObjectInspector resultInspector = (StringObjectInspector) example.initialize(new ObjectInspector[]{stringOI1, stringOI2});

        // 測試結果
        Object result1 = example.evaluate(new GenericUDF.DeferredObject[]{new GenericUDF.DeferredJavaObject(null), new GenericUDF.DeferredJavaObject("a")});
        Assert.assertEquals("a", resultInspector.getPrimitiveJavaObject(result1));

        // 測試結果
        Object result2 = example.evaluate(new GenericUDF.DeferredObject[]{new GenericUDF.DeferredJavaObject("dd"), new GenericUDF.DeferredJavaObject("a")});
        Assert.assertNotEquals("a", resultInspector.getPrimitiveJavaObject(result2));
    }

}

三、UDAF

PS:該段部分來自 Hive UDAF開發(fā)詳解

UDAF 開發(fā)主要涉及到以下兩個抽象類:

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

大致上事扭,UDAF 函數(shù)讀取數(shù)據(jù)(mapper)捎稚,聚集一堆 mapper 輸出到部分聚集結果(combiner),并且最終創(chuàng)建一個最終的聚集結果(reducer)求橄。因為需要對多個combiner 進行聚集今野,所以需要保存部分聚集結果。

3.1罐农、AbstractGenericUDAFResolver

Resolver 要覆蓋實現(xiàn) getEvaluator 方法条霜,該方法會根據(jù) sql 傳人的參數(shù)數(shù)據(jù)格式指定調用哪個 Evaluator 進行處理。

public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) 
  throws SemanticException {
  throw new SemanticException(
        "This UDAF does not support the deprecated getEvaluator() method.");
}

3.2涵亏、GenericUDAFEvaluator

UDAF 邏輯處理主要發(fā)生在 Evaluator 中宰睡,要實現(xiàn)該抽象類的幾個方法。理解Evaluator 之前气筋,先介紹 ObjectInspector 接口與 GenericUDAFEvaluator 中的內部類 Model拆内。

  • ObjectInspector:主要是解耦數(shù)據(jù)使用與數(shù)據(jù)格式,使數(shù)據(jù)流在輸入輸出端可以切換不同的輸入輸出格式宠默,不同的 Operator上使用不同的格式麸恍。

  • Model:Model 代表了 UDAF 在 mapreduce 的各個階段。

    public static enum Mode {
        /**
         * PARTIAL1: 這個是mapreduce的map階段:從原始數(shù)據(jù)到部分數(shù)據(jù)聚合
         * 將會調用iterate()和terminatePartial()
         */
        PARTIAL1,
            /**
         * PARTIAL2: 這個是mapreduce的map端的Combiner階段搀矫,負責在map端合并map的數(shù)據(jù):從部分數(shù)據(jù)聚合到部分數(shù)據(jù)聚合
         * 將會調用merge() 和 terminatePartial() 
         */
        PARTIAL2,
            /**
         * FINAL: mapreduce的reduce階段:從部分數(shù)據(jù)的聚合到完全聚合 
         * 將會調用merge()和terminate()
         */
        FINAL,
            /**
         * COMPLETE: 如果出現(xiàn)了這個階段抹沪,表示mapreduce只有map刻肄,沒有reduce,所以map端就直接出結果了:從原始數(shù)據(jù)直接到完全聚合
          * 將會調用 iterate()和terminate()
         */
        COMPLETE
      };
    

一般情況下采够,完整的 UDAF 邏輯是一個 mapreduce 過程肄方,如果有mapper 和reducer,就會經歷 PARTIAL1(mapper)蹬癌,F(xiàn)INAL(reducer)权她,如果還有 combiner,那就會經歷 PARTIAL1(mapper)逝薪,PARTIAL2(combiner)隅要,F(xiàn)INAL(reducer)。

而有一些情況下的 mapreduce董济,只有mapper步清,而沒有 reducer,所以就會只有COMPLETE 階段虏肾,這個階段直接輸入原始數(shù)據(jù)廓啊,出結果。

3.3封豪、GenericUDAFEvaluator 的方法

// 確定各個階段輸入輸出參數(shù)的數(shù)據(jù)格式 ObjectInspectors谴轮,一般負責初始化內部字段,通常初始化用來存放最終結果的變量
public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
 
// 保存數(shù)據(jù)聚集結果的類
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
 
// 重置聚集結果
public void reset(AggregationBuffer agg) throws HiveException;
 
// map階段,迭代處理輸入sql傳過來的列數(shù)據(jù)
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
 
// map與combiner結束返回結果吹埠,得到部分數(shù)據(jù)聚集結果
public Object terminatePartial(AggregationBuffer agg) throws HiveException;
 
// combiner合并map返回的結果第步,還有reducer合并mapper或combiner返回的結果。
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
 
// reducer階段缘琅,輸出最終結果
public Object terminate(AggregationBuffer agg) throws HiveException;

3.4粘都、圖解Model與Evaluator關系

Model 各階段對應 Evaluator 方法調用

image

Evaluator 各個階段下處理 mapreduce 流程

image

3.5、編碼實例

下面的函數(shù)代碼是計算指定列中字符的總數(shù)(包括空格):

/**
 * @author Administrator
 */
@Description(
        name = "letters",
        value = "_FUNC_(expr) - 返回該列中所有字符串的字符總數(shù)")
public class GenericUDAFTotalNumOfLetters extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }

        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be PRIMITIVE, but "
                            + oi.getCategory().name()
                            + " was passed.");
        }

        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be String, but "
                            + inputOI.getPrimitiveCategory().name()
                            + " was passed.");
        }

        return new TotalNumOfLettersEvaluator();
    }

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;
        PrimitiveObjectInspector integerOI;

        private IntWritable result;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {

            super.init(m, parameters);
            result = new IntWritable(0);
            inputOI = (PrimitiveObjectInspector) parameters[0];
            integerOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
            // 指定各個階段輸出數(shù)據(jù)格式都為Integer類型
            return PrimitiveObjectInspectorFactory.writableIntObjectInspector;

        }

        /**
         * 存儲當前字符總數(shù)的類
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;

            void add(int num) {
                sum += num;
            }
        }

        /**
         * 創(chuàng)建新的聚合計算的需要的內存刷袍,用來存儲mapper,combiner,reducer運算過程中的相加總和翩隧。
         */
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            LetterSumAgg sum = new LetterSumAgg();
            reset(sum);
            return sum;
        }

        /**
         * mapreduce支持mapper和reducer的重用,所以為了兼容呻纹,也需要做內存的重用鸽心。
         */
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            myagg.sum = 0;
        }

        /**
         * map階段調用,把保存當前和的對象agg居暖,再加上輸入的參數(shù)傳入顽频。
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        /**
         * mapper 結束要返回的結果,還有 combiner 結束返回的結果
         */
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        /**
         * combiner合并map返回的結果太闺,還有reducer合并mapper或combiner返回的結果糯景。
         */
        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {

                LetterSumAgg myagg = (LetterSumAgg) agg;

                myagg.sum += PrimitiveObjectInspectorUtils.getInt(partial, integerOI);
            }
        }

        /**
         * reducer返回結果,或者是只有mapper,沒有reducer時蟀淮,在mapper端返回結果最住。
         */
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            result.set(myagg.sum);
            return result;
        }

    }
}

測試:

public class GenericUDAFTotalNumOfLettersTest {

    private GenericUDAFTotalNumOfLetters example;
    private GenericUDAFEvaluator evaluator;
    private ObjectInspector[] output;
    private PrimitiveObjectInspector[] poi;

    GenericUDAFTotalNumOfLetters.TotalNumOfLettersEvaluator.LetterSumAgg agg;

    Object[] param1 = {"tom"};
    Object[] param2 = {"tomT"};
    Object[] param3 = {"wu kong"};
    Object[] param4 = {"wu le"};

    @Before
    public void setUp() throws Exception {

        example = new GenericUDAFTotalNumOfLetters();

        //All the data are String
        String[] typeStrs = {"string"/*, "string", "string"*/};
        TypeInfo[] types = makePrimitiveTypeInfoArray(typeStrs);

        evaluator = example.getEvaluator(types);

        poi = new PrimitiveObjectInspector[1];
        poi[0] = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                PrimitiveObjectInspector.PrimitiveCategory.STRING);
/*        poi[1] =  PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                PrimitiveObjectInspector.PrimitiveCategory.STRING);
        poi[2] =  PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                PrimitiveObjectInspector.PrimitiveCategory.STRING);*/

        //The output inspector
        output = new ObjectInspector[1];
        output[0] = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                PrimitiveObjectInspector.PrimitiveCategory.INT);
        /*output[0] = ObjectInspectorFactory.getStandardListObjectInspector(poi[0]);*/

        agg = (GenericUDAFTotalNumOfLetters.TotalNumOfLettersEvaluator.LetterSumAgg) evaluator.getNewAggregationBuffer();
    }

    @After
    public void tearDown() throws Exception {

    }

    @Test(expected = UDFArgumentTypeException.class)
    public void testGetEvaluateorWithComplexTypes() throws Exception {
        TypeInfo[] types = new TypeInfo[1];
        types[0] = TypeInfoFactory.getListTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"));
        example.getEvaluator(types);
    }

    @Test(expected = UDFArgumentTypeException.class)
    public void testGetEvaluateorWithNotSupportedTypes() throws Exception {
        TypeInfo[] types = new TypeInfo[1];
        types[0] = TypeInfoFactory.getPrimitiveTypeInfo("boolean");
        example.getEvaluator(types);
    }

    @Test(expected = UDFArgumentTypeException.class)
    public void testGetEvaluateorWithMultiParams() throws Exception {
        String[] typeStrs3 = {"double", "int", "string"};
        TypeInfo[] types3 = makePrimitiveTypeInfoArray(typeStrs3);
        example.getEvaluator(types3);
    }

    @Test
    public void testIterate() throws HiveException {
        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);

        evaluator.iterate(agg, param1);
        Assert.assertEquals(3, agg.sum);

        evaluator.iterate(agg, param2);
        Assert.assertEquals(7, agg.sum);

        evaluator.iterate(agg, param3);
        Assert.assertEquals(14, agg.sum);
    }

    @Test
    public void testTerminatePartial() throws Exception {

        testIterate();

        Object partial = evaluator.terminatePartial(agg);

        Assert.assertTrue(partial instanceof IntWritable);
        Assert.assertEquals(new IntWritable(14), partial);
    }

    @Test
    public void testMerge() throws Exception {
        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param1);
        evaluator.iterate(agg, param2);
        Object partial1 = evaluator.terminatePartial(agg);

        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param3);
        Object partial2 = evaluator.terminatePartial(agg);

        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
        evaluator.reset(agg);
        evaluator.iterate(agg, param4);
        Object partial3 = evaluator.terminatePartial(agg);

        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, output);
        evaluator.reset(agg);
        evaluator.merge(agg, partial1);
        Assert.assertEquals(7, agg.sum);

        evaluator.merge(agg, partial2);
        Assert.assertEquals(14, agg.sum);

        evaluator.merge(agg, partial3);
        Assert.assertEquals(19, agg.sum);
    }

    @Test
    public void testTerminate() throws Exception {
        evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, poi);
        evaluator.reset(agg);

        evaluator.iterate(agg, param1);
        evaluator.iterate(agg, param2);
        evaluator.iterate(agg, param3);
        evaluator.iterate(agg, param4);
        Object term = evaluator.terminate(agg);

        Assert.assertTrue(term instanceof IntWritable);
        Assert.assertEquals(term, new IntWritable(19));
    }

    /**
     * Generate some TypeInfo from the typeStrs
     */
    private TypeInfo[] makePrimitiveTypeInfoArray(String[] typeStrs) {
        int len = typeStrs.length;

        TypeInfo[] types = new TypeInfo[len];

        for (int i = 0; i < len; i++) {
            types[i] = TypeInfoFactory.getPrimitiveTypeInfo(typeStrs[i]);
        }

        return types;
    }
}

四、UDTF

Hive 中 UDTF 可以將一行轉成一行多列怠惶,也可以將一行轉成多行多列涨缚,使用頻率較高。

一個 UDTF 必須繼承 GenericUDTF 抽象類然后實現(xiàn)抽象類中的 initialize策治,process脓魏,和 close方法。

  • initialize:確定傳入參數(shù)的類型并確定 UDTF 生成表的每個字段的數(shù)據(jù)類型(即輸入類型和輸出類型)通惫,主要是判斷輸入類型并確定返回的字段類型茂翔。
  • process:調用了 initialize() 后,Hive 將把 UDTF 參數(shù)傳給 process() 方法履腋,處理一條輸入記錄珊燎,輸出若干條結果記錄,該方法中遵湖,每一次調用 forward() 產生一行悔政;如果產生多列可以將多個列的值放在一個數(shù)組中,然后將該數(shù)組傳入到 forward() 函數(shù)延旧。
  • close:在 process 調用結束后調用谋国,用于進行其它一些額外操作,只執(zhí)行一次垄潮。
public class GenericUDTFNameParserGeneric extends GenericUDTF {

    private PrimitiveObjectInspector stringOI = null;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {

        if (args.length != 1) {
            throw new UDFArgumentException("GenericUDTFNameParserGeneric() takes exactly one argument");
        }

        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("GenericUDTFNameParserGeneric() takes a string as a parameter");
        }

        // 輸入格式(inspectors)
        stringOI = (PrimitiveObjectInspector) args[0];

        // 輸出格式(inspectors) -- 有兩個屬性的對象
        List<String> fieldNames = new ArrayList<>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<>(2);
        fieldNames.add("name");
        fieldNames.add("surname");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    private ArrayList<Object[]> processInputRecord(String name) {
        ArrayList<Object[]> result = new ArrayList<>();

        // 忽略null值與空值
        if (name == null || name.isEmpty()) {
            return result;
        }

        String[] tokens = name.split("\\s+");

        if (tokens.length == 2) {
            result.add(new Object[]{tokens[0], tokens[1]});
        } else if (tokens.length == 4 && tokens[1].equals("and")) {
            result.add(new Object[]{tokens[0], tokens[3]});
            result.add(new Object[]{tokens[2], tokens[3]});
        }

        return result;
    }

    @Override
    public void process(Object[] record) throws HiveException {

        final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();

        ArrayList<Object[]> results = processInputRecord(name);

        for (Object[] r : results) {
            forward(r);
        }
    }

    @Override
    public void close() throws HiveException {
        // do nothing
    }
}

五烹卒、UDF 使用

5.1闷盔、準備步驟

數(shù)據(jù)準備:

cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy

把該文件上載到 hdfs 目錄 /user/wqf 中:

hadoop fs -mkdir /user/wqf/people
hadoop fs -put ./people.txt /user/wqf/people

然后創(chuàng)建 hive 外部表弯洗,在 hive shell 中執(zhí)行:

CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' 
ESCAPED BY '' 
LINES TERMINATED BY '\n'
STORED AS TEXTFILE 
LOCATION '/user/wqf/people';

maven pom 中添加如下配置,然后運行 mvn assembly:assembly:

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>

將 jar 包上傳到 hive 服務器逢勾。

5.2牡整、臨時添加 UDF

進入 hive 中:

hive> add jar /home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar
Added [/home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar] to class path
Added resources: [/home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar]

hive > CREATE TEMPORARY FUNCTION myNvl as 'me.w1992wishes.hive.udf.GenericUDFNvl';
hive> select myNvl(name, 'a') from people limit 1;
OK
John Smith

hive> CREATE TEMPORARY FUNCTION myCount as 'me.w1992wishes.hive.udf.GenericUDAFTotalNumOfLetters';
hive> select myCount(name) from people;
OK
44

hive> CREATE TEMPORARY FUNCTION myParser as 'me.w1992wishes.hive.udf.GenericUDTFNameParser';
hive> select myParser(name) from people;
OK
John    Smith
John    White
Ann White
Ted Green
Time taken: 0.18 seconds, Fetched: 4 row(s)

這種方式在會話結束后,函數(shù)自動銷毀溺拱,因此每次打開新的會話逃贝,都需要重新 add jar 并且 CREATE TEMPORARY FUNCTION

5.3迫摔、永久添加 UDF

不能是本地 jar 包沐扳,需要上傳 jar 包到 hdfs 目錄中:

hadoop fs -put hive-udf-1.0-SNAPSHOT.jar /user/hive/jars

然后進入 hive 中,創(chuàng)建函數(shù):

hive> create function myCount as 'me.w1992wishes.hive.udf.GenericUDAFTotalNumOfLetters' using jar 'hdfs:/user/hive/jars/hive-udf-1.0-SNAPSHOT.jar';
OK
44

六句占、參考資料

1.《Hive 編程指南》
2.Hive UDAF開發(fā)詳解

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末沪摄,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌杨拐,老刑警劉巖祈餐,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異哄陶,居然都是意外死亡帆阳,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門屋吨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蜒谤,“玉大人,你說我怎么就攤上這事离赫“攀牛” “怎么了?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵渊胸,是天一觀的道長旬盯。 經常有香客問我,道長翎猛,這世上最難降的妖魔是什么胖翰? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮切厘,結果婚禮上萨咳,老公的妹妹穿的比我還像新娘。我一直安慰自己疫稿,他們只是感情好培他,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著遗座,像睡著了一般舀凛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上途蒋,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天猛遍,我揣著相機與錄音,去河邊找鬼号坡。 笑死懊烤,一個胖子當著我的面吹牛,可吹牛的內容都是我干的宽堆。 我是一名探鬼主播腌紧,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼畜隶!你這毒婦竟也來了壁肋?” 一聲冷哼從身側響起逮光,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎墩划,沒想到半個月后涕刚,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡乙帮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年杜漠,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片察净。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡驾茴,死狀恐怖,靈堂內的尸體忽然破棺而出氢卡,到底是詐尸還是另有隱情锈至,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布译秦,位于F島的核電站峡捡,受9級特大地震影響,放射性物質發(fā)生泄漏筑悴。R本人自食惡果不足惜们拙,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望阁吝。 院中可真熱鬧砚婆,春花似錦、人聲如沸突勇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽甲馋。三九已至埂奈,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間摔刁,已是汗流浹背挥转。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工海蔽, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留共屈,地道東北人。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓党窜,卻偏偏與公主長得像拗引,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子幌衣,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內容