Spring Kafka深入學(xué)習(xí)分析

文章出處shenyifengtk.github.io 轉(zhuǎn)載請(qǐng)注明

本文由來(lái)块饺,有一個(gè)需求要在瀏覽器輸入Kafka topic弥虐,消費(fèi)組提交后自動(dòng)開(kāi)啟消費(fèi),這個(gè)做起來(lái)比較簡(jiǎn)單,同事使用了Kafka 驅(qū)動(dòng)包很快速完成這個(gè)洲押。我突然想到能不能通過(guò)Spring Kafka自身框架完成這個(gè)功能缔刹,不使用底層驅(qū)動(dòng)包來(lái)自做呢搂根。而引出分析整個(gè)Spring Kafka 如何實(shí)現(xiàn)注解消費(fèi)信息断楷,調(diào)用方法的。并且最后通過(guò)幾個(gè)簡(jiǎn)單的代碼完成上面小需求褥紫。

源碼解析

EnableKafka入口

kafka 模塊的開(kāi)始先從@EnableKafka 上@Import(KafkaListenerConfigurationSelector.class)

    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[] { KafkaBootstrapConfiguration.class.getName() };
    }

接著繼續(xù)看下KafkaBootstrapConfiguration類

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition(
                KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

            registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                    new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
        }

        if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
            registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
        }
    }

}

使用BeanDefinitionRegistry 將class 轉(zhuǎn)換成beanDefinition姜性,注冊(cè)到beanDefinitionMap 容器中,容器會(huì)統(tǒng)一將Map Class全部進(jìn)行實(shí)例化髓考,其實(shí)就是將這個(gè)交給Spring 初始化部念。


image.png

KafkaListenerAnnotationBeanPostProcessor 解析

下面看下kafka核心處理類KafkaListenerAnnotationBeanPostProcessor 如何解析@KafkaListener 注解,postProcessAfterInitialization 在bean 實(shí)例化后調(diào)用方法氨菇,對(duì)bean 進(jìn)行增強(qiáng)儡炼。

    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            //如果此時(shí)bean可能是代理類,則獲取原始class 查蓉,否則直接class
            Class<?> targetClass = AopUtils.getTargetClass(bean); 
           //這時(shí)類上去找@KafkaListener  乌询,因?yàn)樵赾lass 上可能出現(xiàn)多種復(fù)雜情況,這個(gè)方法封裝一系列方法能包裝找到注解
          //這里可能存在子父類同時(shí)使用注解豌研,所有只有找到一個(gè)就進(jìn)行對(duì)應(yīng)方法處理
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
              //從方法上找注解妹田,找到方法放到map中唬党,Method 當(dāng)作key
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) { //如果類上有注解的話,都有搭配@KafkaHandler使用的鬼佣,方法上找這個(gè)注解
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) { //將解析過(guò)class 緩存起來(lái)
                this.nonAnnotatedClasses.add(bean.getClass());
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);  //方法監(jiān)聽(tīng)處理的邏輯
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); //KafkaHandler 處理邏輯
            }
        }
        return bean;
    }

@kafkaListener其實(shí)可以作用于Class 上的驶拱,搭配著@KafkaHandler一起使用,那怎么樣使用呢晶衷,我用一個(gè)簡(jiǎn)單例子展示下蓝纲。

@KafkaListener(topics = "${topic-name.lists}",groupId = "${group}",concurrency = 4)
public class Kddk {
    
    @KafkaHandler
    public void user(User user){
        
    }
    
    @KafkaHandler
    public void std(Dog dog){
        
    }
}

