(五)apache ignite-Persistence 緩存查詢

Apache Ignite提供了一個強大的查詢API,支持基于謂詞的掃描查詢、符合ANSI-99的SQL查詢治泥、文本搜索查詢和持續(xù)查詢藐窄。
假設(shè)緩存包含以下Company對象:

public class Company { 
private Long id;
private String companyName; 
private String email;
private String address; 
private String city; 
private String state; 
private String zipCode; 
private String phoneNumber; 
private String faxNumber; 
private String webAddress;
}

緩存的company數(shù)據(jù)記錄將被分區(qū)并分布在Ignite數(shù)據(jù)網(wǎng)格中。你可以查詢遍歷緩存記錄楞慈,并查找感興趣的特定記錄幔烛。但是遍歷整個緩存并是低效的,因為必須獲取整個數(shù)據(jù)集并在本地迭代囊蓝。如果數(shù)據(jù)集非常大饿悬,將會增加網(wǎng)絡傳輸負擔。Apache Ignite提供了一個編程接口聚霜,允許對緩存數(shù)據(jù)集執(zhí)行不同類型的查詢狡恬。你可以根據(jù)應用程序需求選擇API,例如蝎宇,如果你想執(zhí)行文本搜索弟劲,Ignite文本查詢是最佳選擇。
Apache Ignite提供了查詢API的兩個主要抽象:Query和QueryCursor接口:

Name Description
Query 所有Ignite緩存查詢的基類姥芥。必須使用SqlQuery和TextQuery進行SQL和文本查詢兔乞。這個抽象類還提供了setLocal()和setPageSize()等方法來在本地節(jié)點中執(zhí)行查詢,并為返回的游標設(shè)置頁面大小。
QueryCursor 接口庸追,用翻頁迭代表示查詢結(jié)果霍骄。當不需要分頁時,可以使用QueryCursor.getAll()方法锚国,它將獲取整個查詢結(jié)果并將其存儲在集合中腕巡。注意,無論何時在for循環(huán)中迭代游標或顯式地獲取迭代器血筑,都必須顯式地關(guān)閉游標绘沉。

Scan queries

Ignite Scan queries允許在緩存數(shù)據(jù)集上運行分布式查詢。如果沒有指定謂詞豺总,查詢返回緩存的所有數(shù)據(jù)記錄车伞。你可以根據(jù)存儲在緩存中的對象定義任何謂詞。
查詢將對所有數(shù)據(jù)記錄應用謂詞以查找匹配喻喳。為了演示掃描查詢的功能另玖,我們將使用以下數(shù)據(jù)集。


image.png

where

  • ID: Serial Number
  • CAT: CAT (公司名稱首寫字母)
  • COMPANY_NAME:公司名稱
  • EMAIL: 公司或個人使用的email
  • ADDRESS:街道地址或地區(qū)
  • CITY:城市名稱
  • STATE:州名
  • ZIPCODE:郵編
  • PHONE_NUMBER:公司或個人聯(lián)系電話
  • FAX_NUMBER:傳真
    該數(shù)據(jù)集包含位于紐約州的所有公司聯(lián)系信息表伦。我們將把CSV文件中的數(shù)據(jù)記錄加載到緩存中谦去,并應用一些謂詞掃描查詢緩存中的數(shù)據(jù)記錄。

Step 1

使用以下依賴項創(chuàng)建一個新的Maven項目蹦哼。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycookcode.bigData.ignite</groupId>
  <artifactId>ignite-textquery</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>ignite-textquery</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <ignite.version>2.6.0</ignite.version>
  </properties>


  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-core</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-indexing</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-spring</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-slf4j</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-log4j</artifactId>
      <version>${ignite.version}</version>
    </dependency>


    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>

  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>com.jolira</groupId>
        <artifactId>onejar-maven-plugin</artifactId>
        <version>1.4.4</version>
        <executions>
          <execution>
            <id>build-query</id>
            <configuration>
              <mainClass>com.mycookcode.bigData.ignite.App</mainClass>
              <attachToBuild>true</attachToBuild>
              <classifier>onejar</classifier>
              <filename>textquery-runnable.jar</filename>
            </configuration>
            <goals>
              <goal>one-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Step 2

首先鳄哭,創(chuàng)建一個新的Java類com.mycookcode.bigData.ignite.model.Company在src\main\java\com\mycookcode\bigData\ignite\model目錄,它將代表數(shù)據(jù)集纲熏。

package com.mycookcode.bigData.ignite.model;

import org.apache.ignite.cache.query.annotations.QueryTextField;

import java.io.Serializable;


public class Company implements Serializable {


    private Long id;

    @QueryTextField
    private String cat;

    @QueryTextField
    private String companyName;

    @QueryTextField
    private String email;

    @QueryTextField
    private String address;

    @QueryTextField
    private String city;

    @QueryTextField
    private String state;

    @QueryTextField
    private String zipCode;

    @QueryTextField
    private String phoneNumber;

    @QueryTextField
    private String faxNumber;

    @QueryTextField
    private String sicCode;

    @QueryTextField
    private String sicDescription;

    @QueryTextField
    private String webAddress;

