Long time no see!
在使用opc之前我們先了解一下什么是opc,首先OPC包含三個概念模型:
- OPC Server
-
OPC Group(注意這個加粗!6ξ摹D赐铩)
- OPC Item
關于這三個概念模型具體含義,我就不一一贅述了蓉坎,大家可點進下面的連接查看
http://www.laomaozy.com/W-Z/208219.html
首先胡嘿,我們的需求是衷敌,用Java寫一個OPC客戶端程序,定時從OPC服務讀數據助琐,那么我們來看下網上的DEMO咋寫的:
public static void test() throws Exception {
final ConnectionInformation ci = new ConnectionInformation();
ci.setHost("10.211.55.4");
ci.setUser("OPCUser");
ci.setPassword("opcuser");
ci.setClsid("7BC0CC8E-482C-47CA-ABDC-0FE7F9C6E729");
Item item = null;
Server server = new Server(ci, null);
try {
server.connect();
Group group = server.addGroup();
item = group.addItem("tongdao.tag1.aaa");
System.out.println("讀取到的值為:" + getVal(item.read(true).getValue()));
server.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
我們簡單理解下面氓,和大部分通信客戶端一個步驟:建立通道連接舌界,傳參,取數據屁药,關閉連接
如果我們需要取多個點號的數據怎么辦柏锄?
在外面套一層for循環(huán)對吧(沒錯趾娃,我有個朋友也是這么想的)
那么就有了如下寫法:
public static void test() throws Exception {
final ConnectionInformation ci = new ConnectionInformation();
ci.setHost("10.211.55.4");
ci.setUser("OPCUser");
ci.setPassword("opcuser");
ci.setClsid("7BC0CC8E-482C-47CA-ABDC-0FE7F9C6E729");
Item item = null;
Server server = new Server(ci, null);
List<String> itemIdList = new ArrayList<>(); // 假設這里面有數據
try {
server.connect();
for(String itemId : itemIdList) {
Group group = server.addGroup();
item = group.addItem("tongdao.tag1.aaa");
System.out.println("讀取到的值為:" + getVal(item.read(true).getValue()));
}
server.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
寫完后信心滿滿,仿佛屈屈OPC妇蛀,不過如此!
到了測試環(huán)節(jié)眷茁,我們真實環(huán)境大概兩萬個點位上祈,數據量也不是很大浙芙,不是分分鐘測試成功?
結果是:人家的OPC Server服務被(我的一個朋友)成功搞掛了纸俭;
看來不能一次性取這么多啊南窗,這個OPC矾瘾。。蛉迹。放妈。。8太行啊
那我們先取1000個試試水珍策,看看性能究竟如何
代碼主要邏輯不變宅倒,我們把參數長度設為1000
List<String> itemIdList = new ArrayList<>(); // 假設這里面有數據
itemIdList = itemIdList.subList(0, 1000); //只查1000條數據拐迁,測測性能
執(zhí)行完時間大概在8秒左右
嘶~倒吸一口涼氣,這1000條就要8秒铺韧,那我2w條不得 8 * 20 = 160s?
兩分多鐘缓淹,也還行,湾盗,罢维,,
但是!作為一枚專業(yè)及嚴謹的程序員平窘,怎么能忍受得了2分多鐘的時長凳怨!不行!絕對不行紫新!
此時我那個朋友萌生出了另一套方案芒率,單線程1000條8s左右篙顺,那我開雙線程分別查詢1000條是不是也是8s左右了捏?這樣折合下來能節(jié)約一半的時間嘛匪蟀,而且可以的話我們可以多建幾個線程去跑材彪,這樣時間又會以指數級下降
于是有了以下這段代碼:
package com.oukong.framework;
import org.jinterop.dcom.common.JIErrorCodes;
import org.jinterop.dcom.common.JIException;
import org.jinterop.dcom.core.JIVariant;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.da.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
public class OpcTest3 {
public static void main(String[] args) throws Exception {
test();
}
public static void test() throws Exception {
final ConnectionInformation ci = new ConnectionInformation();
ci.setHost("10.211.55.4");
ci.setUser("OPCUser");
ci.setPassword("opcuser");
ci.setClsid("7BC0CC8E-482C-47CA-ABDC-0FE7F9C6E729");
Item item = null;
Server server = new Server(ci, null);
List<String> itemIdList = new ArrayList<>(); // 假設這里面有數據
List<String> itemList1 = itemIdList.subList(0, 1000);
List<String> itemList2 = itemIdList.subList(1000, 2000);
Map<String, Object> result = new HashMap<>();
try {
server.connect();
CountDownLatch countDownLatch = new CountDownLatch(2); //線程計數器
OpcThread1 thread = new OpcThread1(server, itemList1, countDownLatch, result);
OpcThread1 thread2 = new OpcThread1(server, itemList2, countDownLatch, result);
thread.start();
thread2.start();
countDownLatch.await(); //等待兩個線程都執(zhí)行完成
server.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
class OpcThread1 extends Thread {
private List<String> itemList;
private Server server;
private CountDownLatch countDownLatch;
private Map<String, Object> result;
public OpcThread1(Server server, List<String> itemList, CountDownLatch countDownLatch,
Map<String, Object> result) {
this.server = server;
this.itemList = itemList;
this.countDownLatch = countDownLatch;
this.result = result;
}
@Override
public void run() {
Group group = null;
try {
server.connect();
for(String itemId : itemList) {
Group group = server.addGroup();
Item item = group.addItem("tongdao.tag1.aaa");
System.out.println("讀取到的值為:" + getVal(item.read(true).getValue()));
}
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后執(zhí)行結果不出所料,和預想的時間相差不大凤类,但是谜疤,我們忽略了一個問題现诀,這個OPC服務的吞吐量8太行履肃,真怕一不注意就把服務給搞掛了尺棋,所以,該方法雖然表面上可行成福,但是荆残,内斯,,潭苞,下流真朗,,秀菱,蹭睡,
難道真的沒別的方法了嗎肩豁?
關于上面提到的組,沒有利用空間的嗎琼锋?
于是我那個朋友翻了Group對象的源碼祟昭,發(fā)現提供了這么個東東
該對象提供了添加多個Item的方法谜叹!也就意味著我們可以一次性傳多個參數進行請求,然后回給我們返回一個集合艳悔!
于是我們有了如下代碼:
@Override
public void run() {
Group group = null;
try {
group = server.addGroup();
String[] items = itemList.toArray(new String[]{});
Map<String, Item> itemResult = group.addItems(items);
for(String key : itemResult.keySet()) {
Item itemMap = itemResult.get(key);
result.put(key, getVal(itemMap.read(true).getValue()));
}
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
上面的代碼中我們用一個map將返回值給存了起來
再測一下猜年!
結果:單線程情況下疾忍,請求1000條數據,耗時3秒左右
emmm....雖然進步了袁稽,但是1000條還是要3秒擒抛,2w條還是要1分鐘左右歧沪,這能忍莲组?不能锹杈!
于是我還想再找找有沒有別的我沒注意到的地方了,然后就發(fā)下了一行比較詭異的代碼:
正常我們取值不是直接getValue()就好了邪码?這個為什么還要read一下闭专?
然后發(fā)現真正從客戶端讀取數據的就是這一行旧烧,我那個朋友之前誤以為這個玩意兒就是取完值的集合了
但事實是掘剪,我們雖然分了組,但是取數據的時候還是一個item一個item地取廉赔,所以會如此之慢,浪册,村象,攒至,然后我又扒了一遍group的源碼,又發(fā)現了這個東東:
看清沒,我們之前是通過item去read值志膀,但事實是溉浙,人家group本身就有一個read,而且將真正的值的集合返回給了我們馆蠕,再稍微推敲一下惊奇,是不是這個組建好之后,我們可以取多次數據了吼渡。诞吱。竭缝。抬纸。
知道了這個方法,稍微改造了下代碼,我批量穿參時候阿趁,也批量取脖阵,于是就有了如下代碼!N啬拧:纺肌W寡纭(此處應有閃光特效)
package com.oukong.framework;
import org.jinterop.dcom.common.JIErrorCodes;
import org.jinterop.dcom.common.JIException;
import org.jinterop.dcom.core.JIVariant;
import org.openscada.opc.dcom.da.OPCSERVERSTATE;
import org.openscada.opc.lib.common.AlreadyConnectedException;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.common.NotConnectedException;
import org.openscada.opc.lib.da.*;
import java.io.*;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
public class OpcDaTest2 {
public static void test(List<String> itemList) {
List<String> itemList1 = itemList.subList(0, 500);
List<String> itemList2 = itemList.subList(1000, 2000);
final ConnectionInformation ci = new ConnectionInformation();
ci.setHost("10.10.1.13"); // KEPServer服務器所在IP
ci.setDomain(""); // 域 為空
ci.setUser("OPCuser");
ci.setPassword("Sa2022");
ci.setClsid("4B12BF21-3C60-4C48-A47F-E5F1E3BCFD34"); // OPCServer的注冊表ID喜鼓,可以在“組件服務”中查到
Item item = null;
Server server = new Server(ci, null);
Map<String, Object> result = new HashMap<>();
try {
server.connect();
long start = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(1);
OpcThread thread = new OpcThread(server, itemList1, countDownLatch, result);
thread.start();
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("totalSize: " + result.size() + "\tuse :" + (end - start) + "ms");
} catch (Exception e) {
e.printStackTrace();
}
}
}
class OpcThread extends Thread {
private List<String> itemList;
private Server server;
private CountDownLatch countDownLatch;
private Map<String, Object> result;
public OpcThread(Server server, List<String> itemList, CountDownLatch countDownLatch,
Map<String, Object> result) {
this.server = server;
this.itemList = itemList;
this.countDownLatch = countDownLatch;
this.result = result;
}
@Override
public void run() {
Group group = null;
try {
// 建組
long s = System.currentTimeMillis();
group = server.addGroup();
String[] items = itemList.toArray(new String[]{});
Map<String, Item> itemResult = group.addItems(items);
System.out.println(itemResult.size());
long e = System.currentTimeMillis();
System.out.println("建組耗時:" + (e - s));
//第一次取數據
long start = System.currentTimeMillis();
Set itemSet = new HashSet(itemResult.values());
Item[] itemArr = new Item[itemSet.size()];
itemSet.toArray(itemArr);
Map<Item, ItemState> resultMap = group.read(true, itemArr);
for(Item key : resultMap.keySet()) {
ItemState itemMap = resultMap.get(key);
result.put(key.getId(), getVal(itemMap.getValue()));
}
long end = System.currentTimeMillis();
System.out.println("group1 totalSize1 : " + itemResult.size() + "\tuse :" + (end - start) + "ms");
//第二次取數據
long start2 = System.currentTimeMillis();
Map<Item, ItemState> resultMap2 = group.read(true, itemArr);
for(Item key : resultMap2.keySet()) {
ItemState itemMap = resultMap2.get(key);
result.put(key.getId(), getVal(itemMap.getValue()));
}
long end2 = System.currentTimeMillis();
System.out.println("group1 totalSize2 : " + resultMap2.size() + "\tuse :" + (end2 - start2) + "ms");
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
這下我們徹底將組利用了起來,執(zhí)行結果如下:
建組耗時:177214
group1 totalSize1 : 24000 use :6467ms
group1 totalSize2 : 24000 use :1526ms
我們將最初3分鐘的時間顿锰,優(yōu)化到了6秒鐘硼控,就問6不6胳赌?
最后附上完整的依賴和代碼
maven
<!--utgard -->
<dependency>
<groupId>org.openscada.utgard</groupId>
<artifactId>org.openscada.opc.lib</artifactId>
<version>1.5.0</version>
<exclusions>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.65</version>
</dependency>
<dependency>
<groupId>org.openscada.utgard</groupId>
<artifactId>org.openscada.opc.dcom</artifactId>
<version>1.5.0</version>
</dependency>
OPC客戶端
import lombok.extern.slf4j.Slf4j;
import org.jinterop.dcom.common.JIErrorCodes;
import org.jinterop.dcom.common.JIException;
import org.jinterop.dcom.core.JIVariant;
import org.openscada.opc.dcom.da.OPCSERVERSTATE;
import org.openscada.opc.lib.common.AlreadyConnectedException;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.common.NotConnectedException;
import org.openscada.opc.lib.da.*;
import java.net.UnknownHostException;
import java.util.*;
/**
* @Auther: 夏
* @DATE: 2022/6/8 14:58
* @Description: opc da客戶端
*/
@Slf4j
public class OpcDAClient {
private String host;
private String user;
private String password;
private String clsId;
private Server server;
private String bakHost;
private Integer groupCount;
private Group group = null;
private Map<String, Item> groupItems = null;
/**
* 初始化連接信息
* @param host
* @param user
* @param password
* @param clsId
*/
public OpcDAClient(String host, String user, String password, String clsId, Integer groupCount) {
this.host = host;
this.user = user;
this.password = password;
this.clsId = clsId;
this.groupCount = groupCount;
}
/**
* 設置備用服務地址
* @param bakHost
*/
public void setBakHost(String bakHost) {
this.bakHost = bakHost;
}
/**
* 創(chuàng)建連接
*/
public void connect() {
if(server.getServerState() != null && server.getServerState().getServerState().equals(OPCSERVERSTATE.OPC_STATUS_RUNNING)) {
return;
}
final ConnectionInformation ci = new ConnectionInformation();
ci.setHost(host);
ci.setDomain(""); // 域 為空
ci.setUser(user);
ci.setPassword(password);
ci.setClsid(clsId);
server = new Server(ci, null);
try {
server.connect();
} catch (UnknownHostException e) {
e.printStackTrace();
log.error("opc 地址錯誤:", e);
} catch (JIException e) {
e.printStackTrace();
log.error("opc 連接失敗:", e);
log.info("開始連接備用服務...");
try {
ci.setHost(bakHost);
server = new Server(ci, null);
server.connect();
} catch (Exception e2) {
log.error("備用服務連接失敗:", e2);
}
} catch (AlreadyConnectedException e) {
e.printStackTrace();
log.error("opc 已連接:", e);
}
log.info("OPC Server connect success...");
}
/**
* 根據地址獲取數據
* @param itemId
* @return
*/
public Object getItemValue(String itemId) {
try {
Group group = server.addGroup();
Item item = group.addItem(itemId);
return getVal(item.read(true).getValue());
} catch (Exception e) {
e.printStackTrace();
log.error("獲取數據異常 itemId:{}", itemId, e);
}
return null;
}
/**
* 獲取多組數據
* @param itemIdList
* @return
*/
public Map<String, Object> getItemValues(List<String> itemIdList) {
Map<String, Object> result = new HashMap<>();
try {
if(groupItems == null || group == null) {
log.info("開始建組...");
group = server.addGroup();
String[] items = itemIdList.toArray(new String[]{});
groupItems = group.addItems(items);
log.info("組建完成撼短,開始查詢數據...");
}
Set itemSet = new HashSet(groupItems.values());
Item[] itemArr = new Item[itemSet.size()];
itemSet.toArray(itemArr);
Map<Item, ItemState> resultMap = group.read(true, itemArr);
log.info("數據獲取完成:{}條", resultMap.size());
for(Item item : resultMap.keySet()) {
ItemState itemMap = resultMap.get(item);
result.put(item.getId(), getVal(itemMap.getValue()));
}
} catch (Exception e) {
e.printStackTrace();
log.error("批量獲取數據異常:", e);
}
return result;
}
/**
* 獲取value
* @param var
* @return
* @throws JIException
*/
private static Object getVal(JIVariant var) throws JIException {
Object value;
int type = var.getType();
switch (type) {
case JIVariant.VT_I2:
value = var.getObjectAsShort();
break;
case JIVariant.VT_I4:
value = var.getObjectAsInt();
break;
case JIVariant.VT_I8:
value = var.getObjectAsLong();
break;
case JIVariant.VT_R4:
value = var.getObjectAsFloat();
break;
case JIVariant.VT_R8:
value = var.getObjectAsDouble();
break;
case JIVariant.VT_BSTR:
value = var.getObjectAsString2();
break;
case JIVariant.VT_BOOL:
value = var.getObjectAsBoolean();
break;
case JIVariant.VT_UI2:
case JIVariant.VT_UI4:
value = var.getObjectAsUnsigned().getValue();
break;
case JIVariant.VT_EMPTY:
throw new JIException(JIErrorCodes.JI_VARIANT_IS_NULL, "Variant is Empty.");
case JIVariant.VT_NULL:
throw new JIException(JIErrorCodes.JI_VARIANT_IS_NULL, "Variant is null.");
default:
throw new JIException(JIErrorCodes.JI_VARIANT_IS_NULL, "Unknown Type.");
}
return value;
}
/**
* 將一組數據平均分成n組
*
* @param source 要分組的數據源
* @param n 平均分成n組
* @param <T>
* @return
*/
private static <T> List<List<T>> averageAssign(List<T> source, int n) {
List<List<T>> result = new ArrayList<List<T>>();
int remainder = source.size() % n; //(先計算出余數)
int number = source.size() / n; //然后是商
int offset = 0;//偏移量
for (int i = 0; i < n; i++) {
List<T> value = null;
if (remainder > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remainder --;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
/**
* 關閉連接
*/
public void disconnect() {
server.disconnect();
if (null == server.getServerState()) {
log.info("OPC Server Disconnect...");
}
}
}
能看到這里的恭喜你,能看到友情提醒:Utgrad不支持opc server服務器在windows 1909之后的版本灾杰,如果遇到了熙参,請記住:
人和代碼有一個能跑就好了
淺薄之見昭娩,如有不嚴謹之處弄屡,還望評論指正