mirror of https://github.com/Nonannet/pyhoff.git
Interface for instantiating, reading and writing extended for more compact type save coding style
This commit is contained in:
parent
228b1e7a65
commit
e82592cc56
|
@ -1,5 +1,12 @@
|
|||
from .modbus import SimpleModbusClient
|
||||
from typing import Type
|
||||
from typing import Type, Iterable
|
||||
|
||||
|
||||
def _is_bus_terminal(bt_type: Type['BusTerminal']) -> bool:
|
||||
if BusTerminal.__name__ == bt_type.__name__:
|
||||
return True
|
||||
|
||||
return any(_is_bus_terminal(b) for b in bt_type.__bases__)
|
||||
|
||||
|
||||
class BusTerminal():
|
||||
|
@ -33,6 +40,13 @@ class BusTerminal():
|
|||
self._input_word_addresses = input_word_addresses
|
||||
self._mixed_mapping = mixed_mapping
|
||||
|
||||
@classmethod
|
||||
def select(cls, bus_coupler: 'BusCoupler', terminal_number: int = 0):
|
||||
terminal_list = [bt for bt in bus_coupler.bus_terminals if isinstance(bt, cls)]
|
||||
assert terminal_list, f'No instance of {cls.__name__} configured at this BusCoupler'
|
||||
assert 0 <= terminal_number < len(terminal_list), f'Out of range, select in range: 0..{len(terminal_list)-1}'
|
||||
return terminal_list[terminal_number]
|
||||
|
||||
|
||||
class DigitalInputTerminal(BusTerminal):
|
||||
"""
|
||||
|
@ -145,7 +159,7 @@ class AnalogOutputTerminal(BusTerminal):
|
|||
channel: The channel number (1 based index) to read from.
|
||||
|
||||
Returns:
|
||||
The read word value.
|
||||
The read word value or provided error_value if read failed.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset or count is out of range.
|
||||
|
@ -158,7 +172,7 @@ class AnalogOutputTerminal(BusTerminal):
|
|||
|
||||
return value[0] if value else error_value
|
||||
|
||||
def write_channel_word(self, channel: int, value: int) -> int:
|
||||
def write_channel_word(self, channel: int, value: int) -> bool:
|
||||
"""
|
||||
Write a word to the terminal.
|
||||
|
||||
|
@ -176,15 +190,18 @@ class AnalogOutputTerminal(BusTerminal):
|
|||
|
||||
return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value)
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
def set_normalized(self, channel: int, value: float) -> bool:
|
||||
"""
|
||||
Set a normalized value between 0 and 1 to a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to set.
|
||||
value: The normalized value to set.
|
||||
|
||||
Returns:
|
||||
True if the write operation succeeded.
|
||||
"""
|
||||
self.write_channel_word(channel, int(value * 0x7FFF))
|
||||
return self.write_channel_word(channel, int(value * 0x7FFF))
|
||||
|
||||
|
||||
class BusCoupler():
|
||||
|
@ -214,7 +231,7 @@ class BusCoupler():
|
|||
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
|
||||
"""
|
||||
|
||||
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
|
||||
def __init__(self, host: str, port: int = 502, bus_terminals: Iterable[Type[BusTerminal]] = [],
|
||||
timeout: float = 5, watchdog: float = 0, debug: bool = False):
|
||||
|
||||
self.bus_terminals: list[BusTerminal] = list()
|
||||
|
@ -233,19 +250,27 @@ class BusCoupler():
|
|||
def _init_hardware(self, watchdog: float):
|
||||
pass
|
||||
|
||||
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
|
||||
def add_bus_terminals(self, *new_bus_terminals: Type[BusTerminal] | Iterable[Type[BusTerminal]]) -> list[BusTerminal]:
|
||||
"""
|
||||
Add bus terminals to the bus coupler.
|
||||
|
||||
Args:
|
||||
bus_terminals: A list of bus terminal classes to add.
|
||||
new_bus_terminals: bus terminal classes to add.
|
||||
|
||||
Returns:
|
||||
The corresponding list of bus terminal objects.
|
||||
"""
|
||||
|
||||
for terminal_class in bus_terminals:
|
||||
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
|
||||
terminal_classes: list[Type[BusTerminal]] = []
|
||||
for element in new_bus_terminals:
|
||||
if isinstance(element, Iterable):
|
||||
for bt in element:
|
||||
terminal_classes.append(bt)
|
||||
else:
|
||||
terminal_classes.append(element)
|
||||
|
||||
for terminal_class in terminal_classes:
|
||||
assert _is_bus_terminal(terminal_class), f'{terminal_class} is not a bus terminal'
|
||||
|
||||
def get_para(key: str):
|
||||
return terminal_class.parameters.get(key, 0)
|
||||
|
|
Loading…
Reference in New Issue