docstrings and project settings updated, few refacoring

This commit is contained in:
Nicolas 2025-02-17 14:57:03 +01:00
parent b68b32680a
commit f34eef3eb4
9 changed files with 157 additions and 89 deletions

View File

@ -13,6 +13,7 @@ exclude =
build,
dist,
.conda
.venv
# Enable specific plugins or options
# Example: Enabling flake8-docstrings

1
.gitignore vendored
View File

@ -100,6 +100,7 @@ celerybeat.pid
env/
venv/
ENV/
.venv/
env.bak/
venv.bak/

25
notes/build.md Normal file
View File

@ -0,0 +1,25 @@
# Notes on building the package
```bash
# Get code
git clone https://github.com/Nonannet/pyhoff.git
cd pyhoff
# Setup venv
python -m venv ./.venv
source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate`
# Check code:
pip install -r requirements-dev.txt
flake8
pytest
# Build package:
pip install build
python3 -m build
# Upload
pip install twine
#python3 -m twine upload dist/*
python3 -m twine upload --repository testpypi dist/* # Test repository: https://test.pypi.org/project/example_package_YOUR_USERNAME_HERE
```

View File

@ -18,7 +18,7 @@ Homepage = "https://github.com/Nonannet/pyhoff"
Issues = "https://github.com/Nonannet/pyhoff/issues"
[build-system]
requires = ["setuptools>=61.0"]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]

2
requirements-dev.txt Normal file
View File

@ -0,0 +1,2 @@
pytest
flake8

View File

@ -5,6 +5,17 @@ from typing import Type, Any
class BusTerminal():
"""
Base class for all bus terminals.
Args:
bus_coupler: The bus coupler to which this terminal is connected.
output_bit_offset: The offset of the output bits.
input_bit_offset: The offset of the input bits.
output_word_offset: The offset of the output words.
input_word_offset: The offset of the input words.
Attributes:
bus_coupler: The bus coupler to which this terminal is connected.
parameters: The parameters of the terminal.
"""
parameters: dict[str, int] = {}
@ -13,12 +24,7 @@ class BusTerminal():
input_bit_offset: int,
output_word_offset: int,
input_word_offset: int):
"""
Initialize a BusTerminal.
Args:
bus_coupler: The bus coupler to which this terminal is connected.
"""
self._output_bit_offset = output_bit_offset
self._input_bit_offset = input_bit_offset
self._output_word_offset = output_word_offset
@ -28,7 +34,7 @@ class BusTerminal():
class DigitalInputTerminal(BusTerminal):
"""
Represents a digital input terminal.
Base class for digital input terminals.
"""
def read_input(self, channel: int) -> bool | None:
"""
@ -45,12 +51,12 @@ class DigitalInputTerminal(BusTerminal):
"""
if channel < 1 or channel > self.parameters['input_bit_width']:
raise Exception("address out of range")
return self.bus_coupler._read_discrete_input(self._input_bit_offset + channel - 1)
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1)
class DigitalOutputTerminal(BusTerminal):
"""
Represents a digital output terminal.
Base class for digital output terminals.
"""
def write_coil(self, channel: int, value: bool) -> bool:
"""
@ -85,12 +91,12 @@ class DigitalOutputTerminal(BusTerminal):
"""
if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range")
return self.bus_coupler._read_coil(self._output_bit_offset + channel - 1)
return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1)
class AnalogInputTerminal(BusTerminal):
"""
Represents an analog input terminal.
Base class for analog input terminals.
"""
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
"""
@ -141,7 +147,7 @@ class AnalogInputTerminal(BusTerminal):
class AnalogOutputTerminal(BusTerminal):
"""
Represents an analog output terminal.
Base class for analog output terminals.
"""
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
"""
@ -207,28 +213,40 @@ class AnalogOutputTerminal(BusTerminal):
class BusCoupler():
"""BusCoupler: Busskoppler ModBus TCP"""
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False):
"""
Constructs a Bus Coupler connected over ModBus TCP.
Base class for ModBus TCP bus coupler
Args:
host: ip or hostname of the BK9050
host: ip or hostname of the bus coupler
port: port of the modbus host
debug: outputs modbus debug information
timeout: timeout for waiting for the device response
watchdog: time in seconds after the device sets all outputs to
default state. A value of 0 deactivates the watchdog.
debug: If True, debug information is printed.
Attributes:
bus_terminals: A list of bus terminal classes according to the
connected terminals.
modbus: The underlying modbus client used for the connection.
Examples:
>>> from pyhoff.devices import *
>>> bk = BK9000('192.168.0.23', bus_terminals=[KL3202, KL9010])
>>> t1 = bk.terminals[0].read_temperature(1)
>>> t2 = bk.terminals[0].read_temperature(2)
>>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C")
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
"""
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False):
self.bus_terminals: list[Any] = list()
self.next_output_bit_offset = 0
self.next_input_bit_offset = 0
self.next_output_word_offset = 0
self.next_input_word_offset = 0
self._next_output_bit_offset = 0
self._next_input_bit_offset = 0
self._next_output_word_offset = 0
self._next_input_word_offset = 0
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
self.add_bus_terminals(bus_terminals)
@ -237,38 +255,6 @@ class BusCoupler():
def _init_hardware(self, watchdog: float):
pass
def _read_discrete_input(self, address: int) -> bool | None:
"""
Read a discrete input from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the discrete input.
"""
value = self.modbus.read_discrete_inputs(address)
if value:
return value[0]
else:
return None
def _read_coil(self, address: int) -> bool | None:
"""
Read a coil from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the coil.
"""
value = self.modbus.read_coils(address)
if value:
return value[0]
else:
return None
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]:
"""
Add bus terminals to the bus coupler.
@ -282,15 +268,15 @@ class BusCoupler():
for terminal_class in bus_terminals:
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
new_terminal = terminal_class(self,
self.next_output_bit_offset,
self.next_input_bit_offset,
self.next_output_word_offset,
self.next_input_word_offset)
self._next_output_bit_offset,
self._next_input_bit_offset,
self._next_output_word_offset,
self._next_input_word_offset)
self.next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0)
self.next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0)
self.next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
self.next_input_word_offset += terminal_class.parameters.get('input_word_width', 0)
self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0)
self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0)
self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0)
self.bus_terminals.append(new_terminal)

View File

@ -5,7 +5,7 @@ from . import BusTerminal, BusCoupler
class BK9000(BusCoupler):
"""
BK9000: Busskoppler ModBus TCP
BK9000 ModBus TCP bus coupler
"""
def _init_hardware(self, watchdog: float):
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
@ -24,21 +24,21 @@ class BK9000(BusCoupler):
class BK9050(BK9000):
"""
BK9050: Busskoppler ModBus TCP
BK9050 ModBus TCP bus coupler
"""
pass
class BK9100(BK9000):
"""
BK9100: Busskoppler ModBus TCP
BK9100 ModBus TCP bus coupler
"""
pass
class WAGO750_352(BusCoupler):
"""
Wago 750-352: Busskoppler ModBus TCP
Wago 750-352 ModBus TCP bus coupler
"""
def _init_hardware(self, watchdog: float):
# deactivate/reset watchdog timer:
@ -165,7 +165,9 @@ class KL1512(AnalogInputTerminal):
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4}
_last_counter_values = [0, 0]
def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int):
super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset)
self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)]
def read_counter(self, channel: int) -> int:
"""
@ -190,6 +192,7 @@ class KL1512(AnalogInputTerminal):
Returns:
The counter value.
"""
# TODO: handel overflow
new_count = self.read_word(channel * 2 - 1)
return new_count - self._last_counter_values[channel - 1]
@ -380,7 +383,7 @@ class KL9188(BusTerminal):
class WAGO_750_600(BusTerminal):
"""
End nodule, no I/O function
End terminal, no I/O function
"""
pass

View File

@ -46,18 +46,36 @@ def _from_words(values: list[int]) -> bytes:
class SimpleModbusClient:
"""
Modbus TCP client
"""
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
"""
Constructs a Modbus client
A simple Modbus TCP client
Args:
host: hostname or IPv4/IPv6 address server address
host: hostname or IP address
port: server port
unit_id: ModBus id
timeout: socket timeout in seconds
debug: if True prints out transmitted and received bytes in hex
Attributes:
host: hostname or IP address
port: server port
unit_id: ModBus id
timeout: socket timeout in seconds
last_error: contains last error message or empty string if no error occurred
debug: if True prints out transmitted and received bytes in hex
Example:
>>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1)
>>> print(client.read_coils(0, 10))
>>> print(client.read_discrete_inputs(0, 10))
>>> print(client.read_holding_registers(0, 10))
>>> print(client.read_input_registers(0, 10))
>>> print(client.write_single_coil(0, True))
>>> print(client.write_single_register(0, 1234))
>>> print(client.write_multiple_coils(0, [True, False, True]))
>>> print(client.write_multiple_registers(0, [1234, 5678]))
>>> client.close()
"""
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
assert 0 <= unit_id < 256
self.host = host
@ -272,6 +290,38 @@ class SimpleModbusClient:
return data == tx_data
def read_discrete_input(self, address: int) -> bool | None:
"""
Read a discrete input from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the discrete input.
"""
value = self.read_discrete_inputs(address)
if value:
return value[0]
else:
return None
def read_coil(self, address: int) -> bool | None:
"""
Read a coil from the given register address.
Args:
address: The register address to read from.
Returns:
The value of the coil.
"""
value = self.read_coils(address)
if value:
return value[0]
else:
return None
def write_single_register(self, register_address: int, value: int) -> bool:
"""
ModBus function for writing a single register (0x06)

View File

@ -1,7 +1,7 @@
import inspect
import src.pyhoff as pyhoff
import src.pyhoff.devices as devices
from src.pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal
import pyhoff as pyhoff
import pyhoff.devices as devices
from pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal
def test_terminal_plausib():