Azkaban Learning


title: Azkaban Learning
date: 2017-01-11 11:54:03
tags: [Azkaban,調(diào)度系統(tǒng),大數(shù)據(jù)組件]
categories: "調(diào)度系統(tǒng)"


Azkaban

關(guān)鍵字:Azkaban簡介轩娶、大數(shù)據(jù)作業(yè)調(diào)度系統(tǒng)

這篇文章適合對azkaban有一定了解的人閱讀。建議先粗讀:
AZ開發(fā)文檔:http://azkaban.github.io/azkaban/docs/latest/#overview
強子哥的源碼分析:https://my.oschina.net/qiangzigege/blog/653198
(以下內(nèi)容部分摘自上兩個鏈接)

azkaban源碼: git clone https://github.com/azkaban/azkaban.git

Azkaban 簡介

Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.
Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.

Azkaban 是由Linkedln公司為了解決hadoop 作業(yè)之間的依賴而實現(xiàn)的框往。因為有一些ETL作業(yè)以及數(shù)據(jù)分析產(chǎn)品需要按照一定的順序去執(zhí)行鳄抒。
隨著hadoop用戶的逐年增加,Azkaban從一個簡單的服務(wù)解決方案發(fā)展成為一個更加健壯魯棒的方案椰弊。

Azkaban的系統(tǒng)架構(gòu)主要由三個組件組成:

  • WebServer :暴露Restful API许溅,提供分發(fā)作業(yè)和調(diào)度作業(yè)功能;
  • ExecServer :對WebServer 暴露 API 秉版,提供執(zhí)行作業(yè)的功能贤重;
  • MySQL :數(shù)據(jù)存儲,實現(xiàn)Web 和 Exec之間的數(shù)據(jù)共享和部分狀態(tài)的同步清焕。
azkaban.png

多執(zhí)行節(jié)點模式下并蝗,更細節(jié)一點的架構(gòu)圖可以如下祭犯,圖中省略MySQL數(shù)據(jù)庫:

myAzkaban.png

非常簡單而直觀


WebServer

暴露Restful API

在azkaban-webserver工程中,可以非常清晰地看到對外暴露的Servlet滚停,其中最主要的幾個是:

  • ExecutorServlet 主要提供立即執(zhí)行作業(yè)沃粗、取消作業(yè)流、暫停作業(yè)流键畴、獲取流或節(jié)點日志等接口
  • ScheduleServlet 主要提供設(shè)置調(diào)度最盅、設(shè)置Sla報警規(guī)則、獲取調(diào)度信息等接口
  • HistoryServlet 主要提供查看作業(yè)流執(zhí)行歷史的接口
  • ProjectManagerServlet 主要提供上傳項目zip包起惕、下載項目zip包檩禾、刪除項目、獲取流的DAG等接口

分發(fā)作業(yè)

ExecutorManager 主要承擔(dān)這部分的功能疤祭,所有類型的作業(yè)(包括立即執(zhí)行和調(diào)度執(zhí)行),都會通過submitExecutableFlow(ExecutableFlow exflow, String userId)這個方法進行提交盼产。

在該方法中,我們可以看到:如果是多執(zhí)行節(jié)點模式下勺馆,執(zhí)行實例先放進分發(fā)隊列中戏售;如果是單節(jié)點模式下,立即調(diào)用dispatch方法進行分發(fā)草穆。

if (isMultiExecutorMode()) {
    //Take MultiExecutor route
    executorLoader.addActiveExecutableReference(reference);
    queuedFlows.enqueue(exflow, reference);
} else {
    // assign only local executor we have
    Executor choosenExecutor = activeExecutors.iterator().next();
    executorLoader.addActiveExecutableReference(reference);
    try {
        dispatch(reference, exflow, choosenExecutor);
    } catch (ExecutorManagerException e) {
        executorLoader.removeActiveExecutableReference(reference.getExecId());
            throw e;
    }
}