    public Company(Long id, String cat, String companyName, String email, String address, String city, String state, String zipCode, String phoneNumber, String faxNumber, String sicCode, String sicDescription, String webAddress) {
        this.id = id;
        this.cat = cat;
        this.companyName = companyName;
        this.email = email;
        this.address = address;
        this.city = city;
        this.state = state;
        this.zipCode = zipCode;
        this.phoneNumber = phoneNumber;
        this.faxNumber = faxNumber;
        this.sicCode = sicCode;
        this.sicDescription = sicDescription;
        this.webAddress = webAddress;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getCat() {
        return cat;
    }

    public void setCat(String cat) {
        this.cat = cat;
    }

    public String getCompanyName() {
        return companyName;
    }

    public void setCompanyName(String companyName) {
        this.companyName = companyName;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getZipCode() {
        return zipCode;
    }

    public void setZipCode(String zipCode) {
        this.zipCode = zipCode;
    }

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public String getFaxNumber() {
        return faxNumber;
    }

    public void setFaxNumber(String faxNumber) {
        this.faxNumber = faxNumber;
    }

    public String getSicCode() {
        return sicCode;
    }

    public void setSicCode(String sicCode) {
        this.sicCode = sicCode;
    }

    public String getSicDescription() {
        return sicDescription;
    }

    public void setSicDescription(String sicDescription) {
        this.sicDescription = sicDescription;
    }

    public String getWebAddress() {
        return webAddress;
    }

    public void setWebAddress(String webAddress) {
        this.webAddress = webAddress;
    }

    @Override
    public String toString() {
        return "Company{" +
                "id=" + id +
                ", cat='" + cat + '\'' +
                ", companyName='" + companyName + '\'' +
                ", email='" + email + '\'' +
                ", address='" + address + '\'' +
                ", city='" + city + '\'' +
                ", state='" + state + '\'' +
                ", zipCode='" + zipCode + '\'' +
                ", phoneNumber='" + phoneNumber + '\'' +
                ", faxNumber='" + faxNumber + '\'' +
                ", sicCode='" + sicCode + '\'' +
                ", sicDescription='" + sicDescription + '\'' +
                ", webAddress='" + webAddress + '\'' +
                '}';
    }
}

Step 3

現(xiàn)在妆丘,可以創(chuàng)建另一個Java類并添加一個方法來將數(shù)據(jù)從CSV文件加載到Ignite緩存中(樣例數(shù)據(jù)下載地址:https://pan.baidu.com/s/1IkAG0BXbn68na0wAHmC0Q)。在文件夾src/main/java/com/mycookcode/bigData/ignite目錄中創(chuàng)建一個新的Java類com.mycookcode.bigData.ignite.App局劲。添加一個名為initialize()的方法勺拣,代碼如下:

 private static void initialize()throws InterruptedException,IOException
    {
        IgniteCache<Long,Company> companyCache = Ignition.ignite().cache(COMPANY_CACHE_NAME);


        //啟動之前清空緩存
        companyCache.clear();

        try(Stream<String> lines = Files.lines(Paths.get("/data/USA_NY_email_addresses.csv"))
            //Stream<String> lines = Files.lines(Paths.get(App.class.getClassLoader().getResources("USA_NY_email_addresses.csv").toString()))
         )
        {
            lines.skip(1).map(s1 -> s1.split("\",\"")).map(s2 -> new Company(Long.valueOf(s2[0].replaceAll("\"", "")), s2[1], s2[2], s2[3], s2[4], s2[5], s2[6], s2[7], s2[8], s2[9], s2[10], s2[11], s2[12].replaceAll("\"", "")))
            .forEach(r -> companyCache.put(r.getId(),r));
        }catch (IOException e)
        {
            System.out.println(e.getMessage());
        }
        Thread.sleep(1000);
    }

首先,我們用companyCache創(chuàng)建一個Ignite緩存鱼填,它將存儲公司的所有數(shù)據(jù)記錄药有。然后usa_ny_email_address.csv文件做為數(shù)據(jù)流被Java 8 Stream API讀取。接下來苹丸,我們跳過第一行CSV文件塑猖,按' '分割每一行,并創(chuàng)建新的公司對象存儲到Ignite緩存中谈跛。上面代碼的最后一行強制應用程序等待一秒鐘羊苟,以確保集群的所有節(jié)點都處理put請求。

Step 4

將Ignite Scan查詢應用到緩存感憾。在App類中添加一個新的Java方法scanQuery()蜡励,如下所示:

{
        IgniteCache<Long,Company> companyCache = Ignition.ignite().cache(COMPANY_CACHE_NAME);
        //查詢所有城市“紐約”-紐約的公司
        QueryCursor queryCursor = companyCache.query(new ScanQuery<Long,Company>((k,p) -> p.getCity().equalsIgnoreCase("NEW YORK")));
       for (Iterator ite = queryCursor.iterator();ite.hasNext();)
       {
           IgniteBiTuple<Long,Company> company = (IgniteBiTuple<Long, Company>) ite.next();
            System.out.println(company.getValue().getCompanyName());

        }

        queryCursor.close();
    }

在上面的偽代碼中令花,我們首先獲取公司的Ignite緩存,并使用Java 8 lambda表達式創(chuàng)建一個新的scan查詢凉倚。我們還傳遞了以下謂詞表達式:

p.getCity().equalsIgnoreCase("NEW YORK")

查詢公司對象的城市名稱等于紐約的數(shù)據(jù)兼都。Ignite將對所有緩存數(shù)據(jù)記錄應用上述謂詞,并返回該緩存數(shù)據(jù)記錄的QueryCursor稽寒,其城市名稱等于紐約扮碧。然后,我們簡單地遍歷查詢游標并在控制臺上打印公司名稱杏糙。

Step 5

編譯和執(zhí)行應用程序慎王,運行以下命令:

mvn clean install && java –jar target/textquery-runnable.jar scanquery

上圖顯示了Ignite scan查詢找到所有城市名稱為New York的公司。

Text queries

Apache Ignite中的文本查詢允許對基于字符的緩存項運行全文本查詢宏侍。為什么需要全文搜索查詢赖淤,以及它與掃描查詢的區(qū)別?例如,我們想找到紐約美容院的名單谅河。通過掃描查詢咱旱,我可以運行以下查詢:

TextQuery<Integer, Company> primavera = new TextQuery<>(Company.class, "beauty saloon");

以上查詢將返回公司名稱中包含美容院的公司名單。但這種方法也有一些缺點:

  1. 它將掃描整個緩存绷耍,如果緩存包含大量數(shù)據(jù)集吐限,這將非常低效。
  2. 如果運行查詢褂始,我們不得不需要知道公司對象的屬性诸典。在這個例子中,使用的是公司名稱病袄。
  3. 當使用基于不同屬性的復雜謂詞時搂赋,掃描查詢將變得復雜赘阀。

在日常生活中益缠,我們在網(wǎng)上進行大量的文本搜索:谷歌或Bing。大多數(shù)搜索引擎的工作原理是全文搜索基公。我們在谷歌的搜索框中輸入搜索項;谷歌搜索引擎返回包含搜索項的網(wǎng)站或資源的列表幅慌。

Apache Ignite支持基于Lucene索引的基于文本的檢索查詢。Lucene是Java中的開源全文本搜索庫轰豆,可以很容易地為任何應用程序添加搜索功能胰伍。Lucene通過向全文索引添加內(nèi)容來實現(xiàn)。然后它允許對這個索引執(zhí)行查詢酸休。這種類型的索引稱為反向索引骂租,因為它將以page-centric為數(shù)據(jù)結(jié)構(gòu)(page->words)轉(zhuǎn)換為以keyword-centric為數(shù)據(jù)結(jié)構(gòu)(word->pages)。你可以把它想象成書后面的索引斑司。

在Lucene中渗饮,文檔是索引和搜索的單位。索引可以包含一個或多個文檔。Lucene文檔不一定非得是Microsoft word中的文檔互站。例如私蕾,如果您正在創(chuàng)建一個公司的Lucene索引,那么每個公司都將在Lucene索引中表示一個文檔胡桃。Lucene search可以通過Lucene IndexSearcher從索引中檢索文檔踩叭。


image.png

在Apache Ignite中,每個節(jié)點都包含一個本地Lucene引擎翠胰,它將索引存儲在本地緩存中的數(shù)據(jù)記錄容贝。當執(zhí)行任何分布式全文查詢時,每個節(jié)點通過IndexSearcher在本地索引中執(zhí)行搜索亡容,并將結(jié)果發(fā)送回客戶機節(jié)點嗤疯,在那里聚合結(jié)果。

我們將使用前面的相同數(shù)據(jù)集闺兢,并擴展應用程序以在Ignite cache中執(zhí)行文本搜索茂缚。先稍微修改一下maven項目,添加全文搜索功能:

Step 1

添加以下maven依賴項如下:

   <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-indexing</artifactId>
      <version>2.6.0</version>
    </dependency>

Step 2

向com.mycookcode.bigData.ignite.model的每個字段添加@QueryTextField注釋屋谭。要使用Lucene對Company類進行索引以進行全文搜索脚囊。

package com.mycookcode.bigData.ignite.model;

import org.apache.ignite.cache.query.annotations.QueryTextField;

import java.io.Serializable;


public class Company implements Serializable {


    private Long id;

    @QueryTextField
    private String cat;

    @QueryTextField
    private String companyName;

    @QueryTextField
    private String email;

    @QueryTextField
    private String address;

    @QueryTextField
    private String city;

    @QueryTextField
    private String state;

    @QueryTextField
    private String zipCode;

    @QueryTextField
    private String phoneNumber;

    @QueryTextField
    private String faxNumber;

    @QueryTextField
    private String sicCode;

    @QueryTextField
    private String sicDescription;

    @QueryTextField
    private String webAddress;

    public Company(Long id, String cat, String companyName, String email, String address, String city, String state, String zipCode, String phoneNumber, String faxNumber, String sicCode, String sicDescription, String webAddress) {
        this.id = id;
        this.cat = cat;
        this.companyName = companyName;
        this.email = email;
        this.address = address;
        this.city = city;
        this.state = state;
        this.zipCode = zipCode;
        this.phoneNumber = phoneNumber;
        this.faxNumber = faxNumber;
        this.sicCode = sicCode;
        this.sicDescription = sicDescription;
        this.webAddress = webAddress;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getCat() {
        return cat;
    }

    public void setCat(String cat) {
        this.cat = cat;
    }

    public String getCompanyName() {
        return companyName;
    }

    public void setCompanyName(String companyName) {
        this.companyName = companyName;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getZipCode() {
        return zipCode;
    }

    public void setZipCode(String zipCode) {
        this.zipCode = zipCode;
    }

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public String getFaxNumber() {
        return faxNumber;
    }

    public void setFaxNumber(String faxNumber) {
        this.faxNumber = faxNumber;
    }

    public String getSicCode() {
        return sicCode;
    }

    public void setSicCode(String sicCode) {
        this.sicCode = sicCode;
    }

    public String getSicDescription() {
        return sicDescription;
    }

    public void setSicDescription(String sicDescription) {
        this.sicDescription = sicDescription;
    }

    public String getWebAddress() {
        return webAddress;
    }

    public void setWebAddress(String webAddress) {
        this.webAddress = webAddress;
    }

    @Override
    public String toString() {
        return "Company{" +
                "id=" + id +
                ", cat='" + cat + '\'' +
                ", companyName='" + companyName + '\'' +
                ", email='" + email + '\'' +
                ", address='" + address + '\'' +
                ", city='" + city + '\'' +
                ", state='" + state + '\'' +
                ", zipCode='" + zipCode + '\'' +
                ", phoneNumber='" + phoneNumber + '\'' +
                ", faxNumber='" + faxNumber + '\'' +
                ", sicCode='" + sicCode + '\'' +
                ", sicDescription='" + sicDescription + '\'' +
                ", webAddress='" + webAddress + '\'' +
                '}';
    }
}

Step 3

創(chuàng)建一個新的靜態(tài)方法叫textQuery與以下內(nèi)容:

private static void textQuery()
    {
        IgniteCache<Long,Company> companyCache = Ignition.ignite().cache(COMPANY_CACHE_NAME);

        TextQuery<Integer,Company> john = new TextQuery<>(Company.class,"John");


        TextQuery<Integer, Company> primavera = new TextQuery<>(Company.class, "beauty saloon");

        System.out.println("==So many companies with information about 'John'=="+companyCache.query(john).getAll());
        System.out.println("==A company which name with ' beauty salon'=="+companyCache.query(primavera).getAll());
    }

上面的代碼與scanQuery方法非常相似。首先桐磁,我們檢索Company緩存悔耘,并創(chuàng)建兩個文本查詢john和beauty saloon。然后使用返回Company列表并將結(jié)果打印到控制臺的文本執(zhí)行查詢我擂。

Step 4

使用以下命令編譯并運行應用程序:

mvn clean install && java –jar target/textquery-runnable.jar textquery

你可以使用不同的搜索條件編輯文本查詢應用程序衬以。在正常的用例中,Ignite內(nèi)置文本查詢應該足以執(zhí)行全文本搜索校摩。
以下是Scan query和Text query完整的代碼例子:
Ignite配置文件:/resources/example-ignite.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Ignite服務啟動類com.mycookcode.bigData.ignite.App來完成緩存的查詢:

package com.mycookcode.bigData.ignite;

import com.mycookcode.bigData.ignite.model.Company;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.TextQuery;
import org.apache.ignite.configuration.CacheConfiguration;

import org.apache.ignite.lang.IgniteBiTuple;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.stream.Stream;

public class App 
{

    private static final String SCAN_QUERY = "scanquery";
    private static final String TEXT_QUERY = "textquery";

    //定義用來存儲公司對象的分區(qū)緩存名稱
    private static final String COMPANY_CACHE_NAME = App.class.getSimpleName() + "-company";

    public static void main( String[] args )throws Exception
    {

        try (Ignite ignite = Ignition.start("example-ignite.xml"))
        {
            CacheConfiguration<Long,Company> employeeCacheCfg = new CacheConfiguration<>(COMPANY_CACHE_NAME);
            employeeCacheCfg.setCacheMode(CacheMode.PARTITIONED);
            employeeCacheCfg.setIndexedTypes(Long.class, Company.class);

            try(IgniteCache<Long,Company> employeeCache = ignite.createCache(employeeCacheCfg))
            {
                if(args.length <= 0)
                {
                    System.out.println("Usages! java -jar .\\\\target\\\\cache-store-runnable.jar scanquery|textquery");
                    System.exit(0);
                }
                initialize();
                if(args[0].equalsIgnoreCase(SCAN_QUERY))
                {
                    scanQuery();
                    System.out.println("Scan query example finished.");
                }else if (args[0].equalsIgnoreCase(TEXT_QUERY)) {
                    textQuery();
                    System.out.println("Text query example finished.");
                }
            }
        }
    }

    private static void initialize()throws InterruptedException,IOException
    {
        IgniteCache<Long,Company> companyCache = Ignition.ignite().cache(COMPANY_CACHE_NAME);


        //啟動之前清空緩存
        companyCache.clear();

        try(Stream<String> lines = Files.lines(Paths.get("/data/USA_NY_email_addresses.csv"))
            //Stream<String> lines = Files.lines(Paths.get(App.class.getClassLoader().getResources("USA_NY_email_addresses.csv").toString()))
         )
        {
            lines.skip(1).map(s1 -> s1.split("\",\"")).map(s2 -> new Company(Long.valueOf(s2[0].replaceAll("\"", "")), s2[1], s2[2], s2[3], s2[4], s2[5], s2[6], s2[7], s2[8], s2[9], s2[10], s2[11], s2[12].replaceAll("\"", "")))
            .forEach(r -> companyCache.put(r.getId(),r));
        }catch (IOException e)
        {
            System.out.println(e.getMessage());
        }
        Thread.sleep(1000);
    }


    private static void textQuery()
    {
        IgniteCache<Long,Company> companyCache = Ignition.ignite().cache(COMPANY_CACHE_NAME);

        TextQuery<Integer,Company> john = new TextQuery<>(Company.class,"John");


        TextQuery<Integer, Company> primavera = new TextQuery<>(Company.class, "beauty saloon");

        System.out.println("==So many companies with information about 'John'=="+companyCache.query(john).getAll());
        System.out.println("==A company which name with ' beauty salon'=="+companyCache.query(primavera).getAll());
    }


    private static void scanQuery()
    {
        IgniteCache<Long,Company> companyCache = Ignition.ignite().cache(COMPANY_CACHE_NAME);
        //查詢所有城市“紐約”-紐約的公司
        QueryCursor queryCursor = companyCache.query(new ScanQuery<Long,Company>((k,p) -> p.getCity().equalsIgnoreCase("NEW YORK")));
       for (Iterator ite = queryCursor.iterator();ite.hasNext();)
       {
           IgniteBiTuple<Long,Company> company = (IgniteBiTuple<Long, Company>) ite.next();
            System.out.println(company.getValue().getCompanyName());

        }

        queryCursor.close();
    }


}

SQL queries

Apache Ignite提供了SqlQuery和SqlFieldsQuery API來支持針對緩存的SQL查詢看峻。SQL語法是完全支持ANSI-99,可以執(zhí)行一個聚合函數(shù)衙吩,比如AVG()互妓、COUNT()、分組或排序坤塞。當數(shù)據(jù)駐留在不同的緩存中時冯勉,Apache Ignite還包括用于查詢內(nèi)存數(shù)據(jù)的分布式SQL連接查詢(并置和非并置)。與Hazelcast或Infinispan等內(nèi)存廠商相比摹芙,支持ANSI-99 SQL查詢是Apache Ignite的獨特特性之一灼狰。
過去,針對大型數(shù)據(jù)集的分布式連接非常具有挑戰(zhàn)性浮禾,因為在不同表或緩存中查找單個鍵的開銷相對較大交胚。因此坛悉,大多數(shù)NoSQL或內(nèi)存緩存供應商不支持查詢連接。在這種情況下承绸,用戶通過組合多個查詢結(jié)果手動執(zhí)行連接查詢裸影。然而,Apache Ignite以一種不同的方式解決了連接查詢問題军熏。在Ignite 1.7之前版本為了獲得可靠的查詢結(jié)果轩猩,只在本地數(shù)據(jù)上執(zhí)行查詢,數(shù)據(jù)應該放在同一個節(jié)點上荡澎。

然而均践,1.7或更高版本的Apache Ignite版本提供了一種分布式連接查詢的新方法,名為非并置分布式連接查詢摩幔,其中SQL連接查詢不再需要數(shù)據(jù)并置彤委。在交叉連接查詢中,數(shù)據(jù)應該駐留在同一個節(jié)點上或衡。否則焦影,可能會得到不準確的查詢結(jié)果。

緩存以兩種方式分布:復制和分區(qū)封断。在復制模式下斯辰,所有節(jié)點都包含其主數(shù)據(jù)集和備份副本。在分區(qū)模式下坡疼,通過Ignite集群復制數(shù)據(jù)集彬呻。當使用Ignite集群的分區(qū)拓撲時墅拭,所有分布式連接的復雜性都會隨之而來涛碑。在分區(qū)模式中枝恋,如果正在執(zhí)行任何交叉連接查詢锅移,Apache Ignite就無法保證獲得可靠的查詢結(jié)果。

Apache Ignite還提供了Java注釋在抛,使SQL查詢可以訪問字段筋遭。此外碉熄,還可以使用相同的注釋對字段進行索引详囤,以便更快地查詢财骨。對于復雜的查詢镐作,Apache Ignite還提供了一個多字段索引以加速復雜條件下的查詢藏姐。Apache Ignite還提供了用于SQL投影的Java API。使用SqlFieldsQuery该贾,只能選擇指定的字段羔杨,而不能選擇整個對象。

有了Ignite SQL核心概念杨蛋,就可以解決大多數(shù)SQL問題了兜材。在本小節(jié)中主要討論以下幾點:

  • 帶有注釋的投影和索引理澎。
  • 查詢API。
  • 并置曙寡,非并置分布式連接查詢糠爬。
  • 性能調(diào)優(yōu)SQL查詢。

Dataset

我們將要使用的數(shù)據(jù)集是來自Postsql數(shù)據(jù)庫的Employee和Department實體举庶。部門(dept)和員工(emp)實體的結(jié)構(gòu)非常簡單执隧,它們之間是一對多的關(guān)系(見下圖)。


image.png

Projection and indexing with annotations

首先户侥,為了對緩存數(shù)據(jù)記錄使用SQL查詢镀琉,必須使實體字段對SQL查詢可訪問∪锾疲可以通過以下兩種方式實現(xiàn):

  • org.apache.ignite.cache.query.annotations.QuerySqlField
  • org.apache.ignite.cache.QueryEntity

接下來的偽代碼將顯示@QuerySqlField注釋的使用屋摔。

public class Employee implements Serializable {
private static final AtomicInteger GENERATED_ID = new AtomicInteger(); @QuerySqlField
private Integer empno;
@QuerySqlField
private String ename;
@QueryTextField
private LocalDate hiredate;
private Integer sal;
@QuerySqlField
private Integer deptno;
// rest of the code is omitted
}

讓我們詳細看看上面的代碼。通過使用@QuerySqlField注釋替梨,我們已經(jīng)允許SQL查詢使用empno钓试、ename、hiredate和sal屬性副瀑。注意亚侠,SQL查詢不啟用屬性或字段sal。每個實體都有兩個預定義的字段:_key和_val俗扇,它們表示到整個鍵的鏈接和緩存數(shù)據(jù)記錄的值硝烂。當數(shù)據(jù)記錄很簡單并且想過濾它的數(shù)值時,這樣做是很有用的铜幽。例如執(zhí)行一條查詢語句:

SELECT * FROM Employee WHERE _key = 100

假設(shè)在Ignite中有以下緩存滞谢。

IgniteCache<Integer, String> myCache = Ignition.ignite().cache(CACHE_NAME);

還可以執(zhí)行Select * from myCache _key=101這樣的查詢。
現(xiàn)在除抛,通過我們的Employee實體狮杨,我們可以執(zhí)行以下任何一個SQL查詢。

select e.ename ,e.sal, m.ename, m.sal from Employee e, (select ename, empno, sal from Empl\
oyee) m where e.mgr = m.empno and e.sal > m.sal

如果希望執(zhí)行一些查詢到忽,在控制臺中運行以下命令橄教。

mvn clean install && java –jar ./target/sql-query-employees-runnable.jar

還可以使用@QuerySqlField注釋對字段值進行索引,以加速查詢執(zhí)行速度喘漏。要創(chuàng)建單個列索引护蝶,可以使用@QuerySqlField(index = true)注釋字段。將為實體字段創(chuàng)建索引值翩迈。讓我們看一個例子持灰,如下:

public class Employee implements Serializable {
private static final AtomicInteger GENERATED_ID = new AtomicInteger(); @QuerySqlField(index = true)
private Integer empno;
@QuerySqlField
private String ename;
@QueryTextField
private String job;
@QuerySqlField
private Integer mgr;
@QuerySqlField
private LocalDate hiredate;
@QuerySqlField
private Integer sal;
@QuerySqlField(index = true)
private Integer deptno;
// rest of the code is ommitted
}

在上述代碼片段中,我們?yōu)樽侄蝒mpno和deptno創(chuàng)建了索引负饲。

還可以將一組中的一個或多個索引組合在一起堤魁,以使用復雜條件加速查詢執(zhí)行速度喂链。在這種情況下,必須使用@QuerySqlField.Group注釋 妥泉⊥治ⅲ可以放置多個@QuerySqlField。如果希望字段參與多個組索引盲链,可以將注釋分組到orderedGroups赏表。

public class Employee implements Serializable {

@QuerySqlField(orderedGroups={@QuerySqlField.Group(
name = "hiredate_salary_idx", order = 0, descending = true)}) 
private LocalDate hireDate;

@QuerySqlField(index = true, orderedGroups={@QuerySqlField.Group(
name = " hiredate _salary_idx", order = 3)}) 
private Integer sal;
}

使用上述配置,可以像這樣執(zhí)行SQL查詢

select e.ename ,e.sal, m.ename, m.sal from Employee e, (select ename, empno, sal from Empl\
oyee) m where e.mgr = m.empno and (e.sal > m.sal and e.hiredate >=’DATE_TIME’);

Query API

Apache Ignite提供了兩個不同的查詢API來對緩存數(shù)據(jù)記錄執(zhí)行SQL查詢匈仗。

  1. org.apache.ignite.cache.query.SqlQuery:該類總是返回整個鍵和對象的值瓢剿。它與Hibernate HQL非常相似。例如悠轩,可以運行下面的查詢來獲得所有雇員的工資在1000到2000之間的數(shù)據(jù)间狂。
SqlQuery qry = new SqlQuery<>(Employee.class, "sal > 1000 and sal <= 2000");
  1. org.apache.ignite.cache.query.SqlFieldsQuery:該查詢可以基于SQL select子句返回特定的數(shù)據(jù)字段』鸺埽可以選擇只選擇特定的字段鉴象,以最小化網(wǎng)絡和序列化開銷。對于要執(zhí)行一些聚合查詢時何鸡,也是非常有用的纺弊。例如:
SqlFieldsQuery qry = new SqlFieldsQuery(,"select avg(e.sal), d.dname " +,"from Employee e, \"" + DEPARTMENT_CACHE_NAME + "\".Department d " +,"where e.deptno = d.deptno " +,"group by d.dname " +,"having avg(e.sal) > ?");

Collocated distributed Joins(并置分布式連接查詢)

到目前為止,我們討論了Apache Ignite的緩存拓撲骡男。讓我們深入探討另一個重要的主題:數(shù)據(jù)并置淆游。在分區(qū)模式下,數(shù)據(jù)集將被分區(qū)并位于不同的Ignite節(jié)點中隔盛。這意味著與指定Department相關(guān)的Employee可以位于不同的節(jié)點犹菱,反之亦然。在運行時吮炕,如果我們想要執(zhí)行任何業(yè)務邏輯腊脱,我們需要找出與他們Department相關(guān)的Employee將非常耗時。為了解決這個問題龙亲,Apache Ignite提供了affinity key概念陕凹,其中相關(guān)數(shù)據(jù)集可以位于同一個節(jié)點上。例如鳄炉,通過Employee id和Department id的affinity鍵AffinityKey(int empNo, int deptNo)杜耙, Ignite將確保所有Employee的數(shù)據(jù)與他們的Department數(shù)據(jù)駐留在同一個節(jié)點上。在一個下面的示例中解釋所有細節(jié)迎膜。

Step 1

將Department Java類添加到項目中泥技,如下所示:

public class Department implements Serializable {
private static final AtomicInteger GENERATED_ID = new AtomicInteger(); 
@QuerySqlField(index = true)
private Integer deptno;
@QuerySqlField
private String dname;
@QuerySqlField
private String loc;
public Department(String dname, String loc) {
this.deptno = GENERATED_ID.incrementAndGet(); 
this.dname = dname;
this.loc = loc;
}
// setter and getter are omitted here
}

在上面的Java類中浆兰,deptno是Department ID磕仅,該值將用作緩存中的緩存鍵珊豹。我們還將為Department entity提供一個單獨的緩存。我們可以初始化一個Department的緩存榕订,并將Department實體存儲如下:

IgniteCache<Integer, Department> deptCache = Ignition.ignite().cache(DEPARTMENT_CACHE_NAME);
// 創(chuàng)建Department實例
Department dept1 = new Department("Accounting", "New York"); 
deptCache.put(dept1.getDeptno(), dept1);

還要注意店茶,鍵值將在SQL執(zhí)行時用作主鍵。
接下來劫恒,為Employee實體添加另一個Java類贩幻,如下所示:

public class Employee implements Serializable {
private static final AtomicInteger GENERATED_ID = new AtomicInteger(); 
@QuerySqlField(index = true)
private Integer empno;
@QuerySqlField
private String ename;
@QueryTextField
private String job;
@QuerySqlField
private Integer mgr;
@QuerySqlField
private LocalDate hiredate;
 @QuerySqlField
private Integer sal; 
@QuerySqlField(index = true) 
private Integer deptno;
private transient EmployeeKey key; //Affinity employee key
public EmployeeKey getKey()
 { 
  if (key == null) 
  {
    key = new EmployeeKey(empno, deptno); 
  }
    return key; 
  }
}

除了字段deptno和key之外,大部分都與Department類非常相似两嘴。字段deptno識別Employee所在的部門丛楚,帶有EmployeeKey類型的字段鍵為AffinityKey。讓我們仔細看看它的定義憔辫。

public class EmployeeKey implements Serializable { 
private final int empNo;
@AffinityKeyMapped
private final int deptNo;
public EmployeeKey(int empNo, int deptNo) 
{
 this.empNo = empNo;
 this.deptNo = deptNo;
} 
}

與empno不同趣些,key(類型EmployeeKey)將是緩存鍵。EmployeeKey類是映射到Employee id和Department id的鍵贰您,Department id (deptno)將是Employee的關(guān)聯(lián)鍵坏平。下面的代碼將用于在緩存中添加Employee。

IgniteCache<EmployeeKey, Employee> employeeCache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);
// Employees
Employee emp1 = new Employee("King", dept1, "President", null, localDateOf("17-11-1981"), 5000);
// 注意锦亦,我們?yōu)镋mployee對象使用自定義關(guān)聯(lián)鍵
// 確保所有Employee都與所在的Department分配到相同的節(jié)點上舶替。
employeeCache.put(emp1.getKey(), emp1);

因此,Employee King將與Department Accounting位于同一個節(jié)點上「茉埃現(xiàn)在顾瞪,如果我們從SqlQueryEmployees類執(zhí)行sqlFieldsQueryWithJoin方法,應該運行以下SQL查詢:

select e.ename, d.dname from Employee e, departments.Department d where e.deptno = d.deptno;

還可以從命令行運行SqlQueryEmployees類抛蚁,如下所示:

java –jar ./target/sql-query-employees-runnable.jar

該方法的輸出如下:


image.png

以上SQL查詢返回屬于其Department的所有Employee玲昧。下面的流程圖將解釋在執(zhí)行SQL時的工作過程。


image.png

根據(jù)上圖詳細的執(zhí)行流程如下:
  1. PhaseQ:Ignite客戶端節(jié)點初始化SQL查詢并將SQL查詢發(fā)送到所有節(jié)點篮绿。
  2. Phase E(Q):接收SQL查詢的所有Ignite節(jié)點都對本地數(shù)據(jù)運行查詢孵延。到目前為止,我們使用的是關(guān)聯(lián)鍵亲配,本地數(shù)據(jù)將包含Employee及其所在Department尘应。
  3. Phase R1-3:所有節(jié)點都將它們的執(zhí)行結(jié)果集發(fā)送到Ignite客戶端節(jié)點。
  4. PhaseR:Ignite客戶端節(jié)點將以reducer的形式出現(xiàn)吼虎,并將所有結(jié)果集中在一個結(jié)果集中犬钢。在我們的示例中,它將把結(jié)果打印到控制臺思灰。

注意玷犹,在常規(guī)SQL中,緩存的名稱充當模式名稱洒疚。這意味著所有緩存都可以通過引號中的緩存名稱引用歹颓。在上面的SQL查詢中坯屿,Employee是默認的schema名,我們顯式地為Department定義緩存名巍扛。

Non-collocated distributed joins(非并置分布式連接查詢)

在現(xiàn)實中领跛,不可能總是將所有數(shù)據(jù)放在同一個節(jié)點中。大多數(shù)情況下撤奸,當您通過一個特別的查詢對熱門數(shù)據(jù)進行分析時吠昭。在這種情況下,從版本1.7.0或更高版本開始胧瓜,您可以在非配置緩存上使用非并置的分布式連接矢棚。通過設(shè)置sqlquery.setDistributedJoins (true)參數(shù),可以為指定的查詢啟用非并置SQL連接府喳。當啟用此參數(shù)時幻妓,查詢映射到的節(jié)點將通過發(fā)送廣播請求或單播請求,從遠程節(jié)點請求本地沒有緩存的數(shù)據(jù)劫拢。執(zhí)行流程如下圖所示肉津。


image.png

使用這種方法,我們可以使用Employee的empno字段作為緩存鍵舱沧,而不是use - eeKey(empno, deptno)關(guān)聯(lián)鍵妹沙。因此,我們應該對代碼進行如下修改:

// Employees
Employee emp1 = new Employee("King", dept1, "President", null, localDateOf("17-11-1981"), 5000);
employeeCache.put(emp1.getEmpno(), emp1);

現(xiàn)在我們可以執(zhí)行以下查詢:

select e.ename, d.dname from Employee e, departments.Department d where e.deptno = d.deptno

即使經(jīng)過以上的修改熟吏,我們?nèi)匀粫玫揭粋€完整的結(jié)果距糖,不管Employee已經(jīng)不再與其Department數(shù)據(jù)進行并置。在這種情況下牵寺,廣播請求將從節(jié)點發(fā)送到所有其他節(jié)點悍引。如果你想使用單播請求,我們必須稍微改變我們的SQL查詢?nèi)缦?

select e.ename, d.dname from Employee e, departments.Department d where e.deptno = d._key

注意帽氓,我們在SQL連接中使用_key預定義索引而不是d.deptno字段趣斤。讓我們詳細信息看下上圖中的執(zhí)行流程。