消費(fèi)信息不同對(duì)象區(qū)分進(jìn)行處理,省去對(duì)象轉(zhuǎn)換的麻煩晌纫,我暫時(shí)想到場(chǎng)景就是這些税迷,平常很少有這些。這個(gè)實(shí)現(xiàn)原理我就不深入分析了

    protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
        //如果方法剛好被代理增強(qiáng)了缸匪,返回原始class 方法
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setMethod(methodToUse);

        String beanRef = kafkaListener.beanRef();
        this.listenerScope.addListener(beanRef, bean);
        String[] topics = resolveTopics(kafkaListener);
        TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
         //這個(gè)方法是判斷方法上是否有@RetryableTopic 注解翁狐,有的話則放回true类溢,注冊(cè)到KafkaListenerEndpointRegistry
        if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
            //解析@kafkaListener 屬性凌蔬,設(shè)置到endpoint ,注冊(cè)到KafkaListenerEndpointRegistry
            processListener(endpoint, kafkaListener, bean, beanName, topics, tps); 
        }
        this.listenerScope.removeListener(beanRef);
    }

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
                                Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
        processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
        String containerFactory = resolve(kafkaListener.containerFactory());
        KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);
        //這里主要核心了闯冷,解析完成后砂心,注冊(cè)到KafkaListenerEndpointRegistry 中,等待下一步操作了
        this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
        processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
    }

類名MethodKafkaListenerEndpoint 都可以理解成端點(diǎn)對(duì)象蛇耀,簡(jiǎn)單地說(shuō)辩诞,端點(diǎn)是通信通道的一端》牡樱可以理解這個(gè)端點(diǎn)連接業(yè)務(wù)方法和kafka 信息之間的通信端點(diǎn)译暂。
@RetryableTopic 是spring kafka 2.7 后出的一個(gè)注解,主要作用就是在消費(fèi)kafka信息時(shí)出現(xiàn)消費(fèi)異常時(shí)撩炊,失敗重試而出現(xiàn)死信信息的處理外永,由于Kafka內(nèi)部并沒(méi)有死信隊(duì)列或者死信信息這類東西。Spring 自己搞出來(lái)一個(gè)DLT topics (Dead-Letter Topic),意思就是當(dāng)消費(fèi)信息失敗到達(dá)一定次數(shù)時(shí)拧咳,會(huì)將信息發(fā)送到指定DLT topic 中伯顶。注解可以設(shè)置重試次數(shù)、重試時(shí)間骆膝、故障異常祭衩、失敗策略等等。

其實(shí)這個(gè)processMainAndRetryListeners 方法跟下面processListener 作用差不多阅签,都有解析注解內(nèi)容掐暮,然后調(diào)用KafkaListenerEndpointRegistry.registerEndpoint 方法。
KafkaListenerEndpointRegistry 主要由Spring 容器創(chuàng)建政钟,用于實(shí)例化MessageListenerContainer
KafkaListenerEndpointRegistrar主要代碼new創(chuàng)建劫乱,并沒(méi)有交給spring容器管理织中,用于幫助bean 注冊(cè)到KafkaListenerEndpointRegistry中
這個(gè)兩個(gè)類類名特別相似,在分析源碼時(shí)被搞到暈頭轉(zhuǎn)向衷戈,分清楚后其實(shí)就挺簡(jiǎn)單了狭吼,這個(gè)類名搞混上浪費(fèi)不算時(shí)間去理解。