在多執(zhí)行節(jié)點模式下灌灾,執(zhí)行實例被放進分發(fā)隊列。隊列會被線程 QueueProcessorThread 定時處理悲柱。

    /* Method responsible for processing the non-dispatched flows */
    private void processQueuedFlows(long activeExecutorsRefreshWindow,
      int maxContinuousFlowProcessed) throws InterruptedException,
      ExecutorManagerException {
      long lastExecutorRefreshTime = 0;
      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
      int currentContinuousFlowProcessed = 0;

      while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
        ExecutionReference reference = runningCandidate.getFirst();
        ExecutableFlow exflow = runningCandidate.getSecond();

        long currentTime = System.currentTimeMillis();

        // if we have dispatched more than maxContinuousFlowProcessed or
        // It has been more then activeExecutorsRefreshWindow millisec since we
        // refreshed
        if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
          || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
          // Refresh executorInfo for all activeExecutors
          refreshExecutors();
          lastExecutorRefreshTime = currentTime;
          currentContinuousFlowProcessed = 0;
        }

        /**
         * <pre>
         *  TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
         *        Currently we try each queued flow once to infer a global busy state
         * Possible improvements:-
         *   1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
         *   2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
         *      taking out all the filters which do not depend on the flow but are still being part of Selector.
         * Assumptions:-
         *   1. no one else except QueueProcessor is updating ExecutableFlow update time
         *   2. re-attempting a flow (which has been tried before) is considered as all executors are busy
         * </pre>
         */
        if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
          // put back in the queue
          queuedFlows.enqueue(exflow, reference);
          long sleepInterval =
            activeExecutorsRefreshWindow
              - (currentTime - lastExecutorRefreshTime);
          // wait till next executor refresh
          sleep(sleepInterval);
        } else {
          exflow.setUpdateTime(currentTime);
          // process flow with current snapshot of activeExecutors
          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
        }

        // do not count failed flow processsing (flows still in queue)
        if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
          currentContinuousFlowProcessed++;
        }
      }
    }

selectExecutorAndDispatchFlow 方法先是選擇執(zhí)行節(jié)點(選擇節(jié)點的實現(xiàn)比較有意思)锋喜,選好節(jié)點后最終也是調(diào)用了dispatch進行作業(yè)分發(fā)。

/* process flow with a snapshot of available Executors */
    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
      ExecutableFlow exflow, Set<Executor> availableExecutors)
      throws ExecutorManagerException {
      synchronized (exflow) {
        Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
        if (selectedExecutor != null) {
          try {
            dispatch(reference, exflow, selectedExecutor);
          } catch (ExecutorManagerException e) {
            logger.warn(String.format(
              "Executor %s responded with exception for exec: %d",
              selectedExecutor, exflow.getExecutionId()), e);
            handleDispatchExceptionCase(reference, exflow, selectedExecutor,
              availableExecutors);
          }
        } else {
          handleNoExecutorSelectedCase(reference, exflow);
        }
      }
    }

因為Web 和Exec 之間是通過mysql進行數(shù)據(jù)共享的豌鸡,所以dispatch進行作業(yè)分發(fā)的邏輯非常簡單嘿般,就是簡單地通過HTTP請求傳遞execId等信息,其余所需要的數(shù)據(jù)都通過數(shù)據(jù)庫讀寫完成涯冠。

調(diào)度作業(yè)

調(diào)度作業(yè)是調(diào)度系統(tǒng)的最重要的功能之一炉奴,也是Azkaban里相對復(fù)雜的一個模塊。調(diào)度是通過ScheduleManager對外暴露蛇更,對應(yīng)著的結(jié)構(gòu)是Schedule瞻赶;對內(nèi)是通過TriggerManager來實現(xiàn),對應(yīng)著的結(jié)構(gòu)是Trigger派任。

所有的調(diào)度信息都通過ScheduleManager.scheduleFlow傳入,可以看到傳入?yún)?shù)包含了項目id砸逊、項目名字、流名字掌逛、第一次調(diào)度時間戳师逸、時區(qū)、調(diào)度周期颤诀、下一次執(zhí)行時間字旭、提交時間、提交人崖叫。對于一個調(diào)度來說遗淳,最關(guān)鍵的信息無非是第一次調(diào)度時間和調(diào)度周期。

public Schedule scheduleFlow(final int scheduleId, final int projectId,
      final String projectName, final String flowName, final String status,
      final long firstSchedTime, final DateTimeZone timezone,
      final ReadablePeriod period, final long lastModifyTime,
      final long nextExecTime, final long submitTime, final String submitUser)

從scheduleFlow 往下可以看到調(diào)用了TriggerBasedScheduleLoader.insertSchedule心傀。這個方法里邊先是將Schedule轉(zhuǎn)換成了Trigger屈暗,然后將Trigger放到了TriggerManager里邊。scheduleToTrigger方法寫的非常簡潔巧妙脂男,讀者自行研究养叛,此處不作細致分析。

 @Override
  public void insertSchedule(Schedule s) throws ScheduleManagerException {
    Trigger t = scheduleToTrigger(s);
    try {
      triggerManager.insertTrigger(t, t.getSubmitUser());
      s.setScheduleId(t.getTriggerId());
    } catch (TriggerManagerException e) {
      throw new ScheduleManagerException("Failed to insert new schedule!", e);
    }
  }

