Compare commits

..

No commits in common. "5e52a3f866ae79de8c33b7d03d47f5c3e43a35c7" and "759e1d168d2e11efc80fccc706887a644485a123" have entirely different histories.

15 changed files with 388 additions and 1504 deletions

20
.flake8
View File

@ -1,20 +0,0 @@
[flake8]
# Specify the maximum allowed line length
max-line-length = 88
# Ignore specific rules
# For example, E501: Line too long, W503: Line break before binary operator
ignore = E501, W503, W504
# Exclude specific files or directories
exclude =
.git,
__pycache__,
build,
dist,
.conda
.venv
# Enable specific plugins or options
# Example: Enabling flake8-docstrings
select = C,E,F,W,D

36
.gitignore vendored
View File

@ -82,6 +82,31 @@ target/
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
@ -100,7 +125,6 @@ celerybeat.pid
env/
venv/
ENV/
.venv/
env.bak/
venv.bak/
@ -128,7 +152,9 @@ dmypy.json
# Cython debug symbols
cython_debug/
pyModbusTCP_old/
test.py
test_*.ipynb
settings.json
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

View File

@ -1,15 +0,0 @@
cff-version: 1.2.0
title: pyhoff
abstract: The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP
authors:
- family-names: Kruse
given-names: Nicolas
orcid: "https://orcid.org/0000-0001-6758-2269"
version: 1.0.0
#date-released: "2025-04-01"
#identifiers:
# - description: This is the collection of archived snapshots of all versions of My Research Software
# type: doi
# value: "10.5281/"
license: MIT License
repository-code: "https://github.com/Nonannet/pyhoff"

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2025 Nicolas Kruse
Copyright (c) 2023 Nonannet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -1,40 +1,20 @@
# pyhoff
## Description
The pyhoff package allows you to read and write the most common
Beckhoff and WAGO bus terminals ("Busklemmen") using the Ethernet bus
coupler ("Busskoppler") BK9000, BK9050, BK9100, or WAGO 750_352
over Ethernet TCP/IP based on ModBus TCP.
This python code allows to read and write the most common
Beckhoff bus terminals ("Busklemmen") using the bus coupler
BK9050 ("Busskoppler BK9050") over Ethernet TCP/IP.
### Key Features
- Supports a wide range of Beckhoff and WAGO analog and digital bus
terminals.
- Very light weight: no dependencies; compact code base
- Easy to extend
- Using standardized ModBus TCP.
- Provides high-level abstractions for reading and writing data
from/to IO-terminals with minimal code
It depends on the package pyModbusTCP. This can be installed with:
### Usage Scenarios
- Industrial test setups.
- Research automation setups.
- Data acquisition and monitoring.
pip install pyModbusTCP
## Installation
The package has no additional decencies. It can be installed with pip:
```bash
pip install pyhoff
```
## Usage
It is easy to use as the following example code shows:
```python
from pyhoff.devices import *
from pyhoff import *
#connect to the BK9050 by tcp/ip on default port 502
bus_coupler = BK9050("172.16.17.1")
bus_coupler = ModBusBK9050("172.16.17.1")
#list of all bus terminals connected to the bus coupler
#in the order of the physical arrangement
@ -45,54 +25,27 @@ terminal_list = [KL2404, KL2424, KL9100, KL1104, KL3202,
terminals = bus_coupler.add_bus_terminals(terminal_list)
#Set 1. output of the first bus terminal (KL2404) to hi
terminals[0].write_coil(1, True)
terminals[0].write_coil(0, True)
#read the temperature from the 2. channel of the 5. bus
#terminal (KL3202)
t = terminals[4].read_temperature(2)
t = terminals[4].read_temperature(1)
print(f"t = {t:.1f} °C")
#Set 1. output of the 6. bus terminal (KL4002) to 4.2 V
terminals[5].set_voltage(1, 4.2)
terminals[5].set_voltage(0, 4.2)
```
## Contributing
Other analog and digital IO terminals are easy to complement. Contributions are welcome!
Please open an issue or submit a pull request on GitHub.
The following terminals are implemented:
- KL1104: 4x digital input 24 V
- KL2404: 4x digital output with 500 mA
- KL2424: 4x digital output with 2000 mA
- KL3054: 4x analog input 4...20 mA 12 Bit single-ended
- KL3202: 2x analog input PT100 16 Bit 3-wire
- KL3214: 4x analog input PT100 16 Bit 3-wire
- KL4002: 2x analog output 0...10 V 12 Bit differentiell
- KL4004: 4x analog output 0...10 V 12 Bit differentiell
- Dummy terminals without io functionality: KL9100, KL9183, KL9188
## Developer Guide
To get started with developing the `pyhoff` package, follow these steps:
1. **Clone the Repository**
First, clone the repository to your local machine using Git:
```bash
git clone https://github.com/Nonannet/pyhoff.git
cd pyhoff
```
2. **Set Up a Virtual Environment**
It is recommended to use a virtual environment to manage dependencies. You can create one using `venv`:
```bash
python -m venv venv
source venv/bin/activate # On Windows use `venv\Scripts\activate`
```
3. **Install Dev Dependencies**
Install the dependencies required for development using `pip`:
```bash
pip install -r requirements-dev.txt
```
4. **Run Tests**
Ensure that everything is set up correctly by running the tests:
```bash
pytest
```
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
Other analog and digital io-terminals are easy to complement. Pull requests are welcome.

View File

