一壁查、背景介紹
1.1语御、Redis介紹
天下武功应闯,唯快不破碉纺。Redis作為NoSQL數(shù)據(jù)庫的性能王者,天生擁有高可用疫赎、高性能、高擴(kuò)展等基因胎撇。那么進(jìn)一步來看看Redis具備的特性:
- 使用ANSI C語言編寫晚树,開源并遵守BSD協(xié)議。支持網(wǎng)絡(luò)婚瓜、可基于內(nèi)存亦可持久化的日志型愚铡、Key-Value鍵值對存儲數(shù)據(jù)庫碍舍,并支持多種語言的API接口
- 支持單點(diǎn)模式片橡、主從模式锻全、sentinels哨兵模式,cluster集群模式部署了嚎,可保證redis高可用
- 基于內(nèi)存運(yùn)行歪泳,天生具備高性能
- 支持分布式呐伞,理論上可以無限擴(kuò)展
那么Redis對比其他類型的數(shù)據(jù)庫,Redis又具備以下特點(diǎn):
- C/S通訊模型
- 單進(jìn)程單線程模型
- 豐富的數(shù)據(jù)類型
- 操作原子性
- 持久化
- 高并發(fā)讀寫
- 支持lua腳本
1.2癣防、Redis數(shù)據(jù)類型
最新版本的Redis提供的數(shù)據(jù)類型主要分為8種自有類型蕾盯,包括:String類型级遭、列表類型、集合類型掠兄、順序集合類型蚂夕、哈希類型侈贷、位數(shù)組俏蛮、概率數(shù)據(jù)結(jié)構(gòu)和流搏屑。
1.3、Redis的數(shù)據(jù)結(jié)構(gòu)
主要介紹五種常用的數(shù)據(jù)類型的數(shù)據(jù)結(jié)構(gòu)伟骨,具體如下圖:
關(guān)于上表中的部分釋義:
- 雙端鏈表與單鏈表十分相似,不同的是第一個(gè)鏈接點(diǎn)與最后一個(gè)鏈接點(diǎn)直接相連逛腿。雙端鏈表不是雙向鏈表
- 壓縮列表是列表鍵和哈希鍵的底層實(shí)現(xiàn)之一鳄逾。當(dāng)一個(gè)列表鍵只包含少量列表項(xiàng)殴俱,并且每個(gè)列表項(xiàng)要么就是小整數(shù)明场,要么就是長度比較短的字符串苦锨,Redis會使用壓縮列表來做列表鍵的底層實(shí)現(xiàn)拉庶;
- 整數(shù)集合是集合鍵的底層實(shí)現(xiàn)之氏仗。當(dāng)一個(gè)集合只包含整數(shù)值元素皆尔,并且這個(gè)集合的元素?cái)?shù)量不多時(shí),Redis會使用整數(shù)集合作為集合鍵的底層實(shí)現(xiàn)流炕。
- 跳躍表是一種鏈表+多級索引的有序數(shù)據(jù)結(jié)構(gòu)浪感,它通過在每個(gè)節(jié)點(diǎn)中維持多個(gè)指向其他節(jié)點(diǎn)的指針影兽,從而達(dá)到快速訪問節(jié)點(diǎn)的目的。
1.4捐名、應(yīng)用場景
- 分布式鎖(共享Session镶蹋,唯一ID生成器,秒殺系統(tǒng))
- 緩存(“熱點(diǎn)”數(shù)據(jù):高頻讀拂酣、低頻寫)
- 限流器婶熬、計(jì)數(shù)器
- 消息隊(duì)列
- 排行榜赵颅、社交網(wǎng)絡(luò)和實(shí)時(shí)系統(tǒng)
二洲赵、代碼分析
初步介紹了Redis的基本特征叠萍、數(shù)據(jù)類型和數(shù)據(jù)結(jié)構(gòu)苛谷,下面我們切入正題:如何基于Kettle平臺構(gòu)建Redis讀寫操作插件。
2.1例书、開發(fā)環(huán)境
- jdk1.7/1.8
- kettle 6.1.0.1/7.0.0.0
- redis5.x
- jedis2.9.0
2.2自沧、代碼架構(gòu)
2.3、實(shí)現(xiàn)功能
通過本組合插件的構(gòu)建孝偎,主要實(shí)現(xiàn)了Redis數(shù)據(jù)以下三個(gè)操作插件:
- 數(shù)據(jù)的寫入(RedisOutput)
- 數(shù)據(jù)的讀纫露堋(RedisInput)
- 數(shù)據(jù)的刪除(RedisDelete)
2.4雨效、插件代碼
本插件所有操作的服務(wù)端Redis集群叮姑,是以sentinel哨兵模式搭建的高可用集群。
2.4.1极颓、寫入插件
步驟類核心代碼1:初始化Redis連接配置
//pool設(shè)置為static變量菠隆,多組件共享,會存在某一個(gè)組件使用完連接池破衔,導(dǎo)致其他組件pool不可用
private JedisSentinelPool pool = null;
ExecutorService service = Executors.newFixedThreadPool(12);
@Override
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
long start = System.currentTimeMillis();
if (super.init(smi, sdi)) {
try {
// Create client and connect to redis server(s)
Set<Map<String, String>> jedisClusterNodes = ((RedisOutputMeta) smi).getServers();
// 建立連接池配置參數(shù)
JedisPoolConfig config = new JedisPoolConfig();
// 設(shè)置最大連接數(shù)
config.setMaxTotal(10000);
// 設(shè)置最大阻塞時(shí)間晰筛,記住是毫秒數(shù)milliseconds
config.setMaxWaitMillis(10000);
// 設(shè)置最大空閑連接數(shù)
config.setMaxIdle(300);
// jedis實(shí)例是否可用
config.setTestOnBorrow(true);
// return 一個(gè)jedis實(shí)例給pool時(shí),是否檢查連接可用性(ping())
// config.setTestOnReturn(true);
// 設(shè)置jedis實(shí)例空閑檢查連接可用性
config.setTestWhileIdle(true);
// 創(chuàng)建連接池
// 獲取redis密碼
String password = null;
int timeout = 3000;
String masterName = ((RedisOutputMeta) smi).getMasterName();
Set<String> sentinels = new HashSet<String>();
Iterator<Map<String, String>> it = jedisClusterNodes.iterator();
while (it.hasNext()) {
Map<String, String> hostAndPort = it.next();
password = hostAndPort.get("auth");
sentinels.add(hostAndPort.get("hostname") + ":" + hostAndPort.get("port"));
}
pool = new JedisSentinelPool(masterName, sentinels, config, timeout, password);
long end = System.currentTimeMillis();
logBasic("建立連接池 毫秒:" + (end - start));
return true;
} catch (Exception e) {
logError(BaseMessages.getString(PKG, "RedisInput.Error.ConnectError"), e);
return false;
}
} else {
return false;
}
}
步驟類核心代碼2:redis數(shù)據(jù)執(zhí)行寫入操作
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (RedisOutputMeta) smi;
data = (RedisOutputData) sdi;
// TODO Auto-generated method stub
Jedis jedis = pool.getResource();
Object[] r = getRow(); // get row, set busy!
// If no more input to be expected, stop
if (r == null) {
setOutputDone();
return false;
}
if (first) {
first = false;
// clone input row meta for now, we will change it (add or set inline) later
data.outputRowMeta = getInputRowMeta().clone();
// Get output field types
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
keyfieldname = meta.getKeyFieldName();
valuefieldname = meta.getValueFieldName();
ttlfieldname = meta.getTtlFieldName();
logBasic("keyfieldname:" + keyfieldname);
logBasic("valuefieldname:" + valuefieldname);
logBasic("ttlfieldname:" + ttlfieldname);
}
RedisOutputThread thread = new RedisOutputThread(this, jedis, r);
service.submit(thread);
if (checkFeedback(getLinesRead())) {
if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "RedisOutput.Log.LineNumber") + getLinesRead());
}
}
return true;
}
元數(shù)據(jù)類核心代碼1:用戶關(guān)鍵配置信息檢查
@Override
public void check(List<CheckResultInterface> remarks, TransMeta transMeta,
StepMeta stepMeta, RowMetaInterface prev, String[] input,
String[] output, RowMetaInterface info) {
CheckResult cr;
if (prev == null || prev.size() == 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
BaseMessages.getString(PKG,
"RedisOutputMeta.CheckResult.NotReceivingFields"),
stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG,
"RedisOutputMeta.CheckResult.StepRecevingData",
prev.size() + ""), stepMeta);
remarks.add(cr);
}
// See if we have input streams leading to this step!
if (input.length > 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG,
"RedisOutputMeta.CheckResult.StepRecevingData2"),
stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(
CheckResultInterface.TYPE_RESULT_ERROR,
BaseMessages
.getString(PKG,
"RedisOutputMeta.CheckResult.NoInputReceivedFromOtherSteps"),
stepMeta);
remarks.add(cr);
}
}
元數(shù)據(jù)類核心代碼2:對應(yīng)redis數(shù)據(jù)寫入操作盼砍,stepnode關(guān)鍵配置信息讀取
private void readData(Node stepnode) throws KettleXMLException {
try {
this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
this.valueFieldName = XMLHandler
.getTagValue(stepnode, "valuefield");
this.ttlFieldName = XMLHandler.getTagValue(stepnode, "ttlfield");
this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
int nrservers = XMLHandler.countNodes(serverNodes, "server");
allocate(nrservers);
for (int i = 0; i < nrservers; i++) {
Node fnode = XMLHandler
.getSubNodeByNr(serverNodes, "server", i);
Map<String, String> hostAndPort = new HashMap<String, String>();
hostAndPort.put("hostname",
XMLHandler.getTagValue(fnode, "hostname"));
hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
if (i == 0) {
setJedisServer(hostAndPort);
}
servers.add(hostAndPort);
}
} catch (Exception e) {
throw new KettleXMLException(BaseMessages.getString(PKG,
"RedisOutputMeta.Exception.UnableToReadStepInfo"), e);
}
}
多線程處理類RedisOutputThread
@Override
public void run() {
long start = System.currentTimeMillis();
// Get value from redis, don't cast now, be lazy. TODO change this?
int keyFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
redisOutput.meta.getKeyFieldName());
Object key = r[keyFieldIndex];
int valueFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
redisOutput.meta.getValueFieldName());
Object value = r[valueFieldIndex];
int ttlFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
redisOutput.meta.getTtlFieldName());
Object ttl = r[ttlFieldIndex];
String keyString = key.toString();
String valueString = value.toString();
String ttlString = ttl.toString();
if (keyString != null && !StringUtil.isEmpty(keyString)) {
String getKeyValue = jedis.get(keyString);
boolean existsKey = jedis.exists(keyString);
// 如果key已存在,就只更新value數(shù)據(jù)黔宛;如果key不存在臀晃,更新數(shù)據(jù)同時(shí)設(shè)置key過期時(shí)間(單位:秒)
if (existsKey) {
jedis.set(keyString, valueString);
redisOutput
.logBasic(" This key already exists, so only the corresponding value= "
+ getKeyValue + " is updated.");
} else {
if (ttlString != null && !StringUtil.isEmpty(ttlString)) {
jedis.set(keyString, valueString, "NX", "EX",
Integer.parseInt(ttlString));
} else {
jedis.set(keyString, valueString, "NX", "EX",
2 * 24 * 60 * 60);
}
}
}
try {
redisOutput.putRow(redisOutput.data.outputRowMeta, r);
// redisOutput.rowkey.add(r[idFieldIndex]);
} catch (KettleStepException e) {
e.printStackTrace();
}
jedis.close();
long end = System.currentTimeMillis();
redisOutput.logBasic("Redis_Key:" + keyString + " ,Redis_Value:"
+ valueString + " ,Redis_TTL:" + ttlString + " ,processRow "
+ (end - start) + " milliseconds .");
}
2.4.2案淋、讀取插件
步驟類核心代碼1:初始化Redis連接配置(同上)
步驟類核心代碼2:根據(jù)用戶配置數(shù)據(jù)類型誉碴,動態(tài)選擇對應(yīng)Redis的API讀取Redis數(shù)據(jù)庫對應(yīng)key的數(shù)據(jù)
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (RedisInputMeta) smi;
data = (RedisInputData) sdi;
Object[] r = getRow(); // get row, set busy!
// If no more input to be expected, stop
if (r == null) {
setOutputDone();
return false;
}
if (first) {
first = false;
// clone input row meta for now, we will change it (add or set inline) later
data.outputRowMeta = getInputRowMeta().clone();
// Get output field types
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
keytype = meta.getKeyTypeFieldName();
logBasic("keytype:" + keytype);
valuetype = meta.getValueTypeName();
logBasic("valuetype:" + valuetype);
mastername = meta.getMasterName();
logBasic("mastername:" + mastername);
}
// Get value from redis, don't cast now, be lazy. TODO change this?
int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
if (keyFieldIndex < 0) {
throw new KettleException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.KeyFieldNameNotFound"));
}
int key2Index = -1;
if (keytype.equals("hash")) {
key2Index = getInputRowMeta().indexOfValue(meta.getKey2FieldName());
if (key2Index < 0) {
throw new KettleException(
BaseMessages.getString(PKG, "RedisOutputMeta.Exception.Key2FieldNameNotFound"));
}
}
StringBuffer fetchedValue = new StringBuffer("");
Jedis jedis = pool.getResource();
try {
if (keytype.equals("string")) {
String value =jedis.get((String) (r[keyFieldIndex]));
fetchedValue.append(value).append("|");
} else if (keytype.equals("hash")) {
String res = jedis.hget((String) r[keyFieldIndex], (String) (r[key2Index]));
fetchedValue.append(res + "|");
} else if (keytype.equals("hashall")) {
Map<String, String> map = jedis.hgetAll((String) r[keyFieldIndex]);
for (Map.Entry<String, String> entry : map.entrySet()) {
fetchedValue.append(entry.getKey() + ":" + entry.getValue() + "|");
}
} else if (keytype.equals("list")) {
List<String> list = jedis.lrange((String) r[keyFieldIndex], 0, -1);
for (String s : list) {
fetchedValue.append(s).append("|");
}
} else if (keytype.equals("set")) {
Set<String> set = jedis.smembers((String) r[keyFieldIndex]);
for (String s : set) {
fetchedValue.append(s).append("|");
}
} else if (keytype.equals("zset")) {
Set<String> set = jedis.zrangeByScore((String) r[keyFieldIndex], 0, -1);
for (String s : set) {
fetchedValue.append(s).append("|");
}
} else if (keytype.equals("keys")) {
Set<String> set = jedis.keys((String) r[keyFieldIndex]);
for (String s : set) {
fetchedValue.append(s).append("|");
}
}
} finally {
jedis.close();
}
String output;
if (fetchedValue.length() > 1)
output = fetchedValue.substring(0, fetchedValue.length() - 1);
else
output = fetchedValue.toString();
// Add Value data name to output, or set value data if already exists
//logBasic("output:" + output);
Object[] outputRowData = r;
int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
if (valueFieldIndex < 0 || valueFieldIndex > outputRowData.length) {
// Not found so add it
outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), output);
} else {
// Update value in place
outputRowData[valueFieldIndex] = output;
}
putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
if (checkFeedback(getLinesRead())) {
if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "RedisInput.Log.LineNumber") + getLinesRead());
}
}
return true;
}
元數(shù)據(jù)類核心代碼1:用戶關(guān)鍵配置信息檢查
@Override
public void check(List<CheckResultInterface> remarks,
TransMeta transMeta, StepMeta stepMeta,
RowMetaInterface prev,
String[] input, String[] output,
RowMetaInterface info) {
CheckResult cr;
if (prev == null || prev.size() == 0) {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.NotReceivingFields"), stepMeta);
remarks.add(cr);
} else {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.StepRecevingData", prev.size() + ""), stepMeta);
remarks.add(cr);
}
// See if we have input streams leading to this step!
if (input.length > 0) {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.StepRecevingData2"), stepMeta);
remarks.add(cr);
} else {
cr =
new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, BaseMessages.getString(PKG,
"RedisInputMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
remarks.add(cr);
}
}
元數(shù)據(jù)類核心代碼2:對應(yīng)redis數(shù)據(jù)讀取操作,stepnode關(guān)鍵配置信息讀取
private void readData(Node stepnode) throws KettleXMLException {
try {
this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
this.keyTypeFieldName = XMLHandler.getTagValue(stepnode, "keytypefield");
this.key2FieldName = XMLHandler.getTagValue(stepnode, "key2field");
this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefield");
this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
int nrservers = XMLHandler.countNodes(serverNodes, "server");
allocate(nrservers);
for (int i = 0; i < nrservers; i++) {
Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
Map<String,String> hostAndPort = new HashMap<String,String>();
hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
if (i == 0) {
setJedisServer(hostAndPort);
}
servers.add(hostAndPort);
}
} catch (Exception e) {
throw new KettleXMLException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.UnableToReadStepInfo"),
e);
}
}
2.4.3奋岁、刪除插件
image.png
- RedisDelete:步驟類厦取,初始化Redis連接配置虾攻,處理數(shù)據(jù)流數(shù)據(jù)霎箍,執(zhí)行刪除操作
- RedisDeleteData:數(shù)據(jù)類漂坏,定義kettle刪除插件的RowMetaInterface
- RedisDeleteDialog:對話框類顶别,定義kettle刪除插件redis find del的UI Swing的對話框
- RedisDeleteMeta:元數(shù)據(jù)類驯绎,根據(jù)配置執(zhí)行redis操作剩失,并定義kettle刪除插件顯示類
步驟類核心代碼1:初始化Redis連接配置(同上)
步驟類核心代碼2:redis數(shù)據(jù)執(zhí)行刪除操作
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (RedisDeleteMeta) smi;
data = (RedisDeleteData) sdi;
Object[] r = getRow(); // get row, set busy!
// If no more input to be expected, stop
if (r == null) {
setOutputDone();
return false;
}
if (first) {
first = false;
// clone input row meta for now, we will change it (add or set inline) later
data.outputRowMeta = getInputRowMeta().clone();
// Get output field types
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
keyfieldname = meta.getKeyFieldName();
logBasic("keyfieldname:" + keyfieldname);
valuefieldname = meta.getValueFieldName();
logBasic("valueFieldName:" + valuefieldname);
valuetype = meta.getValueTypeName();
logBasic("valuetype:" + valuetype);
mastername = meta.getMasterName();
logBasic("mastername:" + mastername);
}
// Get value from redis, don't cast now, be lazy. TODO change this?
int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
if (keyFieldIndex < 0) {
throw new KettleException(BaseMessages.getString(PKG, "RedisDeletetMeta.Exception.KeyFieldNameNotFound"));
}
Jedis jedis = pool.getResource();
String outPutValue = "";
String[] delKeys=r[keyFieldIndex].toString().split(",");
try {
Long delete_flag = jedis.del(delKeys);
outPutValue = String.valueOf(delete_flag);
} finally {
jedis.close();
}
// logBasic("output:" + output);
Object[] outputRowData = r;
int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
if (valueFieldIndex < 0 || valueFieldIndex > outputRowData.length) {
// Not found so add it
outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), outPutValue);
} else {
// Update value in place
outputRowData[valueFieldIndex] = outPutValue;
}
putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
if (checkFeedback(getLinesRead())) {
if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "RedisDelete.Log.LineNumber") + getLinesRead());
}
}
return true;
}
元數(shù)據(jù)類核心代碼1:用戶關(guān)鍵配置信息檢查
@Override
public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev,
String[] input, String[] output, RowMetaInterface info) {
CheckResult cr;
if (prev == null || prev.size() == 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NotReceivingFields"), stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData", prev.size() + ""),
stepMeta);
remarks.add(cr);
}
// See if we have input streams leading to this step!
if (input.length > 0) {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData2"), stepMeta);
remarks.add(cr);
} else {
cr = new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR,
BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
remarks.add(cr);
}
}
元數(shù)據(jù)類核心代碼2:對應(yīng)redis數(shù)據(jù)刪除操作演熟,stepnode關(guān)鍵配置信息讀取
private void readData(Node stepnode) throws KettleXMLException {
try {
this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfieldname");
this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefieldname");
this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
int nrservers = XMLHandler.countNodes(serverNodes, "server");
allocate(nrservers);
for (int i = 0; i < nrservers; i++) {
Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
Map<String, String> hostAndPort = new HashMap<String, String>();
hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
if (i == 0) {
setJedisServer(hostAndPort);
}
servers.add(hostAndPort);
}
} catch (Exception e) {
throw new KettleXMLException(BaseMessages.getString(PKG, "RedisDeleteMeta.Exception.UnableToReadStepInfo"),
e);
}
}
三绽媒、使用說明
3.1、Redis Output插件
①Key:動態(tài)key字段猎提,從上一步驟數(shù)據(jù)流動態(tài)獲认撬铡(必選)
②Value:動態(tài)value字段伞租,從上一步驟數(shù)據(jù)流動態(tài)獲瓤(必選)
③TTL:key超時(shí)時(shí)間作喘,默認(rèn)值172800秒泞坦,從上一步驟數(shù)據(jù)流動態(tài)獲取贰锁,可對單行數(shù)據(jù)做控制(必選)
④mastername:Redis sentinels哨兵模式集群master名稱(必選)
⑤server ip:服務(wù)端ip列表(必選)
⑥server port:哨兵端口列表,和服務(wù)端IP一一對應(yīng)(必選)
⑦認(rèn)證密鑰:服務(wù)端IP對應(yīng)鑒權(quán)密碼(可選)
3.2、Redis Input插件
①Key Field:動態(tài)key字段粤攒,從上一步驟數(shù)據(jù)流動態(tài)獲却殉帧(必選)
②Key type:key數(shù)據(jù)類型,從集合列表選擇(必選)
③Hash值:動態(tài)hash值字段盔几,從上一步驟數(shù)據(jù)流動態(tài)獲妊放摹(可選)
④Value Field:動態(tài)value字段芯丧,從上一步驟數(shù)據(jù)流動態(tài)獲扔Ш恪(必選)
⑤Value type:value數(shù)據(jù)類型,從集合列表選擇(必選)
⑥mastername:Redis sentinels哨兵模式集群master名稱(必選)
⑦h(yuǎn)ostname:服務(wù)端ip列表(必選)
⑧host port:哨兵端口列表岭佳,和服務(wù)端IP一一對應(yīng)(必選)
⑨auth:服務(wù)端IP對應(yīng)鑒權(quán)密碼(可選)
3.3珊随、Redis Delete插件
①Key:動態(tài)key字段叶洞,從上一步驟數(shù)據(jù)流動態(tài)獲取(必選)
②輸出字段名:delete操作帆焕,返回自定義輸出字段名稱(必選)
③Value type:value數(shù)據(jù)類型叶雹,從集合列表選擇(必選)
④mastername:Redis sentinels哨兵模式集群master名稱(必選)
⑤server ip:服務(wù)端ip列表(必選)
⑥server port:哨兵端口列表钥星,和服務(wù)端IP一一對應(yīng)(必選)
⑦認(rèn)證密鑰:服務(wù)端IP對應(yīng)鑒權(quán)密碼(可選)
四谦炒、總結(jié)
如果你需要源碼或者了解更多自定義插件及集成方式宁改,抑或有開發(fā)過程或者使用過程中的任何疑問或建議还蹲,請關(guān)注小編"游走在數(shù)據(jù)之間"谜喊,回復(fù)2查看源代碼,回復(fù)3獲取入門視頻斗遏。