注冊(cè)endpoint

    public void registerEndpoint(KafkaLiEstenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
        // Factory may be null, we defer the resolution right before actually creating the container
        // 這個(gè)只是一個(gè)內(nèi)部類殖妇,用來(lái)裝兩個(gè)對(duì)象的刁笙,沒(méi)有任何實(shí)現(xiàn)意義,factory 實(shí)際可能為空谦趣,這里使用延時(shí)創(chuàng)建解析這個(gè)問(wèn)題
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
                //這個(gè) startImmediately 并沒(méi)有被初始化疲吸,這里一定是false,當(dāng)被設(shè)置true前鹅,會(huì)直接創(chuàng)建監(jiān)聽(tīng)器容器摘悴,這時(shí)應(yīng)該是spring 容器已經(jīng)初始化完成了
            if (this.startImmediately) { // Register and start immediately
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }

這里為什么有一個(gè)startImmediately開(kāi)關(guān)呢,這里只是將endpoint 放入容器集中保存起來(lái)舰绘,等到全部添加完成后蹂喻,使用Spring InitializingBean接口afterPropertiesSet 方法進(jìn)行基礎(chǔ)注冊(cè)啟動(dòng),這是利用了Spring bean 生命周期方法來(lái)觸發(fā)捂寿,如果是Spring 完全啟動(dòng)完成后口四,那添加進(jìn)來(lái)endpoint就是不能啟動(dòng)的了,所以相當(dāng)于一個(gè)閾值開(kāi)關(guān)秦陋,開(kāi)啟后立即啟動(dòng)蔓彩。
下面看下調(diào)用KafkaListenerEndpointRegistrar.afterPropertiesSet 來(lái)開(kāi)啟各大endpoint 運(yùn)行。

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint //只有使用@KafkaHandler 才會(huì)生成這個(gè)對(duì)象
                        && this.validator != null) {
                    ((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
                }
                 //通過(guò)endpoint ,containerFactory 創(chuàng)建信息容器MessageListenerContainer 
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
             //全部處理完成了驳概,就可以開(kāi)啟start啟動(dòng)按鈕赤嚼,讓新增進(jìn)來(lái)立即啟動(dòng)
            this.startImmediately = true;  // trigger immediate startup
        }
    }

    //獲取內(nèi)部類KafkaListenerContainerFactory 具體實(shí)例,在延時(shí)啟動(dòng)時(shí)顺又,可能存在空更卒,這時(shí)可以使用Spring 內(nèi)部默認(rèn)
   // 如果注解上已經(jīng)備注了要使用ContainerFactory 則使用自定義,為空則使用默認(rèn)ConcurrentKafkaListenerContainerFactory
    private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) {
        if (descriptor.containerFactory != null) {
            return descriptor.containerFactory;
        }
        else if (this.containerFactory != null) {
            return this.containerFactory;
        }
        else if (this.containerFactoryBeanName != null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            this.containerFactory = this.beanFactory.getBean(
                    this.containerFactoryBeanName, KafkaListenerContainerFactory.class);
            return this.containerFactory;  // Consider changing this if live change of the factory is required
        }
        else {
        //.....
        }
    }

MessageListenerContainer

看下KafkaListenerEndpointRegistry.registerListenerContainer 方法如何生成信息監(jiān)聽(tīng)器的待榔。

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        registerListenerContainer(endpoint, factory, false);
    }

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            //創(chuàng)建監(jiān)聽(tīng)器容器
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
           //使用map 將實(shí)例化容器保存起來(lái)逞壁,key就是 @KafkaListener id  ,這個(gè)就是所謂的beanName
            this.listenerContainers.put(id, container);
            ConfigurableApplicationContext appContext = this.applicationContext;
            String groupName = endpoint.getGroup();
         //如果注解中有設(shè)置自定義監(jiān)聽(tīng)組锐锣,這時(shí)需要獲取到監(jiān)聽(tīng)組實(shí)例锻拘,將監(jiān)聽(tīng)器容器裝起來(lái)
            if (StringUtils.hasText(groupName) && appContext != null) {
                //省略部分內(nèi)容
            }
            if (startImmediately) {  //如果是立即啟動(dòng)光督,這時(shí)需要手動(dòng)調(diào)用監(jiān)聽(tīng)器start 方法
                startIfNecessary(container);
            }
        }
    }

    protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenercContainerFactory<?> factory) {
                //監(jiān)聽(tīng)器被創(chuàng)建了 
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        if (listenerContainer instanceof InitializingBean) { //這時(shí)spring 容器已經(jīng)初始化完成了茧妒,生命周期方法不會(huì)再執(zhí)行了身笤,這里顯式調(diào)用它
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }

        int containerPhase = listenerContainer.getPhase();
        if (listenerContainer.isAutoStartup() &&
                containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) {  // a custom phase value
            if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container "
                        + "factory definitions: " + this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }


    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        // contextRefreshed  Spring 完全啟動(dòng)完成true
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }

