apache-shenyu之Disruptor如何應(yīng)用

(apache-shenyu 2.4.3版本)apache shenyu前身soul網(wǎng)關(guān)加叁,是一款java中spring5新引入的project-reactor的webflux,reactor-netty等為基礎(chǔ)實(shí)現(xiàn)的高性能網(wǎng)關(guān)家夺,現(xiàn)已進(jìn)入apache孵化器雌澄,作者yu199195 (xiaoyu) (github.com)

作者也是國內(nèi)知名開源社區(qū)dromara的創(chuàng)始人,并且作有多個(gè)開源產(chǎn)品,apache-shenyu是其中之一apache/incubator-shenyu: ShenYu is High-Performance Java API Gateway. (github.com)

本文只關(guān)心shenyu如何使用disruptor的api以及解決什么問題卫漫,也會(huì)繼續(xù)了解一下disruptor提供了哪些api供我們使用

disruptor是典型的消費(fèi)者生產(chǎn)者模型,通過無鎖的環(huán)形數(shù)組ringBuffer作為隊(duì)列肾砂,并提供多種消費(fèi)者等待策略大大提高了性能

shenyu-disruptor模塊
所有的類了

對外暴露api的入口為DisruptorProviderManage類列赎,開始擼代碼

// 這是一個(gè)泛型化的類,并且對外暴露了構(gòu)造方法镐确,以實(shí)例維度使用當(dāng)前類
public class DisruptorProviderManage<T> {
    public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
    // 消費(fèi)者數(shù)量默認(rèn)使用當(dāng)前及其計(jì)算資源 * 2(可能是核心數(shù)或者線程數(shù)或者其他包吝,取決于很多因素)
    private static final Integer DEFAULT_CONSUMER_SIZE = Runtime.getRuntime().availableProcessors() << 1;
    private final Integer size;
    private final Integer consumerSize;
    // 我們自己定義的消費(fèi)者工廠,用于承接不同業(yè)務(wù)源葫,分別作為shenyu的交互的client和server側(cè)兩邊的消費(fèi)者
    private final QueueConsumerFactory<T> consumerFactory;
    // disruptor诗越,也是泛型,綁定的當(dāng)前實(shí)例
    private DisruptorProvider<T> provider;
    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory, final Integer ringBufferSize) {
        this(consumerFactory,
                DEFAULT_CONSUMER_SIZE,
                ringBufferSize);
    }
    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory) {
        this(consumerFactory, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
    }
    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory,
                                   final int consumerSize,
                                   final int ringBufferSize) {
        this.consumerFactory = consumerFactory;
        this.size = ringBufferSize;
        this.consumerSize = consumerSize;
    }
// 啟動(dòng)disruptor
    public void startup() {
        this.startup(false);
    }
    public void startup(final boolean isOrderly) {
//一個(gè)可排序的任務(wù)執(zhí)行器,當(dāng)然如果是傳入 true才會(huì)做真正的可排序邏輯,現(xiàn)在默認(rèn)是false
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
// 可以看出這是消費(fèi)者的任務(wù)執(zhí)行器
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
// 如果是可排序的任務(wù)執(zhí)行臼氨,那么消費(fèi)者數(shù)量就是一個(gè)掺喻,即一個(gè)線程進(jìn)行消費(fèi)
            newConsumerSize = 1;
// 可排序的 disruptor事件工廠
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
// disruptor 啟動(dòng)類
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
// 生產(chǎn)者任務(wù)執(zhí)行器,disruptor是典型的消費(fèi)者生產(chǎn)者模型储矩,通過
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
// 多生產(chǎn)者模型
                ProducerType.MULTI,
// 生產(chǎn)者阻塞策略,這其實(shí)是一個(gè)游標(biāo)阻塞策略感耙,用于等待可用的序列
// 因?yàn)槠涫褂脽o鎖的環(huán)形數(shù)組,需要先請求一個(gè)可用序列即環(huán)形數(shù)組下標(biāo)來放入事件
// 當(dāng)前默認(rèn)使用性能較低的策略持隧,但是消耗cpu較少
                new BlockingWaitStrategy());
        // 初始化消費(fèi)者數(shù)組
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
// 事件處理器即硼,也就是消費(fèi)者執(zhí)行側(cè)報(bào)錯(cuò)的鉤子,默認(rèn)忽略
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
// 啟動(dòng)
        disruptor.start();
