From b2e51581bfec8466a411b5db0dd6ed68ffc381a9 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 19 Feb 2025 09:59:54 +0100 Subject: [PATCH] Mapping for different BusCoupler revised and fixed; WAGO750_352 renamed to WAGO_750_352 --- src/pyhoff/__init__.py | 172 ++++++++++++++++++++--------------------- src/pyhoff/devices.py | 64 ++++++++------- 2 files changed, 122 insertions(+), 114 deletions(-) diff --git a/src/pyhoff/__init__.py b/src/pyhoff/__init__.py index 90cc910..5a3d4e1 100644 --- a/src/pyhoff/__init__.py +++ b/src/pyhoff/__init__.py @@ -1,5 +1,5 @@ from .modbus import SimpleModbusClient -from typing import Type, Any +from typing import Type class BusTerminal(): @@ -8,10 +8,10 @@ class BusTerminal(): Args: bus_coupler: The bus coupler to which this terminal is connected. - output_bit_offset: The offset of the output bits. - input_bit_offset: The offset of the input bits. - output_word_offset: The offset of the output words. - input_word_offset: The offset of the input words. + output_bit_addresses: List of addresses of the output bits. + input_bit_addresses: List of addresses of input bits. + output_word_addresses: List of addresses of output words. + input_word_addresses: List of addresses of input words. Attributes: bus_coupler: The bus coupler to which this terminal is connected. @@ -20,16 +20,18 @@ class BusTerminal(): parameters: dict[str, int] = {} def __init__(self, bus_coupler: 'BusCoupler', - output_bit_offset: int, - input_bit_offset: int, - output_word_offset: int, - input_word_offset: int): + output_bit_addresses: list[int], + input_bit_addresses: list[int], + output_word_addresses: list[int], + input_word_addresses: list[int], + mixed_mapping: bool): - self._output_bit_offset = output_bit_offset - self._input_bit_offset = input_bit_offset - self._output_word_offset = output_word_offset - self._input_word_offset = input_word_offset self.bus_coupler = bus_coupler + self._output_bit_addresses = output_bit_addresses + self._input_bit_addresses = input_bit_addresses + self._output_word_addresses = output_word_addresses + self._input_word_addresses = input_word_addresses + self._mixed_mapping = mixed_mapping class DigitalInputTerminal(BusTerminal): @@ -51,7 +53,7 @@ class DigitalInputTerminal(BusTerminal): """ if channel < 1 or channel > self.parameters['input_bit_width']: raise Exception("address out of range") - return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1) + return self.bus_coupler.modbus.read_discrete_input(self._input_bit_addresses[channel - 1]) class DigitalOutputTerminal(BusTerminal): @@ -74,7 +76,7 @@ class DigitalOutputTerminal(BusTerminal): """ if channel < 1 or channel > self.parameters['output_bit_width']: raise Exception("address out of range") - return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value) + return self.bus_coupler.modbus.write_single_coil(self._output_bit_addresses[channel - 1], value) def read_coil(self, channel: int) -> bool | None: """ @@ -91,46 +93,32 @@ class DigitalOutputTerminal(BusTerminal): """ if channel < 1 or channel > self.parameters['output_bit_width']: raise Exception("address out of range") - return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1) + return self.bus_coupler.modbus.read_coil(self._output_bit_addresses[channel - 1]) class AnalogInputTerminal(BusTerminal): """ Base class for analog input terminals. """ - def read_words(self, word_offset: int, word_count: int) -> list[int] | None: - """ - Read a list of words from the terminal. - - Args: - word_offset: The starting word offset (0 based index). - word_count: The number of words to read. - - Returns: - The read words. - - Raises: - Exception: If the word offset or count is out of range. - """ - if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']: - raise Exception("address out of range") - return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count) - - def read_word(self, word_offset: int) -> int: + def read_channel_word(self, channel: int, error_value: int = -99999) -> int: """ Read a single word from the terminal. Args: - word_offset: The word offset (0 based index) to read from. + channel: The channel number (1 based index) to read from. Returns: The read word value. + + Raises: + Exception: If the word offset or count is out of range. """ - val = self.read_words(word_offset, 1) - if val: - return val[0] - else: - return -999 + assert 1 <= channel <= self.parameters['input_word_width'], \ + f"channel out of range, must be between {1} and {self.parameters['input_word_width']}" + + value = self.bus_coupler.modbus.read_input_registers(self._input_word_addresses[channel - 1], 1) + + return value[0] if value else error_value def read_normalized(self, channel: int) -> float: """ @@ -142,64 +130,51 @@ class AnalogInputTerminal(BusTerminal): Returns: The normalized value. """ - return self.read_word(channel * 2 - 1) / 0x7FFF + return self.read_channel_word(channel) / 0x7FFF class AnalogOutputTerminal(BusTerminal): """ Base class for analog output terminals. """ - def read_words(self, word_offset: int, word_count: int) -> list[int] | None: - """ - Read a list of words from the terminal. - - Args: - word_offset: The starting word offset (0 based index). - word_count: The number of words to read. - - Returns: - The read words. - - Raises: - Exception: If the word offset or count is out of range. - """ - if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']: - raise Exception("address out of range") - return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count) - - def read_word(self, word_offset: int) -> int: + def read_channel_word(self, channel: int, error_value: int = -99999) -> int: """ Read a single word from the terminal. Args: - word_offset: The word offset (0 based index) to read from. + channel: The channel number (1 based index) to read from. Returns: The read word value. - """ - val = self.read_words(word_offset, 1) - if val: - return val[0] - else: - return -999 - def write_word(self, word_offset: int, data: int) -> bool: + Raises: + Exception: If the word offset or count is out of range. + """ + assert not self._mixed_mapping, 'Reading of output state is not supported with this Bus Coupler.' + assert 1 <= channel <= self.parameters['output_word_width'], \ + f"channel out of range, must be between {1} and {self.parameters['output_word_width']}" + + value = self.bus_coupler.modbus.read_holding_registers(self._output_word_addresses[channel - 1], 1) + + return value[0] if value else error_value + + def write_channel_word(self, channel: int, value: int) -> int: """ Write a word to the terminal. Args: - word_offset: The word offset to write to. - data: The data to write. + channel: The channel number (1 based index) to write to. Returns: - The result of the write operation. + True if the write operation succeeded. Raises: - Exception: If the word offset is out of range. + Exception: If the word offset or count is out of range. """ - if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']: - raise Exception("address out of range") - return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data) + assert 1 <= channel <= self.parameters['output_word_width'], \ + f"channel out of range, must be between {1} and {self.parameters['output_word_width']}" + + return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value) def set_normalized(self, channel: int, value: float): """ @@ -209,7 +184,7 @@ class AnalogOutputTerminal(BusTerminal): channel: The channel number to set. value: The normalized value to set. """ - self.write_word(channel * 2 - 1, int(value * 0x7FFF)) + self.write_channel_word(channel, int(value * 0x7FFF)) class BusCoupler(): @@ -242,11 +217,14 @@ class BusCoupler(): def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [], timeout: float = 5, watchdog: float = 0, debug: bool = False): - self.bus_terminals: list[Any] = list() + self.bus_terminals: list[BusTerminal] = list() self._next_output_bit_offset = 0 self._next_input_bit_offset = 0 self._next_output_word_offset = 0 self._next_input_word_offset = 0 + self._channel_spacing = 1 + self._channel_offset = 0 + self._mixed_mapping = True self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug) self.add_bus_terminals(bus_terminals) @@ -255,7 +233,7 @@ class BusCoupler(): def _init_hardware(self, watchdog: float): pass - def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]: + def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]: """ Add bus terminals to the bus coupler. @@ -265,18 +243,36 @@ class BusCoupler(): Returns: The corresponding list of bus terminal objects. """ + for terminal_class in bus_terminals: assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal' - new_terminal = terminal_class(self, - self._next_output_bit_offset, - self._next_input_bit_offset, - self._next_output_word_offset, - self._next_input_word_offset) - self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0) - self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0) - self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0) - self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0) + def get_para(key: str): + return terminal_class.parameters.get(key, 0) + + new_terminal = terminal_class( + self, + [i + self._next_output_bit_offset for i in range(get_para('output_bit_width'))], + [i + self._next_input_bit_offset for i in range(get_para('input_bit_width'))], + [i * self._channel_spacing + self._channel_offset + self._next_output_word_offset + for i in range(get_para('output_word_width'))], + [i * self._channel_spacing + self._channel_offset + self._next_input_word_offset + for i in range(get_para('input_word_width'))], + self._mixed_mapping) + + output_word_width = get_para('output_word_width') + input_word_width = get_para('input_word_width') + + if self._mixed_mapping: + # Shared mapping for word based inputs and outputs + word_width = max(output_word_width, input_word_width) + output_word_width = word_width + input_word_width = word_width + + self._next_output_bit_offset += get_para('output_bit_width') + self._next_input_bit_offset += get_para('input_bit_width') + self._next_output_word_offset += output_word_width * self._channel_spacing + self._next_input_word_offset += input_word_width * self._channel_spacing self.bus_terminals.append(new_terminal) diff --git a/src/pyhoff/devices.py b/src/pyhoff/devices.py index 966e7ca..ac66055 100644 --- a/src/pyhoff/devices.py +++ b/src/pyhoff/devices.py @@ -19,7 +19,11 @@ class BK9000(BusCoupler): self.modbus.write_single_register(0x1121, 0xAFFE) # set process image offset - self.next_output_word_offset = 0x0800 + self._next_output_word_offset = 0x0800 + + # set channel placement for terminal mapping + self._channel_spacing = 2 + self._channel_offset = 1 class BK9050(BK9000): @@ -36,7 +40,7 @@ class BK9100(BK9000): pass -class WAGO750_352(BusCoupler): +class WAGO_750_352(BusCoupler): """ Wago 750-352 ModBus TCP bus coupler """ @@ -53,7 +57,11 @@ class WAGO750_352(BusCoupler): self.modbus.write_single_register(0x1001, 0xFFFF) # set process image offset - self.next_output_word_offset = 0x0000 + self._next_output_word_offset = 0x0000 + self._next_output_bit_offset = 512 + + # set separated input output mapping + self._mixed_mapping = False class DigitalInputTerminal4Bit(DigitalInputTerminal): @@ -160,14 +168,14 @@ class WAGO_750_530(DigitalOutputTerminal8Bit): class KL1512(AnalogInputTerminal): """ - KL1512: 2x digital input, counter, 24 V DC, 1 kHz + KL1512: 2x 16 bit counter, 24 V DC, 1 kHz """ - # Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) - parameters = {'input_word_width': 4, 'output_word_width': 4} + # Input: 2 x 16 Bit Daten (optional 4x 8 Bit Control/Status) + parameters = {'input_word_width': 2} - def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int): - super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset) - self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)] + def __init__(self, bus_coupler: BusCoupler, o_b_addr: list[int], i_b_addr: list[int], o_w_addr: list[int], i_w_addr: list[int], mixed_mapping: bool): + super().__init__(bus_coupler, o_b_addr, i_b_addr, o_w_addr, i_w_addr, mixed_mapping) + self._last_counter_values = [self.read_channel_word(1), self.read_channel_word(2)] def read_counter(self, channel: int) -> int: """ @@ -180,7 +188,7 @@ class KL1512(AnalogInputTerminal): The counter value. """ - return self.read_word(channel * 2 - 1) + return self.read_channel_word(channel) def read_delta(self, channel: int) -> int: """ @@ -192,9 +200,13 @@ class KL1512(AnalogInputTerminal): Returns: The counter value. """ - # TODO: handel overflow - new_count = self.read_word(channel * 2 - 1) - return new_count - self._last_counter_values[channel - 1] + new_count = self.read_channel_word(channel) + delta = new_count - self._last_counter_values[channel - 1] + if delta > 0x8000: + delta = delta - 0x10000 + elif delta < -0x8000: + delta = delta + 0x10000 + return delta class KL3054(AnalogInputTerminal): @@ -202,7 +214,7 @@ class KL3054(AnalogInputTerminal): KL3054: 4x analog input 4...20 mA 12 Bit single-ended """ # Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) - parameters = {'input_word_width': 8, 'output_word_width': 8} + parameters = {'input_word_width': 4} def read_current(self, channel: int) -> float: """ @@ -214,7 +226,7 @@ class KL3054(AnalogInputTerminal): Returns: The current value. """ - return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0 + return self.read_normalized(channel) * 16.0 + 4.0 class KL3042(AnalogInputTerminal): @@ -222,7 +234,7 @@ class KL3042(AnalogInputTerminal): KL3042: 2x analog input 0...20 mA 12 Bit single-ended """ # Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status) - parameters = {'input_word_width': 4, 'output_word_width': 4} + parameters = {'input_word_width': 2} def read_current(self, channel: int) -> float: """ @@ -234,7 +246,7 @@ class KL3042(AnalogInputTerminal): Returns: The current value. """ - return self.read_normalized(channel * 2 - 1) * 20.0 + return self.read_normalized(channel) * 20.0 class KL3202(AnalogInputTerminal): @@ -242,7 +254,7 @@ class KL3202(AnalogInputTerminal): KL3202: 2x analog input PT100 16 Bit 3-wire """ # Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional) - parameters = {'input_word_width': 4, 'output_word_width': 4} + parameters = {'input_word_width': 2} def read_temperature(self, channel: int) -> float: """ @@ -254,7 +266,7 @@ class KL3202(AnalogInputTerminal): Returns: The temperature value in °C. """ - val = self.read_word(channel * 2 - 1) + val = self.read_channel_word(channel) if val > 0x7FFF: return (val - 0x10000) / 10.0 else: @@ -267,7 +279,7 @@ class KL3214(AnalogInputTerminal): """ # inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional) # out: 4 x 8 Bit Control (optional) - parameters = {'input_word_width': 8, 'output_word_width': 8} + parameters = {'input_word_width': 4} def read_temperature(self, channel: int) -> float: """ @@ -279,7 +291,7 @@ class KL3214(AnalogInputTerminal): Returns: The temperature value. """ - val = self.read_word(channel * 2 - 1) + val = self.read_channel_word(channel) if val > 0x7FFF: return (val - 0x10000) / 10.0 else: @@ -291,7 +303,7 @@ class KL4002(AnalogOutputTerminal): KL4002: 2x analog output 0...10 V 12 Bit differentiell """ # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) - parameters = {'input_word_width': 4, 'output_word_width': 4} + parameters = {'output_word_width': 2} def set_voltage(self, channel: int, value: float): """ @@ -309,7 +321,7 @@ class KL4132(AnalogOutputTerminal): KL4002: 2x analog output ±10 V 16 bit differential """ # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) - parameters = {'input_word_width': 4, 'output_word_width': 4} + parameters = {'output_word_width': 2} def set_normalized(self, channel: int, value: float): """ @@ -320,9 +332,9 @@ class KL4132(AnalogOutputTerminal): value: The normalized value to set. """ if value >= 0: - self.write_word(channel - 1, int(value * 0x7FFF)) + self.write_channel_word(channel, int(value * 0x7FFF)) else: - self.write_word(channel - 1, int(0x10000 + value * 0x7FFF)) + self.write_channel_word(channel, int(0x10000 + value * 0x7FFF)) def set_voltage(self, channel: int, value: float): """ @@ -340,7 +352,7 @@ class KL4004(AnalogOutputTerminal): KL4004: 4x analog output 0...10 V 12 Bit differentiell """ # Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status) - parameters = {'input_word_width': 8, 'output_word_width': 8} + parameters = {'output_word_width': 4} def set_voltage(self, channel: int, value: float): """