實(shí)際開發(fā)過程中的問題:
公共環(huán)境中多個(gè)人注冊(cè)在同一個(gè) eureka 集群中,根據(jù)負(fù)載均衡原則,消費(fèi)者會(huì)獲取所有的服務(wù)提供者對(duì)其中一個(gè)發(fā)起調(diào)用,某用戶本來想只調(diào)用自己的提供者進(jìn)行 debug,結(jié)果調(diào)用竄到了其他人注冊(cè)的提供者上,影響開發(fā)效率.
約定名詞:
- routingTag: 路由鍵,根據(jù)該值決定請(qǐng)求發(fā)送到哪個(gè)服務(wù)提供者分片.
- 分片: 具有相同 routingKe的一組服務(wù),包括提供者和消費(fèi)者,目的就是想做到消費(fèi)者和提供者在同一個(gè)分片.
方案一
優(yōu)點(diǎn):方便簡單
缺點(diǎn): 1. 每次本地調(diào)試都有改動(dòng),一旦提交,代碼合并沖突,對(duì)開發(fā)者不友好
方案二
自己定制開發(fā)負(fù)載均衡策略來實(shí)現(xiàn)
優(yōu)點(diǎn): 不用改動(dòng)
缺點(diǎn): 需要自己開發(fā)
為了長遠(yuǎn)的效率考慮選擇方案二
思路
- 提供者在配置文件注明自己的分片,消費(fèi)者也標(biāo)明自己的分片,
- 獲取所有提供者的信息,根據(jù)分片信息進(jìn)行分組過濾,獲取到同一個(gè)分片的一組提供者,對(duì)其發(fā)起調(diào)用
Eureka 已經(jīng)提供了metadata-map
來自定義元數(shù)據(jù):
eureka.instance.metadata-map
實(shí)現(xiàn)
- 在服務(wù)的消費(fèi)者和提供者的元信息中定義
eureka.instance.metadata-map.routingTag=ABC
PS 這個(gè)ABC可以設(shè)置成從機(jī)器獲得唯一標(biāo)識(shí).
2019年08月06日更新 我項(xiàng)目中使用 ${user.name}-${spring.cloud.client.ipAddress}
- 仿照
com.netflix.loadbalancer.RandomRule
繼承AbstractLoadBalancerRule
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.ReflectUtil;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
public class TagRule extends AbstractLoadBalancerRule {
@Autowired
DiscoveryClient discoveryClient;
@Value("${eureka.instance.metadata-map.routingTag}")
String routingTag;
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
}
@Override
public Server choose(Object key) {
ILoadBalancer loadBalancer = getLoadBalancer();
if (loadBalancer == null) {
return null;
}
Server resultServer = null;
while (resultServer == null) {
if (Thread.interrupted()) {
return null;
}
//獲取所有服務(wù)提供者信息
List<Server> allServers = loadBalancer.getAllServers();
if (allServers.size() == 0) {
return null;
} else {
resultServer = allServers.get(0);
//獲取提供者的實(shí)例信息
List<ServiceInstance> instances = discoveryClient.getInstances(resultServer.getMetaInfo().getServiceIdForDiscovery());
//根據(jù)實(shí)例中 metadata.routingTag進(jìn)行分組
Map<String, List<ServiceInstance>> collect = instances.stream().collect(groupingBy(s -> s.getMetadata().get("routingTag")));
//獲取與當(dāng)前消費(fèi)者在同一個(gè)分片的服務(wù)生產(chǎn)者 by routingTag
List<ServiceInstance> serviceInstances = collect.get(routingTag);
List<Server> sameTagServers = new ArrayList<>();
for (ServiceInstance serviceInstance : serviceInstances) {
for (Server server : allServers) {
InstanceInfo instanceInfo = (InstanceInfo) ReflectUtil.getFieldValue(server, "instanceInfo");
if (instanceInfo.getInstanceId().equalsIgnoreCase(((EurekaDiscoveryClient.EurekaServiceInstance) serviceInstance).getInstanceInfo().getInstanceId())) {
sameTagServers.add(server);
}
}
}
//隨機(jī)選出一個(gè)
if (CollUtil.isNotEmpty(sameTagServers)) {
resultServer = sameTagServers.get(RandomUtil.randomInt(0,sameTagServers.size()));
}
}
if (resultServer == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}
if (resultServer.isAlive()) {
return (resultServer);
}
// Shouldn't actually happen.. but must be transient or a bug.
resultServer = null;
Thread.yield();
}
System.out.println("選中的服務(wù)是:" + resultServer);
return resultServer;
}
}
- 配置中啟用該 rule
@Bean
@Scope("prototype")//這個(gè)注解不能少,否則快速調(diào)用幾次就會(huì)出現(xiàn)com.netflix.client.ClientException: Load balancer does not have available server for client 異常
public IRule tagRule() {
return new TagRule();
}
- 測(cè)試,服務(wù)提供者配置一個(gè)和消費(fèi)者相同的
routingTag
,服務(wù)提供者再配置多個(gè)不同的進(jìn)行驗(yàn)證
如圖:
user是服務(wù)提供者,
message 是服務(wù)消費(fèi)者.
8000是 eureka
9001的 routingTag 是 ABC
9999和 9998 的 routingTag 是 www
8002的 routingTag 是 www
8001是 為未啟用 TagRule
效果如下:tab1 是 8002 端口,tab2 是 8001 端口
完整代碼地址: GitHub
不完善的地方就是同一個(gè)分片中的服務(wù)是隨機(jī)選擇的,不過一般用于調(diào)試也不會(huì)在本地啟用很多實(shí)例來搞自己.
下一篇解決一下 MQ 亂竄的問題.