// 獲取隊(duì)列
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
// 將disruptor要素放入我們封裝好的 對外暴露的操作類
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }
    public DisruptorProvider<T> getProvider() {
        return provider;
    }
}

上述代碼可以看出DisruptorProviderManage類是一個(gè)引導(dǎo)類屡拨,幫助我們?nèi)?gòu)造disruptor實(shí)例只酥,并將當(dāng)前實(shí)例通過泛型與其綁定。
下面來看disruptor的實(shí)例DisruptorProvider類

// 也是泛型呀狼,當(dāng)前類為真正disruptor的封裝
public class DisruptorProvider<T> {
    // 大名鼎鼎的 ringBuffer
    private final RingBuffer<DataEvent<T>> ringBuffer;
    // disruptor的對外api類,我們這里將他封裝到當(dāng)前類
    private final Disruptor<DataEvent<T>> disruptor;
    // 是否可排序裂允。shenyu提供了一個(gè)可排序的disruptor運(yùn)行機(jī)制
    private final boolean isOrderly;
    // 這個(gè)是一個(gè)轉(zhuǎn)換器,disruptor封裝的ringBuffer環(huán)形數(shù)組區(qū)別于普通的隊(duì)列哥艇。
// 這里的生產(chǎn)者發(fā)布一個(gè)事件需要先從ringBuffer獲取sequence绝编,也就是獲取到環(huán)形數(shù)組可用的下標(biāo)才可以發(fā)布事件到ringBuffer中
// 發(fā)布事件時(shí)直接傳入sequence,并不會(huì)傳入數(shù)據(jù)貌踏,通過sequence和要發(fā)布的數(shù)據(jù)關(guān)聯(lián)十饥。消費(fèi)時(shí)也去拿可消費(fèi)的sequence。
// 因?yàn)?RingBuffer提供泛型化的參數(shù)祖乳,但是逗堵,傳入的參數(shù)不一定是指定的泛型化的類型,disruptor提供這個(gè)函數(shù)式接口來轉(zhuǎn)換參數(shù)
// shenyu轉(zhuǎn)換方式是又定義了一個(gè) DataEvent的泛型眷昆,通過參數(shù)轉(zhuǎn)換set到DataEvent中
    private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
    // 同上蜒秤,通過轉(zhuǎn)換set了排序用的hash值
    private final EventTranslatorTwoArg<DataEvent<T>, T, String> orderlyArg = (event, sequence, t, orderly) -> {
        if (event instanceof OrderlyDataEvent) {
            ((OrderlyDataEvent<T>) event).setHash(orderly);
        }
        event.setData(t);
    };
    public DisruptorProvider(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor, final boolean isOrderly) {
        this.ringBuffer = ringBuffer;
        this.disruptor = disruptor;
        this.isOrderly = isOrderly;
    }
    public void onData(final T data) {
//是否可排序 不可變汁咏,初始化時(shí)就指定了
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
        }
        try {
// 調(diào)用ringBuffer發(fā)布事件,傳入?yún)?shù)和轉(zhuǎn)換器
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
    public void onOrderlyData(final T data, final String... hashArray) {
        if (!this.isOrderly) {
            throw new IllegalArgumentException("The current provider is not of orderly type. Please use onData() method.");
        }
        try {
// 多加一個(gè) 用于排序的 hash值
            String hash = String.join(":", hashArray);
            ringBuffer.publishEvent(orderlyArg, data, hash);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
//記得關(guān)閉資源
    public void shutdown() {
        if (null != disruptor) {
            disruptor.shutdown();
        }
    }
}

看到上面生產(chǎn)者操作類后作媚,再看看 消費(fèi)者類的構(gòu)建QueueConsumer

// WorkHandler是 disruptor抽象出來的 消費(fèi)者處理類梆暖,提供給我們實(shí)現(xiàn)來做消費(fèi)者的業(yè)務(wù)邏輯
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    // 可排序的 執(zhí)行器,也可以用作正常邏輯的執(zhí)行器
    private final OrderlyExecutor executor;
    // shenyu 自己封裝的 消費(fèi)者工廠掂骏,用于創(chuàng)建消費(fèi)者執(zhí)行器(實(shí)際就是實(shí)現(xiàn)runnable的執(zhí)行單元)
    private final QueueConsumerFactory<T> factory;

    public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {
        this.executor = executor;
        this.factory = factory;
    }
    
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            ThreadPoolExecutor executor = orderly(t);
// 獲取傳入的 執(zhí)行業(yè)務(wù)的執(zhí)行器
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 放入事件
            queueConsumerExecutor.setData(t.getData());
            // help gc 切斷引用鏈,具體邏輯不是很清楚厚掷,應(yīng)該是防止一直存在引用鏈導(dǎo)致無法被gc回收
            t.setData(null);
// 執(zhí)行業(yè)務(wù)
            executor.execute(queueConsumerExecutor);
        }
    }
    // 如果是 可排序的事件弟灼,并且hash值不為空則使用可排序執(zhí)行器選擇出一個(gè)SingletonExecutor執(zhí)行器
    private ThreadPoolExecutor orderly(final DataEvent<T> t) {
        if (t instanceof OrderlyDataEvent && !isEmpty(((OrderlyDataEvent<T>) t).getHash())) {
            return executor.select(((OrderlyDataEvent<T>) t).getHash());
        } else {
            return executor;
        }
    }
    
