Redis Pipelining
可以一次發(fā)送多個(gè)命令璧诵,并按順序執(zhí)行邪财、返回結(jié)果,節(jié)省RTT(Round Trip Time)。
使用Pipelining
Jedis客戶端支持Redis的Pipelining坝初,使用方式如下:
public static void main(String[] args) {
Jedis jedis = null;
Pipeline pipeline = null;
try{
jedis = new Jedis("localhost", 6379);
//使用pipeline
pipeline = jedis.pipelined();
//開(kāi)始時(shí)間
long start = System.currentTimeMillis();
//刪除lists
pipeline.del("lists");
//循環(huán)添加10000個(gè)元素
for(int i = 0; i < 10000; i++){
pipeline.rpush("lists", i + "");
}
//執(zhí)行
pipeline.sync();
//結(jié)束時(shí)間
long end = System.currentTimeMillis();
System.out.println(end - start);
}catch (Exception e){
e.printStackTrace();
}finally {
if(pipeline != null){
try {
pipeline.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(jedis != null){
jedis.close();
}
}
}
=> "237"
同樣的命令,不使用Pipelining消耗時(shí)間是800ms-900ms钾军。測(cè)試時(shí)Redis Server和Client都運(yùn)行在同一臺(tái)機(jī)器鳄袍,由于本地環(huán)回接口(loopback interface)的原因RTT會(huì)非常短,真實(shí)環(huán)境下的差距會(huì)更大吏恭。
從協(xié)議層面看Pipelining
從Redis的RESP協(xié)議上看拗小,Pipelining并沒(méi)有什么特殊的地方,只是把多個(gè)命令連續(xù)的發(fā)送給Redis Server樱哼,然后一一解析返回結(jié)果:
public static void main(String[] args) throws Exception{
Socket socket = new Socket();
//TIME_WAIT狀態(tài)下可以復(fù)用端口
socket.setReuseAddress(true);
//空閑時(shí)發(fā)送數(shù)據(jù)包哀九,確認(rèn)服務(wù)端狀態(tài)
socket.setKeepAlive(true);
//關(guān)閉Nagle算法,盡快發(fā)送
socket.setTcpNoDelay(true);
//調(diào)用close方法立即關(guān)閉socket搅幅,丟棄所有未發(fā)送的數(shù)據(jù)包
socket.setSoLinger(true, 0);
//連接server
socket.connect(new InetSocketAddress("localhost", 6379), 3000);
//設(shè)置讀取時(shí)超時(shí)時(shí)間
socket.setSoTimeout(3000);
OutputStream os = socket.getOutputStream();
InputStream is = socket.getInputStream();
/**
* SET 命令 寫第一個(gè)命令
* 協(xié)議: array 3個(gè)元素 SET simpleKey simpleValue
*/
os.write(getBytes("*3\r\n$3\r\nSET\r\n$9\r\nsimpleKey\r\n$11\r\nsimpleValue\r\n"));
/**
* GET 命令 寫第二個(gè)命令
* 協(xié)議: array 2個(gè)元素 GET simpleKey
*/
os.write(getBytes("*2\r\n$3\r\nGET\r\n$9\r\nsimpleKey\r\n"));
os.flush();
/**
* 解析第一個(gè)命令SET的返回結(jié)果
*/
String result = analysisResult(is);
System.out.println("SET command response : " + result);
System.out.println();
/**
* 解析第二個(gè)命令GET返回結(jié)果
*/
String value = analysisResult(is);
System.out.println("GET command response : " + value);
}
=>
response type is : +
SET command response : OK
response type is : $
$ value len : 11
GET command response : simpleValue
RESP協(xié)議和詳細(xì)代碼可以參考《Redis協(xié)議:RESP》阅束。
Jedis的Pipelining實(shí)現(xiàn)方式
Pipeline pipeline = jedis.pipelined();
通過(guò)Jedis對(duì)象的pipelined方法可以創(chuàng)建Pipeline對(duì)象。pipelined方法內(nèi)部實(shí)際上是把Jedis對(duì)象的client賦給了pipeline茄唐。在《Redis客戶端:Jedis》中介紹過(guò)Jedis類的結(jié)構(gòu)息裸,Pipeline類的結(jié)構(gòu)與Jedis類似也實(shí)現(xiàn)了多個(gè)接口。不同的是方法的返回值沪编,所有Pipeline中方法的返回值都被封裝成了Response類呼盆。
當(dāng)通過(guò)Pipeline對(duì)象執(zhí)行命令時(shí),同樣也會(huì)委托給內(nèi)部的Client對(duì)象去執(zhí)行蚁廓,但不會(huì)立即調(diào)用client的getXXX方法獲取返回結(jié)果访圃,而是創(chuàng)建了一個(gè)Response對(duì)象:
public Response<Long> del(String key) {
getClient(key).del(key);
return getResponse(BuilderFactory.LONG);
}
DEL
命令的返回值是0或1表示是否刪除成功,所以傳入了BuilderFactory.LONG
用來(lái)解析Integer型的返回結(jié)果相嵌。
protected <T> Response<T> getResponse(Builder<T> builder) {
Response<T> lr = new Response<T>(builder);
pipelinedResponses.add(lr);
return lr;
}
在getResponse方法中腿时,創(chuàng)建了Response對(duì)象克胳,每個(gè)Response對(duì)象都有一個(gè)解析返回結(jié)果的Builder。Response按照命令的執(zhí)行順序被添加到pipelinedResponses隊(duì)列中圈匆。
Pipeline對(duì)象的sync方法會(huì)真正的執(zhí)行命令:
public void sync() {
if (getPipelinedResponseLength() > 0) {
//這里會(huì)真正的調(diào)用client執(zhí)行命令漠另,并獲取返回結(jié)果
List<Object> unformatted = client.getAll();
//將按照協(xié)議解析、分隔好的返回結(jié)果跃赚,按順序賦給隊(duì)列中的Response
for (Object o : unformatted) {
generateResponse(o);
}
}
}
在getAll方法中會(huì)按照RESP協(xié)議的結(jié)構(gòu)解析返回結(jié)果笆搓,將輸入流中的內(nèi)容按照協(xié)議格式切分成每個(gè)命令的返回結(jié)果:
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
//執(zhí)行命令
flush();
//pipelinedCommands是一個(gè)計(jì)數(shù)器,記錄了執(zhí)行了多少個(gè)命令
while (pipelinedCommands > except) {
try {
//readProtocolWithCheckingBroken中解析了返回結(jié)果
all.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
all.add(e);
}
//每解析一個(gè)命令纬傲,計(jì)數(shù)減1满败,為0時(shí)退出循環(huán)
pipelinedCommands--;
}
return all;
}
readProtocolWithCheckingBroken方法解析的方式與普通Jedis解析方式一致。當(dāng)從Response獲取返回結(jié)果時(shí)叹括,會(huì)用設(shè)置好的Builder把unformatted的數(shù)據(jù)轉(zhuǎn)換成對(duì)應(yīng)的結(jié)構(gòu)算墨。
public class Response<T> {
protected T response = null;
...
//通過(guò)get獲取命令返回值
public T get() {
...
if (!built) {
//通過(guò)builder格式化返回結(jié)果
build();
}
...
return response;
}
private void build() {
...
response = builder.build(data);
...
}
}