SDC-Cluster-PipelineStoreResource-原理源碼

PipelineStoreResource.getPipeline(pid)
PipelineStoreResource.savePipeline()
PipelineStoreResource.saveUiInfo()

PipelineStoreResource.getPipeline(pid)


# 向Spark-Executor上的一個(gè)Slave實(shí)例SDC發(fā)送 getPipeline(pid)請(qǐng)求,響應(yīng)流程:

PipelineStoreResource.getPipelineInfo(String name){
    // 1. 先從磁盤加載其 pipelineId/info.json
    PipelineInfo pipelineInfo = store[PipelineStoreTask].getInfo(name){//SlavePipelineStoreTask.getInfo(String name)
        return pipelineStore[PipelineStoreTask].getInfo(name){//FilePipelineStoreTask.getInfo(String name)
            return getInfo(name, false){//FilePipelineStoreTask.getInfo(String name, boolean checkExistence)
                    synchronized (lockCache.getLock(name)) {
                      if (checkExistence && !hasPipeline(name)) {
                        throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
                      }
                      // 獲取該SlaveSDC對(duì)應(yīng)的Data目錄,一般是在該容器所在機(jī)器本地的Container目錄: nm-local-dir 下 containerId位置;
                      Path filePath = getInfoFile(name){
                        return getPipelineDir(name).resolve(INFO_FILE);{
                            getPipelineDir(name){
                                return storeDir[UnixPath].resolve(PipelineUtils.escapedPipelineName(name));{
                                    // storeDir = /home/app/appdata/hadoop/dataDir/tmpDir/nm-local-dir/usercache/app/appcache/application_1585661170516_0006/container_1585661170516_0006_01_000002/data/pipelines
                                }
                            }
                            .resolve(INFO_FILE);
                        }
                      }
                      try (InputStream infoFile = Files.newInputStream(filePath)) {
                        PipelineInfoJson pipelineInfoJsonBean = json.readValue(infoFile, PipelineInfoJson.class);
                        return pipelineInfoJsonBean.getPipelineInfo();
                      } catch (Exception ex) {
                        throw new PipelineStoreException(ContainerError.CONTAINER_0206, name, ex);
                      }
                    }
                
            }
        }
    }
    String title = name;
    
    // 
    switch (get) {
      case "pipeline":
        PipelineConfiguration pipeline = store.load(name, rev);{//SlavePipelineStoreTask.load(String name, String tagOrRev)
            return pipelineStore.load(name, tagOrRev);{//FilePipelineStoreTask.load(String name, String tagOrRev)
                synchronized (lockCache.getLock(name)) {
                  if (!hasPipeline(name)) {
                    throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
                  }
                  // getPipelineFile()是讀取pipeline.json, 而getInfo()->getInfoFile()是讀取 info.json文件;
                  /*
                  * info.json :     464B 字節(jié); createed, lastModified, uuid, sdcId, metadate信息;
                  * pipeline.json:  15.2 KB:  其info字段及為 info.json的內(nèi)容, 此外還包括幾個(gè)大字段:
                        - configuration: Pipeline相關(guān)配置: exeMode, deliveryGuraantee等;
                        - stages:   各Stages的配置情況;
                        - startEventStages + stopEventStages; 
                        - issues;
                  */
                  Path pipelineFile = getPipelineFile(name);
                  try (InputStream pipelineFile = Files.newInputStream(getPipelineFile(name))) {
                    PipelineInfo info = getInfo(name);
                    // 
                    PipelineConfigurationJson pipelineConfigBean=json.readValue(pipelineFile, PipelineConfigurationJson.class);
                    PipelineConfiguration pipeline = pipelineConfigBean.getPipelineConfiguration();
                    pipeline.setPipelineInfo(info);

                    Map<String, Map> uiInfo;
                    if (Files.exists(getPipelineUiInfoFile(name))) {
                      try (InputStream uiInfoFile = Files.newInputStream(getPipelineUiInfoFile(name))) {
                        uiInfo = json.readValue(uiInfoFile, Map.class);
                        pipeline = injectUiInfo(uiInfo, pipeline);
                      }
                    }

                    return pipeline;
                  }
                  catch (Exception ex) {
                    throw new PipelineStoreException(ContainerError.CONTAINER_0206, name, ex.toString(), ex);
                  }
                }
                
            }
        }
        PipelineConfigurationValidator validator = new PipelineConfigurationValidator(stageLibrary, name, pipeline);
        // 對(duì)于從本地加載到的Pipeline.json, 還要validate()校驗(yàn)一下;否則可能顯示不了;
        pipeline = validator.validate();
        data = BeanHelper.wrapPipelineConfiguration(pipeline);// 用PipelineConfigurationJson(pipelineConfiguration)對(duì)象包裝輸出;
        title = pipeline.getTitle() != null ? pipeline.getTitle() : pipeline.getInfo().getPipelineId();
        break;
      case "info":
        data = BeanHelper.wrapPipelineInfo(store.getInfo(name));
        break;
      case "history":// 返回的 PipelineRevInfo 對(duì)象僅簡(jiǎn)單封裝了 date,user,rev字段; 可能以后版本再實(shí)現(xiàn)pipelineConfigHistory?
        data = BeanHelper.wrapPipelineRevInfo(store.getHistory(name));
        break;
      default:
        throw new IllegalArgumentException(Utils.format("Invalid value for parameter 'get': {}", get));
    }

    if (attachment) {
      Map<String, Object> envelope = new HashMap<String, Object>();
      envelope.put("pipelineConfig", data);

      RuleDefinitions ruleDefinitions = store.retrieveRules(name, rev);
      envelope.put("pipelineRules", BeanHelper.wrapRuleDefinitions(ruleDefinitions));

      return Response.ok().
          header("Content-Disposition", "attachment; filename=\"" + title + ".json\"").
          type(MediaType.APPLICATION_JSON).entity(envelope).build();
    } else {
      return Response.ok().type(MediaType.APPLICATION_JSON).entity(data).build();
    }
}

