導(dǎo)語(yǔ):本文以引擎相關(guān)的物料管理流程為切入點(diǎn)吓肋,同時(shí)結(jié)合底層數(shù)據(jù)模型和源碼李剖,為大家詳細(xì)剖析引擎物料管理功能的實(shí)現(xiàn)細(xì)節(jié),期望能夠幫助大家更好地理解BML(物料庫(kù))服務(wù)的架構(gòu)琴儿。
1.BML物料庫(kù)服務(wù)
BML物料庫(kù)是Linkis中PublicEnhancementService(PS)——公共增強(qiáng)服務(wù)架構(gòu)下的功能模塊。
在Linkis的架構(gòu)體系里坐桩,物料
的概念是指被統(tǒng)一存儲(chǔ)托管起來(lái)的各種文件數(shù)據(jù)碾局,包括腳本代碼、資源文件、第三方j(luò)ar蹭睡、引擎啟動(dòng)時(shí)所需的相關(guān)類(lèi)庫(kù)和配置文件以及用于安全認(rèn)證的keytab文件等放闺。
總之搬葬,任何以文件態(tài)存在的數(shù)據(jù)抡锈,都可以被集中托管在物料庫(kù)之中补疑,然后在各自所需的場(chǎng)景中被下載使用竭望。
物料服務(wù)是無(wú)狀態(tài)的奈虾,可進(jìn)行多實(shí)例部署,做到服務(wù)高可用,每個(gè)實(shí)例對(duì)外提供獨(dú)立的服務(wù)戳稽,互不干擾乓序,所有物料元數(shù)據(jù)及版本信息等在數(shù)據(jù)庫(kù)中共享耿戚,底層物料數(shù)據(jù)可被存儲(chǔ)到HDFS或本地(共享)文件系統(tǒng)之中,以及支持實(shí)現(xiàn)文件存儲(chǔ)相關(guān)的接口蘑辑,擴(kuò)展其他文件存儲(chǔ)系統(tǒng)等喜鼓。
物料服務(wù)提供精確的權(quán)限控制,對(duì)于引擎資源類(lèi)型的物料捍掺,可被所有用戶(hù)共享訪問(wèn)麦备;對(duì)于一些含有敏感信息的物料數(shù)據(jù),也可做到僅有限用戶(hù)可讀昭娩。
物料文件采用追加的方式凛篙,可將多個(gè)版本的資源文件合并成一個(gè)大文件,避免產(chǎn)生過(guò)多的HDFS小文件题禀,HDFS小文件過(guò)多會(huì)導(dǎo)致HDFS整體性能的下降鞋诗。
物料服務(wù)提供了文件上傳、更新迈嘹、下載等操作任務(wù)的生命周期管理削彬。同時(shí)全庸,使用物料服務(wù)的方式有rest接口和SDK兩種形式,用戶(hù)可以根據(jù)自己的需要進(jìn)行選擇融痛。
BML架構(gòu)圖如下:
上述關(guān)于BML架構(gòu)的概述壶笼,有參考官網(wǎng)文檔:https://linkis.apache.org/zh-CN/docs/latest/architecture/public_enhancement_services/bml
2. BML物料庫(kù)服務(wù)底層表模型
在深入理解BML物料管理的流程細(xì)節(jié)之前,有必要先梳理下BML物料管理服務(wù)底層依賴(lài)的數(shù)據(jù)庫(kù)表模型雁刷。
可結(jié)合Linkis的linkis_ddl.sql
文件以及下文內(nèi)容闡述的引擎物料上傳和更新流程來(lái)理解bml resources相關(guān)表的字段含義以及表與表之間的字段關(guān)系覆劈。
3. BML物料庫(kù)服務(wù)的使用場(chǎng)景
目前在Linkis中,BML物料庫(kù)服務(wù)的使用場(chǎng)景包括:
引擎物料文件沛励,包括引擎啟動(dòng)時(shí)所需的conf和lib中的文件
存儲(chǔ)腳本责语,比如工作流任務(wù)節(jié)點(diǎn)鏈接的Scripts中的腳本是存儲(chǔ)在BML物料庫(kù)中的
DSS中工作流內(nèi)容版本管理
任務(wù)運(yùn)行時(shí)所需資源文件管理
4. 引擎物料管理流程剖析
引擎物料
是Linkis物料概念中的一個(gè)子集,其作用是為引擎啟動(dòng)時(shí)提供最新版本的jar包資源和配置文件等目派。本小節(jié)主要從引擎物料管理功能為切入點(diǎn)坤候,剖析引擎物料數(shù)據(jù)在BML中的流轉(zhuǎn)細(xì)節(jié)。
4.1 引擎物料說(shuō)明
對(duì)Linkis的安裝包正常部署之后企蹭,在LINKIS_INSTALL_HOME/lib/linkis-engineconn-plugins
目錄之下可以看到所有的引擎物料目錄白筹,以jdbc引擎為例,引擎物料目錄的結(jié)構(gòu)如下:
jdbc
├── dist
│?? └── v4
│?? ├── conf
│?? ├── conf.zip
│?? ├── lib
│?? └── lib.zip
└── plugin
└── 4 └── linkis-engineplugin-jdbc-1.1.2.jar
物料目錄構(gòu)成:
jdbc/dist/版本號(hào)/conf.zip
jdbc/dist/版本號(hào)/lib.zip
jdbc/plugin/版本號(hào)(去v留數(shù)字)/linkis-engineplugin-引擎名稱(chēng)-1.1.x.jar
conf.zip和lib.zip會(huì)作為引擎物料被托管在物料管理服務(wù)中谅摄,本地每次對(duì)物料conf或lib進(jìn)行修改之后徒河,對(duì)應(yīng)物料會(huì)產(chǎn)生一個(gè)新的版本號(hào),物料文件數(shù)據(jù)會(huì)被重新上傳送漠。引擎啟動(dòng)時(shí)顽照,會(huì)獲取最新版本號(hào)的物料數(shù)據(jù),加載lib和conf并啟動(dòng)引擎的java進(jìn)程螺男。
4.2 引擎物料上傳和更新流程
在Linkis完成部署并首次啟動(dòng)時(shí)棒厘,會(huì)觸發(fā)引擎物料(lib.zip和conf.zip)首次上傳至物料庫(kù);當(dāng)引擎lib下jar包或conf中引擎配置文件有修改時(shí)下隧,則需要觸發(fā)引擎物料的刷新機(jī)制來(lái)保證引擎啟動(dòng)時(shí)能夠加載到最新的物料文件奢人。
以現(xiàn)在Linkis1.1.x版本為例,觸發(fā)引擎物料刷新的兩種方式有兩種:
通過(guò)命令sh sbin/linkis-daemon.sh restart cg-engineplugin
重啟engineplugin服務(wù)
通過(guò)請(qǐng)求引擎物料刷新的接口
# 刷新所有引擎物料
curl --cookie "linkis_user_session_ticket_id_v1=kN4HCk555Aw04udC1Npi4ttKa3duaCOv2HLiVea4FcQ=" http://127.0.0.1:9001/api/rest_j/v1/engineplugin/refreshAll
# 指定引擎類(lèi)型和版本刷新物料
curl --cookie "linkis_user_session_ticket_id_v1=kN4HCk555Aw04udC1Npi4ttKa3duaCOv2HLiVea4FcQ=" http://127.0.0.1:9001/api/rest_j/v1/engineplugin/refresh?ecType=jdbc&version=4
這兩種引擎物料的刷新方式淆院,其底層的實(shí)現(xiàn)機(jī)制是一樣的何乎,都是調(diào)用了EngineConnResourceService
類(lèi)中的refreshAll()或refresh()方法。
在抽象類(lèi)EngineConnResourceService
的默認(rèn)實(shí)現(xiàn)類(lèi)DefaultEngineConnResourceService
中的init()方法內(nèi)部土辩,通過(guò)參數(shù)wds.linkis.engineconn.dist.load.enable(默認(rèn)為true)來(lái)控制是否在每次啟動(dòng)engineplugin服務(wù)時(shí)都執(zhí)行refreshAll(false)來(lái)檢查所有引擎物料是否有更新(其中faslse代表異步獲取執(zhí)行結(jié)果)支救。
init()方法被注解@PostConstruct修飾,在DefaultEngineConnResourceService加載后拷淘,對(duì)象使用前執(zhí)行各墨,且只執(zhí)行一次。
手動(dòng)調(diào)用engineplugin/refresh的接口启涯,即手動(dòng)執(zhí)行了EngineConnResourceService
類(lèi)中的refreshAll或refresh方法贬堵。
所以引擎物料檢測(cè)更新的邏輯在DefaultEngineConnResourceService
中的refreshAll和refresh方法內(nèi)恃轩。
其中refreshAll()的核心邏輯是:
1)通過(guò)參數(shù)wds.linkis.engineconn.home獲取引擎的安裝目錄,默認(rèn)是:
getEngineConnsHome = Configuration.getLinkisHome() + "/lib/linkis-engineconn-plugins";
2)遍歷引擎目錄
getEngineConnTypeListFromDisk: Array[String] = new File(getEngineConnsHome).listFiles().map(_.getName)
3)EngineConnBmlResourceGenerator
接口提供對(duì)各個(gè)引擎(版本)底層文件或目錄的合法性檢測(cè)黎做。對(duì)應(yīng)實(shí)現(xiàn)存在于抽象類(lèi)AbstractEngineConnBmlResourceGenerator
中叉跛。
4)DefaultEngineConnBmlResourceGenerator
類(lèi)主要是為了生成EngineConnLocalizeResource
。EngineConnLocalizeResource是對(duì)物料資源文件元數(shù)據(jù)和InputStream的封裝蒸殿,在后續(xù)的邏輯中EngineConnLocalizeResource會(huì)被作為物料參數(shù)來(lái)參與物料的上傳過(guò)程筷厘。
EngineConnBmlResourceGenerator、AbstractEngineConnBmlResourceGenerator宏所、DefaultEngineConnBmlResourceGenerator這三個(gè)文件的代碼細(xì)節(jié)暫不細(xì)說(shuō)酥艳,可通過(guò)以下UML類(lèi)圖,大致了解其繼承機(jī)制楣铁,并結(jié)合方法內(nèi)的具體實(shí)現(xiàn)來(lái)理解這一部分的功能玖雁。
再重新回到DefaultEngineConnResourceService
類(lèi)中的refreshAll方法內(nèi),繼續(xù)看refreshTask線程的核心流程:
engineConnBmlResourceGenerator.getEngineConnTypeListFromDisk foreach { engineConnType =>
Utils.tryCatch {
engineConnBmlResourceGenerator.generate(engineConnType).foreach {
case (version, localize) =>
logger.info(s" Try to initialize ${engineConnType}EngineConn-$version.")
refresh(localize, engineConnType, version)
}
}
......}
掃描引擎的安裝目錄盖腕,可獲得每個(gè)引擎物料目錄的列表,對(duì)于每個(gè)引擎物料目錄結(jié)構(gòu)的合法性校驗(yàn)通過(guò)之后浓镜,可得到對(duì)應(yīng)的EngineConnLocalizeResource
溃列,然后通過(guò)調(diào)用refresh(localize: Array[EngineConnLocalizeResource], engineConnType: String, version: String)來(lái)完成后續(xù)物料的上傳工作。
而在refresh()方法的內(nèi)部膛薛,主要經(jīng)過(guò)的流程有:
從表linkis_cg_engine_conn_plugin_bml_resources
中獲取對(duì)應(yīng)engineConnType和version的物料列表數(shù)據(jù)听隐,賦值給變量engineConnBmlResources。
val engineConnBmlResources = asScalaBuffer(engineConnBmlResourceDao.getAllEngineConnBmlResource(engineConnType, version))
4.2.1 引擎物料上傳流程
引擎物料上傳流程時(shí)序圖
如果表linkis_cg_engine_conn_plugin_bml_resources
中沒(méi)有匹配到數(shù)據(jù)哄啄,則需要拿EngineConnLocalizeResource中的數(shù)據(jù)來(lái)構(gòu)造EngineConnBmlResource對(duì)象雅任,并保存至linkis_cg_engine_conn_plugin_bml_resources
表中,此數(shù)據(jù)保存之前咨跌,需要先完成物料文件的上傳操作沪么,即執(zhí)行uploadToBml(localizeResource)
方法。
在uploadToBml(localizeResource)方法內(nèi)部锌半,通過(guò)構(gòu)造bmlClient來(lái)請(qǐng)求物料上傳的接口禽车。即:
private val bmlClient = BmlClientFactory.createBmlClient()
bmlClient.uploadResource(Utils.getJvmUser, localizeResource.fileName, localizeResource.getFileInputStream)
在BML Server中,物料上傳的接口位置在BmlRestfulApi類(lèi)中的uploadResource接口方法內(nèi)刊殉。主要經(jīng)歷的過(guò)程是:
ResourceTask resourceTask = taskService.createUploadTask(files, user, properties);
每一次物料上傳殉摔,都會(huì)構(gòu)造一個(gè)ResourceTask來(lái)完成文件上傳的流程,并記錄此次文件上傳Task的執(zhí)行記錄记焊。在createUploadTask方法內(nèi)部逸月,主要完成的操作如下:
1)為此次上傳的資源文件產(chǎn)生一個(gè)全局唯一標(biāo)識(shí)的resource_id
,String resourceId = UUID.randomUUID().toString();
2)構(gòu)建ResourceTask記錄遍膜,并存儲(chǔ)在表linkis_ps_bml_resources_task
中碗硬,以及后續(xù)一系列的Task狀態(tài)修改腐缤。
ResourceTask resourceTask = ResourceTask.createUploadTask(resourceId, user, properties);
taskDao.insert(resourceTask);
taskDao.updateState(resourceTask.getId(), TaskState.RUNNING.getValue(), new Date());
3)物料文件真正寫(xiě)入物料庫(kù)的操作是由ResourceServiceImpl類(lèi)中的upload方法完成的,在upload方法內(nèi)部肛响,會(huì)把一組List<MultipartFile> files
對(duì)應(yīng)的字節(jié)流持久化至物料庫(kù)文件存儲(chǔ)系統(tǒng)中岭粤;把物料文件的properties數(shù)據(jù),存儲(chǔ)到資源記錄表(linkis_ps_bml_resources)和資源版本記錄表(linkis_ps_bml_resources_version)中特笋。
MultipartFile p = files[0]
String resourceId = (String) properties.get("resourceId");
String fileName =new String(p.getOriginalFilename().getBytes(Constant.ISO_ENCODE),
Constant.UTF8_ENCODE);
fileName = resourceId;
String path = resourceHelper.generatePath(user, fileName, properties);
// generatePath目前支持Local和HDFS路徑剃浇,路徑的構(gòu)成規(guī)則由LocalResourceHelper或HdfsResourceHelper
// 中的generatePath方法實(shí)現(xiàn)
StringBuilder sb = new StringBuilder();
long size = resourceHelper.upload(path, user, inputStream, sb, true);
// 文件size計(jì)算以及文件字節(jié)流寫(xiě)入文件由LocalResourceHelper或HdfsResourceHelper中的upload方法實(shí)現(xiàn)
Resource resource = Resource.createNewResource(resourceId, user, fileName, properties);
// 插入一條記錄到resource表linkis_ps_bml_resources中
long id = resourceDao.uploadResource(resource);
// 新增一條記錄到resource version表linkis_ps_bml_resources_version中,此時(shí)的版本號(hào)是onstant.FIRST_VERSION
// 除了記錄這個(gè)版本的元數(shù)據(jù)信息外猎物,最重要的是記錄了該版本的文件的存儲(chǔ)位置虎囚,包括文件路徑,起始位置蔫磨,結(jié)束位置淘讥。
String clientIp = (String) properties.get("clientIp");
ResourceVersion resourceVersion = ResourceVersion.createNewResourceVersion(
resourceId, path, md5String, clientIp, size, Constant.FIRST_VERSION, 1);versionDao.insertNewVersion(resourceVersion);
上述流程執(zhí)行成功之后,物料數(shù)據(jù)才算是真正完成堤如,然后把UploadResult返回給客戶(hù)端蒲列,并標(biāo)記此次ResourceTask的狀態(tài)為完成,如果有遇到上傳文件報(bào)錯(cuò)搀罢,則標(biāo)記此次ResourceTask的狀態(tài)為失敗蝗岖,記錄異常信息。
4.2.2 引擎物料更新流程
引擎物料更新流程時(shí)序圖
如果表linkis_cg_engine_conn_plugin_bml_resources
中匹配到本地物料數(shù)據(jù)榔至,則需要拿EngineConnLocalizeResource中的數(shù)據(jù)來(lái)構(gòu)造EngineConnBmlResource對(duì)象抵赢,并更新linkis_cg_engine_conn_plugin_bml_resources
表中原有物料文件的版本號(hào)、文件大小唧取、修改時(shí)間等元數(shù)據(jù)信息铅鲤,此數(shù)據(jù)更新前,需要先完成物料文件的更新上傳操作枫弟,即執(zhí)行uploadToBml(localizeResource, engineConnBmlResource.getBmlResourceId)
方法邢享。
在uploadToBml(localizeResource, resourceId)方法內(nèi)部,通過(guò)構(gòu)造bmlClient來(lái)請(qǐng)求物料資源更新的接口媒区。即:
private val bmlClient = BmlClientFactory.createBmlClient()
bmlClient.updateResource(Utils.getJvmUser, resourceId, localizeResource.fileName, localizeResource.getFileInputStream)
在BML Server中驼仪,實(shí)現(xiàn)物料更新的接口位置在BmlRestfulApi類(lèi)中的updateVersion接口方法內(nèi),主要經(jīng)歷的過(guò)程是:
完成resourceId的有效性檢測(cè)袜漩,即檢測(cè)傳入的resourceId是否在linkis_ps_bml_resources表中存在绪爸,如果此resourceId不存在,給客戶(hù)端拋出異常宙攻,在接口層面此次物料更新操作失敗奠货。
所以在表linkis_cg_engine_conn_plugin_bml_resources
和linkis_ps_bml_resources
中的資源數(shù)據(jù)的對(duì)應(yīng)關(guān)系需要保證完整,否則會(huì)出現(xiàn)物料文件無(wú)法更新的報(bào)錯(cuò)座掘。
resourceService.checkResourceId(resourceId)
resourceId如果存在于linkis_ps_bml_resources表中递惋,會(huì)繼續(xù)執(zhí)行:
StringUtils.isEmpty(versionService.getNewestVersion(resourceId))
getNewestVersion方法是為了在表linkis_ps_bml_resources_version
中獲取該resourceId的最大版本號(hào)柔滔,如果resourceId對(duì)應(yīng)的最大version為空,那么物料同樣會(huì)更新失敗萍虽,所以此處數(shù)據(jù)的對(duì)應(yīng)關(guān)系完整性也需要嚴(yán)格保證睛廊。
上述兩處檢查都通過(guò)之后,會(huì)創(chuàng)建ResourceUpdateTask來(lái)完成最終的文件寫(xiě)入和記錄更新保存等工作杉编。
ResourceTask resourceTask = null;
synchronized (resourceId.intern()) {
resourceTask = taskService.createUpdateTask(resourceId, user, file, properties);}
而在createUpdateTask方法內(nèi)部超全,主要實(shí)現(xiàn)的功能是:
// 為物料Resource生成新的version
String lastVersion = getResourceLastVersion(resourceId);
String newVersion = generateNewVersion(lastVersion);
// 然后是對(duì)ResourceTask的構(gòu)建,和狀態(tài)維護(hù)
ResourceTask resourceTask = ResourceTask.createUpdateTask(resourceId, newVersion, user, system, properties);
// 物料更新上傳的邏輯由versionService.updateVersion方法完成versionService.updateVersion(resourceTask.getResourceId(), user, file, properties);
在versionService.updateVersion方法內(nèi)部邓馒,主要實(shí)現(xiàn)的功能是:
ResourceHelper resourceHelper = ResourceHelperFactory.getResourceHelper();
InputStream inputStream = file.getInputStream();
// 獲取資源的path
String newVersion = params.get("newVersion").toString();
String path = versionDao.getResourcePath(resourceId) + "_" + newVersion;
// getResourcePath的獲取邏輯是從原有路徑中l(wèi)imit一條嘶朱,然后以_拼接newVersion
// select resource from linkis_ps_bml_resources_version WHERE resource_id = #{resourceId} limit 1
// 資源上傳到hdfs或local
StringBuilder stringBuilder = new StringBuilder();
long size = resourceHelper.upload(path, user, inputStream, stringBuilder, OVER_WRITE);
// 最后在linkis_ps_bml_resources_version表中插入一條新的資源版本記錄
ResourceVersion resourceVersion = ResourceVersion.createNewResourceVersion(resourceId, path, md5String, clientIp, size, newVersion, 1);versionDao.insertNewVersion(resourceVersion);
5. 文章小結(jié)
本文從Linkis引擎物料管理功能作為切入點(diǎn),概述了BML物料服務(wù)的架構(gòu)光酣,并結(jié)合底層源碼疏遏,詳細(xì)地剖析了在引擎物料管理功能中,引擎物料的概念救军,以及引擎物料的上傳财异、更新、版本管理等操作流程缤言。
6. 參考文章
https://linkis.apache.org/zh-CN/docs/latest/architecture/public_enhancement_services/bml?
更多征文內(nèi)容請(qǐng)?jiān)L問(wèn): https://github.com/apache/incubator-linkis/discussions/categories/solicit-articles-%E5%BE%81%E6%96%87?
— END —
●?往期精選??●
Apache Linkis有獎(jiǎng)?wù)魑谋Φ保\(chéng)邀您的參與
版本動(dòng)態(tài) | 數(shù)據(jù)質(zhì)量管理平臺(tái) Qualitis 0.9.0 版本發(fā)布
如何成為社區(qū)貢獻(xiàn)者
1???官方文檔貢獻(xiàn)。發(fā)現(xiàn)文檔的不足胆萧、優(yōu)化文檔,持續(xù)更新文檔等方式參與社區(qū)貢獻(xiàn)俐东。通過(guò)文檔貢獻(xiàn)跌穗,讓開(kāi)發(fā)者熟悉如何提交PR和真正參與到社區(qū)的建設(shè)。參考攻略:保姆級(jí)教程:如何成為Apache Linkis文檔貢獻(xiàn)者
2???代碼貢獻(xiàn)虏辫。我們梳理了社區(qū)中簡(jiǎn)單并且容易入門(mén)的的任務(wù)蚌吸,非常適合新人做代碼貢獻(xiàn)。請(qǐng)查閱新手任務(wù)列表:https://github.com/apache/incubator-linkis/issues/1161
3???內(nèi)容貢獻(xiàn):發(fā)布WeDataSphere開(kāi)源組件相關(guān)的內(nèi)容砌庄,包括但不限于安裝部署教程羹唠、使用經(jīng)驗(yàn)、案例實(shí)踐等娄昆,形式不限佩微,請(qǐng)投稿給小助手。例如:
4???社區(qū)答疑:積極在社區(qū)中進(jìn)行答疑萌焰、分享技術(shù)哺眯、幫助開(kāi)發(fā)者解決問(wèn)題等;
5???其他:積極參與社區(qū)活動(dòng)扒俯、成為社區(qū)志愿者奶卓、幫助社區(qū)宣傳一疯、為社區(qū)發(fā)展提供有效建議等;
本文使用 文章同步助手 同步