站在巨人的肩膀上
- ContinuousQuery(簡(jiǎn)稱CQ):持續(xù)查詢,是指Client可以按照OQL(Object Query Language)查詢語(yǔ)句注冊(cè)自己感興趣的event部念,而這些event將發(fā)送給Client的Listener誊稚,一旦Server有event發(fā)生扳缕,就會(huì)將此event傳遞給Client丈冬。
- 監(jiān)聽的事件類型:update create destroy
- CQ查詢的特性:
能夠使用標(biāo)準(zhǔn)的OQL語(yǔ)句
對(duì)CQ事件進(jìn)行管理
完全整合C/S架構(gòu)
基于數(shù)據(jù)值的訂閱
活躍查詢執(zhí)行
一個(gè)簡(jiǎn)單的業(yè)務(wù)需求:
Client向Server訂閱監(jiān)聽年齡在15~35歲Customer之間的數(shù)據(jù):
Server
@SpringBootApplication
@CacheServerApplication(name = "GemFireContinuousQueryServer")
public class Application {
@Bean(name = "Customers")
PartitionedRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
PartitionedRegionFactoryBean<Long, Customer> customers = new PartitionedRegionFactoryBean<>();
customers.setCache(gemfireCache);
customers.setClose(false);
customers.setPersistent(false);
return customers;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
pom.xml 中的關(guān)鍵依賴:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-gemfire</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.gj.demo</groupId>
<artifactId>gemfire-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
Client
@SpringBootApplication
@ClientCacheApplication(name = "GemFireContinuousQueryClient", subscriptionEnabled = true)
@SuppressWarnings("unused")
public class Application {
@Bean(name = "Customers")
ClientRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
ClientRegionFactoryBean<Long, Customer> customers = new ClientRegionFactoryBean<>();
customers.setCache(gemfireCache);
customers.setClose(true);
customers.setShortcut(ClientRegionShortcut.PROXY);
return customers;
}
@Bean
ContinuousQueryListenerContainer continuousQueryListenerContainer(GemFireCache gemfireCache) {
Region<Long, Customer> customers = gemfireCache.getRegion("/Customers");
ContinuousQueryListenerContainer container = new ContinuousQueryListenerContainer();
container.setCache(gemfireCache);
container.setQueryListeners(asSet(ageQueryDefinition(customers, 15,35)));
return container;
}
private ContinuousQueryDefinition ageQueryDefinition(Region<Long, Customer> customers, int
ageFrom,int ageTo){
String query = String.format("SELECT * FROM /Customers c WHERE c.getAge().intValue() > %d AND c.getAge().intValue() < %d ", ageFrom,ageTo);
return new ContinuousQueryDefinition("Young Query ",query,newQueryListener(customers,"Young Query"));
}
private ContinuousQueryListener newQueryListener(Region<Long, Customer> customers, String qualifier) {
return event -> {
System.err.printf("new order!" + event.toString());
};
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Customer:
@Region
public class Customer implements Serializable {
private static final long serialVersionUID = -3860687524824507124L;
private String firstname, lastname;
private int age;
Long id;
//此處省略了get,set方法
@Override
public String toString() {
return String.format("%1$s %2$s %3$s", getFirstname(), getLastname(),getAge());
}
}
程序跑起來(lái)~
分別啟動(dòng)Server和Client,在瀏覽器傳入相關(guān)參數(shù):
查看Client的控制臺(tái)日志:
至此吠勘,就是一個(gè)完整的Client向Server訂閱監(jiān)聽,收到訂閱消息的全過(guò)程居兆。
CQ查詢的數(shù)據(jù)流
當(dāng)數(shù)據(jù)條目在服務(wù)器端更新時(shí),新數(shù)據(jù)會(huì)經(jīng)過(guò)下面的步驟:
1.region條目發(fā)生變更
2.每一個(gè)事件竹伸,服務(wù)器的CQ處理框架檢查是否與運(yùn)行的CQ匹配
3.如果數(shù)據(jù)條目的變更匹配了CQ查詢泥栖,CQ事件將被發(fā)送到客戶端上的CQ監(jiān)聽器簇宽,CQ監(jiān)聽器獲得此事件。
如上圖所示:
X條目新值和舊值都匹配了CQ查詢吧享,因此查詢結(jié)果的更新的事件被發(fā)送出來(lái)魏割。
Y條目舊值匹配了,但這是查詢結(jié)果的一部分钢颂,Y條目操作為失敗 钞它,因?yàn)椴樵兘Y(jié)果被銷毀的事件被發(fā)送出來(lái)。
Z條目為新創(chuàng)殊鞭,并不匹配CQ事件遭垛,所以事件不發(fā)送。
值得注意的是操灿,CQ并不更新客戶端的Region,CQ作為CQ監(jiān)聽器的通告工具而服務(wù)锯仪,CQ監(jiān)聽器可以按照客戶應(yīng)用的要求任意編程。
當(dāng)一個(gè)CQ運(yùn)行在服務(wù)器Region的時(shí)候趾盐,每一個(gè)Server條目更新線程都放在CQ查詢中庶喜,如果old value或者new value 滿足查詢條件,線程將放到CqEvent的Client隊(duì)列中去救鲤,一旦Client接受了此事件溃卡,CqEvent將被傳遞到CqListeners的onEvent方法上,如下圖所示:
QueryService 接口提供的方法
create a new CQ and specify whether it is durable
execute a CQ,with or without an initial set
list all the CQs registered by the client
close and stop CQs at the cache and region level
get a handle on CqStatistics for the client
CqQuery:管理持續(xù)查詢的方法蜒简,通過(guò)QueryService 創(chuàng)建瘸羡,用于開啟和停止CQ執(zhí)行,同時(shí)查詢其他與CQ想關(guān)聯(lián)的對(duì)象搓茬,such as CQ屬性犹赖,CQ統(tǒng)計(jì)和CQ狀態(tài)。
CqListener:用于處理持續(xù)查詢的事件卷仑。
*CqEvent:提供了從Server發(fā)送的所有的CQ事件信息峻村,此事件被傳遞到CqListener的onEvent方法。
程序媛小白一枚锡凝,如有錯(cuò)誤粘昨,煩請(qǐng)批評(píng)指正!(#.#)