- 分詞器搜索全匹配查詢
- 低版本springboot集成es問題
- logstash同步問題
Elasticsearch是目前比較火的搜索引擎场靴,能夠做到快速的全文檢索尿庐。本文不涉及ES的原理等基礎(chǔ)知識(shí)忠怖,只是一篇關(guān)于SpringBoot如何集成Elasticsearch、使用logstash如何同步mysql數(shù)據(jù)庫中的數(shù)據(jù)到Elasticsearch的簡單入門教程抄瑟。
版本匹配
SpringBoot
提供了spring-boot-starter-data-elasticsearch
對(duì)Elasticsearch
的使用進(jìn)行了封裝凡泣,可以快速方便的使用提供的API
進(jìn)行操作,這是最簡單的集成以及操作Elasticsearch
方法。但是對(duì)于SpringBoot
以及Elasticsearch的版本有要求问麸,由于目前我們公司使用的還是SpringBoot1.5.3
的版本往衷,對(duì)應(yīng)的starter
只能支持Elasticsearch5.0
以下的版本,所以不能使用最新的7.x的Elasticsearch
严卖。
SpringBoot Version x | spring-boot-starter-data-elasticsearch Version y | Elasticsearch Version z |
---|---|---|
x < 2.0.0 | y < 2.0.0 | z < 5.0 |
x >= 2.0.0 | y >= 2.0.0 | z > 5.0 |
升級(jí)項(xiàng)目中的Springboot
版本不太現(xiàn)實(shí)席舍,而又想使用最新的Elasticsearch
,只能換一種方式集成哮笆,restClient
的集成方式来颤,這種方式對(duì)于版本的兼容性較好。restClient
有兩種稠肘,一種是low-level福铅,一種是high-level,兩者的原理基本一致项阴,區(qū)別最大的是封裝性滑黔,官方建議使用high-level
,而且low-level
將逐漸被廢棄环揽,所以我們使用elasticsearch-rest-high-level-client
進(jìn)行集成略荡。
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.0</version>
</dependency>
對(duì)于這種框架的整合,各種組件的版本一定要匹配上歉胶,如果不能對(duì)應(yīng)汛兜,會(huì)出現(xiàn)各種意想不到的情況,在這里我也是走了很多彎路才搞清楚通今。
Elasticsearch基本使用
配置host和端口
Elasticsearch
默認(rèn)的端口是9200和9300粥谬,9200是提供給http
方式連接的,9300對(duì)應(yīng)的是tcp
的方式連接辫塌,這里我們使用9200漏策。
spring:
elasticsearch:
host: 192.168.3.75
port: 9200
注入restHighLevelClient
新建一個(gè)配置類,讀取host
和port
臼氨,并創(chuàng)建一個(gè)restHighLevelClient
的bean
注入到spring
容器中哟玷。
@Configuration
public class EsConfig {
@Value("${spring.elasticsearch.port}")
private String port;
@Value("${spring.elasticsearch.host}")
private String host;
@Bean
public RestHighLevelClient restHighLevelClient() {
return new RestHighLevelClient(RestClient.builder(new HttpHost(host,Integer.parseInt(port))));
}
}
使用client進(jìn)行查詢
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("name","123"));
SearchRequest searchRequest = new SearchRequest("test_index");
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
searchResponse.getHits().forEach(i -> {
String jsonString = i.getSourceAsString();
//使用fastjson將json對(duì)象轉(zhuǎn)化為model
EsGoodsModel esGoodsModel = JSONObject.parseObject(jsonString,EsGoodsModel.class);
});
} catch (IOException e) {
e.printStackTrace();
}
這里只是簡單的展示了基本的使用,具體的查詢條件的封裝一也,像分頁、排序喉脖、條件查詢等都差不多椰苟。
使用Logstash同步數(shù)據(jù)
關(guān)于logstash
關(guān)于搜索數(shù)據(jù)的導(dǎo)入,我這里使用了官方推薦的logstash
树叽,還有一些其他的方式舆蝴,這里不做贅述。
使用logstash
最重要的是寫好conf
文件。他的格式如下
input {
}
filter {
}
output {
}
-
input
表示輸入的數(shù)據(jù)來源洁仗,可以是file
层皱、jdbc
、http
赠潦、kafka
叫胖、log4j
、redis
等很多途徑(具體可以查看
filter
主要是對(duì)數(shù)據(jù)來源進(jìn)行過濾她奥,轉(zhuǎn)換成json
格式瓮增,然后保存到Elasticsearch
中。filter
里面有很多的插件哩俭,具體官網(wǎng)有詳細(xì)的介紹绷跑,本次教程主要使用到Aggregate
聚合數(shù)據(jù)。output
是數(shù)據(jù)輸出到哪里凡资,也有很多中砸捏,本次使用輸出到Elasticsearch
中。
數(shù)據(jù)要求
目前我們項(xiàng)目中使用到的是對(duì)商品進(jìn)行檢索隙赁,商品中有一些屬性是來自于其他表垦藏,且可能有多條數(shù)據(jù),類似下面的數(shù)據(jù)結(jié)構(gòu)鸳谜。
{
"stock_info" : "300公斤",
"name" : "黃瓜",
"address" : "上海市普陀區(qū)",
"price" : "1.59",
"company_name" : "供應(yīng)商",
"number" : "SP484",
"plant_area" : "50",
"id" : "55010b5154f84a2fbec4056c185789ac",
"sl_url" : "黃瓜1_1584424256774.jpg",
"type" : 1,
"goodsLabelList" : [
{
"dictionary_value" : "綠色"
},
{
"dictionary_value" : "有機(jī)"
}
],
"attributeValueList" : [
{
"attribute_value" : "密刺黃瓜",
"attribute_id" : "68a40212b85c41019f843f8934bbbda5"
},
{
"attribute_value" : "嚴(yán)重皺縮",
"attribute_id" : "d50368f5fab442808dd27ee2c5361048"
},
{
"attribute_value" : "15~25cm",
"attribute_id" : "58771cbc4caf41789cf747c13fe755bb"
}
]
}
像這種goodsLabelList
對(duì)應(yīng)于Elasticsearch
就是嵌套的數(shù)據(jù)類型膝藕。下面就需要配置logstash的conf文件。
input
使用 jdbc
插件進(jìn)行輸入
input {
jdbc {
#這里指定connector的位置
jdbc_driver_library => "../mysql-connector-java-5.1.43-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.3.78:3306/guoxn_bab_test?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true&characterEncoding=utf8"
jdbc_user => "**"
jdbc_password => "**"
#定時(shí)時(shí)間咐扭,一分鐘一次
schedule => "* * * * *"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
record_last_run => true
use_column_value => true
#設(shè)置時(shí)區(qū)芭挽,如果默認(rèn)會(huì)有8個(gè)小時(shí)的時(shí)差
jdbc_default_timezone => "Asia/Shanghai"
#這里保存last_update_time也可以不指定
last_run_metadata_path => "../last_goods_record.txt"
#根據(jù)last_update_time進(jìn)行更新數(shù)據(jù)
tracking_column => "last_update_time"
tracking_column_type => "timestamp"
#sql省去了具體的查詢內(nèi)容,主要注意 :sql_last_value 的寫法
statement => "
SELECT .... and t1.last_update_time > :sql_last_value and t1.last_update_time < NOW() AND t1.is_delete=0 AND t1.type=1 order by t1.id desc"
}
}
jdbc
的輸入基本沒有什么問題蝗肪,主要是下面的filter
部分袜爪。
filter {
#使用aggregate進(jìn)行聚合數(shù)據(jù)
aggregate {
#task_id指定任務(wù)的id,來自于上面jdbc的sql查詢結(jié)果薛闪,進(jìn)行聚合的時(shí)候辛馆,一定要按照id進(jìn)行排序,不然可能導(dǎo)致數(shù)據(jù)的丟失
task_id => "%{id}"
code => "
map['id'] = event.get('id')
map['name'] = event.get('name')
map['sl_url'] = event.get('sl_url')
map['company_name'] = event.get('company_name')
map['description'] = event.get('description')
map['price'] = event.get('price')
map['stock_info'] = event.get('stock_info')
map['address'] = event.get('address')
map['number'] = event.get('number')
map['shelf_time'] = event.get('shelf_time')
map['shelf_stock'] = event.get('shelf_stock')
map['company_id'] = event.get('company_id')
map['plant_area'] = event.get('plant_area')
map['standard'] = event.get('standard')
map['shelf_state'] = event.get('shelf_state')
map['type'] = event.get('type')
#這里是關(guān)鍵豁延,areaListTemp是臨時(shí)集合昙篙,保存去重的數(shù)據(jù),然后遍歷到areaList中
map['areaListTemp'] ||= []
map['areaList'] ||= []
if(event.get('area_id') != nil)
if !(map['areaListTemp'].include? event.get('area_id'))
map['areaListTemp'] << event.get('area_id')
map['areaList'] << {
'area_id' => event.get('area_id')
}
end
end
map['labelList'] ||= []
map['goodsLabelList'] ||= []
if(event.get('dictionary_value') != nil)
if !(map['labelList'].include? event.get('dictionary_value'))
map['labelList'] << event.get('dictionary_value')
map['goodsLabelList'] << {
'dictionary_value' => event.get('dictionary_value')
}
end
end
map['attributeList'] ||= []
map['attributeValueList'] ||= []
if(event.get('attribute_id') != nil)
if !(map['attributeList'].include? event.get('attribute_id'))
map['attributeList'] << event.get('attribute_id')
map['attributeValueList'] << {
'attribute_id' => event.get('attribute_id'),
'attribute_value' => event.get('attribute_value')
}
end
end
map['cateList'] ||= []
map['categoryList'] ||= []
if(event.get('category_id') != nil)
if !(map['cateList'].include? event.get('category_id'))
map['cateList'] << event.get('category_id')
map['categoryList'] << {
'category_id' => event.get('category_id')
}
end
end
event.cancel()"
#使用聚合插件
push_previous_map_as_event => true
#超時(shí)時(shí)間诱咏,如果不設(shè)置苔可,logstash不知道什么時(shí)候會(huì)結(jié)束,會(huì)導(dǎo)致最后一條數(shù)據(jù)丟失袋狞。這里應(yīng)該有一個(gè)結(jié)束條件
#設(shè)置5秒是一個(gè)不嚴(yán)謹(jǐn)?shù)霓k法
timeout => 5
}
#這里刪除保存數(shù)據(jù)的臨時(shí)集合和生成的一些默認(rèn)的字段
mutate {
remove_field => ["@version","labelList","attributeList","cateList","areaListTemp"]
}
}
踩坑記錄
關(guān)于數(shù)據(jù)的聚合焚辅,這里我查找了很多資料映屋,試了很多的寫法,始終有問題要不是數(shù)據(jù)會(huì)有丟失同蜻,要不會(huì)出現(xiàn)數(shù)據(jù)的錯(cuò)亂的情況棚点。這里有幾個(gè)地方需要注意下
task_id
是sql查詢的id
,相當(dāng)于每一個(gè)id是一個(gè)task
湾蔓,正常我們使用聯(lián)表查詢的時(shí)候瘫析,因?yàn)橐粚?duì)多的關(guān)系,會(huì)生成多條記錄卵蛉,areaListTemp
保存了同一個(gè)id的多條數(shù)據(jù)中的label
字段的值颁股,并且進(jìn)行去重, 如果id
不是聚集在一起傻丝,可能導(dǎo)致臨時(shí)的集合還沒有保存完數(shù)據(jù)就被刪除甘有,導(dǎo)致數(shù)據(jù)的丟失。-
logstash會(huì)使用多線程進(jìn)行聚合任務(wù)葡缰,如果同一個(gè)聚合任務(wù)被多個(gè)線程分隔操作亏掀,最后聚合的過程中可能會(huì)丟失數(shù)據(jù),這里配置pipeline.yml文件泛释,設(shè)置工作線程為1滤愕。(這里可能出現(xiàn)性能問題)
pipeline.workers: 1
-
多個(gè)數(shù)據(jù)源同時(shí)輸入也有有坑,我這里需要維護(hù)兩個(gè)
index
怜校,所以需要使用兩個(gè)jdbc
间影,搜索網(wǎng)上的資料都是在一個(gè)conf
文件中寫,然后通過type
去區(qū)別不同的數(shù)據(jù)源分別處理,茄茁,類似下面的處理方法input { jdbc { #這里指定connector的位置 jdbc_driver_library => "../mysql-connector-java-5.1.43-bin.jar" .... type => goods statement => " SELECT .... and t1.last_update_time > :sql_last_value and t1.last_update_time < NOW() AND t1.is_delete=0 AND t1.type=1 order by t1.id desc" } jdbc { ... type => category ... } } output { //這里根據(jù)上面配置的type進(jìn)行不同的處理 if [type] == "goods" { elasticsearch { hosts => ["localhost: 9200"] index => "goods" document_id => "%{id}" } } if [type] == "category" { elasticsearch { hosts => ["localhost: 9200"] index => "category" document_id => "%{id}" } } }
但是我在7.6的版本中按照這樣的格式每次只能生成一個(gè)
index
魂贬,也沒有報(bào)錯(cuò),后來我配置了兩個(gè)conf
文件裙顽,然后在pineline.yml
中配置多個(gè)通道付燥,進(jìn)行處理- pipeline.id: goods path.config: "../config/goods.conf" pipeline.workers: 1 # pipeline.batch.size: 1000 # pipeline.output.workers: 3 # queue.type: persisted - pipeline.id: category path.config: "../config/category.conf"
每個(gè)
pineline
對(duì)應(yīng)一個(gè)conf
文件的解析,終于解決了問題愈犹。
-
對(duì)于嵌套類型的數(shù)據(jù)結(jié)構(gòu)键科,需要首先在
elasticsearch
中創(chuàng)建好index
的mapping
,否則logstash
不能自動(dòng)識(shí)別漩怎。具體mapping
格式如下{ "mappings": { "properties": { "address": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "areaList": { "type": "nested", "properties": { "area_id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "area_id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "attributeValueList": { "type": "nested", "properties": { "attribute_id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "attribute_value": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "categoryList": { "type": "nested", "properties": { "category_id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "company_id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "company_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "description": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "goodsLabelList": { "type": "nested", "properties": { "dictionary_value": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "name": { "type": "text", "analyzer": "ik_max_word", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "number": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "plant_area": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "price": { "type": "float", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "shelf_state": { "type": "long" }, "shelf_stock": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "shelf_time": { "type": "date" }, "sl_url": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "standard": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "stock_info": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "tags": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "type": { "type": "long" } } } }
這里主要注意
areaList
中的type
設(shè)置為nested
勋颖,如果需要使用分詞器的話,也可以設(shè)置好勋锤,例如name
字段使用了一個(gè)比較好用的中文ik
分詞器饭玲。