SpringBoot1.5.x集成Elasticsearch

  • 分詞器搜索全匹配查詢
  • 低版本springboot集成es問題
  • logstash同步問題

Elasticsearch是目前比較火的搜索引擎场靴,能夠做到快速的全文檢索尿庐。本文不涉及ES的原理等基礎(chǔ)知識(shí)忠怖,只是一篇關(guān)于SpringBoot如何集成Elasticsearch、使用logstash如何同步mysql數(shù)據(jù)庫中的數(shù)據(jù)到Elasticsearch的簡單入門教程抄瑟。

版本匹配

SpringBoot提供了spring-boot-starter-data-elasticsearch對(duì)Elasticsearch的使用進(jìn)行了封裝凡泣,可以快速方便的使用提供的API進(jìn)行操作,這是最簡單的集成以及操作Elasticsearch方法。但是對(duì)于SpringBoot以及Elasticsearch的版本有要求问麸,由于目前我們公司使用的還是SpringBoot1.5.3的版本往衷,對(duì)應(yīng)的starter只能支持Elasticsearch5.0以下的版本,所以不能使用最新的7.x的Elasticsearch严卖。

SpringBoot Version x spring-boot-starter-data-elasticsearch Version y Elasticsearch Version z
x < 2.0.0 y < 2.0.0 z < 5.0
x >= 2.0.0 y >= 2.0.0 z > 5.0

升級(jí)項(xiàng)目中的Springboot版本不太現(xiàn)實(shí)席舍,而又想使用最新的Elasticsearch,只能換一種方式集成哮笆,restClient的集成方式来颤,這種方式對(duì)于版本的兼容性較好。restClient有兩種稠肘,一種是low-level福铅,一種是high-level,兩者的原理基本一致项阴,區(qū)別最大的是封裝性滑黔,官方建議使用high-level,而且low-level將逐漸被廢棄环揽,所以我們使用elasticsearch-rest-high-level-client進(jìn)行集成略荡。

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>7.6.0</version>
</dependency>

對(duì)于這種框架的整合,各種組件的版本一定要匹配上歉胶,如果不能對(duì)應(yīng)汛兜,會(huì)出現(xiàn)各種意想不到的情況,在這里我也是走了很多彎路才搞清楚通今。

Elasticsearch基本使用

配置host和端口

Elasticsearch默認(rèn)的端口是9200和9300粥谬,9200是提供給http方式連接的,9300對(duì)應(yīng)的是tcp的方式連接辫塌,這里我們使用9200漏策。

spring:
    elasticsearch:
    host: 192.168.3.75
    port: 9200
注入restHighLevelClient

新建一個(gè)配置類,讀取hostport臼氨,并創(chuàng)建一個(gè)restHighLevelClientbean注入到spring容器中哟玷。

@Configuration
public class EsConfig {
    @Value("${spring.elasticsearch.port}")
    private String port;
    @Value("${spring.elasticsearch.host}")
    private String host;
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(RestClient.builder(new HttpHost(host,Integer.parseInt(port))));
    }
}
使用client進(jìn)行查詢
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("name","123"));
SearchRequest searchRequest = new SearchRequest("test_index");
searchRequest.source(searchSourceBuilder);
try {
  SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  searchResponse.getHits().forEach(i -> {
    String jsonString = i.getSourceAsString();
    //使用fastjson將json對(duì)象轉(zhuǎn)化為model
    EsGoodsModel esGoodsModel = JSONObject.parseObject(jsonString,EsGoodsModel.class);
  });
} catch (IOException e) {
  e.printStackTrace();
}

這里只是簡單的展示了基本的使用,具體的查詢條件的封裝一也,像分頁、排序喉脖、條件查詢等都差不多椰苟。

使用Logstash同步數(shù)據(jù)

關(guān)于logstash

關(guān)于搜索數(shù)據(jù)的導(dǎo)入,我這里使用了官方推薦的logstash树叽,還有一些其他的方式舆蝴,這里不做贅述。

使用logstash最重要的是寫好conf文件。他的格式如下