我們在繼續(xù)看看Trigger被塞到TriggerManager做了些啥宰翅。從下邊可以看到弃甥,先是調(diào)用triggerLoader寫進數(shù)據(jù)庫,然后就放到了一個線程runnerThread中去汁讼。

public void insertTrigger(Trigger t) throws TriggerManagerException {
    synchronized (syncObj) {
      try {
        triggerLoader.addTrigger(t);
      } catch (TriggerLoaderException e) {
        throw new TriggerManagerException(e);
      }
      runnerThread.addTrigger(t);
      triggerIdMap.put(t.getTriggerId(), t);
    }
  }

接下來就顯而易見了淆攻,這個線程TriggerScannerThread runnerThread 定期檢查Trigger是否應(yīng)該觸發(fā)(onTriggerTrigger)或者終止(onTriggerExpire)。

    private void checkAllTriggers() throws TriggerManagerException {
      long now = System.currentTimeMillis();

      // sweep through the rest of them
      for (Trigger t : triggers) {
        try {
          scannerStage = "Checking for trigger " + t.getTriggerId();

          boolean shouldSkip = true;
          if (shouldSkip && t.getInfo() != null && t.getInfo().containsKey("monitored.finished.execution")) {
            int execId = Integer.valueOf((String) t.getInfo().get("monitored.finished.execution"));
            if (justFinishedFlows.containsKey(execId)) {
              logger.info("Monitored execution has finished. Checking trigger earlier " + t.getTriggerId());
              shouldSkip = false;
            }
          }
          if (shouldSkip && t.getNextCheckTime() > now) {
            shouldSkip = false;
          }

          if (shouldSkip) {
            logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
          }

          if (logger.isDebugEnabled()) {
            logger.info("Checking trigger " + t.getTriggerId());
          }
          if (t.getStatus().equals(TriggerStatus.READY)) {
            if (t.triggerConditionMet()) {
              onTriggerTrigger(t);
            } else if (t.expireConditionMet()) {
              onTriggerExpire(t);
            }
          }
          if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
            removeTrigger(t);
          } else {
            t.updateNextCheckTime();
          }
        } catch (Throwable th) {
          //skip this trigger, moving on to the next one
          logger.error("Failed to process trigger with id : " + t.getTriggerId(), th);
        }
      }
    }

Trigger觸發(fā)的時候就會調(diào)用自己的action.doAction()嘿架,調(diào)度任務(wù)的Trigger的action一般都是ExecuteFlowAction瓶珊,其doAction方法如下。方法主要做了兩個事情耸彪,第一個是構(gòu)建執(zhí)行實例ExecutableFlow伞芹,第二個是如果該調(diào)度設(shè)置了報警規(guī)則,則構(gòu)建SlaTrigger蝉娜。

