本文基于实际生产环境中的Thrift+zookeeper实现的rpc调用总结，大致有以下几个部分：
1: 服务端将服务注册在zk中
1.1 解析服务端的网卡IP；
1.2 获取zookeeper客户端对象；
1.3 实现服务接口的注册；
2: 基于zookeeper实现服务接口的自动发现
3: 实现客户端连接池和客户端通过代理调用服务
一 服务端将服务注册在zk中
代码展示
1.1 解析thrift-server端IP地址，用于注册服务
接口
/**
 * Strategy for resolving the IP address of the machine that hosts a Thrift
 * service, so the address can be used when registering the service.
 */
public interface ThriftServerIpResolve {

    /**
     * Returns the IP of the machine the service runs on.
     *
     * @return the resolved IP address
     * @throws Exception if resolution fails
     */
    String getServerIp() throws Exception;

    /**
     * Returns the IP of the machine the service runs on.
     *
     * @param publicIpOnly address-selection flag; its exact meaning is defined by the implementation
     * @return the resolved IP address
     * @throws Exception if resolution fails
     */
    String getServerIp(boolean publicIpOnly) throws Exception;

    /** Discards any resolved state held by the implementation. */
    void reset();

    /**
     * Callback invoked with the new address when the IP changes.
     * (Nested interface members are implicitly {@code public static}.)
     */
    interface IpRestCalllBack {

        /**
         * @param newIp the address after the change
         */
        void rest(String newIp);
    }
}
实现
/**
 * Resolves the server IP by scanning the IPv4 addresses of the local network
 * interfaces. The first successful result is cached in {@link #serverIp} and
 * returned by every later call (regardless of the {@code publicIpOnly} flag)
 * until {@link #reset()} is invoked.
 */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // Cached resolution result; may also be injected through setServerIp.
    private String serverIp;

    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    @Override
    public String getServerIp() {
        return getServerIp(false);
    }

    /**
     * Returns the cached IP if present, otherwise scans all interfaces.
     *
     * @param publicIpOnly {@code true} selects non-loopback IPv4 addresses that are NOT site-local;
     *                     {@code false} selects site-local IPv4 addresses
     * @return the resolved address, or {@code null} when nothing matched or the scan failed
     */
    @Override
    public String getServerIp(boolean publicIpOnly) {
        if (serverIp != null) {
            return serverIp;
        }
        try {
            // A host usually has several network interfaces.
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            if (netInterfaces == null) {
                // Older JDKs document a null return when the machine has no interfaces.
                logger.warn("no network interface found, server ip cannot be resolved");
                return null;
            }
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = netInterfaces.nextElement();
                // Each interface may carry several addresses (loopback, site-local, IPv4, IPv6, ...).
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    if (address instanceof Inet6Address || address.isLoopbackAddress()) {
                        continue;
                    }
                    // publicIpOnly wants non-site-local, otherwise site-local.
                    // The scan does not stop at the first hit, so the last match wins.
                    if (publicIpOnly != address.isSiteLocalAddress()) {
                        serverIp = address.getHostAddress();
                        logger.info("resolve server ip :" + serverIp);
                    }
                }
            }
        } catch (SocketException e) {
            // Report through the logger instead of printStackTrace so the failure reaches the application log.
            logger.error("failed to enumerate network interfaces", e);
        }
        return serverIp;
    }

    @Override
    public void reset() {
        serverIp = null;
    }
}
1.2 获取zookeeper客户端链接对象
/**
 * Spring {@code FactoryBean} that produces Curator ZooKeeper clients.
 * In singleton mode one started client is created lazily and shared; otherwise
 * every {@link #getObject()} call returns a new, not-yet-started client.
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework>, Closeable {

    private final static String ROOT = "rpc";

    private String zkHosts;
    // Session timeout in milliseconds.
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;
    // Share a single zk connection.
    private boolean singleton = true;
    // Global path prefix, typically used to separate applications.
    private String namespace;
    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    /**
     * Builds a client from the configured properties. The effective namespace
     * is {@code "rpc"} or {@code "rpc/<namespace>"}.
     * <p>
     * The namespace is computed into a local variable; writing it back to the
     * field would prepend {@code "rpc/"} again on every call.
     */
    public CuratorFramework create() throws Exception {
        String effectiveNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
        return create(zkHosts, sessionTimeout, connectionTimeout, effectiveNamespace);
    }

    /**
     * Creates a (not started) zk client with CuratorFramework.
     *
     * @param connectString     zk cluster addresses, including ip and port
     * @param sessionTimeout    session timeout in milliseconds
     * @param connectionTimeout connection timeout in milliseconds
     * @param namespace         namespace used to separate applications
     * @return the built client; the caller must start it
     */
    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
        // ExponentialBackoffRetry: base sleep 1000 ms, growing with the retry count.
        // NOTE(review): Curator is believed to cap the retry count internally, so
        // Integer.MAX_VALUE effectively means "the maximum allowed" - confirm for the version in use.
        return CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout) // honour the argument instead of a hard-coded 30000
                .canBeReadOnly(true)
                .namespace(namespace)
                .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null)
                .build();
    }

    /** Closes the shared client, if one was created or injected. */
    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