主要就是通過(guò)KafkaListenercContainerFactory 信息監(jiān)聽(tīng)工廠來(lái)創(chuàng)建監(jiān)聽(tīng)器MessageListenerContainer ,通過(guò)繼承了SmartLifecycle斤彼。SmartLifecycle接口是Spring 在初始化完成后分瘦,根據(jù)接口isAutoStartup() 返回值是否實(shí)現(xiàn)該接口的類中對(duì)應(yīng)的start()蘸泻。Spring 當(dāng)spring 完全初始化完成后,SmartLifecycle 接口就不會(huì)被Spring 調(diào)用執(zhí)行嘲玫,這時(shí)就需要手動(dòng)執(zhí)行start 方法悦施,所以startIfNecessary 方法才會(huì)判斷容器已經(jīng)啟動(dòng)完成了。

MessageListenerContainer

    public C createListenerContainer(KafkaListenerEndpoint endpoint) {
        C instance = createContainerInstance(endpoint);
        JavaUtils.INSTANCE
                .acceptIfNotNull(endpoint.getId(), instance::setBeanName);
        if (endpoint instanceof AbstractKafkaListenerEndpoint) {
                //配置kafka 設(shè)置去团,因?yàn)橄裥畔⑾M(fèi)提交ack抡诞,信息消費(fèi)批量這些設(shè)置都是通過(guò)配置設(shè)定的,這些信息都在factory保存著土陪,這時(shí)將配置信息設(shè)置給endpoint 
            configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
        }
        //這里是核心昼汗,將注解聲明bean method 創(chuàng)建成MessagingMessageListenerAdapter 信息監(jiān)聽(tīng)適配器,在將適配器初始化參數(shù)去創(chuàng)建信息監(jiān)聽(tīng)器鬼雀,交給instance
        endpoint.setupListenerContainer(instance, this.messageConverter);
       //將concurrency  并發(fā)數(shù)設(shè)置上
        initializeContainer(instance, endpoint);
       //自定義配置
        customizeContainer(instance);
        return instance;
    }

這時(shí)kafka 配置信息顷窒、@KafkaListener 信息、消費(fèi)方法源哩、bean 已經(jīng)全部設(shè)置createListenerContainer鞋吉,這時(shí)監(jiān)聽(tīng)器容器就可以啟動(dòng)kafka 拉取信息,調(diào)用方法進(jìn)行處理了璧疗。

