完成公司门户网站建设,广州网页设计公司排名,北京企业网站推广价格,wordpress用不了了使用我前面几篇博文的内容,能够使用Python编写一个最简单的OPC UA /ModbusTCP网关。
从这个程序可以看出:
应用OPC UA 并不难,现在我们就可以应用到工程应用中,甚至DIY项目也可以。不必采用复杂的工具软件。使用Python 来构建工… 使用我前面几篇博文的内容,能够使用Python编写一个最简单的OPC UA /ModbusTCP网关。
从这个程序可以看出:
应用OPC UA 并不难,现在我们就可以应用到工程应用中,甚至DIY项目也可以。不必采用复杂的工具软件。使用Python 来构建工业自动化领域的原型机程序是不错的选择。
OPCUA_modbus 网关
# Minimal OPC UA / Modbus TCP gateway.
#
# Once per second the script reads two input registers from a Modbus TCP
# server, decodes them as one IEEE-754 32-bit float and publishes the result
# through the OPC UA variable "MyVariable".
import sys
sys.path.insert(0, "..")
import time

from opcua import Server
from pyModbusTCP.client import ModbusClient  # Modbus TCP client
from pyModbusTCP import utils

if __name__ == "__main__":
    # Set up the OPC UA server.
    server = Server()
    server.set_endpoint("opc.tcp://127.0.0.1:48400/freeopcua/server/")

    # Register our own namespace; not strictly necessary, but the spec
    # recommends it.
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # The Objects node is where custom nodes are attached.
    objects = server.get_objects_node()

    # Populate the address space.
    myobj = objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    myvar.set_writable()  # allow OPC UA clients to write MyVariable

    ModbusInterface = ModbusClient(
        host="localhost",
        port=502,
        unit_id=1,
        auto_open=True,
        auto_close=False,
    )

    # Start serving.
    server.start()
    try:
        while True:
            time.sleep(1)
            reg_l = ModbusInterface.read_input_registers(0, 2)
            if not reg_l:
                # The read yielded no data (e.g. the Modbus server is not
                # reachable): keep the last published value and retry on the
                # next cycle instead of crashing in word_list_to_long().
                continue
            # Two 16-bit words -> one 32-bit integer -> IEEE-754 float.
            val = utils.word_list_to_long(reg_l)
            value = utils.decode_ieee(val[0], False)
            print(value)
            myvar.set_value(value)
    finally:
        # Close connections, remove subscriptions, etc.  The Modbus client
        # was created with auto_close=False, so close it explicitly.
        try:
            ModbusInterface.close()
        finally:
            server.stop()
modbusTCP服务器程序
# Modbus TCP server that serves dynamically generated ("virtual") data.
import argparse
from datetime import datetime

import numpy as np
from pyModbusTCP.server import ModbusServer, DataBank
from pyModbusTCP import utils

Fs = 8000  # divisor applied to the sample index in the sine argument
f = 50  # frequency factor in the sine argument
x = 0  # running sample index, advanced on every input-register read
coil_state = True  # toggled on every coil read


class MyDataBank(DataBank):
    """Data bank whose read handlers compute their values on the fly.

    - coils: a flag that toggles on every read request;
    - holding registers: the seconds field of the current local time;
    - input registers: the next sample of a sine wave (amplitude 10),
      encoded as an IEEE-754 32-bit float split into two 16-bit words.
    """

    def __init__(self):
        # Turn off allocation of memory for the standard Modbus object
        # types; the values are built dynamically by the getters below.
        super().__init__(virtual_mode=True)

    def get_coils(self, address, number=1, srv_info=None):
        """Toggle the global coil state and return it for each requested coil.

        The value is returned as a list of ``number`` items because the
        server iterates over the returned data; a bare bool would fail there.
        """
        global coil_state
        coil_state = not coil_state
        return [coil_state] * number

    def get_holding_registers(self, address, number=1, srv_info=None):
        """Return the current second (0-59) for each requested register.

        Returned as a list of ``number`` items for the same reason as
        ``get_coils``.
        """
        now = datetime.now()
        return [now.second] * number

    def get_input_registers(self, address, number=1, srv_info=None):
        """Return the next sine-wave sample as two 16-bit words.

        NOTE: always yields exactly two words regardless of ``number``;
        clients are expected to request two registers.
        """
        global x
        wave = np.sin(2 * np.pi * f * x / Fs) * 10
        x = x + 1
        # float -> 32-bit IEEE-754 integer -> list of 16-bit words
        b32_l = [utils.encode_ieee(wave, False)]
        b16_l = utils.long_list_to_word(b32_l)
        print(b16_l)
        return b16_l


if __name__ == "__main__":
    # Parse command-line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("-H", "--host", type=str, default="localhost",
                        help="Host (default: localhost)")
    parser.add_argument("-p", "--port", type=int, default=502,
                        help="TCP port (default: 502)")
    args = parser.parse_args()

    # Initialise the Modbus server with the custom data bank and start it.
    server = ModbusServer(host=args.host, port=args.port,
                          data_bank=MyDataBank())
    server.start()

# Finally, connect to the OPC UA server with the UaExpert program; you can
# see the MyVariable value changing.
#
# OPC UA client
# OPC UA client that polls MyVariable and plots its most recent 1000 samples.
import sys
sys.path.insert(0, "..")

import numpy as np
import matplotlib.pyplot as plt
from opcua import Client

# Fixed x axis and a rolling y buffer pre-filled with a ramp from -10 to 10.
x = np.arange(0, 1000, 1, dtype=np.int16)
y = np.arange(-10, 10, 0.02, dtype=np.float32)

if __name__ == "__main__":
    client = Client("opc.tcp://localhost:48400/freeopcua/server/")
    # To connect as a user, put the credentials in the URL instead, e.g.
    # Client("opc.tcp://admin@localhost:4840/freeopcua/server/")
    try:
        client.connect()

        # The client offers proxies for nodes that always exist in the
        # address space, such as Root or Objects.
        root = client.get_root_node()
        print("Objects node is: ", root)

        # Node objects can read/write node attributes and browse or
        # populate the address space.
        print("Children of root are: ", root.get_children())

        # Resolve the variable node once; the browse path does not change
        # between iterations.
        myvar = root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
        # Alternative ("stacked") access to the same value:
        # root.get_children()[0].get_children()[1].get_variables()[0].get_value()

        # Interactive mode keeps the figure from blocking the polling loop.
        plt.ion()
        while True:
            # Read once so the printed and the plotted value are the same.
            value = myvar.get_value()
            print("myvar is: ", value)

            # Append the new sample and drop the oldest one so the buffer
            # length stays equal to len(x).
            y = np.append(y, value)
            y = np.delete(y, 0, axis=0)

            plt.clf()
            plt.plot(x, y, ls="-", lw=2, label="plot figure")
            plt.legend()
            plt.pause(0.1)  # redraws the figure and runs the GUI event loop
    finally:
        client.disconnect()

# As you can see, these programs are all short and to the point.  Writing
# programs is the best way to learn computer network protocols.