1.3 将服务接口注册到zookeeper中
1.3.1 服务端注册工厂调用注册服务的方法，异步启动服务
/**
* 服務(wù)端注冊(cè)服務(wù)工廠
*/
public class ThriftServiceServerFactory implements InitializingBean ,Closeable{
// 服務(wù)注冊(cè)本機(jī)端口
private Integer port = 8299;
// 優(yōu)先級(jí)
private Integer weight = 1;// default
// 服務(wù)實(shí)現(xiàn)類
private Object service;// serice實(shí)現(xiàn)類
//服務(wù)版本號(hào)
private String version;
// 是否只取公網(wǎng)ip凛辣,如果是true抱既,zk中只注冊(cè)公網(wǎng)ip职烧;如果是false,zk中只注冊(cè)私網(wǎng)ip
private boolean publicIpOnly = false;
// 解析本機(jī)IP
private ThriftServerIpResolve thriftServerIpResolve;
//服務(wù)注冊(cè)
private ThriftServerAddressRegister thriftServerAddressRegister;
private ServerThread serverThread;
private boolean zkUse = true;
public void setPort(Integer port) {
this.port = port;
}
public void setWeight(Integer weight) {
this.weight = weight;
}
public void setService(Object service) {
this.service = service;
}
public void setVersion(String version) {
this.version = version;
}
public void setZkUse(boolean zkUse) {
this.zkUse = zkUse;
}
public void setPublicIpOnly(boolean publicIpOnly) {
this.publicIpOnly = publicIpOnly;
}
public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
this.thriftServerIpResolve = thriftServerIpResolve;
}
public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
this.thriftServerAddressRegister = thriftServerAddressRegister;
}
@Override
public void afterPropertiesSet() throws Exception {
if (thriftServerIpResolve == null) {
thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
}
String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
if (StringUtils.isEmpty(serverIP)) {
throw new ThriftException("cant find server ip...");
}
String hostname = serverIP + ":" + port + ":" + weight;
Class<?> serviceClass = service.getClass();
// 獲取實(shí)現(xiàn)類接口
Class<?>[] interfaces = serviceClass.getInterfaces();
if (interfaces.length == 0) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// reflect,load "Processor";
TProcessor processor = null;
String serviceName = null;
for (Class<?> clazz : interfaces) {
String cname = clazz.getSimpleName();
if (!cname.equals("Iface")) {
continue;
}
serviceName = clazz.getEnclosingClass().getName();
String pname = serviceName + "$Processor";
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> pclass = classLoader.loadClass(pname);
if (!TProcessor.class.isAssignableFrom(pclass)) {
continue;
}
Constructor<?> constructor = pclass.getConstructor(clazz);
processor = (TProcessor) constructor.newInstance(service);
break;
} catch (Exception e) {
//
}
}
if (processor == null) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
//需要單獨(dú)的線程,因?yàn)閟erve方法是阻塞的.
serverThread = new ServerThread(processor, port);
serverThread.start();
// 注冊(cè)服務(wù)
if (zkUse && thriftServerAddressRegister != null) {
thriftServerAddressRegister.register(serviceName, version, hostname);
}
}
class ServerThread extends Thread {
private TServer server;
ServerThread(TProcessor processor, int port) throws Exception {
---------------------
/** TThreadedSelectorServer模式是thrift-server最高級(jí)的工作模
式:主要有以下幾個(gè)不分組成
TThreadedSelectorServer模式是目前Thrift提供的最高級(jí)的模式,
它內(nèi)部有如果幾個(gè)部分構(gòu)成:
(1) 一個(gè)AcceptThread線程對(duì)象蚀之,專門用于處理監(jiān)聽socket上的新連接蝗敢;
(2) 若干個(gè)SelectorThread對(duì)象專門用于處理業(yè)務(wù)socket的網(wǎng)絡(luò)I/O操作,所有網(wǎng)絡(luò)數(shù)據(jù)的讀寫均是有這些線程來(lái)完成足删;
(3) 一個(gè)負(fù)載均衡器SelectorThreadLoadBalancer對(duì)象寿谴,主要用于AcceptThread線程接收到一個(gè)新socket連接請(qǐng)求時(shí),決定將這個(gè)新連接請(qǐng)求分配給哪個(gè)SelectorThread線程失受。
(4) 一個(gè)ExecutorService類型的工作線程池讶泰,在SelectorThread線程中,監(jiān)聽到有業(yè)務(wù)socket中有調(diào)用請(qǐng)求過(guò)來(lái)拂到,則將請(qǐng)求讀取之后痪署,交個(gè)ExecutorService線程池中的線程完成此次調(diào)用的具體執(zhí)行;
---------------------
**/
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
TProcessorFactory processorFactory = new TProcessorFactory(processor);
tArgs.processorFactory(processorFactory);
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
tArgs.selectorThreads(4); // 設(shè)置selector線程數(shù)兄旬,默認(rèn)是2
tArgs.workerThreads(32); // 設(shè)置工作線程數(shù)狼犯,默認(rèn)是5,在數(shù)據(jù)庫(kù)負(fù)載高時(shí)有可能會(huì)堵塞
server = new TThreadedSelectorServer(tArgs);
}
@Override
public void run(){
try{
//啟動(dòng)服務(wù)
server.serve();
}catch(Exception e){
//
}
}
public void stopServer(){
server.stop();
}
}
public void close() {
serverThread.stopServer();
}
}
1.3.2 真正的注册实现
第一步：监听zk连接变化；
第二步：将服务暴露的ip和port以临时（EPHEMERAL）节点的形式创建在zk的接口服务路径下，切换到指定的组；
第三步：注册成功后使用NodeCache监听节点内容变化，如果原本节点不存在，那么Cache就会在节点被创建时触发监听事件，如果该节点被删除，就无法再触发监听事件。任意节点的内容变化都会重新注册。
/**
 * Starts the service: resolves the local IP, builds the Thrift processor by
 * reflection, launches the server thread and registers the address.
 *
 * @throws IllegalClassFormatException if the service does not implement a generated
 *         {@code Iface} or its {@code Processor} cannot be instantiated (the reflective
 *         failure, if any, is attached as the cause)
 */
