python基于ModBusTCP服务端的业务实现特定的client

python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。

一、业务描述

我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。

1、ModBusTCP服务端交互流程

大概类似如下交互时序,其中PLC将用我们的脚本代替。

2、控制

根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。

3、状态

根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。

4、结果

根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。

5、 地址空间

(1)控制

类型:HoldingRegisters、Coils

起始地址与寄存器数量:自定义

(2)状态

类型:HoldingRegisters、DiscreteInputs、InputRegisters

起始地址与寄存器数量:自定义

(3)结果

类型:HoldingRegisters、InputRegisters

起始地址与寄存器数量:自定义

二、程序整体设计

class myModBusTCPclient(object):

    def __init__(self, obj):
        self.obj = obj

    # 状态读取后转成二进制,分别对应TriggerReady等状态
    def custom_binary(self, num, length=16, ByteSwap=0):
        # 将整数转换为二进制字符串
        binary_str = bin(num)[2:]
        # 计算需要补充的零的个数
        zeros_to_add = length - len(binary_str)
        # 构造符合规则的二进制字符串
        result_str = '0' * zeros_to_add + binary_str
        # 翻转二进制,如01转为10,方便后续取值
        result_str = result_str[::-1]
        if ByteSwap==0:
            return result_str
        elif ByteSwap==1:  # 需要字节交换时
            return result_str[8:] + result_str[:8]
        else:
            raise ValueError("ByteSwap 的值错误!")

    # 控制写之前先将TriggerEnable等二进制控制位转成数值
    def custom_num(self, binary, length=16, ByteSwap=0):
        assert len(binary) == length, "输入的二进制长度不正确!"
        binary = binary[::-1]  # 翻转二进制,如01转为10,方便后续取值
        if ByteSwap==0:
            return int(binary, 2)
        elif ByteSwap==1:  # 需要字节交换时
            return int(binary[8:] + binary[:8], 2)
        else:
            raise ValueError("ByteSwap 的值错误!")

    def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):
        if addrtype=="HoldingRegisters":
            value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)
            self.obj.write_registers(address, value, slave=slave)
        elif addrtype=="Coils":
            ...
        else:
            raise ValueError("ctrl_addrtype的值错误!")

    def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):
        if addrtype=="HoldingRegisters":
            value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)
            value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表
            print("status HoldingRegisters:", value_list)
            print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])
            return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]
        elif addrtype=="InputRegisters":
            ...
        elif addrtype=="DiscreteInputs":
            ...
        else:
            raise ValueError("status_addrtype的值错误!")

    def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):
        if addrtype=="HoldingRegisters":
            ...
        elif addrtype=="InputRegisters":
            ...
        else:
            raise ValueError("plc_out_addrtype的值错误!")

if __name__ == "__main__":
    # Modbus TCP服务器的IP地址和端口号
    server_ip = "192.168.1.196"
    port = 502
    station = 1

    # 创建Modbus TCP客户端
    MDclient = ModbusTcpClient(server_ip, port)
    if MDclient.connect():
        myclient = myModBusTCPclient(MDclient)
        myclient.ctrl(ByteSwap=1, binary="1100000000000000")
        time.sleep(1)
        myclient.status(ByteSwap=1)

1、程序结构

上述代码定义了一个名为 myModBusTCPclient 的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:

构造函数 __init__

接收一个参数 obj,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj 中。

custom_binary 方法:

将给定的整数 num 转换为指定长度 length 的二进制字符串。可选参数 ByteSwap 用于指定是否进行字节交换。如果 ByteSwap 为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。

custom_num 方法:

接收一个二进制字符串 binary,根据给定的长度 length 和是否进行字节交换 ByteSwap 将其转换为整数。返回转换得到的整数。

ctrl 方法:

根据给定的地址类型 addrtype(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap、二进制字符串 binary、Modbus 地址 address 和从站号 slave,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers 方法写入寄存器。

status 方法:

根据给定的地址类型 addrtype、是否进行字节交换 ByteSwap、寄存器地址 reg_addr、寄存器数量 reg_nb 和从站号 slave,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers 方法读取寄存器。

plc_out 方法:

根据给定的地址类型 addrtype 和是否进行字节交换 ByteSwap,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。

if __name__ == "__main__": 部分:

在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient。通过 myModBusTCPclient 类创建了一个自定义的客户端对象 myclient。调用了 ctrl 方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status 方法,从 Modbus 服务器读取数据。

2、不同地址空间的请求

在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。

为了满足业务,需要对原有的pymodbus进行封装改造。

为了满足大端序和小端序,需要加入字节交换的操作。

三、程序实现

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-08 11:20:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-08 11:20:04       101 阅读
  3. 在Django里面运行非项目文件

    2023-12-08 11:20:04       82 阅读
  4. Python语言-面向对象

    2023-12-08 11:20:04       91 阅读

热门阅读

  1. Angular 中如何返回上一页?

    2023-12-08 11:20:04       57 阅读
  2. 数组实现循环队列(增设队列大小size)

    2023-12-08 11:20:04       70 阅读
  3. el-table实现无限向下滚动懒加载数据

    2023-12-08 11:20:04       69 阅读
  4. Spark例子

    2023-12-08 11:20:04       51 阅读
  5. MySQL基础回顾02

    2023-12-08 11:20:04       63 阅读
  6. 【CSP】202303-2_垦田计划Python实现

    2023-12-08 11:20:04       60 阅读
  7. 微信小程序加载动态svg数据图片

    2023-12-08 11:20:04       53 阅读
  8. WT588F02B单片机语音芯片在磁疗仪中的应用介绍

    2023-12-08 11:20:04       70 阅读
  9. 算法----钥匙和房间

    2023-12-08 11:20:04       57 阅读