From f34eef3eb482c33c2e76cdd48575173bb79c1251 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Mon, 17 Feb 2025 14:57:03 +0100 Subject: [PATCH] docstrings and project settings updated, few refacoring --- .flake8 | 1 + .gitignore | 1 + notes/build.md | 25 ++++++++ pyproject.toml | 2 +- requirements-dev.txt | 2 + src/pyhoff/__init__.py | 122 ++++++++++++++++++---------------------- src/pyhoff/devices.py | 15 +++-- src/pyhoff/modbus.py | 72 ++++++++++++++++++++---- tests/test_terminals.py | 6 +- 9 files changed, 157 insertions(+), 89 deletions(-) create mode 100644 notes/build.md create mode 100644 requirements-dev.txt diff --git a/.flake8 b/.flake8 index 5965be3..10c587e 100644 --- a/.flake8 +++ b/.flake8 @@ -13,6 +13,7 @@ exclude = build, dist, .conda + .venv # Enable specific plugins or options # Example: Enabling flake8-docstrings diff --git a/.gitignore b/.gitignore index 538fe7c..373d2df 100644 --- a/.gitignore +++ b/.gitignore @@ -100,6 +100,7 @@ celerybeat.pid env/ venv/ ENV/ +.venv/ env.bak/ venv.bak/ diff --git a/notes/build.md b/notes/build.md new file mode 100644 index 0000000..a837cda --- /dev/null +++ b/notes/build.md @@ -0,0 +1,25 @@ +# Notes on building the package + +```bash +# Get code +git clone https://github.com/Nonannet/pyhoff.git +cd pyhoff + +# Setup venv +python -m venv ./.venv +source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate` + +# Check code: +pip install -r requirements-dev.txt +flake8 +pytest + +# Build package: +pip install build +python3 -m build + +# Upload +pip install twine +#python3 -m twine upload dist/* +python3 -m twine upload --repository testpypi dist/* # Test repository: https://test.pypi.org/project/example_package_YOUR_USERNAME_HERE +``` diff --git a/pyproject.toml b/pyproject.toml index 16668a8..d0122ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ Homepage = "https://github.com/Nonannet/pyhoff" Issues = "https://github.com/Nonannet/pyhoff/issues" [build-system] -requires = ["setuptools>=61.0"] +requires = ["setuptools>=61.0", "wheel"] build-backend = "setuptools.build_meta" [tool.setuptools.packages.find] diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..036d8c5 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest +flake8 \ No newline at end of file diff --git a/src/pyhoff/__init__.py b/src/pyhoff/__init__.py index 0b36c1f..90cc910 100644 --- a/src/pyhoff/__init__.py +++ b/src/pyhoff/__init__.py @@ -5,6 +5,17 @@ from typing import Type, Any class BusTerminal(): """ Base class for all bus terminals. + + Args: + bus_coupler: The bus coupler to which this terminal is connected. + output_bit_offset: The offset of the output bits. + input_bit_offset: The offset of the input bits. + output_word_offset: The offset of the output words. + input_word_offset: The offset of the input words. + + Attributes: + bus_coupler: The bus coupler to which this terminal is connected. + parameters: The parameters of the terminal. """ parameters: dict[str, int] = {} @@ -13,12 +24,7 @@ class BusTerminal(): input_bit_offset: int, output_word_offset: int, input_word_offset: int): - """ - Initialize a BusTerminal. - Args: - bus_coupler: The bus coupler to which this terminal is connected. - """ self._output_bit_offset = output_bit_offset self._input_bit_offset = input_bit_offset self._output_word_offset = output_word_offset @@ -28,7 +34,7 @@ class BusTerminal(): class DigitalInputTerminal(BusTerminal): """ - Represents a digital input terminal. + Base class for digital input terminals. """ def read_input(self, channel: int) -> bool | None: """ @@ -45,12 +51,12 @@ class DigitalInputTerminal(BusTerminal): """ if channel < 1 or channel > self.parameters['input_bit_width']: raise Exception("address out of range") - return self.bus_coupler._read_discrete_input(self._input_bit_offset + channel - 1) + return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1) class DigitalOutputTerminal(BusTerminal): """ - Represents a digital output terminal. + Base class for digital output terminals. """ def write_coil(self, channel: int, value: bool) -> bool: """ @@ -85,12 +91,12 @@ class DigitalOutputTerminal(BusTerminal): """ if channel < 1 or channel > self.parameters['output_bit_width']: raise Exception("address out of range") - return self.bus_coupler._read_coil(self._output_bit_offset + channel - 1) + return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1) class AnalogInputTerminal(BusTerminal): """ - Represents an analog input terminal. + Base class for analog input terminals. """ def read_words(self, word_offset: int, word_count: int) -> list[int] | None: """ @@ -141,7 +147,7 @@ class AnalogInputTerminal(BusTerminal): class AnalogOutputTerminal(BusTerminal): """ - Represents an analog output terminal. + Base class for analog output terminals. """ def read_words(self, word_offset: int, word_count: int) -> list[int] | None: """ @@ -207,28 +213,40 @@ class AnalogOutputTerminal(BusTerminal): class BusCoupler(): - """BusCoupler: Busskoppler ModBus TCP""" + """ + Base class for ModBus TCP bus coupler + + Args: + host: ip or hostname of the bus coupler + port: port of the modbus host + debug: outputs modbus debug information + timeout: timeout for waiting for the device response + watchdog: time in seconds after the device sets all outputs to + default state. A value of 0 deactivates the watchdog. + debug: If True, debug information is printed. + + Attributes: + bus_terminals: A list of bus terminal classes according to the + connected terminals. + modbus: The underlying modbus client used for the connection. + + Examples: + >>> from pyhoff.devices import * + >>> bk = BK9000('192.168.0.23', bus_terminals=[KL3202, KL9010]) + >>> t1 = bk.terminals[0].read_temperature(1) + >>> t2 = bk.terminals[0].read_temperature(2) + >>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C") + Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C + """ def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [], timeout: float = 5, watchdog: float = 0, debug: bool = False): - """ - Constructs a Bus Coupler connected over ModBus TCP. - - Args: - host: ip or hostname of the BK9050 - port: port of the modbus host - debug: outputs modbus debug information - timeout: timeout for waiting for the device response - watchdog: time in seconds after the device sets all outputs to - default state. A value of 0 deactivates the watchdog. - debug: If True, debug information is printed. - """ self.bus_terminals: list[Any] = list() - self.next_output_bit_offset = 0 - self.next_input_bit_offset = 0 - self.next_output_word_offset = 0 - self.next_input_word_offset = 0 + self._next_output_bit_offset = 0 + self._next_input_bit_offset = 0 + self._next_output_word_offset = 0 + self._next_input_word_offset = 0 self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug) self.add_bus_terminals(bus_terminals) @@ -237,38 +255,6 @@ class BusCoupler(): def _init_hardware(self, watchdog: float): pass - def _read_discrete_input(self, address: int) -> bool | None: - """ - Read a discrete input from the given register address. - - Args: - address: The register address to read from. - - Returns: - The value of the discrete input. - """ - value = self.modbus.read_discrete_inputs(address) - if value: - return value[0] - else: - return None - - def _read_coil(self, address: int) -> bool | None: - """ - Read a coil from the given register address. - - Args: - address: The register address to read from. - - Returns: - The value of the coil. - """ - value = self.modbus.read_coils(address) - if value: - return value[0] - else: - return None - def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]: """ Add bus terminals to the bus coupler. @@ -282,15 +268,15 @@ class BusCoupler(): for terminal_class in bus_terminals: assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal' new_terminal = terminal_class(self, - self.next_output_bit_offset, - self.next_input_bit_offset, - self.next_output_word_offset, - self.next_input_word_offset) + self._next_output_bit_offset, + self._next_input_bit_offset, + self._next_output_word_offset, + self._next_input_word_offset) - self.next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0) - self.next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0) - self.next_output_word_offset += terminal_class.parameters.get('output_word_width', 0) - self.next_input_word_offset += terminal_class.parameters.get('input_word_width', 0) + self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0) + self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0) + self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0) + self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0) self.bus_terminals.append(new_terminal) diff --git a/src/pyhoff/devices.py b/src/pyhoff/devices.py index 451bfdf..966e7ca 100644 --- a/src/pyhoff/devices.py +++ b/src/pyhoff/devices.py @@ -5,7 +5,7 @@ from . import BusTerminal, BusCoupler class BK9000(BusCoupler): """ - BK9000: Busskoppler ModBus TCP + BK9000 ModBus TCP bus coupler """ def _init_hardware(self, watchdog: float): # https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf @@ -24,21 +24,21 @@ class BK9000(BusCoupler): class BK9050(BK9000): """ - BK9050: Busskoppler ModBus TCP + BK9050 ModBus TCP bus coupler """ pass class BK9100(BK9000): """ - BK9100: Busskoppler ModBus TCP + BK9100 ModBus TCP bus coupler """ pass class WAGO750_352(BusCoupler): """ - Wago 750-352: Busskoppler ModBus TCP + Wago 750-352 ModBus TCP bus coupler """ def _init_hardware(self, watchdog: float): # deactivate/reset watchdog timer: @@ -165,7 +165,9 @@ class KL1512(AnalogInputTerminal): # Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) parameters = {'input_word_width': 4, 'output_word_width': 4} - _last_counter_values = [0, 0] + def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int): + super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset) + self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)] def read_counter(self, channel: int) -> int: """ @@ -190,6 +192,7 @@ class KL1512(AnalogInputTerminal): Returns: The counter value. """ + # TODO: handel overflow new_count = self.read_word(channel * 2 - 1) return new_count - self._last_counter_values[channel - 1] @@ -380,7 +383,7 @@ class KL9188(BusTerminal): class WAGO_750_600(BusTerminal): """ - End nodule, no I/O function + End terminal, no I/O function """ pass diff --git a/src/pyhoff/modbus.py b/src/pyhoff/modbus.py index a2760d3..c7b0bbd 100644 --- a/src/pyhoff/modbus.py +++ b/src/pyhoff/modbus.py @@ -46,18 +46,36 @@ def _from_words(values: list[int]) -> bytes: class SimpleModbusClient: """ - Modbus TCP client + A simple Modbus TCP client + + Args: + host: hostname or IP address + port: server port + unit_id: ModBus id + timeout: socket timeout in seconds + debug: if True prints out transmitted and received bytes in hex + + Attributes: + host: hostname or IP address + port: server port + unit_id: ModBus id + timeout: socket timeout in seconds + last_error: contains last error message or empty string if no error occurred + debug: if True prints out transmitted and received bytes in hex + + Example: + >>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1) + >>> print(client.read_coils(0, 10)) + >>> print(client.read_discrete_inputs(0, 10)) + >>> print(client.read_holding_registers(0, 10)) + >>> print(client.read_input_registers(0, 10)) + >>> print(client.write_single_coil(0, True)) + >>> print(client.write_single_register(0, 1234)) + >>> print(client.write_multiple_coils(0, [True, False, True])) + >>> print(client.write_multiple_registers(0, [1234, 5678])) + >>> client.close() """ def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False): - """ - Constructs a Modbus client - - Args: - host: hostname or IPv4/IPv6 address server address - port: server port - unit_id: ModBus id - timeout: socket timeout in seconds - """ assert 0 <= unit_id < 256 self.host = host @@ -272,6 +290,38 @@ class SimpleModbusClient: return data == tx_data + def read_discrete_input(self, address: int) -> bool | None: + """ + Read a discrete input from the given register address. + + Args: + address: The register address to read from. + + Returns: + The value of the discrete input. + """ + value = self.read_discrete_inputs(address) + if value: + return value[0] + else: + return None + + def read_coil(self, address: int) -> bool | None: + """ + Read a coil from the given register address. + + Args: + address: The register address to read from. + + Returns: + The value of the coil. + """ + value = self.read_coils(address) + if value: + return value[0] + else: + return None + def write_single_register(self, register_address: int, value: int) -> bool: """ ModBus function for writing a single register (0x06) @@ -400,7 +450,7 @@ class SimpleModbusClient: try: self._socket.sendall(data) if self.debug: - print(f'-> Send: {' '.join(hex(b) for b in data)}') + print(f'-> Send: {' '.join(hex(b) for b in data)}') return len(data) except socket.error: self.last_error = 'sending data failed' diff --git a/tests/test_terminals.py b/tests/test_terminals.py index 60b5c90..6c2ce38 100644 --- a/tests/test_terminals.py +++ b/tests/test_terminals.py @@ -1,7 +1,7 @@ import inspect -import src.pyhoff as pyhoff -import src.pyhoff.devices as devices -from src.pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal +import pyhoff as pyhoff +import pyhoff.devices as devices +from pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal def test_terminal_plausib():