Compare commits

..

5 Commits

Author SHA1 Message Date
Nicolas 25a79dda1b Readme updated and testfile renamed 2025-04-12 11:35:58 +02:00
Nicolas 7c420ae53b init functions docstrings fixed 2025-04-12 11:27:36 +02:00
Nicolas 05e92d2a20 modbus test changed to pytest format 2025-04-12 11:11:17 +02:00
Nicolas 96c716b1c5 missing explicit function return types added 2025-04-12 11:10:35 +02:00
Nicolas 61f431a534 mypy config added 2025-04-12 11:09:26 +02:00
8 changed files with 106 additions and 69 deletions

View File

@ -79,10 +79,10 @@ To get started with developing the `pyhoff` package, follow these steps:
``` ```
3. **Install Dev Dependencies** 3. **Install Dev Dependencies**
Install the dependencies required for development using `pip`: Install pyhoff from source plus the dependencies required for development using `pip`:
```bash ```bash
pip install -r requirements-dev.txt pip install -e .[dev]
``` ```
4. **Run Tests** 4. **Run Tests**

View File

@ -6,7 +6,7 @@ authors = [
] ]
description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP" description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP"
readme = "README.md" readme = "README.md"
requires-python = ">=3.8" requires-python = ">=3.9"
classifiers = [ classifiers = [
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
@ -24,6 +24,20 @@ build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]
[project.optional-dependencies]
dev = [
"pytest", "flake8", "mypy"
]
[tool.mypy]
files = ["src"]
strict = true
warn_return_any = true
warn_unused_configs = true
check_untyped_defs = true
no_implicit_optional = true
show_error_codes = true
[tool.pytest.ini_options] [tool.pytest.ini_options]
minversion = "6.0" minversion = "6.0"
addopts = "-ra -q" addopts = "-ra -q"

View File

@ -1,2 +0,0 @@
pytest
flake8

View File

@ -13,13 +13,6 @@ class BusTerminal():
""" """
Base class for all bus terminals. Base class for all bus terminals.
Args:
bus_coupler: The bus coupler to which this terminal is connected.
output_bit_addresses: List of addresses of the output bits.
input_bit_addresses: List of addresses of input bits.
output_word_addresses: List of addresses of output words.
input_word_addresses: List of addresses of input words.
Attributes: Attributes:
bus_coupler: The bus coupler to which this terminal is connected. bus_coupler: The bus coupler to which this terminal is connected.
parameters: The parameters of the terminal. parameters: The parameters of the terminal.
@ -32,7 +25,16 @@ class BusTerminal():
output_word_addresses: list[int], output_word_addresses: list[int],
input_word_addresses: list[int], input_word_addresses: list[int],
mixed_mapping: bool): mixed_mapping: bool):
"""
Instantiate a new BusTerminal base class.
Args:
bus_coupler: The bus coupler to which this terminal is connected.
output_bit_addresses: List of addresses of the output bits.
input_bit_addresses: List of addresses of input bits.
output_word_addresses: List of addresses of output words.
input_word_addresses: List of addresses of input words.
"""
self.bus_coupler = bus_coupler self.bus_coupler = bus_coupler
self._output_bit_addresses = output_bit_addresses self._output_bit_addresses = output_bit_addresses
self._input_bit_addresses = input_bit_addresses self._input_bit_addresses = input_bit_addresses
@ -41,7 +43,7 @@ class BusTerminal():
self._mixed_mapping = mixed_mapping self._mixed_mapping = mixed_mapping
@classmethod @classmethod
def select(cls, bus_coupler: 'BusCoupler', terminal_number: int = 0): def select(cls, bus_coupler: 'BusCoupler', terminal_number: int = 0) -> 'BusTerminal':
terminal_list = [bt for bt in bus_coupler.bus_terminals if isinstance(bt, cls)] terminal_list = [bt for bt in bus_coupler.bus_terminals if isinstance(bt, cls)]
assert terminal_list, f'No instance of {cls.__name__} configured at this BusCoupler' assert terminal_list, f'No instance of {cls.__name__} configured at this BusCoupler'
assert 0 <= terminal_number < len(terminal_list), f'Out of range, select in range: 0..{len(terminal_list) - 1}' assert 0 <= terminal_number < len(terminal_list), f'Out of range, select in range: 0..{len(terminal_list) - 1}'
@ -221,6 +223,21 @@ class BusCoupler():
bus_terminals: A list of bus terminal classes according to the bus_terminals: A list of bus terminal classes according to the
connected terminals. connected terminals.
modbus: The underlying modbus client used for the connection. modbus: The underlying modbus client used for the connection.
"""
def __init__(self, host: str, port: int = 502, bus_terminals: Iterable[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False):
"""
Instantiate a new bus coupler base class.
Args:
host: ip or hostname of the bus coupler
port: port of the modbus host
debug: outputs modbus debug information
timeout: timeout for waiting for the device response
watchdog: time in seconds after the device sets all outputs to
default state. A value of 0 deactivates the watchdog.
debug: If True, debug information is printed.
Examples: Examples:
>>> from pyhoff.devices import * >>> from pyhoff.devices import *
@ -230,10 +247,6 @@ class BusCoupler():
>>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C") >>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C")
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
""" """
def __init__(self, host: str, port: int = 502, bus_terminals: Iterable[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False):
self.bus_terminals: list[BusTerminal] = list() self.bus_terminals: list[BusTerminal] = list()
self._next_output_bit_offset = 0 self._next_output_bit_offset = 0
self._next_input_bit_offset = 0 self._next_input_bit_offset = 0
@ -247,7 +260,7 @@ class BusCoupler():
self.add_bus_terminals(bus_terminals) self.add_bus_terminals(bus_terminals)
self._init_hardware(watchdog) self._init_hardware(watchdog)
def _init_hardware(self, watchdog: float): def _init_hardware(self, watchdog: float) -> None:
pass pass
def add_bus_terminals(self, *new_bus_terminals: Type[BusTerminal] | Iterable[Type[BusTerminal]]) -> list[BusTerminal]: def add_bus_terminals(self, *new_bus_terminals: Type[BusTerminal] | Iterable[Type[BusTerminal]]) -> list[BusTerminal]:
@ -272,7 +285,7 @@ class BusCoupler():
for terminal_class in terminal_classes: for terminal_class in terminal_classes:
assert _is_bus_terminal(terminal_class), f'{terminal_class} is not a bus terminal' assert _is_bus_terminal(terminal_class), f'{terminal_class} is not a bus terminal'
def get_para(key: str): def get_para(key: str) -> int:
return terminal_class.parameters.get(key, 0) return terminal_class.parameters.get(key, 0)
new_terminal = terminal_class( new_terminal = terminal_class(

View File

@ -7,7 +7,7 @@ class BK9000(BusCoupler):
""" """
BK9000 ModBus TCP bus coupler BK9000 ModBus TCP bus coupler
""" """
def _init_hardware(self, watchdog: float): def _init_hardware(self, watchdog: float) -> None:
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf # https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
# config watchdog on page 58 # config watchdog on page 58
@ -44,7 +44,7 @@ class WAGO_750_352(BusCoupler):
""" """
Wago 750-352 ModBus TCP bus coupler Wago 750-352 ModBus TCP bus coupler
""" """
def _init_hardware(self, watchdog: float): def _init_hardware(self, watchdog: float) -> None:
# deactivate/reset watchdog timer: # deactivate/reset watchdog timer:
self.modbus.write_single_register(0x1005, 0xAAAA) self.modbus.write_single_register(0x1005, 0xAAAA)
self.modbus.write_single_register(0x1005, 0x5555) self.modbus.write_single_register(0x1005, 0x5555)

View File

@ -63,6 +63,19 @@ class SimpleModbusClient:
last_error: contains last error message or empty string if no error occurred last_error: contains last error message or empty string if no error occurred
debug: if True prints out transmitted and received bytes in hex debug: if True prints out transmitted and received bytes in hex
"""
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
"""
Instantiate a Modbus TCP client
Args:
host: hostname or IP address
port: server port
unit_id: ModBus id
timeout: socket timeout in seconds
debug: if True prints out transmitted and received bytes in hex
Example: Example:
>>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1) >>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1)
>>> print(client.read_coils(0, 10)) >>> print(client.read_coils(0, 10))
@ -75,7 +88,6 @@ class SimpleModbusClient:
>>> print(client.write_multiple_registers(0, [1234, 5678])) >>> print(client.write_multiple_registers(0, [1234, 5678]))
>>> client.close() >>> client.close()
""" """
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
assert 0 <= unit_id < 256 assert 0 <= unit_id < 256
self.host = host self.host = host
@ -87,7 +99,11 @@ class SimpleModbusClient:
self._socket: None | socket.socket = None self._socket: None | socket.socket = None
self.debug = debug self.debug = debug
def connect(self): def connect(self) -> bool:
"""
Connect manual to the configured modbus server. Usually there is
no need to call this function since it is handled automatically.
"""
for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port, for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC, socket.AF_UNSPEC,
socket.SOCK_STREAM): socket.SOCK_STREAM):
@ -110,7 +126,7 @@ class SimpleModbusClient:
self.last_error = 'connection failed' self.last_error = 'connection failed'
return False return False
def close(self): def close(self) -> bytes:
""" """
Close connection Close connection