@Override
public void afterPropertiesSet() throws Exception {
    if (service == null) {
        throw new IllegalStateException("service must be set before initialization");
    }
    if (thriftServerIpResolve == null) {
        thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
    }
    String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
    if (StringUtils.isEmpty(serverIP)) {
        throw new ThriftException("cant find server ip...");
    }
    String hostname = serverIP + ":" + port + ":" + weight;

    // Interfaces implemented by the service class.
    Class<?>[] interfaces = service.getClass().getInterfaces();
    if (interfaces.length == 0) {
        throw new IllegalClassFormatException("service-class should implements Iface");
    }

    // Reflectively load "<Service>$Processor" for the first usable Iface.
    TProcessor processor = null;
    String serviceName = null;
    Exception lastFailure = null;
    for (Class<?> clazz : interfaces) {
        if (!"Iface".equals(clazz.getSimpleName())) {
            continue;
        }
        Class<?> enclosing = clazz.getEnclosingClass();
        if (enclosing == null) {
            // A top-level interface that merely happens to be called Iface.
            continue;
        }
        serviceName = enclosing.getName();
        String pname = serviceName + "$Processor";
        try {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            Class<?> pclass = classLoader.loadClass(pname);
            if (!TProcessor.class.isAssignableFrom(pclass)) {
                continue;
            }
            Constructor<?> constructor = pclass.getConstructor(clazz);
            processor = (TProcessor) constructor.newInstance(service);
            break;
        } catch (Exception e) {
            // Remember why this candidate failed; it is reported if no processor is found.
            lastFailure = e;
        }
    }
    if (processor == null) {
        IllegalClassFormatException failure =
                new IllegalClassFormatException("service-class should implements Iface");
        if (lastFailure != null) {
            failure.initCause(lastFailure);
        }
        throw failure;
    }

    // serve() blocks, so the server needs its own thread.
    serverThread = new ServerThread(processor, port);
    serverThread.start();

    // Register the service.
    if (zkUse && thriftServerAddressRegister != null) {
        thriftServerAddressRegister.register(serviceName, version, hostname);
    }
}
/** Thread that owns and runs the blocking Thrift {@code TThreadedSelectorServer}. */
class ServerThread extends Thread {

    private TServer server;

    /**
     * Builds a framed, binary-protocol selector server on the given port.
     *
     * @param processor processor that dispatches incoming calls
     * @param port      listening port
     * @throws Exception if the server socket or the server cannot be created
     */
    ServerThread(TProcessor processor, int port) throws Exception {
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
        TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
        TProcessorFactory processorFactory = new TProcessorFactory(processor);
        tArgs.processorFactory(processorFactory);
        tArgs.transportFactory(new TFramedTransport.Factory());
        tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
        tArgs.maxReadBufferBytes = 1024 * 1024L; // guard against direct-memory OOM
        tArgs.selectorThreads(4); // selector thread count (default 2 per the original note)
        tArgs.workerThreads(32); // worker thread count (default 5); may block under heavy database load
        server = new TThreadedSelectorServer(tArgs);
    }

    @Override
    public void run() {
        try {
            // Start serving; blocks until the server stops.
            server.serve();
        } catch (Exception e) {
            // Surface the failure through the thread's uncaught-exception handler
            // instead of letting the server die silently.
            throw new IllegalStateException("thrift server terminated unexpectedly", e);
        }
    }

    public void stopServer() {
        server.stop();
    }
}
// 实现注册和节点监听
/**
 * Entry point for registration: validates the address, installs the
 * connection-state listener, registers once, then watches the group node so
 * later changes trigger a re-registration.
 *
 * @param service service interface name; per the original note it must be unique within a product
 * @param version service interface version (the original note gives 1.0.0 as the default)
 * @param address published address in the form {@code ip:port:weight}
 * @throws ThriftException if {@code address} has fewer than three ':'-separated fields
 */
@Override
public void register(String service, String version, String address) throws InterruptedException {
    // Validate the input.
    final String[] fields = address.split(":");
    if (fields.length < 3) {
        final String message = "ThriftZookeeper Register Error: address invalid '" + address + "'";
        logger.error(message);
        throw new ThriftException(message);
    }

    // Unpack ip / port / weight.
    final String ip = fields[0];
    final String port = fields[1];
    final String weight = fields[2];

    // 1) watch the connection state, 2) register, 3) re-register on group changes.
    setConnectionListener(service, version, ip, port, weight);
    generalRegister(service, version, ip, port, weight);
    setGroupListener(service, version, ip, port, weight);
}
/**
 * Installs a connection-state listener that re-registers the service after
 * the ZooKeeper connection enters the LOST state.
 * <p>
 * On LOST the listener blocks until the client reports a connection again and
 * then calls {@code generalRegister}. The wait loop ends after a successful
 * re-registration or on any exception; in the latter case the service stays
 * unregistered until another trigger fires.
 */
