Compare commits

..

6 Commits

Author SHA1 Message Date
Nicolas 5e52a3f866 readme updated 2025-02-17 14:57:11 +01:00
Nicolas f34eef3eb4 docstrings and project settings updated, few refacoring 2025-02-17 14:57:03 +01:00
Nicolas b68b32680a flake8 config updated 2025-02-17 09:32:10 +01:00
Nicolas 2d69ffc7d5 test for setup BusTerminal added 2025-02-17 09:31:28 +01:00
Nicolas 6a4aa22575 minor fixes 2025-02-17 09:31:06 +01:00
Nicolas e7c4ad6f7c full refactored, minimal modbus tcp implementation integrated 2025-02-16 23:49:16 +01:00
15 changed files with 1504 additions and 388 deletions

20
.flake8 Normal file
View File

@ -0,0 +1,20 @@
[flake8]
# Specify the maximum allowed line length
max-line-length = 88
# Ignore specific rules
# For example, E501: Line too long, W503: Line break before binary operator
ignore = E501, W503, W504
# Exclude specific files or directories
exclude =
.git,
__pycache__,
build,
dist,
.conda
.venv
# Enable specific plugins or options
# Example: Enabling flake8-docstrings
select = C,E,F,W,D

36
.gitignore vendored
View File

@ -82,31 +82,6 @@ target/
profile_default/ profile_default/
ipython_config.py ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml .pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
@ -125,6 +100,7 @@ celerybeat.pid
env/ env/
venv/ venv/
ENV/ ENV/
.venv/
env.bak/ env.bak/
venv.bak/ venv.bak/
@ -152,9 +128,7 @@ dmypy.json
# Cython debug symbols # Cython debug symbols
cython_debug/ cython_debug/
# PyCharm pyModbusTCP_old/
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can test.py
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore test_*.ipynb
# and can be added to the global gitignore or merged into this file. For a more nuclear settings.json
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

15
CITATION.cff Normal file
View File

@ -0,0 +1,15 @@
cff-version: 1.2.0
title: pyhoff
abstract: The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP
authors:
- family-names: Kruse
given-names: Nicolas
orcid: "https://orcid.org/0000-0001-6758-2269"
version: 1.0.0
#date-released: "2025-04-01"
#identifiers:
# - description: This is the collection of archived snapshots of all versions of My Research Software
# type: doi
# value: "10.5281/"
license: MIT License
repository-code: "https://github.com/Nonannet/pyhoff"

View File

@ -1,6 +1,6 @@
MIT License MIT License
Copyright (c) 2023 Nonannet Copyright (c) 2025 Nicolas Kruse
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

103
README.md
View File

@ -1,51 +1,98 @@
# pyhoff # pyhoff
This python code allows to read and write the most common ## Description
Beckhoff bus terminals ("Busklemmen") using the bus coupler The pyhoff package allows you to read and write the most common
BK9050 ("Busskoppler BK9050") over Ethernet TCP/IP. Beckhoff and WAGO bus terminals ("Busklemmen") using the Ethernet bus
coupler ("Busskoppler") BK9000, BK9050, BK9100, or WAGO 750_352
over Ethernet TCP/IP based on ModBus TCP.
It depends on the package pyModbusTCP. This can be installed with: ### Key Features
- Supports a wide range of Beckhoff and WAGO analog and digital bus
terminals.
- Very light weight: no dependencies; compact code base
- Easy to extend
- Using standardized ModBus TCP.
- Provides high-level abstractions for reading and writing data
from/to IO-terminals with minimal code
pip install pyModbusTCP ### Usage Scenarios
- Industrial test setups.
- Research automation setups.
- Data acquisition and monitoring.
## Installation
The package has no additional decencies. It can be installed with pip:
```bash
pip install pyhoff
```
## Usage
It is easy to use as the following example code shows: It is easy to use as the following example code shows:
```python ```python
from pyhoff import * from pyhoff.devices import *
#connect to the BK9050 by tcp/ip on default port 502 # connect to the BK9050 by tcp/ip on default port 502
bus_coupler = ModBusBK9050("172.16.17.1") bus_coupler = BK9050("172.16.17.1")
#list of all bus terminals connected to the bus coupler # list of all bus terminals connected to the bus coupler
#in the order of the physical arrangement # in the order of the physical arrangement
terminal_list = [KL2404, KL2424, KL9100, KL1104, KL3202, terminal_list = [KL2404, KL2424, KL9100, KL1104, KL3202,
KL4002, KL9188, KL3054, KL3214, KL4004, KL4002, KL9188, KL3054, KL3214, KL4004,
KL9010] KL9010]
terminals = bus_coupler.add_bus_terminals(terminal_list) terminals = bus_coupler.add_bus_terminals(terminal_list)
#Set 1. output of the first bus terminal (KL2404) to hi # Set 1. output of the first bus terminal (KL2404) to hi
terminals[0].write_coil(0, True) terminals[0].write_coil(1, True)
#read the temperature from the 2. channel of the 5. bus # read the temperature from the 2. channel of the 5. bus
#terminal (KL3202) # terminal (KL3202)
t = terminals[4].read_temperature(1) t = terminals[4].read_temperature(2)
print(f"t = {t:.1f} °C") print(f"t = {t:.1f} °C")
#Set 1. output of the 6. bus terminal (KL4002) to 4.2 V # Set 1. output of the 6. bus terminal (KL4002) to 4.2 V
terminals[5].set_voltage(0, 4.2) terminals[5].set_voltage(1, 4.2)
``` ```
The following terminals are implemented: ## Contributing
- KL1104: 4x digital input 24 V Other analog and digital IO terminals are easy to complement. Contributions are welcome!
- KL2404: 4x digital output with 500 mA Please open an issue or submit a pull request on GitHub.
- KL2424: 4x digital output with 2000 mA
- KL3054: 4x analog input 4...20 mA 12 Bit single-ended
- KL3202: 2x analog input PT100 16 Bit 3-wire
- KL3214: 4x analog input PT100 16 Bit 3-wire
- KL4002: 2x analog output 0...10 V 12 Bit differentiell
- KL4004: 4x analog output 0...10 V 12 Bit differentiell
- Dummy terminals without io functionality: KL9100, KL9183, KL9188
Other analog and digital io-terminals are easy to complement. Pull requests are welcome. ## Developer Guide
To get started with developing the `pyhoff` package, follow these steps:
1. **Clone the Repository**
First, clone the repository to your local machine using Git:
```bash
git clone https://github.com/Nonannet/pyhoff.git
cd pyhoff
```
2. **Set Up a Virtual Environment**
It is recommended to use a virtual environment to manage dependencies. You can create one using `venv`:
```bash
python -m venv venv
source venv/bin/activate # On Windows use `venv\Scripts\activate`
```
3. **Install Dev Dependencies**
Install the dependencies required for development using `pip`:
```bash
pip install -r requirements-dev.txt
```
4. **Run Tests**
Ensure that everything is set up correctly by running the tests:
```bash
pytest
```
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