直接從信息監(jiān)聽(tīng)器ConcurrentMessageListenerContainer啟動(dòng)方法開(kāi)始

    public final void start() {
        checkGroupId();
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) { //監(jiān)聽(tīng)狀態(tài)坯辩,測(cè)試還沒(méi)有開(kāi)始監(jiān)聽(tīng)馁龟,所以監(jiān)聽(tīng)狀態(tài)應(yīng)該為false
                Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                        () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
                 //抽象方法崩侠,由子類去實(shí)現(xiàn)
                doStart();
            }
        }
    }

    @Override
    protected void doStart() {
        if (!isRunning()) {
             //topic 正則匹配,根據(jù)規(guī)則去匹配sever所有topic坷檩,沒(méi)有則拋出異常
            checkTopics();
            ContainerProperties containerProperties = getContainerProperties();
           //已經(jīng)獲取到消費(fèi)組的分區(qū)和offset
            TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                 // 當(dāng) concurrency  并發(fā)數(shù)超過(guò)分區(qū)時(shí)却音,這里會(huì)打印警告日志
                this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                 //注意這里,強(qiáng)制將并發(fā)數(shù)改成最大分?jǐn)?shù)矢炼,在設(shè)置消費(fèi)并發(fā)時(shí)系瓢,不用擔(dān)心分區(qū)數(shù)量并發(fā)超過(guò)
                this.concurrency = topicPartitions.length;
            }
            setRunning(true); //開(kāi)始監(jiān)聽(tīng)
                //concurrency 就是創(chuàng)建容器時(shí),從@KafkaListener 解析處理的并發(fā)數(shù)
              // 可以看出并發(fā)數(shù)控制著  KafkaMessageListenerContainer 實(shí)例產(chǎn)生
            for (int i = 0; i < this.concurrency; i++) {
                //創(chuàng)建 KafkaMessageListenerContainer 對(duì)象
                KafkaMessageListenerContainer<K, V> container =
                        constructContainer(containerProperties, topicPartitions, i);
               //配置監(jiān)聽(tīng)器容器攔截器句灌、通知這些夷陋,如果沒(méi)有配置默認(rèn)都是null
                configureChildContainer(i, container);
                if (isPaused()) {
                    container.pause();
                }
                container.start(); //啟動(dòng)任務(wù)
                //因?yàn)樗邢M(fèi)現(xiàn)場(chǎng)都是同一個(gè)容器創(chuàng)建的,當(dāng)要停止某個(gè)消費(fèi)topic胰锌,需要對(duì)containers進(jìn)行操作
                this.containers.add(container);
            }
        }
    }

    private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperties containerProperties,
            @Nullable TopicPartitionOffset[] topicPartitions, int i) {

        KafkaMessageListenerContainer<K, V> container;
        if (topicPartitions == null) {
            container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties); // NOSONAR
        }
        else { //如果存在分區(qū)骗绕,每一個(gè)消費(fèi)都有平分分區(qū)
            container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, // NOSONAR
                    containerProperties, partitionSubset(containerProperties, i));
        }
        return container;
    }

看到了@KafkaListener 并發(fā)數(shù)如何實(shí)現(xiàn)的,并且并發(fā)數(shù)不能超過(guò)分區(qū)數(shù)的资昧,如果并發(fā)數(shù)小于分區(qū)數(shù)酬土,則會(huì)出現(xiàn)平分的情況,可能會(huì)讓一個(gè)消費(fèi)占有多個(gè)分區(qū)情況格带。這里在創(chuàng)建KafkaMessageListenerContainer 去對(duì)Kafka topic 進(jìn)行消費(fèi)撤缴。

KafkaMessageListenerContainer

