spring-kafka源碼解析

spring-kafka 源碼解析流程

  • KafkaListenerAnnotationBeanPostProcessor
    該類實現(xiàn)BeanPostProcessor

加載標注有KafkaListener抡句,KafkaListeners 的方法粤攒,類,封裝MethodKafkaListenerEndpoint 并注冊到KafkaListenerEndpointRegistrar

public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
  • KafkaListenerAnnotationBeanPostProcessor
    該類實現(xiàn)了SmartInitializingSingleton接口煌妈,所有bean加載完成后
public void afterSingletonsInstantiated() {
        this.registrar.setBeanFactory(this.beanFactory);

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, KafkaListenerConfigurer> instances =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
            for (KafkaListenerConfigurer configurer : instances.values()) {
                configurer.configureKafkaListeners(this.registrar);
            }
        }

        if (this.registrar.getEndpointRegistry() == null) {
            if (this.endpointRegistry == null) {
                Assert.state(this.beanFactory != null,
                        "BeanFactory must be set to find endpoint registry by bean name");
                this.endpointRegistry = this.beanFactory.getBean(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                        KafkaListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }

        if (this.defaultContainerFactoryBeanName != null) {
            this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
        }

        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
        }
        else {
            addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
        }

        // Actually register all listeners
//重點是這個方法,該方法中履婉,創(chuàng)建MessageListenerContainer時獲取Topic設(shè)置的并發(fā)數(shù)煤篙,默認是1。
        this.registrar.afterPropertiesSet();
        Map<String, ContainerGroupSequencer> sequencers =
                this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
        sequencers.values().forEach(seq -> seq.initialize());
    }
  • AbstractMessageListenerContainer
    該類繼承了Lifecycle接口毁腿,會執(zhí)行重寫start方法辑奈。
public final void start() {
        checkGroupId();
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                        () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
                doStart();
            }
        }
    }

···
執(zhí)行到下面方法

protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) { // stand-alone container
            checkTopics();
        }
        ContainerProperties containerProperties = getContainerProperties();
        checkAckMode(containerProperties);

        Object messageListener = containerProperties.getMessageListener();
        AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
        if (consumerExecutor == null) {
            consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
        ListenerType listenerType = determineListenerType(listener);

//實例化ListenerConsumer,其中進行主題訂閱
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
        setRunning(true);
        this.startLatch = new CountDownLatch(1);

//該方法默認會使用SimpleAsyncTaskExecutor線程池,并開啟線程
        this.listenerConsumerFuture = consumerExecutor
                .submitListenable(this.listenerConsumer);
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Consumer thread failed to start - does the configured task executor "
                        + "have enough threads to support all containers and concurrency?");
                publishConsumerFailedToStart();
            }
        }
        catch (@SuppressWarnings(UNUSED) InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

其中實例化ListenerConsumer 時會訂閱主題已烤,如subscribeOrAssignTopics(this.consumer);

同時該內(nèi)部類ListenerConsumer 實現(xiàn)了runnable接口

  • ListenerConsumer
    線程開啟后執(zhí)行run方法鸠窗,調(diào)用kafka-client 原生包 循環(huán)poll數(shù)據(jù)
public void run() {
            ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
            publishConsumerStartingEvent();
            this.consumerThread = Thread.currentThread();
            setupSeeks();
            KafkaUtils.setConsumerGroupId(this.consumerGroupId);
            this.count = 0;
            this.last = System.currentTimeMillis();
            initAssignedPartitions();
            publishConsumerStartedEvent();
            Throwable exitThrowable = null;
            while (isRunning()) {
                try {

//獲取方法
                    pollAndInvoke();
                }
                catch (NoOffsetForPartitionException nofpe) {
                    this.fatalError = true;
                    ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
                    exitThrowable = nofpe;
                    break;
                }
                catch (AuthenticationException | AuthorizationException ae) {
                    if (this.authExceptionRetryInterval == null) {
                        ListenerConsumer.this.logger.error(ae,
                                "Authentication/Authorization Exception and no authExceptionRetryInterval set");
                        this.fatalError = true;
                        exitThrowable = ae;
                        break;
                    }
                    else {
                        ListenerConsumer.this.logger.error(ae,
                                "Authentication/Authorization Exception, retrying in "
                                        + this.authExceptionRetryInterval.toMillis() + " ms");
                        // We can't pause/resume here, as KafkaConsumer doesn't take pausing
                        // into account when committing, hence risk of being flooded with
                        // GroupAuthorizationExceptions.
                        // see: https://github.com/spring-projects/spring-kafka/pull/1337
                        sleepFor(this.authExceptionRetryInterval);
                    }
                }
                catch (FencedInstanceIdException fie) {
                    this.fatalError = true;
                    ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
                            + "' has been fenced");
                    exitThrowable = fie;
                    break;
                }
                catch (StopAfterFenceException e) {
                    this.logger.error(e, "Stopping container due to fencing");
                    stop(false);
                    exitThrowable = e;
                }
                catch (Error e) { // NOSONAR - rethrown
                    this.logger.error(e, "Stopping container due to an Error");
                    this.fatalError = true;
                    wrapUp(e);
                    throw e;
                }
                catch (Exception e) {
                    handleConsumerException(e);
                }
                finally {
                    clearThreadState();
                }
            }
            wrapUp(exitThrowable);
        }