25
notes/build.md Normal file
View File

@ -0,0 +1,25 @@
# Notes on building the package
```bash
# Get code
git clone https://github.com/Nonannet/pyhoff.git
cd pyhoff
# Setup venv
python -m venv ./.venv
source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate`
# Check code:
pip install -r requirements-dev.txt
flake8
pytest
# Build package:
pip install build
python3 -m build
# Upload
pip install twine
#python3 -m twine upload dist/*
python3 -m twine upload --repository testpypi dist/* # Test repository: https://test.pypi.org/project/example_package_YOUR_USERNAME_HERE
```

29
notes/wago.md Normal file
View File

@ -0,0 +1,29 @@
# WAGO IO-Error LED
Default credentials are: admin:wago
Generating a new ea-config.xml based on the currently attached terminals:
``` js
await fetch("http://192.168.178.172/DOCPLFUNC", {
"credentials": "include",
"headers": {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.7,de;q=0.3",
"Content-Type": "application/x-www-form-urlencoded",
"Sec-GPC": "1",
"Authorization": "Basic YWRtaW46d2Fnbw==",
"Upgrade-Insecure-Requests": "1",
"Priority": "u=4",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
},
"referrer": "http://192.168.178.172/webserv/cplcfg/ea-config.ssi",
"body": "DO_CREATE_EA_XML=create+ea-config.xml",
"method": "POST",
"mode": "cors"
});
```
Download ea-config.xml under http://192.168.178.172/etc/ea-config.xml (authorization required)

328
pyhoff.py
View File

@ -1,328 +0,0 @@
from pyModbusTCP.client import ModbusClient
from typing import Type
class BusTerminal():
output_bit_width = 0
input_bit_width = 0
output_word_width = 0
input_word_width = 0
def __init__(self, bus_coupler: 'BusCoupler'):
self.output_bit_offset = 0
self.input_bit_offset = 0
self.output_word_offset = 0
self.output_word_offset = 0
self.input_word_offset = 0
self.bus_coupler = bus_coupler
class DigitalInputTerminal(BusTerminal):
def read_input(self, channel: int):
print(self.input_bit_width, channel)
if channel < 1 or channel > self.input_bit_width:
raise Exception("address out of range")
return self.bus_coupler.read_discrete_input(self.input_bit_offset + channel - 1)
class DigitalOutputTerminal(BusTerminal):
def write_coil(self, channel: int, value: int):
if channel < 1 or channel > self.output_bit_width:
raise Exception("address out of range")
return self.bus_coupler.write_single_coil(self.output_bit_offset + channel - 1, value)
def read_coil(self, channel: int):
if channel < 1 or channel > self.output_bit_width:
raise Exception("address out of range")
return self.bus_coupler.read_coil(self.output_bit_offset + channel - 1)
class AnalogInputTerminal(BusTerminal):
def read_words(self, word_offset: int, word_count: int):
if word_offset < 0 or word_offset + word_count > self.input_word_width:
raise Exception("address out of range")
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
def read_words_nocheck(self, word_offset: int, word_count: int):
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
def read_word(self, word_offset: int):
val = self.read_words(word_offset, 1)
if val is None:
return -999
else:
return val[0]
class AnalogOutputTerminal(BusTerminal):
def read_words(self, word_offset: int, word_count: int):
if word_offset < 0 or word_offset + word_count > self.output_word_width:
raise Exception("address out of range")
return self.bus_coupler.read_holding_registers(self.output_word_offset + word_offset, word_count)
def read_words_nocheck(self, word_offset: int, word_count: int):
return self.bus_coupler.read_holding_registers(self.output_word_offset*0 + word_offset, word_count)
def read_word(self, word_offset: int):
val = self.read_words(word_offset, 1)
if val is None:
return -999
else:
return val[0]
def write_word(self, word_offset: int, data: int):
if word_offset < 0 or word_offset + 1 > self.output_word_width:
raise Exception("address out of range")
return self.bus_coupler.write_single_register(self.output_word_offset + word_offset, data)
def write_word_nocheck(self, word_offset: int, data: int):
return self.bus_coupler.write_single_register(self.output_word_offset*0 + word_offset, data)
class KL1104(DigitalInputTerminal):
"""KL1104: 4x digital input 24 V"""
input_bit_width = 4
class KL1408(DigitalInputTerminal):
"""KL1104: 8x digital input 24 V galvanic isolated"""
input_bit_width = 8
class WAGO_750_1405(DigitalInputTerminal):
"""750-1405: 16x digital input 24 V"""
input_bit_width = 16
class KL2404(DigitalOutputTerminal):
"""KL2404: 4x digital output with 500 mA"""
output_bit_width = 4
class KL2424(DigitalOutputTerminal):
"""KL2424: 4x digital output with 2000 mA"""
output_bit_width = 4
class KL2634(DigitalOutputTerminal):
"""KL2634: 4x digital output 250 V AC, 30 V DC, 4 A"""
output_bit_width = 4
class KL2408(DigitalOutputTerminal):
"""750-530: 8x digital output with 24 V / 500 mA"""
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
output_bit_width = 8
class WAGO_750_530(DigitalOutputTerminal):
"""750-530: 8x digital output with 24 V / 500 mA"""
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
output_bit_width = 8
class KL3054(AnalogInputTerminal):
"""KL3054: 4x analog input 4...20 mA 12 Bit single-ended"""
#Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
input_word_width = 8
output_word_width = 8
def read_normalized(self, channel: int):
return self.read_word(channel * 2 - 1) / 0x7FFF
def read_current(self, channel: int):
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
class KL3042(AnalogInputTerminal):
"""KL3042: 2x analog input 0...20 mA 12 Bit single-ended"""
#Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
input_word_width = 4
output_word_width = 4
def read_normalized(self, channel: int):
return self.read_word(channel*2-1) / 0x7FFF
def read_current(self, channel: int):
return self.read_normalized(channel*2-1) * 20.0
class KL3202(AnalogInputTerminal):
"""KL3202: 2x analog input PT100 16 Bit 3-wire"""
#Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
input_word_width = 4
output_word_width = 4
def read_temperature(self, channel: int):
val = self.read_word(channel*2-1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL3214(AnalogInputTerminal):
"""KL3214: 4x analog input PT100 16 Bit 3-wire"""
#inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
#out: 4 x 8 Bit Control (optional)
input_word_width = 8
output_word_width = 8
def read_temperature(self, channel: int):
val = self.read_word(channel*2-1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL4002(AnalogOutputTerminal):
"""KL4002: 2x analog output 0...10 V 12 Bit differentiell"""
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
input_word_width = 4
output_word_width = 4
def set_normalized(self, channel: int, value: float):
self.write_word(channel*2-1, int(value * 0x7FFF))
def set_voltage(self, channel: int, value: float):
self.set_normalized(channel, value / 10.0)
class KL4132(AnalogOutputTerminal):
"""KL4002: 2x analog output ±10 V 16 bit differential"""
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
input_word_width = 4
output_word_width = 4
def set_normalized(self, channel: int, value: float):
if value >= 0:
self.write_word(channel-1, int(value * 0x7FFF))
else:
self.write_word(channel-1, int(0x10000 + value * 0x7FFF ))
def set_voltage(self, channel: int, value: float):
self.set_normalized(channel, value / 10.0)
class KL4004(AnalogOutputTerminal):
"""KL4004: 4x analog output 0...10 V 12 Bit differentiell"""
#Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
input_word_width = 8
output_word_width = 8
def set_normalized(self, channel: int, value: float):
self.write_word(channel*2-1, int(value * 0x7FFF))
def set_voltage(self, channel: int, value: float):
self.set_normalized(channel, value / 10.0)
class KL9000(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9010(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9100(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9183(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9188(BusTerminal):
"""Dummy, no I/O function"""
pass
class WAGO_750_600(BusTerminal):
"""Dummy, no I/O function"""
pass
class WAGO_750_602(BusTerminal):
"""Dummy, no I/O function"""
pass
class BusCoupler(ModbusClient):
"""BusCoupler: Busskoppler ModBus TCP"""
def __init__(self, host: str, port: int = 502, timeout: int = 5, watchdog: float = 0):
"""
Constructs all the necessary attributes for the person object.
Parameters
----------
host : str
ip or hostname of the BK9050
port : int
port of the modbus host
debug : bool
outputs modbus debug information
timeout : float
timeout for waiting for the device response
watchdog : float
time in secounds after the device sets all outputs to
default state. A value of 0 deactivates the watchdog.
"""
ModbusClient.__init__(self, host, port, auto_open=True, timeout=timeout)
self.bus_terminals: list[BusTerminal] = list()
self.next_output_bit_offset = 0
self.next_input_bit_offset = 0
self.next_output_word_offset = 0
self.next_input_word_offset = 0
self.init_hardware(watchdog)
def init_hardware(self, watchdog: float):
pass
def read_discrete_input(self, address: int):
value = self.read_discrete_inputs(address)
if value is None:
return False
else:
return value[0]
def read_coil(self, address):
value = self.read_coils(address)
if value is None:
return False
else:
return value[0]
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
for terminal_class in bus_terminals:
nterm = terminal_class(self)
nterm.output_bit_offset = self.next_output_bit_offset
nterm.input_bit_offset = self.next_input_bit_offset
nterm.output_word_offset = self.next_output_word_offset
nterm.input_word_offset = self.next_input_word_offset
self.next_output_bit_offset += terminal_class.output_bit_width
self.next_input_bit_offset += terminal_class.input_bit_width
self.next_output_word_offset += terminal_class.output_word_width
self.next_input_word_offset += terminal_class.input_word_width
self.bus_terminals.append(nterm)
return self.bus_terminals
class ModBusBK9050(BusCoupler):
"""BK9050: Busskoppler ModBus TCP"""
def init_hardware(self, watchdog: float):
#https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
#config watchdog on page 58
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.write_single_register(0x1120, watchdog * 1000) #ms
#reset watchdog timer:
self.write_single_register(0x1121, 0xBECF)
self.write_single_register(0x1121, 0xAFFE)
#set process image offset
self.next_output_word_offset = 0x0800
class WAGO750_352(BusCoupler):
"""Wago 750-352: Busskoppler ModBus TCP"""
def init_hardware(self, watchdog: float):
#deactivate/reset watchdog timer:
self.write_single_register(0x1005, 0xAAAA)
self.write_single_register(0x1005, 0x5555)
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.write_single_register(0x1000, int(watchdog * 10))
if watchdog:
#configure watchdog to reset on all functions codes
self.write_single_register(0x1001, 0xFFFF)
#set process image offset
self.next_output_word_offset = 0x0000

31
pyproject.toml Normal file
View File

@ -0,0 +1,31 @@
[project]
name = "pyhoff"
version = "1.0.0"
authors = [
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
]
description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP"
readme = "README.md"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.urls]
Homepage = "https://github.com/Nonannet/pyhoff"
Issues = "https://github.com/Nonannet/pyhoff/issues"
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q"
testpaths = ["tests"]
pythonpath = ["src"]

2
requirements-dev.txt Normal file
View File

@ -0,0 +1,2 @@
pytest
flake8

292
src/pyhoff/__init__.py Normal file
View File

@ -0,0 +1,292 @@
from .modbus import SimpleModbusClient
from typing import Type, Any
class BusTerminal():
"""
Base class for all bus terminals.
Args:
bus_coupler: The bus coupler to which this terminal is connected.
output_bit_offset: The offset of the output bits.
input_bit_offset: The offset of the input bits.
output_word_offset: The offset of the output words.
input_word_offset: The offset of the input words.
Attributes:
bus_coupler: The bus coupler to which this terminal is connected.
parameters: The parameters of the terminal.
"""
parameters: dict[str, int] = {}
def __init__(self, bus_coupler: 'BusCoupler',
output_bit_offset: int,
input_bit_offset: int,
output_word_offset: int,
input_word_offset: int):
self._output_bit_offset = output_bit_offset
self._input_bit_offset = input_bit_offset
self._output_word_offset = output_word_offset
self._input_word_offset = input_word_offset
self.bus_coupler = bus_coupler
class DigitalInputTerminal(BusTerminal):
"""
Base class for digital input terminals.
"""
def read_input(self, channel: int) -> bool | None:
"""
Read the input from a specific channel.
Args:
channel: The channel number (start counting from 1) to read from.
Returns:
The input value of the specified channel or None if the read operation failed.
Raises:
Exception: If the channel number is out of range.
"""
if channel < 1 or channel > self.parameters['input_bit_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1)
class DigitalOutputTerminal(BusTerminal):
"""
Base class for digital output terminals.
"""
def write_coil(self, channel: int, value: bool) -> bool:
"""
Write a value to a specific channel.
Args:
channel: The channel number (start counting from 1) to write to.
value: The value to write.
Returns:
True if the write operation succeeded, otherwise False.
Raises:
Exception: If the channel number is out of range.
"""
if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value)
def read_coil(self, channel: int) -> bool | None:
"""
Read the coil value back from a specific channel.
Args:
channel: The channel number (start counting from 1) to read from.
Returns:
The coil value of the specified channel or None if the read operation failed.
Raises:
Exception: If the channel number is out of range.
"""
if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1)
class AnalogInputTerminal(BusTerminal):
"""
Base class for analog input terminals.
"""
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
"""
Read a single word from the terminal.
Args:
word_offset: The word offset (0 based index) to read from.
Returns:
The read word value.
"""
val = self.read_words(word_offset, 1)
if val:
return val[0]
else:
return -999
def read_normalized(self, channel: int) -> float:
"""
Read a normalized value (0...1) from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The normalized value.
"""
return self.read_word(channel * 2 - 1) / 0x7FFF
class AnalogOutputTerminal(BusTerminal):
"""
Base class for analog output terminals.
"""
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
"""
Read a single word from the terminal.
Args:
word_offset: The word offset (0 based index) to read from.
Returns:
The read word value.
"""
val = self.read_words(word_offset, 1)
if val:
return val[0]
else:
return -999
def write_word(self, word_offset: int, data: int) -> bool:
"""
Write a word to the terminal.
Args:
word_offset: The word offset to write to.
data: The data to write.
Returns:
The result of the write operation.
Raises:
Exception: If the word offset is out of range.
"""
if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data)
def set_normalized(self, channel: int, value: float):
"""
Set a normalized value between 0 and 1 to a specific channel.
Args:
channel: The channel number to set.
value: The normalized value to set.
"""
self.write_word(channel * 2 - 1, int(value * 0x7FFF))
class BusCoupler():
"""
Base class for ModBus TCP bus coupler
Args:
host: ip or hostname of the bus coupler
port: port of the modbus host
debug: outputs modbus debug information
timeout: timeout for waiting for the device response
watchdog: time in seconds after the device sets all outputs to
default state. A value of 0 deactivates the watchdog.
debug: If True, debug information is printed.
Attributes:
bus_terminals: A list of bus terminal classes according to the
connected terminals.
modbus: The underlying modbus client used for the connection.
Examples:
>>> from pyhoff.devices import *
>>> bk = BK9000('192.168.0.23', bus_terminals=[KL3202, KL9010])
>>> t1 = bk.terminals[0].read_temperature(1)
>>> t2 = bk.terminals[0].read_temperature(2)
>>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C")
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
"""
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False):
self.bus_terminals: list[Any] = list()
self._next_output_bit_offset = 0
self._next_input_bit_offset = 0
self._next_output_word_offset = 0
self._next_input_word_offset = 0
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
self.add_bus_terminals(bus_terminals)
self._init_hardware(watchdog)
def _init_hardware(self, watchdog: float):
pass
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]:
"""
Add bus terminals to the bus coupler.
Args:
bus_terminals: A list of bus terminal classes to add.
Returns:
The corresponding list of bus terminal objects.
"""
for terminal_class in bus_terminals:
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
new_terminal = terminal_class(self,
self._next_output_bit_offset,
self._next_input_bit_offset,
self._next_output_word_offset,
self._next_input_word_offset)
self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0)
self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0)
self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0)
self.bus_terminals.append(new_terminal)
return self.bus_terminals
def get_error(self) -> str:
"""
Get the last error message.
Returns:
The last error message.
"""
return self.modbus.last_error

395
src/pyhoff/devices.py Normal file
View File

@ -0,0 +1,395 @@
from . import DigitalInputTerminal, DigitalOutputTerminal
from . import AnalogInputTerminal, AnalogOutputTerminal
from . import BusTerminal, BusCoupler
class BK9000(BusCoupler):
"""
BK9000 ModBus TCP bus coupler
"""
def _init_hardware(self, watchdog: float):
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
# config watchdog on page 58
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.modbus.write_single_register(0x1120, int(watchdog * 1000)) # ms
# reset watchdog timer:
self.modbus.write_single_register(0x1121, 0xBECF)
self.modbus.write_single_register(0x1121, 0xAFFE)
# set process image offset
self.next_output_word_offset = 0x0800
class BK9050(BK9000):
"""
BK9050 ModBus TCP bus coupler
"""
pass
class BK9100(BK9000):
"""
BK9100 ModBus TCP bus coupler
"""
pass
class WAGO750_352(BusCoupler):
"""
Wago 750-352 ModBus TCP bus coupler
"""
def _init_hardware(self, watchdog: float):
# deactivate/reset watchdog timer:
self.modbus.write_single_register(0x1005, 0xAAAA)
self.modbus.write_single_register(0x1005, 0x5555)
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.modbus.write_single_register(0x1000, int(watchdog * 10))
if watchdog:
# configure watchdog to reset on all functions codes
self.modbus.write_single_register(0x1001, 0xFFFF)
# set process image offset
self.next_output_word_offset = 0x0000
class DigitalInputTerminal4Bit(DigitalInputTerminal):
"""
Generic 4 bit input terminal
"""
parameters = {'input_bit_width': 4}
class DigitalInputTerminal8Bit(DigitalInputTerminal):
"""
Generic 8 bit input terminal
"""
parameters = {'input_bit_width': 8}
class DigitalInputTerminal16Bit(DigitalInputTerminal):
"""
Generic 16 bit input terminal
"""
parameters = {'input_bit_width': 16}
class DigitalOutputTerminal4Bit(DigitalOutputTerminal):
"""
Generic 4 bit output terminal
"""
parameters = {'output_bit_width': 4}
class DigitalOutputTerminal8Bit(DigitalOutputTerminal):
"""
Generic 8 bit output terminal
"""
parameters = {'output_bit_width': 8}
class DigitalOutputTerminal16Bit(DigitalOutputTerminal):
"""
Generic 16 bit output terminal
"""
parameters = {'output_bit_width': 16}
class KL1104(DigitalInputTerminal4Bit):
"""
KL1104: 4x digital input 24 V
"""
pass
class KL1408(DigitalInputTerminal8Bit):
"""
KL1104: 8x digital input 24 V galvanic isolated
"""
pass
class WAGO_750_1405(DigitalInputTerminal16Bit):
"""
750-1405: 16x digital input 24 V
"""
pass
class KL2404(DigitalOutputTerminal4Bit):
"""
KL2404: 4x digital output with 500 mA
"""
pass
class KL2424(DigitalOutputTerminal4Bit):
"""
KL2424: 4x digital output with 2000 mA
"""
pass
class KL2634(DigitalOutputTerminal4Bit):
"""
KL2634: 4x digital output 250 V AC, 30 V DC, 4 A
"""
pass
class KL2408(DigitalOutputTerminal8Bit):
"""
750-530: 8x digital output with 24 V / 500 mA
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
"""
pass
class WAGO_750_530(DigitalOutputTerminal8Bit):
"""
750-530: 8x digital output with 24 V / 500 mA
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
"""
pass
class KL1512(AnalogInputTerminal):
"""
KL1512: 2x digital input, counter, 24 V DC, 1 kHz
"""
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int):
super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset)
self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)]
def read_counter(self, channel: int) -> int:
"""
Read the absolut counter value of a specific channel.
Args:
channel: The channel number to read from.
Returns:
The counter value.
"""
return self.read_word(channel * 2 - 1)
def read_delta(self, channel: int) -> int:
"""
Read the counter change since last read of a specific channel.
Args:
channel: The channel number to read from.
Returns:
The counter value.
"""
# TODO: handel overflow
new_count = self.read_word(channel * 2 - 1)
return new_count - self._last_counter_values[channel - 1]
class KL3054(AnalogInputTerminal):
"""
KL3054: 4x analog input 4...20 mA 12 Bit single-ended
"""
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8}
def read_current(self, channel: int) -> float:
"""
Read the current value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The current value.
"""
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
class KL3042(AnalogInputTerminal):
"""
KL3042: 2x analog input 0...20 mA 12 Bit single-ended
"""
# Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def read_current(self, channel: int) -> float:
"""
Read the current value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The current value.
"""
return self.read_normalized(channel * 2 - 1) * 20.0
class KL3202(AnalogInputTerminal):
"""
KL3202: 2x analog input PT100 16 Bit 3-wire
"""
# Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def read_temperature(self, channel: int) -> float:
"""
Read the temperature value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The temperature value in °C.
"""
val = self.read_word(channel * 2 - 1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL3214(AnalogInputTerminal):
"""
KL3214: 4x analog input PT100 16 Bit 3-wire
"""
# inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
# out: 4 x 8 Bit Control (optional)
parameters = {'input_word_width': 8, 'output_word_width': 8}
def read_temperature(self, channel: int) -> float:
"""
Read the temperature value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The temperature value.
"""
val = self.read_word(channel * 2 - 1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL4002(AnalogOutputTerminal):
"""
KL4002: 2x analog output 0...10 V 12 Bit differentiell
"""
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def set_voltage(self, channel: int, value: float):
"""
Set a voltage value to a specific channel.
Args:
channel: The channel number to set.
value: The voltage value to set.
"""
self.set_normalized(channel, value / 10.0)
class KL4132(AnalogOutputTerminal):
"""
KL4002: 2x analog output ±10 V 16 bit differential
"""
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def set_normalized(self, channel: int, value: float):
"""
Set a normalized value between -1 and +1 to a specific channel.
Args:
channel: The channel number to set.
value: The normalized value to set.
"""
if value >= 0:
self.write_word(channel - 1, int(value * 0x7FFF))
else:
self.write_word(channel - 1, int(0x10000 + value * 0x7FFF))
def set_voltage(self, channel: int, value: float):
"""
Set a voltage value between -10 and +10 V to a specific channel.
Args:
channel: The channel number to set.
value: The voltage value to set.
"""
self.set_normalized(channel, value / 10.0)
class KL4004(AnalogOutputTerminal):
"""
KL4004: 4x analog output 0...10 V 12 Bit differentiell
"""
# Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8}
def set_voltage(self, channel: int, value: float):
"""
Set a voltage value to a specific channel.
Args:
channel: The channel number to set.
value: The voltage value to set.
"""
self.set_normalized(channel, value / 10.0)
class KL9010(BusTerminal):
"""
End terminal, no I/O function
"""
pass
class KL9100(BusTerminal):
"""
Potential supply terminal, no I/O function
"""
pass
class KL9183(BusTerminal):
"""
Potential distribution terminal, no I/O function
"""
pass
class KL9188(BusTerminal):
"""
Potential distribution terminal, no I/O function
"""
pass
class WAGO_750_600(BusTerminal):
"""
End terminal, no I/O function
"""
pass
class WAGO_750_602(BusTerminal):
"""
Potential supply terminal, no I/O function
"""
pass

517
src/pyhoff/modbus.py Normal file
View File

@ -0,0 +1,517 @@
import socket
import struct
import random
_READ_COILS = 0x01
_READ_DISCRETE_INPUTS = 0x02
_READ_HOLDING_REGISTERS = 0x03
_READ_INPUT_REGISTERS = 0x04
_WRITE_SINGLE_COIL = 0x05
_WRITE_SINGLE_REGISTER = 0x06
_WRITE_MULTIPLE_COILS = 0x0F
_WRITE_MULTIPLE_REGISTERS = 0x10
_modbus_exceptions = {
0x01: 'illegal function',
0x02: 'illegal data address',
0x03: 'illegal data value',
0x04: 'slave device failure',
0x05: 'acknowledge',
0x06: 'slave device busy',
0x07: 'negative acknowledge',
0x08: 'memory parity error',
0x0A: 'gateway path unavailable',
0x0B: 'gateway target device failed to respond'
}
def _get_bits(data: bytes, bit_number: int) -> list[bool]:
return [bool(data[i // 8] >> (i % 8) & 0x01)
for i in range(bit_number)]
def _get_words(data: bytes) -> list[int]:
return [(data[i * 2] << 8) + data[i * 2 + 1]
for i in range(len(data) // 2)]
def _from_bits(values: list[bool]) -> bytes:
return bytes(sum(((1 << j) * bool(values[8 * i + j]))
for j in range(8)) for i in range(len(values) // 8))
def _from_words(values: list[int]) -> bytes:
return b''.join(word.to_bytes(2, byteorder='big') for word in values)
class SimpleModbusClient:
"""
A simple Modbus TCP client
Args:
host: hostname or IP address
port: server port
unit_id: ModBus id
timeout: socket timeout in seconds
debug: if True prints out transmitted and received bytes in hex
Attributes:
host: hostname or IP address
port: server port
unit_id: ModBus id
timeout: socket timeout in seconds
last_error: contains last error message or empty string if no error occurred
debug: if True prints out transmitted and received bytes in hex
Example:
>>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1)
>>> print(client.read_coils(0, 10))
>>> print(client.read_discrete_inputs(0, 10))
>>> print(client.read_holding_registers(0, 10))
>>> print(client.read_input_registers(0, 10))
>>> print(client.write_single_coil(0, True))
>>> print(client.write_single_register(0, 1234))
>>> print(client.write_multiple_coils(0, [True, False, True]))
>>> print(client.write_multiple_registers(0, [1234, 5678]))
>>> client.close()
"""
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
assert 0 <= unit_id < 256
self.host = host
self.port = port
self.unit_id = unit_id
self.timeout = timeout
self.last_error = ''
self._transaction_id = random.randint(0, 0xFFFF)
self._socket: None | socket.socket = None
self.debug = debug
def connect(self):
for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM):
try:
self._socket = socket.socket(af, st, pr)
except socket.error:
self.close()
continue
try:
self._socket.settimeout(self.timeout)
self._socket.connect(sa)
except socket.error:
self.close()
continue
break
if self._socket:
return True
else:
self.last_error = 'connection failed'
return False
def close(self):
"""
Close connection
Returns:
empty bytes object
"""
if self._socket:
self._socket.close()
self._socket = None
return bytes()
def read_coils(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
"""
ModBus function for reading coils (0x01)
Args:
bit_address: Bit address (0 to 0xffff)
bit_lengths: Number of bits to read (1 to 2000)
Returns:
list of bool or None: Bits list or None if error
"""
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_COILS, _from_words([bit_address, bit_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under min size'
return None
byte_count = rx_data[0]
bit_data = rx_data[1:]
if not (byte_count * 8 >= bit_lengths and
byte_count == len(bit_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_bits(bit_data, bit_lengths)
def read_discrete_inputs(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
"""
ModBus function for reading discrete inputs (0x02)
Args:
bit_address: Bit address (0 to 0xffff)
bit_lengths: Number of bits to read (1 to 2000)
Returns:
list of bool or None: Bits list or None if error
"""
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_DISCRETE_INPUTS, _from_words([bit_address, bit_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under minimum size'
return None
byte_count = rx_data[0]
bit_data = rx_data[1:]
if not (byte_count * 8 >= bit_lengths and
byte_count == len(bit_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_bits(bit_data, bit_lengths)
def read_holding_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
"""
ModBus function for reading holding registers (0x03)
Args:
register_address: Register address (0 to 0xffff)
word_lengths: Number of registers to read (1 to 125)
Returns:
list of int or None: Registers list or None if error
"""
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_HOLDING_REGISTERS, _from_words([register_address, word_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under minimum size'
return None
byte_count = rx_data[0]
reg_data = rx_data[1:]
if not (byte_count == 2 * word_lengths and
byte_count == len(reg_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_words(reg_data)
def read_input_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
"""
ModBus function for reading input registers (0x04)
Args:
register_address: Register address (0 to 0xffff)
word_lengths: Number of registers to read (1 to 125)
Returns:
list of int or None: Registers list or None if error
"""
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_INPUT_REGISTERS, _from_words([register_address, word_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under minimum size'
return None
byte_count = rx_data[0]
reg_data = rx_data[1:]
if not (byte_count == 2 * word_lengths and
byte_count == len(reg_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_words(reg_data)
def write_single_coil(self, bit_address: int, value: bool) -> bool:
"""
ModBus function for writing a single coil (0x05)
Args:
bit_address: Bit address (0 to 0xffff)
value: Value to write (single bit)
Returns:
True if write succeeded or False if failed
"""
assert 0 <= bit_address <= 0xffff, 'bit_address out of range'
tx_data = _from_words([bit_address, 0xFF00 * bool(value)])
if not self.send_modbus_data(_WRITE_SINGLE_COIL, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return data == tx_data
def read_discrete_input(self, address: int) -> bool | None:
"""
Read a discrete input from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the discrete input.
"""
value = self.read_discrete_inputs(address)
if value:
return value[0]
else:
return None
def read_coil(self, address: int) -> bool | None:
"""
Read a coil from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the coil.
"""
value = self.read_coils(address)
if value:
return value[0]
else:
return None
def write_single_register(self, register_address: int, value: int) -> bool:
"""
ModBus function for writing a single register (0x06)
Args:
register_address: Register address (0 to 0xffff)
value: Value to write (0 to 0xffff)
Returns:
True if write succeeded or False if failed
"""
assert 0 <= register_address <= 0xffff, 'register_address out of range'
assert 0 <= value <= 0xffff, 'value out of range 0 to 0xffff'
tx_data = _from_words([register_address, value])
if not self.send_modbus_data(_WRITE_SINGLE_REGISTER, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return data == tx_data
def write_multiple_coils(self, bit_address: int, values: list[bool]) -> bool:
"""
ModBus function for writing multiple coils (0x0F)
Args:
bit_address: Bit address (0 to 0xffff)
values: List of bit values to write
Returns:
True if write succeeded or False if failed
"""
assert bit_address + len(values) <= 0xffff, 'bit_address out of range'
assert 1 <= len(values) <= 2000, 'number values must be from 1 to 2000'
byte_count = (len(values) + 7) // 8
tx_data = struct.pack('>HHB', bit_address, len(values), byte_count) + _from_bits(values)
if not self.send_modbus_data(_WRITE_MULTIPLE_COILS, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return _get_words(data[0:1])[0] == bit_address
def write_multiple_registers(self, register_address: int, values: list[int]) -> bool:
"""
ModBus function for writing multiple registers (0x10)
Args:
register_address: Register address (0 to 0xffff)
values: List of 16 bit register values to write
Returns:
True if write succeeded or False if failed
"""
assert register_address + len(values) <= 0xffff, 'register_address out of range'
assert max(values) <= 0xffff, 'value out of range 0 to 0xffff'
assert min(values) >= 0, 'value out of range 0 to 0xffff'
byte_count = len(values) * 2
tx_data = struct.pack('>HHB', register_address, len(values), byte_count) + _from_words(values)
if not self.send_modbus_data(_WRITE_MULTIPLE_REGISTERS, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return _get_words(data[0:1])[0] == register_address
def _recv(self, number_of_bytes: int) -> bytes:
"""
Receive data over tcp, wait until all specified bytes are received
Args:
number_of_bytes: Number of bytes to receive
Returns:
returns received bytes or empty an bytes object if an error occurred
"""
if not self._socket:
return bytes()
buffer = bytes()
while len(buffer) < number_of_bytes:
try:
buffer += self._socket.recv(number_of_bytes - len(buffer))
except socket.error:
return bytes()
if self.debug:
print(f'<- Received: {' '.join(hex(b) for b in buffer)}')
return buffer
def _send(self, data: bytes) -> int:
"""
Send data over tcp
Args:
data: data to send
Returns:
number of transmitted bytes or 0 if transmission failed
"""
for _ in range(2):
if self._socket:
try:
self._socket.sendall(data)
if self.debug:
print(f'-> Send: {' '.join(hex(b) for b in data)}')
return len(data)
except socket.error:
self.last_error = 'sending data failed'
self.close()
self.connect()
else:
self.connect()
return 0
def send_modbus_data(self, function_code: int, body: bytes) -> int:
"""
Send raw ModBus TCP frame
Args:
unction_code: ModBus function code
body: data
Returns:
number of transmitted bytes or 0 if transmission failed
"""
self._transaction_id = (self._transaction_id + 1) % 0x10000
protocol_identifier = 0
length = len(body) + 2
header = struct.pack('>HHHBB', self._transaction_id,
protocol_identifier, length, self.unit_id,
function_code)
frame = header + body
return self._send(frame)
def receive_modbus_data(self) -> bytes:
"""
Receive a ModBus frame
Returns:
bytes received or empty bytes object if an error occurred
"""
header = self._recv(7)
if not header:
self.last_error = 'receiving return frame failed'
return self.close()
transaction_id, protocol_identifier, length, unit_id =\
struct.unpack('>HHHB', header)
if not ((transaction_id == self._transaction_id) and
(protocol_identifier == 0) and
(unit_id == self.unit_id) and
(length <= 0xFF) and
(transaction_id == self._transaction_id)):
self.last_error = 'received frame is invalid'
return self.close()
data = self._recv(length - 1)
if not data:
self.last_error = 'receiving data payload failed'
return self.close()
if data[0] > 0x80:
self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})'
return bytes()
return data[1:]

36
tests/test_modbus.py Normal file
View File

@ -0,0 +1,36 @@
import unittest
from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words
class TestModbusFunctions(unittest.TestCase):
def test_get_bits(self):
data = bytes([0b11101010, 0b11010101])
bit_number = 16
expected = [False, True, False, True, False, True, True, True,
True, False, True, False, True, False, True, True]
result = _get_bits(data, bit_number)
self.assertEqual(result, expected)
def test_get_words(self):
data = bytes([0x12, 0x34, 0x56, 0x78])
expected = [0x1234, 0x5678]
result = _get_words(data)
self.assertEqual(result, expected)
def test_from_bits(self):
values = [False, True, False, True, False, True, True, True,
True, False, True, False, True, False, True, True]
expected = bytes([0b11101010, 0b11010101])
result = _from_bits(values)
self.assertEqual(result, expected)
def test_from_words(self):
values = [0x1234, 0x5678]
expected = bytes([0x12, 0x34, 0x56, 0x78])
result = _from_words(values)
self.assertEqual(result, expected)
if __name__ == '__main__':
unittest.main()

61
tests/test_terminals.py Normal file
View File

@ -0,0 +1,61 @@
import inspect
import pyhoff as pyhoff
import pyhoff.devices as devices
from pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal
def test_terminal_plausib():
"""
Test if all implemented BusTerminal classes in devices
have the plausible parameters
"""
for n, o in inspect.getmembers(devices):
if inspect.isclass(o) and o not in [DigitalInputTerminal,
DigitalOutputTerminal,
AnalogInputTerminal,
AnalogOutputTerminal]:
print('Terminal: ' + n)
if issubclass(o, DigitalInputTerminal):
assert o.parameters.get('input_bit_width', 0) > 0
assert o.parameters.get('output_bit_width', 0) == 0
assert o.parameters.get('input_word_width', 0) == 0
assert o.parameters.get('output_word_width', 0) == 0
if issubclass(o, DigitalOutputTerminal):
assert o.parameters.get('input_bit_width', 0) == 0
assert o.parameters.get('output_bit_width', 0) > 0
assert o.parameters.get('input_word_width', 0) == 0
assert o.parameters.get('output_word_width', 0) == 0
if issubclass(o, AnalogInputTerminal):
assert o.parameters.get('input_bit_width', 0) == 0
assert o.parameters.get('output_bit_width', 0) == 0
assert o.parameters.get('input_word_width', 0) > 0
if issubclass(o, AnalogOutputTerminal):
assert o.parameters.get('input_bit_width', 0) == 0
assert o.parameters.get('output_bit_width', 0) == 0
assert o.parameters.get('output_word_width', 0) > 0
def test_terminal_setup():
"""
Test if all implemented BusTerminal classes in devices can
be instantiated and connected to a bus coupler
"""
terminal_classes: list[type[pyhoff.BusTerminal]] = []
for n, o in inspect.getmembers(devices):
if inspect.isclass(o) and o not in [DigitalInputTerminal,
DigitalOutputTerminal,
AnalogInputTerminal,
AnalogOutputTerminal]:
if issubclass(o, pyhoff.BusTerminal):
print(n)
terminal_classes.append(o)
bus_cupler = devices.BK9050('localhost', 11255, terminal_classes, timeout=0.001)
assert len(terminal_classes) == len(bus_cupler.bus_terminals)
assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error()