    private boolean isEmpty(final String t) {
        return t == null || t.isEmpty();
    }
}
//---------------shenyu封裝的可排序的線程池--------
public class OrderlyExecutor extends ThreadPoolExecutor {
    // 線程安全的 可排序的map,目前jdk內(nèi)只有這一個(gè)可排序的concurrentMap的實(shí)現(xiàn)
    private final ConcurrentSkipListMap<Long, SingletonExecutor> virtualExecutors = new ConcurrentSkipListMap<>();
    // 線程選擇器
    private final ThreadSelector threadSelector = new ThreadSelector();
    
    public OrderlyExecutor(
            final boolean isOrderly,
            final int corePoolSize,
            final int maximumPoolSize,
            final long keepAliveTime,
            final TimeUnit unit,
            final BlockingQueue<Runnable> workQueue,
            final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        orderlyThreadPool(isOrderly, corePoolSize, threadFactory);
    }
    
    private void orderlyThreadPool(final boolean isOrderly, final int corePoolSize, final ThreadFactory threadFactory) {
// 如果是可排序則特殊處理一下
        if (isOrderly) {
// 將設(shè)置的線程池的數(shù)量按下標(biāo)分別初始化一個(gè) 單個(gè)線程池放入ConcurrentSkipListMap中
            IntStream.range(0, corePoolSize).forEach(index -> {
                SingletonExecutor singletonExecutor = new SingletonExecutor(threadFactory);
// 將當(dāng)前單線程的線程池 + 下標(biāo) 進(jìn)行hash
                String hash = singletonExecutor.hashCode() + ":" + index;
                byte[] bytes = threadSelector.sha(hash);
                for (int i = 0; i < 4; i++) {
// 將hash過的值 再從 0 ~ 3 哈希一次存入冒黑。也就是map中有4個(gè)key執(zhí)行同一個(gè)線程池執(zhí)行器田绑。為什么這么做不太清楚
                    this.virtualExecutors.put(threadSelector.hash(bytes, i), singletonExecutor);
                }
            });
        }
    }
    
    public SingletonExecutor select(final String hash) {
        long select = threadSelector.select(hash);
// 通過hash值命中 線程池
        if (!virtualExecutors.containsKey(select)) {
// 如果沒有命中,則選擇最靠近自己的hash值的線程池抡爹,但是是向后取(也就是hash值大于等于當(dāng)前值的第一個(gè))
            SortedMap<Long, SingletonExecutor> tailMap = virtualExecutors.tailMap(select);
            if (tailMap.isEmpty()) {
// 如果計(jì)算出來的hash值已經(jīng)是最后一個(gè)了兔魂,則取當(dāng)前map的第一個(gè)值
                select = virtualExecutors.firstKey();
            } else {
                select = tailMap.firstKey();
            }
        }
        return virtualExecutors.get(select);
    }

    private static final class ThreadSelector {
        
        public long select(final String hash) {
            byte[] digest = sha(hash);
            return hash(digest, 0);
        }
        
        private long hash(final byte[] digest, final int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }
        
        private byte[] sha(final String hash) {
            byte[] bytes = hash.getBytes(StandardCharsets.UTF_8);
            return Hashing
                    .sha256()
                    .newHasher()
                    .putBytes(bytes)
                    .hash().asBytes();
        }
    }
}

看到上面的可排序的線程池邏輯癌瘾,其實(shí)就是結(jié)合了thread的priority屬性的一個(gè)優(yōu)先級線程池邏輯。
下面再來看看關(guān)于event的封裝