public void setConnectionListener(final String service, final String version, final String ip, final String port, final String weight) {
    // Start the zk client if it has not been started yet.
    if (zkClient.getState() == CuratorFrameworkState.LATENT) {
        zkClient.start();
    }
    logger.info("設(shè)置ZK連接狀態(tài)監(jiān)聽的Listener");
    zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            logger.info("ZkClient狀態(tài)變化:" + newState.name());
            if (newState != ConnectionState.LOST) {
                return;
            }
            // Session expired: the ephemeral registration node has to be recreated.
            logger.info("ZkClient連接斷開,session過(guò)期");
            int i = 0;
            while (true) {
                logger.info("嘗試重新連接到zk..." + (i++));
                try {
                    if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                        logger.info("嘗試重新注冊(cè)zk的臨時(shí)節(jié)點(diǎn)...");
                        // Re-register.
                        generalRegister(service, version, ip, port, weight);
                        break;
                    }
                } catch (InterruptedException e) {
                    logger.error("嘗試重新注冊(cè)zk的臨時(shí)節(jié)點(diǎn)異常(InterruptedException)", e);
                    // Restore the interrupt flag so the owning thread can observe the interruption.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // Re-registration is abandoned here, so report it as an error.
                    logger.error("嘗試重新注冊(cè)zk的臨時(shí)節(jié)點(diǎn)異常(Exception)", e);
                    break;
                }
            }
        }
    });
}
/**
 * Watches this instance's group-config node and re-registers the service
 * whenever the node changes.
 *
 * @param service service name
 * @param version version
 * @param ip      ip of this instance; also used to build the watched path
 * @param port    service port
 * @param weight  weight information
 * @throws InterruptedException declared by the signature
 * @throws ThriftException if the node cannot be created or the cache cannot be started
 */
public void setGroupListener(final String service, final String version, final String ip, final String port, final String weight) throws InterruptedException {
    final String ipServicePath = getInstanceGroupPath(localInstance, ip);

    // Re-registers under the shared lock each time the watched node changes.
    final NodeCacheListener reRegisterOnChange = new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            synchronized (lock) {
                logger.info("服務(wù)端監(jiān)聽到節(jié)點(diǎn): " + ipServicePath + " 變化承疲， for service:" + service);
                generalRegister(service, version, ip, port, weight);
            }
        }
    };

    try {
        createGroupNodeIfNotExists(localInstance, ip);
        final NodeCache groupCache = new NodeCache(zkClient, ipServicePath, false);
        groupCache.start(true);
        groupCache.getListenable().addListener(reRegisterOnChange);
    } catch (Exception e) {
        logger.error("nodeCache start exception:",e);
        throw new ThriftException("nodeCache start exception:", e);
    }
}
/**
 * General registration routine.
 * <p>
 * Reads this instance's group config from zk (falling back to
 * {@code defaultGroup} on any failure) and then creates, or updates, the
 * ephemeral node {@code <serviceLocatePath>/<ip>:<port>} whose data is the
 * JSON form of the group and weight.
 * <p>
 * Node data is explicitly encoded and decoded as UTF-8 so it matches the
 * consumer side, which decodes with "utf-8", independent of the platform
 * default charset.
 *
 * @param service service name
 * @param version version
 * @param ip      service ip
 * @param port    service port
 * @param weight  weight as an integer string, e.g. "5" (1-10 per the original note)
 * @throws ThriftException if the registration node cannot be written
 */
public void generalRegister(String service, String version, String ip, String port, String weight) {
    logger.info("開始注冊(cè)ServiceLocate, 服務(wù): " + service);
    // Start the zk client if it has not been started yet.
    if (zkClient.getState() == CuratorFrameworkState.LATENT) {
        zkClient.start();
    }
    String ipPort = ip + ":" + port;

    // Resolve the group.
    String ipInstancePath = getInstanceGroupPath(localInstance, ip);
    String group;
    try {
        createGroupNodeIfNotExists(localInstance, ip);
        String groupJson = new String(zkClient.getData().forPath(ipInstancePath), "utf-8");
        GroupConfig groupConfig = JSON.parseObject(groupJson, GroupConfig.class);
        if (groupConfig == null || groupConfig.getGroup() == null) {
            throw new ThriftException("獲取到錯(cuò)誤的分組配置御滩，分組注冊(cè)不會(huì)進(jìn)行鸥拧，請(qǐng)檢查配置內(nèi)容是否正確");
        }
        group = groupConfig.getGroup();
    } catch (Exception e) {
        // Includes the ThriftException thrown just above: fall back to the default group.
        logger.error("獲取組別失敗, 按默認(rèn)組別執(zhí)行", e);
        group = defaultGroup;
    }
    logger.info("注冊(cè)到group: " + group);

    // Register.
    RegisterConfig registerConfig = new RegisterConfig();
    registerConfig.setWeight(weight);
    registerConfig.setGroup(group);
    String groupWeightString = JSON.toJSONString(registerConfig);
    try {
        byte[] payload = groupWeightString.getBytes("utf-8");
        String nodePath = getServiceLocatePath(service) + "/" + ipPort;
        // Layout:
        // serviceLocatePath
        //   |-- address1 (ephemeral)
        //   |-- address2 (ephemeral)
        // NOTE(review): checkExists followed by create/setData is not atomic, and an existing
        // node may still belong to an expired session - confirm this is acceptable.
        if (zkClient.checkExists().forPath(nodePath) != null) {
            // Already present: only update the data.
            zkClient.setData().forPath(nodePath, payload);
        } else {
            // Absent: create it, together with the fixed parent path if needed.
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(nodePath, payload);
        }
        // Switch the currently registered group.
        currentRegisterGroup = group;
        logger.info("服務(wù): " + service + " 注冊(cè)到group: " + group + " 成功");
    } catch (Exception e) {
        logger.error("zk分組注冊(cè)異常:", e);
        throw new ThriftException("zk分組注冊(cè)異常:", e);
    }
}
/**
 * Creates the persistent group-config node for this instance when it does not
 * exist yet, initialised with {@code defaultGroup} and {@code defaultWeight}
 * as UTF-8 encoded JSON.
 *
 * @param localInstance name of the local instance, used to build the node path
 * @param ip            ip of the local instance, used to build the node path
 * @throws Exception if the existence check or the creation fails
 */