構(gòu)建執(zhí)行實例完成后唱较,可以看到調(diào)用了executorManager.submitExecutableFlow(exflow, submitUser) 進行作業(yè)分發(fā),這樣子召川,就跟上文提到的作業(yè)分發(fā)殊途同歸绊汹。下邊不再分析。

  @Override
  public void doAction() throws Exception {
    if (projectManager == null || executorManager == null) {
      throw new Exception("ExecuteFlowAction not properly initialized!");
    }

    Project project = projectManager.getProject(projectId);
    if (project == null) {
      logger.error("Project to execute " + projectId + " does not exist!");
      throw new RuntimeException("Error finding the project to execute "
          + projectId);
    }

    Flow flow = project.getFlow(flowName);
    if (flow == null) {
      logger.error("Flow " + flowName + " cannot be found in project "
          + project.getName());
      throw new RuntimeException("Error finding the flow to execute "
          + flowName);
    }

    ExecutableFlow exflow = new ExecutableFlow(project, flow);
    exflow.setSubmitUser(submitUser);
    exflow.addAllProxyUsers(project.getProxyUsers());

    if (executionOptions == null) {
      executionOptions = new ExecutionOptions();
    }
    if (!executionOptions.isFailureEmailsOverridden()) {
      executionOptions.setFailureEmails(flow.getFailureEmails());
    }
    if (!executionOptions.isSuccessEmailsOverridden()) {
      executionOptions.setSuccessEmails(flow.getSuccessEmails());
    }
    exflow.setExecutionOptions(executionOptions);

    try {
      executorManager.submitExecutableFlow(exflow, submitUser);
      logger.info("Invoked flow " + project.getName() + "." + flowName);
    } catch (ExecutorManagerException e) {
      throw new RuntimeException(e);
    }

    // deal with sla
    if (slaOptions != null && slaOptions.size() > 0) {
      int execId = exflow.getExecutionId();
      for (SlaOption sla : slaOptions) {
        logger.info("Adding sla trigger " + sla.toString() + " to execution "
            + execId);
        SlaChecker slaFailChecker =
            new SlaChecker("slaFailChecker", sla, execId);
        Map<String, ConditionChecker> slaCheckers =
            new HashMap<String, ConditionChecker>();
        slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
        Condition triggerCond =
            new Condition(slaCheckers, slaFailChecker.getId()
                + ".isSlaFailed()");
        // if whole flow finish before violate sla, just expire
        SlaChecker slaPassChecker =
            new SlaChecker("slaPassChecker", sla, execId);
        Map<String, ConditionChecker> expireCheckers =
            new HashMap<String, ConditionChecker>();
        expireCheckers.put(slaPassChecker.getId(), slaPassChecker);
        Condition expireCond =
            new Condition(expireCheckers, slaPassChecker.getId()
                + ".isSlaPassed()");
        List<TriggerAction> actions = new ArrayList<TriggerAction>();
        List<String> slaActions = sla.getActions();
        for (String act : slaActions) {
          if (act.equals(SlaOption.ACTION_ALERT)) {
            SlaAlertAction slaAlert =
                new SlaAlertAction("slaAlert", sla, execId);
            actions.add(slaAlert);
          } else if (act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
            KillExecutionAction killAct =
                new KillExecutionAction("killExecution", execId);
            actions.add(killAct);
          }
        }
        Trigger slaTrigger =
            new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond,
                actions);
        slaTrigger.getInfo().put("monitored.finished.execution",
            String.valueOf(execId));
        slaTrigger.setResetOnTrigger(false);
        slaTrigger.setResetOnExpire(false);
        logger.info("Ready to put in the sla trigger");
        triggerManager.insertTrigger(slaTrigger);
        logger.info("Sla inserted.");
      }
    }
  }

WebServer總結(jié)

下邊用一張圖簡單總結(jié)

image.png

ExecServer

暴露Restful API

Azkaban3.0后就開始支持多執(zhí)行節(jié)點部署扮宠。單個執(zhí)行節(jié)點比較簡單西乖,對web暴露的API也比較少,主要是:

  • ExecutorServlet 主要提供執(zhí)行坛增、取消获雕、暫停、日志查詢等接口收捣。

執(zhí)行作業(yè)

這里簡單看下執(zhí)行節(jié)點執(zhí)行一個作業(yè)的流程是怎樣的届案。我們在ExecutorServlet中看到所有的執(zhí)行作業(yè)請求都經(jīng)過handleAjaxExecute方法,這個方法簡單地將執(zhí)行id傳遞給FlowRunnerManager:

private void handleAjaxExecute(HttpServletRequest req,
      Map<String, Object> respMap, int execId) throws ServletException {
    try {
      flowRunnerManager.submitFlow(execId);
    } catch (ExecutorManagerException e) {
      e.printStackTrace();
      logger.error(e);
      respMap.put(RESPONSE_ERROR, e.getMessage());
    }
  }