// 自己封裝的 事件類,基于泛型
public class DataEvent<T> {
    private T data;
    public T getData() {
        return data;
    }
    public void setData(final T data) {
        this.data = data;
    }
}
// ---------------事件工廠盖彭,實(shí)現(xiàn)了disruptor的事件工廠---------------
public class DisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
    
    @Override
    public DataEvent<T> newInstance() {
        return new DataEvent<>();
    }
}
//---------帶有優(yōu)先級邏輯的事件-------
public class OrderlyDataEvent<T> extends DataEvent<T> {
  
    private String hash;
    public String getHash() {
        return hash;
    }
    public void setHash(final String hash) {
        this.hash = hash;
    }
}
// -------- 帶有優(yōu)先級邏輯的 事件工廠------
public class OrderlyDisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
    
    @Override
    public OrderlyDataEvent<T> newInstance() {
        return new OrderlyDataEvent<>();
    }
}

上面看完了shenyu對于 disruptor的封裝然后來看看兩個(gè)入口,消費(fèi)者和生產(chǎn)者

生產(chǎn)者入口:DisruptorProvider#onData 消費(fèi)者入口:QueueConsumerExecutor#run(實(shí)現(xiàn)了runnable作為消費(fèi)者執(zhí)行任務(wù)單元)

DisruptorProvider


生產(chǎn)者

先看ShenyuClientRegisterEventPublisher#publishEvent棚赔,這是一個(gè)shenyuClient端注冊事件發(fā)布器单寂,用于發(fā)布客戶端(也就是被shenyu路由的后端服務(wù)們)的注冊事件

public class ShenyuClientRegisterEventPublisher {
    // 靜態(tài)的單例
    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
    //這里其實(shí)是一個(gè)可替換的變量,用于存放disruptor生產(chǎn)者笑诅,泛型是shenyu封裝的數(shù)據(jù)接口调缨,兩個(gè)子類分別是元數(shù)據(jù),和接口吆你。
    private DisruptorProviderManage<DataTypeParent> providerManage;
    public static ShenyuClientRegisterEventPublisher getInstance() {
        return INSTANCE;
    }
//這里的start 理論上多個(gè)地方同時(shí)使用會(huì)造成providerManage 更改弦叶。這里可以這么用是因?yàn)椴煌恼{(diào)用方是互斥的,也就是客戶端會(huì)選擇一種方式上報(bào)注冊事件
    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// disruptor的消費(fèi)者 實(shí)現(xiàn)(針對于客戶端)
        RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
// 添加消費(fèi)者鉤子妇多,我們的業(yè)務(wù)邏輯沒有直接寫在消費(fèi)者實(shí)現(xiàn)的run方法中伤哺,而是嵌入了一個(gè)Set<ExecutorSubscriber<T>> subscribers,觀察者模式的調(diào)用
// 注冊一個(gè) 元數(shù)據(jù)鉤子
        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
// 注冊一個(gè) 資源路徑上報(bào)
        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
        providerManage = new DisruptorProviderManage<>(factory);
// 啟動(dòng)disruptor
        providerManage.startup();
    }
   // 發(fā)布事件 生產(chǎn)者
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }
}
上述類的生成者方法調(diào)用

如上圖可以看出是砌梆,server端接收client端 通過http調(diào)用后直接復(fù)用了生產(chǎn)者邏輯


還是生產(chǎn)者

如上圖默责,有三對,成對是因?yàn)槎际?uri和metadata注冊咸包。
隨便點(diǎn)進(jìn)去看一個(gè)類 都是ShenyuClientServerRegisterRepository接口的子類桃序,并且是一個(gè)SPI接口,用于各種客戶端上報(bào)數(shù)據(jù)后放入disruptor中烂瘫。也就是對于disruptor他是一個(gè)生產(chǎn)者接口媒熊,但是對于client端奇适,是一個(gè)接收client上報(bào)的的server端,我們分別看看他們?nèi)绾芜\(yùn)作


所有client上報(bào)數(shù)據(jù)的server端
// consul 通過applicationEvent監(jiān)聽到數(shù)據(jù)變化再放入 disruptor
    @EventListener
    public void onMetadataChange(final ConsulConfigChangedEvent event) {
        Map<String, GetValue> metadataMap = event.getMetadataMap();
        metadataMap.forEach((path, getValue) -> {
            long modifyIndex = getValue.getModifyIndex();
            if (metadataChanged(path, modifyIndex)) {
                publishMetadata(getValue.getDecodedValue());
            }
        });
    }
