I recently got a requirement to ingest ES log data into HDFS for analysis. After looking around online, I found roughly the following approaches:
- Hive itself can connect to ES directly
For details, see http://lxw1234.com/archives/2015/12/585.htm
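Drawbacks of this approach:
- (a) For security, ES clusters usually enable user authentication and certificate authentication, which this approach does not support.
- (b) The field types declared in the Hive table definition must match the ES mapping. When a field's type changes in the ES document type, Hive cannot pick up the change and fails with type-conversion errors.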
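To make drawback (b) concrete, here is a minimal Python sketch (not part of the original setup) that derives a Hive external-table DDL from an ES mapping and flags fields whose ES type no longer matches the declared Hive column. The type table is a simplified assumption, not es-hadoop's complete mapping rules; the table name `es_logs`, the `ndf.dlp/log` resource (the `log` type name is made up) and the `es-host:9200` node are hypothetical placeholders. `org.elasticsearch.hadoop.hive.EsStorageHandler` is the storage handler class shipped with es-hadoop.

```python
# Simplified, assumed mapping from common ES field types to Hive column types.
ES_TO_HIVE = {
    "text": "STRING",
    "keyword": "STRING",
    "long": "BIGINT",
    "integer": "INT",
    "short": "SMALLINT",
    "byte": "TINYINT",
    "double": "DOUBLE",
    "float": "FLOAT",
    "boolean": "BOOLEAN",
    "date": "TIMESTAMP",
}


def hive_columns(es_properties):
    """Map {field: {"type": es_type}} to {field: hive_type}; unknown types fall back to STRING."""
    return {name: ES_TO_HIVE.get(spec.get("type"), "STRING")
            for name, spec in es_properties.items()}


def hive_ddl(table, es_resource, es_nodes, es_properties):
    """Build a CREATE EXTERNAL TABLE statement backed by es-hadoop's storage handler."""
    cols = ",\n  ".join(f"`{name}` {htype}"
                        for name, htype in hive_columns(es_properties).items())
    return (
        f"CREATE EXTERNAL TABLE {table} (\n  {cols}\n)\n"
        "STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'\n"
        f"TBLPROPERTIES ('es.resource' = '{es_resource}', 'es.nodes' = '{es_nodes}')"
    )


def type_drift(table_columns, es_properties):
    """Return (field, declared_hive_type, type_now_implied_by_es) for every mismatch."""
    current = hive_columns(es_properties)
    return [(name, declared, current[name])
            for name, declared in table_columns.items()
            if name in current and current[name] != declared]


if __name__ == "__main__":
    old_mapping = {"user": {"type": "keyword"}, "bytes": {"type": "long"}}
    print(hive_ddl("es_logs", "ndf.dlp/log", "es-host:9200", old_mapping))
    # Later, "bytes" is re-mapped as text on the ES side:
    new_mapping = {"user": {"type": "keyword"}, "bytes": {"type": "text"}}
    print(type_drift(hive_columns(old_mapping), new_mapping))  # [('bytes', 'BIGINT', 'STRING')]
```

The Hive table keeps its old column types while ES moves on, which is exactly the mismatch that surfaces as a type-conversion error at query time.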
- ES provides two Java APIs for working with ES
Official ES client documentation: https://www.elastic.co/guide/en/elasticsearch/client/index.html
- (a) Transport interface, i.e. a TCP connection
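Because the cluster has user authentication and certificate authentication enabled, I connected to ES as shown below. Unfortunately, the connection never succeeded. Due to time constraints I haven't resolved this yet; if anyone has time to help figure it out, I'd be grateful.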
Exception in thread "main" NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{3HUrRF8JQGCz_TlwhQOFiA}{10.17.2.79}{10.17.2.79:9305}]]
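import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;

// esDataToText holds the connection config; xpack.security.user is "user:password"
Settings settings = Settings.builder()
        .put("cluster.name", esDataToText.cluster)
        .put("xpack.security.user", esDataToText.userPw)
        .put("xpack.ssl.key", esDataToText.keyPath)
        .put("xpack.ssl.certificate", esDataToText.crtPath)
        .put("xpack.ssl.certificate_authorities", esDataToText.cacrtPath)
        .put("xpack.security.transport.ssl.enabled", true)
        .put("client.transport.ping_timeout", "100s")
        .build();
// try-with-resources closes the client when the search is done
try (TransportClient client = new PreBuiltXPackTransportClient(settings)) {
    client.addTransportAddress(new InetSocketTransportAddress(
            InetAddress.getByName(esDataToText.urls), esDataToText.port));
    SearchResponse response = client.prepareSearch("ndf.dlp")
            .setQuery(QueryBuilders.matchAllQuery())
            .execute().actionGet();
    SearchHits resultHits = response.getHits();
    long resultCount = resultHits.getTotalHits();
    logger.info("Document count: " + resultCount);
} catch (UnknownHostException e) {
    e.printStackTrace();
}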
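- (b) REST interface, i.e. accessing ES over HTTP
This approach accesses ES over HTTP. Because the ES cluster uses SSL authentication, we authenticate first.
- (1) Import the certificate file into a JKS file (the official ES client API works with a KeyStore)
keytool -import -v -trustcacerts -file niudingfeng.crt -keystore my_keystore.jks -keypass password -storepass password
- (2) Username/password authentication and HTTPS authentication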
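// Username/password authentication
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("bigdata", "123456qwerty"));
// SSL certificate verification: load the JKS truststore created with keytool in step (1)
final SSLContext sslContext;
try (InputStream is = Files.newInputStream(Paths.get("my_keystore.jks"))) {
    KeyStore truststore = KeyStore.getInstance("jks");
    truststore.load(is, "password".toCharArray());
    sslContext = SSLContexts.custom().loadTrustMaterial(truststore, null).build();
} catch (GeneralSecurityException | IOException e) {
    // Without an SSL context the HTTPS connection cannot work, so fail fast
    throw new IllegalStateException("Failed to build the SSL context", e);
}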
The above is the authentication code.
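For comparison with the Java code (we ended up using Python anyway): over HTTPS, the username/password step is HTTP Basic authentication, and Python's ssl module reads the CA certificate directly, so no JKS conversion is needed. A minimal standard-library sketch, assuming niudingfeng.crt is PEM-encoded; the helper names are hypothetical and the credentials are just the example ones from the Java snippet.

```python
import base64
import ssl
from typing import Optional


def basic_auth_header(user: str, password: str) -> str:
    """HTTP Basic auth value: "Basic " + base64("user:password")."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return "Basic " + token


def make_ssl_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Verify the server against the given PEM CA certificate, or the system CAs if None."""
    # create_default_context enables certificate and hostname verification.
    return ssl.create_default_context(cafile=ca_file)


if __name__ == "__main__":
    print(basic_auth_header("bigdata", "123456qwerty"))
    ctx = make_ssl_context()  # e.g. make_ssl_context("niudingfeng.crt")
    print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)
```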
- (3) Connect to ES and fetch the data
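Note: the HTTP search API returns only 10 hits by default; to get more, specify from and size.
Because of our ES version, we couldn't use the official Java High Level REST Client, which requires at least 5.6, so this approach isn't recommended.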
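import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

// Low-level REST client over HTTPS, using the SSL context and credentials built above
RestClient restClient = RestClient.builder(new HttpHost("testelk002.niudingfeng.com", 9205, "https"))
        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                return httpClientBuilder.setSSLContext(sslContext).setDefaultCredentialsProvider(credentialsProvider);
            }
        })
        .build();
try {
    String method = "GET";
    String endpoint = "/ndf.dlp/_search";
    // Documents whose @timestamp falls between 2017-12-27 and 2017-12-28
    // (only 10 hits come back unless "from"/"size" are added to the body)
    String queryStr = "{\"query\":{\"range\":{\"@timestamp\":{"
            + "\"gte\":\"2017-12-27\",\"lte\":\"2017-12-28\"}}}}";
    // String queryStr = "{\"query\":{\"match_all\":{}}}";
    HttpEntity entity = new NStringEntity(queryStr, ContentType.APPLICATION_JSON);
    Response response = restClient.performRequest(method, endpoint,
            Collections.<String, String>emptyMap(), entity);
    String res = EntityUtils.toString(response.getEntity());
    // Files.newBufferedWriter overwrites an existing file, so no explicit delete is needed
    try (BufferedWriter bw = Files.newBufferedWriter(Paths.get("D:\\java\\es\\res.txt"), StandardCharsets.UTF_8)) {
        bw.write(res);
    }
} catch (IOException e) {
    e.printStackTrace();
} finally {
    // Close the client even if the request failed
    try {
        restClient.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}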
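Since the HTTP search returns only 10 hits unless from/size are given, here is a hedged Python sketch that builds the same range query as queryStr and splits it into from/size pages. It assumes the default index.max_result_window of 10,000, beyond which from + size is rejected and scrolling should be used instead; the helper names and the page size of 1000 are hypothetical.

```python
import json

# Assumed default cap on from + size for a single search (index.max_result_window).
MAX_RESULT_WINDOW = 10_000


def range_query(field: str, gte: str, lte: str, from_: int = 0, size: int = 10) -> dict:
    """Same range query as the Java queryStr, plus explicit from/size paging."""
    if from_ + size > MAX_RESULT_WINDOW:
        raise ValueError("from + size exceeds index.max_result_window; use scroll instead")
    return {
        "from": from_,
        "size": size,
        "query": {"range": {field: {"gte": gte, "lte": lte}}},
    }


def page_bodies(field, gte, lte, total_hits, page_size=1000):
    """Yield one request body per page until total_hits documents are covered."""
    for start in range(0, total_hits, page_size):
        yield range_query(field, gte, lte, start, min(page_size, total_hits - start))


if __name__ == "__main__":
    for body in page_bodies("@timestamp", "2017-12-27", "2017-12-28", total_hits=2500):
        print(json.dumps(body))  # send each body to /ndf.dlp/_search
```

Every page is a full search that the shards sort again, which is why deep paging gets slow and why the scroll-based approach below scales better.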
- In the end, we implemented it with the Python API
There are also two ways to query ES from Python:
- (a) search
# es_client is an Elasticsearch client created as in (b) below
res = es_client.search(index='index_name', doc_type='type_name', body=es_query,
                       request_timeout=999999, params={"search_type": "query_and_fetch"})
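Note: search returns a dict rather than a generator, identical to what the same query returns in Sense, so the information is fairly complete. For large result sets you page with from/size, but every page is sorted again and deep pages get increasingly expensive, so performance is poor.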
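Because search returns a plain dict, reading it is just dictionary access. A small sketch with hypothetical helper names: hits.total is a plain number in older ES versions but an object like {"value": n, "relation": "eq"} from ES 7 onward, so the helper accepts both. The sample response is hand-written for illustration, not real output from our cluster.

```python
from typing import Any, Dict, List


def total_hits(response: Dict[str, Any]) -> int:
    """hits.total is a number before ES 7 and {"value": n, ...} from ES 7 on."""
    total = response["hits"]["total"]
    return total["value"] if isinstance(total, dict) else int(total)


def sources(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pull the original documents (_source) out of one page of results."""
    return [hit["_source"] for hit in response["hits"]["hits"]]


if __name__ == "__main__":
    # Hand-written dict shaped like a search response, for illustration only.
    sample = {"took": 3, "hits": {"total": 2, "hits": [
        {"_id": "1", "_source": {"msg": "a"}},
        {"_id": "2", "_source": {"msg": "b"}},
    ]}}
    print(total_hits(sample), sources(sample))  # 2 [{'msg': 'a'}, {'msg': 'b'}]
```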
- (b) helpers.scan
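import elasticsearch as es
from elasticsearch import helpers

es_client = es.Elasticsearch(
    [host],
    http_auth=(user, pswd),
    port=port,
    use_ssl=True,
    verify_certs=False,  # skips server certificate verification
    timeout=300)
# preserve_order=False: no sorting, documents come back in whatever order the shards return them
res = helpers.scan(es_client, index=index, query=query, scroll='1m',
                   request_timeout=999999, preserve_order=False)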
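Note: scan scrolls through every document that matches the query and returns all of them. The result is a generator that you iterate over to parse the hits; scroll sets how long each scroll context stays alive between batches. Results are not sorted, which makes this the recommended approach.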
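Since the goal is getting the data into HDFS, a minimal sketch of the last step: stream the scan generator into a newline-delimited JSON file, which can then be uploaded with hdfs dfs -put. This is an illustration rather than necessarily the exact pipeline we ran; the function name, the output file name and the HDFS target directory are hypothetical.

```python
import json
from typing import Any, Dict, Iterable


def write_jsonl(hits: Iterable[Dict[str, Any]], path: str) -> int:
    """Write each hit's _source as one JSON line and return the number of lines written.

    `hits` can be the generator returned by helpers.scan: it is consumed lazily,
    so the whole result set never has to fit in memory.
    """
    count = 0
    with open(path, "w", encoding="utf-8") as out:
        for hit in hits:
            out.write(json.dumps(hit["_source"], ensure_ascii=False))
            out.write("\n")
            count += 1
    return count


if __name__ == "__main__":
    # Stand-in for helpers.scan(...) output so the sketch runs without a cluster.
    fake_hits = ({"_source": {"n": i}} for i in range(3))
    print(write_jsonl(fake_hits, "ndf_dlp.jsonl"))  # 3
    # Then, for example: hdfs dfs -put ndf_dlp.jsonl /some/hdfs/dir/
```

One JSON document per line keeps the file splittable and easy to read from Hive or Spark once it is in HDFS.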