Spring-Kafka(八)—— KafkaListener定時(shí)啟動(dòng)(禁止自啟動(dòng))

定時(shí)啟動(dòng)的意義何在

如果只學(xué)習(xí)技術(shù)不討論其應(yīng)用范圍那就是在耍流氓啊合蔽,為了不做那個(gè)流氓,我還是犧牲一下色相吧_
在這里我舉一個(gè)定時(shí)啟動(dòng)的應(yīng)用場(chǎng)景:
比如現(xiàn)在單機(jī)環(huán)境下,我們需要利用Kafka做數(shù)據(jù)持久化的功能,由于用戶(hù)活躍的時(shí)間為早上10點(diǎn)至晚上12點(diǎn)日麸,那在這個(gè)時(shí)間段做一個(gè)大數(shù)據(jù)量的持久化可能會(huì)影響數(shù)據(jù)庫(kù)性能導(dǎo)致用戶(hù)體驗(yàn)降低,我們可以選擇在用戶(hù)活躍度低的時(shí)間段去做持久化的操作逮光,也就是晚上12點(diǎn)后到第二條的早上10點(diǎn)前代箭。


使用KafkaListenerEndpointRegistry

這里需要提及一下,@KafkaListener這個(gè)注解所標(biāo)注的方法并沒(méi)有在IOC容器中注冊(cè)為Bean涕刚,而是會(huì)被注冊(cè)在KafkaListenerEndpointRegistry中嗡综,KafkaListenerEndpointRegistry在SpringIOC中已經(jīng)被注冊(cè)為Bean,具體可以看一下該類(lèi)的源碼杜漠,當(dāng)然不是使用注解方式注冊(cè)啦...

public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap();
    private int phase = 2147483547;
    private ConfigurableApplicationContext applicationContext;
    private boolean contextRefreshed;
    ......
}



那我們?cè)趺醋孠afkaListener定時(shí)啟動(dòng)呢极景?

  1. 禁止KafkaListener自啟動(dòng)(AutoStartup)
  2. 編寫(xiě)兩個(gè)定時(shí)任務(wù),一個(gè)晚上12點(diǎn)驾茴,一個(gè)早上10點(diǎn)
  3. 分別在12點(diǎn)的任務(wù)上啟動(dòng)KafkaListener盼樟,在10點(diǎn)的任務(wù)上關(guān)閉KafkaListener

這里需要注意一下啟動(dòng)監(jiān)聽(tīng)容器的方法,項(xiàng)目啟動(dòng)的時(shí)候監(jiān)聽(tīng)容器是未啟動(dòng)狀態(tài)锈至,而resume是恢復(fù)的意思不是啟動(dòng)的意思晨缴,所以我們需要判斷容器是否運(yùn)行,如果運(yùn)行則調(diào)用resume方法峡捡,否則調(diào)用start方法

@Component
@EnableScheduling
public class TaskListener{

    private static final Logger log= LoggerFactory.getLogger(TaskListener.class);

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自動(dòng)啟動(dòng)
        container.setAutoStartup(false);
        return container;
    }

    @KafkaListener(id = "durable", topics = "topic.quick.durable",containerFactory = "delayContainerFactory")
    public void durableListener(String data) {
        //這里做數(shù)據(jù)持久化的操作
        log.info("topic.quick.durable receive : " + data);
    }


    //定時(shí)器击碗,每天凌晨0點(diǎn)開(kāi)啟監(jiān)聽(tīng)
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        log.info("開(kāi)啟監(jiān)聽(tīng)");
        //判斷監(jiān)聽(tīng)容器是否啟動(dòng)筑悴,未啟動(dòng)則將其啟動(dòng)
        if (!registry.getListenerContainer("durable").isRunning()) {
            registry.getListenerContainer("durable").start();
        }
        registry.getListenerContainer("durable").resume();
    }

    //定時(shí)器,每天早上10點(diǎn)關(guān)閉監(jiān)聽(tīng)
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        log.info("關(guān)閉監(jiān)聽(tīng)");
        registry.getListenerContainer("durable").pause();
    }



}




原本不想測(cè)試的延都,奈何心腸太好

