写点什么

Python 如何用 PyModbus 库进行 Modbus TCP 通信

  • 2024-08-15
    湖南
  • 本文字数:4197 字

    阅读完需:约 14 分钟

使用 python 解决工业通信问题是一个非常好的选择,python 具有丰富的生态,可以轻松解决工业通信的各种问题。


本篇主要介绍使用 pymodbus 库进行 modbus tcp 仿真,实现 pc 端读取 plc 或工业设备 modbus 变量。


安装 pymodbus:

pip install -U pymodbus
复制代码

创建 modbus tcp server

这里我们先创建一个虚拟的 modbus 设备,如果你手里有一个 plc 或者工业设备,可以直接跳过本节。

  • modbus_server.py

''' * @Author: liuzhao * @Last Modified time: 2022-10-05 09:56:13'''
from pymodbus.server.sync import (    StartTcpServer,)from pymodbus.datastore import (    ModbusSequentialDataBlock,    ModbusServerContext,    ModbusSlaveContext,)from pymodbus.version import version
datablock = ModbusSequentialDataBlock.create()context = ModbusSlaveContext(    di=datablock,    co=datablock,    hr=datablock,    ir=datablock,    )single = True
# Build data storagestore = ModbusServerContext(slaves=context, single=single)
if __name__ == '__main__':
    address = ("0.0.0.0", 503)    StartTcpServer(        context=store,  # Data storage        address=address,  # listen address        allow_reuse_address=True,  # allow the reuse of an address    )
复制代码

直接运行该脚本,就可以在本机的 503 端口创建一台 modbus 设备了,具体实现暂不深追,我们学习的重点是客户端对 modbus 变量的读写。

读写 modbus 变量

modbus 变量类型以及地址

coil 是线圈,Discrete input 是数字量输入,Input register 是模拟量输入,Holding register 是保持寄存器。一般地址范围是 0-65535

读取常规变量

读写线圈 | 读取输入变量 | 读写保持寄存器

from pymodbus.client.sync import ModbusTcpClientfrom pymodbus.bit_read_message import ReadCoilsResponsefrom pymodbus.register_read_message import ReadInputRegistersResponsefrom pymodbus.exceptions import ConnectionException      # 连接失败,用于异常处理
host = '127.0.0.1'port = 503client = ModbusTcpClient(host,port)
# 写入线圈client.write_coil(1, True)client.write_coil(2, False)client.write_coil(3, True)
# 读取线圈    注意对于离散量的读取,第二个参数cout是有坑的,必须为8的倍数个result:ReadCoilsResponse = client.read_coils(address=1,cout=8)     # 从地址1开始读,读取8个线圈,一次读8的倍数个线圈,不设置为8的倍数可能会出现问题print(result.isError())
# 不建议使用print(result.getBit(7))            # 这里的参数address不是plc里的地址,而是python列表的address,
print('read_coils ')
# 建议使用print(result.bits)        # 打印读取结果,一共8位# 读取其中的位print(                      result.bits[0],    result.bits[1],    result.bits[2]    )         # 相当于result.getBit(0)
# 读取数字输入result = client.read_discrete_inputs(address=10001,count=8)    # 从10001开始读,读取8位print(result.bits)
# 读取模拟输入寄存器input_register_result:ReadInputRegistersResponse = client.read_input_registers(1,count=8)# print(f'is_error:{input_register_result.isError()}')print('read_input_registers ')print(input_register_result.registers)  print(input_register_result.getRegister(0))  
# 读写保持寄存器client.write_register(address=40001,value=100)result:ReadInputRegistersResponse = client.read_holding_registers(address=40001,count=1)print('read_holding_registers ')print(result.registers)
# 关闭连接client.close()
复制代码

读取复杂变量

字符串、浮点数、负数等

这里需要注意 modbus 设备的存储结构是低位低字节还是低位高字节,也就是设备内存的字节、字的排列顺序。


根据不同的设备,对照下表调整正确的组合方式。

# 复杂数据类型
from collections import OrderedDictimport logging
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.constants import Endianfrom pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
ORDER_DICT = {"<": "LITTLE", ">": "BIG"}
def run_binary_payload_client(host:str,port:int):
    for word_endian, byte_endian in (        (Endian.Big, Endian.Big),        (Endian.Big, Endian.Little),        (Endian.Little, Endian.Big),        (Endian.Little, Endian.Little),    ):        print("-" * 60)        print(f"Word Order: {ORDER_DICT[word_endian]}")        print(f"Byte Order: {ORDER_DICT[byte_endian]}")        print()
        builder = BinaryPayloadBuilder(            wordorder=word_endian,            byteorder=byte_endian,        )
        # 写入的变量        my_string = "abcd-efgh123345765432"        builder.add_string(my_string)        builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])        builder.add_8bit_int(-0x12)        builder.add_8bit_uint(0x12)        builder.add_16bit_int(-0x5678)        builder.add_16bit_uint(0x1234)        builder.add_32bit_int(-0x1234)        builder.add_32bit_uint(0x12345678)        builder.add_16bit_float(12.34)        builder.add_16bit_float(-12.34)        builder.add_32bit_float(22.34)        builder.add_32bit_float(-22.34)        builder.add_64bit_int(-0xDEADBEEF)        builder.add_64bit_uint(0x12345678DEADBEEF)        builder.add_64bit_uint(0x12345678DEADBEEF)        builder.add_64bit_float(123.45)        builder.add_64bit_float(-123.45)        registers = builder.to_registers()        print("Writing Registers:")        print(registers)        print("\n")        payload = builder.build()        address = 40001          # 从40001开始写入        # We can write registers        client.write_registers(address, registers, unit=1)    # 写入
        # 读取复杂变量        print("Reading Registers:")        address = 40001
        count = len(payload)        print(f"payload_len {count}")        result = client.read_holding_registers(address, count, slave=1)        print(result.registers)        print("\n")        decoder = BinaryPayloadDecoder.fromRegisters(            result.registers, byteorder=byte_endian, wordorder=word_endian        )        # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder        assert (            decoder._byteorder == builder._byteorder  # pylint: disable=protected-access        )  # nosec        assert (            decoder._wordorder == builder._wordorder  # pylint: disable=protected-access        )  # nosec
        decoded = OrderedDict(            [                ("string", decoder.decode_string(len(my_string))),                ("bits", decoder.decode_bits()),                ("8int", decoder.decode_8bit_int()),                ("8uint", decoder.decode_8bit_uint()),                ("16int", decoder.decode_16bit_int()),                ("16uint", decoder.decode_16bit_uint()),                ("32int", decoder.decode_32bit_int()),                ("32uint", decoder.decode_32bit_uint()),                ("16float", decoder.decode_16bit_float()),                ("16float2", decoder.decode_16bit_float()),                ("32float", decoder.decode_32bit_float()),                ("32float2", decoder.decode_32bit_float()),                ("64int", decoder.decode_64bit_int()),                ("64uint", decoder.decode_64bit_uint()),                ("ignore", decoder.skip_bytes(8)),                ("64float", decoder.decode_64bit_float()),                ("64float2", decoder.decode_64bit_float()),            ]        )        print("Decoded Data")        for name, value in iter(decoded.items()):            print(                "%s\t" % name,  # pylint: disable=consider-using-f-string                hex(value) if isinstance(value, int) else value,            )        print("\n")
    # 关闭连接    client.close()
if __name__ == "__main__":    run_binary_payload_client("127.0.0.1", 503)
复制代码


用户头像

欢迎关注,一起学习,一起交流,一起进步 2020-06-14 加入

公众号:做梦都在改BUG

评论

发布
暂无评论
Python如何用PyModbus库进行Modbus TCP通信_Python_我再BUG界嘎嘎乱杀_InfoQ写作社区