MapReduce in ES

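Elasticsearch is something you both love and hate: it is fast and powerful, but you keep running into problems when using it (mostly because the documentation is poor).
This post records how to use the Scripted Metric Aggregation (in other words, the mighty map-reduce) in ES 2.2.0.
The official API documentation covers it, but not in enough detail: after reading it you still cannot get anything working. There is also a Java API reference.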

Below is the complete working example with code (sensitive content removed). The goal is to compute a user liveness score from behavior logs.

  • Add the following settings to elasticsearch.yml to enable Groovy scripts
```yaml
script.engine.groovy.file.aggs: true
script.engine.groovy.file.mapping: true
script.engine.groovy.file.search: true
script.engine.groovy.file.update: true
script.engine.groovy.file.plugin: true
script.engine.groovy.indexed.aggs: true
script.engine.groovy.indexed.mapping: false
script.engine.groovy.indexed.search: true
script.engine.groovy.indexed.update: false
script.engine.groovy.indexed.plugin: false
script.engine.groovy.inline.aggs: true
script.engine.groovy.inline.mapping: true
script.engine.groovy.inline.search: true
script.engine.groovy.inline.update: true
script.engine.groovy.inline.plugin: true
```
  • Sample documents stored in ES
```shell
curl -XPUT "http://10.1.200.34:9200/behavior-2017.02/candidate/AVoG9St-6pLzqkumYcIr" -d '
{
    "businessLine": "platform",
    "createTime": "2017-02-04T02:30:14.000Z",
    "latitude": 0,
    "longitude": 0,
    "name": "c_login",
    "network": "unknown",
    "ownerId": 6403128,
    "ownerType": "candidate",
    "params": {
      "positionId": "2112620"
    },
    "uuid": "a36556ed286348aeb970e0ba1cda1447"
}'
curl -XPUT "http://10.1.200.34:9200/behavior-2017.02/candidate/AVoG9SgJP_y-H6mvM9g8" -d '
{
    "businessLine": "platform",
    "clientIp": "*3.1*8.113.*6",
    "createTime": "2017-02-04T02:30:13.683Z",
    "latitude": 0,
    "longitude": 0,
    "name": "c_login",
    "network": "unknown",
    "ownerId": 6403118,
    "ownerType": "candidate",
    "terminal": "pc",
    "userAgent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",
    "uuid": "0d9b007e3d624180aefd57e7df0b656c"
}'
curl -XPUT "http://10.1.200.34:9200/behavior-2017.02/candidate/AVoG9SgJP_y-H6mvM9g1" -d '
{
    "businessLine": "platform",
    "clientIp": "*23.*8.*3.1*",
    "createTime": "2017-02-04T02:30:13.683Z",
    "latitude": 0,
    "longitude": 0,
    "name": "c_register",
    "network": "unknown",
    "ownerId": 6403127,
    "ownerType": "candidate",
    "terminal": "pc",
    "userAgent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",
    "uuid": "0d9b007e3d624180aefd57e7df0b656c"
}'
```
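The map script turns `createTime` into an age in days. As I understand it, ES 2.x keeps `date` fields as epoch milliseconds in doc values, so the ISO-8601 strings in these documents reach the script as `long` values. A minimal plain-Java sketch of that conversion and of the script's rounding; the class and method names are hypothetical, and `java.time` merely stands in for whatever ES does internally:

```java
import java.time.Instant;

/** Hypothetical helper mirroring the day-age calculation of user-liveness-score-map.groovy. */
public class CreateTimeAge {

    /** Parses an ISO-8601 createTime such as "2017-02-04T02:30:14.000Z" into epoch milliseconds. */
    static long toEpochMillis(String createTime) {
        return Instant.parse(createTime).toEpochMilli();
    }

    /** Same formula as the map script: round((now - createTime) / 1000 / 60 / 60 / 24). */
    static long daysBefore(long createTimeMillis, long nowMillis) {
        // Divide as double: Groovy's "/" does not truncate integers, so this keeps Math.round meaningful
        return Math.round((nowMillis - createTimeMillis) / 1000.0 / 60 / 60 / 24);
    }

    public static void main(String[] args) {
        long created = toEpochMillis("2017-02-04T02:30:14.000Z");
        long now = toEpochMillis("2017-02-11T02:30:14.000Z");
        System.out.println(daysBefore(created, now)); // 7
    }
}
```

Note that 36 hours rounds up to 2 days, so "within 7 days" in the script really means "rounded age of at most 7".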
  • The search request: group by ownerId and compute the behavior statistics for each user.
```shell
curl -XGET "http://10.1.200.34:9200/behavior*/_search/?pretty" -d '
{
  "aggs": {
    "group_by_ownerId": {
      "terms": {
        "field": "ownerId"
      },
      "aggs": {
        "livenessScore": {
          "scripted_metric": {
            "init_script": {"file": "user-liveness-score-init"},
            "map_script": {"file": "user-liveness-score-map"},
            "combine_script": {"file": "user-liveness-score-combine"},
            "reduce_script": {"file": "user-liveness-score-reduce"},
            "params": {
              "_agg": {"resumeScore": {"6403127": 60}}
            }
          }
        }
      }
    }
  },
  "query": {
    "filtered": {
      "query": {
        "match_all": {}
      }
    }
  },
  "fields": [
    "ownerId"
  ]
}'
```
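JSON object keys are always strings, which is presumably why `resumeScore` is keyed by `"6403127"` and why the map script looks it up with `String.valueOf(doc.ownerId.value)`. A hypothetical plain-Java sketch of the `params` shape (the same nesting the Java client code builds) and of that lookup:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the "params" object passed to the scripted_metric aggregation. */
public class ResumeScoreParams {

    /** Builds {"_agg": {"resumeScore": {<ownerId as string>: <score>}}}. */
    static Map<String, Object> buildParams(Map<String, Object> resumeScoreByOwnerId) {
        Map<String, Object> agg = new HashMap<>();
        agg.put("resumeScore", resumeScoreByOwnerId);
        Map<String, Object> params = new HashMap<>();
        params.put("_agg", agg);
        return params;
    }

    /** Mirrors the map script: _agg.resumeScore.get(String.valueOf(doc.ownerId.value)). */
    @SuppressWarnings("unchecked")
    static Object lookup(Map<String, Object> params, long ownerId) {
        Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
        Map<String, Object> resumeScore = (Map<String, Object>) agg.get("resumeScore");
        // A numeric ownerId must be turned into a string, or the lookup never matches
        return resumeScore.get(String.valueOf(ownerId));
    }

    public static void main(String[] args) {
        Map<String, Object> scores = new HashMap<>();
        scores.put("6403127", 60);
        Map<String, Object> params = buildParams(scores);
        System.out.println(lookup(params, 6403127L)); // 60
        System.out.println(lookup(params, 6403128L)); // null
    }
}
```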
  • The scripts and where they are stored
```shell
[root@jiqi001 scripts]# pwd
/apps/elasticsearch/config/scripts
[root@jiqi001 scripts]# ls
user-liveness-score-combine.groovy  user-liveness-score-init.groovy  user-liveness-score-map.groovy  user-liveness-score-reduce.groovy
```
user-liveness-score-init.groovy:
```groovy
_agg.loginScoreInWeek=0;
_agg.loginScoreInMonth=0;
_agg.registerScoreInWeek=0;
_agg.registerScoreInMonth=0;
```
user-liveness-score-map.groovy:
```groovy
// Age of the behavior in days; .value returns the date field as epoch milliseconds
xDaysBefore = Math.round((new Date().getTime() - doc.createTime.value) / 1000 / 60 / 60 / 24);
behaviorName = doc.name.value;
resumeScore = _agg.resumeScore.get(String.valueOf(doc.ownerId.value));
if (behaviorName.equals("c_login")) {
    if (xDaysBefore <= 7) {
        if (_agg.loginScoreInWeek < 4) {
            _agg.loginScoreInWeek += 2;
        }
    } else if (7 < xDaysBefore && xDaysBefore <= 30) {
        if (_agg.loginScoreInMonth < 2) {
            _agg.loginScoreInMonth += 1;
        }
    }
} else if (behaviorName.equals("c_register")) {
    if (resumeScore != null && resumeScore > 30) {
        if (xDaysBefore <= 7) {
            if (_agg.registerScoreInWeek == 0) {
                _agg.registerScoreInWeek = 5;
            }
        } else if (7 < xDaysBefore && xDaysBefore <= 30) {
            if (_agg.registerScoreInMonth == 0) {
                _agg.registerScoreInMonth = 3;
            }
        }
    }
};
```
user-liveness-score-combine.groovy:
```groovy
_agg
```
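To check the init and map rules without a cluster, here is a plain-Java port of those two scripts acting on one bucket's `_agg` state. It is a hypothetical sketch: the behavior name, the day age and the resume score are passed in as arguments instead of being read from `doc` and `_agg.resumeScore`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java port of user-liveness-score-init.groovy and -map.groovy for one _agg state. */
public class LivenessMap {

    /** init_script: every counter starts at zero. */
    static Map<String, Integer> init() {
        Map<String, Integer> agg = new HashMap<>();
        agg.put("loginScoreInWeek", 0);
        agg.put("loginScoreInMonth", 0);
        agg.put("registerScoreInWeek", 0);
        agg.put("registerScoreInMonth", 0);
        return agg;
    }

    /** map_script: called once per behavior document; resumeScore is null for unknown users. */
    static void map(Map<String, Integer> agg, String behaviorName, long xDaysBefore, Integer resumeScore) {
        if (behaviorName.equals("c_login")) {
            if (xDaysBefore <= 7) {
                // +2 per login in the last week, stop adding once the counter reaches 4
                if (agg.get("loginScoreInWeek") < 4) {
                    agg.put("loginScoreInWeek", agg.get("loginScoreInWeek") + 2);
                }
            } else if (xDaysBefore <= 30) {
                // +1 per login more than 7 and at most 30 days ago, stop once the counter reaches 2
                if (agg.get("loginScoreInMonth") < 2) {
                    agg.put("loginScoreInMonth", agg.get("loginScoreInMonth") + 1);
                }
            }
        } else if (behaviorName.equals("c_register")) {
            // Registration only counts when the resume score is above 30
            if (resumeScore != null && resumeScore > 30) {
                if (xDaysBefore <= 7) {
                    if (agg.get("registerScoreInWeek") == 0) {
                        agg.put("registerScoreInWeek", 5);
                    }
                } else if (xDaysBefore <= 30) {
                    if (agg.get("registerScoreInMonth") == 0) {
                        agg.put("registerScoreInMonth", 3);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> agg = init();
        map(agg, "c_login", 1, null);
        map(agg, "c_login", 2, null);
        map(agg, "c_login", 3, null);   // counter already at 4, ignored
        map(agg, "c_register", 0, 60);
        System.out.println(agg.get("loginScoreInWeek"));    // 4
        System.out.println(agg.get("registerScoreInWeek")); // 5
    }
}
```

The caps only hold within one shard's state; the reduce script has to apply them again after summing.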
user-liveness-score-reduce.groovy:
```groovy
double score = 0;
loginScoreInWeek=0;
loginScoreInMonth=0;
registerScoreInWeek=0;
registerScoreInMonth=0;
for (a in _aggs) {
  if(loginScoreInWeek<4){
    loginScoreInWeek += a.get("loginScoreInWeek");
  };
  if(loginScoreInMonth<2){
    loginScoreInMonth += a.get("loginScoreInMonth");
  };
  if(registerScoreInWeek<5){
    registerScoreInWeek += a.get("registerScoreInWeek");
  };
  if(registerScoreInMonth<3){
    registerScoreInMonth += a.get("registerScoreInMonth");
  };
};
if(loginScoreInWeek>4){
  loginScoreInWeek =4;
};
if(loginScoreInMonth>2){
  loginScoreInMonth =2;
};
if(registerScoreInWeek>5){
  registerScoreInWeek =5;
};
if(registerScoreInMonth>3){
  registerScoreInMonth =3;
};
score += loginScoreInWeek;
score += loginScoreInMonth;
score += registerScoreInWeek;
score += registerScoreInMonth;
return score;
```
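Because every shard returns its own `_agg` (tip 3 below), a user whose documents sit on several shards can exceed a cap until the reduce step. A hypothetical plain-Java port of the reduce script; for the non-negative counters used here, `Math.min(total, cap)` gives the same result as the script's guarded additions followed by the final clamp:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java port of user-liveness-score-reduce.groovy. */
public class LivenessReduce {

    /** Builds one per-shard _agg state, as returned by the combine script. */
    static Map<String, Integer> state(int loginWeek, int loginMonth, int registerWeek, int registerMonth) {
        Map<String, Integer> m = new HashMap<>();
        m.put("loginScoreInWeek", loginWeek);
        m.put("loginScoreInMonth", loginMonth);
        m.put("registerScoreInWeek", registerWeek);
        m.put("registerScoreInMonth", registerMonth);
        return m;
    }

    /** Sums one counter over all per-shard states, then caps it. */
    static int capped(List<Map<String, Integer>> aggs, String key, int cap) {
        int total = 0;
        for (Map<String, Integer> a : aggs) {
            total += a.get(key);
        }
        return Math.min(total, cap);
    }

    /** reduce_script: aggs plays the role of _aggs, one entry per shard. */
    static double reduce(List<Map<String, Integer>> aggs) {
        double score = 0;
        score += capped(aggs, "loginScoreInWeek", 4);
        score += capped(aggs, "loginScoreInMonth", 2);
        score += capped(aggs, "registerScoreInWeek", 5);
        score += capped(aggs, "registerScoreInMonth", 3);
        return score;
    }

    public static void main(String[] args) {
        // The same user's recent logins landed on two shards, 4 points on each
        List<Map<String, Integer>> aggs = Arrays.asList(state(4, 0, 5, 0), state(4, 2, 0, 0));
        System.out.println(reduce(aggs)); // 11.0
    }
}
```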

- Java code
```java
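import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetric;
import org.joda.time.DateTime;

    // eSClient is a field of the enclosing class (not shown) that provides the ES 2.x Client
    private Map<String, Double> getUsersLiveScore(Map<String, Object> userAndResumeScore) throws InterruptedException, ExecutionException {
        Map<String, Double> userAndLivenessScore = new HashMap<>();
        // The scripts read resume scores from _agg.resumeScore, so wrap them as {"_agg": {"resumeScore": ...}}
        Map<String, Object> param = new HashMap<>();
        param.put("resumeScore", userAndResumeScore);
        Map<String, Object> params = new HashMap<>();
        params.put("_agg", param);
        Client client = eSClient.getClient();
        TermsBuilder aggregation = AggregationBuilders.terms("group_by_ownerId")
                .field("ownerId")
                // terms returns only 10 buckets by default; ask for one bucket per requested user
                .size(userAndResumeScore.size())
                .subAggregation(
                        AggregationBuilders.scriptedMetric("livenessScore")
                                .params(params)
                                .initScript(new Script("user-liveness-score-init", ScriptService.ScriptType.FILE, "groovy", null))
                                .mapScript(new Script("user-liveness-score-map", ScriptService.ScriptType.FILE, "groovy", null))
                                .combineScript(new Script("user-liveness-score-combine", ScriptService.ScriptType.FILE, "groovy", null))
                                .reduceScript(new Script("user-liveness-score-reduce", ScriptService.ScriptType.FILE, "groovy", null))
                );
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.termsQuery("ownerId", userAndResumeScore.keySet()))
                // Only behaviors from the last 30 days can contribute to the score
                .must(QueryBuilders.rangeQuery("createTime").gte(DateTime.now().minusDays(30).toDate()));
        SearchResponse response = client.prepareSearch("behavior-*")
                .setQuery(boolQueryBuilder)
                .addAggregation(aggregation)
                // Only the aggregation result is read, so no hits need to be fetched
                .setSize(0)
                .execute()
                .get();
        Terms terms = response.getAggregations().get("group_by_ownerId");
        for (Terms.Bucket bucket : terms.getBuckets()) {
            String userId = bucket.getKeyAsString();
            ScriptedMetric metric = bucket.getAggregations().get("livenessScore");
            // The reduce script returns a double
            double score = ((Number) metric.aggregation()).doubleValue();
            userAndLivenessScore.put(userId, score);
        }
        return userAndLivenessScore;
    }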
```
- Tips
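1. The parameters (param) must be placed inside the `_agg` variable.
2. You can debug the scripts with `"combine_script": "_agg;"` and `"reduce_script": "_aggs;"`, which return the intermediate state unchanged.
3. The combine results are grouped per shard; they are not combined across the whole query result. For example, if a query against one index has hits on 5 shards, an array of length 5 is returned.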
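Tip 3 is easiest to see with a toy model of the four phases. This is plain Java, not Elasticsearch code: each inner list stands for the matching documents on one shard (the layout is invented), init and map run per shard, combine yields one value per shard, and reduce sees the list of those values, just as `_aggs` does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the scripted_metric phases; the shard contents are invented for illustration. */
public class ScriptedMetricPhases {

    /** Runs init, map and combine once per shard and returns what reduce receives (the _aggs list). */
    static List<Integer> combinePerShard(List<List<String>> shards) {
        List<Integer> aggs = new ArrayList<>();
        for (List<String> shardDocs : shards) {
            int loginCount = 0;                 // init: fresh state on every shard
            for (String name : shardDocs) {     // map: once per matching document
                if (name.equals("c_login")) {
                    loginCount++;
                }
            }
            aggs.add(loginCount);               // combine: one result per shard
        }
        return aggs;
    }

    /** Reduce: runs once over the per-shard results. */
    static int reduce(List<Integer> aggs) {
        int total = 0;
        for (int a : aggs) {
            total += a;
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> shards = Arrays.asList(
                Arrays.asList("c_login", "c_register"),
                Arrays.asList("c_login"),
                Arrays.asList("c_login", "c_login"),
                Arrays.asList("c_register"),
                Arrays.asList("c_login"));
        List<Integer> aggs = combinePerShard(shards);
        System.out.println(aggs);         // [1, 1, 2, 0, 1]
        System.out.println(aggs.size());  // 5
        System.out.println(reduce(aggs)); // 5
    }
}
```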