Finally, let's look at the Predicate class.
The load balancer uses this class to filter servers.
- 1 Interface structure
Predicate
public interface Predicate<T> {
    boolean apply(@Nullable T input);

    @Override
    boolean equals(@Nullable Object object);
}
The interface is simple: it has an apply method that returns a boolean result, plus an equals method.
Next, let's look at the related implementing classes.
- 2 Related types
- 2.1 AbstractServerPredicate
This abstract class defines the basic filtering flow.
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            // No key: filter with the server-only view of this predicate
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
        } else {
            // Filter the server list using the subclass's apply method
            List<Server> results = Lists.newArrayList();
            for (Server server: servers) {
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;
        }
    }
}
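The flow above is a template method: the loop is fixed in the base class and the per-server decision is left to apply. A minimal sketch of that idea follows. The names `KeyedServer` and `ServerFilterSketch` are hypothetical stand-ins (not Ribbon types), and servers are reduced to plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PredicateKey: a load-balancer key paired with one server.
final class KeyedServer {
    final Object key;
    final String server;

    KeyedServer(Object key, String server) {
        this.key = key;
        this.server = server;
    }
}

// Hypothetical stand-in for the abstract predicate; not the Ribbon class itself.
abstract class ServerFilterSketch {
    // Subclasses decide whether one (key, server) pair is acceptable.
    abstract boolean apply(KeyedServer input);

    // The loop is fixed here; only the decision in apply() varies per subclass.
    List<String> getEligibleServers(List<String> servers, Object loadBalancerKey) {
        List<String> results = new ArrayList<>();
        for (String server : servers) {
            if (apply(new KeyedServer(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;
    }
}
```

A subclass only needs to implement apply; the list handling is inherited unchanged.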
- 2.2 Implementing classes
- 2.2.1 CompositePredicate
Composite filtering
public class CompositePredicate extends AbstractServerPredicate {
    // delegate: the primary filter rule
    private AbstractServerPredicate delegate;
    // When the filtered server list does not meet the thresholds, re-filter using the fallback rules
    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
    private int minimalFilteredServers = 1;
    private float minimalFilteredPercentage = 0;

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        // Filter using the primary rule
        return delegate.apply(input);
    }

    public static class Builder {
        private CompositePredicate toBuild;

        Builder(AbstractServerPredicate primaryPredicate) {
            toBuild = new CompositePredicate();
            toBuild.delegate = primaryPredicate;
        }

        Builder(AbstractServerPredicate ...primaryPredicates) {
            toBuild = new CompositePredicate();
            Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
            toBuild.delegate = AbstractServerPredicate.ofKeyPredicate(chain);
        }

        public Builder addFallbackPredicate(AbstractServerPredicate fallback) {
            toBuild.fallbacks.add(fallback);
            return this;
        }

        public Builder setFallbackThresholdAsMinimalFilteredNumberOfServers(int number) {
            toBuild.minimalFilteredServers = number;
            return this;
        }

        public Builder setFallbackThresholdAsMinimalFilteredPercentage(float percent) {
            toBuild.minimalFilteredPercentage = percent;
            return this;
        }

        public CompositePredicate build() {
            return toBuild;
        }
    }

    public static Builder withPredicates(AbstractServerPredicate ...primaryPredicates) {
        return new Builder(primaryPredicates);
    }

    public static Builder withPredicate(AbstractServerPredicate primaryPredicate) {
        return new Builder(primaryPredicate);
    }

    @Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> i = fallbacks.iterator();
        // While the filtered list does not meet the thresholds, re-filter with the fallback rules in order
        while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                && i.hasNext()) {
            AbstractServerPredicate predicate = i.next();
            result = predicate.getEligibleServers(servers, loadBalancerKey);
        }
        return result;
    }
}
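The fallback loop in getEligibleServers can be easier to follow in isolation. The sketch below is a simplified illustration, assuming the JDK's `java.util.function.Predicate` in place of Ribbon's predicate types; `FallbackFilterSketch`, `enough`, and `select` are hypothetical names, and only the threshold condition is taken from the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of the primary-then-fallback selection; not Ribbon code.
class FallbackFilterSketch {
    // Same condition as the source: enough servers both in absolute and relative terms.
    static boolean enough(int kept, int total, int minServers, float minPercentage) {
        return kept >= minServers && kept > (int) (total * minPercentage);
    }

    // Keeps the items accepted by the predicate, preserving order.
    static <T> List<T> filter(List<T> items, Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T item : items) {
            if (predicate.test(item)) {
                kept.add(item);
            }
        }
        return kept;
    }

    // Applies the primary filter, then each fallback in order until the threshold is met.
    // Every fallback filters the full original list, not the previous result.
    static <T> List<T> select(List<T> servers, Predicate<T> primary,
                              List<Predicate<T>> fallbacks,
                              int minServers, float minPercentage) {
        List<T> result = filter(servers, primary);
        for (Predicate<T> fallback : fallbacks) {
            if (enough(result.size(), servers.size(), minServers, minPercentage)) {
                break;
            }
            result = filter(servers, fallback);
        }
        return result;
    }
}
```

Note that with the default thresholds from the source (1 server, 0 percent), an empty result always fails the check, so the first fallback is tried whenever the primary rule rejects every server. If no fallback satisfies the threshold, the result of the last fallback is returned as is.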
- 2.2.2 ZoneAvoidancePredicate
The source code is as follows:
public class ZoneAvoidancePredicate extends AbstractServerPredicate {
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        String serverZone = input.getServer().getZone();
        // The server has no zone: accept it
        if (serverZone == null) {
            return true;
        }
        LoadBalancerStats lbStats = getLBStats();
        if (lbStats == null) {
            // No statistics available: accept it
            return true;
        }
        if (lbStats.getAvailableZones().size() <= 1) {
            // At most one available zone: accept it
            return true;
        }
        // Take a statistics snapshot of the zones via ZoneAvoidanceRule
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        if (!zoneSnapshot.keySet().contains(serverZone)) {
            return true;
        }
        // Compute the set of available zones by analyzing the zone statistics
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        // Return true if the available zones include this server's zone, otherwise false
        if (availableZones != null) {
            return availableZones.contains(input.getServer().getZone());
        } else {
            return false;
        }
    }
}
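The order of the early returns in apply matters: a server is rejected only when zone data exists and its zone is missing from the available set. The sketch below restates that decision order with plain parameters. It is a simplification under stated assumptions: `ZoneCheckSketch` and `accept` are hypothetical names, the two zone-count checks from the source are collapsed into one `snapshotZones` argument, and the available-zone set is passed in rather than computed, since how `ZoneAvoidanceRule.getAvailableZones` derives it is not shown here.

```java
import java.util.Set;

// Hypothetical restatement of the decision order; not the Ribbon implementation.
class ZoneCheckSketch {
    static boolean accept(boolean enabled, String serverZone,
                          Set<String> snapshotZones, Set<String> availableZones) {
        if (!enabled) {
            return true; // feature switched off: never filter
        }
        if (serverZone == null) {
            return true; // server has no zone information
        }
        if (snapshotZones == null || snapshotZones.size() <= 1) {
            return true; // no statistics, or nothing to choose between
        }
        if (!snapshotZones.contains(serverZone)) {
            return true; // zone is unknown to the statistics
        }
        // Only here can a server be rejected: its zone must be in the available set.
        return availableZones != null && availableZones.contains(serverZone);
    }
}
```

Every missing-information case falls through to acceptance, so the predicate only removes servers when there is positive evidence against their zone.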
- 2.2.3 AvailabilityPredicate
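Used to determine whether a server's circuit breaker has tripped, or whether the server is under heavy load and has exceeded the limit.
The source code is as follows: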
public class AvailabilityPredicate extends AbstractServerPredicate {
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }

    private boolean shouldSkipServer(ServerStats stats) {
        // Skip the server if its circuit breaker has tripped, or if its active request count has reached the limit
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
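The skip condition combines two independent checks, and the circuit-breaker check is itself gated by a switch. A minimal sketch of that condition follows; `AvailabilitySketch` and its nested `Stats` class are hypothetical stand-ins for the Ribbon statistics types, with the dynamic properties replaced by plain parameters.

```java
// Hypothetical illustration of the skip condition; not the Ribbon implementation.
class AvailabilitySketch {
    // Stand-in for per-server statistics, reduced to the two values the check reads.
    static final class Stats {
        final boolean circuitBreakerTripped;
        final int activeRequests;

        Stats(boolean circuitBreakerTripped, int activeRequests) {
            this.circuitBreakerTripped = circuitBreakerTripped;
            this.activeRequests = activeRequests;
        }
    }

    // A tripped breaker only counts when filtering is enabled;
    // the active-request limit applies regardless of that switch.
    static boolean shouldSkip(Stats stats, boolean circuitBreakerFiltering,
                              int activeConnectionsLimit) {
        return (circuitBreakerFiltering && stats.circuitBreakerTripped)
                || stats.activeRequests >= activeConnectionsLimit;
    }
}
```

Because the comparison is `>=`, a server sitting exactly at the limit is already skipped.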