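Importing JSON data into ES
- Every record must be preceded by a matching index action line; the JSON file must follow the Bulk API format.
- Each document in the JSON file must be preceded by its index metadata (index/type/id...).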
curl -H 'Content-Type: application/x-ndjson' -s -XPOST localhost:9200/_bulk --data-binary @accounts.json
{"index":{"_index":"index2","_type":"type2","_id":0}}
{"age":10,"name":"jim"}
{"index":{"_index":"index2","_type":"type2","_id":1}}
{"age":16,"name":"tom"}
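The file layout above (one action line, then one source line, per record) can be generated rather than written by hand. The following is a minimal sketch, not part of the original post; the helper name to_bulk_ndjson and the choice of sequential ids starting at 0 are assumptions made for illustration.

```python
import json


def to_bulk_ndjson(records, index, doc_type=None):
    """Build a Bulk API body: an action line followed by a source line per record.

    Hypothetical helper; ids are simply the position of each record.
    """
    lines = []
    for doc_id, record in enumerate(records):
        meta = {"_index": index, "_id": doc_id}
        if doc_type is not None:
            meta["_type"] = doc_type
        lines.append(json.dumps({"index": meta}))  # action line
        lines.append(json.dumps(record))           # source line
    # The Bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


body = to_bulk_ndjson(
    [{"age": 10, "name": "jim"}, {"age": 16, "name": "tom"}],
    index="index2",
    doc_type="type2",
)
```

Writing body to accounts.json would give a file that can be posted with the curl command shown above.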
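- If all records in the file go to the same index (or index/type), the default index (or index/type) can be declared in the URL, and the JSON file then does not need to declare index and type. Individual records may still declare index and type explicitly.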
curl -H 'Content-Type: application/x-ndjson' -s -XPOST localhost:9200/index1/_bulk --data-binary @accounts.json # default index declared in the URL
curl -H 'Content-Type: application/x-ndjson' -s -XPOST localhost:9200/index1/type1/_bulk --data-binary @accounts.json # default index/type declared in the URL
{"index":{}}
{"age":6,"name":"bob"}
{"index":{"_id":"2"}}
{"age":10,"name":"jim"}
{"index":{"_id":"6"}}
{"index":{"_type":"type2","_id":1}}
{"age":16,"name":"tom"}
{"index":{"_index":"index2","_type":"type3","_id":1}}
{"age":20,"name":"lucy"}
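The override rule described above (values in an action line win over the defaults from the URL) can be modelled in a few lines. This is only an illustrative sketch of the rule, not Elasticsearch code; the function name resolve_target is hypothetical.

```python
def resolve_target(action_meta, default_index=None, default_type=None):
    """Return the (index, type) a bulk action ends up using.

    Explicit "_index"/"_type" values in the action line take precedence
    over the defaults declared in the request URL.
    """
    index = action_meta.get("_index", default_index)
    doc_type = action_meta.get("_type", default_type)
    return index, doc_type


# Defaults as declared by POST localhost:9200/index1/type1/_bulk
targets = [
    resolve_target({}, "index1", "type1"),
    resolve_target({"_type": "type2", "_id": 1}, "index1", "type1"),
    resolve_target({"_index": "index2", "_type": "type3", "_id": 1}, "index1", "type1"),
]
```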
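- A correctly formatted JSON file can be imported directly without creating a mapping first; ES creates one automatically from the JSON data.
- A mapping can also be declared beforehand to customize the settings.
- Example of importing with the curl command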
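Importing a shapefile into ES
After reviewing many sources, there are in theory the following approaches:
- Use GDAL to import the shapefile directly into ES (failed).
- Use GDAL (or ArcMap) to export the shapefile to a JSON file, then import the JSON file with the curl command.
  Note: the generated JSON file does not follow the Bulk API format and must be processed before it can be imported.
- Write an Arcpy script that exports the shapefile to JSON that follows the Bulk format, then import it with the curl command (or from a script).
- Export geojson with ArcMap (or Arcpy), parse the JSON file with python (or java), and write a script that imports it into ES through the Bulk API.
Points to watch out for:
- The geojson exported by GDAL or ArcMap differs from the data format the Bulk API requires.
- The data must conform to the Geo-shape datatype or Geo-point datatype before it can be imported with the curl command.
In the end the fourth approach was chosen: use GDAL to export the shapefile to a geojson file, then write a script with the python elasticsearch bulk API that parses the geojson and imports it into ES.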
1. shapefile to es (failed)
The GDAL ES driver offers a way to import a shapefile directly into ES, but the import fails with an error:
ogr2ogr -f "ElasticSearch" http://localhost:9200 my_shapefile.shp
#ERROR 1: HTTP error code : 405
#ERROR 8: Could not connect to server
#ElasticSearch driver failed to create http://localhost:9200/
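2. Going through an intermediate JSON file (the JSON format must be processed)
Another approach is to convert the shapefile to a geojson file with the ogr2ogr tool and then import the geojson file into ES.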
2.1 shapefile to geojson
How to convert and import Arc Shapefile from Zillow into an elastic search database?
ogr2ogr -f GeoJSON map1.geojson map.shp
The generated JSON file looks like this. It contains no index information, so the Bulk API cannot import it directly.
{
"type": "FeatureCollection",
"features": [
{ "type": "Feature", "properties": { "ID_0": 45, "NAME_1": "Anhui" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 119.632111, 31.139344000000108 ], [ 119.644439000000148, 31.115657 ], [ 119.624672, 31.084624000000133 ] ] ] } },
{ "type": "Feature", "properties": { "ID_0": 45, "NAME_1": "Beijing" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 117.379737, 40.226871 ], [ 117.382896, 40.208718000000147 ], [ 117.369484, 40.190997 ] ] ] } }
]
}
2.2 create index mapping
Indexing Geo Shapes
A mapping can be defined for just the geo field and other key fields; the remaining fields are generated automatically on import.
PUT /gis1
{
"mappings": {
"province1": {
"properties": {
"location": {
"type": "geo_shape"
},
"ID_0": {
"type": "text"
},
"NAME_1": {
"type": "text"
}
}
}
}
}
2.3 import json
curl -H 'Content-Type: application/x-ndjson' -XPOST 'http://localhost:9200/gis1/province1/_bulk?pretty' --data-binary @map1.geojson
# Error: "type":"json_e_o_f_exception","reason":"Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@1ffa375d; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@1ffa375d; line: 1, column: 3]"
The data must be reshaped into the following format before it can be imported with the bulk command.
{"index":{"_id":"1"}}
{ "type": "Feature", "properties": { "ID_0": 45, "NAME_1": "Zhejiang" }, "geometry": { "type": "MultiPolygon", "coordinates": [ [ [ [ 121.187362, 27.86903100000012 ], [ 121.190140000000156, 27.847919 ], [ 121.156249, 27.823749000000134 ]] ] ] } }
{"index":{"_id":"2"}}
{ "type": "Feature", "properties": { "ID_0": 46, "NAME_1": "Jiangsu" }, "geometry": { "type": "MultiPolygon", "coordinates": [ [ [ [ 121.187362, 27.86903100000012 ], [ 121.190140000000156, 27.847919 ], [ 121.156249, 27.823749000000134 ]] ] ] } }
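The reshaping step can be scripted. Below is a minimal sketch, assuming the input is an ogr2ogr-style FeatureCollection already loaded as a dict; the helper name geojson_to_bulk_lines and the sequential string ids are assumptions for illustration, and the sample coordinates are shortened from the output shown earlier.

```python
import json


def geojson_to_bulk_lines(feature_collection, start_id=1):
    """Turn a GeoJSON FeatureCollection into Bulk API lines.

    Each feature becomes two single-line JSON strings: an index action
    carrying a sequential "_id", followed by the feature itself.
    """
    lines = []
    features = feature_collection["features"]
    for doc_id, feature in enumerate(features, start=start_id):
        lines.append(json.dumps({"index": {"_id": str(doc_id)}}))
        lines.append(json.dumps(feature))  # json.dumps emits no newlines by default
    return lines


sample = {
    "type": "FeatureCollection",
    "features": [
        {"type": "Feature", "properties": {"ID_0": 45, "NAME_1": "Anhui"},
         "geometry": {"type": "Polygon",
                      "coordinates": [[[119.632111, 31.139344], [119.644439, 31.115657],
                                       [119.624672, 31.084624], [119.632111, 31.139344]]]}},
        {"type": "Feature", "properties": {"ID_0": 45, "NAME_1": "Beijing"},
         "geometry": {"type": "Polygon",
                      "coordinates": [[[117.379737, 40.226871], [117.382896, 40.208718],
                                       [117.369484, 40.190997], [117.379737, 40.226871]]]}},
    ],
}
bulk_lines = geojson_to_bulk_lines(sample)
```

Joining bulk_lines with "\n", appending a final newline, and writing the result to a file gives something that can be posted with --data-binary as in 2.3.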
4. Importing the GDAL-generated geojson file with a python script
# rasa 20180212
# Insert geojson polygon data (map.geojson) using the ES python API
import json

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


def set_mapping(es, index_name="content_engine", doc_type_name="en"):
    my_mapping = {
        doc_type_name: {
            "properties": {
                "location": {
                    "type": "geo_shape"
                },
                "ID_0": {
                    "type": "text"
                },
                "NAME_1": {
                    "type": "text"
                }
            }
        }
    }
    # ignore 404 and 400
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")
    # ignore 400 caused by IndexAlreadyExistsException when creating an index
    create_index = es.indices.create(index=index_name, ignore=400)
    mapping_index = es.indices.put_mapping(index=index_name, doc_type=doc_type_name, body=my_mapping)
    if create_index["acknowledged"] is not True or mapping_index["acknowledged"] is not True:
        print("Index creation failed...")


def set_data(es, input_file, index_name, doc_type_name="en"):
    with open(input_file, 'r') as f:
        data = json.load(f)
    features = data["features"]
    ACTIONS = []
    i = 0
    count = 0
    for feature in features:
        action = {}
        if feature["geometry"]["type"] == "Polygon":  # geometry type is polygon
            action = {
                "_index": index_name,
                "_type": doc_type_name,
                "_source": {
                    "ID_0": feature["properties"]["ID_0"],
                    "NAME_1": feature["properties"]["NAME_1"],
                    "location": {
                        "type": "polygon",
                        "coordinates": feature["geometry"]["coordinates"]
                    }
                }
            }
        else:  # geometry type is multipolygon
            action = {
                "_index": index_name,
                "_type": doc_type_name,
                "_source": {
                    "ID_0": feature["properties"]["ID_0"],
                    "NAME_1": feature["properties"]["NAME_1"],
                    "location": {
                        "type": "multipolygon",
                        "coordinates": feature["geometry"]["coordinates"]
                    }
                }
            }
        i += 1
        print("prepare insert: %s" % feature["properties"]["NAME_1"])
        print("type: %s" % feature["geometry"]["type"])
        ACTIONS.append(action)
        # flush to ES every 5 actions
        if i == 5:
            success, _ = bulk(es, ACTIONS, index=index_name, raise_on_error=True)
            count += success
            i = 0
            ACTIONS = []
            print("insert %s lines" % count)
    # flush whatever is left over
    success, _ = bulk(es, ACTIONS, index=index_name, raise_on_error=True)
    count += success
    print("insert %s lines" % count)


if __name__ == '__main__':
    # es = Elasticsearch(hosts=["127.0.0.1:9200"], http_auth=('elastic','changeme'),timeout=5000)
    es = Elasticsearch(hosts=["127.0.0.1:9200"], timeout=5000)
    set_mapping(es, "gis6", "province")
    # the geojson files are in the format produced by ogr2ogr
    # set_data(es, "./data/map-fujian.geojson", "gis6", "province") # multipolygon
    # set_data(es, "./data/map-anhui.geojson", "gis6", "province") # polygon
    set_data(es, "./data/map-full.geojson", "gis6", "province") # polygon & multipolygon
    # set_data(es, "./data/map.geojson", "gis6", "province") # polygon
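The script flushes to ES every 5 actions and once more at the end. That batching pattern can be isolated into a small generator, sketched below without any ES dependency; the name chunked is hypothetical and this is an alternative way to express the same idea, not the author's code.

```python
def chunked(items, size):
    """Yield consecutive lists of at most `size` items from any iterable."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # leftover items that did not fill a whole batch
        yield batch


# 12 actions with a batch size of 5 give batches of 5, 5 and 2.
batches = list(chunked(range(12), 5))
```

Each batch could then be passed to bulk(es, batch, ...). Note that elasticsearch.helpers.bulk also accepts a chunk_size argument and splits the actions itself, so manual batching is optional.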
Problems
1. Importing multipolygon data with python bulk fails with: Invalid LinearRing found. Found a single coordinate when expecting a coordinate array
source code, line 638
Cause:
- The same geojson file contains features of both type polygon and type multipolygon.
- When the mapping is set to polygon, inserting multipolygon data fails with:
  invalid number of points in LinearRing (found [1] - must be >= [4])
- When the mapping is set to multipolygon, inserting polygon data fails with:
  Invalid LinearRing found. Found a single coordinate when expecting a coordinate array
Solution:
Check the type of each feature and set the type of its location value accordingly. ES supports storing polygon and multipolygon together in the same type.
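A compact way to express this per-feature check is to derive the ES shape type from the GeoJSON geometry type instead of hard-coding one value. The sketch below is an assumption-laden illustration: the helper name to_es_location is hypothetical, and it only covers the two geometry types discussed here.

```python
def to_es_location(geometry):
    """Build the geo_shape value for one GeoJSON geometry.

    The shape type follows the geometry's own type, so polygon and
    multipolygon features from the same file are each labelled correctly.
    """
    geom_type = geometry["type"]
    if geom_type not in ("Polygon", "MultiPolygon"):
        raise ValueError("unsupported geometry type: %s" % geom_type)
    return {"type": geom_type.lower(), "coordinates": geometry["coordinates"]}


ring = [[121.0, 27.0], [121.1, 27.0], [121.1, 27.1], [121.0, 27.0]]
polygon_location = to_es_location({"type": "Polygon", "coordinates": [ring]})
multipolygon_location = to_es_location({"type": "MultiPolygon", "coordinates": [[ring]]})
```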