gRPC學(xué)習(xí)記錄(六)--客戶(hù)端連接池
標(biāo)簽(空格分隔): javaWEB
對(duì)于客戶(hù)端來(lái)說(shuō)建立一個(gè)channel是昂貴的,因?yàn)閯?chuàng)建channel需要連接,但是建立一個(gè)stub是很簡(jiǎn)單的,就像創(chuàng)建一個(gè)普通對(duì)象,因此Channel就需要復(fù)用,也就是說(shuō)需要實(shí)現(xiàn)一個(gè)連接池應(yīng)用.本文使用commons-pool2
來(lái)實(shí)現(xiàn)連接池應(yīng)用. 該案例可以當(dāng)成連接池模板案例.
<!--客戶(hù)端連接池-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
寫(xiě)法的封裝類(lèi)似我的另一篇博文: Redis學(xué)習(xí)記錄(二)--使用Jedis連接,模仿SpringJdbcTemplate模板類(lèi)寫(xiě)法.
首先定義一個(gè)回調(diào)接口.
public interface WorkCallBack<S> {
void callback(S s);
}
再者定義一個(gè)產(chǎn)生連接池的工廠,需要繼承自BasePooledObjectFactory,其用處是產(chǎn)生連接池中需要的客戶(hù)端.
public class HelloWorldFactory extends BasePooledObjectFactory<HelloWorldClientSingle> {
private String host = "127.0.0.1";
private int port = 50051;
@Override
public HelloWorldClientSingle create() throws Exception {
return new HelloWorldClientSingle(this.host,this.port);
}
@Override
public PooledObject<HelloWorldClientSingle> wrap(HelloWorldClientSingle helloWorldClientSingle) {
return new DefaultPooledObject<>(helloWorldClientSingle);
}
@Override
public void destroyObject(PooledObject<HelloWorldClientSingle> p) throws Exception {
p.getObject().shutdown();
super.destroyObject(p);
}
}
實(shí)現(xiàn)相應(yīng)的連接池,注意看execute()
方法,該方法是一個(gè)模板方法,進(jìn)行從池中獲取客戶(hù)端和歸還客戶(hù)端給連接池,其主要邏輯都定義在WorkCallBack
接口中,也因此從服務(wù)端拿回的數(shù)據(jù)要在其里面處理完畢.
public class HelloWorldClientPool {
private static GenericObjectPool<HelloWorldClientSingle> objectPool = null;
static {
// 連接池的配置
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 池中的最大連接數(shù)
poolConfig.setMaxTotal(8);
// 最少的空閑連接數(shù)
poolConfig.setMinIdle(0);
// 最多的空閑連接數(shù)
poolConfig.setMaxIdle(8);
// 當(dāng)連接池資源耗盡時(shí),調(diào)用者最大阻塞的時(shí)間,超時(shí)時(shí)拋出異常 單位:毫秒數(shù)
poolConfig.setMaxWaitMillis(-1);
// 連接池存放池化對(duì)象方式,true放在空閑隊(duì)列最前面,false放在空閑隊(duì)列最后
poolConfig.setLifo(true);
// 連接空閑的最小時(shí)間,達(dá)到此值后空閑連接可能會(huì)被移除,默認(rèn)即為30分鐘
poolConfig.setMinEvictableIdleTimeMillis(1000L * 60L * 30L);
// 連接耗盡時(shí)是否阻塞,默認(rèn)為true
poolConfig.setBlockWhenExhausted(true);
// 連接池創(chuàng)建
objectPool = new GenericObjectPool<>(new HelloWorldFactory(), poolConfig);
}
/**
* 從連接池獲取對(duì)象
*/
private static HelloWorldClientSingle borrowObject(){
try {
HelloWorldClientSingle clientSingle = objectPool.borrowObject();
System.out.println("總創(chuàng)建線程數(shù)"+objectPool.getCreatedCount());
return clientSingle;
} catch (Exception e) {
e.printStackTrace();
}
//連接池失敗則主動(dòng)創(chuàng)建
return createClient();
}
/**
* 當(dāng)連接池異常,則主動(dòng)創(chuàng)建對(duì)象
*/
private static HelloWorldClientSingle createClient(){
return new HelloWorldClientSingle("127.0.0.1", 55001);
}
/**
* 執(zhí)行器
* @param workCallBack 主要服務(wù)內(nèi)容
*/
public static Runnable execute(WorkCallBack<HelloWorldClientSingle> workCallBack){
return () -> {
HelloWorldClientSingle client = borrowObject();
try {
workCallBack.callback(client);
} finally {
/** 將連接對(duì)象返回給連接池 */
objectPool.returnObject(client);
}
};
}
}
對(duì)于客戶(hù)端來(lái)說(shuō),因?yàn)榇娓遣荒芄蚕淼?所以存根要放在調(diào)用函數(shù)中實(shí)例化,不要擔(dān)心性能問(wèn)題,這個(gè)操作就是創(chuàng)建一個(gè)對(duì)象.greeterBlockingStub = GreeterGrpc.newBlockingStub(channel).withCompression("gzip");
該代碼從之前的構(gòu)造函數(shù)中移位到了greet中.
public class HelloWorldClientSingle {
private final ManagedChannel channel; //一個(gè)gRPC信道
private GreeterGrpc.GreeterBlockingStub greeterBlockingStub;//阻塞/同步 存根
//初始化信道和存根
public HelloWorldClientSingle(String host,int port){
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
//客戶(hù)端方法
public void greet(String name){
//需要用到存根時(shí)創(chuàng)建,不可復(fù)用
greeterBlockingStub = GreeterGrpc.newBlockingStub(channel).withCompression("gzip");
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = greeterBlockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
System.out.println("RPC調(diào)用失敗:"+e.getMessage());
return;
}
System.out.println("服務(wù)器返回信息:"+response.getMessage());
}
}
寫(xiě)一個(gè)簡(jiǎn)單的測(cè)試類(lèi)
public class HelloWorldClientTest {
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new Thread( HelloWorldClientPool.execute(clientSingle -> {
clientSingle.greet("world");
})).start();
}
}
}
因?yàn)槲蚁薅ǖ?個(gè)客戶(hù)端,所以最高也就8個(gè)channel再跑了.