View File

@ -1,36 +1,32 @@
import unittest
from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words
class TestModbusFunctions(unittest.TestCase): def test_get_bits():
def test_get_bits(self):
data = bytes([0b11101010, 0b11010101]) data = bytes([0b11101010, 0b11010101])
bit_number = 16 bit_number = 16
expected = [False, True, False, True, False, True, True, True, expected = [False, True, False, True, False, True, True, True,
True, False, True, False, True, False, True, True] True, False, True, False, True, False, True, True]
result = _get_bits(data, bit_number) result = _get_bits(data, bit_number)
self.assertEqual(result, expected) assert result == expected
def test_get_words(self):
def test_get_words():
data = bytes([0x12, 0x34, 0x56, 0x78]) data = bytes([0x12, 0x34, 0x56, 0x78])
expected = [0x1234, 0x5678] expected = [0x1234, 0x5678]
result = _get_words(data) result = _get_words(data)
self.assertEqual(result, expected) assert result == expected
def test_from_bits(self):
def test_from_bits():
values = [False, True, False, True, False, True, True, True, values = [False, True, False, True, False, True, True, True,
True, False, True, False, True, False, True, True] True, False, True, False, True, False, True, True]
expected = bytes([0b11101010, 0b11010101]) expected = bytes([0b11101010, 0b11010101])
result = _from_bits(values) result = _from_bits(values)
self.assertEqual(result, expected) assert result == expected
def test_from_words(self):
def test_from_words():
values = [0x1234, 0x5678] values = [0x1234, 0x5678]
expected = bytes([0x12, 0x34, 0x56, 0x78]) expected = bytes([0x12, 0x34, 0x56, 0x78])
result = _from_words(values) result = _from_words(values)
self.assertEqual(result, expected) assert result == expected
if __name__ == '__main__':
unittest.main()