greenplum 導(dǎo)入 Spark計算的RoaringBitmap

最近在做基于標簽的圈人顽馋。通過bitmap來做,使用開源的RoaringBitmap幌羞,數(shù)據(jù)存儲在hive上寸谜。
開始是通過greenplum的pxf插件,將數(shù)據(jù)導(dǎo)入到gp属桦,然后聚合標簽生成Roaringbitmap熊痴。
但是這樣的方式效率低,于是在spark中構(gòu)建聂宾,然后將構(gòu)建好的bitmap導(dǎo)入gp中果善。
開始使用udaf的方式 這樣計算效率較低

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.RoaringBitmap;
 
import java.io.*;
import java.util.ArrayList;
import java.util.List;
 
/**
 * 實現(xiàn)自定義聚合函數(shù)Bitmap
 */
public class UdafBitMap extends UserDefinedAggregateFunction {
    @Override
    public StructType inputSchema() {
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
        return DataTypes.createStructType(structFields);
    }
 
    @Override
    public StructType bufferSchema() {
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
        return DataTypes.createStructType(structFields);
    }
 
    @Override
    public DataType dataType() {
        return DataTypes.LongType;
    }
 
    @Override
    public boolean deterministic() {
        //是否強制每次執(zhí)行的結(jié)果相同
        return false;
    }
 
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        //初始化
        buffer.update(0, null);
    }
 
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        // 相同的executor間的數(shù)據(jù)合并
        // 1. 輸入為空直接返回不更新
        Object in = input.get(0);
        if(in == null){
            return ;
        }
        // 2. 源為空則直接更新值為輸入
        byte[] inBytes = (byte[]) in;
        Object out = buffer.get(0);
        if(out == null){
            buffer.update(0, inBytes);
            return ;
        }
        // 3. 源和輸入都不為空使用bitmap去重合并
        byte[] outBytes = (byte[]) out;
        byte[] result = outBytes;
        RoaringBitmap outRR = new RoaringBitmap();
        RoaringBitmap inRR = new RoaringBitmap();
        try {
            outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
            inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
            outRR.or(inRR);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            outRR.serialize(new DataOutputStream(bos));
            result = bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        buffer.update(0, result);
    }
 
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        //不同excutor間的數(shù)據(jù)合并
        update(buffer1, buffer2);
    }
 
    @Override
    public Object evaluate(Row buffer) {
        //根據(jù)Buffer計算結(jié)果
        long r = 0l;
        Object val = buffer.get(0);
        if (val != null) {
            RoaringBitmap rr = new RoaringBitmap();
            try {
                rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
                r = rr.getLongCardinality();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return r;
    }
}

因為RoaringBitmap是復(fù)雜的類,不能直接存儲gp系谐,需要序列化成 bytea 類型巾陕。
基本思路是每個分區(qū)的數(shù)據(jù)構(gòu)建一個bitmap,然后序列化到hdfs上,通過pxf
插件鄙煤,建立外表的方式將數(shù)據(jù)導(dǎo)入gp
1.gp中建表dim_{colName}_tag(id int,userids bytea)晾匠。此處是bytea類型。
2.在spark中建立roaringbitmap梯刚。每個分區(qū)的數(shù)據(jù)生成一個bitmap混聊,然后序列化。這里使用scala寫的

 mp.foreach(m => {
        val v = m._1
        val d = m._2
        println(s"current tag $t1xjfzv col_value ${v}")
        val colsql = s"select $nft3317,row_id from mytable where ${col} = ${v}"
        val coldf = spark.sql(colsql)

        val res = coldf.mapPartitions(each => {
          val mrb = new RoaringBitmap()
          each.map(_.getLong(1).toInt).toList.foreach(mrb.add(_))
          mrb.runOptimize()
          val array = new Array[Byte](mrb.serializedSizeInBytes)
          mrb.serialize(new DataOutputStream(new OutputStream() {
            var c = 0
            override
            def close(): Unit = {
            }
            override
            def flush(): Unit = {
            }
            override
            def write(b: Int): Unit = {
              array({
                c += 1;
                c - 1
              }) = b.toByte
            }
            override
            def write(b: Array[Byte]): Unit = {
              write(b, 0, b.length)
            }
            override
            def write(b: Array[Byte], off: Int, l: Int): Unit = {
              System.arraycopy(b, off, array, c, l)
              c += l
            }
          }))
          Iterator((d, array))
        }) 

3.spark數(shù)據(jù)寫到保存到hdfs乾巧,可以采用parquet格式。
4.在gp中建立外表预愤。使用pxf插件沟于。

CREATE EXTERNAL TABLE dim_${colName}_$tag(tag int,row_id bytea) LOCATION ('pxf:/$RELATE_ROW_PATH/pt=$pt/$tag?PROFILE=hdfs:parquet') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');"

這里外表與hdfs的目錄對應(yīng)。這樣可以導(dǎo)入數(shù)據(jù)到gp中植康。
5.最重要的一步旷太,就是將序列化的RoaringBitmap反序列化生成roaringbitmap。
建立tagtable(id int,userids roaringbitmap)销睁。需要提前安裝roaringbitmap插件供璧。

"INSERT INTO btable SELECT tag, rb_or_agg(cast(cast(row_id as varchar) as roaringbitmap)), current_timestamp from dim_${colName}_$tag group by tag;"

最核心的部分是

  1. cast(row_id as varchar) 二進制數(shù)據(jù)轉(zhuǎn)成字符
  2. cast(cast(row_id as varchar) as roaringbitmap 字符轉(zhuǎn)成roaringbitmap。
    目前冻记,只找到了這重點方法睡毒。雖然官網(wǎng)提供了spark-gp的connector,但是沒有測試成功將bytea數(shù)據(jù)直接寫入gp冗栗。
    只能中間導(dǎo)入的方式演顾。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市隅居,隨后出現(xiàn)的幾起案子钠至,更是在濱河造成了極大的恐慌,老刑警劉巖胎源,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件棉钧,死亡現(xiàn)場離奇詭異,居然都是意外死亡涕蚤,警方通過查閱死者的電腦和手機宪卿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赞季,“玉大人愧捕,你說我怎么就攤上這事∩旯常” “怎么了次绘?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我邮偎,道長管跺,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任禾进,我火速辦了婚禮豁跑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘泻云。我一直安慰自己艇拍,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布宠纯。 她就那樣靜靜地躺著卸夕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪婆瓜。 梳的紋絲不亂的頭發(fā)上快集,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天,我揣著相機與錄音廉白,去河邊找鬼个初。 笑死,一個胖子當(dāng)著我的面吹牛猴蹂,可吹牛的內(nèi)容都是我干的院溺。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼晕讲,長吁一口氣:“原來是場噩夢啊……” “哼覆获!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起瓢省,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤弄息,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后勤婚,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體摹量,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年馒胆,在試婚紗的時候發(fā)現(xiàn)自己被綠了缨称。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡祝迂,死狀恐怖睦尽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情型雳,我是刑警寧澤当凡,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布山害,位于F島的核電站,受9級特大地震影響沿量,放射性物質(zhì)發(fā)生泄漏浪慌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一朴则、第九天 我趴在偏房一處隱蔽的房頂上張望权纤。 院中可真熱鬧,春花似錦乌妒、人聲如沸汹想。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽欧宜。三九已至,卻和暖如春拴魄,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背席镀。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工匹中, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人豪诲。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓顶捷,卻偏偏與公主長得像,于是被迫代替她去往敵國和親屎篱。 傳聞我的和親對象是個殘疾皇子服赎,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355