報(bào)錯(cuò)PipelineConfig,并更新4個(gè)文件

# 當(dāng)Stopped, StoppedError, Edit等非Running狀態(tài)下時(shí):拖拽算子觸發(fā) savePipeline(),更新pipeline.json/info.json/uiinfo.json, pipelineState.json4 個(gè)文件
PipelineStoreResource.savePipeline(String name,String rev,String description,PipelineConfigurationJson pipeline){
    if (store.isRemotePipeline(name, rev)) {
      throw new PipelineException(ContainerError.CONTAINER_01101, "SAVE_PIPELINE", name);
    }
    PipelineInfo pipelineInfo = store.getInfo(name);
    RestAPIUtils.injectPipelineInMDC(pipelineInfo.getTitle(), pipelineInfo.getPipelineId());
    PipelineConfiguration pipelineConfig = BeanHelper.unwrapPipelineConfiguration(pipeline);
    PipelineConfigurationValidator validator = new PipelineConfigurationValidator(stageLibrary, name, pipelineConfig);
    pipelineConfig = validator.validate();
    pipelineConfig = store.save(user, name, rev, description, pipelineConfig);{//CachePipelineStoreTask.save()
        synchronized (lockCache.getLock(name)) {
          PipelineConfiguration pipelineConf = pipelineStore.save(user, name, tag, tagDescription, pipeline);{
              
              synchronized (lockCache.getLock(name)) {
              if (!hasPipeline(name)) {
                throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
              }
              PipelineInfo savedInfo = getInfo(name);
              if (!savedInfo.getUuid().equals(pipeline.getUuid())) {
                throw new PipelineStoreException(ContainerError.CONTAINER_0205, name);
              }
              if (pipelineStateStore != null) {
                PipelineStatus pipelineStatus = pipelineStateStore.getState(name, tag).getStatus();
                if (pipelineStatus.isActive()) {
                  throw new PipelineStoreException(ContainerError.CONTAINER_0208, pipelineStatus);
                }
              }
              UUID uuid = UUID.randomUUID();
              PipelineInfo info = new PipelineInfo();
              try (
                  OutputStream infoFile = Files.newOutputStream(getInfoFile(name));
                  OutputStream pipelineFile = Files.newOutputStream(getPipelineFile(name));
                ){
                pipeline.setUuid(uuid);
                // 更新 info.json文件;
                json.writeValue(infoFile, BeanHelper.wrapPipelineInfo(info));
                // 更新 pipeline.json 文件;
                json.writeValue(pipelineFile, BeanHelper.wrapPipelineConfiguration(pipeline));
                if (pipelineStateStore != null) {
                  List<Issue> errors = new ArrayList<>();
                  PipelineBeanCreator.get().create(pipeline, errors, null);
                  // 更新 runInfo下 pipelineState.json文件;
                  pipelineStateStore.edited(user, name, tag,  PipelineBeanCreator.get().getExecutionMode(pipeline, errors), false);
                  pipeline.getIssues().addAll(errors);
                }

                Map<String, Object> uiInfo = extractUiInfo(pipeline);
                
                // 更新 uiinfo.json文件;
                saveUiInfo(name, tag, uiInfo);

              } catch (Exception ex) {
                throw new PipelineStoreException(ContainerError.CONTAINER_0204, name, ex.toString(), ex);
              }
              pipeline.setPipelineInfo(info);
              return pipeline;
            }
              
          }
          pipelineInfoMap.put(name, pipelineConf.getInfo());
          return pipelineConf;
        }
    }
    
    return Response.ok().entity(BeanHelper.wrapPipelineConfiguration(pipelineConfig)).build();
    
}