private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
    if (zkClient.getState() == CuratorFrameworkState.LATENT) {
        zkClient.start();
    }
    String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
    if (zkClient.checkExists().forPath(serviceGroupPath) != null) {
        return;
    }
    // Build the default config only when it is actually needed.
    GroupConfig groupConfig = new GroupConfig();
    groupConfig.setGroup(defaultGroup);
    groupConfig.setWeight(defaultWeight);
    logger.info("創(chuàng)建group_config氛驮，localInstance:" + localInstance + "腕柜，ip:" + ip);
    zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            // Explicit charset: readers decode the node as "utf-8".
            .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes("utf-8"));
}
二 基于zk的服务自动发现
1.使用NodeCache监听zk节点，也就是服务分组配置的变化；
2.使用PathChildrenCache监听子节点，也就是ip:port的变化；
/**
 * Initialises the address provider: resolves the local IP, starts the
 * service-address cache, waits on {@code countDownLatch}, then installs the
 * group-config listener.
 *
 * @throws ThriftException if no local IP can be resolved
 * @throws Exception on any failure while starting the caches
 */
public void afterPropertiesSet() throws Exception {
    logger.info("Provider初始化開始");

    // Resolve the local IP, falling back to the network-interface scanner.
    if (null == thriftServerIpResolve) {
        thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
    }
    final String localIp = thriftServerIpResolve.getServerIp(publicIpOnly);
    if (StringUtils.isEmpty(localIp)) {
        throw new ThriftException("can not find server ip...");
    }

    // Service-address listener; block until the cache listener has released the latch.
    logger.info("Provider創(chuàng)建ServiceLocate監(jiān)聽器. targetService:" + targetService);
    buildServiceLocateListener();
    cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    countDownLatch.await();

    // Group-config listener.
    logger.info("Provider創(chuàng)建GroupConfig監(jiān)聽器蓖扑，localInstance:" + localInstance);
    buildInstanceGroupListener(localIp);

    logger.info("Provider初始化完成");
}
/**
 * Creates the {@code NodeCache} on this instance's group-config node,
 * registers a listener that re-reads the group on every change, starts the
 * cache and loads the current group once.
 *
 * @param ip ip of the local instance, used to build the node path
 * @throws ThriftException if the node or the cache cannot be set up
 */
private void buildInstanceGroupListener(final String ip) {
    // Start the zk client if it has not been started yet.
    if (CuratorFrameworkState.LATENT == zkClient.getState()) {
        zkClient.start();
    }
    final String groupPath = getInstanceGroupPath(localInstance, ip);
    try {
        // Make sure the node exists before watching it.
        createGroupNodeIfNotExists(localInstance, ip);
        groupNodeCache = new NodeCache(zkClient, groupPath, false);

        final NodeCacheListener onGroupChanged = new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                synchronized (lock) {
                    groupNodeCache.rebuild();
                    createGroupNodeIfNotExists(localInstance, ip);
                    rebuildGroup();
                }
            }
        };
        groupNodeCache.getListenable().addListener(onGroupChanged);

        groupNodeCache.start(true);
        rebuildGroup();
    } catch (Exception e) {
        logger.error("nodeCache start exception:",e);
        throw new ThriftException("nodeCache start exception:", e);
    }
}
/**
 * Creates the persistent group-config node for this instance when it does not
 * exist yet, initialised with {@code defaultGroup} and {@code defaultWeight}
 * as UTF-8 encoded JSON.
 *
 * @param localInstance name of the local instance, used to build the node path
 * @param ip            ip of the local instance, used to build the node path
 * @throws Exception if the existence check or the creation fails
 */
private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
    if (zkClient.getState() == CuratorFrameworkState.LATENT) {
        zkClient.start();
    }
    String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
    if (zkClient.checkExists().forPath(serviceGroupPath) != null) {
        return;
    }
    // Build the default config only when it is actually needed.
    GroupConfig groupConfig = new GroupConfig();
    groupConfig.setGroup(defaultGroup);
    groupConfig.setWeight(defaultWeight);
    zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            // Explicit charset: rebuildGroup decodes the node as "utf-8".
            .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes("utf-8"));
}
/**
 * Re-reads the group config from {@code groupNodeCache} and updates
 * {@code currentGroup}. The current group is left unchanged when the cache
 * holds no data (for example the node was deleted and has not been observed
 * again yet) or when the JSON carries no group.
 *
 * @throws UnsupportedEncodingException declared for the "utf-8" decoding
 */
private void rebuildGroup() throws UnsupportedEncodingException {
    ChildData current = groupNodeCache.getCurrentData();
    if (current == null || current.getData() == null) {
        // The cache has no view of the node; dereferencing it would throw a NullPointerException.
        logger.warn("group config node has no data yet, keeping current group: " + currentGroup);
        return;
    }
    String data = new String(current.getData(), "utf-8");
    logger.info("客戶端監(jiān)聽到分組變化,當(dāng)前內(nèi)容 " + data + " 進(jìn)行同步");
    GroupConfig groupConfig = JSON.parseObject(data, GroupConfig.class);
    if (groupConfig == null || groupConfig.getGroup() == null) {
        logger.error("分組數(shù)據(jù)錯(cuò)誤赵誓，緩存區(qū)域不會(huì)變更打毛，請(qǐng)查看分組配置區(qū)數(shù)據(jù),localInstance:" + localInstance
                + "targetService:" + targetService + "俩功，data: " + data);
        return;
    }
    currentGroup = groupConfig.getGroup();
}
/**
 * Creates the PathChildrenCache on the target service's locate path and
 * attaches a listener that rebuilds the local address cache on child events.
 * The cache itself is not started here.
 *
 * @throws Exception declared by the signature
 */
