流程數(shù)據(jù)的定義在flowable中是比較復雜的, 涉及到多張數(shù)據(jù)庫表關聯(lián)關系刁品,這些在一個流程引擎中也是最為核心的數(shù)據(jù)粉渠,并且需要進行頻繁的讀取逆皮,所以為了實現(xiàn)讀取的高效性辽装,flowable將流程定義的數(shù)據(jù)放在內(nèi)存中闽寡,需要時直接從內(nèi)存中獲取。本文就簡單看一看flowable是怎么實現(xiàn)緩存的尼酿。
1. 用戶定義的入口
先回顧一下爷狈,如果我們要在Spring環(huán)境中使用flowable必須采用類似方式定義一個bean
<bean id="processEngineConfiguration"
class="org.flowable.spring.SpringProcessEngineConfiguration">
SpringProcessEngineConfiguration繼承于ProcessEngineConfigurationImpl,在ProcessEngineConfigurationImpl源碼中可以看到:
protected int processDefinitionCacheLimit = -1; // By default, no limit
protected DeploymentCache<ProcessDefinitionCacheEntry> processDefinitionCache;
protected int processDefinitionInfoCacheLimit = -1; // By default, no limit
protected ProcessDefinitionInfoCache processDefinitionInfoCache;
protected int knowledgeBaseCacheLimit = -1;
protected DeploymentCache<Object> knowledgeBaseCache;
protected int appResourceCacheLimit = -1;
protected DeploymentCache<Object> appResourceCache;
因此我們可以在bean定義中可以設置processDefinitionCacheLimit等屬性的值即可控制緩存的容量裳擎,如果沒有進行設置將不會對其限制涎永,為以防止OOM異常,建議可以設置一個鹿响,當超出容量時flowable引擎將會通過LRU算法進行移除羡微。
2. 緩存的初始化
在flowable流程引擎在Spring環(huán)境中的啟動源碼分析這篇文章我們已經(jīng)知道了flowble流程的初始化過程,那么對緩存的初始化肯定也能在相關內(nèi)里找到惶我,在ProcessEngineConfigurationImpl的init方法中有如下幾行代碼:
initProcessDefinitionCache();
initProcessDefinitionInfoCache();
initAppResourceCache();
initKnowledgeBaseCache();
以initProcessDefinitionCache為例看一下方法實現(xiàn)妈倔,不用多說:
public void initProcessDefinitionCache() {
if (processDefinitionCache == null) {
if (processDefinitionCacheLimit <= 0) {
processDefinitionCache = new DefaultDeploymentCache<ProcessDefinitionCacheEntry>();
} else {
processDefinitionCache = new DefaultDeploymentCache<ProcessDefinitionCacheEntry>(processDefinitionCacheLimit);
}
}
}
進入DefaultDeploymentCache的構造方法,當沒有設置緩存大小時通過無參構造方法創(chuàng)建的是一個同步的Map, 重點可以看一下下面的有參構造函數(shù)實現(xiàn):
/** Cache with no limit */
public DefaultDeploymentCache() {
this.cache = Collections.synchronizedMap(new HashMap<String, T>());
}
/**
* Cache which has a hard limit: no more elements will be cached than the limit.
*/
public DefaultDeploymentCache(final int limit) {
this.cache = Collections.synchronizedMap(new LinkedHashMap<String, T>(limit + 1, 0.75f, true) { // +1 is needed, because the entry is inserted first, before it is removed
// 0.75 is the default (see javadocs)
// true will keep the 'access-order', which is needed to have a real LRU cache
private static final long serialVersionUID = 1L;
protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
boolean removeEldest = size() > limit;
if (removeEldest && logger.isTraceEnabled()) {
logger.trace("Cache limit is reached, {} will be evicted", eldest.getKey());
}
return removeEldest;
}
});
}
有參構造方法核心是基于LinkedHashMap并且重寫了removeEldestEntry方法指孤,當超出容量時會返回true, 查看LinkedHashMap可以知道當調(diào)用put或putAll返回前會根據(jù)該方法返回的值決定是否移除最老的一個元素启涯,從而實現(xiàn)了LRU緩存算法。
3. 緩存的寫入與更新
在流程部署時肯定會涉及到相關數(shù)據(jù)的更新恃轩,通過RepositoryServiceImpl.deploy->DeployCmd.executeDeploy查看有如下代碼:
// Actually deploy
commandContext.getProcessEngineConfiguration().getDeploymentManager().deploy(deployment, deploymentSettings);
查看DeploymentManager.deploy->BpmnDeployer.deploy代碼:
cachingAndArtifactsManager.updateCachingAndArtifacts(parsedDeployment);
然后想看CachingAndArtifactsManager.updateCachingAndArtifacts方法源碼即具體更新緩存的實現(xiàn):
/**
* Ensures that the process definition is cached in the appropriate places, including the deployment's collection of deployed artifacts and the deployment manager's cache, as well as caching any
* ProcessDefinitionInfos.
*/
public void updateCachingAndArtifacts(ParsedDeployment parsedDeployment) {
CommandContext commandContext = Context.getCommandContext();
final ProcessEngineConfigurationImpl processEngineConfiguration = Context.getProcessEngineConfiguration();
DeploymentCache<ProcessDefinitionCacheEntry> processDefinitionCache = processEngineConfiguration.getDeploymentManager().getProcessDefinitionCache();
DeploymentEntity deployment = parsedDeployment.getDeployment();
for (ProcessDefinitionEntity processDefinition : parsedDeployment.getAllProcessDefinitions()) {
BpmnModel bpmnModel = parsedDeployment.getBpmnModelForProcessDefinition(processDefinition);
Process process = parsedDeployment.getProcessModelForProcessDefinition(processDefinition);
ProcessDefinitionCacheEntry cacheEntry = new ProcessDefinitionCacheEntry(processDefinition, bpmnModel, process);
processDefinitionCache.add(processDefinition.getId(), cacheEntry);
addDefinitionInfoToCache(processDefinition, processEngineConfiguration, commandContext);
// Add to deployment for further usage
deployment.addDeployedArtifact(processDefinition);
}
}
4. 緩存的讀取
一個典型的讀取場景就是在啟動流程的時候结洼,所以查看RuntimeServiceImpl.startProcessInstanceByKey->StartProcessInstanceCmd.execute方法源碼:
// Find the process definition
ProcessDefinition processDefinition = null;
if (processDefinitionId != null) {
processDefinition = deploymentCache.findDeployedProcessDefinitionById(processDefinitionId);
if (processDefinition == null) {
throw new FlowableObjectNotFoundException("No process definition found for id = '" + processDefinitionId + "'", ProcessDefinition.class);
}
}
接下來進入DeploymentManager.findDeployedProcessDefinitionById可以看到, 首先會從緩存中查找,如果沒有則從數(shù)據(jù)庫中加載:
public ProcessDefinition findDeployedProcessDefinitionById(String processDefinitionId) {
if (processDefinitionId == null) {
throw new FlowableIllegalArgumentException("Invalid process definition id : null");
}
// first try the cache
ProcessDefinitionCacheEntry cacheEntry = processDefinitionCache.get(processDefinitionId);
ProcessDefinition processDefinition = cacheEntry != null ? cacheEntry.getProcessDefinition() : null;
if (processDefinition == null) {
processDefinition = processDefinitionEntityManager.findById(processDefinitionId);
if (processDefinition == null) {
throw new FlowableObjectNotFoundException("no deployed process definition found with id '" + processDefinitionId + "'", ProcessDefinition.class);
}
processDefinition = resolveProcessDefinition(processDefinition).getProcessDefinition();
}
return processDefinition;
}
然后看一下resolveProcessDefinition方法, 當緩存中沒有數(shù)據(jù)時會調(diào)用deploy方法來重新加載緩存叉跛。
/**
* Resolving the process definition will fetch the BPMN 2.0, parse it and store the {@link BpmnModel} in memory.
*/
public ProcessDefinitionCacheEntry resolveProcessDefinition(ProcessDefinition processDefinition) {
String processDefinitionId = processDefinition.getId();
String deploymentId = processDefinition.getDeploymentId();
ProcessDefinitionCacheEntry cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
if (cachedProcessDefinition == null) {
if (Flowable5Util.isFlowable5ProcessDefinition(processDefinition, processEngineConfiguration)) {
return Flowable5Util.getFlowable5CompatibilityHandler().resolveProcessDefinition(processDefinition);
}
DeploymentEntity deployment = deploymentEntityManager.findById(deploymentId);
deployment.setNew(false);
deploy(deployment, null);
cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
if (cachedProcessDefinition == null) {
throw new FlowableException("deployment '" + deploymentId + "' didn't put process definition '" + processDefinitionId + "' in the cache");
}
}
return cachedProcessDefinition;
}
總結一下:flowable緩存的實現(xiàn)核心即基于LinkedHashMap并通過重寫其removeEldestEntry方法實現(xiàn)LRU緩存移除算法松忍。以流程定義緩存為例可以知道,每次部署時會將流程定義的數(shù)據(jù)加入緩存筷厘,每次流程啟動時都會嘗試去緩存中獲取數(shù)據(jù)鸣峭,如果緩存中有就直接返回,如果沒有就從數(shù)據(jù)庫中加載并放入緩存以供下次使用酥艳。