This article explains how to define a Druid ingestion spec and points out the key items and rules to watch for during development, so that readers who are new to Druid can get started quickly. It also serves as a reference for myself later on.
-
Environment setup
The following assumes you already have a local Druid environment; all of the steps below are based on Druid 0.12.3.
-
Data preparation
① Use Kafka to produce mock data. The fields are:
ts: event time
startIP: sender IP
startPort: sender port
endIP: receiver IP
endPort: receiver port
protocol: IP protocol number
packets: number of packets
bytes: bytes transferred
costTime: elapsed time
② Sample data
{"ts":"2019-01-18T01:01:35Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":10, "bytes":1000, "costTime": 1.4} {"ts":"2019-01-18T01:01:51Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":20, "bytes":2000, "costTime": 3.1} {"ts":"2019-01-18T01:01:59Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":2000, "endPort":3000, "protocol": 6, "packets":30, "bytes":3000, "costTime": 0.4} {"ts":"2019-01-18T01:02:14Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":40, "bytes":4000, "costTime": 7.9} {"ts":"2019-01-18T01:02:29Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":50, "bytes":5000, "costTime": 10.2} {"ts":"2019-01-18T01:03:29Z","startIP":"1.1.1.1", "endIP":"2.2.2.2", "startPort":5000, "endPort":7000, "protocol": 6, "packets":60, "bytes":6000, "costTime": 4.3} {"ts":"2019-01-18T02:33:14Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":100, "bytes":10000, "costTime": 22.4} {"ts":"2019-01-18T02:33:45Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":200, "bytes":20000, "costTime": 34.5} {"ts":"2019-01-18T02:35:45Z","startIP":"7.7.7.7", "endIP":"8.8.8.8", "startPort":4000, "endPort":5000, "protocol": 17, "packets":300, "bytes":30000, "costTime": 46.3}
-
The Druid ingestion spec
Defining the schema: the core of a Druid ingestion spec is the dataSchema, which defines how the input data is parsed and how it is stored in Druid.
1.dataSchema
First, create a JSON file named kafka-index-day-roll-up.json and add an empty dataSchema to it:
"dataSchema" : {}
2.DataSource name
The datasource name is specified by the dataSource parameter inside the dataSchema. Here we call it realtime_kafka_to_druid; you can think of it as a database table name.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", }
3.parser
The dataSchema contains a parser field, which defines the parser that interprets the input data. Since the sample data above consists of JSON strings, we use the string parser with the JSON format.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json" } } }
4.Time column
The parser needs to know the main timestamp of each input row, which is defined in the timestampSpec. Our data has a ts column that is exactly the timestamp we need, so we add a timestampSpec with that information to the parseSpec.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" } } } }
5.Column types
Now that the time column is defined, let's look at the types of the other columns. Druid supports the column types String, Long, Float, and Double. We will discuss how to use them in the following sections, but before defining the non-time columns we first need to talk about rollup.
6.Rollup
With roll-up enabled, Druid summarizes the raw data at ingestion time. Roll-up is a first-level aggregation that happens before the data is persisted into segments.
① If rollup is set to true, we must split the input columns into two groups: dimensions and metrics. Dimensions are the columns we group by; metrics are the columns we aggregate.
② If rollup is set to false, all input columns are treated as dimensions and no pre-aggregation takes place. (A small simulation of roll-up on the sample data follows the spec below.)
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" } } }, "granularitySpec" : { "rollup" : true } }
7.Choosing dimensions and metrics
① For the dataset above, the split between dimensions and metrics is obvious:
Dimensions: startIP | startPort | endIP | endPort | protocol
Metrics: packets | bytes | costTime
② So how do we define these dimension and metric columns in the ingestion spec? Dimensions: specified with a dimensionsSpec inside the parseSpec.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "rollup" : true } }
Note: each dimension has a name and a type, where type can be "long", "float", "double", or "string". Notice that startIP, a "string" dimension, only needs its name to be specified.
③ In Druid, string is the default dimension type. Also notice that protocol is numeric in the raw data, yet we define it as a string; Druid will coerce the column to the declared type. Metrics: specified with a metricsSpec inside the dataSchema.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "rollup" : true } }
Note: when defining a metric, you must specify the type of aggregation to perform on that column during rollup. Here packets and bytes are defined as longSum aggregations and costTime as a doubleSum aggregation. Also note that metricsSpec sits at a different nesting level from dimensionsSpec and parseSpec: it belongs at the same level as parser, inside the dataSchema. In addition, we define a count aggregator, which records how many input rows were rolled up into each stored row. See the Druid aggregations documentation for the full list of supported aggregators.
8.Without rollup
If rollup is not used, all input columns are treated as dimensions and there is no longer a distinction between dimensions and metrics.
"dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" }, { "name" : "packets", "type" : "long" }, { "name" : "bytes", "type" : "long" }, { "name" : "startPort", "type" : "double" } ] }
9.Define Granularities
There are a few more properties to set in the granularitySpec. The granularitySpec supports two types: uniform and arbitrary. Here we use uniform, which gives all segments a uniform interval size (for example, each segment holds one hour of data).
① segmentGranularity specifies how large a time interval a single segment should cover; it can be DAY, WEEK, HOUR, MINUTE, and so on. Here we set the segment granularity to HOUR.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "rollup" : true } }
② queryGranularity: the query granularity is configured with queryGranularity inside the granularitySpec; here we use MINUTE.
"dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE" "rollup" : true } }
③ Define an interval: data outside this time interval will not be processed. Note that this setting only applies to batch ingestion. intervals is specified inside the granularitySpec.
"granularitySpec" : { "intervals" : ["2019-01-17/2019-01-18"] }
10.Defining the input data source
The input data source is specified in the ioConfig, and every ingestion task type has its own ioConfig. This article reads data from Kafka, so the ioConfig is configured as shown below. Note that for the Kafka indexing service the supervisor spec uses "type": "kafka", with dataSchema, tuningConfig, and ioConfig sitting at the top level of the spec:
{ "type" : "index", "spec" : { "dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "rollup" : true } }, "ioConfig": { "topic": "druid-topic-book", "replicas": 1, "taskDuration": "PT5M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "host1:9092,host2:9092,host3:9092" } } } }
11.tuningConfig
Every ingestion task has a tuningConfig section that developers can configure themselves. Here we configure the tuningConfig for our Kafka input. type is the indexing task type, kafka in this case. reportParseExceptions defaults to false; if it is enabled, an exception while parsing the ingested data will cause ingestion to stop.
"tuningConfig": { "type": "kafka", "reportParseExceptions": false }
12.Below is the complete ingestion spec we have put together.
{ "type" : "index", "spec" : { "dataSchema" : { "dataSource" : "realtime_kafka_to_druid", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "format" : "auto", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "startIP", { "name" : "startPort", "type" : "long" }, { "name" : "endIP", "type" : "string" }, { "name" : "endPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "rollup" : true } }, "tuningConfig": { "type": "kafka", "reportParseExceptions": false }, "ioConfig": { "topic": "druid-topic-book", "replicas": 1, "taskDuration": "PT5M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "host1:9092,host2:9092,host3:9092" } } } }
13.For details on the Kafka tuningConfig and ioConfig options, see:
http://druid.io/docs/0.12.3/development/extensions-core/kafka-ingestion.html
-
Submit the task and query the data
1.Run the following on the Overlord node:
curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/kafka-druid/kafka-index-day-roll-up.json http://host1:8090/druid/indexer/v1/supervisor
2.Now start a program that sends data to the Kafka topic druid-topic-book. That code is not the focus of this article; a minimal producer sketch follows.
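For completeness, a minimal sketch using the kafka-python client (an assumption on my part; any Kafka client will do). The topic and bootstrap servers reuse the values from the ioConfig above, and the records are the sample data from the "Data preparation" section.

# Minimal producer sketch (assumes: pip install kafka-python).
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["host1:9092", "host2:9092", "host3:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

records = [
    {"ts": "2019-01-18T01:01:35Z", "startIP": "1.1.1.1", "endIP": "2.2.2.2",
     "startPort": 2000, "endPort": 3000, "protocol": 6,
     "packets": 10, "bytes": 1000, "costTime": 1.4},
    # ... the remaining sample records from the "Data preparation" section
]

for record in records:
    producer.send("druid-topic-book", value=record)

producer.flush()   # make sure everything is delivered before exiting
producer.close()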
3.Once the steps above are done, we can check the data that was finally stored in Druid. Run the following on the Broker node.
①.The contents of rollup-select-sql.json (note the dataSource name being queried):
{ "query":"select * from \"realtime_kafka_to_druid\"" }
② Execute:
curl -X 'POST' -H 'Content-Type:application/json' -d @rollup-select-sql.json http://host2:8082/druid/v2/sql
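Equivalently, the same SQL statement can be sent from Python; a minimal sketch using the requests library (assumed installed), pointing at the same Broker address:

# Same query as the curl call above, via the Broker's SQL endpoint.
import requests

resp = requests.post(
    "http://host2:8082/druid/v2/sql",
    json={"query": 'select * from "realtime_kafka_to_druid"'},
)
resp.raise_for_status()
for row in resp.json():   # the endpoint returns a JSON array of rows
    print(row)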
③ The data finally stored in Druid (the nine raw records have been rolled up into five rows, one per minute bucket and dimension combination):
[ { "__time": "2019-01-18T01:01:00.000Z", "bytes": 6000, "costTime": 4.9, "count": 3, "endIP": "2.2.2.2", "endPort": 3000, "packets": 60, "protocol": "6", "startIP": "1.1.1.1", "startPort": 2000 }, { "__time": "2019-01-18T01:02:00.000Z", "bytes": 9000, "costTime": 18.1, "count": 2, "endIP": "2.2.2.2", "endPort": 7000, "packets": 90, "protocol": "6", "startIP": "1.1.1.1", "startPort": 5000 }, { "__time": "2019-01-18T01:03:00.000Z", "bytes": 6000, "costTime": 4.3, "count": 1, "endIP": "2.2.2.2", "endPort": 7000, "packets": 60, "protocol": "6", "startIP": "1.1.1.1", "startPort": 5000 }, { "__time": "2019-01-18T02:33:00.000Z", "bytes": 30000, "costTime": 56.9, "count": 2, "endIP": "8.8.8.8", "endPort": 5000, "packets": 300, "protocol": "17", "startIP": "7.7.7.7", "startPort": 4000 }, { "__time": "2019-01-18T02:35:00.000Z", "bytes": 30000, "costTime": 46.3, "count": 1, "endIP": "8.8.8.8", "endPort": 5000, "packets": 300, "protocol": "17", "startIP": "7.7.7.7", "startPort": 4000 } ]