// ---------etcd 通過一個(gè)添加一個(gè)監(jiān)聽到 etcd客戶端來處理disruptor----
    private void subscribeMetaData(final String rpcType) {
        String rpcPath = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
        List<String> metadataPaths = client.getChildren(rpcPath);
        for (String metadataPath: metadataPaths) {
            String data = client.read(metadataPath);
            publishMetadata(data);
        }
        LOGGER.info("subscribe metadata change: {}", rpcPath);
        client.subscribeChildChanges(rpcPath, new EtcdListenHandler() {
            @Override
            public void updateHandler(final String path, final String value) {
                publishMetadata(client.read(path));
            }
            @Override
            public void deleteHandler(final String path, final String value) {
            }
        });
    }
//-----------nacos也是通過添加監(jiān)聽芦鳍,但是元數(shù)據(jù)監(jiān)聽的是configServer嚷往,uri數(shù)據(jù)監(jiān)聽的是nameServer,如下代碼是nameServer-----------
    private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
        final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
        try {
            Map<String, List<URIRegisterDTO>> services = new HashMap<>();
            List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
            healthyInstances.forEach(healthyInstance -> {
                String contextPath = healthyInstance.getMetadata().get("contextPath");
                String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
                subscribeMetadata(serviceConfigName);
                metadataConfigCache.add(serviceConfigName);
                String metadata = healthyInstance.getMetadata().get("uriMetadata");
                URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
                services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
                uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
            });
            if (RPC_URI_TYPE_SET.contains(rpcType)) {
                services.values().forEach(this::publishRegisterURI);
            }
            LOGGER.info("subscribe uri : {}", serviceName);
            namingService.subscribe(serviceName, event -> {
                if (event instanceof NamingEvent) {
                    List<Instance> instances = ((NamingEvent) event).getInstances();
                    instances.forEach(instance -> {
                        String contextPath = instance.getMetadata().get("contextPath");
                        uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
                    });
                    refreshURIService(rpcType, serviceName);
                }
            });
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }
//-------http比較簡單---------
    @PostMapping("/register-metadata")
    @ResponseBody
    public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
        publisher.publish(metaDataRegisterDTO);
        return ShenyuResultMessage.SUCCESS;
    }
// ----------zk 通過zk客戶端添加監(jiān)聽-------
    private void subscribeMetaData(final String rpcType) {
        String contextPathParent = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forCreatesAndChanges((oldNode, node) -> {
                    if (PathMatchUtils.match(RegisterPathConstants.REGISTER_METADATA_INSTANCE_PATH, node.getPath())) {
                        String data = new String(node.getData(), StandardCharsets.UTF_8);
                        publishMetadata(data);
                        LOGGER.info("zookeeper register metadata success: {}", data);
                    }
                }).build();
        client.addCache(contextPathParent, listener);
    }

這里不具體討論shenyu的數(shù)據(jù)上報(bào)的client和server端邏輯柠衅,這里可以看到所有server端在接收到client端上報(bào)或者監(jiān)聽到client端的數(shù)據(jù)變化都會(huì)第一時(shí)間放入disruptor處理皮仁,提供并發(fā)處理能力,也可以有削峰的效果菲宴,但是可能會(huì)帶來一些延遲贷祈。

下面再看看 消費(fèi)者 QueueConsumerExecutor類
// 實(shí)現(xiàn)了 runnable接口,可以作為disruptor的消費(fèi)者執(zhí)行任務(wù)
public abstract class QueueConsumerExecutor<T> implements Runnable {

    private T data;

    public T getData() {
        return data;
    }

    public void setData(final T data) {
        this.data = data;
    }
}

再回過頭看看 消費(fèi)者工廠的實(shí)現(xiàn)


client和server都會(huì)使用disrupotr的消費(fèi)者真正的處理

而QueueConsumerExecutor就是client和server兩邊的消費(fèi)者執(zhí)行邏輯


image.png

消費(fèi)者工廠不用看了喝峦,就是創(chuàng)建不同的QueueConsumerExecutor實(shí)現(xiàn)
那么來看看QueueConsumerExecutor不同的實(shí)現(xiàn)

RegisterClientConsumerExecutor