  1. Phase Q:Ignite客戶端節(jié)點初始化SQL查詢并將SQL查詢發(fā)送到所有節(jié)點黎休。

  2. Phase E(Q): 接收SQL查詢的所有Ignite節(jié)點都對本地數(shù)據(jù)運行查詢浓领。

  3. Phase D(Q):如果任何數(shù)據(jù)在本地缺失,它將通過以下方式從遠程節(jié)點請求多播或單播請求势腮。

  4. Phase R1-3:所有節(jié)點都將它們的執(zhí)行結(jié)果集發(fā)送到Ignite客戶端節(jié)點联贩。

  5. Phase R: Ignite客戶端節(jié)點將以reducer的形式出現(xiàn),并將所有結(jié)果集中在一個結(jié)果集中捎拯。在我們的示例中泪幌,它將把結(jié)果打印到控制臺。

以下是Sql query完整的代碼樣例:
Ignite配置文件:/resources/example-ignite.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
       http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<!--
    Ignite configuration with all defaults and enabled p2p deployment and enabled events.
    Used for testing IgniteSink running Ignite in a client mode.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Ignite 日志配置文件:/resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
        </encoder>
    </appender>
    <logger name="com.mycookcode.bigData.ignite" level="INFO" additivity="false">
        <appender-ref ref="STDOUT"/>
    </logger>
    <!-- Strictly speaking, the level attribute is not necessary since -->
    <!-- the level of the root level is set to DEBUG by default.       -->
    <root level="DEBUG">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

項目的pom.xml依賴配置文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycookcode.bigData.ignite</groupId>
  <artifactId>ignite-sqlquery</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>ignite-sqlquery</name>
  <url>http://maven.apache.org</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <ignite.version>2.6.0</ignite.version>
  </properties>


  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-core</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-indexing</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-spring</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-slf4j</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>

    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.3</version>
    </dependency>

    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.2.3</version>
    </dependency>


  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>com.jolira</groupId>
        <artifactId>onejar-maven-plugin</artifactId>
        <version>1.4.4</version>
        <executions>
          <execution>
            <id>build-query</id>
            <configuration>
              <mainClass>com.mycookcode.bigData.ignite.App</mainClass>
              <attachToBuild>true</attachToBuild>
              <classifier>onejar</classifier>
              <filename>sql-query-employees-runnable.jar</filename>
            </configuration>
            <goals>
              <goal>one-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

緩存的實體類文件:
com.mycookcode.bigData.ignite.model.Department:

package com.mycookcode.bigData.ignite.model;

import org.apache.ignite.cache.query.annotations.QuerySqlField;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;



public class Department implements Serializable {