因?yàn)镵afkaMessageListenerContainer和ConcurrentMessageListenerContainer都是通過(guò)extends AbstractMessageListenerContainer 重寫doStart()開(kāi)啟任務(wù)刹枉,直接看見(jiàn)doStart就可以知道程序入口了。

    protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) { // stand-alone container
            checkTopics();
        }
        ContainerProperties containerProperties = getContainerProperties();
        //檢查是否非自動(dòng)ack屈呕,在org.springframework.kafka.listener.ContainerProperties.AckMode 有多種模式
        checkAckMode(containerProperties);
        // 
        Object   = containerProperties.getMessageListener();
         //任務(wù)執(zhí)行器微宝,看起倆像一個(gè)線程池Executor ,本質(zhì)上是直接使用Thread來(lái)啟動(dòng)任務(wù)的
        AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
        if (consumerExecutor == null) {
            consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
         //這個(gè)一個(gè)枚舉類虎眨,根據(jù)類型生成type芥吟,type 標(biāo)記著如何處理kafka 信息,有批量的专甩、單條的钟鸵、手動(dòng)提交、自動(dòng)提交
        ListenerType listenerType = determineListenerType(listener);
           //ListenerConsumer 內(nèi)部類涤躲,有關(guān)Kafka 任何信息都可以直接去取的
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
        setRunning(true); //設(shè)置運(yùn)行狀態(tài)
        this.startLatch = new CountDownLatch(1);
        this.listenerConsumerFuture = consumerExecutor
                .submitListenable(this.listenerConsumer);//啟動(dòng)線程
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Consumer thread failed to start - does the configured task executor "
                        + "have enough threads to support all containers and concurrency?");
                publishConsumerFailedToStart();
            }
        }
        catch (@SuppressWarnings(UNUSED) InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

在這里主要邏輯就是啟動(dòng)線程去去處理kafka 信息拉取棺耍。我們直接去看ListenerConsumer run() 就行了。

        @Override // NOSONAR complexity
        public void run() {
            ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
               //向spring容器發(fā)布事件
            publishConsumerStartingEvent();
            this.consumerThread = Thread.currentThread();
            setupSeeks();
            KafkaUtils.setConsumerGroupId(this.consumerGroupId);
            this.count = 0;
            this.last = System.currentTimeMillis();
               //從kafka 獲取消費(fèi)組 分區(qū) offset种樱,保存起來(lái)
            initAssignedPartitions();
             //發(fā)布事件
            publishConsumerStartedEvent();
            Throwable exitThrowable = null;
            while (isRunning()) {
                try {
                            //核心  拉取信息和 調(diào)用方法去處理信息
                    pollAndInvoke();
                }
                //省略

pollAndInvoke 這個(gè)方法就是拉取信息和處理的過(guò)程了蒙袍,方法太繁瑣了,無(wú)非就是如何去調(diào)用endpoint 生成信息處理器嫩挤,并且將參數(shù)注入方法中害幅。

總結(jié)

image.png

結(jié)合上面圖,簡(jiǎn)單總結(jié)下Spring Kafka 如何通過(guò)一個(gè)簡(jiǎn)單注解實(shí)現(xiàn)對(duì)方法消費(fèi)信息的岂昭。首先通過(guò)Spring 前置處理器機(jī)制使用KafkaListenerAnnotationBeanPostProcessor 掃描所有已經(jīng)實(shí)例化的bean以现,找出帶有@KafkaListener bean 和方法,解析注解的內(nèi)容設(shè)置到MethodKafkaListenerEndpoint约啊,并且注冊(cè)到KafkaListenerEndpointRegistry邑遏,有它統(tǒng)一保存起來(lái),等到執(zhí)行前置處理器統(tǒng)一將KafkaListenerEndpointRegistry保存起來(lái)的enpoint恰矩,注冊(cè)到KafkaListenerEndpointRegistrar,根據(jù)enpoint生成ConcurrentMessageListenerContainer记盒,在根據(jù)并發(fā)數(shù)去生成對(duì)應(yīng)數(shù)量的KafkaMessageListenerContainer,最后使用Thread 異步啟動(dòng)Kafka 信息拉去外傅,調(diào)用bean 方法進(jìn)行處理纪吮。
還理解了topic 分區(qū)和并發(fā)數(shù)如何關(guān)聯(lián)的,還知道kafka消費(fèi)是可控制的萎胰,處理Kafka信息方法碾盟,返回值可以被推送到另一個(gè)topic的、也是第一次知道有@RetryableTopic 重試機(jī)制奥洼,還有DLT 死信topic巷疼。如果不是看源碼分析,平常工作場(chǎng)景估計(jì)很少用得上這些。現(xiàn)在看源碼多了嚼沿,越來(lái)越有感覺(jué)查看代碼更能加深你對(duì)框架學(xué)習(xí)估盘,心得。

動(dòng)態(tài)訂閱

看了這么多代碼骡尽,對(duì)照處理器CV下就遣妥,簡(jiǎn)單版動(dòng)態(tài)監(jiān)聽(tīng)就可以實(shí)現(xiàn)了

@Component
public class ListenerMessageCommand<K,V> implements CommandLineRunner {

    @Autowired
    private Cusmotd cusmotd;

    @Autowired
    private KafkaListenerEndpointRegistry endpointRegistry;

    @Autowired
    private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;

    private Logger logger = LoggerFactory.getLogger(ListenerMessageCommand.class);

    @Override
    public void run(String... args) throws Exception {
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(cusmotd);
        Method method = ReflectionUtils.findMethod(cusmotd.getClass(), "dis", ConsumerRecord.class);
        endpoint.setMethod(method);
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        endpoint.setId("tk.shengyifeng.custom#1");
        endpoint.setGroupId("test");
        endpoint.setTopicPartitions(new TopicPartitionOffset[0]);
        endpoint.setTopics("skdsk");
        endpoint.setClientIdPrefix("comuserd_");
        endpoint.setConcurrency(1);
        endpointRegistry.registerListenerContainer(endpoint,kafkaListenerContainerFactory,true);
        logger.info("register...............");
    }
}

我們看過(guò)完整代碼,知道監(jiān)聽(tīng)動(dòng)作是由KafkaListenerContainerFactory創(chuàng)建后攀细,調(diào)用實(shí)例start 方法開(kāi)始的箫踩,并且我們還能拿到監(jiān)聽(tīng)容器對(duì)象,可以調(diào)用對(duì)象各式API谭贪,可以動(dòng)態(tài)停止對(duì)topic消費(fèi)哦境钟。

@RestController
@RequestMapping("kafka")
public class KafkaController<K,V> {
    @Autowired
    private Cusmotd cusmotd;

    @Autowired
    private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;

    private Map<String,MessageListenerContainer> containerMap = new ConcurrentReferenceHashMap<>();

    @GetMapping("start/topic")
    public void startTopic(String topicName,String groupName){
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(cusmotd);
        Method method = ReflectionUtils.findMethod(cusmotd.getClass(), "dis", ConsumerRecord.class);
        endpoint.setMethod(method);
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        endpoint.setId("tk.shengyifeng.custom#1");
        endpoint.setGroupId(groupName);
        endpoint.setTopicPartitions(new TopicPartitionOffset[0]);
        endpoint.setTopics(topicName);
        endpoint.setClientIdPrefix("comuserd_");
        endpoint.setConcurrency(1);
        MessageListenerContainer listenerContainer = kafkaListenerContainerFactory.createListenerContainer(endpoint);
        listenerContainer.start();
        containerMap.put(topicName,listenerContainer);
    }

    @GetMapping("stop/topic")
    public void stopTopic(String topicName){
        if (containerMap.containsKey(topicName))
            containerMap.get(topicName).stop();
    }
}

這個(gè)簡(jiǎn)單http接口,通過(guò)接口方式支持對(duì)外擴(kuò)容的方式動(dòng)態(tài)訂閱頻道俭识,并且支持已經(jīng)訂閱topic消費(fèi)停下來(lái)慨削。
使用@kafkaListener 聲明方法消費(fèi)的同學(xué)不用羨慕的,Spring 提供機(jī)制可以去獲取MessageListenerContainer套媚,上面代碼分析我們知道了KafkaListenerEndpointRegistry內(nèi)部的listenerContainers 會(huì)保存所有container實(shí)例缚态,并且提供外部方法根據(jù)id去獲取對(duì)象,而且KafkaListenerEndpointRegistry還是有spring 進(jìn)行實(shí)例化的堤瘤,所以....
為了方便獲取id簡(jiǎn)單玫芦,可以在使用注解時(shí),手動(dòng)指定id 值本辐,如果沒(méi)有指定則id桥帆,默認(rèn)生成規(guī)則是org.springframework.kafka.KafkaListenerEndpointContainer# + 自增長(zhǎng)

SpringBoot 自動(dòng)配置

大家可能好奇,Spring boot中Kafka配置信息如何給kafkaListenerContainerFactory师郑,因?yàn)樗峭ㄟ^(guò)Spring 容器初始化的环葵,源碼中并沒(méi)有看見(jiàn)帶有構(gòu)造器的參數(shù)注入调窍。想要具體了解宝冕,只有看KafkaAnnotationDrivenConfiguration,ConcurrentKafkaListenerContainerFactoryConfigurer

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    private final RecordMessageConverter messageConverter;

    private final RecordFilterStrategy<Object, Object> recordFilterStrategy;

    private final BatchMessageConverter batchMessageConverter;

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    private final KafkaAwareTransactionManager<Object, Object> transactionManager;

    private final ConsumerAwareRebalanceListener rebalanceListener;

    private final ErrorHandler errorHandler;

    private final BatchErrorHandler batchErrorHandler;

    private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;

    private final RecordInterceptor<Object, Object> recordInterceptor;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
            ObjectProvider<RecordMessageConverter> messageConverter,
            ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
            ObjectProvider<BatchMessageConverter> batchMessageConverter,
            ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
            ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
            ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
            ObjectProvider<BatchErrorHandler> batchErrorHandler,
            ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
            ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
        this.properties = properties;
        this.messageConverter = messageConverter.getIfUnique();
        this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
        this.batchMessageConverter = batchMessageConverter
                .getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
        this.kafkaTemplate = kafkaTemplate.getIfUnique();
        this.transactionManager = kafkaTransactionManager.getIfUnique();
        this.rebalanceListener = rebalanceListener.getIfUnique();
        this.errorHandler = errorHandler.getIfUnique();
        this.batchErrorHandler = batchErrorHandler.getIfUnique();
        this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
        this.recordInterceptor = recordInterceptor.getIfUnique();
    }

作為其實(shí)Spring Boot自動(dòng)配置原理就是由spring-boot-autoconfigure 包編碼實(shí)現(xiàn)的邓萨,在根據(jù)@ConditionalOnClass 注解來(lái)決定是否啟動(dòng)配置類地梨,所以當(dāng)你引入對(duì)應(yīng)pox時(shí),就會(huì)啟動(dòng)配置類了缔恳,配置信息會(huì)注入到KafkaProperties對(duì)象中宝剖,然后將properties 設(shè)置到工廠對(duì)象,實(shí)例化對(duì)象交給spring 容器歉甚,你會(huì)發(fā)現(xiàn)大多數(shù)自動(dòng)配置都是這樣套路万细。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市纸泄,隨后出現(xiàn)的幾起案子赖钞,更是在濱河造成了極大的恐慌腰素,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雪营,死亡現(xiàn)場(chǎng)離奇詭異弓千,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)献起,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門洋访,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人谴餐,你說(shuō)我怎么就攤上這事姻政。” “怎么了岂嗓?”我有些...
    開(kāi)封第一講書人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵扶歪,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我摄闸,道長(zhǎng)善镰,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任年枕,我火速辦了婚禮炫欺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘熏兄。我一直安慰自己品洛,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布摩桶。 她就那樣靜靜地躺著桥状,像睡著了一般。 火紅的嫁衣襯著肌膚如雪硝清。 梳的紋絲不亂的頭發(fā)上辅斟,一...
    開(kāi)封第一講書人閱讀 51,562評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音芦拿,去河邊找鬼士飒。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蔗崎,可吹牛的內(nèi)容都是我干的酵幕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼缓苛,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼芳撒!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤笔刹,失蹤者是張志新(化名)和其女友劉穎庐完,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體徘熔,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡门躯,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了酷师。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片讶凉。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖山孔,靈堂內(nèi)的尸體忽然破棺而出懂讯,到底是詐尸還是另有隱情,我是刑警寧澤台颠,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布褐望,位于F島的核電站,受9級(jí)特大地震影響串前,放射性物質(zhì)發(fā)生泄漏瘫里。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一荡碾、第九天 我趴在偏房一處隱蔽的房頂上張望谨读。 院中可真熱鬧,春花似錦坛吁、人聲如沸劳殖。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)哆姻。三九已至,卻和暖如春玫膀,著一層夾襖步出監(jiān)牢的瞬間矛缨,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工匆骗, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留劳景,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓碉就,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親闷串。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瓮钥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容