在使用kettle可視化工具進(jìn)行編輯生成kettle文件時(shí),只能指定一個(gè)數(shù)據(jù)源, 并且我們?cè)陂_發(fā)到發(fā)布過程中最少有三套環(huán)境,dev, test, prod. 如果按照kettle只能指定一個(gè)數(shù)據(jù)源,對(duì)應(yīng)的kjb/ktr文件就得要三套,首先不說麻煩, 更容易導(dǎo)致出錯(cuò),測(cè)試發(fā)到生產(chǎn)的可能.所以我們要解決這個(gè)痛點(diǎn), 就是可以根據(jù)我們配置三套環(huán)境的數(shù)據(jù)源,然后根據(jù)環(huán)境變量自己去更換數(shù)據(jù)源.
1.踩坑
在查詢資料((#.#)其實(shí)就是百度)的過程中,找到如下方法,(以kjb文件為例):
注意該方法為錯(cuò)誤方法
踩坑點(diǎn):在將kjb的路徑傳到JobMeta里初始化時(shí)候就kettle就開始連接數(shù)據(jù)庫了,導(dǎo)致我在測(cè)試環(huán)境一直報(bào)數(shù)據(jù)庫連接錯(cuò)誤(作者公司test/dev環(huán)境是隔開的本地是沒辦法連接測(cè)試環(huán)境)
public static Job executeKjb(String kjbPath, Map<String, String> param) {
log.info("execute krt, ktrPath={}, param={}", kjbPath, param);
Job job;
try {
KettleEnvironment.init();
//初始化job路徑
JobMeta jobMeta = new JobMeta(kjbPath, null);
//替換db鏈接信息(這里主要設(shè)置kjb文件里面的數(shù)據(jù)庫連接信息)
setDBLinkInfo(KETTLE_DB_PARAM_LIST, jobMeta.getDatabases());
job = new Job(null, jobMeta);
//krt文件更換連接信息(修改kjb引用ktr文件里的數(shù)據(jù)庫連接信息)
Map<JobEntryCopy, JobEntryTrans> activeJobEntryTransformations = job.getActiveJobEntryTransformations();
activeJobEntryTransformations.entrySet().forEach(jobEntryCopyJobEntryTransEntry -> {
DatabaseMeta[] connections = jobEntryCopyJobEntryTransEntry.getValue().getUsedDatabaseConnections();
setDBLinkInfo(KETTLE_DB_PARAM_LIST, Arrays.asList(connections));
});
//初始化job參數(shù)歹垫,腳本中獲取參數(shù)值:${variableName}
if (!CollectionUtils.isEmpty(param)) {
job.injectVariables(param);
}
job.start();
job.waitUntilFinished();
} catch (Exception e) {
log.error("kbj文件執(zhí)行失敗", e);
throw new ServiceException(".kjb文件執(zhí)行失敗", e);
}
return job;
}
//dbParams自己配置的各環(huán)境的數(shù)據(jù)庫連接信息
private static void setDBLinkInfo(List<KettleDBParams.KettleDBParam> dbParams, List<DatabaseMeta> databases) {
//設(shè)置DB參數(shù)
if (!CollectionUtils.isEmpty(dbParams)) {
Map<String, KettleDBParams.KettleDBParam> connNameGroupMap = dbParams.stream()
.collect(Collectors.toMap(KettleDBParams.KettleDBParam::getConnName, e -> e, (o, o2) -> o));
for (DatabaseMeta databaseMeta : databases) {
KettleDBParams.KettleDBParam dbParam = connNameGroupMap.get(databaseMeta.getName());
if (dbParam == null) {
throw new ServiceException(String.format("執(zhí)行.ktr失敗, 未發(fā)現(xiàn)數(shù)據(jù)庫連接【%s】", dbParam.getConnName()));
}
log.info("kettle數(shù)據(jù)庫使用的驅(qū)動(dòng)為:{}", databaseMeta.getDriverClass());
//連接地址
databaseMeta.setHostname(dbParam.getHost());
//數(shù)據(jù)庫名稱
databaseMeta.setDBName(dbParam.getDbName());
//端口
databaseMeta.setDBPort(dbParam.getPort());
//用戶
databaseMeta.setUsername(dbParam.getUsername());
//密碼
databaseMeta.setPassword(dbParam.getPassword());
}
} else {
throw new ServiceException("需要替換的數(shù)據(jù)庫參數(shù)為空!!!");
}
}
2.正確修改方式(可能是之一)
作者用的是JNDI方式連接數(shù)據(jù)源
配置文件:
kettle解壓目錄/simple-jndi/jdbc.properties
以下是配置文件內(nèi)原本內(nèi)容
/前面就是JNDI名稱,上圖紅框內(nèi)容, 按照自己需要配置進(jìn)行配置即可
SampleData/type=javax.sql.DataSource
SampleData/driver=org.h2.Driver
SampleData/url=jdbc:h2:file:samples/db/sampledb;IFEXISTS=TRUE
SampleData/user=PENTAHO_USER
SampleData/password=PASSWORD
SampleDataAdmin/type=javax.sql.DataSource
SampleDataAdmin/driver=org.h2.Driver
SampleDataAdmin/url=jdbc:h2:file:samples/db/sampledb;IFEXISTS=TRUE
SampleDataAdmin/user=PENTAHO_ADMIN
SampleDataAdmin/password=PASSWORD
Quartz/type=javax.sql.DataSource
Quartz/driver=org.hsqldb.jdbcDriver
Quartz/url=jdbc:hsqldb:hsql://localhost/quartz
Quartz/user=pentaho_user
Quartz/password=password
Hibernate/type=javax.sql.DataSource
Hibernate/driver=org.hsqldb.jdbcDriver
Hibernate/url=jdbc:hsqldb:hsql://localhost/hibernate
Hibernate/user=hibuser
Hibernate/password=password
Shark/type=javax.sql.DataSource
Shark/driver=org.hsqldb.jdbcDriver
Shark/url=jdbc:hsqldb:hsql://localhost/shark
Shark/user=sa
Shark/password=
PDI_Operations_Mart/type=javax.sql.DataSource
PDI_Operations_Mart/driver=org.postgresql.Driver
PDI_Operations_Mart/url=jdbc:postgresql://localhost:5432/hibernate?searchpath=pentaho_operations_mart
PDI_Operations_Mart/user=hibuser
PDI_Operations_Mart/password=password
live_logging_info/type=javax.sql.DataSource
live_logging_info/driver=org.postgresql.Driver
live_logging_info/url=jdbc:postgresql://localhost:5432/hibernate?searchpath=pentaho_dilogs
live_logging_info/user=hibuser
live_logging_info/password=password
3.正確方式-java項(xiàng)目中運(yùn)用
我們只需要根據(jù)dev/test/prod各個(gè)環(huán)境去讀取相應(yīng)的jdbc.properties文件即可
然后我們?cè)谥纊ettle是如何讀取到j(luò)dbc.properties文件,
源碼如下:
//在init的時(shí)候就會(huì)設(shè)置讀取
KettleEnvironment.init();
↓ 一直點(diǎn)下去init方法
public static void init(List<PluginTypeInterface> pluginClasses, boolean simpleJndi) throws KettleException {
SettableFuture ready;
if (initialized.compareAndSet((Object)null, ready = SettableFuture.create())) {
System.setProperties(ConcurrentMapProperties.convertProperties(System.getProperties()));
try {
if (!KettleClientEnvironment.isInitialized()) {
KettleClientEnvironment.init();
}
//這里是加載JNDI方式的方法
if (simpleJndi) {
JndiUtil.initJNDI();
}
pluginClasses.forEach(PluginRegistry::addPluginType);
PluginRegistry.init();
KettleVariablesList.init();
initLifecycleListeners();
ready.set(true);
} catch (Throwable var5) {
ready.setException(var5);
throw var5 instanceof KettleException ? (KettleException)var5 : new KettleException(var5);
}
} else {
ready = (SettableFuture)initialized.get();
try {
ready.get();
} catch (Throwable var4) {
throw new KettleException(var4);
}
}
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓點(diǎn)進(jìn) JndiUtil.initJNDI();
public static void initJNDI() throws KettleException {
//加載jdbc.properties的路徑是從Const.JNDI_DIRECTORY靜態(tài)變量里獲取, 如果為空就加載kettle文件內(nèi)數(shù)據(jù)源
String path = Const.JNDI_DIRECTORY;
if (path == null || path.equals("")) {
try {
File file = new File("simple-jndi");
path = file.getCanonicalPath();
} catch (Exception var2) {
throw new KettleException("Error initializing JNDI", var2);
}
Const.JNDI_DIRECTORY = path;
}
System.setProperty("java.naming.factory.initial", "org.osjava.sj.SimpleContextFactory");
System.setProperty("org.osjava.sj.root", path);
System.setProperty("org.osjava.sj.delimiter", "/");
}
那我們的思路就來了, 我們?cè)谡{(diào)用kettle文件之前賦值給Const.JNDI_DIRECTORY靜態(tài)變量我們自己的路徑就好了,那么下面就是我自己的加載方式
注意jdbc.properties文件名不要改, 應(yīng)該默認(rèn)讀取的,作者沒有往深里看
import com.xxxx.actuator.exception.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.pentaho.di.core.Const;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.File;
/**
* 初始化kettle讀取數(shù)據(jù)源配置文件地址
* 我是把jdbc.properties配置放到服務(wù)器上的,springboot打成jar讀取文件太麻煩了,
*以下只是給一個(gè)啟發(fā), 具體如何讀取按照自己項(xiàng)目需求.
*/
@Component
@Slf4j
public class JndiInit {
@Value("${spring.profiles}")
private String evn;
@PostConstruct
public void init() {
try {
String fileSeparator = System.getProperty("file.separator");
//開發(fā)環(huán)境獲取根目錄下文件路徑
if ("dev".equals(evn)) {
Const.JNDI_DIRECTORY = new File("").getCanonicalPath() + fileSeparator + "jndi" + fileSeparator + evn;
} else if ("test".equals(evn) || "prod".equals(evn)) {
//測(cè)試/生產(chǎn)環(huán)境
Const.JNDI_DIRECTORY = "/home/service/app/executor/tmp" + fileSeparator + "jndi" + fileSeparator + evn;
}
} catch (Exception e) {
throw new ServiceException("初始化jndi地址失敗", e);
}
}
}