    private static final AtomicInteger GENERATED_ID = new AtomicInteger();

    @QuerySqlField(index = true)
    private Integer deptno;

    @QuerySqlField
    private String dname;

    @QuerySqlField
    private String loc;

    public Department(String dname,String loc)
    {
        this.deptno = GENERATED_ID.incrementAndGet();
        this.dname = dname;
        this.loc = loc;
    }

    public Integer getDeptno() {
        return deptno;
    }

    public void setDeptno(Integer deptno) {
        this.deptno = deptno;
    }

    public String getDname() {
        return dname;
    }

    public void setDname(String dname) {
        this.dname = dname;
    }

    public String getLoc() {
        return loc;
    }

    public void setLoc(String loc) {
        this.loc = loc;
    }

    @Override public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Department that = (Department)o;
        return Objects.equals(deptno, that.deptno) &&
                Objects.equals(dname, that.dname) &&
                Objects.equals(loc, that.loc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(deptno, dname, loc);
    }

    @Override public String toString() {
        return "Department[" +
                "deptno=" + deptno +
                ", dname='" + dname + '\'' +
                ", loc='" + loc + '\'' +
                ']';
    }
}

com.mycookcode.bigData.ignite.model.EmployeeKey:

package com.mycookcode.bigData.ignite.model;


import org.apache.ignite.cache.affinity.AffinityKeyMapped;

import java.io.Serializable;
import java.util.Objects;

public class EmployeeKey implements Serializable{

