如何通過(guò)java集成kettle實(shí)現(xiàn)遠(yuǎn)程調(diào)用kettle集群

如何通過(guò)java集成kettle實(shí)現(xiàn)遠(yuǎn)程調(diào)用kettle集群

package com.hry;

import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.www.SlaveServerJobStatus;

public class CallJob {

// private void run(String hostname, int port) throws Exception {
// SlaveServerConfig config = new SlaveServerConfig(hostname, port, false);
// Carte.runCarte(config);
//
//
// }

private Repository repository;
private SlaveServer remoteSlaveServer;
private boolean remoteFlag = false;
private String lastCarteObjectId;
private JobMeta jobMeta;
private Job job;

/**
 *
 * @param oracle資源庫(kù)信息
 *
 *            初始化對(duì)象叹洲,初始化kettle環(huán)境和資源庫(kù)
 */
public CallJob(DBRepositoryInfo rposInfo) {
    // TODO Auto-generated constructor stub
    try {
        // 初始化
        KettleEnvironment.init();
        this.initRepository(rposInfo);
    } catch (KettleException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

public SlaveServer getRemoteSlaveServer() {
    return remoteSlaveServer;
}

/**
 * 設(shè)置遠(yuǎn)程運(yùn)行服務(wù)器
 *
 * @param remoteSlaveServer
 */
public void setRemoteSlaveServer(SlaveServer remoteSlaveServer) {
    this.remoteSlaveServer = remoteSlaveServer;
}

public boolean isRemoteFlag() {
    return remoteFlag;
}

public void setRemoteFlag(boolean remoteFlag) {
    this.remoteFlag = remoteFlag;
}

/**
 * 初始化遠(yuǎn)程服務(wù)器
 *
 * @param slaveInfo
 */
public void initSlaveServer(SlaveServerInfo slaveInfo) {
    remoteSlaveServer = new SlaveServer();
    remoteSlaveServer.setHostname(slaveInfo.getServerHost());
    remoteSlaveServer.setMaster(true);
    remoteSlaveServer.setPort(slaveInfo.getServerPort());
    remoteSlaveServer.setUsername(slaveInfo.getServerUsername());
    remoteSlaveServer.setPassword(slaveInfo.getServerPassword());
}

/**
 * 初始化資源庫(kù)
 *
 * @param rposInfo
 * @throws KettleException
 */
public void initRepository(DBRepositoryInfo rposInfo) throws KettleException {

    // 新建數(shù)據(jù)庫(kù)資源庫(kù)
    repository = new KettleDatabaseRepository();
    // 建立數(shù)據(jù)庫(kù)連接
    DatabaseMeta databaseMeta = new DatabaseMeta(rposInfo.getDbName(), rposInfo.getDbType(), "Native",
            rposInfo.getDbHostname(), rposInfo.getDbName(), rposInfo.getDbPort(), rposInfo.getDbUsername(),
            rposInfo.getDbPassword());
    // 建立資源庫(kù)信息
    KettleDatabaseRepositoryMeta kettleDatabaseMeta = new KettleDatabaseRepositoryMeta(rposInfo.getRepoId(),
            rposInfo.getRepoName(), "Transformation description", databaseMeta);
    // 初始化資源庫(kù)
    repository.init(kettleDatabaseMeta);
    // 連接資源庫(kù)
    repository.connect(rposInfo.getRepoUsername(), rposInfo.getRepoPassword());
    // 資源庫(kù)目錄
}

/**
 * 停止遠(yuǎn)程執(zhí)行的任務(wù)
 *
 * @param Transname
 * @param carteObjectid
 */

public void stopRemoteJob(String Transname, String carteObjectid) {
    try {
        remoteSlaveServer.stopJob(Transname, carteObjectid);

    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

/**
 * 停止 執(zhí)行的Job
 *
 * @param Transname
 * @param carteObjectid
 */

public void stopLacalJob() {
    try {
        job.stopAll();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

@SuppressWarnings("deprecation")
public void suspendLocalJob() {

    try {
        job.suspend();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

@SuppressWarnings("deprecation")
public void resumeLocalJob() {
    try {
        job.resume();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

public String getLocalJobStatus() {

    return job.getStatus();
}

public String getRemoteJobStatus() {
    SlaveServerJobStatus jobStatus = null;

    try {
        jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    return jobStatus.toString();
}

/**
 * 遠(yuǎn)程執(zhí)行任務(wù)
 *
 * @param jobNmae
 * @throws KettleException
 */

public void executeJobRemote(String dir, String jobName) throws KettleException {

    RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree(); // Default
                                                                                        // =
    RepositoryDirectoryInterface jobdir = repository.findDirectory(dir);

    JobMeta jobMeta = repository.loadJob(jobName, jobdir, null, null); // reads
                                                                        // last
                                                                        // version

    JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();

    jobExecutionConfiguration.setRemoteServer(remoteSlaveServer);
    jobExecutionConfiguration.setRepository(repository);

    // lastCarteObjectId = Job.sendToSlaveServer(jobMeta,
    // jobExecutionConfiguration, repository);
    // IMetaStore metastore = new IMetaStore();
    lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null);
    SlaveServerJobStatus jobStatus = null;

    Result oneResult = new Result();

    while (true) {
        try {
            jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);

            if (jobStatus.getResult() != null) {
                // The job is finished, get the result...
                //
                oneResult = jobStatus.getResult();

                break;
            }
        } catch (Exception e1) {

            oneResult.setNrErrors(1L);
            break; // Stop looking too, chances are too low the server
                    // will
                    // come back on-line
        }
    }

}

public String getLastCarteObjectId() {
    return lastCarteObjectId;
}

/**
 * 本地執(zhí)行job
 *
 * @param jobNmae
 * @throws KettleException
 */

public Result executeJobLocal(String jobName) throws KettleException {

    RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree(); // Default
                                                                                        // =
                                                                                        // root
    JobMeta jobMeta;

    jobMeta = repository.loadJob(jobName, directory, null, null); // reads

    Result oneResult = new Result();
    job = new Job(repository, jobMeta);

    job.start();

    job.waitUntilFinished();
    oneResult = job.getResult();
    return oneResult;
}

public static void main(String[] args) {
      //基于數(shù)據(jù)庫(kù)資源庫(kù)方式
    DBRepositoryInfo rposInfo = new DBRepositoryInfo();
    rposInfo.setDbHostname("192.168.70.227");
    rposInfo.setDbName("kettle");
    rposInfo.setDbPort("3306");
    rposInfo.setDbType("MYSQL");
    rposInfo.setDbUsername("root");
    rposInfo.setDbPassword("admin");
    rposInfo.setRepoId("kettle_repo_mysql");
    rposInfo.setRepoName("kettle_repo_mysql");

    rposInfo.setRepoPassword("admin");
    rposInfo.setRepoUsername("admin");

    CallJob ctf = new CallJob(rposInfo);
    SlaveServerInfo ssi = new SlaveServerInfo();
    ssi.setServerHost("localhost");
    ssi.setServerPort("8080");
    ssi.setServerName("master1");
    ssi.setServerUsername("cluster");
    ssi.setServerPassword("cluster");
    ctf.initSlaveServer(ssi);

    try {
        ctf.executeJobRemote("/test", "job_test");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

有問(wèn)題可以加: QQ群:452881901

版權(quán)歸:kettle老者
轉(zhuǎn)載來(lái)源:kettle老者

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市昌渤,隨后出現(xiàn)的幾起案子浅辙,更是在濱河造成了極大的恐慌厅贪,老刑警劉巖羞福,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件幻碱,死亡現(xiàn)場(chǎng)離奇詭異答渔,居然都是意外死亡关带,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)研儒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)豫缨,“玉大人,你說(shuō)我怎么就攤上這事端朵『冒牛” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵冲呢,是天一觀(guān)的道長(zhǎng)舍败。 經(jīng)常有香客問(wèn)我,道長(zhǎng)敬拓,這世上最難降的妖魔是什么邻薯? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮乘凸,結(jié)果婚禮上厕诡,老公的妹妹穿的比我還像新娘。我一直安慰自己营勤,他們只是感情好灵嫌,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著葛作,像睡著了一般寿羞。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上赂蠢,一...
    開(kāi)封第一講書(shū)人閱讀 52,262評(píng)論 1 308
  • 那天绪穆,我揣著相機(jī)與錄音,去河邊找鬼虱岂。 笑死玖院,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的第岖。 我是一名探鬼主播司恳,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼绍傲!你這毒婦竟也來(lái)了扔傅?” 一聲冷哼從身側(cè)響起耍共,我...
    開(kāi)封第一講書(shū)人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎猎塞,沒(méi)想到半個(gè)月后试读,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡荠耽,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年钩骇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铝量。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡倘屹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出慢叨,到底是詐尸還是另有隱情纽匙,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布拍谐,位于F島的核電站烛缔,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏轩拨。R本人自食惡果不足惜践瓷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望亡蓉。 院中可真熱鬧晕翠,春花似錦、人聲如沸砍濒。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)梯影。三九已至巫员,卻和暖如春庶香,著一層夾襖步出監(jiān)牢的瞬間甲棍,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工赶掖, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留感猛,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓奢赂,卻偏偏與公主長(zhǎng)得像陪白,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子膳灶,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理咱士,服務(wù)發(fā)現(xiàn)立由,斷路器,智...
    卡卡羅2017閱讀 134,701評(píng)論 18 139
  • Java基礎(chǔ)常見(jiàn)英語(yǔ)詞匯(共70個(gè))['?bd?ekt] ['?:rientid]導(dǎo)向的 ...
    今夜子辰閱讀 3,305評(píng)論 1 34
  • scheduler定時(shí)調(diào)度系統(tǒng)是大多行業(yè)項(xiàng)目都需要的序厉,傳統(tǒng)的spring-job模式锐膜,個(gè)人感覺(jué)已經(jīng)out了,因?yàn)榇?..
    安琪拉_4b7e閱讀 2,843評(píng)論 4 6
  • 以前弛房,你被人欺負(fù)嘲諷道盏,在外人面前覺(jué)得無(wú)地自容,你死的心都有了文捶,可你現(xiàn)在還是活得好好的荷逞,而那些帶給你陰影的人早不知身...
    溺巢閱讀 318評(píng)論 0 0
  • 有一天,我在深夜的大路里排徊粹排,突然間种远,我覺(jué)得身后有人,我一回頭恨搓,身后空無(wú)一人院促。只有我一個(gè)人在這個(gè)大街上。 ...
    王者冠軍閱讀 305評(píng)論 0 2