private void buildServiceLocateListener() throws Exception {
// Start the zk client if it has not been started yet
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
// Watch the service addresses
// Locate the target path
final String serviceLocatePath = getServiceLocatePath(targetService);
cachedPath = new PathChildrenCache(zkClient, serviceLocatePath, true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type eventType = event.getType();
switch (eventType) {
case CONNECTION_RECONNECTED:
logger.info("Connection is reconection.");
break;
case CONNECTION_SUSPENDED:
logger.info("Connection is suspended.");
break;
case CONNECTION_LOST:
logger.warn("Connection error,waiting...");
// This return is deliberate: when ZK is down, skip the rebuild below so the local cache is left untouched
return;
case INITIALIZED:
// countDownLatch.countDown();
logger.warn("Connection init ...");
break;
default:
}
// Any change to any child triggers a full rebuild; this is the "simple" approach.
cachedPath.rebuild();
synchronized (lock) {
rebuild();
}
// Reached on every event except CONNECTION_LOST
countDownLatch.countDown();
}
};
cachedPath.getListenable().addListener(childrenCacheListener);
}
/**
 * Rebuilds the local address caches ({@code trace}, {@code container},
 * {@code ipPortQueue}) from the children currently held by {@code cachedPath}.
 * Each child node is named {@code ip:port} and carries a JSON
 * {@code RegisterConfig} with weight and group. Addresses are grouped by
 * group name and repeated according to their weight.
 * <p>
 * When there are no children all three caches are cleared.
 *
 * @throws ThriftException if any child node cannot be parsed
 */
protected void rebuild() {
    logger.info("即將更新本地地址緩存, localService: " + localInstance + ", targetService: " + targetService);
    List<ChildData> children = cachedPath.getCurrentData();
    if (children == null || children.isEmpty()) {
        // All thrift servers may have lost their zk connection while the network between
        // thrift client and thrift server is still fine, so whether the container should
        // really be cleared here deserves careful consideration.
        container.clear();
        trace.clear();
        ipPortQueue.clear();
        logger.error("在注冊(cè)服務(wù)區(qū)無(wú)法找到子節(jié)點(diǎn)");
        return;
    }
    Map<String, List<InetSocketAddress>> currentMap = new HashMap<String, List<InetSocketAddress>>();
    try {
        for (ChildData data : children) {
            // The path is already a String; no charset round trip is needed.
            String[] parts = data.getPath().split("/");
            String ipPort = parts[parts.length - 1];

            byte[] raw = data.getData();
            if (raw == null) {
                throw new ThriftException("registration node has no data: " + data.getPath());
            }
            RegisterConfig registerConfig = JSON.parseObject(new String(raw, "utf-8"), RegisterConfig.class);
            if (registerConfig == null) {
                throw new ThriftException("registration node data is not a valid config: " + data.getPath());
            }
            if (registerConfig.getWeight() == null) {
                throw new ThriftException("獲取權(quán)重失敗");
            }
            String weight = registerConfig.getWeight();
            String group = registerConfig.getGroup();

            // Weighted address list of this node, appended to its group.
            List<InetSocketAddress> groupList = currentMap.get(group);
            if (groupList == null) {
                groupList = new ArrayList<InetSocketAddress>();
                currentMap.put(group, groupList);
            }
            groupList.addAll(transfer(weight, ipPort));
        }

        trace.clear();
        container.clear();
        ipPortQueue.clear();
        for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
            String group = entry.getKey();
            List<InetSocketAddress> current = entry.getValue();
            Collections.shuffle(current);

            // Backup container first.
            if (!trace.containsKey(group)) {
                trace.put(group, new HashSet<InetSocketAddress>());
            }
            trace.get(group).addAll(current);

            // Main container.
            if (!container.containsKey(group)) {
                container.put(group, new ArrayList<InetSocketAddress>());
            }
            container.get(group).addAll(current);

            // Selection queue.
            if (!ipPortQueue.containsKey(group)) {
                ipPortQueue.put(group, new LinkedList<InetSocketAddress>());
            }
            ipPortQueue.get(group).addAll(current);
        }
        logger.info("分組緩存重建完畢");
        for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
            logger.info("group:" + entry.getKey() + ", target:" + entry.getValue());
        }
    } catch (Exception e) {
        // Pass the exception to the logger so the stack trace is not lost.
        logger.error("重建緩存失敗" + e.getMessage(), e);
        throw new ThriftException("重建緩存失敗", e);
    }
}
/**
 * Expands one {@code ip:port} entry into a list that repeats its socket
 * address {@code weight} times, so heavier nodes are selected more often.
 * <p>
 * If the host cannot be resolved the entry is skipped and an empty list is
 * returned. Constructing an {@code InetSocketAddress} from a null
 * {@code InetAddress} would yield the wildcard address, which is not a usable
 * target.
 *
 * @param weight weight as an integer string, e.g. "5"
 * @param ipPort address such as 10.0.0.1:9050
 * @return the address repeated {@code weight} times, or an empty list if the host is unknown
 * @throws NumberFormatException if the port or the weight is not an integer
 */
