storm如何把數(shù)據(jù)插入到elasticsearch
1 storm提供的例子
https://github.com/apache/storm/tree/master/external/storm-elasticsearch
代碼:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-elasticsearch</artifactId>
<version>1.1.0</version>
</dependency>
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
問(wèn)題:依賴低版本的Elasticsearch 這個(gè)問(wèn)題沒(méi)有解決
查看最新代碼已經(jīng)修復(fù)了 沒(méi)提供jar包
需要重新編譯storm1.10代碼 直接放棄了 采用下面方法
方法2 elasticsearch-hadoop
疑問(wèn)?
僅僅支持hadoop嗎 storm支持嗎我要的是storm物独?
ES-Hadoop無(wú)縫打通了ES和Hadoop兩個(gè)非常優(yōu)秀的框架棚瘟,我們既可以把HDFS的數(shù)據(jù)導(dǎo)入到ES里面做分析,也可以將es數(shù)據(jù)導(dǎo)出到HDFS上做備份吟孙,歸檔,其中值得一提的是ES-Hadoop全面的支持了Spark框架聚蝶,
其中包括Spark(五角星那個(gè)上面中間位置)
- 支持Hive(像蜜蜂的那個(gè)下面最左位置)
- 支持Cascading(有五個(gè)豎線那個(gè) 上面最右位置)
Cascading is the proven application development platform for
building data applications on Hadoop. - Storm(閃電的那個(gè))
- 當(dāng)然還有標(biāo)準(zhǔn)的MapReduce杰妓,
無(wú)論用那一個(gè)框架集成ES,都是非常簡(jiǎn)潔的碘勉。
疑問(wèn):
為了使用這個(gè)jar 是否引用一系列相關(guān)的jar呀
經(jīng)過(guò)驗(yàn)證不需要引入hadoop 但是json和http引入
折騰不起
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
<version>1.8.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.0</version>
</dependency>
<!--
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-elasticsearch</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-storm</artifactId>
<version>5.5.1</version>
</dependency>
-->
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.5.1</version>
</dependency>
代碼實(shí)現(xiàn)
配置文件:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.1</version>
</dependency>
方法3 elasticsearch 官方提供的例子
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-storm</artifactId>
<version>5.5.1</version>
</dependency>
閱讀代碼:
關(guān)鍵類:TransportClient
Elasticsearch uses standard RESTful APIs and JSON.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
SearchResponse sr = client.prepareSearch()
.setQuery(QueryBuilders.matchQuery("message", "myProduct"))
.addAggregation(AggregationBuilders.terms("top_10_states")
.field("state").size(10))
.execute().actionGet();
client.close();
es Elasticsearch from Storm
http://blog.csdn.net/sunnyyoona/article/details/52860861
https://www.elastic.co/guide/en/elasticsearch/guide/current/dynamic-mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/2.4/dynamic-field-mapping.html#date-detection
參考
Elasticsearch for Apache Hadoop
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.htmlelasticsearch-storm
https://github.com/swapnilkumbhar1602/Storm_to_ElasticSearch_Kibanastorm-elasticsearch
https://github.com/apache/storm/tree/master/external/storm-elasticsearch
-Mapping and Types
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html#_multi_fields_2
http://www.reibang.com/p/ab99d2bcd63d
http://blog.csdn.net/sunnyyoona/article/details/52860861
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/ip.html時(shí)間類型
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/date.html
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/mapping-date-format.html#strict-date-time
http://blog.csdn.net/macavalier/article/details/17632491
https://stackoverflow.com/questions/29938237/java-8-date-and-time-api-parse-yyyy-mm-ddthhmmss-sssz
https://www.w3.org/TR/NOTE-datetime
storm連接kafka
//重點(diǎn)
Storm 如何來(lái)封裝kafka接口
class:DynamicPartitionConnections