How Spring Boot Quartz Works

Today I came across a Quartz class definition that immediately caught my interest.

public class FlowSetup extends QuartzJobBean {

    public final Map<StackEnum, TaskFlow> flowMapping = new HashMap<>();

    @Autowired
    private FlowManager flowManager;
...
}

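The class above declares no annotations, so why can it use @Autowired? If autowiring works, Spring must be managing the instance somehow, but how does it get there when the class is never picked up by component scanning? The broader question is how Quartz integrates with Spring Boot.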

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

Following the starter mechanism, spring.factories lists org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration,
so QuartzAutoConfiguration is loaded at startup. Let's look at QuartzAutoConfiguration:

@Configuration
@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class,
        PlatformTransactionManager.class })
        // registers QuartzProperties.class
@EnableConfigurationProperties(QuartzProperties.class)
@AutoConfigureAfter({ DataSourceAutoConfiguration.class,
        HibernateJpaAutoConfiguration.class })
public class QuartzAutoConfiguration {

private final QuartzProperties properties;

// constructor injection of QuartzProperties
public QuartzAutoConfiguration(QuartzProperties properties,
            ObjectProvider<SchedulerFactoryBeanCustomizer> customizers,
            ObjectProvider<JobDetail[]> jobDetails,
            ObjectProvider<Map<String, Calendar>> calendars,
            ObjectProvider<Trigger[]> triggers, ApplicationContext applicationContext) {
        this.properties = properties;
        this.customizers = customizers;
        this.jobDetails = jobDetails.getIfAvailable();
        this.calendars = calendars.getIfAvailable();
        this.triggers = triggers.getIfAvailable();
        this.applicationContext = applicationContext;
    }
    
    // defines a SchedulerFactoryBean, provided no SchedulerFactoryBean exists yet
    @Bean
    @ConditionalOnMissingBean
    public SchedulerFactoryBean quartzScheduler() {
        // creates a new SchedulerFactoryBean
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
        jobFactory.setApplicationContext(this.applicationContext);
        schedulerFactoryBean.setJobFactory(jobFactory);
        
        // many of the settings from quartz.properties can be set here
        if (this.properties.getSchedulerName() != null) {
            schedulerFactoryBean.setSchedulerName(this.properties.getSchedulerName());
        }
        schedulerFactoryBean.setAutoStartup(this.properties.isAutoStartup());
        schedulerFactoryBean
                .setStartupDelay((int) this.properties.getStartupDelay().getSeconds());
        schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(
                this.properties.isWaitForJobsToCompleteOnShutdown());
        schedulerFactoryBean
                .setOverwriteExistingJobs(this.properties.isOverwriteExistingJobs());
        if (!this.properties.getProperties().isEmpty()) {
            schedulerFactoryBean
                    .setQuartzProperties(asProperties(this.properties.getProperties()));
        }
        if (this.jobDetails != null && this.jobDetails.length > 0) {
            schedulerFactoryBean.setJobDetails(this.jobDetails);
        }
        if (this.calendars != null && !this.calendars.isEmpty()) {
            schedulerFactoryBean.setCalendars(this.calendars);
        }
        if (this.triggers != null && this.triggers.length > 0) {
            schedulerFactoryBean.setTriggers(this.triggers);
        }
        customize(schedulerFactoryBean);
        return schedulerFactoryBean;
    }   
    
}

First it defines a SchedulerFactoryBean. As you can see, it implements FactoryBean, which means it is the producer of the Scheduler.

public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
        BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
    // getObject() is the interface method of FactoryBean<Scheduler>
    public Scheduler getObject() {
        return this.scheduler;
    }
}

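On the difference between FactoryBean and BeanFactory: apart from the reversed names, the two have no inherent relationship. BeanFactory is a factory class; as the name suggests, it is the container that produces and hands out beans. FactoryBean is itself a bean, one whose getObject() method produces the object that the container exposes (here, the Scheduler).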
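To make the distinction concrete, here is a minimal plain-Java sketch. It is not Spring's implementation: ToyFactoryBean and ToyContainer are hypothetical stand-ins for FactoryBean and BeanFactory, and the lookup logic is simplified to the one behavior discussed here.

```java
import java.util.HashMap;
import java.util.Map;

class FactoryBeanSketch {
    public static void main(String[] args) {
        ToyContainer container = new ToyContainer();
        // The factory is what gets registered...
        ToyFactoryBean<String> factory = () -> "scheduler-instance";
        container.register("quartzScheduler", factory);
        // ...but callers receive the object it produces.
        System.out.println(container.getBean("quartzScheduler"));
    }
}

// Hypothetical stand-in for FactoryBean<T>: a bean that produces another object.
interface ToyFactoryBean<T> {
    T getObject();
}

// Hypothetical stand-in for a BeanFactory: stores beans and hands them out by name.
class ToyContainer {
    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // If the registered bean is a factory bean, return its product instead of the factory.
    Object getBean(String name) {
        Object bean = beans.get(name);
        if (bean instanceof ToyFactoryBean) {
            return ((ToyFactoryBean<?>) bean).getObject();
        }
        return bean;
    }
}
```

The container is the BeanFactory role; the registered lambda is the FactoryBean role. Asking for the bean yields the product, which is why injecting a Scheduler ends up calling getObject().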

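QuartzAutoConfiguration also uses the inner class SchedulerDependsOnBeanFactoryPostProcessor to make the Scheduler and SchedulerFactoryBean beans depend on the given bean types, so those beans are initialized first: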


    /**
     * {@link AbstractDependsOnBeanFactoryPostProcessor} for Quartz {@link Scheduler} and
     * {@link SchedulerFactoryBean}.
     */
    private static class SchedulerDependsOnBeanFactoryPostProcessor extends AbstractDependsOnBeanFactoryPostProcessor {

        SchedulerDependsOnBeanFactoryPostProcessor(Class<?>... dependencyTypes) {
            super(Scheduler.class, SchedulerFactoryBean.class, dependencyTypes);
        }

    }

SchedulerDependsOnBeanFactoryPostProcessor is registered as a bean in QuartzSchedulerDependencyConfiguration:

        @Configuration(proxyBeanMethods = false)
        static class QuartzSchedulerDependencyConfiguration {

            @Bean
            static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerDataSourceInitializerDependsOnBeanFactoryPostProcessor() {
                return new SchedulerDependsOnBeanFactoryPostProcessor(QuartzDataSourceInitializer.class);
            }

            @Bean
            @ConditionalOnBean(FlywayMigrationInitializer.class)
            static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerFlywayDependsOnBeanFactoryPostProcessor() {
                return new SchedulerDependsOnBeanFactoryPostProcessor(FlywayMigrationInitializer.class);
            }

            @Configuration(proxyBeanMethods = false)
            @ConditionalOnClass(SpringLiquibase.class)
            static class LiquibaseQuartzSchedulerDependencyConfiguration {

                @Bean
                @ConditionalOnBean(SpringLiquibase.class)
                static SchedulerDependsOnBeanFactoryPostProcessor quartzSchedulerLiquibaseDependsOnBeanFactoryPostProcessor() {
                    return new SchedulerDependsOnBeanFactoryPostProcessor(SpringLiquibase.class);
                }

            }

        }

    }

QuartzAutoConfiguration also configures a SchedulerFactoryBeanCustomizer, which sets the data source on the schedulerFactoryBean:

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnSingleCandidate(DataSource.class)
    @ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc")
    protected static class JdbcStoreTypeConfiguration {

        @Bean
        @Order(0)
        public SchedulerFactoryBeanCustomizer dataSourceCustomizer(QuartzProperties properties, DataSource dataSource,
                @QuartzDataSource ObjectProvider<DataSource> quartzDataSource,
                ObjectProvider<PlatformTransactionManager> transactionManager,
                @QuartzTransactionManager ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
            return (schedulerFactoryBean) -> {
                DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
                schedulerFactoryBean.setDataSource(dataSourceToUse);
                PlatformTransactionManager txManager = getTransactionManager(transactionManager,
                        quartzTransactionManager);
                if (txManager != null) {
                    schedulerFactoryBean.setTransactionManager(txManager);
                }
            };
        }

        private DataSource getDataSource(DataSource dataSource, ObjectProvider<DataSource> quartzDataSource) {
            DataSource dataSourceIfAvailable = quartzDataSource.getIfAvailable();
            return (dataSourceIfAvailable != null) ? dataSourceIfAvailable : dataSource;
        }

        private PlatformTransactionManager getTransactionManager(
                ObjectProvider<PlatformTransactionManager> transactionManager,
                ObjectProvider<PlatformTransactionManager> quartzTransactionManager) {
            PlatformTransactionManager transactionManagerIfAvailable = quartzTransactionManager.getIfAvailable();
            return (transactionManagerIfAvailable != null) ? transactionManagerIfAvailable
                    : transactionManager.getIfUnique();
        }

        @Bean
        @ConditionalOnMissingBean
        public QuartzDataSourceInitializer quartzDataSourceInitializer(DataSource dataSource,
                @QuartzDataSource ObjectProvider<DataSource> quartzDataSource, ResourceLoader resourceLoader,
                QuartzProperties properties) {
            DataSource dataSourceToUse = getDataSource(dataSource, quartzDataSource);
            return new QuartzDataSourceInitializer(dataSourceToUse, resourceLoader, properties);
        }

    }

The customizers are applied with customizers.orderedStream().forEach((customizer) -> customizer.customize(schedulerFactoryBean));. The DataSource comes from DataSourceAutoConfiguration, so we need to specify a data source in application.yaml:

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/MYSLQ_local?serverTimezone=GMT-7&useLegacyDatetimeCode=false
    username: MYSLQ_local
    password: MYSLQ_local_local
  jpa:
    database-platform: org.hibernate.dialect.MySQL8Dialect

Now look at @ConditionalOnProperty(prefix = "spring.quartz", name = "job-store-type", havingValue = "jdbc"): this configuration takes effect only when spring.quartz.job-store-type = jdbc. So application.yaml needs the following:

spring:
  quartz:
    properties:
      org:
        quartz:
          jobStore:
            tablePrefix: cpm_qrtz_
            isClustered: true
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 50
            threadsInheritContextClassLoaderOfInitializingThread: true
          scheduler:
            instanceId: AUTO
    job-store-type: jdbc

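One might ask: apart from job-store-type, we have not seen where the other settings above get applied, so how do they take effect? First look at @EnableConfigurationProperties(QuartzProperties.class): it reads everything under spring.quartz and binds it into the properties object.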

@ConfigurationProperties("spring.quartz")
public class QuartzProperties {
    private final Map<String, String> properties = new HashMap<>();
    public Map<String, String> getProperties() {
        return this.properties;
    }
}

These properties are then handed to the schedulerFactoryBean in the QuartzAutoConfiguration.quartzScheduler method:

        if (!properties.getProperties().isEmpty()) {
            schedulerFactoryBean.setQuartzProperties(asProperties(properties.getProperties()));
        }
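The asProperties helper is not shown in the excerpt. As a rough sketch, assuming it simply copies the bound map into a java.util.Properties object, the conversion could look like this. The class name AsPropertiesSketch and the sample keys are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class AsPropertiesSketch {
    // Assumed shape of the conversion: copy every entry of the bound map into a Properties object.
    static Properties asProperties(Map<String, String> source) {
        Properties properties = new Properties();
        properties.putAll(source);
        return properties;
    }

    public static void main(String[] args) {
        // Keys as they look after binding spring.quartz.properties.* (the spring.quartz.properties prefix is gone).
        Map<String, String> bound = new LinkedHashMap<>();
        bound.put("org.quartz.jobStore.isClustered", "true");
        bound.put("org.quartz.threadPool.threadCount", "50");

        Properties props = asProperties(bound);
        System.out.println(props.getProperty("org.quartz.threadPool.threadCount"));
    }
}
```

The point is that nested YAML under spring.quartz.properties ends up as flat keys such as org.quartz.threadPool.threadCount, which is the form Quartz itself expects.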

Create Scheduler

When code injects a Scheduler with @Autowired, like this:

 @Autowired
 Scheduler scheduler;

the BeanFactory looks for a bean of type Scheduler and ultimately obtains it through SchedulerFactoryBean.getObject().
As the code shows, getObject() simply returns this.scheduler.

public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
        BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
    private Scheduler scheduler;
    public Scheduler getObject() {
            return this.scheduler;
        }
}

But where is this Scheduler constructed? SchedulerFactoryBean implements InitializingBean, which declares afterPropertiesSet():

public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>,
        BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
        
        public void afterPropertiesSet() throws Exception {
        ...
        // Initialize the Scheduler instance...
        // the Scheduler is initialized here
        this.scheduler = prepareScheduler(prepareSchedulerFactory());
        try {
            registerListeners();
            registerJobsAndTriggers();
        }
        catch (Exception ex) {
            ...
        }
    }
    
}

A simplified view of the bean lifecycle is shown below; note where InitializingBean#afterPropertiesSet() sits:
->constructor
->property population (autowired injection)
->BeanPostProcessor#postProcessBeforeInitialization
->@PostConstruct, InitializingBean#afterPropertiesSet(), or @Bean(initMethod="xxx")
->BeanPostProcessor#postProcessAfterInitialization
->@PreDestroy
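The ordering above can be illustrated with a small plain-Java simulation. This is only a sketch: ToyInitializingBean and ToySchedulerFactory are hypothetical names, and the "container" is just a method that calls the steps in sequence. It shows why the product can safely be built in afterPropertiesSet(): the dependencies have already been injected by then.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    // Hypothetical stand-in for InitializingBean.
    interface ToyInitializingBean {
        void afterPropertiesSet();
    }

    // A toy bean that records each lifecycle step it goes through.
    static class ToySchedulerFactory implements ToyInitializingBean {
        final List<String> events;
        String dependency;
        String scheduler;

        ToySchedulerFactory(List<String> events) {
            this.events = events;
            events.add("constructor");
        }

        void setDependency(String dependency) {
            this.dependency = dependency;
            events.add("populate properties");
        }

        @Override
        public void afterPropertiesSet() {
            // Dependencies are already injected here, so the product can be built.
            this.scheduler = "scheduler using " + dependency;
            events.add("afterPropertiesSet");
        }
    }

    // Plays the container's role: runs the steps in order and returns the recorded trace.
    static List<String> runLifecycle() {
        List<String> events = new ArrayList<>();
        ToySchedulerFactory bean = new ToySchedulerFactory(events);
        bean.setDependency("dataSource");
        events.add("postProcessBeforeInitialization");
        bean.afterPropertiesSet();
        events.add("postProcessAfterInitialization");
        return events;
    }

    public static void main(String[] args) {
        runLifecycle().forEach(System.out::println);
    }
}
```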

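Look at prepareSchedulerFactory(): it returns a SchedulerFactory, which is passed to prepareScheduler(); the Scheduler that comes back is what gets assigned to SchedulerFactoryBean's scheduler field.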

public class SchedulerFactoryBean{
    
    // field of SchedulerFactoryBean (declared at class level, not inside the method)
    private SchedulerFactory schedulerFactory;

    private Class<? extends SchedulerFactory> schedulerFactoryClass = 
    StdSchedulerFactory.class;
    
    private SchedulerFactory prepareSchedulerFactory() throws SchedulerException, IOException {
        // this SchedulerFactory is null by default; it can be made non-null,
        // for example by defining a SchedulerFactoryBeanCustomizer
        SchedulerFactory schedulerFactory = this.schedulerFactory;
        if (schedulerFactory == null) {
            // Create local SchedulerFactory instance (typically a StdSchedulerFactory)
            // as the comment above says, this instantiates a StdSchedulerFactory
            schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
            if (schedulerFactory instanceof StdSchedulerFactory) {
                // call initSchedulerFactory to populate the StdSchedulerFactory
                // anyone who has seen the official Quartz demos knows StdSchedulerFactory produces the scheduler
                initSchedulerFactory((StdSchedulerFactory) schedulerFactory);
            }
            else if (this.configLocation != null || this.quartzProperties != null ||
                    this.taskExecutor != null || this.dataSource != null) {
                throw new IllegalArgumentException(
                        "StdSchedulerFactory required for applying Quartz properties: " + schedulerFactory);
            }
            // Otherwise, no local settings to be applied via StdSchedulerFactory.initialize(Properties)
        }
        // Otherwise, assume that externally provided factory has been initialized with appropriate settings
        return schedulerFactory;
    }
}

initSchedulerFactory mainly applies the configuration to the schedulerFactory:

private void initSchedulerFactory(StdSchedulerFactory schedulerFactory) throws SchedulerException, IOException {
        Properties mergedProps = new Properties();
        ...    
        CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
        ...
        // this is the important part:
        // quartz.properties settings can be set directly in application.properties as spring.quartz.properties.xxx
        schedulerFactory.initialize(mergedProps);
    }

CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps); merges the settings read from application.yaml over the defaults already in mergedProps. That is how the application.yaml settings take effect.
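The override behavior can be sketched with plain java.util.Properties. This is an illustration of "later entries win", not Spring's CollectionUtils code; the merge helper, the class name and the default values below are made up for the example.

```java
import java.util.Properties;

class MergePropsSketch {
    // Later entries win: values from 'overrides' replace same-named defaults.
    static Properties merge(Properties defaults, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(defaults);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        // Illustrative defaults, not read from Quartz or Spring.
        defaults.setProperty("org.quartz.threadPool.threadCount", "10");
        defaults.setProperty("org.quartz.scheduler.instanceName", "toyScheduler");

        Properties fromYaml = new Properties();
        fromYaml.setProperty("org.quartz.threadPool.threadCount", "50");

        Properties merged = merge(defaults, fromYaml);
        // The overridden key takes the application value; untouched keys keep their default.
        System.out.println(merged.getProperty("org.quartz.threadPool.threadCount"));
        System.out.println(merged.getProperty("org.quartz.scheduler.instanceName"));
    }
}
```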

After a series of calls, prepareScheduler(prepareSchedulerFactory()) ends up in StdSchedulerFactory's private Scheduler instantiate(). It is a very long method, but the logic is fairly simple: it is mostly initialization. Only the jobStore configuration is shown below.

private Scheduler instantiate() throws SchedulerException {
  // cfg holds the mergedProps from the previous snippet
        if (cfg == null) {
            initialize();
        }
        // job store: if not configured, the default is RAMJobStore
        // in application.properties it can be configured as
        //spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
           String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());
    
    try {
            js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' could not be instantiated.", e);
            throw initException;
        }

        SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

    // this picks up spring.quartz.properties.org.quartz.jobStore.xxx
    // where xxx is a property of the jobStore class, such as isClustered or clusterCheckinInterval
    // in application.properties: spring.quartz.properties.org.quartz.jobStore.isClustered = true
        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
        try {
            setBeanProps(js, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' props could not be configured.", e);
            throw initException;
        }

        
        
}
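To show what extracting a property group means here, the following is a simplified sketch of selecting every key under a prefix and stripping that prefix. It is not Quartz's PropertiesParser: the propertyGroup helper is hypothetical and it ignores the excluded-prefix argument that the real call passes.

```java
import java.util.Properties;

class PropertyGroupSketch {
    // Collects every property under the given prefix and strips the prefix from its key.
    static Properties propertyGroup(Properties all, String prefix) {
        String dotted = prefix.endsWith(".") ? prefix : prefix + ".";
        Properties group = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(dotted)) {
                group.setProperty(name.substring(dotted.length()), all.getProperty(name));
            }
        }
        return group;
    }

    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.setProperty("org.quartz.jobStore.isClustered", "true");
        cfg.setProperty("org.quartz.jobStore.clusterCheckinInterval", "10000");
        cfg.setProperty("org.quartz.threadPool.threadCount", "50");

        // Only the jobStore keys survive, now named isClustered and clusterCheckinInterval.
        Properties jobStoreProps = propertyGroup(cfg, "org.quartz.jobStore");
        System.out.println(jobStoreProps.getProperty("isClustered"));
        System.out.println(jobStoreProps.size());
    }
}
```

The stripped names (isClustered, clusterCheckinInterval) are what a setBeanProps-style step can then map onto setters of the job store instance.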

So to configure clustering, application.properties can be set as follows:

spring.quartz.properties.org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
spring.quartz.properties.org.quartz.jobStore.isClustered = true
spring.quartz.properties.org.quartz.jobStore.clusterCheckinInterval = 10000
spring.quartz.properties.org.quartz.scheduler.instanceId = AUTO

Now look at the prepareScheduler method:

private Scheduler prepareScheduler(SchedulerFactory schedulerFactory) throws SchedulerException {
        if (this.resourceLoader != null) {
            // Make given ResourceLoader available for SchedulerFactory configuration.
            configTimeResourceLoaderHolder.set(this.resourceLoader);
        }
        if (this.taskExecutor != null) {
            // Make given TaskExecutor available for SchedulerFactory configuration.
            configTimeTaskExecutorHolder.set(this.taskExecutor);
        }
        if (this.dataSource != null) {
            // Make given DataSource available for SchedulerFactory configuration.
            configTimeDataSourceHolder.set(this.dataSource);
        }
        if (this.nonTransactionalDataSource != null) {
            // Make given non-transactional DataSource available for SchedulerFactory configuration.
            configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
        }

        // Get Scheduler instance from SchedulerFactory.
        try {
            Scheduler scheduler = createScheduler(schedulerFactory, this.schedulerName);
            populateSchedulerContext(scheduler);

            if (!this.jobFactorySet && !(scheduler instanceof RemoteScheduler)) {
                // Use AdaptableJobFactory as default for a local Scheduler, unless when
                // explicitly given a null value through the "jobFactory" bean property.
                this.jobFactory = new AdaptableJobFactory();
            }
            if (this.jobFactory != null) {
                if (this.applicationContext != null && this.jobFactory instanceof ApplicationContextAware) {
                    ((ApplicationContextAware) this.jobFactory).setApplicationContext(this.applicationContext);
                }
                if (this.jobFactory instanceof SchedulerContextAware) {
                    ((SchedulerContextAware) this.jobFactory).setSchedulerContext(scheduler.getContext());
                }
                scheduler.setJobFactory(this.jobFactory);
            }
            return scheduler;
        }

        finally {
            if (this.resourceLoader != null) {
                configTimeResourceLoaderHolder.remove();
            }
            if (this.taskExecutor != null) {
                configTimeTaskExecutorHolder.remove();
            }
            if (this.dataSource != null) {
                configTimeDataSourceHolder.remove();
            }
            if (this.nonTransactionalDataSource != null) {
                configTimeNonTransactionalDataSourceHolder.remove();
            }
        }
    }

The most important call here is createScheduler(schedulerFactory, this.schedulerName);. As shown earlier, this schedulerFactory instance is a StdSchedulerFactory.

    protected Scheduler createScheduler(SchedulerFactory schedulerFactory, @Nullable String schedulerName)
            throws SchedulerException {

        // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
        Thread currentThread = Thread.currentThread();
        ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
        boolean overrideClassLoader = (this.resourceLoader != null &&
                this.resourceLoader.getClassLoader() != threadContextClassLoader);
        if (overrideClassLoader) {
            currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
        }
        try {
            SchedulerRepository repository = SchedulerRepository.getInstance();
            synchronized (repository) {
                Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
                Scheduler newScheduler = schedulerFactory.getScheduler();
                if (newScheduler == existingScheduler) {
                    throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                            "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
                }
                if (!this.exposeSchedulerInRepository) {
                    // Need to remove it in this case, since Quartz shares the Scheduler instance by default!
                    SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
                }
                return newScheduler;
            }
        }
        finally {
            if (overrideClassLoader) {
                // Reset original thread context ClassLoader.
                currentThread.setContextClassLoader(threadContextClassLoader);
            }
        }
    }

Let's look at the StdSchedulerFactory::getScheduler method:

    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }
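The lookup-then-create pattern in getScheduler can be sketched in plain Java as follows. ToyScheduler and the static REPOSITORY map are hypothetical stand-ins for Scheduler and SchedulerRepository, and the sketch assumes the newly created scheduler is registered in the repository as part of creation.

```java
import java.util.HashMap;
import java.util.Map;

class SchedulerLookupSketch {
    // Hypothetical stand-in for a Scheduler: just a name and a shutdown flag.
    static class ToyScheduler {
        final String name;
        boolean shutdown;

        ToyScheduler(String name) {
            this.name = name;
        }
    }

    // Hypothetical stand-in for SchedulerRepository.
    static final Map<String, ToyScheduler> REPOSITORY = new HashMap<>();

    static ToyScheduler getScheduler(String name) {
        ToyScheduler existing = REPOSITORY.get(name);
        if (existing != null) {
            if (existing.shutdown) {
                // A shut-down scheduler is discarded so a fresh one can take its place.
                REPOSITORY.remove(name);
            } else {
                return existing;
            }
        }
        // Assumption: creation also registers the new scheduler under its name.
        ToyScheduler created = new ToyScheduler(name);
        REPOSITORY.put(name, created);
        return created;
    }

    public static void main(String[] args) {
        ToyScheduler first = getScheduler("demo");
        ToyScheduler second = getScheduler("demo");
        System.out.println(first == second);

        first.shutdown = true;
        ToyScheduler third = getScheduler("demo");
        System.out.println(first == third);
    }
}
```

A live scheduler with the same name is reused, and a shut-down one is replaced.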

Back to the instantiate method: it sets up the various startup parameters, such as the thread pool and job store classes. The method is extremely long, so I won't walk through it in detail, but the key line is:

    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

The QuartzScheduler constructor starts a task-dispatching thread, QuartzSchedulerThread:

    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

The QuartzSchedulerThread::run method is long, but I still want to list it in full.

@Override
    public void run() {
        int acquiresFailed = 0;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                // wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    } catch (RuntimeException e) {
                        if (acquiresFailed == 0) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

Finding a runnable job trigger

The run method does several things. The first is to find the job triggers that are ready to run:

                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }

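Why does a given firing of a Quartz job run on only one machine? The answer is mainly in qsRsrcs.getJobStore().acquireNextTriggers, shown here as implemented by the JDBC job store (JobStoreSupport):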

    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        try {
                            List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                            Set<String> fireInstanceIds = new HashSet<String>();
                            for (FiredTriggerRecord ft : acquired) {
                                fireInstanceIds.add(ft.getFireInstanceId());
                            }
                            for (OperableTrigger tr : result) {
                                if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                    return true;
                                }
                            }
                            return false;
                        } catch (SQLException e) {
                            throw new JobPersistenceException("error validating trigger acquisition", e);
                        }
                    }
                });
    }

Inside this lock wrapper, the real acquireNextTrigger runs:

        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
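The wrapper pattern in this excerpt can be sketched without a database. This is a minimal sketch, assuming an in-process ReentrantLock in place of the lock that the JDBC job store shares between scheduler instances; LockedExecutor, executeInLock and isHeldByCurrentThread are hypothetical names for illustration, not Quartz API. It only shows the shape: decide whether a lock is needed, run the callback, and always release the lock afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper that mimics the "run a callback under an optional lock" shape.
class LockedExecutor {
    // Stand-in for the shared trigger-access lock (assumption: in-memory only).
    private final ReentrantLock triggerAccess = new ReentrantLock();

    // Runs the callback, holding the lock only when useLock is true.
    <T> T executeInLock(boolean useLock, Supplier<T> callback) {
        if (useLock) {
            triggerAccess.lock();
        }
        try {
            return callback.get();
        } finally {
            if (useLock) {
                triggerAccess.unlock();
            }
        }
    }

    boolean isHeldByCurrentThread() {
        return triggerAccess.isHeldByCurrentThread();
    }
}

class LockedExecutorDemo {
    public static void main(String[] args) {
        LockedExecutor executor = new LockedExecutor();
        int maxCount = 3;
        boolean acquireWithinLock = false;
        // Same decision as in the excerpt: lock if configured to, or if a batch is requested.
        boolean useLock = acquireWithinLock || maxCount > 1;

        List<String> acquired = executor.executeInLock(useLock, () -> {
            // Placeholder for the real acquisition work done inside the lock.
            List<String> batch = new ArrayList<>();
            batch.add("triggerA");
            batch.add("triggerB");
            return batch;
        });
        System.out.println("acquired " + acquired.size() + " triggers");
    }
}
```

The try/finally is the important design choice: the lock is released even if the callback throws, so a failed acquisition cannot block the other callers forever.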

Now look at the actual database logic:

   // FUTURE_TODO: this really ought to return something like a FiredTriggerBundle,
   // so that the fireInstanceId doesn't have to be on the trigger...
   protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
       throws JobPersistenceException {
       if (timeWindow < 0) {
         throw new IllegalArgumentException();
       }
       
       List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
       Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
       final int MAX_DO_LOOP_RETRY = 3;
       int currentLoopCount = 0;
       do {
           currentLoopCount ++;
           try {
               List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
               
               // No trigger is ready to fire yet.
               if (keys == null || keys.size() == 0)
                   return acquiredTriggers;

               long batchEnd = noLaterThan;

               for(TriggerKey triggerKey: keys) {
                   // If our trigger is no longer available, try a new one.
                   OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                   if(nextTrigger == null) {
                       continue; // next trigger
                   }
                   
                   // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
                   // put it back into the timeTriggers set and continue to search for next trigger.
                   JobKey jobKey = nextTrigger.getJobKey();
                   JobDetail job;
                   try {
                       job = retrieveJob(conn, jobKey);
                   } catch (JobPersistenceException jpe) {
                       try {
                           getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                           getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                       } catch (SQLException sqle) {
                           getLog().error("Unable to set trigger state to ERROR.", sqle);
                       }
                       continue;
                   }
                   
                   if (job.isConcurrentExectionDisallowed()) {
                       if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                           continue; // next trigger
                       } else {
                           acquiredJobKeysForNoConcurrentExec.add(jobKey);
                       }
                   }

                   Date nextFireTime = nextTrigger.getNextFireTime();

                   // A trigger should not return NULL on nextFireTime when fetched from DB.
                   // But for whatever reason if we do have this (BAD trigger implementation or
                   // data?), we then should log a warning and continue to next trigger.
                   // User would need to manually fix these triggers from DB as they will not
                   // able to be clean up by Quartz since we are not returning it to be processed.
                   if (nextFireTime == null) {
                       log.warn("Trigger {} returned null on nextFireTime and yet still exists in DB!",
                           nextTrigger.getKey());
                       continue;
                   }
                   
                   if (nextFireTime.getTime() > batchEnd) {
                     break;
                   }
                   // We now have a acquired trigger, let's add to return list.
                   // If our trigger was no longer in the expected state, try a new one.
                   int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                   if (rowsUpdated <= 0) {
                       continue; // next trigger
                   }
                   nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                   getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                   if(acquiredTriggers.isEmpty()) {
                       batchEnd = Math.max(nextFireTime.getTime(), System.currentTimeMillis()) + timeWindow;
                   }
                   acquiredTriggers.add(nextTrigger);
               }

               // if we didn't end up with any trigger to fire from that first
               // batch, try again for another batch. We allow with a max retry count.
               if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                   continue;
               }
               
               // We are done with the while loop.
               break;
           } catch (Exception e) {
               throw new JobPersistenceException(
                         "Couldn't acquire next trigger: " + e.getMessage(), e);
           }
       } while (true);
       
       // Return the acquired trigger list
       return acquiredTriggers;
   }

Two calls matter most:

  • Select the candidate triggers
    List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

  • Mark the trigger as acquired, so that other instances no longer select it

                    // We now have a acquired trigger, let's add to return list.
                    // If our trigger was no longer in the expected state, try a new one.
                    int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                    if (rowsUpdated <= 0) {
                        continue; // next trigger
                    }
                    nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                    getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
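The rowsUpdated check is what lets only one instance win a trigger: the state changes from WAITING to ACQUIRED only if it is still WAITING at the moment of the update. Below is a minimal sketch of that compare-and-set idea, assuming an in-memory ConcurrentHashMap in place of the triggers table; TriggerStateTable and its methods are hypothetical names, and the return value imitates a "rows updated" count rather than reproducing Quartz's SQL.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for the trigger state column.
class TriggerStateTable {
    static final String WAITING = "WAITING";
    static final String ACQUIRED = "ACQUIRED";

    private final ConcurrentMap<String, String> states = new ConcurrentHashMap<>();

    void addWaitingTrigger(String triggerKey) {
        states.put(triggerKey, WAITING);
    }

    // Changes the state only if it currently equals oldState.
    // Returns 1 when the change happened and 0 otherwise, like a row count.
    int updateStateFromOtherState(String triggerKey, String newState, String oldState) {
        return states.replace(triggerKey, oldState, newState) ? 1 : 0;
    }

    String stateOf(String triggerKey) {
        return states.get(triggerKey);
    }
}

class AcquireRaceDemo {
    public static void main(String[] args) {
        TriggerStateTable table = new TriggerStateTable();
        table.addWaitingTrigger("reportTrigger");

        // Two scheduler instances try to acquire the same trigger, one after the other.
        int rowsForA = table.updateStateFromOtherState(
                "reportTrigger", TriggerStateTable.ACQUIRED, TriggerStateTable.WAITING);
        int rowsForB = table.updateStateFromOtherState(
                "reportTrigger", TriggerStateTable.ACQUIRED, TriggerStateTable.WAITING);

        System.out.println("instance A rows updated: " + rowsForA); // 1, A owns the trigger
        System.out.println("instance B rows updated: " + rowsForB); // 0, B moves on
    }
}
```

An instance that gets 0 back simply continues with the next trigger, which matches the continue in the excerpt.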

Running the job

Back to the run method of QuartzSchedulerThread from earlier:

                      for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }

The key part is the section below, which actually runs a job:

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }
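The hand-off in this excerpt, where a Runnable shell is given to a pool and the boolean result is checked, can be sketched with the JDK alone. This is only a rough sketch, assuming a fixed-size ExecutorService; SketchThreadPool and shutdownAndWait are hypothetical names, and Quartz's own thread pool behaves differently in its details (for example, how it waits for a free thread).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical pool exposing a runInThread-style method that reports acceptance.
class SketchThreadPool {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    // Returns true if the shell was handed to a worker, false if the pool refused it.
    boolean runInThread(Runnable shell) {
        if (shell == null) {
            return false;
        }
        try {
            workers.execute(shell);
            return true;
        } catch (RejectedExecutionException e) {
            // Happens here once the pool has been shut down.
            return false;
        }
    }

    // Stops accepting work and waits for running shells to finish.
    boolean shutdownAndWait() {
        workers.shutdown();
        try {
            return workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class RunInThreadDemo {
    public static void main(String[] args) {
        SketchThreadPool pool = new SketchThreadPool();
        boolean accepted = pool.runInThread(() -> System.out.println("job body runs on a worker thread"));
        System.out.println("accepted: " + accepted);

        pool.shutdownAndWait();
        // After shutdown the hand-off fails, which is the case the excerpt logs as an error.
        System.out.println("accepted after shutdown: " + pool.runInThread(() -> { }));
    }
}
```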

Let's look at shell.initialize(qs);

    public void initialize(QuartzScheduler sched)
        throws SchedulerException {
        this.qs = sched;

        Job job = null;
        JobDetail jobDetail = firedTriggerBundle.getJobDetail();

        try {
            job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
        } catch (SchedulerException se) {
            sched.notifySchedulerListenersError(
                    "An error occured instantiating job to be executed. job= '"
                            + jobDetail.getKey() + "'", se);
            throw se;
        } catch (Throwable ncdfe) { // such as NoClassDefFoundError
            SchedulerException se = new SchedulerException(
                    "Problem instantiating class '"
                            + jobDetail.getJobClass().getName() + "' - ", ncdfe);
            sched.notifySchedulerListenersError(
                    "An error occured instantiating job to be executed. job= '"
                            + jobDetail.getKey() + "'", se);
            throw se;
        }

        this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
    }

Here sched.getJobFactory() is used to create the job. Where is this jobFactory set? Go back to where we started, QuartzAutoConfiguration::quartzScheduler:

        SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
        jobFactory.setApplicationContext(applicationContext);
        schedulerFactoryBean.setJobFactory(jobFactory);

So the jobFactory is a SpringBeanJobFactory, and the method to look at is SpringBeanJobFactory::createJobInstance:

    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object job = (this.applicationContext != null ?
                this.applicationContext.getAutowireCapableBeanFactory().createBean(
                        bundle.getJobDetail().getJobClass(), AutowireCapableBeanFactory.AUTOWIRE_CONSTRUCTOR, false) :
                super.createJobInstance(bundle));

        if (isEligibleForPropertyPopulation(job)) {
            BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(job);
            MutablePropertyValues pvs = new MutablePropertyValues();
            if (this.schedulerContext != null) {
                pvs.addPropertyValues(this.schedulerContext);
            }
            pvs.addPropertyValues(bundle.getJobDetail().getJobDataMap());
            pvs.addPropertyValues(bundle.getTrigger().getJobDataMap());
            if (this.ignoredUnknownProperties != null) {
                for (String propName : this.ignoredUnknownProperties) {
                    if (pvs.contains(propName) && !bw.isWritableProperty(propName)) {
                        pvs.removePropertyValue(propName);
                    }
                }
                bw.setPropertyValues(pvs);
            }
            else {
                bw.setPropertyValues(pvs, true);
            }
        }

        return job;
    }

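Its first statement passes the job class from the incoming TriggerFiredBundle to the application context's AutowireCapableBeanFactory.createBean, so the job instance is created and autowired by Spring. That was the moment it clicked: this is why the QuartzJobBean at the top of this article can use @Autowired without carrying any annotation itself.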
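To make the idea concrete, here is a minimal sketch of a job factory that instantiates a plain class and fills in its dependencies by reflection. It is not how Spring implements createBean; Inject, GreetingService, GreetingJob and InjectingJobFactory are all hypothetical names, with Inject standing in for @Autowired and a simple map standing in for the application context.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker annotation, standing in for @Autowired.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Inject {
}

// Hypothetical dependency that the "container" knows about.
class GreetingService {
    String greet(String name) {
        return "hello " + name;
    }
}

// The job class itself carries no component annotation and is never scanned.
class GreetingJob {
    @Inject
    private GreetingService greetingService;

    String execute() {
        return greetingService.greet("quartz");
    }
}

// Hypothetical factory: creates the job on demand and injects marked fields.
class InjectingJobFactory {
    private final Map<Class<?>, Object> registry = new HashMap<>();

    <T> void register(Class<T> type, T instance) {
        registry.put(type, instance);
    }

    <T> T newJob(Class<T> jobClass) {
        try {
            Constructor<T> constructor = jobClass.getDeclaredConstructor();
            constructor.setAccessible(true);
            T job = constructor.newInstance();
            for (Field field : jobClass.getDeclaredFields()) {
                if (field.isAnnotationPresent(Inject.class)) {
                    field.setAccessible(true);
                    // Look the dependency up by the field's declared type.
                    field.set(job, registry.get(field.getType()));
                }
            }
            return job;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create job " + jobClass.getName(), e);
        }
    }
}

class JobFactoryDemo {
    public static void main(String[] args) {
        InjectingJobFactory factory = new InjectingJobFactory();
        factory.register(GreetingService.class, new GreetingService());

        GreetingJob job = factory.newJob(GreetingJob.class);
        System.out.println(job.execute()); // prints "hello quartz"
    }
}
```

The point of the sketch is that injection happens at creation time, inside the factory, so the job class never needs to be discovered by component scanning.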

The call to qsRsrcs.getThreadPool().runInThread(shell) is what actually runs the job. There is more logic after that, such as releasing the trigger, which I won't go into here. This article only walks through how Quartz integrates with Spring Boot.
