(apache-shenyu 2.4.3版本)apache shenyu前身soul網(wǎng)關(guān)加叁,是一款java中spring5新引入的project-reactor的webflux,reactor-netty等為基礎(chǔ)實(shí)現(xiàn)的高性能網(wǎng)關(guān)家夺,現(xiàn)已進(jìn)入apache孵化器雌澄,作者yu199195 (xiaoyu) (github.com)
作者也是國內(nèi)知名開源社區(qū)dromara的創(chuàng)始人,并且作有多個(gè)開源產(chǎn)品,apache-shenyu是其中之一apache/incubator-shenyu: ShenYu is High-Performance Java API Gateway. (github.com)
本文只關(guān)心shenyu如何使用disruptor的api以及解決什么問題卫漫,也會(huì)繼續(xù)了解一下disruptor提供了哪些api供我們使用
disruptor是典型的消費(fèi)者生產(chǎn)者模型,通過無鎖的環(huán)形數(shù)組ringBuffer作為隊(duì)列肾砂,并提供多種消費(fèi)者等待策略大大提高了性能
shenyu-disruptor模塊
對外暴露api的入口為DisruptorProviderManage類列赎,開始擼代碼
// 這是一個(gè)泛型化的類,并且對外暴露了構(gòu)造方法镐确,以實(shí)例維度使用當(dāng)前類
public class DisruptorProviderManage<T> {
public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
// 消費(fèi)者數(shù)量默認(rèn)使用當(dāng)前及其計(jì)算資源 * 2(可能是核心數(shù)或者線程數(shù)或者其他包吝,取決于很多因素)
private static final Integer DEFAULT_CONSUMER_SIZE = Runtime.getRuntime().availableProcessors() << 1;
private final Integer size;
private final Integer consumerSize;
// 我們自己定義的消費(fèi)者工廠,用于承接不同業(yè)務(wù)源葫,分別作為shenyu的交互的client和server側(cè)兩邊的消費(fèi)者
private final QueueConsumerFactory<T> consumerFactory;
// disruptor诗越,也是泛型,綁定的當(dāng)前實(shí)例
private DisruptorProvider<T> provider;
public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory, final Integer ringBufferSize) {
this(consumerFactory,
DEFAULT_CONSUMER_SIZE,
ringBufferSize);
}
public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory) {
this(consumerFactory, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
}
public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory,
final int consumerSize,
final int ringBufferSize) {
this.consumerFactory = consumerFactory;
this.size = ringBufferSize;
this.consumerSize = consumerSize;
}
// 啟動(dòng)disruptor
public void startup() {
this.startup(false);
}
public void startup(final boolean isOrderly) {
//一個(gè)可排序的任務(wù)執(zhí)行器,當(dāng)然如果是傳入 true才會(huì)做真正的可排序邏輯,現(xiàn)在默認(rèn)是false
OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
// 可以看出這是消費(fèi)者的任務(wù)執(zhí)行器
DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
int newConsumerSize = this.consumerSize;
EventFactory<DataEvent<T>> eventFactory;
if (isOrderly) {
// 如果是可排序的任務(wù)執(zhí)行臼氨,那么消費(fèi)者數(shù)量就是一個(gè)掺喻,即一個(gè)線程進(jìn)行消費(fèi)
newConsumerSize = 1;
// 可排序的 disruptor事件工廠
eventFactory = new OrderlyDisruptorEventFactory<>();
} else {
eventFactory = new DisruptorEventFactory<>();
}
// disruptor 啟動(dòng)類
Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
size,
// 生產(chǎn)者任務(wù)執(zhí)行器,disruptor是典型的消費(fèi)者生產(chǎn)者模型储矩,通過
DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
// 多生產(chǎn)者模型
ProducerType.MULTI,
// 生產(chǎn)者阻塞策略,這其實(shí)是一個(gè)游標(biāo)阻塞策略感耙,用于等待可用的序列
// 因?yàn)槠涫褂脽o鎖的環(huán)形數(shù)組,需要先請求一個(gè)可用序列即環(huán)形數(shù)組下標(biāo)來放入事件
// 當(dāng)前默認(rèn)使用性能較低的策略持隧,但是消耗cpu較少
new BlockingWaitStrategy());
// 初始化消費(fèi)者數(shù)組
QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
for (int i = 0; i < newConsumerSize; i++) {
consumers[i] = new QueueConsumer<>(executor, consumerFactory);
}
disruptor.handleEventsWithWorkerPool(consumers);
// 事件處理器即硼,也就是消費(fèi)者執(zhí)行側(cè)報(bào)錯(cuò)的鉤子,默認(rèn)忽略
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
// 啟動(dòng)
disruptor.start();
// 獲取隊(duì)列
RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
// 將disruptor要素放入我們封裝好的 對外暴露的操作類
provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
}
public DisruptorProvider<T> getProvider() {
return provider;
}
}
上述代碼可以看出DisruptorProviderManage類是一個(gè)引導(dǎo)類屡拨,幫助我們?nèi)?gòu)造disruptor實(shí)例只酥,并將當(dāng)前實(shí)例通過泛型與其綁定。
下面來看disruptor的實(shí)例DisruptorProvider類
// 也是泛型呀狼,當(dāng)前類為真正disruptor的封裝
public class DisruptorProvider<T> {
// 大名鼎鼎的 ringBuffer
private final RingBuffer<DataEvent<T>> ringBuffer;
// disruptor的對外api類,我們這里將他封裝到當(dāng)前類
private final Disruptor<DataEvent<T>> disruptor;
// 是否可排序裂允。shenyu提供了一個(gè)可排序的disruptor運(yùn)行機(jī)制
private final boolean isOrderly;
// 這個(gè)是一個(gè)轉(zhuǎn)換器,disruptor封裝的ringBuffer環(huán)形數(shù)組區(qū)別于普通的隊(duì)列哥艇。
// 這里的生產(chǎn)者發(fā)布一個(gè)事件需要先從ringBuffer獲取sequence绝编,也就是獲取到環(huán)形數(shù)組可用的下標(biāo)才可以發(fā)布事件到ringBuffer中
// 發(fā)布事件時(shí)直接傳入sequence,并不會(huì)傳入數(shù)據(jù)貌踏,通過sequence和要發(fā)布的數(shù)據(jù)關(guān)聯(lián)十饥。消費(fèi)時(shí)也去拿可消費(fèi)的sequence。
// 因?yàn)?RingBuffer提供泛型化的參數(shù)祖乳,但是逗堵,傳入的參數(shù)不一定是指定的泛型化的類型,disruptor提供這個(gè)函數(shù)式接口來轉(zhuǎn)換參數(shù)
// shenyu轉(zhuǎn)換方式是又定義了一個(gè) DataEvent的泛型眷昆,通過參數(shù)轉(zhuǎn)換set到DataEvent中
private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
// 同上蜒秤,通過轉(zhuǎn)換set了排序用的hash值
private final EventTranslatorTwoArg<DataEvent<T>, T, String> orderlyArg = (event, sequence, t, orderly) -> {
if (event instanceof OrderlyDataEvent) {
((OrderlyDataEvent<T>) event).setHash(orderly);
}
event.setData(t);
};
public DisruptorProvider(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor, final boolean isOrderly) {
this.ringBuffer = ringBuffer;
this.disruptor = disruptor;
this.isOrderly = isOrderly;
}
public void onData(final T data) {
//是否可排序 不可變汁咏,初始化時(shí)就指定了
if (isOrderly) {
throw new IllegalArgumentException("The current provider is of orderly type. Please use onOrderlyData() method.");
}
try {
// 調(diào)用ringBuffer發(fā)布事件,傳入?yún)?shù)和轉(zhuǎn)換器
ringBuffer.publishEvent(translatorOneArg, data);
} catch (Exception ex) {
logger.error("ex", ex);
}
}
public void onOrderlyData(final T data, final String... hashArray) {
if (!this.isOrderly) {
throw new IllegalArgumentException("The current provider is not of orderly type. Please use onData() method.");
}
try {
// 多加一個(gè) 用于排序的 hash值
String hash = String.join(":", hashArray);
ringBuffer.publishEvent(orderlyArg, data, hash);
} catch (Exception ex) {
logger.error("ex", ex);
}
}
//記得關(guān)閉資源
public void shutdown() {
if (null != disruptor) {
disruptor.shutdown();
}
}
}
看到上面生產(chǎn)者操作類后作媚,再看看 消費(fèi)者類的構(gòu)建QueueConsumer
// WorkHandler是 disruptor抽象出來的 消費(fèi)者處理類梆暖,提供給我們實(shí)現(xiàn)來做消費(fèi)者的業(yè)務(wù)邏輯
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
// 可排序的 執(zhí)行器,也可以用作正常邏輯的執(zhí)行器
private final OrderlyExecutor executor;
// shenyu 自己封裝的 消費(fèi)者工廠掂骏,用于創(chuàng)建消費(fèi)者執(zhí)行器(實(shí)際就是實(shí)現(xiàn)runnable的執(zhí)行單元)
private final QueueConsumerFactory<T> factory;
public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {
this.executor = executor;
this.factory = factory;
}
@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
ThreadPoolExecutor executor = orderly(t);
// 獲取傳入的 執(zhí)行業(yè)務(wù)的執(zhí)行器
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 放入事件
queueConsumerExecutor.setData(t.getData());
// help gc 切斷引用鏈,具體邏輯不是很清楚厚掷,應(yīng)該是防止一直存在引用鏈導(dǎo)致無法被gc回收
t.setData(null);
// 執(zhí)行業(yè)務(wù)
executor.execute(queueConsumerExecutor);
}
}
// 如果是 可排序的事件弟灼,并且hash值不為空則使用可排序執(zhí)行器選擇出一個(gè)SingletonExecutor執(zhí)行器
private ThreadPoolExecutor orderly(final DataEvent<T> t) {
if (t instanceof OrderlyDataEvent && !isEmpty(((OrderlyDataEvent<T>) t).getHash())) {
return executor.select(((OrderlyDataEvent<T>) t).getHash());
} else {
return executor;
}
}
private boolean isEmpty(final String t) {
return t == null || t.isEmpty();
}
}
//---------------shenyu封裝的可排序的線程池--------
public class OrderlyExecutor extends ThreadPoolExecutor {
// 線程安全的 可排序的map,目前jdk內(nèi)只有這一個(gè)可排序的concurrentMap的實(shí)現(xiàn)
private final ConcurrentSkipListMap<Long, SingletonExecutor> virtualExecutors = new ConcurrentSkipListMap<>();
// 線程選擇器
private final ThreadSelector threadSelector = new ThreadSelector();
public OrderlyExecutor(
final boolean isOrderly,
final int corePoolSize,
final int maximumPoolSize,
final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue,
final ThreadFactory threadFactory,
final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
orderlyThreadPool(isOrderly, corePoolSize, threadFactory);
}
private void orderlyThreadPool(final boolean isOrderly, final int corePoolSize, final ThreadFactory threadFactory) {
// 如果是可排序則特殊處理一下
if (isOrderly) {
// 將設(shè)置的線程池的數(shù)量按下標(biāo)分別初始化一個(gè) 單個(gè)線程池放入ConcurrentSkipListMap中
IntStream.range(0, corePoolSize).forEach(index -> {
SingletonExecutor singletonExecutor = new SingletonExecutor(threadFactory);
// 將當(dāng)前單線程的線程池 + 下標(biāo) 進(jìn)行hash
String hash = singletonExecutor.hashCode() + ":" + index;
byte[] bytes = threadSelector.sha(hash);
for (int i = 0; i < 4; i++) {
// 將hash過的值 再從 0 ~ 3 哈希一次存入冒黑。也就是map中有4個(gè)key執(zhí)行同一個(gè)線程池執(zhí)行器田绑。為什么這么做不太清楚
this.virtualExecutors.put(threadSelector.hash(bytes, i), singletonExecutor);
}
});
}
}
public SingletonExecutor select(final String hash) {
long select = threadSelector.select(hash);
// 通過hash值命中 線程池
if (!virtualExecutors.containsKey(select)) {
// 如果沒有命中,則選擇最靠近自己的hash值的線程池抡爹,但是是向后取(也就是hash值大于等于當(dāng)前值的第一個(gè))
SortedMap<Long, SingletonExecutor> tailMap = virtualExecutors.tailMap(select);
if (tailMap.isEmpty()) {
// 如果計(jì)算出來的hash值已經(jīng)是最后一個(gè)了兔魂,則取當(dāng)前map的第一個(gè)值
select = virtualExecutors.firstKey();
} else {
select = tailMap.firstKey();
}
}
return virtualExecutors.get(select);
}
private static final class ThreadSelector {
public long select(final String hash) {
byte[] digest = sha(hash);
return hash(digest, 0);
}
private long hash(final byte[] digest, final int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] sha(final String hash) {
byte[] bytes = hash.getBytes(StandardCharsets.UTF_8);
return Hashing
.sha256()
.newHasher()
.putBytes(bytes)
.hash().asBytes();
}
}
}
看到上面的可排序的線程池邏輯癌瘾,其實(shí)就是結(jié)合了thread的priority屬性的一個(gè)優(yōu)先級線程池邏輯。
下面再來看看關(guān)于event的封裝
// 自己封裝的 事件類,基于泛型
public class DataEvent<T> {
private T data;
public T getData() {
return data;
}
public void setData(final T data) {
this.data = data;
}
}
// ---------------事件工廠盖彭,實(shí)現(xiàn)了disruptor的事件工廠---------------
public class DisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
@Override
public DataEvent<T> newInstance() {
return new DataEvent<>();
}
}
//---------帶有優(yōu)先級邏輯的事件-------
public class OrderlyDataEvent<T> extends DataEvent<T> {
private String hash;
public String getHash() {
return hash;
}
public void setHash(final String hash) {
this.hash = hash;
}
}
// -------- 帶有優(yōu)先級邏輯的 事件工廠------
public class OrderlyDisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
@Override
public OrderlyDataEvent<T> newInstance() {
return new OrderlyDataEvent<>();
}
}
上面看完了shenyu對于 disruptor的封裝然后來看看兩個(gè)入口,消費(fèi)者和生產(chǎn)者
生產(chǎn)者入口:DisruptorProvider#onData 消費(fèi)者入口:QueueConsumerExecutor#run(實(shí)現(xiàn)了runnable作為消費(fèi)者執(zhí)行任務(wù)單元)
DisruptorProvider
先看ShenyuClientRegisterEventPublisher#publishEvent棚赔,這是一個(gè)shenyuClient端注冊事件發(fā)布器单寂,用于發(fā)布客戶端(也就是被shenyu路由的后端服務(wù)們)的注冊事件
public class ShenyuClientRegisterEventPublisher {
// 靜態(tài)的單例
private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
//這里其實(shí)是一個(gè)可替換的變量,用于存放disruptor生產(chǎn)者笑诅,泛型是shenyu封裝的數(shù)據(jù)接口调缨,兩個(gè)子類分別是元數(shù)據(jù),和接口吆你。
private DisruptorProviderManage<DataTypeParent> providerManage;
public static ShenyuClientRegisterEventPublisher getInstance() {
return INSTANCE;
}
//這里的start 理論上多個(gè)地方同時(shí)使用會(huì)造成providerManage 更改弦叶。這里可以這么用是因?yàn)椴煌恼{(diào)用方是互斥的,也就是客戶端會(huì)選擇一種方式上報(bào)注冊事件
public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// disruptor的消費(fèi)者 實(shí)現(xiàn)(針對于客戶端)
RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
// 添加消費(fèi)者鉤子妇多,我們的業(yè)務(wù)邏輯沒有直接寫在消費(fèi)者實(shí)現(xiàn)的run方法中伤哺,而是嵌入了一個(gè)Set<ExecutorSubscriber<T>> subscribers,觀察者模式的調(diào)用
// 注冊一個(gè) 元數(shù)據(jù)鉤子
factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
// 注冊一個(gè) 資源路徑上報(bào)
factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
providerManage = new DisruptorProviderManage<>(factory);
// 啟動(dòng)disruptor
providerManage.startup();
}
// 發(fā)布事件 生產(chǎn)者
public void publishEvent(final DataTypeParent data) {
DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
provider.onData(data);
}
}
如上圖可以看出是砌梆,server端接收client端 通過http調(diào)用后直接復(fù)用了生產(chǎn)者邏輯
如上圖默责,有三對,成對是因?yàn)槎际?uri和metadata注冊咸包。
隨便點(diǎn)進(jìn)去看一個(gè)類 都是ShenyuClientServerRegisterRepository接口的子類桃序,并且是一個(gè)SPI接口,用于各種客戶端上報(bào)數(shù)據(jù)后放入disruptor中烂瘫。也就是對于disruptor他是一個(gè)生產(chǎn)者接口媒熊,但是對于client端奇适,是一個(gè)接收client上報(bào)的的server端,我們分別看看他們?nèi)绾芜\(yùn)作
// consul 通過applicationEvent監(jiān)聽到數(shù)據(jù)變化再放入 disruptor
@EventListener
public void onMetadataChange(final ConsulConfigChangedEvent event) {
Map<String, GetValue> metadataMap = event.getMetadataMap();
metadataMap.forEach((path, getValue) -> {
long modifyIndex = getValue.getModifyIndex();
if (metadataChanged(path, modifyIndex)) {
publishMetadata(getValue.getDecodedValue());
}
});
}
// ---------etcd 通過一個(gè)添加一個(gè)監(jiān)聽到 etcd客戶端來處理disruptor----
private void subscribeMetaData(final String rpcType) {
String rpcPath = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
List<String> metadataPaths = client.getChildren(rpcPath);
for (String metadataPath: metadataPaths) {
String data = client.read(metadataPath);
publishMetadata(data);
}
LOGGER.info("subscribe metadata change: {}", rpcPath);
client.subscribeChildChanges(rpcPath, new EtcdListenHandler() {
@Override
public void updateHandler(final String path, final String value) {
publishMetadata(client.read(path));
}
@Override
public void deleteHandler(final String path, final String value) {
}
});
}
//-----------nacos也是通過添加監(jiān)聽芦鳍,但是元數(shù)據(jù)監(jiān)聽的是configServer嚷往,uri數(shù)據(jù)監(jiān)聽的是nameServer,如下代碼是nameServer-----------
private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
try {
Map<String, List<URIRegisterDTO>> services = new HashMap<>();
List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
healthyInstances.forEach(healthyInstance -> {
String contextPath = healthyInstance.getMetadata().get("contextPath");
String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
subscribeMetadata(serviceConfigName);
metadataConfigCache.add(serviceConfigName);
String metadata = healthyInstance.getMetadata().get("uriMetadata");
URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
if (RPC_URI_TYPE_SET.contains(rpcType)) {
services.values().forEach(this::publishRegisterURI);
}
LOGGER.info("subscribe uri : {}", serviceName);
namingService.subscribe(serviceName, event -> {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event).getInstances();
instances.forEach(instance -> {
String contextPath = instance.getMetadata().get("contextPath");
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
refreshURIService(rpcType, serviceName);
}
});
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
//-------http比較簡單---------
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
publisher.publish(metaDataRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
// ----------zk 通過zk客戶端添加監(jiān)聽-------
private void subscribeMetaData(final String rpcType) {
String contextPathParent = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreatesAndChanges((oldNode, node) -> {
if (PathMatchUtils.match(RegisterPathConstants.REGISTER_METADATA_INSTANCE_PATH, node.getPath())) {
String data = new String(node.getData(), StandardCharsets.UTF_8);
publishMetadata(data);
LOGGER.info("zookeeper register metadata success: {}", data);
}
}).build();
client.addCache(contextPathParent, listener);
}
這里不具體討論shenyu的數(shù)據(jù)上報(bào)的client和server端邏輯柠衅,這里可以看到所有server端在接收到client端上報(bào)或者監(jiān)聽到client端的數(shù)據(jù)變化都會(huì)第一時(shí)間放入disruptor處理皮仁,提供并發(fā)處理能力,也可以有削峰的效果菲宴,但是可能會(huì)帶來一些延遲贷祈。
下面再看看 消費(fèi)者 QueueConsumerExecutor類
// 實(shí)現(xiàn)了 runnable接口,可以作為disruptor的消費(fèi)者執(zhí)行任務(wù)
public abstract class QueueConsumerExecutor<T> implements Runnable {
private T data;
public T getData() {
return data;
}
public void setData(final T data) {
this.data = data;
}
}
再回過頭看看 消費(fèi)者工廠的實(shí)現(xiàn)
而QueueConsumerExecutor就是client和server兩邊的消費(fèi)者執(zhí)行邏輯
消費(fèi)者工廠不用看了喝峦,就是創(chuàng)建不同的QueueConsumerExecutor實(shí)現(xiàn)
那么來看看QueueConsumerExecutor不同的實(shí)現(xiàn)
RegisterClientConsumerExecutor
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
this.subscribers = new HashMap<>(executorSubscriberMap);
}
@Override
public void run() {
final T data = getData();
// client端消費(fèi)者通過 工廠中的回調(diào)鉤子來執(zhí)行势誊,其實(shí)通過disruptor這一步相當(dāng)于做了限流,通過一個(gè)ringBuffer來防止client端過多的并發(fā)打進(jìn)來
//我們可以做一個(gè)緩沖作用谣蠢,通過抽象出來的回調(diào)鉤子調(diào)用真正的業(yè)務(wù)粟耻,例如http請求,nacos請求眉踱,zk的數(shù)據(jù)變化請求等等 subscribers.get(data.getType()).executor(Lists.newArrayList(data));
}
// 這里也是 client端消費(fèi)者的工廠類挤忙,創(chuàng)建當(dāng)前client端的消費(fèi)者
public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {
@Override
public RegisterClientConsumerExecutor<T> create() {
Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
.stream()
.map(e -> (ExecutorTypeSubscriber<T>) e)
.collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
return new RegisterClientConsumerExecutor<>(map);
}
@Override
public String fixName() {
return "shenyu_register_client";
}
}
}
RegisterServerConsumerExecutor邏輯大同小異,是client端的數(shù)據(jù)變化到了server端后勋锤,也經(jīng)過一層disruptor來解耦饭玲,并且削峰,由消費(fèi)者中回調(diào)鉤子執(zhí)行業(yè)務(wù)
ExecutorSubscriber 接口為shenyu中disrupor的回調(diào)鉤子接口
然后通過ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T>區(qū)分不同數(shù)據(jù)類型叁执,來處理不同數(shù)據(jù)類型的消費(fèi)者
其實(shí)就是分為了 client的metadata茄厘,uri,即ShenyuClientRegisterRepository谈宛,用于client端的uri和metadata與shenyu的數(shù)據(jù)交互(這個(gè)是shenyu提供的多種數(shù)據(jù)同步機(jī)制)由client端注冊uri和metadata
ShenyuClientRegisterService client是以什么服務(wù)形式注冊到admin次哈,讓shenyu的網(wǎng)關(guān)基于什么樣的client來調(diào)用后端服務(wù)(這里取決于用戶使用什么后端服務(wù)提供shenyu的網(wǎng)關(guān)調(diào)用),然后通過這個(gè)注冊器來注冊metadata和uri
這里有點(diǎn)繞吆录。需要一個(gè)流程圖~
期待下一篇文章梳理一下apache-shenyu的被路由的后端服務(wù)窑滞,以及shenyu-admin,shenyu-bootstrap的注冊以及數(shù)據(jù)動(dòng)態(tài)感知的設(shè)計(jì)恢筝,這部分算是apache-shenyu支持多種協(xié)議的核心哀卫。