Kettle插件開發(fā)之Redis

一壁查、背景介紹

1.1语御、Redis介紹

天下武功应闯,唯快不破碉纺。Redis作為NoSQL數(shù)據(jù)庫的性能王者,天生擁有高可用疫赎、高性能、高擴(kuò)展等基因胎撇。那么進(jìn)一步來看看Redis具備的特性:

  • 使用ANSI C語言編寫晚树,開源并遵守BSD協(xié)議。支持網(wǎng)絡(luò)婚瓜、可基于內(nèi)存亦可持久化的日志型愚铡、Key-Value鍵值對存儲數(shù)據(jù)庫碍舍,并支持多種語言的API接口
  • 支持單點(diǎn)模式片橡、主從模式锻全、sentinels哨兵模式,cluster集群模式部署了嚎,可保證redis高可用
  • 基于內(nèi)存運(yùn)行歪泳,天生具備高性能
  • 支持分布式呐伞,理論上可以無限擴(kuò)展

那么Redis對比其他類型的數(shù)據(jù)庫,Redis又具備以下特點(diǎn):

  • C/S通訊模型
  • 單進(jìn)程單線程模型
  • 豐富的數(shù)據(jù)類型
  • 操作原子性
  • 持久化
  • 高并發(fā)讀寫
  • 支持lua腳本

1.2癣防、Redis數(shù)據(jù)類型

最新版本的Redis提供的數(shù)據(jù)類型主要分為8種自有類型蕾盯,包括:String類型级遭、列表類型、集合類型掠兄、順序集合類型蚂夕、哈希類型侈贷、位數(shù)組俏蛮、概率數(shù)據(jù)結(jié)構(gòu)和流搏屑。

image

1.3、Redis的數(shù)據(jù)結(jié)構(gòu)

主要介紹五種常用的數(shù)據(jù)類型的數(shù)據(jù)結(jié)構(gòu)伟骨,具體如下圖:

image

關(guān)于上表中的部分釋義:

  • 雙端鏈表與單鏈表十分相似,不同的是第一個(gè)鏈接點(diǎn)與最后一個(gè)鏈接點(diǎn)直接相連逛腿。雙端鏈表不是雙向鏈表
image.png
  • 壓縮列表是列表鍵和哈希鍵的底層實(shí)現(xiàn)之一鳄逾。當(dāng)一個(gè)列表鍵只包含少量列表項(xiàng)殴俱,并且每個(gè)列表項(xiàng)要么就是小整數(shù)明场,要么就是長度比較短的字符串苦锨,Redis會使用壓縮列表來做列表鍵的底層實(shí)現(xiàn)拉庶;
  • 整數(shù)集合是集合鍵的底層實(shí)現(xiàn)之氏仗。當(dāng)一個(gè)集合只包含整數(shù)值元素皆尔,并且這個(gè)集合的元素?cái)?shù)量不多時(shí),Redis會使用整數(shù)集合作為集合鍵的底層實(shí)現(xiàn)流炕。
  • 跳躍表是一種鏈表+多級索引的有序數(shù)據(jù)結(jié)構(gòu)浪感,它通過在每個(gè)節(jié)點(diǎn)中維持多個(gè)指向其他節(jié)點(diǎn)的指針影兽,從而達(dá)到快速訪問節(jié)點(diǎn)的目的。
image

1.4捐名、應(yīng)用場景

  • 分布式鎖(共享Session镶蹋,唯一ID生成器,秒殺系統(tǒng))
  • 緩存(“熱點(diǎn)”數(shù)據(jù):高頻讀拂酣、低頻寫)
  • 限流器婶熬、計(jì)數(shù)器
  • 消息隊(duì)列
  • 排行榜赵颅、社交網(wǎng)絡(luò)和實(shí)時(shí)系統(tǒng)

二洲赵、代碼分析

初步介紹了Redis的基本特征叠萍、數(shù)據(jù)類型和數(shù)據(jù)結(jié)構(gòu)苛谷,下面我們切入正題:如何基于Kettle平臺構(gòu)建Redis讀寫操作插件。