    private final int empNo;

    @AffinityKeyMapped
    private final int deptNo;

    public EmployeeKey(int empNo, int deptNo) {
        this.empNo = empNo;
        this.deptNo = deptNo;
    }

    public int getEmpNo() {
        return empNo;
    }

    public int getDeptNo() {
        return deptNo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        EmployeeKey key = (EmployeeKey)o;
        return empNo == key.empNo &&
                deptNo == key.deptNo;
    }

    @Override
    public int hashCode() {
        return Objects.hash(empNo, deptNo);
    }

    @Override
    public String toString() {
        return "EmployeeKey[" +
                "empNo=" + empNo +
                ", deptNo=" + deptNo +
                ']';
    }
}

com.mycookcode.bigData.ignite.model.Employee:

package com.mycookcode.bigData.ignite.model;


import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QueryTextField;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.concurrent.atomic.AtomicInteger;

public class Employee implements Serializable{

    private static final AtomicInteger GENERATED_ID = new AtomicInteger();

    @QuerySqlField(index = true)
    private Integer empno;

    @QuerySqlField
    private String ename;

    @QueryTextField
    private String job;

    @QuerySqlField
    private Integer mgr;

    @QuerySqlField
    private LocalDate hiredate;

    @QuerySqlField
    private Integer sal;

