嗯师郑,最近實(shí)在是閑的有點(diǎn)不知所措了
背景介紹
elasticsearch-sql插件是之前發(fā)現(xiàn)的一款可以用sql來代替es本身令人頭疼的語法的插件寥院。es的查詢一般來說是使用curl去訪問它的rest接口,大部分情況下如果我需要查詢一些數(shù)據(jù)我都必須打開head插件然后小心謹(jǐn)慎的編寫json查詢字符串嗡靡,時(shí)不時(shí)的還要判斷自己時(shí)不時(shí)多了少了大括號逗號冀宴,其中苦悶可想而知李皇;并且es的查詢語法畢竟也沒有到像sql一樣可以熟練祟印,每次查詢的時(shí)候還是要去復(fù)制以前的模板過來修改肴沫。我個(gè)人為了工作是有收藏一些常用的查詢語句拿來改的,但該插件可以使用sql語句去查詢es索引蕴忆,方便之余便也想探究它的源碼颤芬。
依賴介紹
- elasticsearch5.6.10(快速構(gòu)造集群方法可以參考我之前的文章使用docker-compose構(gòu)建elasticsearch集群)
- idea (當(dāng)然你可以使用別的ide)
- elasticsearch-sql插件5.6.10.0
過程分析
搭建環(huán)境
首先還是一樣,訪問elasticsearch-sql的github地址套鹅,很意外的看到這是在NLPchina賬號的倉庫下站蝠,居然是國產(chǎn)的作品!那么更值得去分析一下了卓鹿。下方的readme也提示了不同版本之間的對應(yīng)關(guān)系菱魔,目前支持的最新版本是6.3.0.不過我最近在測試的是5.6.10版本。所以到本地目錄做如下操作
git clone xxxx
git tag
git check 5.6.10.0
OK吟孙,我們成功check到5.6.10版本的源代碼澜倦。接下來打開IDEA進(jìn)行Import。導(dǎo)入過程中無腦next就完事了杰妓≡逯危看文件目錄的文件有pom文件所以可以很清晰的確認(rèn)該項(xiàng)目是由maven管理,剩下的就按照平時(shí)管理maven項(xiàng)目的方式進(jìn)行處理就可以了巷挥。
總體分析
首先可以看下工程的整個(gè)大致目錄結(jié)構(gòu)
├── BUILDING.md
├── LICENSE
├── README.md
├── doc
│ └── features.md
├── elasticsearch-sql.iml
├── open-source.pom.xml
├── pom.xml
├── src
│ ├── _site
│ ├── assembly
│ ├── main
│ ├── site-server
│ └── test
└── target
├── classes
├── generated-sources
├── generated-test-sources
└── test-classes
對整個(gè)工程會(huì)有一個(gè)大致的了解栋艳,然后打開pom文件瀏覽整個(gè)工程的依賴構(gòu)成。稍微會(huì)關(guān)注幾個(gè)依賴句各,比如es依賴包的版本是否正確吸占。但這個(gè)時(shí)候看到一個(gè)比較出乎我意料的依賴
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.15</version>
</dependency>
我個(gè)人因?yàn)楣ぷ鞯年P(guān)系和關(guān)系型數(shù)據(jù)庫們打的交道不多,但是這個(gè)大名鼎鼎的產(chǎn)品我還是知道的
Druid是一個(gè)JDBC組件庫凿宾,包括數(shù)據(jù)庫連接池矾屯、SQL Parser等組件。DruidDataSource是最好的數(shù)據(jù)庫連接池
我第一反應(yīng)是為什么它會(huì)存在初厚,這個(gè)插件主要是在和es集群互動(dòng)件蚕,實(shí)際上不會(huì)使用到mysql驅(qū)動(dòng),并不會(huì)使用到JDBC這個(gè)組件产禾,為什么pom文件中會(huì)有它的出現(xiàn)呢排作?其實(shí)往后看就明白了。
初步嘗試
再回看上面的目錄樹亚情,可以看到src目錄下的幾個(gè)子目錄妄痪,有幾個(gè)目錄名字都是見名思義,這也是我覺得java圈子中一些規(guī)范的好處楞件,約定大于配置衫生。比如assembly目錄下一定回事打包配置文件裳瘪,main目錄下有源碼的根包罪针,test目錄下會(huì)有單元測試代碼彭羹。所以順利成章的我會(huì)去先通過單元測試來了解整個(gè)插件的源代碼。
├── AggregationTest.java
├── CSVResultsExtractorTests.java
├── DeleteTest.java
├── ExplainTest.java
├── JDBCTests.java
├── JoinTests.java
├── MainTestSuite.java
├── MethodQueryTest.java
├── MultiQueryTests.java
├── MyTest.java
├── QueryTest.java
├── SQLFunctionsTest.java
├── ShowTest.java
├── SourceFieldTest.java
├── SqlParserTests.java
├── TestsConstants.java
├── UtilTests.java
└── WktToGeoJsonConverterTests.java
上面是test目錄下的文件結(jié)構(gòu)泪酱,其中MyTest文件是我加的派殷。
從文件名上可以猜測出對應(yīng)es各個(gè)操作的測試以及一些其他的測試,比如AggregationTest就很容易猜測說它是聚合操作的相關(guān)測試墓阀,我們初來乍到愈腾,找一個(gè)最簡單的測試,QueryTest.java岂津。
@Test
public void searchTypeTest() throws IOException, SqlParseException, SQLFeatureNotSupportedException{
SearchHits response = query(String.format("SELECT * FROM %s/phrase LIMIT 1000", TEST_INDEX));
Assert.assertEquals(4, response.getTotalHits());
}
上面是QueryTest類的第一個(gè)測試方法,看樣子也很簡單悦即,做一次
SELECT * FROM TEST_INDEX LIMIT 1000
的查詢吮成,結(jié)果如果等于4的話單元測試通過
tips:Assert是斷言的意思,當(dāng)然我知道你已經(jīng)知道辜梳。
無腦直接運(yùn)行粱甫,即使我知道我什么配置文件都沒配置過。
java.lang.NullPointerException
at org.nlpcn.es4sql.QueryTest.query(QueryTest.java:942)
at org.nlpcn.es4sql.QueryTest.searchTypeTest(QueryTest.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
毫不意外地收到報(bào)錯(cuò)作瞄,但是為什么是空指針異常茶宵?我原來的猜測是肯定會(huì)跳出找不到集群,然后我跟隨去配置集群地址就好宗挥。根據(jù)錯(cuò)誤棧我來到了這個(gè)query方法
private SearchHits query(String query) throws SqlParseException, SQLFeatureNotSupportedException, SQLFeatureNotSupportedException {
SearchDao searchDao = MainTestSuite.getSearchDao();
SqlElasticSearchRequestBuilder select = (SqlElasticSearchRequestBuilder) searchDao.explain(query).explain();
return ((SearchResponse)select.get()).getHits();
}
該方法并沒有@Test的注解乌庶,不是單元測試方法,異常出現(xiàn)在SearchDao searchDao = MainTestSuite.getSearchDao();
searchDao是null契耿,那么為什么會(huì)是null呢瞒大?繼續(xù)跟蹤到MainTestSuite類中,然后發(fā)現(xiàn)了新天地搪桂。其實(shí)這邊也有約定大于配置的好處透敌,看到TestSuite就知道這是個(gè)批量測試的類了。類中有兩個(gè)注解@BeforeClass @AfterClass踢械,剛才的原因就找到了酗电,剛才直接獲取searchDao沒有經(jīng)過預(yù)加載,所以是null内列。那么新的問題來了撵术,我不想要運(yùn)行整個(gè)TestSuite,我只想要運(yùn)行一個(gè)測試方法话瞧,要怎么辦呢荷荤?這時(shí)候需要稍微修改下代碼了退渗,回到QueryTest.java中,添加以下兩個(gè)方法
@Before
public void setup() throws Exception {
MainTestSuite.setUp();
}
@After
public void end() throws InterruptedException {
MainTestSuite.tearDown();
}
同時(shí)有一個(gè)地方要注意蕴纳,除非你通過外部參數(shù)傳入你的es的ip和端口会油,否則可以在MainTestSuite中做以下修改
protected static InetSocketTransportAddress getTransportAddress() throws UnknownHostException {
String host = System.getenv("ES_TEST_HOST");
String port = System.getenv("ES_TEST_PORT");
if(host == null) {
host = "localhost";
System.out.println("ES_TEST_HOST enviroment variable does not exist. choose default 'localhost'");
}
if(port == null) {
port = "9302";
System.out.println("ES_TEST_PORT enviroment variable does not exist. choose default '9300'");
}
System.out.println(String.format("Connection details: host: %s. port:%s.", host, port));
return new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port));
}
將你的ip和端口直接修改。
接下來就可以直接在測試方法上右鍵運(yùn)行了古毛。
淺嘗輒止
我并沒有特別細(xì)致的閱讀完整個(gè)源代碼翻翩,我只想要找到我關(guān)注的點(diǎn)去仔細(xì)閱讀。而在我拿到這份源代碼的時(shí)候我有兩點(diǎn)特別感興趣
- 通過什么方式來封裝sql語句為es的請求
- 有沒有什么比較干凈優(yōu)雅的抽象方式
這里我不再細(xì)致的列出我怎么翻到的步驟稻薇,而是上最終結(jié)果嫂冻,同時(shí)也解答了前面為什么會(huì)有druid的疑惑。
直接看看以下這個(gè)類DefaultQueryAction.java的explain方法
@Override
public SqlElasticSearchRequestBuilder explain() throws SqlParseException {
this.request = client.prepareSearch();
setIndicesAndTypes();
setFields(select.getFields());
setWhere(select.getWhere());
setSorts(select.getOrderBys());
setLimit(select.getOffset(), select.getRowCount());
boolean usedScroll = useScrollIfNeeded(select.isOrderdSelect());
if (!usedScroll) {
request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
}
updateRequestWithIndexAndRoutingOptions(select, request);
updateRequestWithHighlight(select, request);
updateRequestWithCollapse(select, request);
SqlElasticSearchRequestBuilder sqlElasticRequestBuilder = new SqlElasticSearchRequestBuilder(request);
return sqlElasticRequestBuilder;
}
對于es的api熟悉的人看到這個(gè)就明白了this.request = client.prepareSearch();
在這里類中創(chuàng)建了一個(gè)request請求塞椎,將select對象中已經(jīng)把sql語句解析出來的結(jié)果以各種方式轉(zhuǎn)換成request中的參數(shù)桨仿,最后直接發(fā)送這個(gè)request整個(gè)封裝過程就結(jié)束了。那么這個(gè)select結(jié)果如何獲得呢案狠?我們看這個(gè)類ESActionFactory.java
public static QueryAction create(Client client, String sql) throws SqlParseException, SQLFeatureNotSupportedException {
sql = sql.replaceAll("\n"," ");
String firstWord = sql.substring(0, sql.indexOf(' '));
switch (firstWord.toUpperCase()) {
case "SELECT":
SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
if(isMulti(sqlExpr)){
MultiQuerySelect multiSelect = new SqlParser().parseMultiSelect((SQLUnionQuery) sqlExpr.getSubQuery().getQuery());
handleSubQueries(client,multiSelect.getFirstSelect());
handleSubQueries(client,multiSelect.getSecondSelect());
return new MultiQueryAction(client, multiSelect);
}
else if(isJoin(sqlExpr,sql)){
JoinSelect joinSelect = new SqlParser().parseJoinSelect(sqlExpr);
handleSubQueries(client, joinSelect.getFirstTable());
handleSubQueries(client, joinSelect.getSecondTable());
return ESJoinQueryActionFactory.createJoinAction(client, joinSelect);
}
else {
Select select = new SqlParser().parseSelect(sqlExpr);
handleSubQueries(client, select);
return handleSelect(client, select);
}
case "DELETE":
SQLStatementParser parser = createSqlStatementParser(sql);
SQLDeleteStatement deleteStatement = parser.parseDeleteStatement();
Delete delete = new SqlParser().parseDelete(deleteStatement);
return new DeleteQueryAction(client, delete);
case "SHOW":
return new ShowQueryAction(client,sql);
default:
throw new SQLFeatureNotSupportedException(String.format("Unsupported query: %s", sql));
}
}
其中最關(guān)鍵的SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
SQLQueryExpr是druid中用來描述sql語句的類服傍,不需要再自己重新封裝,只需要利用阿里的工作成果即可~高
走到這里突然想到骂铁,其實(shí)druid是一個(gè)對于數(shù)據(jù)源的管理方式和工具吹零,并不一定是結(jié)構(gòu)數(shù)據(jù)庫,如果說把es也看成一個(gè)數(shù)據(jù)源拉庵,是不是更好理解了呢灿椅?
結(jié)束
不過說到底這個(gè)插件我用的還是不多,不靈活钞支,以及前期已經(jīng)投入了很多對于es語法的學(xué)習(xí)成本茫蛹,還有一點(diǎn)是,熟悉es的語法對于使用原生的javaAPI時(shí)很有幫助烁挟。