2.1例书、開發(fā)環(huán)境

  • jdk1.7/1.8
  • kettle 6.1.0.1/7.0.0.0
  • redis5.x
  • jedis2.9.0

2.2自沧、代碼架構(gòu)

image.png

2.3、實(shí)現(xiàn)功能

通過本組合插件的構(gòu)建孝偎,主要實(shí)現(xiàn)了Redis數(shù)據(jù)以下三個(gè)操作插件:

  • 數(shù)據(jù)的寫入(RedisOutput)
  • 數(shù)據(jù)的讀纫露堋(RedisInput)
  • 數(shù)據(jù)的刪除(RedisDelete)

2.4雨效、插件代碼

本插件所有操作的服務(wù)端Redis集群叮姑,是以sentinel哨兵模式搭建的高可用集群。

2.4.1极颓、寫入插件

image.png

步驟類核心代碼1:初始化Redis連接配置

//pool設(shè)置為static變量菠隆,多組件共享,會存在某一個(gè)組件使用完連接池破衔,導(dǎo)致其他組件pool不可用
    private  JedisSentinelPool pool = null;
    ExecutorService service = Executors.newFixedThreadPool(12);
    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        long start = System.currentTimeMillis();
        if (super.init(smi, sdi)) {
            try {
                // Create client and connect to redis server(s)
                Set<Map<String, String>> jedisClusterNodes = ((RedisOutputMeta) smi).getServers();
                // 建立連接池配置參數(shù)
                JedisPoolConfig config = new JedisPoolConfig();
                // 設(shè)置最大連接數(shù)
                config.setMaxTotal(10000);
                // 設(shè)置最大阻塞時(shí)間晰筛,記住是毫秒數(shù)milliseconds
                config.setMaxWaitMillis(10000);
                // 設(shè)置最大空閑連接數(shù)
                config.setMaxIdle(300);
                // jedis實(shí)例是否可用
                config.setTestOnBorrow(true);
                // return 一個(gè)jedis實(shí)例給pool時(shí),是否檢查連接可用性(ping())
                // config.setTestOnReturn(true);
                // 設(shè)置jedis實(shí)例空閑檢查連接可用性
                config.setTestWhileIdle(true);
                // 創(chuàng)建連接池
                // 獲取redis密碼
                String password = null;
                int timeout = 3000;
                String masterName = ((RedisOutputMeta) smi).getMasterName();
                Set<String> sentinels = new HashSet<String>();
                Iterator<Map<String, String>> it = jedisClusterNodes.iterator();
                while (it.hasNext()) {
                    Map<String, String> hostAndPort = it.next();
                    password = hostAndPort.get("auth");
                    sentinels.add(hostAndPort.get("hostname") + ":" + hostAndPort.get("port"));
                }
                pool = new JedisSentinelPool(masterName, sentinels, config, timeout, password);
                long end = System.currentTimeMillis();
                logBasic("建立連接池 毫秒:" + (end - start));
                return true;
            } catch (Exception e) {
                logError(BaseMessages.getString(PKG, "RedisInput.Error.ConnectError"), e);
                return false;
            }
        } else {
            return false;
        }
    }

步驟類核心代碼2:redis數(shù)據(jù)執(zhí)行寫入操作

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (RedisOutputMeta) smi;
        data = (RedisOutputData) sdi;
        // TODO Auto-generated method stub
        Jedis jedis = pool.getResource();
        Object[] r = getRow(); // get row, set busy!
        // If no more input to be expected, stop
        if (r == null) {
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            // clone input row meta for now, we will change it (add or set inline) later
            data.outputRowMeta = getInputRowMeta().clone();
            // Get output field types
            meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
            keyfieldname = meta.getKeyFieldName();
            valuefieldname = meta.getValueFieldName();
            ttlfieldname = meta.getTtlFieldName();
            logBasic("keyfieldname:" + keyfieldname);
            logBasic("valuefieldname:" + valuefieldname);
            logBasic("ttlfieldname:" + ttlfieldname);
        }
        RedisOutputThread thread = new RedisOutputThread(this, jedis, r);
        service.submit(thread);
        if (checkFeedback(getLinesRead())) {
            if (log.isBasic()) {
                logBasic(BaseMessages.getString(PKG, "RedisOutput.Log.LineNumber") + getLinesRead());
            }
        }
        return true;
    }

