本章閱讀收獲:可了解Quartz框架中的Scheduler部分源碼
繼上一節(jié)內(nèi)容
上一節(jié)內(nèi)容我們講到了instantiate初始化方法中的數(shù)據(jù)庫(kù)連接相關(guān)操作,本節(jié)內(nèi)容將結(jié)束整個(gè)schedule初始化操作员帮,讓我們繼續(xù)耐心往下看~~~~
instantiate初始化過(guò)程源碼分析
在數(shù)據(jù)庫(kù)相關(guān)操作之后,接下來(lái)是插件相關(guān)操作~
// Set up any SchedulerPlugins
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
for (int i = 0; i < pluginNames.length; i++) {
Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
+ pluginNames[i], true);
String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);
if (plugInClass == null) {
initException = new SchedulerException(
"SchedulerPlugin class not specified for plugin '"
+ pluginNames[i] + "'");
throw initException;
}
SchedulerPlugin plugin = null;
try {
plugin = (SchedulerPlugin)
loadHelper.loadClass(plugInClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException(
"SchedulerPlugin class '" + plugInClass
+ "' could not be instantiated.", e);
throw initException;
}
try {
setBeanProps(plugin, pp);
} catch (Exception e) {
initException = new SchedulerException(
"JobStore SchedulerPlugin '" + plugInClass
+ "' props could not be configured.", e);
throw initException;
}
plugins[i] = plugin;
}
這段代碼非常明顯,就是根據(jù)我們配置的插件類(lèi)來(lái)進(jìn)行初始化注入?yún)?shù)工作辞州。目前也不做過(guò)度的展開(kāi)~~~
// Set up any JobListeners
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Class<?>[] strArg = new Class[] { String.class };
String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
JobListener[] jobListeners = new JobListener[jobListenerNames.length];
for (int i = 0; i < jobListenerNames.length; i++) {
Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
+ jobListenerNames[i], true);
String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
if (listenerClass == null) {
initException = new SchedulerException(
"JobListener class not specified for listener '"
+ jobListenerNames[i] + "'");
throw initException;
}
JobListener listener = null;
try {
listener = (JobListener)
loadHelper.loadClass(listenerClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException(
"JobListener class '" + listenerClass
+ "' could not be instantiated.", e);
throw initException;
}
try {
Method nameSetter = null;
try {
nameSetter = listener.getClass().getMethod("setName", strArg);
}
catch(NoSuchMethodException ignore) {
/* do nothing */
}
if(nameSetter != null) {
nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
}
setBeanProps(listener, lp);
} catch (Exception e) {
initException = new SchedulerException(
"JobListener '" + listenerClass
+ "' props could not be configured.", e);
throw initException;
}
jobListeners[i] = listener;
}
這段源碼內(nèi)容作用相信大家大致也能猜的出來(lái),就是關(guān)于Job的監(jiān)聽(tīng)器初始化工作寥粹。我們看到可以配置多個(gè)Job監(jiān)聽(tīng)器变过,這里有一個(gè)特殊點(diǎn)就是:
try {
nameSetter = listener.getClass().getMethod("setName", strArg);
}
catch(NoSuchMethodException ignore) {
/* do nothing */
}
if(nameSetter != null) {
nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
}
也不難理解,就是獲取setName方法涝涤,并把配置的jobListener名字注入進(jìn)去媚狰。至于為什么這里不直接使用setBeanProps(listener, lp);方式直接注入,這是個(gè)疑問(wèn)點(diǎn)阔拳?
同理我們看下觸發(fā)器監(jiān)聽(tīng)器的初始化代碼崭孤,代碼基本一致,也不做過(guò)多介紹:
// Set up any TriggerListeners
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
for (int i = 0; i < triggerListenerNames.length; i++) {
Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
+ triggerListenerNames[i], true);
String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
if (listenerClass == null) {
initException = new SchedulerException(
"TriggerListener class not specified for listener '"
+ triggerListenerNames[i] + "'");
throw initException;
}
TriggerListener listener = null;
try {
listener = (TriggerListener)
loadHelper.loadClass(listenerClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException(
"TriggerListener class '" + listenerClass
+ "' could not be instantiated.", e);
throw initException;
}
try {
Method nameSetter = null;
try {
nameSetter = listener.getClass().getMethod("setName", strArg);
}
catch(NoSuchMethodException ignore) { /* do nothing */ }
if(nameSetter != null) {
nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } );
}
setBeanProps(listener, lp);
} catch (Exception e) {
initException = new SchedulerException(
"TriggerListener '" + listenerClass
+ "' props could not be configured.", e);
throw initException;
}
triggerListeners[i] = listener;
}
最后一個(gè)初始化屬性是線程執(zhí)行器:
// Get ThreadExecutor Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
if (threadExecutorClass != null) {
tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
try {
threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);
setBeanProps(threadExecutor, tProps);
} catch (Exception e) {
initException = new SchedulerException(
"ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
throw initException;
}
} else {
log.info("Using default implementation for ThreadExecutor");
threadExecutor = new DefaultThreadExecutor();
}
這里也是千篇一律的加載threadExecutor類(lèi)糊肠。
接下來(lái)終于要到初始化的高潮了辨宠,也就是最后一步了~~~~開(kāi)搞!;豕嗤形!
// Fire everything up
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
boolean tpInited = false;
boolean qsInited = false;
try {
//這是一個(gè)封裝執(zhí)行Job的工廠類(lèi),這里會(huì)有一個(gè)JobRunShell類(lèi)泪酱,主要是為了一些listener的執(zhí)行和異常處理派殷,這里不做深入展開(kāi)
JobRunShellFactory jrsf = null; // Create correct run-shell factory...
//是否設(shè)置事務(wù)
if (userTXLocation != null) {
UserTransactionHelper.setUserTxLocation(userTXLocation);
}
//包裝任務(wù)進(jìn)事務(wù)中,默認(rèn)false
if (wrapJobInTx) {
jrsf = new JTAJobRunShellFactory();
} else {
jrsf = new JTAAnnotationAwareJobRunShellFactory();
}
//設(shè)置任務(wù)調(diào)度器唯一標(biāo)識(shí)符
if (autoId) {
try {
schedInstId = DEFAULT_INSTANCE_ID;
//如果JobStore是集群墓阀,必須要有調(diào)度器唯一標(biāo)識(shí)符生成器
if (js.isClustered()) {
schedInstId = instanceIdGenerator.generateInstanceId();
}
} catch (Exception e) {
getLog().error("Couldn't generate instance Id!", e);
throw new IllegalStateException("Cannot run without an instance id.");
}
}
//應(yīng)該和JMX相關(guān)毡惜,而我們默認(rèn)實(shí)現(xiàn)的是RAMJobStore,開(kāi)頭是org.quartz.simpl斯撮,所以不做深度進(jìn)去解析
if (js.getClass().getName().startsWith("org.terracotta.quartz")) {
try {
String uuid = (String) js.getClass().getMethod("getUUID").invoke(js);
if(schedInstId.equals(DEFAULT_INSTANCE_ID)) {
schedInstId = "TERRACOTTA_CLUSTERED,node=" + uuid;
if (jmxObjectName == null) {
jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
}
} else if(jmxObjectName == null) {
jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId + ",node=" + uuid);
}
} catch(Exception e) {
throw new RuntimeException("Problem obtaining node id from TerracottaJobStore.", e);
}
if(null == cfg.getStringProperty(PROP_SCHED_JMX_EXPORT)) {
jmxExport = true;
}
}
//我們默認(rèn)實(shí)現(xiàn)的是RAMJobStore经伙,也不是JobStoreSupport的實(shí)現(xiàn)類(lèi),JobStoreSupport主要是用于數(shù)據(jù)庫(kù)存儲(chǔ)信息
if (js instanceof JobStoreSupport) {
JobStoreSupport jjs = (JobStoreSupport)js;
//存儲(chǔ)失敗重試時(shí)間
jjs.setDbRetryInterval(dbFailureRetry);
//這部分代碼目前還不太懂
if(threadsInheritInitalizersClassLoader)
jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
//把之前初始化成功的線程執(zhí)行器放入
jjs.setThreadExecutor(threadExecutor);
}
//開(kāi)始創(chuàng)建調(diào)度器資源類(lèi)
QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
rsrcs.setName(schedName);
rsrcs.setThreadName(threadName);
rsrcs.setInstanceId(schedInstId);
rsrcs.setJobRunShellFactory(jrsf);
rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
rsrcs.setBatchTimeWindow(batchTimeWindow);
rsrcs.setMaxBatchSize(maxBatchSize);
rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
rsrcs.setJMXExport(jmxExport);
rsrcs.setJMXObjectName(jmxObjectName);
//這塊也不是太懂
if (managementRESTServiceEnabled) {
ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
}
//這里是rmi相關(guān),也跳過(guò)
if (rmiExport) {
rsrcs.setRMIRegistryHost(rmiHost);
rsrcs.setRMIRegistryPort(rmiPort);
rsrcs.setRMIServerPort(rmiServerPort);
rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
rsrcs.setRMIBindName(rmiBindName);
}
// ThreadPool tp 是線程池帕膜,這里是對(duì)setInstanceName枣氧,setInstanceId賦值
SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);
//在調(diào)度器資源類(lèi)中放入線程執(zhí)行器
rsrcs.setThreadExecutor(threadExecutor);
//線程執(zhí)行器初始化,看了下默認(rèn)的垮刹,就是一個(gè)空方法
threadExecutor.initialize();
//設(shè)置線程池
rsrcs.setThreadPool(tp);
if(tp instanceof SimpleThreadPool) {
if(threadsInheritInitalizersClassLoader)
((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
}
tp.initialize();
tpInited = true;
rsrcs.setJobStore(js);
// add plugins
for (int i = 0; i < plugins.length; i++) {
rsrcs.addSchedulerPlugin(plugins[i]);
}
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
qsInited = true;
// 關(guān)鍵點(diǎn)达吞,創(chuàng)建出Scheduler,默認(rèn)是StdScheduler
Scheduler scheduler = instantiate(rsrcs, qs);
// set job factory if specified
if(jobFactory != null) {
qs.setJobFactory(jobFactory);
}
// Initialize plugins now that we have a Scheduler instance.
for (int i = 0; i < plugins.length; i++) {
plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
}
// add listeners
for (int i = 0; i < jobListeners.length; i++) {
qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
}
for (int i = 0; i < triggerListeners.length; i++) {
qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
}
// set scheduler context data...
for(Object key: schedCtxtProps.keySet()) {
String val = schedCtxtProps.getProperty((String) key);
scheduler.getContext().put((String)key, val);
}
// 開(kāi)始初始化JobStore
js.setInstanceId(schedInstId);
js.setInstanceName(schedName);
js.setThreadPoolSize(tp.getPoolSize());
js.initialize(loadHelper, qs.getSchedulerSignaler());
//其實(shí)就是在jrsf注入scheduler變量
jrsf.initialize(scheduler);
//這里做了遠(yuǎn)程綁定荒典,如果沒(méi)有的話會(huì)直接跳過(guò)
qs.initialize();
getLog().info(
"Quartz scheduler '" + scheduler.getSchedulerName()
+ "' initialized from " + propSrc);
getLog().info("Quartz scheduler version: " + qs.getVersion());
// prevents the repository from being garbage collected
qs.addNoGCObject(schedRep);
// prevents the db manager from being garbage collected
if (dbMgr != null) {
qs.addNoGCObject(dbMgr);
}
//調(diào)用程序庫(kù)SchedulerRepository中新添加scheduler
schedRep.bind(scheduler);
return scheduler;
}
catch(SchedulerException e) {
shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
throw e;
}
catch(RuntimeException re) {
shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
throw re;
}
catch(Error re) {
shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
throw re;
}
這里做了什么呢酪劫?總結(jié)來(lái)說(shuō),就是各種綁定寺董。分布來(lái)說(shuō)的話就是:
- 創(chuàng)建一個(gè)JobRunShellFactory覆糟,這是JobRunShell類(lèi)的工廠類(lèi),主要是為了一些listener的執(zhí)行和異常處理
- 創(chuàng)建QuartzSchedulerResources遮咖,也就是QuartzScheduler的資源類(lèi)滩字,然后把各類(lèi)資源都賦值進(jìn)去
- 創(chuàng)建出關(guān)鍵的StdScheduler,這個(gè)和QuartzScheduler不同御吞,StdScheduler持有QuartzScheduler
- SchedulerRepository添加新的StdScheduler
- 最后返回StdScheduler
結(jié)束語(yǔ)
本節(jié)已經(jīng)結(jié)束了所有的instantiate初始化操作麦箍,是實(shí)話的話還有很多細(xì)節(jié)點(diǎn)沒(méi)有深入進(jìn)去,所以可能會(huì)造成讀者部分會(huì)有點(diǎn)懵陶珠。但是在后面如果執(zhí)行調(diào)用的時(shí)候内列,我們會(huì)繼續(xù)來(lái)深入分析。