spring for kafka自動配置及配置屬性

本文主要列一下spring for apache kafka的一些auto config以及屬性配置

maven

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.3.RELEASE</version>
</dependency>

這個版本使用的是kafka client 0.10.2.1版本
使用的spring retry是1.1.3.RELEASE版本

幾個關(guān)鍵配置類

  • KafkaAutoConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(
            ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
                kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<Object, Object>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(
                this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<Object, Object>(
                this.properties.buildProducerProperties());
    }

}
  • KafkaAnnotationDrivenConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        return configurer;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    protected static class EnableKafkaConfiguration {

    }
}

配置屬性

spring-boot-autoconfigure-1.5.7.RELEASE.jar!/META-INF/spring-configuration-metadata.json

公共配置

    {
      "name": "spring.kafka.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    },
    {
      "name": "spring.kafka.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    },
        {
      "name": "spring.kafka.ssl.key-password",
      "type": "java.lang.String",
      "description": "Password of the private key in the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.keystore-location",
      "type": "org.springframework.core.io.Resource",
      "description": "Location of the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.keystore-password",
      "type": "java.lang.String",
      "description": "Store password for the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.truststore-location",
      "type": "org.springframework.core.io.Resource",
      "description": "Location of the trust store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.truststore-password",
      "type": "java.lang.String",
      "description": "Store password for the trust store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.template.default-topic",
      "type": "java.lang.String",
      "description": "Default topic to which messages will be sent.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Template"
    }

consumer配置屬性

    {
      "name": "spring.kafka.consumer.auto-commit-interval",
      "type": "java.lang.Integer",
      "description": "Frequency in milliseconds that the consumer offsets are auto-committed to Kafka\n if 'enable.auto.commit' true.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.auto-offset-reset",
      "type": "java.lang.String",
      "description": "What to do when there is no initial offset in Kafka or if the current offset\n does not exist any more on the server.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.enable-auto-commit",
      "type": "java.lang.Boolean",
      "description": "If true the consumer's offset will be periodically committed in the background.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.fetch-max-wait",
      "type": "java.lang.Integer",
      "description": "Maximum amount of time in milliseconds the server will block before answering\n the fetch request if there isn't sufficient data to immediately satisfy the\n requirement given by \"fetch.min.bytes\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.fetch-min-size",
      "type": "java.lang.Integer",
      "description": "Minimum amount of data the server should return for a fetch request in bytes.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.group-id",
      "type": "java.lang.String",
      "description": "Unique string that identifies the consumer group this consumer belongs to.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.heartbeat-interval",
      "type": "java.lang.Integer",
      "description": "Expected time in milliseconds between heartbeats to the consumer coordinator.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.key-deserializer",
      "type": "java.lang.Class<?>",
      "description": "Deserializer class for keys.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.max-poll-records",
      "type": "java.lang.Integer",
      "description": "Maximum number of records returned in a single call to poll().",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.value-deserializer",
      "type": "java.lang.Class<?>",
      "description": "Deserializer class for values.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.listener.ack-count",
      "type": "java.lang.Integer",
      "description": "Number of records between offset commits when ackMode is \"COUNT\" or\n \"COUNT_TIME\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.ack-mode",
      "type": "org.springframework.kafka.listener.AbstractMessageListenerContainer$AckMode",
      "description": "Listener AckMode; see the spring-kafka documentation.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.ack-time",
      "type": "java.lang.Long",
      "description": "Time in milliseconds between offset commits when ackMode is \"TIME\" or\n \"COUNT_TIME\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.concurrency",
      "type": "java.lang.Integer",
      "description": "Number of threads to run in the listener containers.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.poll-timeout",
      "type": "java.lang.Long",
      "description": "Timeout in milliseconds to use when polling the consumer.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    }

producer配置

{
      "name": "spring.kafka.producer.acks",
      "type": "java.lang.String",
      "description": "Number of acknowledgments the producer requires the leader to have received\n before considering a request complete.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.batch-size",
      "type": "java.lang.Integer",
      "description": "Number of records to batch before sending.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.buffer-memory",
      "type": "java.lang.Long",
      "description": "Total bytes of memory the producer can use to buffer records waiting to be sent\n to the server.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.compression-type",
      "type": "java.lang.String",
      "description": "Compression type for all data generated by the producer.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.key-serializer",
      "type": "java.lang.Class<?>",
      "description": "Serializer class for keys.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.retries",
      "type": "java.lang.Integer",
      "description": "When greater than zero, enables retrying of failed sends.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.value-serializer",
      "type": "java.lang.Class<?>",
      "description": "Serializer class for values.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.properties",
      "type": "java.util.Map<java.lang.String,java.lang.String>",
      "description": "Additional properties used to configure the client.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末踩晶,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子枕磁,更是在濱河造成了極大的恐慌渡蜻,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件计济,死亡現(xiàn)場離奇詭異茸苇,居然都是意外死亡,警方通過查閱死者的電腦和手機沦寂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門学密,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人传藏,你說我怎么就攤上這事腻暮⊥兀” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵哭靖,是天一觀的道長具垫。 經(jīng)常有香客問我,道長试幽,這世上最難降的妖魔是什么筝蚕? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮铺坞,結(jié)果婚禮上饰及,老公的妹妹穿的比我還像新娘。我一直安慰自己康震,他們只是感情好燎含,可當(dāng)我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著腿短,像睡著了一般屏箍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上橘忱,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天赴魁,我揣著相機與錄音,去河邊找鬼钝诚。 笑死颖御,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的凝颇。 我是一名探鬼主播潘拱,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼拧略!你這毒婦竟也來了芦岂?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤垫蛆,失蹤者是張志新(化名)和其女友劉穎禽最,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體袱饭,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡川无,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了虑乖。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懦趋。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖决左,靈堂內(nèi)的尸體忽然破棺而出愕够,到底是詐尸還是另有隱情走贪,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布惑芭,位于F島的核電站坠狡,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏遂跟。R本人自食惡果不足惜逃沿,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望幻锁。 院中可真熱鬧凯亮,春花似錦、人聲如沸哄尔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽岭接。三九已至富拗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鸣戴,已是汗流浹背啃沪。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留窄锅,地道東北人创千。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像入偷,于是被迫代替她去往敵國和親追驴。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容