blink sql-client 提交kafka流式任務

前置任務:
改造原有的sql-client粘咖,使之能夠讀取文件峰髓,提交里面的任務到flink集群或杠。
啟動blink集群參考 http://www.reibang.com/p/4f59e512b178
目標:使用sql-client提交任務融蹂,讀取kafka消息然走,自定義udtf解析悍赢,存入csv文件决瞳。

1.sql文件

create table kafka_stream(
  messageKey varbinary, 
  `message` varbinary, 
  topic varchar,
  `partition` int,
  `offset` bigint
) with (
  type ='kafka011',
  topic = 't102',
`group.id`='t1',
bootstrap.servers = 'localhost:9092'
);

create table csv_sink(
id varchar,
name varchar,
age varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
);
insert into csv_sink
SELECT
    T.id,
    T.user_name,
    T.age
from
    kafka_stream as S LEFT JOIN
    LATERAL TABLE (parseDataMessage(message)) as T (
        id,
        user_name,
        age
    ) on true;

2.udtf 解析kafka 消息

kafka里消息格式是

{"attributes":{"schemaName":"dbtest","tableName":"result1"},"fieldCount":3,"fields":[{"index":0,"name":"id","null":false,"primaryKey":true,"type":"INTEGER","value":"90995"},{"index":1,"name":"user_name","null":false,"primaryKey":false,"type":"VARCHAR","value":"a"},{"index":2,"name":"age","null":false,"primaryKey":false,"type":"INTEGER","value":"90995"}],"timestamp":1550733014456}

udtf


public class ParseDataMessageUDTF extends TableFunction<Row> {
   public static final String __TIMESTAMP = "__timestamp";
   public static final String __EVENT_TYPE = "__event_type";
   public static final String __ATTRIBUTES = "__attributes";

   private List<String> fieldName = Lists.newArrayList();

   public ParseDataMessageUDTF(String args) {
       fieldName = Arrays.asList(args.split(","));
   }


   public void eval(byte[] message) {
       String mess = new String(message, Charset.forName("UTF-8"));
       DataMessage dataMessage = JSON.parseObject(mess, DataMessage.class);
       Row row = new Row(fieldName.size());
       Map<String, List<Field>> map = dataMessage.getFields().stream().collect(Collectors.groupingBy(Field::getName));
       for (int i = 0; i < fieldName.size(); i++) {
           switch (fieldName.get(i)) {
               case __TIMESTAMP:
                   row.setField(i, dataMessage.getTimestamp());
                   break;
               case __EVENT_TYPE:
                   row.setField(i, dataMessage.getEventType().toString());
                   break;
               case __ATTRIBUTES:
                   row.setField(i, dataMessage.getAttributes());
                   break;
               default:
                   List<Field> flist = map.get(fieldName.get(i));
                   if (flist != null && !flist.isEmpty()) {
                       row.setField(i, flist.get(0).getValue());
                   } else {
                       row.setField(i, null);
                   }
           }

       }
       collect(row);
   }

   @Override
   // 如果返回值是Row,就必須重載實現(xiàn)這個方法左权,顯式地告訴系統(tǒng)返回的字段類型
   public DataType getResultType(Object[] arguments, Class[] argTypes) {
       TypeInformation[] typeInformations = new TypeInformation[fieldName.size()];

       for (int i = 0; i < fieldName.size(); i++) {
           switch (fieldName.get(i)) {
               case __TIMESTAMP:
                   typeInformations[i] = BasicTypeInfo.of(Long.class);
                   break;
               case __EVENT_TYPE:
                   typeInformations[i] = TypeInformation.of(String.class);
                   break;
               case __ATTRIBUTES:
                   typeInformations[i] = new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                  break;

               default:
                   typeInformations[i] = BasicTypeInfo.of(String.class);
           }
       }

       RowTypeInfo rowType = new RowTypeInfo(typeInformations);
       return new TypeInfoWrappedDataType(rowType);
   }
}

3.打shade包 參考 http://www.reibang.com/p/4f481fd8c0cb

4.注冊udtf funtion

拷貝sql-client-defaults.yaml 為sql-client-kafka.yaml

functions: # empty list
- name: parseDataMessage
  from: class
  class: io.bigdata.blink.udf.ParseDataMessageUDTF
  constructor: 
    - "id,user_name,age"

4.提交任務

./sql-client.sh embedded --sqlPath /Users/IdeaProjects/github/apache-flink/build-target/bin/kafka.sql -e sql-client-kafka.yaml -j ../lib/udtf-1.0-SNAPSHOT.jar

寫入數(shù)據(jù)到kafka

5.運行結(jié)果

image.png
image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末皮胡,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子赏迟,更是在濱河造成了極大的恐慌屡贺,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異甩栈,居然都是意外死亡泻仙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門量没,熙熙樓的掌柜王于貴愁眉苦臉地迎上來玉转,“玉大人,你說我怎么就攤上這事允蜈≡┒郑” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵饶套,是天一觀的道長漩蟆。 經(jīng)常有香客問我,道長妓蛮,這世上最難降的妖魔是什么怠李? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮蛤克,結(jié)果婚禮上捺癞,老公的妹妹穿的比我還像新娘。我一直安慰自己构挤,他們只是感情好髓介,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著筋现,像睡著了一般唐础。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上矾飞,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天一膨,我揣著相機與錄音,去河邊找鬼洒沦。 笑死咐低,一個胖子當著我的面吹牛庶近,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播膛锭,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼卢未,長吁一口氣:“原來是場噩夢啊……” “哼携取!你這毒婦竟也來了毁涉?” 一聲冷哼從身側(cè)響起饼齿,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎姻氨,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體剪验,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡肴焊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年前联,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片娶眷。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡似嗤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出届宠,到底是詐尸還是另有隱情烁落,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布豌注,位于F島的核電站伤塌,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏轧铁。R本人自食惡果不足惜每聪,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望齿风。 院中可真熱鬧药薯,春花似錦、人聲如沸救斑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽脸候。三九已至穷娱,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間纪他,已是汗流浹背鄙煤。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留茶袒,地道東北人梯刚。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像薪寓,于是被迫代替她去往敵國和親亡资。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容