元數(shù)據(jù)類核心代碼1:用戶關(guān)鍵配置信息檢查

@Override
    public void check(List<CheckResultInterface> remarks, TransMeta transMeta,
            StepMeta stepMeta, RowMetaInterface prev, String[] input,
            String[] output, RowMetaInterface info) {
        CheckResult cr;
        if (prev == null || prev.size() == 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
                    BaseMessages.getString(PKG,
                            "RedisOutputMeta.CheckResult.NotReceivingFields"),
                    stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG,
                            "RedisOutputMeta.CheckResult.StepRecevingData",
                            prev.size() + ""), stepMeta);
            remarks.add(cr);
        }
        // See if we have input streams leading to this step!
        if (input.length > 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG,
                            "RedisOutputMeta.CheckResult.StepRecevingData2"),
                    stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(
                    CheckResultInterface.TYPE_RESULT_ERROR,
                    BaseMessages
                            .getString(PKG,
                                    "RedisOutputMeta.CheckResult.NoInputReceivedFromOtherSteps"),
                    stepMeta);
            remarks.add(cr);
        }
    }

元數(shù)據(jù)類核心代碼2:對應(yīng)redis數(shù)據(jù)寫入操作盼砍,stepnode關(guān)鍵配置信息讀取

private void readData(Node stepnode) throws KettleXMLException {
        try {
            this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
            this.valueFieldName = XMLHandler
                    .getTagValue(stepnode, "valuefield");
            this.ttlFieldName = XMLHandler.getTagValue(stepnode, "ttlfield");
            this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
            Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
            int nrservers = XMLHandler.countNodes(serverNodes, "server");
            allocate(nrservers);
            for (int i = 0; i < nrservers; i++) {
                Node fnode = XMLHandler
                        .getSubNodeByNr(serverNodes, "server", i);
                Map<String, String> hostAndPort = new HashMap<String, String>();
                hostAndPort.put("hostname",
                        XMLHandler.getTagValue(fnode, "hostname"));
                hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
                hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
                if (i == 0) {
                    setJedisServer(hostAndPort);
                }
                servers.add(hostAndPort);
            }
        } catch (Exception e) {
            throw new KettleXMLException(BaseMessages.getString(PKG,
                    "RedisOutputMeta.Exception.UnableToReadStepInfo"), e);
        }
    }

多線程處理類RedisOutputThread

@Override
    public void run() {
        long start = System.currentTimeMillis();
        // Get value from redis, don't cast now, be lazy. TODO change this?
        int keyFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
                redisOutput.meta.getKeyFieldName());
        Object key = r[keyFieldIndex];
        int valueFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
                redisOutput.meta.getValueFieldName());
        Object value = r[valueFieldIndex];
        int ttlFieldIndex = redisOutput.getInputRowMeta().indexOfValue(
                redisOutput.meta.getTtlFieldName());
        Object ttl = r[ttlFieldIndex];
        String keyString = key.toString();
        String valueString = value.toString();
        String ttlString = ttl.toString();
        if (keyString != null && !StringUtil.isEmpty(keyString)) {
            String getKeyValue = jedis.get(keyString);
            boolean existsKey = jedis.exists(keyString);
            // 如果key已存在,就只更新value數(shù)據(jù)黔宛;如果key不存在臀晃,更新數(shù)據(jù)同時(shí)設(shè)置key過期時(shí)間(單位:秒)
            if (existsKey) {
                jedis.set(keyString, valueString);
                redisOutput
                        .logBasic(" This key already exists, so only the corresponding value= "
                                + getKeyValue + " is updated.");
            } else {
                if (ttlString != null && !StringUtil.isEmpty(ttlString)) {
                    jedis.set(keyString, valueString, "NX", "EX",
                            Integer.parseInt(ttlString));
                } else {
                    jedis.set(keyString, valueString, "NX", "EX",
                            2 * 24 * 60 * 60);
                }
            }
        }
        try {
            redisOutput.putRow(redisOutput.data.outputRowMeta, r);
            // redisOutput.rowkey.add(r[idFieldIndex]);
        } catch (KettleStepException e) {
            e.printStackTrace();
        }
        jedis.close();
        long end = System.currentTimeMillis();
        redisOutput.logBasic("Redis_Key:" + keyString + " ,Redis_Value:"
                + valueString + " ,Redis_TTL:" + ttlString + " ,processRow "
                + (end - start) + "  milliseconds .");
    }