@ -1,25 +0,0 @@
# Notes on building the package
```bash
# Get code
git clone https://github.com/Nonannet/pyhoff.git
cd pyhoff
# Setup venv
python -m venv ./.venv
source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate`
# Check code:
pip install -r requirements-dev.txt
flake8
pytest
# Build package:
pip install build
python3 -m build
# Upload
pip install twine
#python3 -m twine upload dist/*
python3 -m twine upload --repository testpypi dist/* # Test repository: https://test.pypi.org/project/example_package_YOUR_USERNAME_HERE
```

View File

@ -1,29 +0,0 @@
# WAGO IO-Error LED
Default credentials are: admin:wago
Generating a new ea-config.xml based on the currently attached terminals:
``` js
await fetch("http://192.168.178.172/DOCPLFUNC", {
"credentials": "include",
"headers": {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.7,de;q=0.3",
"Content-Type": "application/x-www-form-urlencoded",
"Sec-GPC": "1",
"Authorization": "Basic YWRtaW46d2Fnbw==",
"Upgrade-Insecure-Requests": "1",
"Priority": "u=4",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
},
"referrer": "http://192.168.178.172/webserv/cplcfg/ea-config.ssi",
"body": "DO_CREATE_EA_XML=create+ea-config.xml",
"method": "POST",
"mode": "cors"
});
```
Download ea-config.xml under http://192.168.178.172/etc/ea-config.xml (authorization required)

328
pyhoff.py Normal file
View File

@ -0,0 +1,328 @@
from pyModbusTCP.client import ModbusClient
from typing import Type
class BusTerminal():
output_bit_width = 0
input_bit_width = 0
output_word_width = 0
input_word_width = 0
def __init__(self, bus_coupler: 'BusCoupler'):
self.output_bit_offset = 0
self.input_bit_offset = 0
self.output_word_offset = 0
self.output_word_offset = 0
self.input_word_offset = 0
self.bus_coupler = bus_coupler
class DigitalInputTerminal(BusTerminal):
def read_input(self, channel: int):
print(self.input_bit_width, channel)
if channel < 1 or channel > self.input_bit_width:
raise Exception("address out of range")
return self.bus_coupler.read_discrete_input(self.input_bit_offset + channel - 1)
class DigitalOutputTerminal(BusTerminal):
def write_coil(self, channel: int, value: int):
if channel < 1 or channel > self.output_bit_width:
raise Exception("address out of range")
return self.bus_coupler.write_single_coil(self.output_bit_offset + channel - 1, value)
def read_coil(self, channel: int):
if channel < 1 or channel > self.output_bit_width:
raise Exception("address out of range")
return self.bus_coupler.read_coil(self.output_bit_offset + channel - 1)
class AnalogInputTerminal(BusTerminal):
def read_words(self, word_offset: int, word_count: int):
if word_offset < 0 or word_offset + word_count > self.input_word_width:
raise Exception("address out of range")
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
def read_words_nocheck(self, word_offset: int, word_count: int):
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
def read_word(self, word_offset: int):
val = self.read_words(word_offset, 1)
if val is None:
return -999
else:
return val[0]
class AnalogOutputTerminal(BusTerminal):
def read_words(self, word_offset: int, word_count: int):
if word_offset < 0 or word_offset + word_count > self.output_word_width:
raise Exception("address out of range")
return self.bus_coupler.read_holding_registers(self.output_word_offset + word_offset, word_count)
def read_words_nocheck(self, word_offset: int, word_count: int):
return self.bus_coupler.read_holding_registers(self.output_word_offset*0 + word_offset, word_count)
def read_word(self, word_offset: int):
val = self.read_words(word_offset, 1)
if val is None:
return -999
else:
return val[0]
def write_word(self, word_offset: int, data: int):
if word_offset < 0 or word_offset + 1 > self.output_word_width:
raise Exception("address out of range")
return self.bus_coupler.write_single_register(self.output_word_offset + word_offset, data)
def write_word_nocheck(self, word_offset: int, data: int):
return self.bus_coupler.write_single_register(self.output_word_offset*0 + word_offset, data)
class KL1104(DigitalInputTerminal):
"""KL1104: 4x digital input 24 V"""
input_bit_width = 4
class KL1408(DigitalInputTerminal):
"""KL1104: 8x digital input 24 V galvanic isolated"""
input_bit_width = 8
class WAGO_750_1405(DigitalInputTerminal):
"""750-1405: 16x digital input 24 V"""
input_bit_width = 16
class KL2404(DigitalOutputTerminal):
"""KL2404: 4x digital output with 500 mA"""
output_bit_width = 4
class KL2424(DigitalOutputTerminal):
"""KL2424: 4x digital output with 2000 mA"""
output_bit_width = 4
class KL2634(DigitalOutputTerminal):
"""KL2634: 4x digital output 250 V AC, 30 V DC, 4 A"""
output_bit_width = 4
class KL2408(DigitalOutputTerminal):
"""750-530: 8x digital output with 24 V / 500 mA"""
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
output_bit_width = 8
class WAGO_750_530(DigitalOutputTerminal):
"""750-530: 8x digital output with 24 V / 500 mA"""
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
output_bit_width = 8
class KL3054(AnalogInputTerminal):
"""KL3054: 4x analog input 4...20 mA 12 Bit single-ended"""
#Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
input_word_width = 8
output_word_width = 8
def read_normalized(self, channel: int):
return self.read_word(channel * 2 - 1) / 0x7FFF
def read_current(self, channel: int):
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
class KL3042(AnalogInputTerminal):
"""KL3042: 2x analog input 0...20 mA 12 Bit single-ended"""
#Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
input_word_width = 4
output_word_width = 4
def read_normalized(self, channel: int):
return self.read_word(channel*2-1) / 0x7FFF
def read_current(self, channel: int):
return self.read_normalized(channel*2-1) * 20.0
class KL3202(AnalogInputTerminal):
"""KL3202: 2x analog input PT100 16 Bit 3-wire"""
#Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
input_word_width = 4
output_word_width = 4
def read_temperature(self, channel: int):
val = self.read_word(channel*2-1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL3214(AnalogInputTerminal):
"""KL3214: 4x analog input PT100 16 Bit 3-wire"""
#inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
#out: 4 x 8 Bit Control (optional)
input_word_width = 8
output_word_width = 8
def read_temperature(self, channel: int):
val = self.read_word(channel*2-1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL4002(AnalogOutputTerminal):
"""KL4002: 2x analog output 0...10 V 12 Bit differentiell"""
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
input_word_width = 4
output_word_width = 4
def set_normalized(self, channel: int, value: float):
self.write_word(channel*2-1, int(value * 0x7FFF))
def set_voltage(self, channel: int, value: float):
self.set_normalized(channel, value / 10.0)
class KL4132(AnalogOutputTerminal):
"""KL4002: 2x analog output ±10 V 16 bit differential"""
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
input_word_width = 4
output_word_width = 4
def set_normalized(self, channel: int, value: float):
if value >= 0:
self.write_word(channel-1, int(value * 0x7FFF))
else:
self.write_word(channel-1, int(0x10000 + value * 0x7FFF ))
def set_voltage(self, channel: int, value: float):
self.set_normalized(channel, value / 10.0)
class KL4004(AnalogOutputTerminal):
"""KL4004: 4x analog output 0...10 V 12 Bit differentiell"""
#Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
input_word_width = 8
output_word_width = 8
def set_normalized(self, channel: int, value: float):
self.write_word(channel*2-1, int(value * 0x7FFF))
def set_voltage(self, channel: int, value: float):
self.set_normalized(channel, value / 10.0)
class KL9000(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9010(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9100(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9183(BusTerminal):
"""Dummy, no I/O function"""
pass
class KL9188(BusTerminal):
"""Dummy, no I/O function"""
pass
class WAGO_750_600(BusTerminal):
"""Dummy, no I/O function"""
pass
class WAGO_750_602(BusTerminal):
"""Dummy, no I/O function"""
pass
class BusCoupler(ModbusClient):
"""BusCoupler: Busskoppler ModBus TCP"""
def __init__(self, host: str, port: int = 502, timeout: int = 5, watchdog: float = 0):
"""
Constructs all the necessary attributes for the person object.
Parameters
----------
host : str
ip or hostname of the BK9050
port : int
port of the modbus host
debug : bool
outputs modbus debug information
timeout : float
timeout for waiting for the device response
watchdog : float
time in secounds after the device sets all outputs to
default state. A value of 0 deactivates the watchdog.
"""
ModbusClient.__init__(self, host, port, auto_open=True, timeout=timeout)
self.bus_terminals: list[BusTerminal] = list()
self.next_output_bit_offset = 0
self.next_input_bit_offset = 0
self.next_output_word_offset = 0
self.next_input_word_offset = 0
self.init_hardware(watchdog)
def init_hardware(self, watchdog: float):
pass
def read_discrete_input(self, address: int):
value = self.read_discrete_inputs(address)
if value is None:
return False
else:
return value[0]
def read_coil(self, address):
value = self.read_coils(address)
if value is None:
return False
else:
return value[0]
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
for terminal_class in bus_terminals:
nterm = terminal_class(self)
nterm.output_bit_offset = self.next_output_bit_offset
nterm.input_bit_offset = self.next_input_bit_offset
nterm.output_word_offset = self.next_output_word_offset
nterm.input_word_offset = self.next_input_word_offset
self.next_output_bit_offset += terminal_class.output_bit_width
self.next_input_bit_offset += terminal_class.input_bit_width
self.next_output_word_offset += terminal_class.output_word_width
self.next_input_word_offset += terminal_class.input_word_width
self.bus_terminals.append(nterm)
return self.bus_terminals
class ModBusBK9050(BusCoupler):
"""BK9050: Busskoppler ModBus TCP"""
def init_hardware(self, watchdog: float):
#https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
#config watchdog on page 58
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.write_single_register(0x1120, watchdog * 1000) #ms
#reset watchdog timer:
self.write_single_register(0x1121, 0xBECF)
self.write_single_register(0x1121, 0xAFFE)
#set process image offset
self.next_output_word_offset = 0x0800
class WAGO750_352(BusCoupler):
"""Wago 750-352: Busskoppler ModBus TCP"""
def init_hardware(self, watchdog: float):
#deactivate/reset watchdog timer:
self.write_single_register(0x1005, 0xAAAA)
self.write_single_register(0x1005, 0x5555)
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.write_single_register(0x1000, int(watchdog * 10))
if watchdog:
#configure watchdog to reset on all functions codes
self.write_single_register(0x1001, 0xFFFF)
#set process image offset
self.next_output_word_offset = 0x0000

View File

@ -1,31 +0,0 @@
[project]
name = "pyhoff"
version = "1.0.0"
authors = [
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
]
description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP"
readme = "README.md"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.urls]
Homepage = "https://github.com/Nonannet/pyhoff"
Issues = "https://github.com/Nonannet/pyhoff/issues"
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q"
testpaths = ["tests"]
pythonpath = ["src"]

View File

@ -1,2 +0,0 @@
pytest
flake8

View File

@ -1,292 +0,0 @@
from .modbus import SimpleModbusClient
from typing import Type, Any
class BusTerminal():
"""
Base class for all bus terminals.
Args:
bus_coupler: The bus coupler to which this terminal is connected.
output_bit_offset: The offset of the output bits.
input_bit_offset: The offset of the input bits.
output_word_offset: The offset of the output words.
input_word_offset: The offset of the input words.
Attributes:
bus_coupler: The bus coupler to which this terminal is connected.
parameters: The parameters of the terminal.
"""
parameters: dict[str, int] = {}
def __init__(self, bus_coupler: 'BusCoupler',
output_bit_offset: int,
input_bit_offset: int,
output_word_offset: int,
input_word_offset: int):
self._output_bit_offset = output_bit_offset
self._input_bit_offset = input_bit_offset
self._output_word_offset = output_word_offset
self._input_word_offset = input_word_offset
self.bus_coupler = bus_coupler
class DigitalInputTerminal(BusTerminal):
"""
Base class for digital input terminals.
"""
def read_input(self, channel: int) -> bool | None:
"""
Read the input from a specific channel.
Args:
channel: The channel number (start counting from 1) to read from.
Returns:
The input value of the specified channel or None if the read operation failed.
Raises:
Exception: If the channel number is out of range.
"""
if channel < 1 or channel > self.parameters['input_bit_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1)
class DigitalOutputTerminal(BusTerminal):
"""
Base class for digital output terminals.
"""
def write_coil(self, channel: int, value: bool) -> bool:
"""
Write a value to a specific channel.
Args:
channel: The channel number (start counting from 1) to write to.
value: The value to write.
Returns:
True if the write operation succeeded, otherwise False.
Raises:
Exception: If the channel number is out of range.
"""
if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value)
def read_coil(self, channel: int) -> bool | None:
"""
Read the coil value back from a specific channel.
Args:
channel: The channel number (start counting from 1) to read from.
Returns:
The coil value of the specified channel or None if the read operation failed.
Raises:
Exception: If the channel number is out of range.
"""
if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1)
class AnalogInputTerminal(BusTerminal):
"""
Base class for analog input terminals.
"""
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
"""
Read a single word from the terminal.
Args:
word_offset: The word offset (0 based index) to read from.
Returns:
The read word value.
"""
val = self.read_words(word_offset, 1)
if val:
return val[0]
else:
return -999
def read_normalized(self, channel: int) -> float:
"""
Read a normalized value (0...1) from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The normalized value.
"""
return self.read_word(channel * 2 - 1) / 0x7FFF
class AnalogOutputTerminal(BusTerminal):
"""
Base class for analog output terminals.
"""
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
"""
Read a single word from the terminal.
Args:
word_offset: The word offset (0 based index) to read from.
Returns:
The read word value.
"""
val = self.read_words(word_offset, 1)
if val:
return val[0]
else:
return -999
def write_word(self, word_offset: int, data: int) -> bool:
"""
Write a word to the terminal.
Args:
word_offset: The word offset to write to.
data: The data to write.
Returns:
The result of the write operation.
Raises:
Exception: If the word offset is out of range.
"""
if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data)
def set_normalized(self, channel: int, value: float):
"""
Set a normalized value between 0 and 1 to a specific channel.
Args:
channel: The channel number to set.
value: The normalized value to set.
"""
self.write_word(channel * 2 - 1, int(value * 0x7FFF))
class BusCoupler():
"""
Base class for ModBus TCP bus coupler
Args:
host: ip or hostname of the bus coupler
port: port of the modbus host
debug: outputs modbus debug information
timeout: timeout for waiting for the device response
watchdog: time in seconds after the device sets all outputs to
default state. A value of 0 deactivates the watchdog.
debug: If True, debug information is printed.
Attributes:
bus_terminals: A list of bus terminal classes according to the
connected terminals.
modbus: The underlying modbus client used for the connection.
Examples:
>>> from pyhoff.devices import *
>>> bk = BK9000('192.168.0.23', bus_terminals=[KL3202, KL9010])
>>> t1 = bk.terminals[0].read_temperature(1)
>>> t2 = bk.terminals[0].read_temperature(2)
>>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C")
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
"""
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False):
self.bus_terminals: list[Any] = list()
self._next_output_bit_offset = 0
self._next_input_bit_offset = 0
self._next_output_word_offset = 0
self._next_input_word_offset = 0
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
self.add_bus_terminals(bus_terminals)
self._init_hardware(watchdog)
def _init_hardware(self, watchdog: float):
pass
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]:
"""
Add bus terminals to the bus coupler.
Args:
bus_terminals: A list of bus terminal classes to add.
Returns:
The corresponding list of bus terminal objects.
"""
for terminal_class in bus_terminals:
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
new_terminal = terminal_class(self,
self._next_output_bit_offset,
self._next_input_bit_offset,
self._next_output_word_offset,
self._next_input_word_offset)
self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0)
self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0)
self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0)
self.bus_terminals.append(new_terminal)
return self.bus_terminals
def get_error(self) -> str:
"""
Get the last error message.
Returns:
The last error message.
"""
return self.modbus.last_error

View File

@ -1,395 +0,0 @@
from . import DigitalInputTerminal, DigitalOutputTerminal
from . import AnalogInputTerminal, AnalogOutputTerminal
from . import BusTerminal, BusCoupler
class BK9000(BusCoupler):
"""
BK9000 ModBus TCP bus coupler
"""
def _init_hardware(self, watchdog: float):
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
# config watchdog on page 58
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.modbus.write_single_register(0x1120, int(watchdog * 1000)) # ms
# reset watchdog timer:
self.modbus.write_single_register(0x1121, 0xBECF)
self.modbus.write_single_register(0x1121, 0xAFFE)
# set process image offset
self.next_output_word_offset = 0x0800
class BK9050(BK9000):
"""
BK9050 ModBus TCP bus coupler
"""
pass
class BK9100(BK9000):
"""
BK9100 ModBus TCP bus coupler
"""
pass
class WAGO750_352(BusCoupler):
"""
Wago 750-352 ModBus TCP bus coupler
"""
def _init_hardware(self, watchdog: float):
# deactivate/reset watchdog timer:
self.modbus.write_single_register(0x1005, 0xAAAA)
self.modbus.write_single_register(0x1005, 0x5555)
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
self.modbus.write_single_register(0x1000, int(watchdog * 10))
if watchdog:
# configure watchdog to reset on all functions codes
self.modbus.write_single_register(0x1001, 0xFFFF)
# set process image offset
self.next_output_word_offset = 0x0000
class DigitalInputTerminal4Bit(DigitalInputTerminal):
"""
Generic 4 bit input terminal
"""
parameters = {'input_bit_width': 4}
class DigitalInputTerminal8Bit(DigitalInputTerminal):
"""
Generic 8 bit input terminal
"""
parameters = {'input_bit_width': 8}
class DigitalInputTerminal16Bit(DigitalInputTerminal):
"""
Generic 16 bit input terminal
"""
parameters = {'input_bit_width': 16}
class DigitalOutputTerminal4Bit(DigitalOutputTerminal):
"""
Generic 4 bit output terminal
"""
parameters = {'output_bit_width': 4}
class DigitalOutputTerminal8Bit(DigitalOutputTerminal):
"""
Generic 8 bit output terminal
"""
parameters = {'output_bit_width': 8}
class DigitalOutputTerminal16Bit(DigitalOutputTerminal):
"""
Generic 16 bit output terminal
"""
parameters = {'output_bit_width': 16}
class KL1104(DigitalInputTerminal4Bit):
"""
KL1104: 4x digital input 24 V
"""
pass
class KL1408(DigitalInputTerminal8Bit):
"""
KL1104: 8x digital input 24 V galvanic isolated
"""
pass
class WAGO_750_1405(DigitalInputTerminal16Bit):
"""
750-1405: 16x digital input 24 V
"""
pass
class KL2404(DigitalOutputTerminal4Bit):
"""
KL2404: 4x digital output with 500 mA
"""
pass
class KL2424(DigitalOutputTerminal4Bit):
"""
KL2424: 4x digital output with 2000 mA
"""
pass
class KL2634(DigitalOutputTerminal4Bit):
"""
KL2634: 4x digital output 250 V AC, 30 V DC, 4 A
"""
pass
class KL2408(DigitalOutputTerminal8Bit):
"""
750-530: 8x digital output with 24 V / 500 mA
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
"""
pass
class WAGO_750_530(DigitalOutputTerminal8Bit):
"""
750-530: 8x digital output with 24 V / 500 mA
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
"""
pass
class KL1512(AnalogInputTerminal):
"""
KL1512: 2x digital input, counter, 24 V DC, 1 kHz
"""
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int):
super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset)
self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)]
def read_counter(self, channel: int) -> int:
"""
Read the absolut counter value of a specific channel.
Args:
channel: The channel number to read from.
Returns:
The counter value.
"""
return self.read_word(channel * 2 - 1)
def read_delta(self, channel: int) -> int:
"""
Read the counter change since last read of a specific channel.
Args:
channel: The channel number to read from.
Returns:
The counter value.
"""
# TODO: handel overflow
new_count = self.read_word(channel * 2 - 1)
return new_count - self._last_counter_values[channel - 1]
class KL3054(AnalogInputTerminal):
"""
KL3054: 4x analog input 4...20 mA 12 Bit single-ended
"""
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8}
def read_current(self, channel: int) -> float:
"""
Read the current value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The current value.
"""
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
class KL3042(AnalogInputTerminal):
"""
KL3042: 2x analog input 0...20 mA 12 Bit single-ended
"""
# Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def read_current(self, channel: int) -> float:
"""
Read the current value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The current value.
"""
return self.read_normalized(channel * 2 - 1) * 20.0
class KL3202(AnalogInputTerminal):
"""
KL3202: 2x analog input PT100 16 Bit 3-wire
"""
# Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def read_temperature(self, channel: int) -> float:
"""
Read the temperature value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The temperature value in °C.
"""
val = self.read_word(channel * 2 - 1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL3214(AnalogInputTerminal):
"""
KL3214: 4x analog input PT100 16 Bit 3-wire
"""
# inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
# out: 4 x 8 Bit Control (optional)
parameters = {'input_word_width': 8, 'output_word_width': 8}
def read_temperature(self, channel: int) -> float:
"""
Read the temperature value from a specific channel.
Args:
channel: The channel number to read from.
Returns:
The temperature value.
"""
val = self.read_word(channel * 2 - 1)
if val > 0x7FFF:
return (val - 0x10000) / 10.0
else:
return val / 10.0
class KL4002(AnalogOutputTerminal):
"""
KL4002: 2x analog output 0...10 V 12 Bit differentiell
"""
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def set_voltage(self, channel: int, value: float):
"""
Set a voltage value to a specific channel.
Args:
channel: The channel number to set.
value: The voltage value to set.
"""
self.set_normalized(channel, value / 10.0)
class KL4132(AnalogOutputTerminal):
"""
KL4002: 2x analog output ±10 V 16 bit differential
"""
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
def set_normalized(self, channel: int, value: float):
"""
Set a normalized value between -1 and +1 to a specific channel.
Args:
channel: The channel number to set.
value: The normalized value to set.
"""
if value >= 0:
self.write_word(channel - 1, int(value * 0x7FFF))
else:
self.write_word(channel - 1, int(0x10000 + value * 0x7FFF))
def set_voltage(self, channel: int, value: float):
"""
Set a voltage value between -10 and +10 V to a specific channel.
Args:
channel: The channel number to set.
value: The voltage value to set.
"""
self.set_normalized(channel, value / 10.0)
class KL4004(AnalogOutputTerminal):
"""
KL4004: 4x analog output 0...10 V 12 Bit differentiell
"""
# Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8}
def set_voltage(self, channel: int, value: float):
"""
Set a voltage value to a specific channel.
Args:
channel: The channel number to set.
value: The voltage value to set.
"""
self.set_normalized(channel, value / 10.0)
class KL9010(BusTerminal):
"""
End terminal, no I/O function
"""
pass
class KL9100(BusTerminal):
"""
Potential supply terminal, no I/O function
"""
pass
class KL9183(BusTerminal):
"""
Potential distribution terminal, no I/O function
"""
pass
class KL9188(BusTerminal):
"""
Potential distribution terminal, no I/O function
"""
pass
class WAGO_750_600(BusTerminal):
"""
End terminal, no I/O function
"""
pass
class WAGO_750_602(BusTerminal):
"""
Potential supply terminal, no I/O function
"""
pass

View File

@ -1,517 +0,0 @@
import socket
import struct
import random
_READ_COILS = 0x01
_READ_DISCRETE_INPUTS = 0x02
_READ_HOLDING_REGISTERS = 0x03
_READ_INPUT_REGISTERS = 0x04
_WRITE_SINGLE_COIL = 0x05
_WRITE_SINGLE_REGISTER = 0x06
_WRITE_MULTIPLE_COILS = 0x0F
_WRITE_MULTIPLE_REGISTERS = 0x10
_modbus_exceptions = {
0x01: 'illegal function',
0x02: 'illegal data address',
0x03: 'illegal data value',
0x04: 'slave device failure',
0x05: 'acknowledge',
0x06: 'slave device busy',
0x07: 'negative acknowledge',
0x08: 'memory parity error',
0x0A: 'gateway path unavailable',
0x0B: 'gateway target device failed to respond'
}
def _get_bits(data: bytes, bit_number: int) -> list[bool]:
return [bool(data[i // 8] >> (i % 8) & 0x01)
for i in range(bit_number)]
def _get_words(data: bytes) -> list[int]:
return [(data[i * 2] << 8) + data[i * 2 + 1]
for i in range(len(data) // 2)]
def _from_bits(values: list[bool]) -> bytes:
return bytes(sum(((1 << j) * bool(values[8 * i + j]))
for j in range(8)) for i in range(len(values) // 8))
def _from_words(values: list[int]) -> bytes:
return b''.join(word.to_bytes(2, byteorder='big') for word in values)
class SimpleModbusClient:
"""
A simple Modbus TCP client
Args:
host: hostname or IP address
port: server port
unit_id: ModBus id
timeout: socket timeout in seconds
debug: if True prints out transmitted and received bytes in hex
Attributes:
host: hostname or IP address
port: server port
unit_id: ModBus id
timeout: socket timeout in seconds
last_error: contains last error message or empty string if no error occurred
debug: if True prints out transmitted and received bytes in hex
Example:
>>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1)
>>> print(client.read_coils(0, 10))
>>> print(client.read_discrete_inputs(0, 10))
>>> print(client.read_holding_registers(0, 10))
>>> print(client.read_input_registers(0, 10))
>>> print(client.write_single_coil(0, True))
>>> print(client.write_single_register(0, 1234))
>>> print(client.write_multiple_coils(0, [True, False, True]))
>>> print(client.write_multiple_registers(0, [1234, 5678]))
>>> client.close()
"""
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
assert 0 <= unit_id < 256
self.host = host
self.port = port
self.unit_id = unit_id
self.timeout = timeout
self.last_error = ''
self._transaction_id = random.randint(0, 0xFFFF)
self._socket: None | socket.socket = None
self.debug = debug
def connect(self):
for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM):
try:
self._socket = socket.socket(af, st, pr)
except socket.error:
self.close()
continue
try:
self._socket.settimeout(self.timeout)
self._socket.connect(sa)
except socket.error:
self.close()
continue
break
if self._socket:
return True
else:
self.last_error = 'connection failed'
return False
def close(self):
"""
Close connection
Returns:
empty bytes object
"""
if self._socket:
self._socket.close()
self._socket = None
return bytes()
def read_coils(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
"""
ModBus function for reading coils (0x01)
Args:
bit_address: Bit address (0 to 0xffff)
bit_lengths: Number of bits to read (1 to 2000)
Returns:
list of bool or None: Bits list or None if error
"""
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_COILS, _from_words([bit_address, bit_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under min size'
return None
byte_count = rx_data[0]
bit_data = rx_data[1:]
if not (byte_count * 8 >= bit_lengths and
byte_count == len(bit_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_bits(bit_data, bit_lengths)
def read_discrete_inputs(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
"""
ModBus function for reading discrete inputs (0x02)
Args:
bit_address: Bit address (0 to 0xffff)
bit_lengths: Number of bits to read (1 to 2000)
Returns:
list of bool or None: Bits list or None if error
"""
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_DISCRETE_INPUTS, _from_words([bit_address, bit_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under minimum size'
return None
byte_count = rx_data[0]
bit_data = rx_data[1:]
if not (byte_count * 8 >= bit_lengths and
byte_count == len(bit_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_bits(bit_data, bit_lengths)
def read_holding_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
"""
ModBus function for reading holding registers (0x03)
Args:
register_address: Register address (0 to 0xffff)
word_lengths: Number of registers to read (1 to 125)
Returns:
list of int or None: Registers list or None if error
"""
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_HOLDING_REGISTERS, _from_words([register_address, word_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under minimum size'
return None
byte_count = rx_data[0]
reg_data = rx_data[1:]
if not (byte_count == 2 * word_lengths and
byte_count == len(reg_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_words(reg_data)
def read_input_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
"""
ModBus function for reading input registers (0x04)
Args:
register_address: Register address (0 to 0xffff)
word_lengths: Number of registers to read (1 to 125)
Returns:
list of int or None: Registers list or None if error
"""
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
if not self.send_modbus_data(_READ_INPUT_REGISTERS, _from_words([register_address, word_lengths])):
return None
rx_data = self.receive_modbus_data()
if not rx_data:
return None
if len(rx_data) < 2:
self.last_error = 'received frame under minimum size'
return None
byte_count = rx_data[0]
reg_data = rx_data[1:]
if not (byte_count == 2 * word_lengths and
byte_count == len(reg_data)):
self.last_error = 'received frame size mismatch'
return None
return _get_words(reg_data)
def write_single_coil(self, bit_address: int, value: bool) -> bool:
"""
ModBus function for writing a single coil (0x05)
Args:
bit_address: Bit address (0 to 0xffff)
value: Value to write (single bit)
Returns:
True if write succeeded or False if failed
"""
assert 0 <= bit_address <= 0xffff, 'bit_address out of range'
tx_data = _from_words([bit_address, 0xFF00 * bool(value)])
if not self.send_modbus_data(_WRITE_SINGLE_COIL, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return data == tx_data
def read_discrete_input(self, address: int) -> bool | None:
"""
Read a discrete input from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the discrete input.
"""
value = self.read_discrete_inputs(address)
if value:
return value[0]
else:
return None
def read_coil(self, address: int) -> bool | None:
"""
Read a coil from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the coil.
"""
value = self.read_coils(address)
if value:
return value[0]
else:
return None
def write_single_register(self, register_address: int, value: int) -> bool:
"""
ModBus function for writing a single register (0x06)
Args:
register_address: Register address (0 to 0xffff)
value: Value to write (0 to 0xffff)
Returns:
True if write succeeded or False if failed
"""
assert 0 <= register_address <= 0xffff, 'register_address out of range'
assert 0 <= value <= 0xffff, 'value out of range 0 to 0xffff'
tx_data = _from_words([register_address, value])
if not self.send_modbus_data(_WRITE_SINGLE_REGISTER, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return data == tx_data
def write_multiple_coils(self, bit_address: int, values: list[bool]) -> bool:
"""
ModBus function for writing multiple coils (0x0F)
Args:
bit_address: Bit address (0 to 0xffff)
values: List of bit values to write
Returns:
True if write succeeded or False if failed
"""
assert bit_address + len(values) <= 0xffff, 'bit_address out of range'
assert 1 <= len(values) <= 2000, 'number values must be from 1 to 2000'
byte_count = (len(values) + 7) // 8
tx_data = struct.pack('>HHB', bit_address, len(values), byte_count) + _from_bits(values)
if not self.send_modbus_data(_WRITE_MULTIPLE_COILS, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return _get_words(data[0:1])[0] == bit_address
def write_multiple_registers(self, register_address: int, values: list[int]) -> bool:
"""
ModBus function for writing multiple registers (0x10)
Args:
register_address: Register address (0 to 0xffff)
values: List of 16 bit register values to write
Returns:
True if write succeeded or False if failed
"""
assert register_address + len(values) <= 0xffff, 'register_address out of range'
assert max(values) <= 0xffff, 'value out of range 0 to 0xffff'
assert min(values) >= 0, 'value out of range 0 to 0xffff'
byte_count = len(values) * 2
tx_data = struct.pack('>HHB', register_address, len(values), byte_count) + _from_words(values)
if not self.send_modbus_data(_WRITE_MULTIPLE_REGISTERS, tx_data):
return False
data = self.receive_modbus_data()
if not data:
return False
if len(data) != 4:
self.last_error = 'received frame size mismatch'
return False
return _get_words(data[0:1])[0] == register_address
def _recv(self, number_of_bytes: int) -> bytes:
"""
Receive data over tcp, wait until all specified bytes are received
Args:
number_of_bytes: Number of bytes to receive
Returns:
returns received bytes or empty an bytes object if an error occurred
"""
if not self._socket:
return bytes()
buffer = bytes()
while len(buffer) < number_of_bytes:
try:
buffer += self._socket.recv(number_of_bytes - len(buffer))
except socket.error:
return bytes()
if self.debug:
print(f'<- Received: {' '.join(hex(b) for b in buffer)}')
return buffer
def _send(self, data: bytes) -> int:
"""
Send data over tcp
Args:
data: data to send
Returns:
number of transmitted bytes or 0 if transmission failed
"""
for _ in range(2):
if self._socket:
try:
self._socket.sendall(data)
if self.debug:
print(f'-> Send: {' '.join(hex(b) for b in data)}')
return len(data)
except socket.error:
self.last_error = 'sending data failed'
self.close()
self.connect()
else:
self.connect()
return 0
def send_modbus_data(self, function_code: int, body: bytes) -> int:
"""
Send raw ModBus TCP frame
Args:
unction_code: ModBus function code
body: data
Returns:
number of transmitted bytes or 0 if transmission failed
"""
self._transaction_id = (self._transaction_id + 1) % 0x10000
protocol_identifier = 0
length = len(body) + 2
header = struct.pack('>HHHBB', self._transaction_id,
protocol_identifier, length, self.unit_id,
function_code)
frame = header + body
return self._send(frame)
def receive_modbus_data(self) -> bytes:
"""
Receive a ModBus frame
Returns:
bytes received or empty bytes object if an error occurred
"""
header = self._recv(7)
if not header:
self.last_error = 'receiving return frame failed'
return self.close()
transaction_id, protocol_identifier, length, unit_id =\
struct.unpack('>HHHB', header)
if not ((transaction_id == self._transaction_id) and
(protocol_identifier == 0) and
(unit_id == self.unit_id) and
(length <= 0xFF) and
(transaction_id == self._transaction_id)):
self.last_error = 'received frame is invalid'
return self.close()
data = self._recv(length - 1)
if not data:
self.last_error = 'receiving data payload failed'
return self.close()
if data[0] > 0x80:
self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})'
return bytes()
return data[1:]

View File

@ -1,36 +0,0 @@
import unittest
from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words
class TestModbusFunctions(unittest.TestCase):
def test_get_bits(self):
data = bytes([0b11101010, 0b11010101])
bit_number = 16
expected = [False, True, False, True, False, True, True, True,
True, False, True, False, True, False, True, True]
result = _get_bits(data, bit_number)
self.assertEqual(result, expected)
def test_get_words(self):
data = bytes([0x12, 0x34, 0x56, 0x78])
expected = [0x1234, 0x5678]
result = _get_words(data)
self.assertEqual(result, expected)
def test_from_bits(self):
values = [False, True, False, True, False, True, True, True,
True, False, True, False, True, False, True, True]
expected = bytes([0b11101010, 0b11010101])
result = _from_bits(values)
self.assertEqual(result, expected)
def test_from_words(self):
values = [0x1234, 0x5678]
expected = bytes([0x12, 0x34, 0x56, 0x78])
result = _from_words(values)
self.assertEqual(result, expected)
if __name__ == '__main__':
unittest.main()

View File

@ -1,61 +0,0 @@
import inspect
import pyhoff as pyhoff
import pyhoff.devices as devices
from pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal
def test_terminal_plausib():
"""
Test if all implemented BusTerminal classes in devices
have the plausible parameters
"""
for n, o in inspect.getmembers(devices):
if inspect.isclass(o) and o not in [DigitalInputTerminal,
DigitalOutputTerminal,
AnalogInputTerminal,
AnalogOutputTerminal]:
print('Terminal: ' + n)
if issubclass(o, DigitalInputTerminal):
assert o.parameters.get('input_bit_width', 0) > 0
assert o.parameters.get('output_bit_width', 0) == 0
assert o.parameters.get('input_word_width', 0) == 0
assert o.parameters.get('output_word_width', 0) == 0
if issubclass(o, DigitalOutputTerminal):
assert o.parameters.get('input_bit_width', 0) == 0
assert o.parameters.get('output_bit_width', 0) > 0
assert o.parameters.get('input_word_width', 0) == 0
assert o.parameters.get('output_word_width', 0) == 0
if issubclass(o, AnalogInputTerminal):
assert o.parameters.get('input_bit_width', 0) == 0
assert o.parameters.get('output_bit_width', 0) == 0
assert o.parameters.get('input_word_width', 0) > 0
if issubclass(o, AnalogOutputTerminal):
assert o.parameters.get('input_bit_width', 0) == 0
assert o.parameters.get('output_bit_width', 0) == 0
assert o.parameters.get('output_word_width', 0) > 0
def test_terminal_setup():
"""
Test if all implemented BusTerminal classes in devices can
be instantiated and connected to a bus coupler
"""
terminal_classes: list[type[pyhoff.BusTerminal]] = []
for n, o in inspect.getmembers(devices):
if inspect.isclass(o) and o not in [DigitalInputTerminal,
DigitalOutputTerminal,
AnalogInputTerminal,
AnalogOutputTerminal]:
if issubclass(o, pyhoff.BusTerminal):
print(n)
terminal_classes.append(o)
bus_cupler = devices.BK9050('localhost', 11255, terminal_classes, timeout=0.001)
assert len(terminal_classes) == len(bus_cupler.bus_terminals)
assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error()