前言
很多時(shí)候谚鄙,比如用structure streaming消費(fèi)kafka數(shù)據(jù)各拷,默認(rèn)可能是得到key,value字段,key是偏移量闷营,value是一個(gè)byte數(shù)組烤黍。很可能value其實(shí)是一個(gè)Json字符串。這個(gè)時(shí)候我們?cè)撊绾斡肧QL操作這個(gè)json里的東西呢傻盟?另外蚊荣,如果我處理完的數(shù)據(jù),我想寫(xiě)入到kafka,但是我想把整條記錄作為json格式寫(xiě)入到Kafka,又該怎么寫(xiě)這個(gè)SQL呢莫杈?
get_json_object
第一個(gè)就是get_json_object,具體用法如下:
select get_json_object('{"k": "foo", "v": 1.0}','$.k') as k
需要給定get_json_object 一個(gè)json字段名(或者字符串)奢入,然后通過(guò)類似jsonPath的方式去拿具體的值筝闹。
這個(gè)方法其實(shí)有點(diǎn)麻煩,如果要提取里面的是個(gè)字段腥光,我就要寫(xiě)是個(gè)類似的東西关顷,很復(fù)雜。
from_json
具體用法如下:
select a.k from (
select from_json('{"k": "foo", "v": 1.0}','k STRING, v STRING',map("","")) as a
)
這個(gè)方法可以給json定義一個(gè)Schema,這樣在使用時(shí)武福,就可以直接使用a.k這種方式了议双,會(huì)簡(jiǎn)化很多。
to_json
該方法可以把對(duì)應(yīng)字段轉(zhuǎn)化為json字符串捉片,比如:
select to_json(struct(*)) AS value
可以把所有字段轉(zhuǎn)化為json字符串平痰,然后表示成value字段汞舱,接著你就可以把value字段寫(xiě)入Kafka了。是不是很簡(jiǎn)單宗雇。