2.4.2案淋、讀取插件

image.png

步驟類核心代碼1:初始化Redis連接配置(同上)

步驟類核心代碼2:根據(jù)用戶配置數(shù)據(jù)類型誉碴,動態(tài)選擇對應(yīng)Redis的API讀取Redis數(shù)據(jù)庫對應(yīng)key的數(shù)據(jù)

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (RedisInputMeta) smi;
        data = (RedisInputData) sdi;
        Object[] r = getRow(); // get row, set busy!
        // If no more input to be expected, stop
        if (r == null) {
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            // clone input row meta for now, we will change it (add or set inline) later
            data.outputRowMeta = getInputRowMeta().clone();
            // Get output field types
            meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
            keytype = meta.getKeyTypeFieldName();
            logBasic("keytype:" + keytype);
            valuetype = meta.getValueTypeName();
            logBasic("valuetype:" + valuetype);
            mastername = meta.getMasterName();
            logBasic("mastername:" + mastername);
        }
        // Get value from redis, don't cast now, be lazy. TODO change this?
        int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
        if (keyFieldIndex < 0) {
            throw new KettleException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.KeyFieldNameNotFound"));
        }
        int key2Index = -1;
        if (keytype.equals("hash")) {
            key2Index = getInputRowMeta().indexOfValue(meta.getKey2FieldName());
            if (key2Index < 0) {
                throw new KettleException(
                        BaseMessages.getString(PKG, "RedisOutputMeta.Exception.Key2FieldNameNotFound"));
            }
        }
        StringBuffer fetchedValue = new StringBuffer("");
        
        Jedis jedis = pool.getResource();
        try {
        if (keytype.equals("string")) {
            String value =jedis.get((String) (r[keyFieldIndex]));
            fetchedValue.append(value).append("|");
        } else if (keytype.equals("hash")) {
            String res = jedis.hget((String) r[keyFieldIndex], (String) (r[key2Index]));
            fetchedValue.append(res + "|");
        } else if (keytype.equals("hashall")) {
            Map<String, String> map = jedis.hgetAll((String) r[keyFieldIndex]);
            for (Map.Entry<String, String> entry : map.entrySet()) {
                fetchedValue.append(entry.getKey() + ":" + entry.getValue() + "|");
            }
        } else if (keytype.equals("list")) {
            List<String> list = jedis.lrange((String) r[keyFieldIndex], 0, -1);
            for (String s : list) {
                fetchedValue.append(s).append("|");
            }
        } else if (keytype.equals("set")) {
            Set<String> set = jedis.smembers((String) r[keyFieldIndex]);
            for (String s : set) {
                fetchedValue.append(s).append("|");
            }
        } else if (keytype.equals("zset")) {
            Set<String> set = jedis.zrangeByScore((String) r[keyFieldIndex], 0, -1);
            for (String s : set) {
                fetchedValue.append(s).append("|");
            }
        } else if (keytype.equals("keys")) {
            Set<String> set = jedis.keys((String) r[keyFieldIndex]);
            for (String s : set) {
                fetchedValue.append(s).append("|");
            }
        }
        } finally {
            jedis.close();
        }
        
        String output;
        if (fetchedValue.length() > 1)
            output = fetchedValue.substring(0, fetchedValue.length() - 1);
        else
            output = fetchedValue.toString();
        // Add Value data name to output, or set value data if already exists
        //logBasic("output:" + output);
        Object[] outputRowData = r;
        int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
        if (valueFieldIndex < 0 || valueFieldIndex > outputRowData.length) {
            // Not found so add it
            outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), output);
        } else {
            // Update value in place
            outputRowData[valueFieldIndex] = output;
        }
        putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
        if (checkFeedback(getLinesRead())) {
            if (log.isBasic()) {
                logBasic(BaseMessages.getString(PKG, "RedisInput.Log.LineNumber") + getLinesRead());
            }
        }
        return true;
    }

