在CDH平臺(tái)上為Streamsets導(dǎo)入擴(kuò)展包前, 先設(shè)置一個(gè)用來存儲(chǔ)這些擴(kuò)展包的目錄
- 在Cloundera管理平臺(tái)界面中, 選擇StreamSets服務(wù) 然后點(diǎn)擊Configuration.
- 在 Configuration 頁(yè)面, 在 Data Collector 高級(jí)配置選項(xiàng)中的 (Safety Valve) sdc-env.sh 區(qū)域, 增加環(huán)境變量 STREAMSETS_LIBRARIES_EXTRA_DIR 并把它指向存放擴(kuò)展包的目錄 , 按照這樣的格式:
export STREAMSETS_LIBRARIES_EXTRA_DIR="<external directory>"
舉例:
export STREAMSETS_LIBRARIES_EXTRA_DIR="/opt/sdc-extras/"
默認(rèn)的路徑是:/var/lib/sdc.
- 在每個(gè)運(yùn)行Data Collector的節(jié)點(diǎn)創(chuàng)建目錄 /opt/sdc-extras/
- 在每個(gè)節(jié)點(diǎn)給 Data Collector 的用戶增加擴(kuò)展目錄的權(quán)限仔掸。
舉例來說, 如果你是用用戶名和用戶組名都是:sdc的用戶來運(yùn)行Data Collector 服務(wù),那么可以使用下面的命令把擴(kuò)展目錄的權(quán)限賦予sdc:sdc :
chown -R sdc:sdc /opt/sdc-extras
- 當(dāng)使用默認(rèn)啟動(dòng)的java 安全管理器的時(shí)候寞酿,需要把擴(kuò)展包目錄加入Data Collector高級(jí)配置片段中的sdc-sercurity.policy屬性透硝,配置格式如下:
// user-defined external directory grant codebase "
file://<external directory>-" { permission java.security.AllPermission; };
案例:
// user-defined external directory grant codebase
"file:///opt/sdc-extras/-" { permission java.security.AllPermission; };
- 重啟Data Collector.
相關(guān)信息
Data Collector Environment Configuration
Step 2. Install External Libraries
配置好擴(kuò)展包存放目錄之后酝惧,使用 Data Collector 的Package Manager 來 安裝擴(kuò)展包
-
在Data Collector界面中,右上角的工具欄中, 點(diǎn)擊 Package Manager 圖標(biāo):
-
在導(dǎo)航欄中, 點(diǎn)擊 External Libraries:
Data Collector 會(huì)羅列出所有已經(jīng)安裝好的擴(kuò)展包桥氏。
- 在右上角工具欄的下方, 點(diǎn)擊 Install External Libraries 圖標(biāo):
- 在安裝擴(kuò)展包的對(duì)話框中, 選擇擴(kuò)展包對(duì)應(yīng)的stage 庫(kù)。
舉例說明, 如果你要按照 JDBC Multitable Consumer源需要的JDBC驅(qū)動(dòng)牺汤,選擇JDBC stage 庫(kù)姐浮。如果你要給 Groovy Evaluator processor安裝java的擴(kuò)展工具包,則需要選擇 Groovy stage庫(kù) 超歌。
- 瀏覽文件找到你要安裝的擴(kuò)展包然后點(diǎn)擊Open砍艾。
- 點(diǎn)擊 Upload,將擴(kuò)展包安裝到指定的stage庫(kù)
Data Collector 安裝擴(kuò)展包然后回彈出一個(gè)提供重啟Data Collector功能的對(duì)話框巍举。
- 要安裝更多的擴(kuò)展包脆荷,只需要點(diǎn)擊Cancel,然后重復(fù)3-6這幾個(gè)步驟懊悯。
比如說蜓谋,你想要在Spark Evaluator processor中使用一個(gè)擴(kuò)展包,但是你有兩個(gè)不同版本的processor(每個(gè)版本都有自己獨(dú)立的庫(kù))炭分。為了保證擴(kuò)展包對(duì)兩個(gè)版本的processor都是可用的桃焕,你必須把擴(kuò)展包分別上傳到兩個(gè)版本對(duì)應(yīng)的的stage庫(kù)下。
- After installing all of the external libraries that you want, restart the Data Collector in one of the following ways:
- If you started the Data Collector manually from the command line, click Restart Data Collector in the Install External Libraries window.
- If you started the Data Collector as a service, you must use the command line for restart. Click Cancel in the Install External Libraries window, and then run the following command:
service sdc restart
當(dāng)前開發(fā)環(huán)境擴(kuò)展包存放路徑:
/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-3.3.0/streamsets-libs/streamsets-datacollector-basic-lib/lib/streamsets-extra-1.0-SNAPSHOT.jar
/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-3.3.0/streamsets-libs-extras/streamsets-datacollector-basic-lib/lib/streamsets-extra-1.0-SNAPSHOT.jar
如果配置的路徑和文件上傳路徑不一致時(shí)候捧毛,以配置路徑為準(zhǔn)观堂。
javascript調(diào)用外部的java代碼
網(wǎng)頁(yè)鏈接地址:https://streamsets.com/blog/calling-external-java-code-script-evaluators/
// Only need single SHA3, Hex instances
if (!state.sha3 || !state.Hex) {
var DigestSHA3 = Java.type('org.bouncycastle.jcajce.provider.digest.SHA3.DigestSHA3');
state.sha3 = new DigestSHA3(256);
state.Hex = Java.type('org.bouncycastle.util.encoders.Hex');
}
var sha3 = state.sha3;
var Hex = state.Hex;
for(var i = 0; i < records.length; i++) {
var record = records[i];
try {
// Need to reset the message digest object for every field!
sha3.reset();
var digest = sha3.digest(record.value['data'].getBytes('UTF-8'));
record.value.digest = Hex.toHexString(digest);
output.write(record);
} catch (e) {
// Send record to error
error.write(record, e);
}
}