    @QuerySqlField(index = true)
    private Integer deptno;

    private transient EmployeeKey key;

    public Employee(String ename, Department dept, String job, Integer mgr, LocalDate hiredate, Integer sal) {
        this.empno = GENERATED_ID.incrementAndGet();
        this.ename = ename;
        this.job = job;
        this.mgr = mgr;
        this.hiredate = hiredate;
        this.sal = sal;
        this.deptno = dept.getDeptno();
    }

    public Integer getEmpno() {
        return empno;
    }

    public void setEmpno(Integer empno) {
        this.empno = empno;
    }


    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public Integer getMgr() {
        return mgr;
    }

    public void setMgr(Integer mgr) {
        this.mgr = mgr;
    }

    public LocalDate getHiredate() {
        return hiredate;
    }

    public void setHiredate(LocalDate hiredate) {
        this.hiredate = hiredate;
    }

    public Integer getSal() {
        return sal;
    }

    public void setSal(Integer sal) {
        this.sal = sal;
    }

    public Integer getDeptno() {
        return deptno;
    }

    public void setDeptno(Integer deptno) {
        this.deptno = deptno;
    }

    //Affinity employee key
    public EmployeeKey getKey() {
        if (key == null) {
            key = new EmployeeKey(empno, deptno);
        }
        return key;
    }

