mirror of https://github.com/Nonannet/pyhoff.git
full refactored, minimal modbus tcp implementation integrated
This commit is contained in:
parent
759e1d168d
commit
e7c4ad6f7c
|
@ -0,0 +1,24 @@
|
||||||
|
[flake8]
|
||||||
|
# Specify the maximum allowed line length
|
||||||
|
max-line-length = 88
|
||||||
|
|
||||||
|
# Ignore specific rules
|
||||||
|
# For example, E501: Line too long, W503: Line break before binary operator
|
||||||
|
ignore = E501, W503, W504
|
||||||
|
|
||||||
|
# Exclude specific files or directories
|
||||||
|
exclude =
|
||||||
|
.git,
|
||||||
|
__pycache__,
|
||||||
|
build,
|
||||||
|
dist,
|
||||||
|
.conda
|
||||||
|
test.py
|
||||||
|
|
||||||
|
# Enable specific plugins or options
|
||||||
|
# Example: Enabling flake8-docstrings
|
||||||
|
select = C,E,F,W,D
|
||||||
|
|
||||||
|
# Specify custom error codes to ignore or enable
|
||||||
|
per-file-ignores =
|
||||||
|
tests/*: D
|
|
@ -82,31 +82,6 @@ target/
|
||||||
profile_default/
|
profile_default/
|
||||||
ipython_config.py
|
ipython_config.py
|
||||||
|
|
||||||
# pyenv
|
|
||||||
# For a library or package, you might want to ignore these files since the code is
|
|
||||||
# intended to run in multiple environments; otherwise, check them in:
|
|
||||||
# .python-version
|
|
||||||
|
|
||||||
# pipenv
|
|
||||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
|
||||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
|
||||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
|
||||||
# install all needed dependencies.
|
|
||||||
#Pipfile.lock
|
|
||||||
|
|
||||||
# poetry
|
|
||||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
|
||||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
|
||||||
# commonly ignored for libraries.
|
|
||||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
|
||||||
#poetry.lock
|
|
||||||
|
|
||||||
# pdm
|
|
||||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
|
||||||
#pdm.lock
|
|
||||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
|
||||||
# in version control.
|
|
||||||
# https://pdm.fming.dev/#use-with-ide
|
|
||||||
.pdm.toml
|
.pdm.toml
|
||||||
|
|
||||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||||
|
@ -152,9 +127,7 @@ dmypy.json
|
||||||
# Cython debug symbols
|
# Cython debug symbols
|
||||||
cython_debug/
|
cython_debug/
|
||||||
|
|
||||||
# PyCharm
|
pyModbusTCP_old/
|
||||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
test.py
|
||||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
test_*.ipynb
|
||||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
settings.json
|
||||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
|
||||||
#.idea/
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
cff-version: 1.2.0
|
||||||
|
title: pyhoff
|
||||||
|
abstract: The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP
|
||||||
|
authors:
|
||||||
|
- family-names: Kruse
|
||||||
|
given-names: Nicolas
|
||||||
|
orcid: "https://orcid.org/0000-0001-6758-2269"
|
||||||
|
version: 1.0.0
|
||||||
|
#date-released: "2025-04-01"
|
||||||
|
#identifiers:
|
||||||
|
# - description: This is the collection of archived snapshots of all versions of My Research Software
|
||||||
|
# type: doi
|
||||||
|
# value: "10.5281/"
|
||||||
|
license: MIT License
|
||||||
|
repository-code: "https://github.com/Nonannet/pyhoff"
|
2
LICENSE
2
LICENSE
|
@ -1,6 +1,6 @@
|
||||||
MIT License
|
MIT License
|
||||||
|
|
||||||
Copyright (c) 2023 Nonannet
|
Copyright (c) 2025 Nicolas Kruse
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
# pyhoff
|
# pyhoff
|
||||||
|
|
||||||
This python code allows to read and write the most common
|
The pyhoff package allows to read and write the most common
|
||||||
Beckhoff bus terminals ("Busklemmen") using the bus coupler
|
Beckhoff and WAGO bus terminals ("Busklemmen") using the bus
|
||||||
BK9050 ("Busskoppler BK9050") over Ethernet TCP/IP.
|
coupler ("Busskoppler") BK9000, BK9050, BK9100 or WAGO 750_352
|
||||||
|
over Ethernet TCP/IP based on ModBus TCP.
|
||||||
|
|
||||||
It depends on the package pyModbusTCP. This can be installed with:
|
It depends on the package pyModbusTCP. This can be installed with:
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
# WAGO IO-Error LED
|
||||||
|
|
||||||
|
Default credentials are: admin:wago
|
||||||
|
|
||||||
|
Generating a new ea-config.xml based on the currently attached terminals:
|
||||||
|
|
||||||
|
``` js
|
||||||
|
await fetch("http://192.168.178.172/DOCPLFUNC", {
|
||||||
|
"credentials": "include",
|
||||||
|
"headers": {
|
||||||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
|
||||||
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||||
|
"Accept-Language": "en-US,en;q=0.7,de;q=0.3",
|
||||||
|
"Content-Type": "application/x-www-form-urlencoded",
|
||||||
|
"Sec-GPC": "1",
|
||||||
|
"Authorization": "Basic YWRtaW46d2Fnbw==",
|
||||||
|
"Upgrade-Insecure-Requests": "1",
|
||||||
|
"Priority": "u=4",
|
||||||
|
"Pragma": "no-cache",
|
||||||
|
"Cache-Control": "no-cache"
|
||||||
|
},
|
||||||
|
"referrer": "http://192.168.178.172/webserv/cplcfg/ea-config.ssi",
|
||||||
|
"body": "DO_CREATE_EA_XML=create+ea-config.xml",
|
||||||
|
"method": "POST",
|
||||||
|
"mode": "cors"
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
Download ea-config.xml under http://192.168.178.172/etc/ea-config.xml (authorization required)
|
328
pyhoff.py
328
pyhoff.py
|
@ -1,328 +0,0 @@
|
||||||
from pyModbusTCP.client import ModbusClient
|
|
||||||
from typing import Type
|
|
||||||
|
|
||||||
class BusTerminal():
|
|
||||||
output_bit_width = 0
|
|
||||||
input_bit_width = 0
|
|
||||||
output_word_width = 0
|
|
||||||
input_word_width = 0
|
|
||||||
|
|
||||||
def __init__(self, bus_coupler: 'BusCoupler'):
|
|
||||||
self.output_bit_offset = 0
|
|
||||||
self.input_bit_offset = 0
|
|
||||||
self.output_word_offset = 0
|
|
||||||
self.output_word_offset = 0
|
|
||||||
self.input_word_offset = 0
|
|
||||||
self.bus_coupler = bus_coupler
|
|
||||||
|
|
||||||
class DigitalInputTerminal(BusTerminal):
|
|
||||||
def read_input(self, channel: int):
|
|
||||||
print(self.input_bit_width, channel)
|
|
||||||
if channel < 1 or channel > self.input_bit_width:
|
|
||||||
raise Exception("address out of range")
|
|
||||||
return self.bus_coupler.read_discrete_input(self.input_bit_offset + channel - 1)
|
|
||||||
|
|
||||||
class DigitalOutputTerminal(BusTerminal):
|
|
||||||
def write_coil(self, channel: int, value: int):
|
|
||||||
if channel < 1 or channel > self.output_bit_width:
|
|
||||||
raise Exception("address out of range")
|
|
||||||
return self.bus_coupler.write_single_coil(self.output_bit_offset + channel - 1, value)
|
|
||||||
|
|
||||||
def read_coil(self, channel: int):
|
|
||||||
if channel < 1 or channel > self.output_bit_width:
|
|
||||||
raise Exception("address out of range")
|
|
||||||
return self.bus_coupler.read_coil(self.output_bit_offset + channel - 1)
|
|
||||||
|
|
||||||
class AnalogInputTerminal(BusTerminal):
|
|
||||||
def read_words(self, word_offset: int, word_count: int):
|
|
||||||
if word_offset < 0 or word_offset + word_count > self.input_word_width:
|
|
||||||
raise Exception("address out of range")
|
|
||||||
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
|
|
||||||
|
|
||||||
def read_words_nocheck(self, word_offset: int, word_count: int):
|
|
||||||
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
|
|
||||||
|
|
||||||
def read_word(self, word_offset: int):
|
|
||||||
val = self.read_words(word_offset, 1)
|
|
||||||
if val is None:
|
|
||||||
return -999
|
|
||||||
else:
|
|
||||||
return val[0]
|
|
||||||
|
|
||||||
|
|
||||||
class AnalogOutputTerminal(BusTerminal):
|
|
||||||
def read_words(self, word_offset: int, word_count: int):
|
|
||||||
if word_offset < 0 or word_offset + word_count > self.output_word_width:
|
|
||||||
raise Exception("address out of range")
|
|
||||||
return self.bus_coupler.read_holding_registers(self.output_word_offset + word_offset, word_count)
|
|
||||||
|
|
||||||
def read_words_nocheck(self, word_offset: int, word_count: int):
|
|
||||||
return self.bus_coupler.read_holding_registers(self.output_word_offset*0 + word_offset, word_count)
|
|
||||||
|
|
||||||
def read_word(self, word_offset: int):
|
|
||||||
val = self.read_words(word_offset, 1)
|
|
||||||
if val is None:
|
|
||||||
return -999
|
|
||||||
else:
|
|
||||||
return val[0]
|
|
||||||
|
|
||||||
def write_word(self, word_offset: int, data: int):
|
|
||||||
if word_offset < 0 or word_offset + 1 > self.output_word_width:
|
|
||||||
raise Exception("address out of range")
|
|
||||||
return self.bus_coupler.write_single_register(self.output_word_offset + word_offset, data)
|
|
||||||
|
|
||||||
def write_word_nocheck(self, word_offset: int, data: int):
|
|
||||||
return self.bus_coupler.write_single_register(self.output_word_offset*0 + word_offset, data)
|
|
||||||
|
|
||||||
class KL1104(DigitalInputTerminal):
|
|
||||||
"""KL1104: 4x digital input 24 V"""
|
|
||||||
input_bit_width = 4
|
|
||||||
|
|
||||||
class KL1408(DigitalInputTerminal):
|
|
||||||
"""KL1104: 8x digital input 24 V galvanic isolated"""
|
|
||||||
input_bit_width = 8
|
|
||||||
|
|
||||||
class WAGO_750_1405(DigitalInputTerminal):
|
|
||||||
"""750-1405: 16x digital input 24 V"""
|
|
||||||
input_bit_width = 16
|
|
||||||
|
|
||||||
class KL2404(DigitalOutputTerminal):
|
|
||||||
"""KL2404: 4x digital output with 500 mA"""
|
|
||||||
output_bit_width = 4
|
|
||||||
|
|
||||||
class KL2424(DigitalOutputTerminal):
|
|
||||||
"""KL2424: 4x digital output with 2000 mA"""
|
|
||||||
output_bit_width = 4
|
|
||||||
|
|
||||||
class KL2634(DigitalOutputTerminal):
|
|
||||||
"""KL2634: 4x digital output 250 V AC, 30 V DC, 4 A"""
|
|
||||||
output_bit_width = 4
|
|
||||||
|
|
||||||
class KL2408(DigitalOutputTerminal):
|
|
||||||
"""750-530: 8x digital output with 24 V / 500 mA"""
|
|
||||||
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
|
|
||||||
output_bit_width = 8
|
|
||||||
|
|
||||||
class WAGO_750_530(DigitalOutputTerminal):
|
|
||||||
"""750-530: 8x digital output with 24 V / 500 mA"""
|
|
||||||
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
|
|
||||||
output_bit_width = 8
|
|
||||||
|
|
||||||
class KL3054(AnalogInputTerminal):
|
|
||||||
"""KL3054: 4x analog input 4...20 mA 12 Bit single-ended"""
|
|
||||||
#Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
|
||||||
input_word_width = 8
|
|
||||||
output_word_width = 8
|
|
||||||
|
|
||||||
def read_normalized(self, channel: int):
|
|
||||||
return self.read_word(channel * 2 - 1) / 0x7FFF
|
|
||||||
|
|
||||||
def read_current(self, channel: int):
|
|
||||||
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
|
|
||||||
|
|
||||||
class KL3042(AnalogInputTerminal):
|
|
||||||
"""KL3042: 2x analog input 0...20 mA 12 Bit single-ended"""
|
|
||||||
#Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
|
|
||||||
input_word_width = 4
|
|
||||||
output_word_width = 4
|
|
||||||
|
|
||||||
def read_normalized(self, channel: int):
|
|
||||||
return self.read_word(channel*2-1) / 0x7FFF
|
|
||||||
|
|
||||||
def read_current(self, channel: int):
|
|
||||||
return self.read_normalized(channel*2-1) * 20.0
|
|
||||||
|
|
||||||
class KL3202(AnalogInputTerminal):
|
|
||||||
"""KL3202: 2x analog input PT100 16 Bit 3-wire"""
|
|
||||||
#Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
|
|
||||||
input_word_width = 4
|
|
||||||
output_word_width = 4
|
|
||||||
|
|
||||||
def read_temperature(self, channel: int):
|
|
||||||
val = self.read_word(channel*2-1)
|
|
||||||
if val > 0x7FFF:
|
|
||||||
return (val - 0x10000) / 10.0
|
|
||||||
else:
|
|
||||||
return val / 10.0
|
|
||||||
|
|
||||||
class KL3214(AnalogInputTerminal):
|
|
||||||
"""KL3214: 4x analog input PT100 16 Bit 3-wire"""
|
|
||||||
#inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
|
|
||||||
#out: 4 x 8 Bit Control (optional)
|
|
||||||
input_word_width = 8
|
|
||||||
output_word_width = 8
|
|
||||||
|
|
||||||
def read_temperature(self, channel: int):
|
|
||||||
val = self.read_word(channel*2-1)
|
|
||||||
if val > 0x7FFF:
|
|
||||||
return (val - 0x10000) / 10.0
|
|
||||||
else:
|
|
||||||
return val / 10.0
|
|
||||||
|
|
||||||
class KL4002(AnalogOutputTerminal):
|
|
||||||
"""KL4002: 2x analog output 0...10 V 12 Bit differentiell"""
|
|
||||||
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
|
||||||
input_word_width = 4
|
|
||||||
output_word_width = 4
|
|
||||||
|
|
||||||
def set_normalized(self, channel: int, value: float):
|
|
||||||
self.write_word(channel*2-1, int(value * 0x7FFF))
|
|
||||||
|
|
||||||
def set_voltage(self, channel: int, value: float):
|
|
||||||
self.set_normalized(channel, value / 10.0)
|
|
||||||
|
|
||||||
class KL4132(AnalogOutputTerminal):
|
|
||||||
"""KL4002: 2x analog output ±10 V 16 bit differential"""
|
|
||||||
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
|
||||||
input_word_width = 4
|
|
||||||
output_word_width = 4
|
|
||||||
|
|
||||||
def set_normalized(self, channel: int, value: float):
|
|
||||||
if value >= 0:
|
|
||||||
self.write_word(channel-1, int(value * 0x7FFF))
|
|
||||||
else:
|
|
||||||
self.write_word(channel-1, int(0x10000 + value * 0x7FFF ))
|
|
||||||
|
|
||||||
def set_voltage(self, channel: int, value: float):
|
|
||||||
self.set_normalized(channel, value / 10.0)
|
|
||||||
|
|
||||||
class KL4004(AnalogOutputTerminal):
|
|
||||||
"""KL4004: 4x analog output 0...10 V 12 Bit differentiell"""
|
|
||||||
#Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
|
|
||||||
input_word_width = 8
|
|
||||||
output_word_width = 8
|
|
||||||
|
|
||||||
def set_normalized(self, channel: int, value: float):
|
|
||||||
self.write_word(channel*2-1, int(value * 0x7FFF))
|
|
||||||
|
|
||||||
def set_voltage(self, channel: int, value: float):
|
|
||||||
self.set_normalized(channel, value / 10.0)
|
|
||||||
|
|
||||||
class KL9000(BusTerminal):
|
|
||||||
"""Dummy, no I/O function"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class KL9010(BusTerminal):
|
|
||||||
"""Dummy, no I/O function"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class KL9100(BusTerminal):
|
|
||||||
"""Dummy, no I/O function"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class KL9183(BusTerminal):
|
|
||||||
"""Dummy, no I/O function"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class KL9188(BusTerminal):
|
|
||||||
"""Dummy, no I/O function"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class WAGO_750_600(BusTerminal):
|
|
||||||
"""Dummy, no I/O function"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class WAGO_750_602(BusTerminal):
|
|
||||||
"""Dummy, no I/O function"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class BusCoupler(ModbusClient):
|
|
||||||
"""BusCoupler: Busskoppler ModBus TCP"""
|
|
||||||
|
|
||||||
def __init__(self, host: str, port: int = 502, timeout: int = 5, watchdog: float = 0):
|
|
||||||
"""
|
|
||||||
Constructs all the necessary attributes for the person object.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
host : str
|
|
||||||
ip or hostname of the BK9050
|
|
||||||
port : int
|
|
||||||
port of the modbus host
|
|
||||||
debug : bool
|
|
||||||
outputs modbus debug information
|
|
||||||
timeout : float
|
|
||||||
timeout for waiting for the device response
|
|
||||||
watchdog : float
|
|
||||||
time in secounds after the device sets all outputs to
|
|
||||||
default state. A value of 0 deactivates the watchdog.
|
|
||||||
"""
|
|
||||||
|
|
||||||
ModbusClient.__init__(self, host, port, auto_open=True, timeout=timeout)
|
|
||||||
self.bus_terminals: list[BusTerminal] = list()
|
|
||||||
self.next_output_bit_offset = 0
|
|
||||||
self.next_input_bit_offset = 0
|
|
||||||
self.next_output_word_offset = 0
|
|
||||||
self.next_input_word_offset = 0
|
|
||||||
self.init_hardware(watchdog)
|
|
||||||
|
|
||||||
def init_hardware(self, watchdog: float):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def read_discrete_input(self, address: int):
|
|
||||||
value = self.read_discrete_inputs(address)
|
|
||||||
if value is None:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return value[0]
|
|
||||||
|
|
||||||
def read_coil(self, address):
|
|
||||||
value = self.read_coils(address)
|
|
||||||
if value is None:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return value[0]
|
|
||||||
|
|
||||||
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
|
|
||||||
for terminal_class in bus_terminals:
|
|
||||||
nterm = terminal_class(self)
|
|
||||||
|
|
||||||
nterm.output_bit_offset = self.next_output_bit_offset
|
|
||||||
nterm.input_bit_offset = self.next_input_bit_offset
|
|
||||||
nterm.output_word_offset = self.next_output_word_offset
|
|
||||||
nterm.input_word_offset = self.next_input_word_offset
|
|
||||||
|
|
||||||
self.next_output_bit_offset += terminal_class.output_bit_width
|
|
||||||
self.next_input_bit_offset += terminal_class.input_bit_width
|
|
||||||
self.next_output_word_offset += terminal_class.output_word_width
|
|
||||||
self.next_input_word_offset += terminal_class.input_word_width
|
|
||||||
|
|
||||||
self.bus_terminals.append(nterm)
|
|
||||||
|
|
||||||
return self.bus_terminals
|
|
||||||
|
|
||||||
class ModBusBK9050(BusCoupler):
|
|
||||||
"""BK9050: Busskoppler ModBus TCP"""
|
|
||||||
|
|
||||||
def init_hardware(self, watchdog: float):
|
|
||||||
#https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
|
|
||||||
#config watchdog on page 58
|
|
||||||
|
|
||||||
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
|
||||||
self.write_single_register(0x1120, watchdog * 1000) #ms
|
|
||||||
|
|
||||||
#reset watchdog timer:
|
|
||||||
self.write_single_register(0x1121, 0xBECF)
|
|
||||||
self.write_single_register(0x1121, 0xAFFE)
|
|
||||||
|
|
||||||
#set process image offset
|
|
||||||
self.next_output_word_offset = 0x0800
|
|
||||||
|
|
||||||
class WAGO750_352(BusCoupler):
|
|
||||||
"""Wago 750-352: Busskoppler ModBus TCP"""
|
|
||||||
|
|
||||||
def init_hardware(self, watchdog: float):
|
|
||||||
#deactivate/reset watchdog timer:
|
|
||||||
self.write_single_register(0x1005, 0xAAAA)
|
|
||||||
self.write_single_register(0x1005, 0x5555)
|
|
||||||
|
|
||||||
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
|
||||||
self.write_single_register(0x1000, int(watchdog * 10))
|
|
||||||
|
|
||||||
if watchdog:
|
|
||||||
#configure watchdog to reset on all functions codes
|
|
||||||
self.write_single_register(0x1001, 0xFFFF)
|
|
||||||
|
|
||||||
#set process image offset
|
|
||||||
self.next_output_word_offset = 0x0000
|
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
[project]
|
||||||
|
name = "pyhoff"
|
||||||
|
version = "1.0.0"
|
||||||
|
authors = [
|
||||||
|
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
|
||||||
|
]
|
||||||
|
description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP"
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.8"
|
||||||
|
classifiers = [
|
||||||
|
"Programming Language :: Python :: 3",
|
||||||
|
"License :: OSI Approved :: MIT License",
|
||||||
|
"Operating System :: OS Independent",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.urls]
|
||||||
|
Homepage = "https://github.com/Nonannet/pyhoff"
|
||||||
|
Issues = "https://github.com/Nonannet/pyhoff/issues"
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["setuptools>=61.0"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
where = ["src"]
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
minversion = "6.0"
|
||||||
|
addopts = "-ra -q"
|
||||||
|
testpaths = ["tests"]
|
||||||
|
pythonpath = ["src"]
|
|
@ -0,0 +1,306 @@
|
||||||
|
from .modbus import SimpleModbusClient
|
||||||
|
from typing import Type, Any
|
||||||
|
|
||||||
|
|
||||||
|
class BusTerminal():
|
||||||
|
"""
|
||||||
|
Base class for all bus terminals.
|
||||||
|
"""
|
||||||
|
parameters: dict[str, int] = {}
|
||||||
|
|
||||||
|
def __init__(self, bus_coupler: 'BusCoupler',
|
||||||
|
output_bit_offset: int,
|
||||||
|
input_bit_offset: int,
|
||||||
|
output_word_offset: int,
|
||||||
|
input_word_offset: int):
|
||||||
|
"""
|
||||||
|
Initialize a BusTerminal.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bus_coupler: The bus coupler to which this terminal is connected.
|
||||||
|
"""
|
||||||
|
self._output_bit_offset = output_bit_offset
|
||||||
|
self._input_bit_offset = input_bit_offset
|
||||||
|
self._output_word_offset = output_word_offset
|
||||||
|
self._input_word_offset = input_word_offset
|
||||||
|
self.bus_coupler = bus_coupler
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalInputTerminal(BusTerminal):
|
||||||
|
"""
|
||||||
|
Represents a digital input terminal.
|
||||||
|
"""
|
||||||
|
def read_input(self, channel: int) -> bool | None:
|
||||||
|
"""
|
||||||
|
Read the input from a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number (start counting from 1) to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The input value of the specified channel.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: If the channel number is out of range.
|
||||||
|
"""
|
||||||
|
if channel < 1 or channel > self.parameters['input_bit_width']:
|
||||||
|
raise Exception("address out of range")
|
||||||
|
return self.bus_coupler.read_discrete_input(self._input_bit_offset + channel - 1)
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalOutputTerminal(BusTerminal):
|
||||||
|
"""
|
||||||
|
Represents a digital output terminal.
|
||||||
|
"""
|
||||||
|
def write_coil(self, channel: int, value: bool) -> bool:
|
||||||
|
"""
|
||||||
|
Write a value to a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number (start counting from 1) to write to.
|
||||||
|
value: The value to write.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the write operation succeeded, otherwise False.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: If the channel number is out of range.
|
||||||
|
"""
|
||||||
|
if channel < 1 or channel > self.parameters['output_bit_width']:
|
||||||
|
raise Exception("address out of range")
|
||||||
|
return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value)
|
||||||
|
|
||||||
|
def read_coil(self, channel: int) -> bool | None:
|
||||||
|
"""
|
||||||
|
Read the coil value back from a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number (start counting from 1) to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The coil value of the specified channel.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: If the channel number is out of range.
|
||||||
|
"""
|
||||||
|
if channel < 1 or channel > self.parameters['output_bit_width']:
|
||||||
|
raise Exception("address out of range")
|
||||||
|
return self.bus_coupler.read_coil(self._output_bit_offset + channel - 1)
|
||||||
|
|
||||||
|
|
||||||
|
class AnalogInputTerminal(BusTerminal):
|
||||||
|
"""
|
||||||
|
Represents an analog input terminal.
|
||||||
|
"""
|
||||||
|
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
|
||||||
|
"""
|
||||||
|
Read a list of words from the terminal.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
word_offset: The starting word offset (0 based index).
|
||||||
|
word_count: The number of words to read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The read words.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: If the word offset or count is out of range.
|
||||||
|
"""
|
||||||
|
if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']:
|
||||||
|
raise Exception("address out of range")
|
||||||
|
return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count)
|
||||||
|
|
||||||
|
def read_word(self, word_offset: int) -> int:
|
||||||
|
"""
|
||||||
|
Read a single word from the terminal.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
word_offset: The word offset (0 based index) to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The read word value.
|
||||||
|
"""
|
||||||
|
val = self.read_words(word_offset, 1)
|
||||||
|
if val:
|
||||||
|
return val[0]
|
||||||
|
else:
|
||||||
|
return -999
|
||||||
|
|
||||||
|
def read_normalized(self, channel: int) -> float:
|
||||||
|
"""
|
||||||
|
Read a normalized value (0...1) from a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The normalized value.
|
||||||
|
"""
|
||||||
|
return self.read_word(channel * 2 - 1) / 0x7FFF
|
||||||
|
|
||||||
|
|
||||||
|
class AnalogOutputTerminal(BusTerminal):
|
||||||
|
"""
|
||||||
|
Represents an analog output terminal.
|
||||||
|
"""
|
||||||
|
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
|
||||||
|
"""
|
||||||
|
Read a list of words from the terminal.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
word_offset: The starting word offset (0 based index).
|
||||||
|
word_count: The number of words to read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The read words.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: If the word offset or count is out of range.
|
||||||
|
"""
|
||||||
|
if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']:
|
||||||
|
raise Exception("address out of range")
|
||||||
|
return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count)
|
||||||
|
|
||||||
|
def read_word(self, word_offset: int) -> int:
|
||||||
|
"""
|
||||||
|
Read a single word from the terminal.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
word_offset: The word offset (0 based index) to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The read word value.
|
||||||
|
"""
|
||||||
|
val = self.read_words(word_offset, 1)
|
||||||
|
if val:
|
||||||
|
return val[0]
|
||||||
|
else:
|
||||||
|
return -999
|
||||||
|
|
||||||
|
def write_word(self, word_offset: int, data: int) -> bool:
|
||||||
|
"""
|
||||||
|
Write a word to the terminal.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
word_offset: The word offset to write to.
|
||||||
|
data: The data to write.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The result of the write operation.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: If the word offset is out of range.
|
||||||
|
"""
|
||||||
|
if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']:
|
||||||
|
raise Exception("address out of range")
|
||||||
|
return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data)
|
||||||
|
|
||||||
|
def set_normalized(self, channel: int, value: float):
|
||||||
|
"""
|
||||||
|
Set a normalized value between 0 and 1 to a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to set.
|
||||||
|
value: The normalized value to set.
|
||||||
|
"""
|
||||||
|
self.write_word(channel * 2 - 1, int(value * 0x7FFF))
|
||||||
|
|
||||||
|
|
||||||
|
class BusCoupler():
|
||||||
|
"""BusCoupler: Busskoppler ModBus TCP"""
|
||||||
|
|
||||||
|
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
|
||||||
|
timeout: int = 5, watchdog: float = 0, debug: bool = False):
|
||||||
|
"""
|
||||||
|
Constructs a Bus Coupler connected over ModBus TCP.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host: ip or hostname of the BK9050
|
||||||
|
port: port of the modbus host
|
||||||
|
debug: outputs modbus debug information
|
||||||
|
timeout: timeout for waiting for the device response
|
||||||
|
watchdog: time in seconds after the device sets all outputs to
|
||||||
|
default state. A value of 0 deactivates the watchdog.
|
||||||
|
debug: If True, debug information is printed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.bus_terminals: list[Any] = list()
|
||||||
|
self.next_output_bit_offset = 0
|
||||||
|
self.next_input_bit_offset = 0
|
||||||
|
self.next_output_word_offset = 0
|
||||||
|
self.next_input_word_offset = 0
|
||||||
|
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
|
||||||
|
|
||||||
|
self.add_bus_terminals(bus_terminals)
|
||||||
|
self._init_hardware(watchdog)
|
||||||
|
|
||||||
|
def _init_hardware(self, watchdog: float):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def read_discrete_input(self, address: int) -> bool | None:
|
||||||
|
"""
|
||||||
|
Read a discrete input from the given register address.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
address: The register address to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The value of the discrete input.
|
||||||
|
"""
|
||||||
|
value = self.modbus.read_discrete_inputs(address)
|
||||||
|
if value:
|
||||||
|
return value[0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def read_coil(self, address: int) -> bool | None:
|
||||||
|
"""
|
||||||
|
Read a coil from the given register address.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
address: The register address to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The value of the coil.
|
||||||
|
"""
|
||||||
|
value = self.modbus.read_coils(address)
|
||||||
|
if value:
|
||||||
|
return value[0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]:
|
||||||
|
"""
|
||||||
|
Add bus terminals to the bus coupler.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bus_terminals: A list of bus terminal classes to add.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The corresponding list of bus terminal objects.
|
||||||
|
"""
|
||||||
|
for terminal_class in bus_terminals:
|
||||||
|
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
|
||||||
|
new_terminal = terminal_class(self,
|
||||||
|
self.next_output_bit_offset,
|
||||||
|
self.next_input_bit_offset,
|
||||||
|
self.next_output_word_offset,
|
||||||
|
self.next_input_word_offset)
|
||||||
|
|
||||||
|
self.next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0)
|
||||||
|
self.next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0)
|
||||||
|
self.next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
|
||||||
|
self.next_input_word_offset += terminal_class.parameters.get('input_word_width', 0)
|
||||||
|
|
||||||
|
self.bus_terminals.append(new_terminal)
|
||||||
|
|
||||||
|
return self.bus_terminals
|
||||||
|
|
||||||
|
def get_error(self) -> str:
|
||||||
|
"""
|
||||||
|
Get the last error message.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The last error message.
|
||||||
|
"""
|
||||||
|
return self.modbus.last_error
|
|
@ -0,0 +1,392 @@
|
||||||
|
from . import DigitalInputTerminal, DigitalOutputTerminal
|
||||||
|
from . import AnalogInputTerminal, AnalogOutputTerminal
|
||||||
|
from . import BusTerminal, BusCoupler
|
||||||
|
|
||||||
|
|
||||||
|
class BK9000(BusCoupler):
|
||||||
|
"""
|
||||||
|
BK9000: Busskoppler ModBus TCP
|
||||||
|
"""
|
||||||
|
def _init_hardware(self, watchdog: float):
|
||||||
|
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
|
||||||
|
# config watchdog on page 58
|
||||||
|
|
||||||
|
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
||||||
|
self.modbus.write_single_register(0x1120, int(watchdog * 1000)) # ms
|
||||||
|
|
||||||
|
# reset watchdog timer:
|
||||||
|
self.modbus.write_single_register(0x1121, 0xBECF)
|
||||||
|
self.modbus.write_single_register(0x1121, 0xAFFE)
|
||||||
|
|
||||||
|
# set process image offset
|
||||||
|
self.next_output_word_offset = 0x0800
|
||||||
|
|
||||||
|
|
||||||
|
class BK9050(BK9000):
|
||||||
|
"""
|
||||||
|
BK9050: Busskoppler ModBus TCP
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BK9100(BK9000):
|
||||||
|
"""
|
||||||
|
BK9100: Busskoppler ModBus TCP
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class WAGO750_352(BusCoupler):
|
||||||
|
"""
|
||||||
|
Wago 750-352: Busskoppler ModBus TCP
|
||||||
|
"""
|
||||||
|
def _init_hardware(self, watchdog: float):
|
||||||
|
# deactivate/reset watchdog timer:
|
||||||
|
self.modbus.write_single_register(0x1005, 0xAAAA)
|
||||||
|
self.modbus.write_single_register(0x1005, 0x5555)
|
||||||
|
|
||||||
|
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
||||||
|
self.modbus.write_single_register(0x1000, int(watchdog * 10))
|
||||||
|
|
||||||
|
if watchdog:
|
||||||
|
# configure watchdog to reset on all functions codes
|
||||||
|
self.modbus.write_single_register(0x1001, 0xFFFF)
|
||||||
|
|
||||||
|
# set process image offset
|
||||||
|
self.next_output_word_offset = 0x0000
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalInputTerminal4Bit(DigitalInputTerminal):
|
||||||
|
"""
|
||||||
|
Generic 4 bit input terminal
|
||||||
|
"""
|
||||||
|
parameters = {'input_bit_width': 4}
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalInputTerminal8Bit(DigitalInputTerminal):
|
||||||
|
"""
|
||||||
|
Generic 8 bit input terminal
|
||||||
|
"""
|
||||||
|
parameters = {'input_bit_width': 8}
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalInputTerminal16Bit(DigitalInputTerminal):
|
||||||
|
"""
|
||||||
|
Generic 16 bit input terminal
|
||||||
|
"""
|
||||||
|
parameters = {'input_bit_width': 16}
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalOutputTerminal4Bit(DigitalOutputTerminal):
|
||||||
|
"""
|
||||||
|
Generic 4 bit output terminal
|
||||||
|
"""
|
||||||
|
parameters = {'output_bit_width': 4}
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalOutputTerminal8Bit(DigitalOutputTerminal):
|
||||||
|
"""
|
||||||
|
Generic 8 bit output terminal
|
||||||
|
"""
|
||||||
|
parameters = {'output_bit_width': 8}
|
||||||
|
|
||||||
|
|
||||||
|
class DigitalOutputTerminal16Bit(DigitalOutputTerminal):
|
||||||
|
"""
|
||||||
|
Generic 16 bit output terminal
|
||||||
|
"""
|
||||||
|
parameters = {'output_bit_width': 16}
|
||||||
|
|
||||||
|
|
||||||
|
class KL1104(DigitalInputTerminal4Bit):
|
||||||
|
"""
|
||||||
|
KL1104: 4x digital input 24 V
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL1408(DigitalInputTerminal8Bit):
|
||||||
|
"""
|
||||||
|
KL1104: 8x digital input 24 V galvanic isolated
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class WAGO_750_1405(DigitalInputTerminal16Bit):
|
||||||
|
"""
|
||||||
|
750-1405: 16x digital input 24 V
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL2404(DigitalOutputTerminal4Bit):
|
||||||
|
"""
|
||||||
|
KL2404: 4x digital output with 500 mA
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL2424(DigitalOutputTerminal4Bit):
|
||||||
|
"""
|
||||||
|
KL2424: 4x digital output with 2000 mA
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL2634(DigitalOutputTerminal4Bit):
|
||||||
|
"""
|
||||||
|
KL2634: 4x digital output 250 V AC, 30 V DC, 4 A
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL2408(DigitalOutputTerminal8Bit):
|
||||||
|
"""
|
||||||
|
750-530: 8x digital output with 24 V / 500 mA
|
||||||
|
|
||||||
|
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class WAGO_750_530(DigitalOutputTerminal8Bit):
|
||||||
|
"""
|
||||||
|
750-530: 8x digital output with 24 V / 500 mA
|
||||||
|
|
||||||
|
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL1512(AnalogInputTerminal):
|
||||||
|
"""
|
||||||
|
KL1512: 2x digital input, counter, 24 V DC, 1 kHz
|
||||||
|
"""
|
||||||
|
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||||
|
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||||
|
|
||||||
|
_last_counter_values = [0, 0]
|
||||||
|
|
||||||
|
def read_counter(self, channel: int) -> int:
|
||||||
|
"""
|
||||||
|
Read the absolut counter value of a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The counter value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.read_word(channel * 2 - 1)
|
||||||
|
|
||||||
|
def read_delta(self, channel: int) -> int:
|
||||||
|
"""
|
||||||
|
Read the counter change since last read of a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The counter value.
|
||||||
|
"""
|
||||||
|
new_count = self.read_word(channel * 2 - 1)
|
||||||
|
return new_count - self._last_counter_values[channel - 1]
|
||||||
|
|
||||||
|
|
||||||
|
class KL3054(AnalogInputTerminal):
|
||||||
|
"""
|
||||||
|
KL3054: 4x analog input 4...20 mA 12 Bit single-ended
|
||||||
|
"""
|
||||||
|
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||||
|
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||||
|
|
||||||
|
def read_current(self, channel: int) -> float:
|
||||||
|
"""
|
||||||
|
Read the current value from a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The current value.
|
||||||
|
"""
|
||||||
|
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
|
||||||
|
|
||||||
|
|
||||||
|
class KL3042(AnalogInputTerminal):
|
||||||
|
"""
|
||||||
|
KL3042: 2x analog input 0...20 mA 12 Bit single-ended
|
||||||
|
"""
|
||||||
|
# Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
|
||||||
|
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||||
|
|
||||||
|
def read_current(self, channel: int) -> float:
|
||||||
|
"""
|
||||||
|
Read the current value from a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The current value.
|
||||||
|
"""
|
||||||
|
return self.read_normalized(channel * 2 - 1) * 20.0
|
||||||
|
|
||||||
|
|
||||||
|
class KL3202(AnalogInputTerminal):
|
||||||
|
"""
|
||||||
|
KL3202: 2x analog input PT100 16 Bit 3-wire
|
||||||
|
"""
|
||||||
|
# Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
|
||||||
|
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||||
|
|
||||||
|
def read_temperature(self, channel: int) -> float:
|
||||||
|
"""
|
||||||
|
Read the temperature value from a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The temperature value in °C.
|
||||||
|
"""
|
||||||
|
val = self.read_word(channel * 2 - 1)
|
||||||
|
if val > 0x7FFF:
|
||||||
|
return (val - 0x10000) / 10.0
|
||||||
|
else:
|
||||||
|
return val / 10.0
|
||||||
|
|
||||||
|
|
||||||
|
class KL3214(AnalogInputTerminal):
|
||||||
|
"""
|
||||||
|
KL3214: 4x analog input PT100 16 Bit 3-wire
|
||||||
|
"""
|
||||||
|
# inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
|
||||||
|
# out: 4 x 8 Bit Control (optional)
|
||||||
|
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||||
|
|
||||||
|
def read_temperature(self, channel: int) -> float:
|
||||||
|
"""
|
||||||
|
Read the temperature value from a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to read from.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The temperature value.
|
||||||
|
"""
|
||||||
|
val = self.read_word(channel * 2 - 1)
|
||||||
|
if val > 0x7FFF:
|
||||||
|
return (val - 0x10000) / 10.0
|
||||||
|
else:
|
||||||
|
return val / 10.0
|
||||||
|
|
||||||
|
|
||||||
|
class KL4002(AnalogOutputTerminal):
|
||||||
|
"""
|
||||||
|
KL4002: 2x analog output 0...10 V 12 Bit differentiell
|
||||||
|
"""
|
||||||
|
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||||
|
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||||
|
|
||||||
|
def set_voltage(self, channel: int, value: float):
|
||||||
|
"""
|
||||||
|
Set a voltage value to a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to set.
|
||||||
|
value: The voltage value to set.
|
||||||
|
"""
|
||||||
|
self.set_normalized(channel, value / 10.0)
|
||||||
|
|
||||||
|
|
||||||
|
class KL4132(AnalogOutputTerminal):
|
||||||
|
"""
|
||||||
|
KL4002: 2x analog output ±10 V 16 bit differential
|
||||||
|
"""
|
||||||
|
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||||
|
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||||
|
|
||||||
|
def set_normalized(self, channel: int, value: float):
|
||||||
|
"""
|
||||||
|
Set a normalized value between -1 and +1 to a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to set.
|
||||||
|
value: The normalized value to set.
|
||||||
|
"""
|
||||||
|
if value >= 0:
|
||||||
|
self.write_word(channel - 1, int(value * 0x7FFF))
|
||||||
|
else:
|
||||||
|
self.write_word(channel - 1, int(0x10000 + value * 0x7FFF))
|
||||||
|
|
||||||
|
def set_voltage(self, channel: int, value: float):
|
||||||
|
"""
|
||||||
|
Set a voltage value between -10 and +10 V to a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to set.
|
||||||
|
value: The voltage value to set.
|
||||||
|
"""
|
||||||
|
self.set_normalized(channel, value / 10.0)
|
||||||
|
|
||||||
|
|
||||||
|
class KL4004(AnalogOutputTerminal):
|
||||||
|
"""
|
||||||
|
KL4004: 4x analog output 0...10 V 12 Bit differentiell
|
||||||
|
"""
|
||||||
|
# Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
|
||||||
|
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||||
|
|
||||||
|
def set_voltage(self, channel: int, value: float):
|
||||||
|
"""
|
||||||
|
Set a voltage value to a specific channel.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel: The channel number to set.
|
||||||
|
value: The voltage value to set.
|
||||||
|
"""
|
||||||
|
self.set_normalized(channel, value / 10.0)
|
||||||
|
|
||||||
|
|
||||||
|
class KL9010(BusTerminal):
|
||||||
|
"""
|
||||||
|
End terminal, no I/O function
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL9100(BusTerminal):
|
||||||
|
"""
|
||||||
|
Potential supply terminal, no I/O function
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL9183(BusTerminal):
|
||||||
|
"""
|
||||||
|
Potential distribution terminal, no I/O function
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KL9188(BusTerminal):
|
||||||
|
"""
|
||||||
|
Potential distribution terminal, no I/O function
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class WAGO_750_600(BusTerminal):
|
||||||
|
"""
|
||||||
|
End nodule, no I/O function
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class WAGO_750_602(BusTerminal):
|
||||||
|
"""
|
||||||
|
Potential supply terminal, no I/O function
|
||||||
|
"""
|
||||||
|
pass
|
|
@ -0,0 +1,467 @@
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
import random
|
||||||
|
|
||||||
|
_READ_COILS = 0x01
|
||||||
|
_READ_DISCRETE_INPUTS = 0x02
|
||||||
|
_READ_HOLDING_REGISTERS = 0x03
|
||||||
|
_READ_INPUT_REGISTERS = 0x04
|
||||||
|
_WRITE_SINGLE_COIL = 0x05
|
||||||
|
_WRITE_SINGLE_REGISTER = 0x06
|
||||||
|
_WRITE_MULTIPLE_COILS = 0x0F
|
||||||
|
_WRITE_MULTIPLE_REGISTERS = 0x10
|
||||||
|
|
||||||
|
_modbus_exceptions = {
|
||||||
|
0x01: 'illegal function',
|
||||||
|
0x02: 'illegal data address',
|
||||||
|
0x03: 'illegal data value',
|
||||||
|
0x04: 'slave device failure',
|
||||||
|
0x05: 'acknowledge',
|
||||||
|
0x06: 'slave device busy',
|
||||||
|
0x07: 'negative acknowledge',
|
||||||
|
0x08: 'memory parity error',
|
||||||
|
0x0A: 'gateway path unavailable',
|
||||||
|
0x0B: 'gateway target device failed to respond'
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_bits(data: bytes, bit_number: int) -> list[bool]:
|
||||||
|
return [bool(data[i // 8] >> (i % 8) & 0x01)
|
||||||
|
for i in range(bit_number)]
|
||||||
|
|
||||||
|
|
||||||
|
def _get_words(data: bytes) -> list[int]:
|
||||||
|
return [(data[i * 2] << 8) + data[i * 2 + 1]
|
||||||
|
for i in range(len(data) // 2)]
|
||||||
|
|
||||||
|
|
||||||
|
def _from_bits(values: list[bool]) -> bytes:
|
||||||
|
return bytes(sum(((1 << j) * bool(values[8 * i + j]))
|
||||||
|
for j in range(8)) for i in range(len(values) // 8))
|
||||||
|
|
||||||
|
|
||||||
|
def _from_words(values: list[int]) -> bytes:
|
||||||
|
return b''.join(word.to_bytes(2, byteorder='big') for word in values)
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleModbusClient:
|
||||||
|
"""
|
||||||
|
Modbus TCP client
|
||||||
|
"""
|
||||||
|
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
|
||||||
|
"""
|
||||||
|
Constructs a Modbus client
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host: hostname or IPv4/IPv6 address server address
|
||||||
|
port: server port
|
||||||
|
unit_id: ModBus id
|
||||||
|
timeout: socket timeout in seconds
|
||||||
|
"""
|
||||||
|
assert 0 <= unit_id < 256
|
||||||
|
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.unit_id = unit_id
|
||||||
|
self.timeout = timeout
|
||||||
|
self.last_error = ''
|
||||||
|
self._transaction_id = random.randint(0, 0xFFFF)
|
||||||
|
self._socket: None | socket.socket = None
|
||||||
|
self.debug = debug
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port,
|
||||||
|
socket.AF_UNSPEC,
|
||||||
|
socket.SOCK_STREAM):
|
||||||
|
try:
|
||||||
|
self._socket = socket.socket(af, st, pr)
|
||||||
|
except socket.error:
|
||||||
|
self.close()
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
self._socket.settimeout(self.timeout)
|
||||||
|
self._socket.connect(sa)
|
||||||
|
except socket.error:
|
||||||
|
self.close()
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
|
||||||
|
if self._socket:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
self.last_error = 'connection failed'
|
||||||
|
return False
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""
|
||||||
|
Close connection
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
empty bytes object
|
||||||
|
"""
|
||||||
|
if self._socket:
|
||||||
|
self._socket.close()
|
||||||
|
self._socket = None
|
||||||
|
|
||||||
|
return bytes()
|
||||||
|
|
||||||
|
def read_coils(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
|
||||||
|
"""
|
||||||
|
ModBus function for reading coils (0x01)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bit_address: Bit address (0 to 0xffff)
|
||||||
|
bit_lengths: Number of bits to read (1 to 2000)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of bool or None: Bits list or None if error
|
||||||
|
"""
|
||||||
|
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
|
||||||
|
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
|
||||||
|
|
||||||
|
if not self.send_modbus_data(_READ_COILS, _from_words([bit_address, bit_lengths])):
|
||||||
|
return None
|
||||||
|
|
||||||
|
rx_data = self.receive_modbus_data()
|
||||||
|
if not rx_data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if len(rx_data) < 2:
|
||||||
|
self.last_error = 'received frame under min size'
|
||||||
|
return None
|
||||||
|
|
||||||
|
byte_count = rx_data[0]
|
||||||
|
bit_data = rx_data[1:]
|
||||||
|
|
||||||
|
if not (byte_count * 8 >= bit_lengths and
|
||||||
|
byte_count == len(bit_data)):
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _get_bits(bit_data, bit_lengths)
|
||||||
|
|
||||||
|
def read_discrete_inputs(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
|
||||||
|
"""
|
||||||
|
ModBus function for reading discrete inputs (0x02)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bit_address: Bit address (0 to 0xffff)
|
||||||
|
bit_lengths: Number of bits to read (1 to 2000)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of bool or None: Bits list or None if error
|
||||||
|
"""
|
||||||
|
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
|
||||||
|
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
|
||||||
|
|
||||||
|
if not self.send_modbus_data(_READ_DISCRETE_INPUTS, _from_words([bit_address, bit_lengths])):
|
||||||
|
return None
|
||||||
|
|
||||||
|
rx_data = self.receive_modbus_data()
|
||||||
|
if not rx_data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if len(rx_data) < 2:
|
||||||
|
self.last_error = 'received frame under minimum size'
|
||||||
|
return None
|
||||||
|
|
||||||
|
byte_count = rx_data[0]
|
||||||
|
bit_data = rx_data[1:]
|
||||||
|
|
||||||
|
if not (byte_count * 8 >= bit_lengths and
|
||||||
|
byte_count == len(bit_data)):
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _get_bits(bit_data, bit_lengths)
|
||||||
|
|
||||||
|
def read_holding_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
|
||||||
|
"""
|
||||||
|
ModBus function for reading holding registers (0x03)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
register_address: Register address (0 to 0xffff)
|
||||||
|
word_lengths: Number of registers to read (1 to 125)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of int or None: Registers list or None if error
|
||||||
|
"""
|
||||||
|
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
|
||||||
|
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
|
||||||
|
|
||||||
|
if not self.send_modbus_data(_READ_HOLDING_REGISTERS, _from_words([register_address, word_lengths])):
|
||||||
|
return None
|
||||||
|
|
||||||
|
rx_data = self.receive_modbus_data()
|
||||||
|
if not rx_data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if len(rx_data) < 2:
|
||||||
|
self.last_error = 'received frame under minimum size'
|
||||||
|
return None
|
||||||
|
|
||||||
|
byte_count = rx_data[0]
|
||||||
|
reg_data = rx_data[1:]
|
||||||
|
|
||||||
|
if not (byte_count == 2 * word_lengths and
|
||||||
|
byte_count == len(reg_data)):
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _get_words(reg_data)
|
||||||
|
|
||||||
|
def read_input_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
|
||||||
|
"""
|
||||||
|
ModBus function for reading input registers (0x04)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
register_address: Register address (0 to 0xffff)
|
||||||
|
word_lengths: Number of registers to read (1 to 125)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of int or None: Registers list or None if error
|
||||||
|
"""
|
||||||
|
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
|
||||||
|
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
|
||||||
|
|
||||||
|
if not self.send_modbus_data(_READ_INPUT_REGISTERS, _from_words([register_address, word_lengths])):
|
||||||
|
return None
|
||||||
|
|
||||||
|
rx_data = self.receive_modbus_data()
|
||||||
|
if not rx_data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if len(rx_data) < 2:
|
||||||
|
self.last_error = 'received frame under minimum size'
|
||||||
|
return None
|
||||||
|
|
||||||
|
byte_count = rx_data[0]
|
||||||
|
reg_data = rx_data[1:]
|
||||||
|
|
||||||
|
if not (byte_count == 2 * word_lengths and
|
||||||
|
byte_count == len(reg_data)):
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _get_words(reg_data)
|
||||||
|
|
||||||
|
def write_single_coil(self, bit_address: int, value: bool) -> bool:
|
||||||
|
"""
|
||||||
|
ModBus function for writing a single coil (0x05)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bit_address: Bit address (0 to 0xffff)
|
||||||
|
value: Value to write (single bit)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if write succeeded or False if failed
|
||||||
|
"""
|
||||||
|
assert 0 <= bit_address <= 0xffff, 'bit_address out of range'
|
||||||
|
|
||||||
|
tx_data = _from_words([bit_address, 0xFF00 * bool(value)])
|
||||||
|
if not self.send_modbus_data(_WRITE_SINGLE_COIL, tx_data):
|
||||||
|
return False
|
||||||
|
|
||||||
|
data = self.receive_modbus_data()
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if len(data) != 4:
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return False
|
||||||
|
|
||||||
|
return data == tx_data
|
||||||
|
|
||||||
|
def write_single_register(self, register_address: int, value: int) -> bool:
|
||||||
|
"""
|
||||||
|
ModBus function for writing a single register (0x06)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
register_address: Register address (0 to 0xffff)
|
||||||
|
value: Value to write (0 to 0xffff)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if write succeeded or False if failed
|
||||||
|
"""
|
||||||
|
assert 0 <= register_address <= 0xffff, 'register_address out of range'
|
||||||
|
assert 0 <= value <= 0xffff, 'value out of range 0 to 0xffff'
|
||||||
|
|
||||||
|
tx_data = _from_words([register_address, value])
|
||||||
|
if not self.send_modbus_data(_WRITE_SINGLE_REGISTER, tx_data):
|
||||||
|
return False
|
||||||
|
|
||||||
|
data = self.receive_modbus_data()
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if len(data) != 4:
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return False
|
||||||
|
|
||||||
|
return data == tx_data
|
||||||
|
|
||||||
|
def write_multiple_coils(self, bit_address: int, values: list[bool]) -> bool:
|
||||||
|
"""
|
||||||
|
ModBus function for writing multiple coils (0x0F)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bit_address: Bit address (0 to 0xffff)
|
||||||
|
values: List of bit values to write
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if write succeeded or False if failed
|
||||||
|
"""
|
||||||
|
assert bit_address + len(values) <= 0xffff, 'bit_address out of range'
|
||||||
|
assert 1 <= len(values) <= 2000, 'number values must be from 1 to 2000'
|
||||||
|
|
||||||
|
byte_count = (len(values) + 7) // 8
|
||||||
|
tx_data = struct.pack('>HHB', bit_address, len(values), byte_count) + _from_bits(values)
|
||||||
|
if not self.send_modbus_data(_WRITE_MULTIPLE_COILS, tx_data):
|
||||||
|
return False
|
||||||
|
|
||||||
|
data = self.receive_modbus_data()
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if len(data) != 4:
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return False
|
||||||
|
|
||||||
|
return _get_words(data[0:1])[0] == bit_address
|
||||||
|
|
||||||
|
def write_multiple_registers(self, register_address: int, values: list[int]) -> bool:
|
||||||
|
"""
|
||||||
|
ModBus function for writing multiple registers (0x10)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
register_address: Register address (0 to 0xffff)
|
||||||
|
values: List of 16 bit register values to write
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if write succeeded or False if failed
|
||||||
|
"""
|
||||||
|
assert register_address + len(values) <= 0xffff, 'register_address out of range'
|
||||||
|
assert max(values) <= 0xffff, 'value out of range 0 to 0xffff'
|
||||||
|
assert min(values) >= 0, 'value out of range 0 to 0xffff'
|
||||||
|
|
||||||
|
byte_count = len(values) * 2
|
||||||
|
tx_data = struct.pack('>HHB', register_address, len(values), byte_count) + _from_words(values)
|
||||||
|
if not self.send_modbus_data(_WRITE_MULTIPLE_REGISTERS, tx_data):
|
||||||
|
return False
|
||||||
|
|
||||||
|
data = self.receive_modbus_data()
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if len(data) != 4:
|
||||||
|
self.last_error = 'received frame size mismatch'
|
||||||
|
return False
|
||||||
|
|
||||||
|
return _get_words(data[0:1])[0] == register_address
|
||||||
|
|
||||||
|
def _recv(self, number_of_bytes: int) -> bytes:
|
||||||
|
"""
|
||||||
|
Receive data over tcp, wait until all specified bytes are received
|
||||||
|
|
||||||
|
Args:
|
||||||
|
number_of_bytes: Number of bytes to receive
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
returns received bytes or empty an bytes object if an error occurred
|
||||||
|
"""
|
||||||
|
if not self._socket:
|
||||||
|
return bytes()
|
||||||
|
|
||||||
|
buffer = bytes()
|
||||||
|
while len(buffer) < number_of_bytes:
|
||||||
|
try:
|
||||||
|
buffer += self._socket.recv(number_of_bytes - len(buffer))
|
||||||
|
except socket.error:
|
||||||
|
return bytes()
|
||||||
|
|
||||||
|
if self.debug:
|
||||||
|
print(f'<- Received: {' '.join(hex(b) for b in buffer)}')
|
||||||
|
|
||||||
|
return buffer
|
||||||
|
|
||||||
|
def _send(self, data: bytes) -> int:
|
||||||
|
"""
|
||||||
|
Send data over tcp
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: data to send
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
number of transmitted bytes or 0 if transmission failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
for _ in range(2):
|
||||||
|
if self._socket:
|
||||||
|
try:
|
||||||
|
self._socket.sendall(data)
|
||||||
|
if self.debug:
|
||||||
|
print(f'-> Send: {' '.join(hex(b) for b in data)}')
|
||||||
|
return len(data)
|
||||||
|
except socket.error:
|
||||||
|
self.last_error = 'sending data failed'
|
||||||
|
self.close()
|
||||||
|
self.connect()
|
||||||
|
else:
|
||||||
|
self.connect()
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def send_modbus_data(self, function_code: int, body: bytes) -> int:
|
||||||
|
"""
|
||||||
|
Send raw ModBus TCP frame
|
||||||
|
|
||||||
|
Args:
|
||||||
|
unction_code: ModBus function code
|
||||||
|
body: data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
number of transmitted bytes or 0 if transmission failed
|
||||||
|
"""
|
||||||
|
self._transaction_id = (self._transaction_id + 1) % 0x10000
|
||||||
|
protocol_identifier = 0
|
||||||
|
length = len(body) + 2
|
||||||
|
header = struct.pack('>HHHBB', self._transaction_id,
|
||||||
|
protocol_identifier, length, self.unit_id,
|
||||||
|
function_code)
|
||||||
|
frame = header + body
|
||||||
|
|
||||||
|
return self._send(frame)
|
||||||
|
|
||||||
|
def receive_modbus_data(self) -> bytes:
|
||||||
|
"""
|
||||||
|
Receive a ModBus frame
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes received or empty bytes object if an error occurred
|
||||||
|
"""
|
||||||
|
header = self._recv(7)
|
||||||
|
if not header:
|
||||||
|
self.last_error = 'receiving return frame failed'
|
||||||
|
return self.close()
|
||||||
|
|
||||||
|
transaction_id, protocol_identifier, length, unit_id =\
|
||||||
|
struct.unpack('>HHHB', header)
|
||||||
|
|
||||||
|
if not ((transaction_id == self._transaction_id) and
|
||||||
|
(protocol_identifier == 0) and
|
||||||
|
(unit_id == self.unit_id) and
|
||||||
|
(length <= 0xFF) and
|
||||||
|
(transaction_id == self._transaction_id)):
|
||||||
|
self.last_error = 'received frame is invalid'
|
||||||
|
return self.close()
|
||||||
|
|
||||||
|
data = self._recv(length - 1)
|
||||||
|
if not data:
|
||||||
|
self.last_error = 'receiving data payload failed'
|
||||||
|
return self.close()
|
||||||
|
|
||||||
|
if data[0] > 0x80:
|
||||||
|
self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})'
|
||||||
|
return bytes()
|
||||||
|
|
||||||
|
return data[1:]
|
|
@ -0,0 +1,36 @@
|
||||||
|
import unittest
|
||||||
|
from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words
|
||||||
|
|
||||||
|
|
||||||
|
class TestModbusFunctions(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_get_bits(self):
|
||||||
|
data = bytes([0b11101010, 0b11010101])
|
||||||
|
bit_number = 16
|
||||||
|
expected = [False, True, False, True, False, True, True, True,
|
||||||
|
True, False, True, False, True, False, True, True]
|
||||||
|
result = _get_bits(data, bit_number)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_get_words(self):
|
||||||
|
data = bytes([0x12, 0x34, 0x56, 0x78])
|
||||||
|
expected = [0x1234, 0x5678]
|
||||||
|
result = _get_words(data)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_from_bits(self):
|
||||||
|
values = [False, True, False, True, False, True, True, True,
|
||||||
|
True, False, True, False, True, False, True, True]
|
||||||
|
expected = bytes([0b11101010, 0b11010101])
|
||||||
|
result = _from_bits(values)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_from_words(self):
|
||||||
|
values = [0x1234, 0x5678]
|
||||||
|
expected = bytes([0x12, 0x34, 0x56, 0x78])
|
||||||
|
result = _from_words(values)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
|
@ -0,0 +1,34 @@
|
||||||
|
import inspect
|
||||||
|
import src.pyhoff.devices as devices
|
||||||
|
from src.pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal
|
||||||
|
|
||||||
|
|
||||||
|
def test_terminal_plausib():
|
||||||
|
|
||||||
|
for n, o in inspect.getmembers(devices):
|
||||||
|
if inspect.isclass(o) and o not in [DigitalInputTerminal,
|
||||||
|
DigitalOutputTerminal,
|
||||||
|
AnalogInputTerminal,
|
||||||
|
AnalogOutputTerminal]:
|
||||||
|
print('Terminal: ' + n)
|
||||||
|
if issubclass(o, DigitalInputTerminal):
|
||||||
|
assert o.parameters.get('input_bit_width', 0) > 0
|
||||||
|
assert o.parameters.get('output_bit_width', 0) == 0
|
||||||
|
assert o.parameters.get('input_word_width', 0) == 0
|
||||||
|
assert o.parameters.get('output_word_width', 0) == 0
|
||||||
|
|
||||||
|
if issubclass(o, DigitalOutputTerminal):
|
||||||
|
assert o.parameters.get('input_bit_width', 0) == 0
|
||||||
|
assert o.parameters.get('output_bit_width', 0) > 0
|
||||||
|
assert o.parameters.get('input_word_width', 0) == 0
|
||||||
|
assert o.parameters.get('output_word_width', 0) == 0
|
||||||
|
|
||||||
|
if issubclass(o, AnalogInputTerminal):
|
||||||
|
assert o.parameters.get('input_bit_width', 0) == 0
|
||||||
|
assert o.parameters.get('output_bit_width', 0) == 0
|
||||||
|
assert o.parameters.get('input_word_width', 0) > 0
|
||||||
|
|
||||||
|
if issubclass(o, AnalogOutputTerminal):
|
||||||
|
assert o.parameters.get('input_bit_width', 0) == 0
|
||||||
|
assert o.parameters.get('output_bit_width', 0) == 0
|
||||||
|
assert o.parameters.get('output_word_width', 0) > 0
|
Loading…
Reference in New Issue