元數(shù)據(jù)類核心代碼1:用戶關(guān)鍵配置信息檢查

@Override
    public void check(List<CheckResultInterface> remarks,
                      TransMeta transMeta, StepMeta stepMeta,
                      RowMetaInterface prev,
                      String[] input, String[] output,
                      RowMetaInterface info) {
        CheckResult cr;
        if (prev == null || prev.size() == 0) {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.NotReceivingFields"), stepMeta);
            remarks.add(cr);
        } else {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.StepRecevingData", prev.size() + ""), stepMeta);
            remarks.add(cr);
        }
        // See if we have input streams leading to this step!
        if (input.length > 0) {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.StepRecevingData2"), stepMeta);
            remarks.add(cr);
        } else {
            cr =
                    new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, BaseMessages.getString(PKG,
                            "RedisInputMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
            remarks.add(cr);
        }
    }

元數(shù)據(jù)類核心代碼2:對應(yīng)redis數(shù)據(jù)讀取操作,stepnode關(guān)鍵配置信息讀取

private void readData(Node stepnode) throws KettleXMLException {
        try {
            this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfield");
            this.keyTypeFieldName = XMLHandler.getTagValue(stepnode, "keytypefield");
            this.key2FieldName = XMLHandler.getTagValue(stepnode, "key2field");
            this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefield");
            this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
            this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
            Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
            int nrservers = XMLHandler.countNodes(serverNodes, "server");
            allocate(nrservers);
            for (int i = 0; i < nrservers; i++) {
                Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
                Map<String,String> hostAndPort = new HashMap<String,String>();
                hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
                hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
                hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
                if (i == 0) {
                    setJedisServer(hostAndPort);
                }
                servers.add(hostAndPort);
            }
        } catch (Exception e) {
            throw new KettleXMLException(BaseMessages.getString(PKG, "RedisInputMeta.Exception.UnableToReadStepInfo"),
                    e);
        }
    }

2.4.3奋岁、刪除插件

image.png

  • RedisDelete:步驟類厦取,初始化Redis連接配置虾攻,處理數(shù)據(jù)流數(shù)據(jù)霎箍,執(zhí)行刪除操作
  • RedisDeleteData:數(shù)據(jù)類漂坏,定義kettle刪除插件的RowMetaInterface
  • RedisDeleteDialog:對話框類顶别,定義kettle刪除插件redis find del的UI Swing的對話框
  • RedisDeleteMeta:元數(shù)據(jù)類驯绎,根據(jù)配置執(zhí)行redis操作剩失,并定義kettle刪除插件顯示類

步驟類核心代碼1:初始化Redis連接配置(同上)

步驟類核心代碼2:redis數(shù)據(jù)執(zhí)行刪除操作

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        meta = (RedisDeleteMeta) smi;
        data = (RedisDeleteData) sdi;
        Object[] r = getRow(); // get row, set busy!
        // If no more input to be expected, stop
        if (r == null) {
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            // clone input row meta for now, we will change it (add or set inline) later
            data.outputRowMeta = getInputRowMeta().clone();
            // Get output field types
            meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
            keyfieldname = meta.getKeyFieldName();
            logBasic("keyfieldname:" + keyfieldname);
            valuefieldname = meta.getValueFieldName();
            logBasic("valueFieldName:" + valuefieldname);
            valuetype = meta.getValueTypeName();
            logBasic("valuetype:" + valuetype);
            mastername = meta.getMasterName();
            logBasic("mastername:" + mastername);
        }
        // Get value from redis, don't cast now, be lazy. TODO change this?
        int keyFieldIndex = getInputRowMeta().indexOfValue(meta.getKeyFieldName());
        if (keyFieldIndex < 0) {
            throw new KettleException(BaseMessages.getString(PKG, "RedisDeletetMeta.Exception.KeyFieldNameNotFound"));
        }
        Jedis jedis = pool.getResource();
        String outPutValue = "";
        String[] delKeys=r[keyFieldIndex].toString().split(",");
        try {
            Long delete_flag = jedis.del(delKeys);
            outPutValue = String.valueOf(delete_flag);
        } finally {
            jedis.close();
        }
        // logBasic("output:" + output);
        Object[] outputRowData = r;
        int valueFieldIndex = getInputRowMeta().indexOfValue(meta.getValueFieldName());
        if (valueFieldIndex < 0 || valueFieldIndex > outputRowData.length) {
            // Not found so add it
            outputRowData = RowDataUtil.addValueData(r, getInputRowMeta().size(), outPutValue);
        } else {
            // Update value in place
            outputRowData[valueFieldIndex] = outPutValue;
        }
        putRow(data.outputRowMeta, outputRowData); // copy row to possible alternate rowset(s).
        if (checkFeedback(getLinesRead())) {
            if (log.isBasic()) {
                logBasic(BaseMessages.getString(PKG, "RedisDelete.Log.LineNumber") + getLinesRead());
            }
        }
        return true;
    }

