定時(shí)啟動(dòng)的意義何在
如果只學(xué)習(xí)技術(shù)不討論其應(yīng)用范圍那就是在耍流氓啊合蔽,為了不做那個(gè)流氓,我還是犧牲一下色相吧
在這里我舉一個(gè)定時(shí)啟動(dòng)的應(yīng)用場(chǎng)景:
比如現(xiàn)在單機(jī)環(huán)境下,我們需要利用Kafka做數(shù)據(jù)持久化的功能,由于用戶(hù)活躍的時(shí)間為早上10點(diǎn)至晚上12點(diǎn)日麸,那在這個(gè)時(shí)間段做一個(gè)大數(shù)據(jù)量的持久化可能會(huì)影響數(shù)據(jù)庫(kù)性能導(dǎo)致用戶(hù)體驗(yàn)降低,我們可以選擇在用戶(hù)活躍度低的時(shí)間段去做持久化的操作逮光,也就是晚上12點(diǎn)后到第二條的早上10點(diǎn)前代箭。
使用KafkaListenerEndpointRegistry
這里需要提及一下,@KafkaListener這個(gè)注解所標(biāo)注的方法并沒(méi)有在IOC容器中注冊(cè)為Bean涕刚,而是會(huì)被注冊(cè)在KafkaListenerEndpointRegistry中嗡综,KafkaListenerEndpointRegistry在SpringIOC中已經(jīng)被注冊(cè)為Bean,具體可以看一下該類(lèi)的源碼杜漠,當(dāng)然不是使用注解方式注冊(cè)啦...
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
protected final Log logger = LogFactory.getLog(this.getClass());
private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap();
private int phase = 2147483547;
private ConfigurableApplicationContext applicationContext;
private boolean contextRefreshed;
......
}
那我們?cè)趺醋孠afkaListener定時(shí)啟動(dòng)呢极景?
- 禁止KafkaListener自啟動(dòng)(AutoStartup)
- 編寫(xiě)兩個(gè)定時(shí)任務(wù),一個(gè)晚上12點(diǎn)驾茴,一個(gè)早上10點(diǎn)
- 分別在12點(diǎn)的任務(wù)上啟動(dòng)KafkaListener盼樟,在10點(diǎn)的任務(wù)上關(guān)閉KafkaListener
這里需要注意一下啟動(dòng)監(jiān)聽(tīng)容器的方法,項(xiàng)目啟動(dòng)的時(shí)候監(jiān)聽(tīng)容器是未啟動(dòng)狀態(tài)锈至,而resume是恢復(fù)的意思不是啟動(dòng)的意思晨缴,所以我們需要判斷容器是否運(yùn)行,如果運(yùn)行則調(diào)用resume方法峡捡,否則調(diào)用start方法
@Component
@EnableScheduling
public class TaskListener{
private static final Logger log= LoggerFactory.getLogger(TaskListener.class);
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止自動(dòng)啟動(dòng)
container.setAutoStartup(false);
return container;
}
@KafkaListener(id = "durable", topics = "topic.quick.durable",containerFactory = "delayContainerFactory")
public void durableListener(String data) {
//這里做數(shù)據(jù)持久化的操作
log.info("topic.quick.durable receive : " + data);
}
//定時(shí)器击碗,每天凌晨0點(diǎn)開(kāi)啟監(jiān)聽(tīng)
@Scheduled(cron = "0 0 0 * * ?")
public void startListener() {
log.info("開(kāi)啟監(jiān)聽(tīng)");
//判斷監(jiān)聽(tīng)容器是否啟動(dòng)筑悴,未啟動(dòng)則將其啟動(dòng)
if (!registry.getListenerContainer("durable").isRunning()) {
registry.getListenerContainer("durable").start();
}
registry.getListenerContainer("durable").resume();
}
//定時(shí)器,每天早上10點(diǎn)關(guān)閉監(jiān)聽(tīng)
@Scheduled(cron = "0 0 10 * * ?")
public void shutDownListener() {
log.info("關(guān)閉監(jiān)聽(tīng)");
registry.getListenerContainer("durable").pause();
}
}
原本不想測(cè)試的延都,奈何心腸太好
修改修改一下定時(shí)器注解雷猪,修改為距離現(xiàn)在時(shí)間較近的時(shí)間點(diǎn),然后寫(xiě)入些數(shù)據(jù)晰房,啟動(dòng)SpringBoot項(xiàng)目,靜靜的等待時(shí)間的到來(lái)
//這個(gè)代表16:24執(zhí)行
@Scheduled(cron = "0 24 16 * * ?")
@Test
public void testTask() {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send("topic.quick.durable", "this is durable message");
}
}
這里可以看到在16:24的時(shí)候啟動(dòng)了監(jiān)聽(tīng)容器射沟,監(jiān)聽(tīng)容器也成功從Topic中獲取到了數(shù)據(jù)殊者,等到16:28的時(shí)候容器被暫停了,這個(gè)時(shí)候可以運(yùn)行一下測(cè)試方法验夯,看看監(jiān)聽(tīng)容器是否還能獲取數(shù)據(jù)猖吴,答案肯定是不行的鴨。
2018-09-12 16:24:00.003 INFO 2872 --- [pool-1-thread-1] com.viu.kafka.listen.TaskListener : 開(kāi)啟監(jiān)聽(tīng)
2018-09-12 16:24:00.004 INFO 2872 --- [pool-1-thread-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = durable
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 15000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2018-09-12 16:24:00.007 INFO 2872 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.2
2018-09-12 16:24:00.007 INFO 2872 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 2a121f7b1d402825
2018-09-12 16:24:00.007 INFO 2872 --- [pool-1-thread-1] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2018-09-12 16:24:00.012 INFO 2872 --- [ durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=durable] Discovered group coordinator admin-PC:9092 (id: 2147483647 rack: null)
2018-09-12 16:24:00.013 INFO 2872 --- [ durable-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=durable] Revoking previously assigned partitions []
2018-09-12 16:24:00.014 INFO 2872 --- [ durable-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2018-09-12 16:24:00.014 INFO 2872 --- [ durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=durable] (Re-)joining group
2018-09-12 16:24:00.021 INFO 2872 --- [ durable-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=durable] Successfully joined group with generation 6
2018-09-12 16:24:00.021 INFO 2872 --- [ durable-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=durable] Setting newly assigned partitions [topic.quick.durable-0]
2018-09-12 16:24:00.024 INFO 2872 --- [ durable-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic.quick.durable-0]
2018-09-12 16:24:00.042 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:24:00.043 INFO 2872 --- [ durable-0-C-1] com.viu.kafka.listen.TaskListener : topic.quick.durable receive : this is durable message
2018-09-12 16:28:00.023 INFO 2872 --- [pool-1-thread-1] com.viu.kafka.listen.TaskListener : 關(guān)閉監(jiān)聽(tīng)