    @Override public String toString() {
        return "Employee[" +
                "    ename='" + ename + '\'' +
                ", job='" + job + '\'' +
                ", mgr=" + mgr +
                ", hiredate=" + hiredate +
                ", sal=" + sal +
                ']';
    }
}

Ignite服務啟動類com.mycookcode.bigData.ignite.App,實現(xiàn)Sql query查詢服務:

package com.mycookcode.bigData.ignite;

import com.mycookcode.bigData.ignite.model.Department;
import com.mycookcode.bigData.ignite.model.Employee;
import com.mycookcode.bigData.ignite.model.EmployeeKey;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.List;

/**
 * Hello world!
 *
 */
public class App {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    private static Logger logger = LoggerFactory.getLogger(App.class);


    private static final String DEPARTMENT_CACHE_NAME = App.class.getSimpleName() + "-departments";

    private static final String EMPLOYEE_CACHE_NAME = App.class.getSimpleName() + "-employees";


    public static void main(String[] args) throws Exception {

        Ignite ignite = Ignition.start("example-ignite.xml");
        logger.info("Start. Sql query example.");

        CacheConfiguration<Integer, Department> deptCacheCfg = new CacheConfiguration<>(DEPARTMENT_CACHE_NAME);

        deptCacheCfg.setCacheMode(CacheMode.REPLICATED);
        deptCacheCfg.setIndexedTypes(Integer.class, Department.class);

        CacheConfiguration<EmployeeKey, Employee> employeeCacheCfg = new CacheConfiguration<>(EMPLOYEE_CACHE_NAME);

        employeeCacheCfg.setCacheMode(CacheMode.PARTITIONED);
        employeeCacheCfg.setIndexedTypes(EmployeeKey.class, Employee.class);

        try (
                IgniteCache<Integer, Department> deptCache = ignite.createCache(deptCacheCfg);
                IgniteCache<EmployeeKey, Employee> employeeCache = ignite.createCache(employeeCacheCfg)
        ) {
            initialize();
            sqlQuery();

            sqlQueryWithJoin();
            sqlQueryEmployeesWithSalHigherManager();

            sqlFieldsQuery();
            sqlFieldsQueryWithJoin();

            aggregateQuery();
            groupByQuery();
        }
    }


    private static void initialize() throws InterruptedException {
        IgniteCache<Integer, Department> deptCache = Ignition.ignite().cache(DEPARTMENT_CACHE_NAME);
        IgniteCache<EmployeeKey, Employee> employeeCache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        //啟動之前清除緩存
        deptCache.clear();
        employeeCache.clear();

        //Departments數(shù)據(jù)初始化
        Department dept1 = new Department("Accounting", "New York");
        Department dept2 = new Department("Research", "Dallas");
        Department dept3 = new Department("Sales", "Chicago");
        Department dept4 = new Department("Operations", "Boston");

        //Employees數(shù)據(jù)初始化
        Employee emp1 = new Employee("King", dept1, "President", null, localDateOf("17-11-1981"), 5000);
        Employee emp2 = new Employee("Blake", dept3, "Manager", emp1.getEmpno(), localDateOf("01-05-1981"), 2850);
        Employee emp3 = new Employee("Clark", dept1, "Manager", emp1.getEmpno(), localDateOf("09-06-1981"), 2450);
        Employee emp4 = new Employee("Jones", dept2, "Manager", emp1.getEmpno(), localDateOf("02-04-1981"), 2975);
        Employee emp5 = new Employee("Scott", dept2, "Analyst", emp4.getEmpno(), localDateOf("13-07-1987").minusDays(85), 3000);
        Employee emp6 = new Employee("Ford", dept2, "Analyst", emp4.getEmpno(), localDateOf("03-12-1981"), 3000);
        Employee emp7 = new Employee("Smith", dept2, "Clerk", emp6.getEmpno(), localDateOf("17-12-1980"), 800);
        Employee emp8 = new Employee("Allen", dept3, "Salesman", emp2.getEmpno(), localDateOf("20-02-1981"), 1600);
        Employee emp9 = new Employee("Ward", dept3, "Salesman", emp2.getEmpno(), localDateOf("22-02-1981"), 1250);
        Employee emp10 = new Employee("Martin", dept3, "Salesman", emp2.getEmpno(), localDateOf("28-09-1981"), 1250);
        Employee emp11 = new Employee("Turner", dept3, "Salesman", emp2.getEmpno(), localDateOf("08-09-1981"), 1500);
        Employee emp12 = new Employee("Adams", dept2, "Clerk", emp5.getEmpno(), localDateOf("13-07-1987").minusDays(51), 1100);
        Employee emp13 = new Employee("James", dept3, "Clerk", emp2.getEmpno(), localDateOf("03-12-1981"), 950);
        Employee emp14 = new Employee("Miller", dept1, "Clerk", emp3.getEmpno(), localDateOf("23-01-1982"), 1300);

        deptCache.put(dept1.getDeptno(), dept1);
        deptCache.put(dept2.getDeptno(), dept2);
        deptCache.put(dept3.getDeptno(), dept3);
        deptCache.put(dept4.getDeptno(), dept4);

        employeeCache.put(emp1.getKey(), emp1);
        employeeCache.put(emp2.getKey(), emp2);
        employeeCache.put(emp3.getKey(), emp3);
        employeeCache.put(emp4.getKey(), emp4);
        employeeCache.put(emp5.getKey(), emp5);
        employeeCache.put(emp6.getKey(), emp6);
        employeeCache.put(emp7.getKey(), emp7);
        employeeCache.put(emp8.getKey(), emp8);
        employeeCache.put(emp9.getKey(), emp9);
        employeeCache.put(emp10.getKey(), emp10);
        employeeCache.put(emp11.getKey(), emp11);
        employeeCache.put(emp12.getKey(), emp12);
        employeeCache.put(emp13.getKey(), emp13);
        employeeCache.put(emp14.getKey(), emp14);

        Thread.sleep(1000);

    }