input {
  
}
filter {
  
}
output {
  
}
  • input表示輸入的數(shù)據(jù)來源洁仗,可以是file层皱、jdbchttp赠潦、kafka叫胖、log4jredis等很多途徑(具體可以查看
  • filter主要是對(duì)數(shù)據(jù)來源進(jìn)行過濾她奥,轉(zhuǎn)換成json格式瓮增,然后保存到Elasticsearch中。filter里面有很多的插件哩俭,具體官網(wǎng)有詳細(xì)的介紹绷跑,本次教程主要使用到Aggregate聚合數(shù)據(jù)。

  • output是數(shù)據(jù)輸出到哪里凡资,也有很多中砸捏,本次使用輸出到Elasticsearch中。

數(shù)據(jù)要求

目前我們項(xiàng)目中使用到的是對(duì)商品進(jìn)行檢索隙赁,商品中有一些屬性是來自于其他表垦藏,且可能有多條數(shù)據(jù),類似下面的數(shù)據(jù)結(jié)構(gòu)鸳谜。

{
    "stock_info" : "300公斤",
  "name" : "黃瓜",
  "address" : "上海市普陀區(qū)",
  "price" : "1.59",
  "company_name" : "供應(yīng)商",
  "number" : "SP484",
  "plant_area" : "50",
  "id" : "55010b5154f84a2fbec4056c185789ac",
  "sl_url" : "黃瓜1_1584424256774.jpg",
  "type" : 1,
  "goodsLabelList" : [
    {
      "dictionary_value" : "綠色"
    },
    {
      "dictionary_value" : "有機(jī)"
    }
  ],
  "attributeValueList" : [
    {
      "attribute_value" : "密刺黃瓜",
      "attribute_id" : "68a40212b85c41019f843f8934bbbda5"
    },
    {
      "attribute_value" : "嚴(yán)重皺縮",
      "attribute_id" : "d50368f5fab442808dd27ee2c5361048"
    },
    {
      "attribute_value" : "15~25cm",
      "attribute_id" : "58771cbc4caf41789cf747c13fe755bb"
    }
  ]
}

像這種goodsLabelList對(duì)應(yīng)于Elasticsearch就是嵌套的數(shù)據(jù)類型膝藕。下面就需要配置logstash的conf文件。

input使用 jdbc插件進(jìn)行輸入

input {
  jdbc {
    #這里指定connector的位置
    jdbc_driver_library => "../mysql-connector-java-5.1.43-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.3.78:3306/guoxn_bab_test?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true&characterEncoding=utf8"
    jdbc_user => "**"
    jdbc_password => "**"
    #定時(shí)時(shí)間咐扭,一分鐘一次
    schedule => "* * * * *"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    record_last_run => true
    use_column_value => true
    #設(shè)置時(shí)區(qū)芭挽,如果默認(rèn)會(huì)有8個(gè)小時(shí)的時(shí)差
    jdbc_default_timezone => "Asia/Shanghai"
    #這里保存last_update_time也可以不指定
    last_run_metadata_path => "../last_goods_record.txt"
    #根據(jù)last_update_time進(jìn)行更新數(shù)據(jù)
    tracking_column => "last_update_time"
    tracking_column_type => "timestamp"
    #sql省去了具體的查詢內(nèi)容,主要注意 :sql_last_value 的寫法
    statement => "
                SELECT .... and t1.last_update_time > :sql_last_value and t1.last_update_time < NOW() AND t1.is_delete=0 AND t1.type=1 order by t1.id desc"
    }
}

jdbc的輸入基本沒有什么問題蝗肪,主要是下面的filter部分袜爪。

filter {
            #使用aggregate進(jìn)行聚合數(shù)據(jù)
        aggregate {
                    #task_id指定任務(wù)的id,來自于上面jdbc的sql查詢結(jié)果薛闪,進(jìn)行聚合的時(shí)候辛馆,一定要按照id進(jìn)行排序,不然可能導(dǎo)致數(shù)據(jù)的丟失
            task_id => "%{id}"
            code => "
                map['id'] = event.get('id')
                map['name'] = event.get('name')
                map['sl_url'] = event.get('sl_url')
                map['company_name'] = event.get('company_name')
                map['description'] = event.get('description')
                map['price'] = event.get('price')
                map['stock_info'] = event.get('stock_info')
                map['address'] = event.get('address')
                map['number'] = event.get('number')
                map['shelf_time'] = event.get('shelf_time')
                map['shelf_stock'] = event.get('shelf_stock')
                map['company_id'] = event.get('company_id')
                map['plant_area'] = event.get('plant_area')
                map['standard'] = event.get('standard')
                map['shelf_state'] = event.get('shelf_state')
                map['type'] = event.get('type')
                            #這里是關(guān)鍵豁延,areaListTemp是臨時(shí)集合昙篙,保存去重的數(shù)據(jù),然后遍歷到areaList中
                map['areaListTemp'] ||= []
                map['areaList'] ||= []
                if(event.get('area_id') != nil)
                    if !(map['areaListTemp'].include? event.get('area_id'))
                        map['areaListTemp'] << event.get('area_id')
                        map['areaList'] << {
                            'area_id' => event.get('area_id')
                        }
                    end
                end
                map['labelList'] ||= []
                map['goodsLabelList'] ||= []
                if(event.get('dictionary_value') != nil)
                    if !(map['labelList'].include? event.get('dictionary_value'))
                        map['labelList'] << event.get('dictionary_value')
                        map['goodsLabelList'] << {
                            'dictionary_value' => event.get('dictionary_value')
                        }
                    end
                end
                map['attributeList'] ||= []
                map['attributeValueList'] ||= []
                if(event.get('attribute_id') != nil)
                    if !(map['attributeList'].include? event.get('attribute_id'))
                        map['attributeList'] << event.get('attribute_id')
                        map['attributeValueList'] << {
                            'attribute_id' => event.get('attribute_id'),
                            'attribute_value' => event.get('attribute_value')
                        }
                    end
                end
                map['cateList'] ||= []
                map['categoryList'] ||= []
                if(event.get('category_id') != nil)
                    if !(map['cateList'].include? event.get('category_id'))
                        map['cateList'] << event.get('category_id')
                        map['categoryList'] << {
                            'category_id' => event.get('category_id')
                        }
                    end
                end
            event.cancel()"
                        #使用聚合插件
            push_previous_map_as_event => true
                        #超時(shí)時(shí)間诱咏,如果不設(shè)置苔可,logstash不知道什么時(shí)候會(huì)結(jié)束,會(huì)導(dǎo)致最后一條數(shù)據(jù)丟失袋狞。這里應(yīng)該有一個(gè)結(jié)束條件
                        #設(shè)置5秒是一個(gè)不嚴(yán)謹(jǐn)?shù)霓k法
            timeout => 5
        }
                #這里刪除保存數(shù)據(jù)的臨時(shí)集合和生成的一些默認(rèn)的字段
        mutate  {
            remove_field => ["@version","labelList","attributeList","cateList","areaListTemp"]
            
    }
}

踩坑記錄

關(guān)于數(shù)據(jù)的聚合焚辅,這里我查找了很多資料映屋,試了很多的寫法,始終有問題要不是數(shù)據(jù)會(huì)有丟失同蜻,要不會(huì)出現(xiàn)數(shù)據(jù)的錯(cuò)亂的情況棚点。這里有幾個(gè)地方需要注意下

  • task_id 是sql查詢的id,相當(dāng)于每一個(gè)id是一個(gè)task湾蔓,正常我們使用聯(lián)表查詢的時(shí)候瘫析,因?yàn)橐粚?duì)多的關(guān)系,會(huì)生成多條記錄卵蛉,areaListTemp保存了同一個(gè)id的多條數(shù)據(jù)中的label字段的值颁股,并且進(jìn)行去重, 如果id不是聚集在一起傻丝,可能導(dǎo)致臨時(shí)的集合還沒有保存完數(shù)據(jù)就被刪除甘有,導(dǎo)致數(shù)據(jù)的丟失。

  • logstash會(huì)使用多線程進(jìn)行聚合任務(wù)葡缰,如果同一個(gè)聚合任務(wù)被多個(gè)線程分隔操作亏掀,最后聚合的過程中可能會(huì)丟失數(shù)據(jù),這里配置pipeline.yml文件泛释,設(shè)置工作線程為1滤愕。(這里可能出現(xiàn)性能問題)

    pipeline.workers: 1
    
  • 多個(gè)數(shù)據(jù)源同時(shí)輸入也有有坑,我這里需要維護(hù)兩個(gè)index怜校,所以需要使用兩個(gè)jdbc间影,搜索網(wǎng)上的資料都是在一個(gè)conf文件中寫,然后通過type去區(qū)別不同的數(shù)據(jù)源分別處理,茄茁,類似下面的處理方法

    input {
      jdbc {
        #這里指定connector的位置
        jdbc_driver_library => "../mysql-connector-java-5.1.43-bin.jar"
        ....
        type => goods
        statement => "
                    SELECT .... and t1.last_update_time > :sql_last_value and t1.last_update_time < NOW()           AND t1.is_delete=0 AND t1.type=1 order by t1.id desc"
        }
        jdbc {
        ...
        type => category
        ...
      }
    }
    output {
      //這里根據(jù)上面配置的type進(jìn)行不同的處理
      if [type] == "goods" {
         elasticsearch {
                    hosts => ["localhost: 9200"]   
                    index => "goods"
                    document_id => "%{id}"
        }
      }
        if [type] == "category" {
         elasticsearch {
                    hosts => ["localhost: 9200"]   
                    index => "category"
                    document_id => "%{id}"
        }
      }
    }
    

    但是我在7.6的版本中按照這樣的格式每次只能生成一個(gè)index魂贬,也沒有報(bào)錯(cuò),后來我配置了兩個(gè)conf文件裙顽,然后在pineline.yml中配置多個(gè)通道付燥,進(jìn)行處理

    - pipeline.id: goods
      path.config: "../config/goods.conf"
      pipeline.workers: 1
      # pipeline.batch.size: 1000
      # pipeline.output.workers: 3
      # queue.type: persisted
    
    - pipeline.id: category
      path.config: "../config/category.conf"
    

    每個(gè)pineline對(duì)應(yīng)一個(gè)conf文件的解析,終于解決了問題愈犹。

  • 對(duì)于嵌套類型的數(shù)據(jù)結(jié)構(gòu)键科,需要首先在elasticsearch中創(chuàng)建好indexmapping,否則logstash不能自動(dòng)識(shí)別漩怎。具體mapping格式如下

    {
        "mappings": {
            "properties": {
                "address": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "areaList": {
                    "type": "nested",
                    "properties": {
                        "area_id": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        }
                    }
                },
                "area_id": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "attributeValueList": {
                    "type": "nested",
                    "properties": {
                        "attribute_id": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        },
                        "attribute_value": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        }
                    }
                },
                "categoryList": {
                    "type": "nested",
                    "properties": {
                        "category_id": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        }
                    }
                },
                "company_id": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "company_name": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "description": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "goodsLabelList": {
                    "type": "nested",
                    "properties": {
                        "dictionary_value": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        }
                    }
                },
                "id": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "name": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "number": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "plant_area": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "price": {
                    "type": "float",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "shelf_state": {
                    "type": "long"
                },
                "shelf_stock": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "shelf_time": {
                    "type": "date"
                },
                "sl_url": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "standard": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "stock_info": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "tags": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "type": {
                    "type": "long"
                }
            }
        }
    }
    

    這里主要注意areaList中的type設(shè)置為nested勋颖,如果需要使用分詞器的話,也可以設(shè)置好勋锤,例如name字段使用了一個(gè)比較好用的中文ik分詞器饭玲。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市怪得,隨后出現(xiàn)的幾起案子咱枉,更是在濱河造成了極大的恐慌,老刑警劉巖徒恋,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蚕断,死亡現(xiàn)場離奇詭異,居然都是意外死亡入挣,警方通過查閱死者的電腦和手機(jī)亿乳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來径筏,“玉大人葛假,你說我怎么就攤上這事∽烫瘢” “怎么了聊训?”我有些...
    開封第一講書人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長恢氯。 經(jīng)常有香客問我带斑,道長,這世上最難降的妖魔是什么勋拟? 我笑而不...
    開封第一講書人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任勋磕,我火速辦了婚禮,結(jié)果婚禮上敢靡,老公的妹妹穿的比我還像新娘瓶颠。我一直安慰自己望拖,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著过吻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪包帚。 梳的紋絲不亂的頭發(fā)上若皱,一...
    開封第一講書人閱讀 51,482評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音柠辞,去河邊找鬼团秽。 笑死,一個(gè)胖子當(dāng)著我的面吹牛叭首,可吹牛的內(nèi)容都是我干的习勤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼焙格,長吁一口氣:“原來是場噩夢啊……” “哼图毕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起眷唉,我...
    開封第一講書人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤予颤,失蹤者是張志新(化名)和其女友劉穎囤官,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛤虐,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡党饮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了驳庭。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片刑顺。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖饲常,靈堂內(nèi)的尸體忽然破棺而出蹲堂,到底是詐尸還是另有隱情,我是刑警寧澤贝淤,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布柒竞,位于F島的核電站,受9級(jí)特大地震影響霹娄,放射性物質(zhì)發(fā)生泄漏能犯。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一犬耻、第九天 我趴在偏房一處隱蔽的房頂上張望踩晶。 院中可真熱鬧,春花似錦枕磁、人聲如沸渡蜻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽茸苇。三九已至,卻和暖如春沦寂,著一層夾襖步出監(jiān)牢的瞬間学密,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來泰國打工传藏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留腻暮,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓毯侦,卻偏偏與公主長得像哭靖,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子侈离,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容