修改修改一下定時(shí)器注解雷猪,修改為距離現(xiàn)在時(shí)間較近的時(shí)間點(diǎn),然后寫(xiě)入些數(shù)據(jù)晰房,啟動(dòng)SpringBoot項(xiàng)目,靜靜的等待時(shí)間的到來(lái)

    //這個(gè)代表16:24執(zhí)行
    @Scheduled(cron = "0 24 16 * * ?")

    @Test
    public void testTask() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("topic.quick.durable", "this is durable message");
        }
    }



這里可以看到在16:24的時(shí)候啟動(dòng)了監(jiān)聽(tīng)容器射沟,監(jiān)聽(tīng)容器也成功從Topic中獲取到了數(shù)據(jù)殊者,等到16:28的時(shí)候容器被暫停了,這個(gè)時(shí)候可以運(yùn)行一下測(cè)試方法验夯,看看監(jiān)聽(tīng)容器是否還能獲取數(shù)據(jù)猖吴,答案肯定是不行的鴨。

2018-09-12 16:24:00.003  INFO 2872 --- [pool-1-thread-1] com.viu.kafka.listen.TaskListener        : 開(kāi)啟監(jiān)聽(tīng)
2018-09-12 16:24:00.004  INFO 2872 --- [pool-1-thread-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = durable
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 15000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2018-09-12 16:24:00.007  INFO 2872 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
2018-09-12 16:24:00.007  INFO 2872 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
2018-09-12 16:24:00.007  INFO 2872 --- [pool-1-thread-1] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2018-09-12 16:24:00.012  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Discovered group coordinator admin-PC:9092 (id: 2147483647 rack: null)
2018-09-12 16:24:00.013  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Revoking previously assigned partitions []
2018-09-12 16:24:00.014  INFO 2872 --- [  durable-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: []
2018-09-12 16:24:00.014  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=durable] (Re-)joining group
2018-09-12 16:24:00.021  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Successfully joined group with generation 6
2018-09-12 16:24:00.021  INFO 2872 --- [  durable-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=durable] Setting newly assigned partitions [topic.quick.durable-0]
2018-09-12 16:24:00.024  INFO 2872 --- [  durable-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic.quick.durable-0]
2018-09-12 16:24:00.042  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043  INFO 2872 --- [  durable-0-C-1] com.viu.kafka.listen.TaskListener        : topic.quick.durable receive : this is durable message
2018-09-12 16:28:00.023  INFO 2872 --- [pool-1-thread-1] com.viu.kafka.listen.TaskListener        : 關(guān)閉監(jiān)聽(tīng)


更多文章請(qǐng)關(guān)注該 Spring-Kafka史上最強(qiáng)入門(mén)教程 專(zhuān)題

博主常駐地~ http://blog.seasedge.cn/archives/49.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末挥转,一起剝皮案震驚了整個(gè)濱河市海蔽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌绑谣,老刑警劉巖党窜,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異借宵,居然都是意外死亡幌衣,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)壤玫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)豁护,“玉大人,你說(shuō)我怎么就攤上這事欲间〕铮” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵猎贴,是天一觀的道長(zhǎng)班缎。 經(jīng)常有香客問(wèn)我,道長(zhǎng)嘱能,這世上最難降的妖魔是什么吝梅? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮惹骂,結(jié)果婚禮上苏携,老公的妹妹穿的比我還像新娘。我一直安慰自己对粪,他們只是感情好右冻,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布装蓬。 她就那樣靜靜地躺著,像睡著了一般纱扭。 火紅的嫁衣襯著肌膚如雪牍帚。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,036評(píng)論 1 285
  • 那天乳蛾,我揣著相機(jī)與錄音暗赶,去河邊找鬼。 笑死肃叶,一個(gè)胖子當(dāng)著我的面吹牛蹂随,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播因惭,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼岳锁,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蹦魔?” 一聲冷哼從身側(cè)響起激率,我...
    開(kāi)封第一講書(shū)人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎勿决,沒(méi)想到半個(gè)月后乒躺,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡剥险,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年聪蘸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片表制。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡健爬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出么介,到底是詐尸還是另有隱情娜遵,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布壤短,位于F島的核電站设拟,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏久脯。R本人自食惡果不足惜直晨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一鳄逾、第九天 我趴在偏房一處隱蔽的房頂上張望护盈。 院中可真熱鬧徒坡,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至芝雪,卻和暖如春减余,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背惩系。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工位岔, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蛆挫。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓赃承,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親悴侵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容