private List<InetSocketAddress> transfer(String weight, String ipPort) {
    List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
    String[] hostAndPort = ipPort.split(":");
    InetAddress ipAddress;
    try {
        ipAddress = InetAddress.getByName(hostAndPort[0]);
    } catch (UnknownHostException e) {
        logger.error("獲取IP地址失敗:" + e.getMessage(), e);
        return result;
    }
    int port = Integer.parseInt(hostAndPort[1]);
    // Parse the weight once instead of on every loop iteration.
    int copies = Integer.parseInt(weight);
    InetSocketAddress inetSocketAddress = new InetSocketAddress(ipAddress, port);
    for (int i = 0; i < copies; i++) {
        result.add(inetSocketAddress);
    }
    return result;
}
/**
 * Picks the next address from the current group's queue; the queue holds each
 * address as many times as its weight. An exhausted queue is refilled from
 * the group's container, or, when the container is empty too, from the
 * group's trace (backup) set.
 * <p>
 * Missing queue or container entries are created on demand, so a refill never
 * dereferences null.
 *
 * @return the selected address
 * @throws ThriftException if no address is available for the current group
 */
@Override
public synchronized InetSocketAddress selector() {
    // Read the group once so all three lookups refer to the same group.
    final String group = currentGroup;
    Queue<InetSocketAddress> currentQueue = ipPortQueue.get(group);
    if (currentQueue == null || currentQueue.isEmpty()) {
        List<InetSocketAddress> currentContainer = container.get(group);
        Set<InetSocketAddress> currentTrace = trace.get(group);
        if (currentContainer != null && !currentContainer.isEmpty()) {
            if (currentQueue == null) {
                currentQueue = new LinkedList<InetSocketAddress>();
                ipPortQueue.put(group, currentQueue);
            }
            currentQueue.addAll(currentContainer);
        } else if (currentTrace != null && !currentTrace.isEmpty()) {
            synchronized (lock) {
                if (currentContainer == null) {
                    currentContainer = new ArrayList<InetSocketAddress>();
                    container.put(group, currentContainer);
                }
                if (currentQueue == null) {
                    currentQueue = new LinkedList<InetSocketAddress>();
                    ipPortQueue.put(group, currentQueue);
                }
                currentContainer.addAll(currentTrace);
                Collections.shuffle(currentContainer);
                currentQueue.addAll(currentContainer);
            }
        }
    }
    if (currentQueue == null || currentQueue.size() == 0) {
        logger.error("找不到可用服務(wù),localInstance:" + localInstance + "目標(biāo)服務(wù):" + targetService);
        throw new ThriftException("找不到可用服務(wù)蔓罚，localInstance:" + localInstance + "目標(biāo)服務(wù):" + targetService);
    }
    return currentQueue.poll();
}
/**
 * Tells whether {@code address} is still present in the trace set of the
 * provider's current group.
 *
 * @param group   not consulted by this implementation; the lookup uses {@code currentGroup}
 *                (NOTE(review): confirm that ignoring it is intended)
 * @param address address to check
 * @return {@code true} if the current group's trace contains the address;
 *         {@code false} if it does not or the group has no trace
 */
@Override
public boolean validateGroup(String group, InetSocketAddress address) {
    final Set<InetSocketAddress> known = trace.get(currentGroup);
    if (known != null) {
        // Addresses missing from the current trace are rejected.
        return known.contains(address);
    }
    logger.error("找不到可用服務(wù)椿肩，Trace為空");
    return false;
}
/**
 * Returns the group this provider currently selects addresses from.
 *
 * @return the value of {@code currentGroup}
 */
@Override
public String getGroup() {
return currentGroup;
}
/**
 * Returns a read-only view of the addresses held for the current group.
 *
 * @return an unmodifiable list; empty when the current group has no container entry
 *         (wrapping a null list would throw a NullPointerException)
 */
@Override
public List<InetSocketAddress> findServerAddressList() {
    List<InetSocketAddress> currentContainer = container.get(currentGroup);
    if (currentContainer == null) {
        return Collections.emptyList();
    }
    return Collections.unmodifiableList(currentContainer);
}
/**
 * Returns the name of the target service this provider resolves addresses for.
 *
 * @return the value of {@code targetService}
 */
@Override
public String getService() {
return targetService;
}
/**
 * Returns the service URL; this implementation always returns an empty string.
 *
 * @return {@code ""}
 */
@Override
public String getServiceUrl() {
return "";
}
/**
 * Releases the two caches and the zk client. Each resource is closed
 * independently, so a failure on one does not prevent closing the others, and
 * resources that were never created are skipped.
 */
public void close() {
    try {
        if (cachedPath != null) {
            cachedPath.close();
        }
    } catch (IOException e) {
        logger.warn("failed to close service address cache", e);
    }
    try {
        if (groupNodeCache != null) {
            groupNodeCache.close();
        }
    } catch (IOException e) {
        logger.warn("failed to close group config cache", e);
    }
    if (zkClient != null) {
        zkClient.close();
    }
}
三 客户端：使用JDK动态代理实现对远程服务的代理调用，客户端对象交给GenericObjectPool负责创建、销毁。
/**
 * Builds the client side: loads the service's Iface and Client.Factory
 * classes, creates a GenericObjectPool of Thrift clients, and exposes a JDK
 * dynamic proxy of the Iface whose calls borrow a pooled client, invoke the
 * method, and retry under the conditions coded below.
 *
 * @throws Exception if the classes cannot be loaded or the factory cannot be instantiated
 */