元數(shù)據(jù)類核心代碼1:用戶關(guān)鍵配置信息檢查

@Override
    public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev,
            String[] input, String[] output, RowMetaInterface info) {
        CheckResult cr;
        if (prev == null || prev.size() == 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NotReceivingFields"), stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData", prev.size() + ""),
                    stepMeta);
            remarks.add(cr);
        }
        // See if we have input streams leading to this step!
        if (input.length > 0) {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_OK,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.StepRecevingData2"), stepMeta);
            remarks.add(cr);
        } else {
            cr = new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR,
                    BaseMessages.getString(PKG, "RedisDeleteMeta.CheckResult.NoInputReceivedFromOtherSteps"), stepMeta);
            remarks.add(cr);
        }
    }

元數(shù)據(jù)類核心代碼2:對應(yīng)redis數(shù)據(jù)刪除操作演熟,stepnode關(guān)鍵配置信息讀取

private void readData(Node stepnode) throws KettleXMLException {
        try {
            this.keyFieldName = XMLHandler.getTagValue(stepnode, "keyfieldname");
            this.valueFieldName = XMLHandler.getTagValue(stepnode, "valuefieldname");
            this.valueTypeName = XMLHandler.getTagValue(stepnode, "valuetype");
            this.masterName = XMLHandler.getTagValue(stepnode, "mastername");
            Node serverNodes = XMLHandler.getSubNode(stepnode, "servers");
            int nrservers = XMLHandler.countNodes(serverNodes, "server");
            allocate(nrservers);
            for (int i = 0; i < nrservers; i++) {
                Node fnode = XMLHandler.getSubNodeByNr(serverNodes, "server", i);
                Map<String, String> hostAndPort = new HashMap<String, String>();
                hostAndPort.put("hostname", XMLHandler.getTagValue(fnode, "hostname"));
                hostAndPort.put("port", XMLHandler.getTagValue(fnode, "port"));
                hostAndPort.put("auth", XMLHandler.getTagValue(fnode, "auth"));
                if (i == 0) {
                    setJedisServer(hostAndPort);
                }
                servers.add(hostAndPort);
            }
        } catch (Exception e) {
            throw new KettleXMLException(BaseMessages.getString(PKG, "RedisDeleteMeta.Exception.UnableToReadStepInfo"),
                    e);
        }
    }

三绽媒、使用說明

3.1、Redis Output插件

