Java使用Utgrad從opc讀取數據(踩坑)

Long time no see!

在使用opc之前我們先了解一下什么是opc,首先OPC包含三個概念模型:

  • OPC Server
  • OPC Group(注意這個加粗!6ξ摹D赐铩)

  • OPC Item

關于這三個概念模型具體含義,我就不一一贅述了蓉坎,大家可點進下面的連接查看
http://www.laomaozy.com/W-Z/208219.html

首先胡嘿,我們的需求是衷敌,用Java寫一個OPC客戶端程序,定時從OPC服務讀數據助琐,那么我們來看下網上的DEMO咋寫的:

public static void test() throws Exception {
        final ConnectionInformation ci = new ConnectionInformation();
        ci.setHost("10.211.55.4");
        ci.setUser("OPCUser");
        ci.setPassword("opcuser");
        ci.setClsid("7BC0CC8E-482C-47CA-ABDC-0FE7F9C6E729");
        Item item = null;
        Server server = new Server(ci, null);
        try {
            server.connect();

            Group group = server.addGroup();
            item = group.addItem("tongdao.tag1.aaa");
            System.out.println("讀取到的值為:" + getVal(item.read(true).getValue()));

            server.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

我們簡單理解下面氓,和大部分通信客戶端一個步驟:建立通道連接舌界,傳參,取數據屁药,關閉連接

如果我們需要取多個點號的數據怎么辦柏锄?
在外面套一層for循環(huán)對吧(沒錯趾娃,我有個朋友也是這么想的)
那么就有了如下寫法:

public static void test() throws Exception {
        final ConnectionInformation ci = new ConnectionInformation();
        ci.setHost("10.211.55.4");
        ci.setUser("OPCUser");
        ci.setPassword("opcuser");
        ci.setClsid("7BC0CC8E-482C-47CA-ABDC-0FE7F9C6E729");
        Item item = null;
        Server server = new Server(ci, null);

        List<String> itemIdList = new ArrayList<>(); // 假設這里面有數據
        try {
            server.connect();

            for(String itemId : itemIdList) {
                Group group = server.addGroup();
                item = group.addItem("tongdao.tag1.aaa");
                System.out.println("讀取到的值為:" + getVal(item.read(true).getValue()));
            }

            server.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

寫完后信心滿滿,仿佛屈屈OPC妇蛀,不過如此!
到了測試環(huán)節(jié)眷茁,我們真實環(huán)境大概兩萬個點位上祈,數據量也不是很大浙芙,不是分分鐘測試成功?
結果是:人家的OPC Server服務被(我的一個朋友)成功搞掛了纸俭;

看來不能一次性取這么多啊南窗,這個OPC矾瘾。。蛉迹。放妈。。8太行啊
那我們先取1000個試試水珍策,看看性能究竟如何

代碼主要邏輯不變宅倒,我們把參數長度設為1000

List<String> itemIdList = new ArrayList<>(); // 假設這里面有數據
itemIdList = itemIdList.subList(0, 1000); //只查1000條數據拐迁,測測性能

執(zhí)行完時間大概在8秒左右

嘶~倒吸一口涼氣,這1000條就要8秒铺韧,那我2w條不得 8 * 20 = 160s?
兩分多鐘缓淹,也還行,湾盗,罢维,,
但是!作為一枚專業(yè)及嚴謹的程序員平窘,怎么能忍受得了2分多鐘的時長凳怨!不行!絕對不行紫新!

此時我那個朋友萌生出了另一套方案芒率,單線程1000條8s左右篙顺,那我開雙線程分別查詢1000條是不是也是8s左右了捏?這樣折合下來能節(jié)約一半的時間嘛匪蟀,而且可以的話我們可以多建幾個線程去跑材彪,這樣時間又會以指數級下降

于是有了以下這段代碼:

package com.oukong.framework;

import org.jinterop.dcom.common.JIErrorCodes;
import org.jinterop.dcom.common.JIException;
import org.jinterop.dcom.core.JIVariant;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.da.*;

import java.util.*;
import java.util.concurrent.CountDownLatch;

public class OpcTest3 {

    public static void main(String[] args) throws Exception {
        test();
    }

    public static void test() throws Exception {
        final ConnectionInformation ci = new ConnectionInformation();
        ci.setHost("10.211.55.4");
        ci.setUser("OPCUser");
        ci.setPassword("opcuser");
        ci.setClsid("7BC0CC8E-482C-47CA-ABDC-0FE7F9C6E729");
        Item item = null;
        Server server = new Server(ci, null);

        List<String> itemIdList = new ArrayList<>(); // 假設這里面有數據
        List<String> itemList1 = itemIdList.subList(0, 1000);
        List<String> itemList2 = itemIdList.subList(1000, 2000);
        Map<String, Object> result = new HashMap<>();
        try {
            server.connect();
            CountDownLatch countDownLatch = new CountDownLatch(2); //線程計數器
            OpcThread1 thread = new OpcThread1(server, itemList1, countDownLatch, result);
            OpcThread1 thread2 = new OpcThread1(server, itemList2, countDownLatch, result);
            thread.start();
            thread2.start();

            countDownLatch.await(); //等待兩個線程都執(zhí)行完成

            server.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

class OpcThread1 extends Thread {

    private List<String> itemList;

    private Server server;

    private CountDownLatch countDownLatch;

    private Map<String, Object> result;

    public OpcThread1(Server server, List<String> itemList, CountDownLatch countDownLatch,
                     Map<String, Object> result) {
        this.server = server;
        this.itemList = itemList;
        this.countDownLatch = countDownLatch;
        this.result = result;
    }

    @Override
    public void run() {
        Group group = null;
        try {
           server.connect();
            for(String itemId : itemList) {
                Group group = server.addGroup();
                Item item = group.addItem("tongdao.tag1.aaa");
                System.out.println("讀取到的值為:" + getVal(item.read(true).getValue()));
            }
            countDownLatch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

然后執(zhí)行結果不出所料,和預想的時間相差不大凤类,但是谜疤,我們忽略了一個問題现诀,這個OPC服務的吞吐量8太行履肃,真怕一不注意就把服務給搞掛了尺棋,所以,該方法雖然表面上可行成福,但是荆残,内斯,,潭苞,下流真朗,,秀菱,蹭睡,

難道真的沒別的方法了嗎肩豁?
關于上面提到的組,沒有利用空間的嗎琼锋?

于是我那個朋友翻了Group對象的源碼祟昭,發(fā)現提供了這么個東東


image.png

該對象提供了添加多個Item的方法谜叹!也就意味著我們可以一次性傳多個參數進行請求,然后回給我們返回一個集合艳悔!
于是我們有了如下代碼:

@Override
    public void run() {
        Group group = null;
        try {
            group = server.addGroup();
            String[] items = itemList.toArray(new String[]{});
            Map<String, Item> itemResult = group.addItems(items);
            for(String key : itemResult.keySet()) {
                Item itemMap = itemResult.get(key);
                result.put(key, getVal(itemMap.read(true).getValue()));
            }
            countDownLatch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

上面的代碼中我們用一個map將返回值給存了起來
再測一下猜年!
結果:單線程情況下疾忍,請求1000條數據,耗時3秒左右
emmm....雖然進步了袁稽,但是1000條還是要3秒擒抛,2w條還是要1分鐘左右歧沪,這能忍莲组?不能锹杈!

于是我還想再找找有沒有別的我沒注意到的地方了,然后就發(fā)下了一行比較詭異的代碼:


image.png

正常我們取值不是直接getValue()就好了邪码?這個為什么還要read一下闭专?

然后發(fā)現真正從客戶端讀取數據的就是這一行旧烧,我那個朋友之前誤以為這個玩意兒就是取完值的集合了


image.png

但事實是掘剪,我們雖然分了組,但是取數據的時候還是一個item一個item地取廉赔,所以會如此之慢,浪册,村象,攒至,然后我又扒了一遍group的源碼,又發(fā)現了這個東東:


image.png

看清沒,我們之前是通過item去read值志膀,但事實是溉浙,人家group本身就有一個read,而且將真正的值的集合返回給了我們馆蠕,再稍微推敲一下惊奇,是不是這個組建好之后,我們可以取多次數據了吼渡。诞吱。竭缝。抬纸。

知道了這個方法,稍微改造了下代碼,我批量穿參時候阿趁,也批量取脖阵,于是就有了如下代碼!N啬拧:纺肌W寡纭(此處應有閃光特效)

package com.oukong.framework;

import org.jinterop.dcom.common.JIErrorCodes;
import org.jinterop.dcom.common.JIException;
import org.jinterop.dcom.core.JIVariant;
import org.openscada.opc.dcom.da.OPCSERVERSTATE;
import org.openscada.opc.lib.common.AlreadyConnectedException;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.common.NotConnectedException;
import org.openscada.opc.lib.da.*;

import java.io.*;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

public class OpcDaTest2 {

    public static void test(List<String> itemList) {

        List<String> itemList1 = itemList.subList(0, 500);
        List<String> itemList2 = itemList.subList(1000, 2000);
        final ConnectionInformation ci = new ConnectionInformation();
        ci.setHost("10.10.1.13"); // KEPServer服務器所在IP
        ci.setDomain(""); // 域 為空
        ci.setUser("OPCuser");
        ci.setPassword("Sa2022");

        ci.setClsid("4B12BF21-3C60-4C48-A47F-E5F1E3BCFD34"); // OPCServer的注冊表ID喜鼓,可以在“組件服務”中查到
        Item item = null;
        Server server = new Server(ci, null);

        Map<String, Object> result = new HashMap<>();
        try {
            server.connect();
            long start = System.currentTimeMillis();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            OpcThread thread = new OpcThread(server, itemList1, countDownLatch, result);
            thread.start();
            countDownLatch.await();
            long end = System.currentTimeMillis();
            System.out.println("totalSize: " + result.size() + "\tuse :" + (end - start) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

class OpcThread extends Thread {

    private List<String> itemList;

    private Server server;

    private CountDownLatch countDownLatch;

    private Map<String, Object> result;

    public OpcThread(Server server, List<String> itemList, CountDownLatch countDownLatch,
                     Map<String, Object> result) {
        this.server = server;
        this.itemList = itemList;
        this.countDownLatch = countDownLatch;
        this.result = result;
    }

    @Override
    public void run() {
        Group group = null;
        try {
            // 建組
            long s = System.currentTimeMillis();
            group = server.addGroup();
            String[] items = itemList.toArray(new String[]{});
            Map<String, Item> itemResult = group.addItems(items);
            System.out.println(itemResult.size());
            long e = System.currentTimeMillis();

            System.out.println("建組耗時:" + (e - s));

            //第一次取數據
            long start = System.currentTimeMillis();
            Set itemSet = new HashSet(itemResult.values());
            Item[] itemArr = new Item[itemSet.size()];
            itemSet.toArray(itemArr);
            Map<Item, ItemState> resultMap = group.read(true, itemArr);
            for(Item key : resultMap.keySet()) {
                ItemState itemMap = resultMap.get(key);
                result.put(key.getId(), getVal(itemMap.getValue()));
            }
            long end = System.currentTimeMillis();
            System.out.println("group1 totalSize1 : " + itemResult.size() + "\tuse :" + (end - start) + "ms");

            //第二次取數據
            long start2 = System.currentTimeMillis();
            Map<Item, ItemState> resultMap2 = group.read(true, itemArr);
            for(Item key : resultMap2.keySet()) {
                ItemState itemMap = resultMap2.get(key);
                result.put(key.getId(), getVal(itemMap.getValue()));
            }
            long end2 = System.currentTimeMillis();
            System.out.println("group1 totalSize2 : " + resultMap2.size() + "\tuse :" + (end2 - start2) + "ms");
            countDownLatch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

這下我們徹底將組利用了起來,執(zhí)行結果如下:

建組耗時:177214
group1 totalSize1 : 24000   use :6467ms
group1 totalSize2 : 24000   use :1526ms

我們將最初3分鐘的時間顿锰,優(yōu)化到了6秒鐘硼控,就問6不6胳赌?

最后附上完整的依賴和代碼

maven

<!--utgard -->
      <dependency>
            <groupId>org.openscada.utgard</groupId>
            <artifactId>org.openscada.opc.lib</artifactId>
            <version>1.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.bouncycastle</groupId>
                    <artifactId>bcprov-jdk15on</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.65</version>
        </dependency>
        <dependency>
            <groupId>org.openscada.utgard</groupId>
            <artifactId>org.openscada.opc.dcom</artifactId>
            <version>1.5.0</version>
        </dependency>

OPC客戶端

import lombok.extern.slf4j.Slf4j;
import org.jinterop.dcom.common.JIErrorCodes;
import org.jinterop.dcom.common.JIException;
import org.jinterop.dcom.core.JIVariant;
import org.openscada.opc.dcom.da.OPCSERVERSTATE;
import org.openscada.opc.lib.common.AlreadyConnectedException;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.common.NotConnectedException;
import org.openscada.opc.lib.da.*;

import java.net.UnknownHostException;
import java.util.*;

/**
 * @Auther: 夏
 * @DATE: 2022/6/8 14:58
 * @Description: opc da客戶端
 */
@Slf4j
public class OpcDAClient {

    private String host;

    private String user;

    private String password;

    private String clsId;

    private Server server;

    private String bakHost;

    private Integer groupCount;

    private Group group = null;

    private Map<String, Item> groupItems = null;

    /**
     * 初始化連接信息
     * @param host
     * @param user
     * @param password
     * @param clsId
     */
    public OpcDAClient(String host, String user, String password, String clsId, Integer groupCount) {
        this.host = host;
        this.user = user;
        this.password = password;
        this.clsId = clsId;
        this.groupCount = groupCount;
    }

    /**
     * 設置備用服務地址
     * @param bakHost
     */
    public void setBakHost(String bakHost) {
        this.bakHost = bakHost;
    }

    /**
     * 創(chuàng)建連接
     */
    public void connect() {
        if(server.getServerState() != null && server.getServerState().getServerState().equals(OPCSERVERSTATE.OPC_STATUS_RUNNING)) {
            return;
        }
        final ConnectionInformation ci = new ConnectionInformation();
        ci.setHost(host);
        ci.setDomain(""); // 域 為空
        ci.setUser(user);
        ci.setPassword(password);

        ci.setClsid(clsId);
        server = new Server(ci, null);
        try {
            server.connect();
        } catch (UnknownHostException e) {
            e.printStackTrace();
            log.error("opc 地址錯誤:", e);
        } catch (JIException e) {
            e.printStackTrace();
            log.error("opc 連接失敗:", e);
            log.info("開始連接備用服務...");
            try {
                ci.setHost(bakHost);
                server = new Server(ci, null);
                server.connect();
            } catch (Exception e2) {
                log.error("備用服務連接失敗:", e2);
            }

        } catch (AlreadyConnectedException e) {
            e.printStackTrace();
            log.error("opc 已連接:", e);
        }
        log.info("OPC Server connect success...");
    }

    /**
     * 根據地址獲取數據
     * @param itemId
     * @return
     */
    public Object getItemValue(String itemId) {
        try {
            Group group = server.addGroup();
            Item item = group.addItem(itemId);
            return getVal(item.read(true).getValue());
        } catch (Exception e) {
            e.printStackTrace();
            log.error("獲取數據異常 itemId:{}", itemId, e);
        }
        return null;
    }

    /**
     * 獲取多組數據
     * @param itemIdList
     * @return
     */
    public Map<String, Object> getItemValues(List<String> itemIdList) {
        Map<String, Object> result = new HashMap<>();
        try {
            if(groupItems == null || group == null) {
                log.info("開始建組...");
                group = server.addGroup();
                String[] items = itemIdList.toArray(new String[]{});
                groupItems = group.addItems(items);
                log.info("組建完成撼短,開始查詢數據...");
            }

            Set itemSet = new HashSet(groupItems.values());
            Item[] itemArr = new Item[itemSet.size()];
            itemSet.toArray(itemArr);
            Map<Item, ItemState> resultMap = group.read(true, itemArr);
            log.info("數據獲取完成:{}條", resultMap.size());
            for(Item item : resultMap.keySet()) {
                ItemState itemMap = resultMap.get(item);
                result.put(item.getId(), getVal(itemMap.getValue()));
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("批量獲取數據異常:", e);
        }
        return result;
    }

    /**
     * 獲取value
     * @param var
     * @return
     * @throws JIException
     */
    private static Object getVal(JIVariant var) throws JIException {
        Object value;
        int type = var.getType();
        switch (type) {
            case JIVariant.VT_I2:
                value = var.getObjectAsShort();
                break;
            case JIVariant.VT_I4:
                value = var.getObjectAsInt();
                break;
            case JIVariant.VT_I8:
                value = var.getObjectAsLong();
                break;
            case JIVariant.VT_R4:
                value = var.getObjectAsFloat();
                break;
            case JIVariant.VT_R8:
                value = var.getObjectAsDouble();
                break;
            case JIVariant.VT_BSTR:
                value = var.getObjectAsString2();
                break;
            case JIVariant.VT_BOOL:
                value = var.getObjectAsBoolean();
                break;
            case JIVariant.VT_UI2:
            case JIVariant.VT_UI4:
                value = var.getObjectAsUnsigned().getValue();
                break;
            case JIVariant.VT_EMPTY:
                throw new JIException(JIErrorCodes.JI_VARIANT_IS_NULL, "Variant is Empty.");
            case JIVariant.VT_NULL:
                throw new JIException(JIErrorCodes.JI_VARIANT_IS_NULL, "Variant is null.");
            default:
                throw new JIException(JIErrorCodes.JI_VARIANT_IS_NULL, "Unknown Type.");
        }

        return value;
    }

    /**
     * 將一組數據平均分成n組
     *
     * @param source 要分組的數據源
     * @param n      平均分成n組
     * @param <T>
     * @return
     */
    private static <T> List<List<T>> averageAssign(List<T> source, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        int remainder = source.size() % n;  //(先計算出余數)
        int number = source.size() / n;  //然后是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List<T> value = null;
            if (remainder > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remainder --;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }

    /**
     * 關閉連接
     */
    public void disconnect() {
        server.disconnect();
        if (null == server.getServerState()) {
            log.info("OPC Server Disconnect...");
        }
    }

}

能看到這里的恭喜你,能看到友情提醒:Utgrad不支持opc server服務器在windows 1909之后的版本灾杰,如果遇到了熙参,請記住:

人和代碼有一個能跑就好了

淺薄之見昭娩,如有不嚴謹之處弄屡,還望評論指正

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末膀捷,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子秀仲,更是在濱河造成了極大的恐慌神僵,老刑警劉巖覆劈,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異炮障,居然都是意外死亡坤候,警方通過查閱死者的電腦和手機白筹,發(fā)現死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來系馆,“玉大人它呀,你說我怎么就攤上這事∠滤恚” “怎么了谓媒?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵句惯,是天一觀的道長土辩。 經常有香客問我,道長抢野,這世上最難降的妖魔是什么拷淘? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮指孤,結果婚禮上启涯,老公的妹妹穿的比我還像新娘。我一直安慰自己恃轩,他們只是感情好结洼,可當我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著叉跛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪筷厘。 梳的紋絲不亂的頭發(fā)上鸣峭,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天寒锚,我揣著相機與錄音桦他,去河邊找鬼犀被。 笑死赴精,一個胖子當著我的面吹牛,可吹牛的內容都是我干的扇售。 我是一名探鬼主播,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼赫冬!你這毒婦竟也來了?” 一聲冷哼從身側響起溃列,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤劲厌,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后听隐,有當地人在樹林里發(fā)現了一具尸體补鼻,經...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了风范。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咨跌。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖硼婿,靈堂內的尸體忽然破棺而出锌半,到底是詐尸還是另有隱情,我是刑警寧澤寇漫,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布刊殉,位于F島的核電站,受9級特大地震影響州胳,放射性物質發(fā)生泄漏记焊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一栓撞、第九天 我趴在偏房一處隱蔽的房頂上張望遍膜。 院中可真熱鬧,春花似錦腐缤、人聲如沸捌归。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惜索。三九已至,卻和暖如春剃浇,著一層夾襖步出監(jiān)牢的瞬間巾兆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工虎囚, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留角塑,地道東北人。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓淘讥,卻偏偏與公主長得像圃伶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蒲列,可洞房花燭夜當晚...
    茶點故事閱讀 45,047評論 2 355