本人微信公眾號(jwfy)歡迎關(guān)注
上一期完成了手寫一個RPC框架,看看100個線程同時調(diào)用效果如何吩谦,但還是遺留了很多問題以及可以優(yōu)化的點溉箕,這次就完全重寫之前的代碼荔棉,演進到v2版本,使得代碼邏輯更加規(guī)范的同時,引入ZooKeeper輔助完成服務(wù)治理连躏。
在代碼展示之前還是先介紹一些基本的概念以及設(shè)計思路,ZooKeeper是什么笋除,服務(wù)治理又是什么等寂屏,最后貼了部分關(guān)鍵代碼以說明和v1版本的區(qū)別,有哪些點的改進措施策严。
最后還提了個問題:線程池打滿了怎么辦?穗慕,你有什么好的解決方案呢?
ZooKeeper
ZooKeeper(直譯為動物管理員妻导,簡稱zk)是一個分布式逛绵、開源的應(yīng)用協(xié)調(diào)服務(wù)怀各,利用和Paxos類似的ZAB選舉算法實現(xiàn)分布式一致性服務(wù)。有類似于Unix文件目錄的節(jié)點信息术浪,同時可以針對節(jié)點的變更添加watcher監(jiān)聽以能夠即使感知到節(jié)點信息變更瓢对。可提供的功能例如域名服務(wù)胰苏、配置維護硕蛹、同步以及組服務(wù)等(此功能介紹來自官網(wǎng)描述:It exposes common services - such as naming, configuration management, synchronization, and group services - in a simple interface)。如下圖就是DUBBO存儲在ZooKeeper的節(jié)點數(shù)據(jù)情況硕并。
在本地啟動服務(wù)后通過zk客戶端連接后也可通過命令查看節(jié)點信息法焰,如下圖所示。
ZooKeeper包含了4種不同含義的功能節(jié)點倔毙,在每次創(chuàng)建節(jié)點之前都需要明確聲明節(jié)點類型埃仪。
類型 | 定義 | 描述 |
---|---|---|
PERSISTENT | 持久化目錄節(jié)點 | 客戶端與zookeeper斷開連接后,該節(jié)點依舊存在 |
PERSISTENT_SEQUENTIAL | 持久化順序編號目錄節(jié)點 | 客戶端與zookeeper斷開連接后陕赃,該節(jié)點依舊存在卵蛉,只是Zookeeper給該節(jié)點名稱進行順序編號 |
EPHEMERAL | 臨時目錄節(jié)點 | 客戶端與zookeeper斷開連接后,該節(jié)點被刪除 |
EPHEMERAL_SEQUENTIAL | 臨時順序編號目錄節(jié)點 | 客戶端與zookeeper斷開連接后么库,該節(jié)點被刪除傻丝,只是Zookeeper給該節(jié)點名稱進行順序編號 |
ZooKeeper使用之前需要先進行安裝,后開啟服務(wù)端的服務(wù)诉儒,我們的服務(wù)作為客戶端
連接ZooKeeper以便于后續(xù)的操作葡缰。具體可參考官網(wǎng)文檔Zookeeper3.5.5 官方文檔,在實際的java項目開發(fā)中也是可以通過maven引入ZkClient或者Curator開源的客戶端忱反,在本文學(xué)習筆記中是使用的Curator运准,因為其已經(jīng)封裝了原始的節(jié)點注冊、數(shù)據(jù)獲取缭受、添加watcher等功能胁澳。具體maven引入的版本如下,
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
服務(wù)治理
服務(wù)治理也就是針對服務(wù)進行管理的措施米者,例如服務(wù)發(fā)現(xiàn)
韭畸、服務(wù)暴露
、負載均衡
蔓搞、快速上下線
等都是服務(wù)治理的具體體現(xiàn)胰丁。
- 服務(wù)發(fā)現(xiàn):從服務(wù)管理中心獲取到需要的服務(wù)相關(guān)信息,例如我們可以從zk中獲取相關(guān)服務(wù)的機器信息喂分,然后我們就可以和具體機器直連完成相關(guān)功能
- 服務(wù)暴露:服務(wù)提供方可以提供什么樣子的功能锦庸,經(jīng)過服務(wù)暴露暴露出去,其他使用方就可以通過服務(wù)發(fā)現(xiàn)發(fā)現(xiàn)具體的服務(wù)提供方信息
- 負載均衡:一般針對的是服務(wù)提供方蒲祈,避免大量請求同時打到一臺機器上甘萧,采用隨機萝嘁、輪詢等措施讓請求均分到各個機器上,提供服務(wù)效率扬卷,
限流
牙言,灰度
等也都是類似的操作,通過動態(tài)路由怪得、軟負載的形式處理分發(fā)請求咱枉。 - 快速上線下:以往需要上下線可能需要殺掉機器上的進程,現(xiàn)在只需要讓該服務(wù)停止暴露即可徒恋,實現(xiàn)服務(wù)的靈活上下線蚕断。
數(shù)據(jù)處理流程
服務(wù)端:服務(wù)的提供方,接受網(wǎng)絡(luò)傳輸?shù)恼埱髷?shù)據(jù)入挣、通過網(wǎng)絡(luò)把應(yīng)答數(shù)據(jù)發(fā)送給客戶端
客戶端:服務(wù)的調(diào)用方基括,使用本地代理,通過網(wǎng)絡(luò)把請求數(shù)據(jù)發(fā)送出去财岔,接受服務(wù)端返回的應(yīng)答數(shù)據(jù)
所有的數(shù)據(jù)傳輸都是按照上面圖片說的流程來的,如果需要添加自定義的序列化工具河爹,則需要在把數(shù)據(jù)提交到socket的輸出流緩沖區(qū)之前按照序列化工具完成序列化操作匠璧,反序列化則進行反向操作即可。
RPC 實踐 V2版本
文件夾目錄如下圖所示咸这,其中:
- balance文件夾:負載均衡有關(guān)
- config文件夾:網(wǎng)絡(luò)套接字傳輸?shù)臄?shù)據(jù)模型以及服務(wù)暴露夷恍、服務(wù)發(fā)現(xiàn)的數(shù)據(jù)模型
- core文件夾:核心文件夾,包含了服務(wù)端和客戶端的請求處理媳维、代理生成等
- demo文件夾:測試試用
- io.protocol文件夾:目前是只有具體的請求對象和網(wǎng)絡(luò)io的封裝
- register:服務(wù)注冊使用酿雪,實現(xiàn)了使用zk進行服務(wù)注冊和服務(wù)發(fā)現(xiàn)的操作
由于代碼太長,只貼部分重要的代碼操作侄刽。
服務(wù)暴露 & 服務(wù)發(fā)現(xiàn)
public interface ServiceRegister {
/**
* 服務(wù)注冊
* @param config
*/
void register(BasicConfig config);
/**
* 服務(wù)發(fā)現(xiàn)指黎,從注冊中心獲取可用的服務(wù)提供方信息
* @param request
* @return
*/
InetSocketAddress discovery(RpcRequest request, ServiceType nodeType);
}
默認使用了CuratorFramework客戶端完成zk數(shù)據(jù)的操作
public class ZkServiceRegister implements ServiceRegister {
private CuratorFramework client;
private static final String ROOT_PATH = "jwfy/simple-rpc";
private LoadBalance loadBalance = new DefaultLoadBalance();
public ZkServiceRegister() {
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
this.client = CuratorFrameworkFactory
.builder()
.connectString("127.0.0.1:2182")
.sessionTimeoutMs(50000)
.retryPolicy(policy)
.namespace(ROOT_PATH)
.build();
// 業(yè)務(wù)的根路徑是 /jwfy/simple-rpc ,其他的都會默認掛載在這里
this.client.start();
System.out.println("zk啟動正常");
}
@Override
public void register(BasicConfig config) {
String interfacePath = "/" + config.getInterfaceName();
try {
if (this.client.checkExists().forPath(interfacePath) == null) {
// 創(chuàng)建 服務(wù)的永久節(jié)點
this.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(interfacePath);
}
config.getMethods().forEach(method -> {
try {
String methodPath = null;
ServiceType serviceType = config.getType();
if (serviceType == ServiceType.PROVIDER) {
// 服務(wù)提供方,需要暴露自身的ip州丹、port信息醋安,而消費端則不需要
String address = getServiceAddress(config);
methodPath = String.format("%s/%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName(), address);
} else {
methodPath = String.format("%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName());
}
System.out.println("zk path: [" + methodPath + "]");
this.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(methodPath, "0".getBytes());
// 創(chuàng)建臨時節(jié)點,節(jié)點包含了服務(wù)提供段的信息
} catch (Exception e) {
e.getMessage();
}
});
} catch (Exception e) {
e.getMessage();
}
}
@Override
public InetSocketAddress discovery(RpcRequest request, ServiceType nodeType) {
String path = String.format("/%s/%s/%s", request.getClassName(), nodeType.getType(), request.getMethodName());
try {
List<String> addressList = this.client.getChildren().forPath(path);
// 采用負載均衡的方式獲取服務(wù)提供方信息,不過并沒有添加watcher監(jiān)聽模式
String address = loadBalance.balance(addressList);
if (address == null) {
return null;
}
return parseAddress(address);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private String getServiceAddress(BasicConfig config) {
String hostInfo = new StringBuilder()
.append(config.getHost())
.append(":")
.append(config.getPort())
.toString();
return hostInfo;
}
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.valueOf(result[1]));
}
public void setLoadBalance(LoadBalance loadBalance) {
// 可以重新設(shè)置負載均衡的策略
this.loadBalance = loadBalance;
}
}
服務(wù)啟動后利用zkclient查詢到在zk中包含的節(jié)點信息墓毒,其中默認的命名空間是jwfy/simple-rpc
負載均衡
public interface LoadBalance {
String balance(List<String> addressList);
}
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
return null;
}
if (addressList.size() == 1) {
return addressList.get(0);
}
return doLoad(addressList);
}
abstract String doLoad(List<String> addressList);
}
public class DefaultLoadBalance extends AbstractLoadBalance {
@Override
String doLoad(List<String> addressList) {
Random random = new Random();
// 利用隨機函數(shù)選擇一個吓揪,其中random.nextIn生成的數(shù)據(jù)是在[0, size) 之間
return addressList.get(random.nextInt(addressList.size()));
}
}
上面的負載均衡代碼其實很簡單,就是從一個機器列表addressList中選擇一個所计,如果列表為空或者不存在則直接返回null柠辞,如果機器只有1臺則直接獲取返回即可,當列表記錄超過1條后利用隨機函數(shù)生成一個列表偏移量以獲取對應(yīng)數(shù)據(jù)主胧。也可以按照類似完成更多負載均衡的策略叭首,然后調(diào)用setLoadBalance方法就可以了习勤。
IO 處理
public interface MessageProtocol {
/**
* 服務(wù)端解析從網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù),轉(zhuǎn)變成request
* @param inputStream
* @return
*/
void serviceToRequest(RpcRequest request, InputStream inputStream);
/**
* 服務(wù)端把計算機的結(jié)果包裝好放棒,通過outputStream 返回給客戶端
* @param response
* @param outputStream
* @param <T>
*/
<T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);
/**
* 客戶端把請求拼接好姻报,通過outputStream發(fā)送到服務(wù)端
* @param request
* @param outputStream
*/
void clientToRequest(RpcRequest request, OutputStream outputStream);
/**
* 客戶端接收到服務(wù)端響應(yīng)的結(jié)果,轉(zhuǎn)變成response
* @param response
* @param inputStream
*/
void clientGetResponse(RpcResponse response, InputStream inputStream);
}
實現(xiàn)類DefaultMessageProtocol
public class DefaultMessageProtocol implements MessageProtocol {
@Override
public void serviceToRequest(RpcRequest request, InputStream inputStream) {
try {
ObjectInputStream input = new ObjectInputStream(inputStream);
request.setClassName(input.readUTF());
request.setMethodName(input.readUTF());
request.setParameterTypes((Class<?>[])input.readObject());
request.setArguments((Object[])input.readObject());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
try {
ObjectOutputStream output = new ObjectOutputStream(outputStream);
output.writeBoolean(response.getError());
output.writeObject(response.getResult());
output.writeObject(response.getErrorMessage());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void clientToRequest(RpcRequest request, OutputStream outputStream) {
try {
ObjectOutputStream ouput = new ObjectOutputStream(outputStream);
ouput.writeUTF(request.getClassName());
ouput.writeUTF(request.getMethodName());
ouput.writeObject(request.getParameterTypes());
ouput.writeObject(request.getArguments());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void clientGetResponse(RpcResponse response, InputStream inputStream) {
try {
ObjectInputStream input = new ObjectInputStream(inputStream);
response.setError(input.readBoolean());
response.setResult(input.readObject());
response.setErrorMessage((String) input.readObject());
} catch (Exception e) {
e.printStackTrace();
}
}
}
服務(wù)端請求處理
public class ServiceHandler {
private ThreadPoolExecutor executor = null;
private RpcService rpcService;
private MessageProtocol messageProtocol;
public ServiceHandler(RpcService rpcService) {
this.rpcService = rpcService;
ThreadFactory commonThreadName = new ThreadFactoryBuilder()
.setNameFormat("Parse-Task-%d")
.build();
this.executor = new ThreadPoolExecutor(
10,
10,
2,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
commonThreadName, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
SocketTask socketTask = (SocketTask) r;
Socket socket = socketTask.getSocket();
if (socket != null) {
try {
socket.close();
System.out.println("reject socket:" + socketTask + ", and closed");
// 無法及時處理和響應(yīng)的就快速拒絕掉
} catch (IOException e) {
}
}
}
});
}
public RpcService getRpcService() {
return rpcService;
}
public void setRpcService(RpcService rpcService) {
this.rpcService = rpcService;
}
public MessageProtocol getMessageProtocol() {
return messageProtocol;
}
public void setMessageProtocol(MessageProtocol messageProtocol) {
this.messageProtocol = messageProtocol;
}
public void handler(Socket socket) {
// 接收到新的套接字间螟,包裝成為一個runnable提交給線程去執(zhí)行
this.executor.execute(new SocketTask(socket));
}
class SocketTask implements Runnable {
private Socket socket;
public SocketTask(Socket socket) {
this.socket = socket;
}
public Socket getSocket() {
return socket;
}
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
RpcRequest request = new RpcRequest();
messageProtocol.serviceToRequest(request, inputStream);
// 獲取客戶端請求數(shù)據(jù)吴旋,統(tǒng)一包裝成RpcRequest
RpcResponse response = rpcService.invoke(request);
// 反射調(diào)用,得到具體的返回值
System.out.println("request:[" + request + "], response:[" + response + "]");
messageProtocol.serviceGetResponse(response, outputStream);
// 再返回給客戶端
} catch (Exception e) {
// error
} finally {
if (socket != null) {
// SOCKET 關(guān)閉一定要加上厢破,要不然會出各種事情
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
客戶端 代理對象
public class ProxyInstance implements InvocationHandler {
private RpcClient rpcClient;
private Class clazz;
public ProxyInstance(RpcClient client, Class clazz) {
this.rpcClient = client;
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
request.setClassName(clazz.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setArguments(args);
// 獲取服務(wù)提供方信息荣瑟,這里既是服務(wù)發(fā)現(xiàn)的入口,找到一個合適的可用的服務(wù)提供方信息
InetSocketAddress address = rpcClient.discovery(request);
System.out.println("[" + Thread.currentThread().getName() + "]discover service:" + address);
// 發(fā)起網(wǎng)絡(luò)請求摩泪,得到請求數(shù)據(jù)
RpcResponse response = rpcClient.invoke(request, address);
return response.getResult();
}
}
上面的InetSocketAddress address = rpcClient.discovery(request)
是相比v1多了一個最重要的東西笆焰,每次獲取請求后都實時從zk中獲取對應(yīng)的服務(wù)提供方信息,這就是服務(wù)發(fā)現(xiàn)见坑。
實踐
public class Client {
public static void main(String[] args) {
RpcClient rpcClient = new RpcClient();
rpcClient.subscribe(Calculate.class);
rpcClient.start();
Calculate<Integer> calculateProxy = rpcClient.getInstance(Calculate.class);
for(int i=0; i< 200; i++) {
new Thread(() -> {
long start = System.currentTimeMillis();
int s1 = new Random().nextInt(100);
int s2 = new Random().nextInt(100);
int s3 = calculateProxy.add(s1, s2);
System.out.println("[" + Thread.currentThread().getName() + "]a: " + s1 + ", b:" + s2 + ", c=" + s3 + ", 耗時:" + (System.currentTimeMillis() - start));
}).start();
}
}
}
客戶端開啟200個線程后嚷掠,執(zhí)行結(jié)果是順利執(zhí)行的,在服務(wù)端開啟的接受請求被添加到線程池中荞驴,而代碼中線程池的任務(wù)隊列長度是200不皆,可以完全的存儲200個線程,但是如果我們把客戶端請求量從200個改成500個呢熊楼,又會出現(xiàn)什么情況霹娄?
服務(wù)端
客戶端
如上述的圖片顯示,當請求量打滿線程池之后鲫骗,線程池的拒絕策略就開始生效了犬耻,在本代碼中是直接調(diào)用了close操作,而客戶端感知到關(guān)閉后也會出現(xiàn)io錯誤执泰,而正常的請求則順利執(zhí)行枕磁。其中還有輸出discover服務(wù)發(fā)現(xiàn)了服務(wù)提供方的機器信息,這也是符合起初的想法的术吝。
這里一定要加上一些策略以及時關(guān)閉無法處理的socket透典,否則就會出現(xiàn)服務(wù)提供方無任何可執(zhí)行,但是服務(wù)調(diào)用方卻還在等待中顿苇,因為socket并沒有關(guān)閉峭咒,從而出現(xiàn)資源被占用了,還不執(zhí)行相關(guān)任務(wù)纪岁。
提個問題:線程池打滿了怎么辦?
在本demo中采取了非常粗暴的策略凑队,直接丟棄了無法處理的任務(wù),在實際的線上業(yè)務(wù)中,可以先加機器以能再最短的時間內(nèi)恢復(fù)線上情況漩氨,后期結(jié)合業(yè)務(wù)特點提出針對性的解決方案西壮。如果業(yè)務(wù)接受一定的延遲,可以考慮接入kafka類似的消息隊列(削峰是mq的一大特點)叫惊;如果對時間要求很高款青,要么加機器,要么壓榨機器性能霍狰,可能之前設(shè)置的線程池的數(shù)量太小抡草,那就需要調(diào)節(jié)線程池的各個核心數(shù)據(jù),修改線程池的任務(wù)隊列類型也是可以考慮的蔗坯;此外也有可能是業(yè)務(wù)耗時太多康震,無法及時處理完全造成請求堆積導(dǎo)致的,那么就需要考慮業(yè)務(wù)的同步改異步化宾濒。最后線上告警也需要及時完善腿短。
沒有絕對的解決方案,只有最合適當下場景的方案绘梦,沒有銀彈橘忱,任何不具體結(jié)合業(yè)務(wù)的方案都是扯淡。
總結(jié)思考
v2版本相比v1版本修改了整個代碼結(jié)構(gòu)卸奉,使得結(jié)構(gòu)能夠更加明確钝诚,引入zookeeper作為服務(wù)治理功能,大致介紹了zookeeper的特點以及功能择卦,給服務(wù)注冊、服務(wù)發(fā)現(xiàn)郎嫁、序列化協(xié)議等均留下了口子秉继,以便實現(xiàn)自定義的協(xié)議,v1的io模型是BIO泽铛,v2并沒有變化尚辑,只是由單線程改造成多線程。
整體而言符合一個簡單的rpc框架盔腔,依舊還是有很多點可以完善杠茬、優(yōu)化的點,如:
- io模型還是沒有替換弛随,后面考慮直接整體接入netty瓢喉;
- 也不應(yīng)該每次實時從zk獲取節(jié)點信息,應(yīng)該先設(shè)置一個本地緩存舀透,再利用zookeeper的watcher功能栓票,開啟一個異步線程去監(jiān)聽更新本地緩存,降低和zk交互帶來的性能損耗愕够;
- 也沒有快速失敗走贪、重試的功能佛猛,客觀情況下存在網(wǎng)絡(luò)抖動的問題,重試就可以了
- 整體的各種協(xié)議約定并沒有明確規(guī)范坠狡,比較混亂
本人微信公眾號(搜索jwfy)歡迎關(guān)注