    /**
     * @param
     * @return
     */
    private static LocalDate localDateOf(String parseDateText) {
        return LocalDate.parse(parseDateText, formatter);
    }


    /**
     * 以薪水范圍為查詢條件檢索數(shù)據(jù)
     */
    private static void sqlQuery() {
        IgniteCache<EmployeeKey, Employee> cache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        SqlQuery<EmployeeKey, Employee> qry = new SqlQuery<EmployeeKey, Employee>(Employee.class, "sal > ? and sal <= ?");

        logDecorated("==Employee with salaries between 0 and 1000==", cache.query(qry.setArgs(0, 1000)).getAll());
        logDecorated("==Employee with salaries between 1000 and 2000==", cache.query(qry.setArgs(1000, 2000)).getAll());
        logDecorated("==Employee with salaries greater than 2000==", cache.query(qry.setArgs(2000, Integer.MAX_VALUE)).getAll());

    }

    /**
     * 基于為特定部門工作的所有員工的SQL查詢示例祸泪。
     */
    private static void sqlQueryWithJoin() {
        IgniteCache<EmployeeKey, Employee> cache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        SqlQuery<EmployeeKey, Employee> qry = new SqlQuery<>(Employee.class,
                "from Employee, \"" + DEPARTMENT_CACHE_NAME + "\".Department " +
                        "where Employee.deptno = Department.deptno " +
                        "and lower(Department.dname) = lower(?)");

        logDecorated("==Following department 'Accounting' have employees (SQL join)==", cache.query(qry.setArgs("Accounting")).getAll());
        logDecorated("==Following department 'Sales' have employees (SQL join)==", cache.query(qry.setArgs("Sales")).getAll());
    }


    /**
     * 輸出所有工資高于直屬上級管理人員的員工
     */
    private static void sqlQueryEmployeesWithSalHigherManager() {
        IgniteCache<EmployeeKey, Employee> cache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        SqlFieldsQuery qry = new SqlFieldsQuery("select e.ename ,e.sal,m.ename,m.sal from Employee e, " +
                "(select ename,empno,sal from Employee) m " +
                "where e.mgr = m.empno and e.sal > m.sal");
        log("==All employees those salary bigger than their direct manager==");
        logDecorated("||Employee||Emp.Salary||Manager||Mgr.Salary||", cache.query(qry).getAll());
    }


    /**
     * 基于sql的字段查詢的示例吗浩,該查詢只返回必需字段,而不是整個鍵-值對
     */
    private static void sqlFieldsQuery() {
        IgniteCache<?, ?> cache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        SqlFieldsQuery qry = new SqlFieldsQuery("select ename from Employee");

        Collection<List<?>> res = cache.query(qry).getAll();

        log("==Names of all employees==", res);
    }

    /**
     * 基于sql的字段查詢的示例浴滴,該查詢只返回必需字段拓萌,而不是整個鍵-值對岁钓。
     */
    private static void sqlFieldsQueryWithJoin(){
        IgniteCache<?, ?> cache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        SqlFieldsQuery qry = new SqlFieldsQuery(
                "select e.ename, d.dname " +
                        "from Employee e, \"" + DEPARTMENT_CACHE_NAME + "\".Department d " +
                        "where e.deptno = d.deptno");

        Collection<List<?>> res = cache.query(qry).getAll();

        logDecorated("==Names of all employees and departments they belong to (SQL join)==", res);
    }


    /**
     * 基于sql的字段查詢的示例升略,該查詢只返回必需字段,而不是整個鍵-值對屡限。
     */
    private static void aggregateQuery()
    {
        IgniteCache<?, ?> cache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        SqlFieldsQuery qry = new SqlFieldsQuery("select sum(sal), count(sal) from Employee");

        Collection<List<?>> res = cache.query(qry).getAll();

        double sum = 0;
        long cnt = 0;

        for (List<?> row : res)
        {
            if (row.get(0) != null) {
                sum += ((BigDecimal)row.get(0)).doubleValue();
                cnt += (Long)row.get(1);
            }
        }
        log("==Average employee salary (aggregation query)==");
        log("\t" + (cnt > 0 ? (sum / cnt) : "n/a"));
    }


    /**
     * 基于sql的字段查詢的示例品嚣,該查詢只返回必需字段,而不是整個鍵-值對钧大。
     */
    private static void groupByQuery()
    {
        IgniteCache<?, ?> cache = Ignition.ignite().cache(EMPLOYEE_CACHE_NAME);

        SqlFieldsQuery qry = new SqlFieldsQuery(
                "select avg(e.sal), d.dname " +
                        "from Employee e, \"" + DEPARTMENT_CACHE_NAME + "\".Department d " +
                        "where e.deptno = d.deptno " +
                        "group by d.dname " +
                        "having avg(e.sal) > ?");

        logDecorated("==Average salaries per Department (group-by query)==", cache.query(qry.setArgs(500)).getAll());
    }

    /**
     * Prints message to logger.
     *
     * @param msg String.
     */
    private static void log(String msg) {
        logger.info("\t" + msg);
    }

    /**
     * Prints message to logger.
     *
     * @param msg String.
     */
    private static void log(String msg, Iterable<?> col) {
        logger.info("\t" + msg);
        col.forEach(c -> logger.info("\t\t" + c));
    }


    /**
     * Prints message and resultset to logger.
     *
     * @param msg String.
     * @param col Iterable
     */
    private static void logDecorated(String msg, Iterable<?> col) {
        logger.info("\t" + msg);
        col.forEach(c -> logger.info("\t\t" + c));
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末翰撑,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子啊央,更是在濱河造成了極大的恐慌眶诈,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瓜饥,死亡現(xiàn)場離奇詭異逝撬,居然都是意外死亡,警方通過查閱死者的電腦和手機乓土,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人著隆,你說我怎么就攤上這事凿歼。” “怎么了食磕?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵尽棕,是天一觀的道長。 經(jīng)常有香客問我彬伦,道長萄金,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任媚朦,我火速辦了婚禮氧敢,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘询张。我一直安慰自己孙乖,他們只是感情好,可當我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著唯袄,像睡著了一般弯屈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上恋拷,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天资厉,我揣著相機與錄音,去河邊找鬼蔬顾。 笑死宴偿,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的诀豁。 我是一名探鬼主播窄刘,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼舷胜!你這毒婦竟也來了娩践?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤烹骨,失蹤者是張志新(化名)和其女友劉穎翻伺,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沮焕,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡吨岭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了遇汞。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片未妹。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖空入,靈堂內(nèi)的尸體忽然破棺而出络它,到底是詐尸還是另有隱情,我是刑警寧澤歪赢,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布化戳,位于F島的核電站,受9級特大地震影響埋凯,放射性物質(zhì)發(fā)生泄漏点楼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一白对、第九天 我趴在偏房一處隱蔽的房頂上張望掠廓。 院中可真熱鬧,春花似錦甩恼、人聲如沸蟀瞧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽悦污。三九已至铸屉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間切端,已是汗流浹背彻坛。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留踏枣,地道東北人昌屉。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像椰于,于是被迫代替她去往敵國和親怠益。 傳聞我的和親對象是個殘疾皇子仪搔,可洞房花燭夜當晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內(nèi)容