當(dāng)Running狀態(tài)時(shí), 更新saveUiInfo()


 可以認(rèn)為['STARTING', 'STARTING_ERROR', 'RUNNING', 'RUNNING_ERROR', 'RETRY', 'FINISHING', 'STOPPING', 'STOPPING_ERROR', 'CONNECTING', 'CONNECT_ERROR']這些狀態(tài)只更新UI

# 當(dāng)Running狀態(tài)下: 拖動(dòng)算子后, 向PipelineStoreResource.saveUiInfo()發(fā)送 保存UI的請(qǐng)求到 uiinfo.json 文件中
PipelineStoreResource.saveUiInfo(String name,String rev,Map uiInfo){
    // 從CachePipelineStoreTask的內(nèi)存(pipelineInfoMap:Map<String,PipelineInfo>)中 讀取PipelineInfo對(duì)象;
    PipelineInfo pipelineInfo = store[PipelineStoreTask].getInfo(name){//CachePipelineStoreTask.getInfo(String name)
        PipelineInfo pipelineInfo = pipelineInfoMap.get(name);// pipelineInfoMap: Map<String, PipelineInfo>
        if (pipelineInfo == null) {
          throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
        } else {
          return pipelineInfo;
        }
    }
    
    RestAPIUtils.injectPipelineInMDC(pipelineInfo.getTitle(), pipelineInfo.getPipelineId());
    store.saveUiInfo(name, rev, uiInfo);{
        pipelineStore.saveUiInfo(name, rev, uiInfo);{//FilePipelineStoreTask.saveUiInfo()
            // /home/app/appdata/streamset/data/pipelines/ClusterOriginKafkaDemo255cbfd4-56ff-4c93-a94a-551721a9e80b/uiinfo.json
            try (OutputStream uiInfoFile = Files.newOutputStream(getPipelineUiInfoFile(name))){
              json.writeValue(uiInfoFile, uiInfo);
            } catch (Exception ex) {
              throw new PipelineStoreException(ContainerError.CONTAINER_0405, name, ex.toString(), ex);
            }
        }
    }
    
    return Response.ok().build();
}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末垦写,一起剝皮案震驚了整個(gè)濱河市涯塔,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌意蛀,老刑警劉巖屑迂,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡镰烧,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門楞陷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)怔鳖,“玉大人,你說(shuō)我怎么就攤上這事固蛾〗嶂矗” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵艾凯,是天一觀的道長(zhǎng)献幔。 經(jīng)常有香客問我,道長(zhǎng)趾诗,這世上最難降的妖魔是什么蜡感? 我笑而不...
    開封第一講書人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮恃泪,結(jié)果婚禮上郑兴,老公的妹妹穿的比我還像新娘。我一直安慰自己贝乎,他們只是感情好情连,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著览效,像睡著了一般蒙具。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上朽肥,一...
    開封第一講書人閱讀 49,837評(píng)論 1 290
  • 那天禁筏,我揣著相機(jī)與錄音,去河邊找鬼衡招。 笑死篱昔,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播州刽,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼空执,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了穗椅?” 一聲冷哼從身側(cè)響起辨绊,我...
    開封第一講書人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎匹表,沒想到半個(gè)月后门坷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡袍镀,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年默蚌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片苇羡。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡绸吸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出设江,到底是詐尸還是另有隱情锦茁,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布叉存,位于F島的核電站码俩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏鹉胖。R本人自食惡果不足惜握玛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一够傍、第九天 我趴在偏房一處隱蔽的房頂上張望甫菠。 院中可真熱鬧,春花似錦冕屯、人聲如沸寂诱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)痰洒。三九已至,卻和暖如春浴韭,著一層夾襖步出監(jiān)牢的瞬間丘喻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工念颈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留泉粉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像嗡靡,于是被迫代替她去往敵國(guó)和親跺撼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容