@Override
public void afterPropertiesSet() throws Exception {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// Load the Iface interface
objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
// Load the Client.Factory class
Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
// Build the client connection pool
ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
poolConfig.maxActive = maxActive;
poolConfig.maxIdle = 1;
poolConfig.minIdle = 0;
poolConfig.minEvictableIdleTimeMillis = idleTime;
poolConfig.timeBetweenEvictionRunsMillis = idleTime * 2L;
poolConfig.testOnBorrow=true;
poolConfig.testOnReturn=false;
poolConfig.testWhileIdle=false;
pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
// Create the client proxy: performs the remote call with exception handling and timeout retry
proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Record the call start time t1
long t1 = System.currentTimeMillis();
// Invoke the remote target method, at most MAX_RETRY (3) attempts
Object reObject = null;
Exception reException = null;
TServiceClient client = null;
boolean success = false;
int MAX_RETRY = 3, retry = 0;
while (++retry <= MAX_RETRY) {
client = pool.borrowObject();
// flag stays true only if the invocation completes without an exception
boolean flag = true;
try {
reObject = method.invoke(client, args);
} catch (Exception e) {
flag = false;
reException = e;
logger.info("retry:"+e.getMessage(),e);
} finally {
logger.info("retry time:"+retry);
// A client that failed is invalidated rather than returned to the pool
if (flag) {
pool.returnObject(client);
} else {
pool.invalidateObject(client);
}
}
// Succeeded: leave the loop
if (flag) {
success = true;
break;
}
// Only timeout-type exceptions are retried, and only for methods whose name starts with "get"
boolean needRetry = false;
if (reException != null && reException instanceof InvocationTargetException) {
Throwable cause1 = reException.getCause();
if (cause1 != null && cause1 instanceof TTransportException) {
Throwable cause2 = cause1.getCause();
if (cause2 != null && (cause2 instanceof SocketTimeoutException || cause2 instanceof ConnectTimeoutException)) {
if (method.getName() != null && method.getName().startsWith("get")) {
logger.info("timeout needRetry set true");
needRetry = true;
}
}
}
}
if (!needRetry)
break;
}
// Record the call end time t2
long t2 = System.currentTimeMillis();
// retry is MAX_RETRY + 1 when the loop ran out of attempts, so clamp it for the log
printLog(client, method, (t2 - t1), success, retry > MAX_RETRY ? MAX_RETRY : retry);
if (!success) {
// Unwrap the reflective wrapper so the caller sees the target's own exception
if (reException instanceof InvocationTargetException)
throw ((InvocationTargetException) reException).getTargetException();
else
throw reException;
} else {
return reObject;
}
}
});
}
3.1 客户端连接池的实现
/**
 * Destroys a pooled client: notifies the optional callback, drops the
 * client's bookkeeping entries and closes its input and output transports.
 *
 * @param client the client being evicted from the pool
 * @throws Exception if closing a transport fails
 */
@Override
public void destroyObject(TServiceClient client) throws Exception {
    if (callback != null) {
        try {
            callback.destroy(client);
        } catch (Exception e) {
            // Pass the exception as the throwable argument; a lone Throwable does not fill "{}".
            logger.warn("destroyObject callback failed", e);
        }
    }
    clientGroupMap.remove(client);
    clientAddressMap.remove(client);
    logger.info("destroyObject:{}", client);
    TTransport pin = client.getInputProtocol().getTransport();
    TTransport pout = client.getOutputProtocol().getTransport();
    try {
        pin.close();
    } finally {
        // Close the output side even if closing the input side failed.
        pout.close();
    }
}
/**
 * Creates a new pooled client connected to an address chosen by the address
 * provider.
 * <p>
 * With an empty service URL a framed socket transport is used (per the
 * original note, for Java thrift services); otherwise an HTTP transport built
 * from the address and the URL is used (per the original note, for PHP thrift
 * services). The client is recorded in the bookkeeping maps only after the
 * transport opened successfully, so a failed connect leaves no stale entries.
 *
 * @return the connected client
 * @throws ThriftException if the provider returns no address
 * @throws Exception if the transport cannot be opened
 */
@Override
public TServiceClient makeObject() throws Exception {
    InetSocketAddress address = serverAddressProvider.selector();
    if (address == null) {
        // The exception was previously constructed without being thrown.
        throw new ThriftException("No provider available for remote service");
    }
    String group = serverAddressProvider.getGroup();
    String serviceUrl = serverAddressProvider.getServiceUrl();

    TTransport transport;
    if (StringUtils.isEmpty(serviceUrl)) {
        // Timeout arguments are 10000 and 5000 ms.
        // NOTE(review): presumably socket timeout, then connect timeout - confirm against the TSocket version in use.
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort(), 10000, 5000);
        transport = new TFramedTransport(tsocket);
    } else {
        String url = "http://" + address.getHostName() + ":" + address.getPort() + serviceUrl;
        transport = new THttpClient(url);
    }
    TProtocol protocol = new TBinaryProtocol(transport);
    TServiceClient client = this.clientFactory.getClient(protocol);

    try {
        transport.open();
    } catch (Exception e) {
        // Do not leave a half-open transport behind.
        transport.close();
        throw e;
    }
    clientGroupMap.put(client, group);
    clientAddressMap.put(client, address);

    if (callback != null) {
        try {
            callback.make(client);
        } catch (Exception e) {
            // Pass the exception as the throwable argument; a lone Throwable does not fill "{}".
            logger.warn("makeObject callback failed", e);
        }
    }
    return client;
}