From e82592cc568ad2db0ac13b53591ec0dec2a69c92 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 20 Feb 2025 13:18:18 +0100 Subject: [PATCH] Interface for instantiating, reading and writing extended for more compact type save coding style --- src/pyhoff/__init__.py | 45 ++++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/src/pyhoff/__init__.py b/src/pyhoff/__init__.py index 5a3d4e1..c51b910 100644 --- a/src/pyhoff/__init__.py +++ b/src/pyhoff/__init__.py @@ -1,5 +1,12 @@ from .modbus import SimpleModbusClient -from typing import Type +from typing import Type, Iterable + + +def _is_bus_terminal(bt_type: Type['BusTerminal']) -> bool: + if BusTerminal.__name__ == bt_type.__name__: + return True + + return any(_is_bus_terminal(b) for b in bt_type.__bases__) class BusTerminal(): @@ -33,6 +40,13 @@ class BusTerminal(): self._input_word_addresses = input_word_addresses self._mixed_mapping = mixed_mapping + @classmethod + def select(cls, bus_coupler: 'BusCoupler', terminal_number: int = 0): + terminal_list = [bt for bt in bus_coupler.bus_terminals if isinstance(bt, cls)] + assert terminal_list, f'No instance of {cls.__name__} configured at this BusCoupler' + assert 0 <= terminal_number < len(terminal_list), f'Out of range, select in range: 0..{len(terminal_list)-1}' + return terminal_list[terminal_number] + class DigitalInputTerminal(BusTerminal): """ @@ -145,7 +159,7 @@ class AnalogOutputTerminal(BusTerminal): channel: The channel number (1 based index) to read from. Returns: - The read word value. + The read word value or provided error_value if read failed. Raises: Exception: If the word offset or count is out of range. @@ -158,7 +172,7 @@ class AnalogOutputTerminal(BusTerminal): return value[0] if value else error_value - def write_channel_word(self, channel: int, value: int) -> int: + def write_channel_word(self, channel: int, value: int) -> bool: """ Write a word to the terminal. @@ -176,15 +190,18 @@ class AnalogOutputTerminal(BusTerminal): return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value) - def set_normalized(self, channel: int, value: float): + def set_normalized(self, channel: int, value: float) -> bool: """ Set a normalized value between 0 and 1 to a specific channel. Args: channel: The channel number to set. value: The normalized value to set. + + Returns: + True if the write operation succeeded. """ - self.write_channel_word(channel, int(value * 0x7FFF)) + return self.write_channel_word(channel, int(value * 0x7FFF)) class BusCoupler(): @@ -214,7 +231,7 @@ class BusCoupler(): Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C """ - def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [], + def __init__(self, host: str, port: int = 502, bus_terminals: Iterable[Type[BusTerminal]] = [], timeout: float = 5, watchdog: float = 0, debug: bool = False): self.bus_terminals: list[BusTerminal] = list() @@ -233,19 +250,27 @@ class BusCoupler(): def _init_hardware(self, watchdog: float): pass - def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]: + def add_bus_terminals(self, *new_bus_terminals: Type[BusTerminal] | Iterable[Type[BusTerminal]]) -> list[BusTerminal]: """ Add bus terminals to the bus coupler. Args: - bus_terminals: A list of bus terminal classes to add. + new_bus_terminals: bus terminal classes to add. Returns: The corresponding list of bus terminal objects. """ - for terminal_class in bus_terminals: - assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal' + terminal_classes: list[Type[BusTerminal]] = [] + for element in new_bus_terminals: + if isinstance(element, Iterable): + for bt in element: + terminal_classes.append(bt) + else: + terminal_classes.append(element) + + for terminal_class in terminal_classes: + assert _is_bus_terminal(terminal_class), f'{terminal_class} is not a bus terminal' def get_para(key: str): return terminal_class.parameters.get(key, 0)