public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
    
    private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
    
    private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
        this.subscribers = new HashMap<>(executorSubscriberMap);
    }

    @Override
    public void run() {
        final T data = getData();
// client端消費(fèi)者通過   工廠中的回調(diào)鉤子來執(zhí)行势誊,其實(shí)通過disruptor這一步相當(dāng)于做了限流,通過一個(gè)ringBuffer來防止client端過多的并發(fā)打進(jìn)來
//我們可以做一個(gè)緩沖作用谣蠢,通過抽象出來的回調(diào)鉤子調(diào)用真正的業(yè)務(wù)粟耻,例如http請求,nacos請求眉踱,zk的數(shù)據(jù)變化請求等等      subscribers.get(data.getType()).executor(Lists.newArrayList(data));
    }
// 這里也是 client端消費(fèi)者的工廠類挤忙,創(chuàng)建當(dāng)前client端的消費(fèi)者
    public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {
        
        @Override
        public RegisterClientConsumerExecutor<T> create() {
            Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
                    .stream()
                    .map(e -> (ExecutorTypeSubscriber<T>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
            return new RegisterClientConsumerExecutor<>(map);
        }

        @Override
        public String fixName() {
            return "shenyu_register_client";
        }
    }
}

RegisterServerConsumerExecutor邏輯大同小異,是client端的數(shù)據(jù)變化到了server端后勋锤,也經(jīng)過一層disruptor來解耦饭玲,并且削峰,由消費(fèi)者中回調(diào)鉤子執(zhí)行業(yè)務(wù)

ExecutorSubscriber 接口為shenyu中disrupor的回調(diào)鉤子接口

然后通過ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T>區(qū)分不同數(shù)據(jù)類型叁执,來處理不同數(shù)據(jù)類型的消費(fèi)者


不同數(shù)據(jù)類型

其實(shí)就是分為了 client的metadata茄厘,uri,即ShenyuClientRegisterRepository谈宛,用于client端的uri和metadata與shenyu的數(shù)據(jù)交互(這個(gè)是shenyu提供的多種數(shù)據(jù)同步機(jī)制)由client端注冊uri和metadata
ShenyuClientRegisterService client是以什么服務(wù)形式注冊到admin次哈,讓shenyu的網(wǎng)關(guān)基于什么樣的client來調(diào)用后端服務(wù)(這里取決于用戶使用什么后端服務(wù)提供shenyu的網(wǎng)關(guān)調(diào)用),然后通過這個(gè)注冊器來注冊metadata和uri
這里有點(diǎn)繞吆录。需要一個(gè)流程圖~
期待下一篇文章梳理一下apache-shenyu的被路由的后端服務(wù)窑滞,以及shenyu-admin,shenyu-bootstrap的注冊以及數(shù)據(jù)動(dòng)態(tài)感知的設(shè)計(jì)恢筝,這部分算是apache-shenyu支持多種協(xié)議的核心哀卫。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市撬槽,隨后出現(xiàn)的幾起案子此改,更是在濱河造成了極大的恐慌,老刑警劉巖侄柔,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件共啃,死亡現(xiàn)場離奇詭異占调,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)移剪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進(jìn)店門究珊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人纵苛,你說我怎么就攤上這事剿涮。” “怎么了攻人?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵幔虏,是天一觀的道長。 經(jīng)常有香客問我贝椿,道長,這世上最難降的妖魔是什么陷谱? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任烙博,我火速辦了婚禮,結(jié)果婚禮上烟逊,老公的妹妹穿的比我還像新娘渣窜。我一直安慰自己,他們只是感情好宪躯,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布乔宿。 她就那樣靜靜地躺著,像睡著了一般访雪。 火紅的嫁衣襯著肌膚如雪详瑞。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天臣缀,我揣著相機(jī)與錄音坝橡,去河邊找鬼。 笑死精置,一個(gè)胖子當(dāng)著我的面吹牛计寇,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播脂倦,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼番宁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了赖阻?” 一聲冷哼從身側(cè)響起蝶押,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎政供,沒想到半個(gè)月后播聪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體朽基,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年离陶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了稼虎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡招刨,死狀恐怖霎俩,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情沉眶,我是刑警寧澤打却,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站谎倔,受9級特大地震影響柳击,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜片习,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一捌肴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧藕咏,春花似錦状知、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至盲再,卻和暖如春西设,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背答朋。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工济榨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人绿映。 一個(gè)月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓擒滑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親叉弦。 傳聞我的和親對象是個(gè)殘疾皇子丐一,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內(nèi)容