①Key:動態(tài)key字段猎提,從上一步驟數(shù)據(jù)流動態(tài)獲认撬铡(必選) 
②Value:動態(tài)value字段伞租,從上一步驟數(shù)據(jù)流動態(tài)獲瓤(必選) 
③TTL:key超時(shí)時(shí)間作喘,默認(rèn)值172800秒泞坦,從上一步驟數(shù)據(jù)流動態(tài)獲取贰锁,可對單行數(shù)據(jù)做控制(必選) 
④mastername:Redis sentinels哨兵模式集群master名稱(必選) 
⑤server ip:服務(wù)端ip列表(必選) 
⑥server port:哨兵端口列表,和服務(wù)端IP一一對應(yīng)(必選) 
⑦認(rèn)證密鑰:服務(wù)端IP對應(yīng)鑒權(quán)密碼(可選)
image.png

3.2、Redis Input插件

①Key Field:動態(tài)key字段粤攒,從上一步驟數(shù)據(jù)流動態(tài)獲却殉帧(必選) 
②Key type:key數(shù)據(jù)類型,從集合列表選擇(必選) 
③Hash值:動態(tài)hash值字段盔几,從上一步驟數(shù)據(jù)流動態(tài)獲妊放摹(可選)
④Value Field:動態(tài)value字段芯丧,從上一步驟數(shù)據(jù)流動態(tài)獲扔Ш恪(必選) 
⑤Value type:value數(shù)據(jù)類型,從集合列表選擇(必選) 
⑥mastername:Redis sentinels哨兵模式集群master名稱(必選) 
⑦h(yuǎn)ostname:服務(wù)端ip列表(必選) 
⑧host port:哨兵端口列表岭佳,和服務(wù)端IP一一對應(yīng)(必選) 
⑨auth:服務(wù)端IP對應(yīng)鑒權(quán)密碼(可選)
image.png

3.3珊随、Redis Delete插件

①Key:動態(tài)key字段叶洞,從上一步驟數(shù)據(jù)流動態(tài)獲取(必選) 
②輸出字段名:delete操作帆焕,返回自定義輸出字段名稱(必選) 
③Value type:value數(shù)據(jù)類型叶雹,從集合列表選擇(必選) 
④mastername:Redis sentinels哨兵模式集群master名稱(必選) 
⑤server ip:服務(wù)端ip列表(必選) 
⑥server port:哨兵端口列表钥星,和服務(wù)端IP一一對應(yīng)(必選) 
⑦認(rèn)證密鑰:服務(wù)端IP對應(yīng)鑒權(quán)密碼(可選)
image.png

四谦炒、總結(jié)

如果你需要源碼或者了解更多自定義插件及集成方式宁改,抑或有開發(fā)過程或者使用過程中的任何疑問或建議还蹲,請關(guān)注小編"游走在數(shù)據(jù)之間"谜喊,回復(fù)2查看源代碼,回復(fù)3獲取入門視頻斗遏。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末山卦,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子最易,更是在濱河造成了極大的恐慌怒坯,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件藻懒,死亡現(xiàn)場離奇詭異,居然都是意外死亡视译,警方通過查閱死者的電腦和手機(jī)嬉荆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人弥虐,你說我怎么就攤上這事。” “怎么了缤底?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵坑鱼,是天一觀的道長呼股。 經(jīng)常有香客問我允扇,道長狭园,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘搀菩。我一直安慰自己土砂,他們只是感情好吴叶,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般避矢。 火紅的嫁衣襯著肌膚如雪卸勺。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天芽淡,我揣著相機(jī)與錄音,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛向抢,可吹牛的內(nèi)容都是我干的亩冬。 我是一名探鬼主播覆享,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼巍糯,長吁一口氣:“原來是場噩夢啊……” “哼宅楞!你這毒婦竟也來了婶希?” 一聲冷哼從身側(cè)響起狰晚,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤换棚,失蹤者是張志新(化名)和其女友劉穎夕玩,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡兔仰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片告匠。...
    茶點(diǎn)故事閱讀 38,569評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡裸诽,死狀恐怖埂蕊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布恨统,位于F島的核電站畴蒲,受9級特大地震影響蔫骂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一桦沉、第九天 我趴在偏房一處隱蔽的房頂上張望埠褪。 院中可真熱鬧渴语,春花似錦、人聲如沸技肩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工摄咆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留影锈,地道東北人辆床。 一個(gè)月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親议蟆。 傳聞我的和親對象是個(gè)殘疾皇子疟丙,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容