FlowRunnerManager 通過submitFlow方法提交工作流去執(zhí)行罢艾。先是構(gòu)建執(zhí)行實例ExecutableFlow楣颠,然后準(zhǔn)備執(zhí)行目錄setupFlow(flow)尽纽,然后生成FlowRunner,然后提交到線程池去運行executorService.submit(runner)童漩。

 public void submitFlow(int execId) throws ExecutorManagerException {
    // Load file and submit
    if (runningFlows.containsKey(execId)) {
      throw new ExecutorManagerException("Execution " + execId
          + " is already running.");
    }

    ExecutableFlow flow = null;
    flow = executorLoader.fetchExecutableFlow(execId);
    if (flow == null) {
      throw new ExecutorManagerException("Error loading flow with exec "
          + execId);
    }

    // Sets up the project files and execution directory.
    setupFlow(flow);

    // Setup flow runner
    FlowWatcher watcher = null;
    ExecutionOptions options = flow.getExecutionOptions();
    if (options.getPipelineExecutionId() != null) {
      Integer pipelineExecId = options.getPipelineExecutionId();
      FlowRunner runner = runningFlows.get(pipelineExecId);

      if (runner != null) {
        watcher = new LocalFlowWatcher(runner);
      } else {
        watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
      }
    }

    int numJobThreads = numJobThreadPerFlow;
    if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
      try {
        int numJobs =
            Integer.valueOf(options.getFlowParameters().get(
                FLOW_NUM_JOB_THREADS));
        if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
                .isProjectWhitelisted(flow.getProjectId(),
                    WhitelistType.NumJobPerFlow))) {
          numJobThreads = numJobs;
        }
      } catch (Exception e) {
        throw new ExecutorManagerException(
            "Failed to set the number of job threads "
                + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
                + " for flow " + execId, e);
      }
    }

    FlowRunner runner =
        new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
    runner.setFlowWatcher(watcher)
        .setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
        .setValidateProxyUser(validateProxyUser)
        .setNumJobThreads(numJobThreads).addListener(this);

    configureFlowLevelMetrics(runner);

    // Check again.
    if (runningFlows.containsKey(execId)) {
      throw new ExecutorManagerException("Execution " + execId
          + " is already running.");
    }

    // Finally, queue the sucker.
    runningFlows.put(execId, runner);

    try {
      // The executorService already has a queue.
      // The submit method below actually returns an instance of FutureTask,
      // which implements interface RunnableFuture, which extends both
      // Runnable and Future interfaces
      Future<?> future = executorService.submit(runner);
      // keep track of this future
      submittedFlows.put(future, runner.getExecutionId());
      // update the last submitted time.
      this.lastFlowSubmittedDate = System.currentTimeMillis();
    } catch (RejectedExecutionException re) {
      throw new ExecutorManagerException(
          "Azkaban server can't execute any more flows. "
              + "The number of running flows has reached the system configured limit."
              + "Please notify Azkaban administrators");
    }
  }

FlowRunner本身也繼承與Runnable弄贿,其run方法里邊調(diào)用了 runFlow方法,方法內(nèi)容如下矫膨。方法里按照樹的層次結(jié)構(gòu)逐層訪問DAG圖的每一個job差凹,逐個去提交執(zhí)行。

private void runFlow() throws Exception {
    logger.info("Starting flows");
    runReadyJob(this.flow);
    updateFlow();

    while (!flowFinished) {
      synchronized (mainSyncObj) {
        if (flowPaused) {
          try {
            mainSyncObj.wait(CHECK_WAIT_MS);
          } catch (InterruptedException e) {
          }

          continue;
        } else {
          if (retryFailedJobs) {
            retryAllFailures();
          } else if (!progressGraph()) {
            try {
              mainSyncObj.wait(CHECK_WAIT_MS);
            } catch (InterruptedException e) {
            }
          }
        }
      }
    }

    logger.info("Finishing up flow. Awaiting Termination");
    executorService.shutdown();

    updateFlow();
    logger.info("Finished Flow");
  }

對于單個job侧馅,最后構(gòu)造一個JobRunner去執(zhí)行之危尿。

private void runExecutableNode(ExecutableNode node) throws IOException {
    // Collect output props from the job's dependencies.
    prepareJobProperties(node);

    node.setStatus(Status.QUEUED);
    JobRunner runner = createJobRunner(node);
    logger.info("Submitting job '" + node.getNestedId() + "' to run.");
    try {
      executorService.submit(runner);
      activeJobRunners.add(runner);
    } catch (RejectedExecutionException e) {
      logger.error(e);
    }
    ;
  }

 private JobRunner createJobRunner(ExecutableNode node) {
    // Load job file.
    File path = new File(execDir, node.getJobSource());

    JobRunner jobRunner =
        new JobRunner(node, path.getParentFile(), executorLoader,
            jobtypeManager);
    if (watcher != null) {
      jobRunner.setPipeline(watcher, pipelineLevel);
    }
    if (validateUserProxy) {
      jobRunner.setValidatedProxyUsers(proxyUsers);
    }

    jobRunner.setDelayStart(node.getDelayedExecution());
    jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
    jobRunner.addListener(listener);

    if (JobCallbackManager.isInitialized()) {
      jobRunner.addListener(JobCallbackManager.getInstance());
    }

    configureJobLevelMetrics(jobRunner);

    return jobRunner;
  }