ConcurrentMessageListenerContainer 設(shè)置并發(fā)數(shù)

@Override
    protected void doStart() {
        if (!isRunning()) {
            checkTopics();
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);
//根據(jù)并發(fā)數(shù)concurrency開啟訂閱主題接收消息 
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container =
                        constructContainer(containerProperties, topicPartitions, i);
                configureChildContainer(i, container);
                if (isPaused()) {
                    container.pause();
                }
                container.start();
                this.containers.add(container);
            }
        }
    }

注冊時 AbstractKafkaListenerContainerFactory 設(shè)置批量消費

private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
        if (aklEndpoint.getRecordFilterStrategy() == null) {
            JavaUtils.INSTANCE
                    .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy);
        }
        JavaUtils.INSTANCE
                .acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
                .acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
                .acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
                .acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
                .acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
                .acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)
                .acceptIfNotNull(this.batchToRecordAdapter, aklEndpoint::setBatchToRecordAdapter);
        if (aklEndpoint.getBatchListener() == null) {
//設(shè)置批量消費
            JavaUtils.INSTANCE
                    .acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener);
        }
    }

自定義設(shè)置主題消費時間

Component
@EnableScheduling
@Slf4j
public class CustomConsumeSet {

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @Scheduled(cron = "0 35 20 * * ?")
    public void startConsume(){

        MessageListenerContainer listenerContainer = registry.getListenerContainer("myContainerId");
        if(!listenerContainer.isRunning()){
            log.info("~~~~~~消費開始~~~~~~~~");
            listenerContainer.start();  //消費開始
        }else{
            log.info("~~~~~~消費繼續(xù)~~~~~~~~");
            listenerContainer.resume(); //繼續(xù)消費
        }
    }

    @Scheduled(cron = "0 40 20 * * ?")
    public void stopConsume(){
        MessageListenerContainer listenerContainer = registry.getListenerContainer("myContainerId");
        listenerContainer.stop();
        log.info("~~~~~~消費停止~~~~~~~~");
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市胯究,隨后出現(xiàn)的幾起案子稍计,更是在濱河造成了極大的恐慌,老刑警劉巖裕循,帶你破解...
    沈念sama閱讀 221,548評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件臣嚣,死亡現(xiàn)場離奇詭異,居然都是意外死亡剥哑,警方通過查閱死者的電腦和手機硅则,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來星持,“玉大人抢埋,你說我怎么就攤上這事《皆荩” “怎么了揪垄?”我有些...
    開封第一講書人閱讀 167,990評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長逻翁。 經(jīng)常有香客問我饥努,道長,這世上最難降的妖魔是什么八回? 我笑而不...
    開封第一講書人閱讀 59,618評論 1 296
  • 正文 為了忘掉前任酷愧,我火速辦了婚禮驾诈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘溶浴。我一直安慰自己乍迄,他們只是感情好,可當我...
    茶點故事閱讀 68,618評論 6 397
  • 文/花漫 我一把揭開白布士败。 她就那樣靜靜地躺著闯两,像睡著了一般。 火紅的嫁衣襯著肌膚如雪谅将。 梳的紋絲不亂的頭發(fā)上漾狼,一...
    開封第一講書人閱讀 52,246評論 1 308
  • 那天,我揣著相機與錄音饥臂,去河邊找鬼逊躁。 笑死,一個胖子當著我的面吹牛隅熙,可吹牛的內(nèi)容都是我干的稽煤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,819評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼囚戚,長吁一口氣:“原來是場噩夢啊……” “哼念脯!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起弯淘,我...
    開封第一講書人閱讀 39,725評論 0 276
  • 序言:老撾萬榮一對情侶失蹤绿店,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后庐橙,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體假勿,經(jīng)...
    沈念sama閱讀 46,268評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,356評論 3 340
  • 正文 我和宋清朗相戀三年态鳖,在試婚紗的時候發(fā)現(xiàn)自己被綠了转培。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,488評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡浆竭,死狀恐怖浸须,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情邦泄,我是刑警寧澤删窒,帶...
    沈念sama閱讀 36,181評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站顺囊,受9級特大地震影響肌索,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜特碳,卻給世界環(huán)境...
    茶點故事閱讀 41,862評論 3 333
  • 文/蒙蒙 一诚亚、第九天 我趴在偏房一處隱蔽的房頂上張望晕换。 院中可真熱鬧,春花似錦站宗、人聲如沸闸准。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽恕汇。三九已至,卻和暖如春或辖,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背枣接。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評論 1 272
  • 我被黑心中介騙來泰國打工颂暇, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人但惶。 一個月前我還...
    沈念sama閱讀 48,897評論 3 376
  • 正文 我出身青樓耳鸯,卻偏偏與公主長得像,于是被迫代替她去往敵國和親膀曾。 傳聞我的和親對象是個殘疾皇子县爬,可洞房花燭夜當晚...
    茶點故事閱讀 45,500評論 2 359

推薦閱讀更多精彩內(nèi)容