每個jobRunner在執(zhí)行的時候,都去插件模塊里邊尋找對應(yīng)的插件來進行job的類型加載馁痴。每種job類型都有對應(yīng)的run方法谊娇。最后就是調(diào)用run方法去執(zhí)行job。各種不同類型的job可以參考azkaban默認(rèn)的job類型以及 azkaban-plugin工程里邊實現(xiàn)的一些hadoop相關(guān)作業(yè)類型罗晕。

 try {
        job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
      } catch (JobTypeManagerException e) {
        logger.error("Failed to build job type", e);
        return false;
      }

Azkaban Plugin

azkaban的插件機制使得可以非常方便的增加插件類型邮绿,從而支持運行更多的作業(yè)類型。azkaban的hadoop插件可以從以下倉庫中找到:

git clone https://github.com/azkaban/azkaban-plugins.git

插件的實現(xiàn)

其中插件的類繼承關(guān)系圖如下攀例。每種插件作業(yè)都會單獨起一個進程去執(zhí)行船逮。其中ProcessJob就是負(fù)責(zé)起進程的一個類;JavaProcessJob繼承自它粤铭,特化為Java進程挖胃;其他的hadoop插件又各自繼承自JavaProcessJob。如果要自己實現(xiàn)插件類型梆惯,只要繼承JavaProcessJob類酱鸭,在繼承子類里邊調(diào)用插件的Wrapper類就可以了。具體細節(jié)可以看代碼實現(xiàn)垛吗。

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末凹髓,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子怯屉,更是在濱河造成了極大的恐慌蔚舀,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锨络,死亡現(xiàn)場離奇詭異赌躺,居然都是意外死亡,警方通過查閱死者的電腦和手機羡儿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門礼患,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事缅叠∏哪啵” “怎么了?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵肤粱,是天一觀的道長弹囚。 經(jīng)常有香客問我,道長狼犯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任领铐,我火速辦了婚禮悯森,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘绪撵。我一直安慰自己瓢姻,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布音诈。 她就那樣靜靜地躺著幻碱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪细溅。 梳的紋絲不亂的頭發(fā)上褥傍,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天,我揣著相機與錄音喇聊,去河邊找鬼恍风。 笑死,一個胖子當(dāng)著我的面吹牛誓篱,可吹牛的內(nèi)容都是我干的朋贬。 我是一名探鬼主播,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼窜骄,長吁一口氣:“原來是場噩夢啊……” “哼锦募!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起邻遏,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤糠亩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后准验,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體削解,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年沟娱,在試婚紗的時候發(fā)現(xiàn)自己被綠了氛驮。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡济似,死狀恐怖矫废,靈堂內(nèi)的尸體忽然破棺而出盏缤,到底是詐尸還是另有隱情,我是刑警寧澤蓖扑,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布唉铜,位于F島的核電站,受9級特大地震影響律杠,放射性物質(zhì)發(fā)生泄漏潭流。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一柜去、第九天 我趴在偏房一處隱蔽的房頂上張望灰嫉。 院中可真熱鬧,春花似錦嗓奢、人聲如沸讼撒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽根盒。三九已至,卻和暖如春物蝙,著一層夾襖步出監(jiān)牢的瞬間炎滞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工诬乞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留厂榛,地道東北人。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓丽惭,卻偏偏與公主長得像击奶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子责掏,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內(nèi)容

  • 目的這篇教程從用戶的角度出發(fā)柜砾,全面地介紹了Hadoop Map/Reduce框架的各個方面。先決條件請先確認(rèn)Had...
    SeanC52111閱讀 1,711評論 0 1
  • 前言 大數(shù)據(jù)處理技術(shù)應(yīng)用: [x] 電信運營商 數(shù)據(jù)營銷:房地產(chǎn)營銷换衬、運營商時代(匯聚用戶行為) [x] 互聯(lián)網(wǎng)用...
    MichaelFly閱讀 4,404評論 0 16
  • 《分布式任務(wù)調(diào)度平臺XXL-JOB》 一痰驱、簡介 1.1 概述 XXL-JOB是一個輕量級分布式任務(wù)調(diào)度框架,其核心...
    許雪里閱讀 16,775評論 3 29
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理瞳浦,服務(wù)發(fā)現(xiàn)担映,斷路器,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • ** 版本:2.2.1 ** Hello world: 調(diào)度器: 任務(wù)詳情:任務(wù)體實現(xiàn)Job接口 觸發(fā)器: 執(zhí)行調(diào)...
    Coselding閱讀 10,141評論 12 38