Mapping for different BusCoupler revised and fixed; WAGO750_352 renamed to WAGO_750_352

This commit is contained in:
Nicolas 2025-02-19 09:59:54 +01:00
parent 5e52a3f866
commit b2e51581bf
2 changed files with 122 additions and 114 deletions

View File

@ -1,5 +1,5 @@
from .modbus import SimpleModbusClient from .modbus import SimpleModbusClient
from typing import Type, Any from typing import Type
class BusTerminal(): class BusTerminal():
@ -8,10 +8,10 @@ class BusTerminal():
Args: Args:
bus_coupler: The bus coupler to which this terminal is connected. bus_coupler: The bus coupler to which this terminal is connected.
output_bit_offset: The offset of the output bits. output_bit_addresses: List of addresses of the output bits.
input_bit_offset: The offset of the input bits. input_bit_addresses: List of addresses of input bits.
output_word_offset: The offset of the output words. output_word_addresses: List of addresses of output words.
input_word_offset: The offset of the input words. input_word_addresses: List of addresses of input words.
Attributes: Attributes:
bus_coupler: The bus coupler to which this terminal is connected. bus_coupler: The bus coupler to which this terminal is connected.
@ -20,16 +20,18 @@ class BusTerminal():
parameters: dict[str, int] = {} parameters: dict[str, int] = {}
def __init__(self, bus_coupler: 'BusCoupler', def __init__(self, bus_coupler: 'BusCoupler',
output_bit_offset: int, output_bit_addresses: list[int],
input_bit_offset: int, input_bit_addresses: list[int],
output_word_offset: int, output_word_addresses: list[int],
input_word_offset: int): input_word_addresses: list[int],
mixed_mapping: bool):
self._output_bit_offset = output_bit_offset
self._input_bit_offset = input_bit_offset
self._output_word_offset = output_word_offset
self._input_word_offset = input_word_offset
self.bus_coupler = bus_coupler self.bus_coupler = bus_coupler
self._output_bit_addresses = output_bit_addresses
self._input_bit_addresses = input_bit_addresses
self._output_word_addresses = output_word_addresses
self._input_word_addresses = input_word_addresses
self._mixed_mapping = mixed_mapping
class DigitalInputTerminal(BusTerminal): class DigitalInputTerminal(BusTerminal):
@ -51,7 +53,7 @@ class DigitalInputTerminal(BusTerminal):
""" """
if channel < 1 or channel > self.parameters['input_bit_width']: if channel < 1 or channel > self.parameters['input_bit_width']:
raise Exception("address out of range") raise Exception("address out of range")
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1) return self.bus_coupler.modbus.read_discrete_input(self._input_bit_addresses[channel - 1])
class DigitalOutputTerminal(BusTerminal): class DigitalOutputTerminal(BusTerminal):
@ -74,7 +76,7 @@ class DigitalOutputTerminal(BusTerminal):
""" """
if channel < 1 or channel > self.parameters['output_bit_width']: if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range") raise Exception("address out of range")
return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value) return self.bus_coupler.modbus.write_single_coil(self._output_bit_addresses[channel - 1], value)
def read_coil(self, channel: int) -> bool | None: def read_coil(self, channel: int) -> bool | None:
""" """
@ -91,46 +93,32 @@ class DigitalOutputTerminal(BusTerminal):
""" """
if channel < 1 or channel > self.parameters['output_bit_width']: if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range") raise Exception("address out of range")
return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1) return self.bus_coupler.modbus.read_coil(self._output_bit_addresses[channel - 1])
class AnalogInputTerminal(BusTerminal): class AnalogInputTerminal(BusTerminal):
""" """
Base class for analog input terminals. Base class for analog input terminals.
""" """
def read_words(self, word_offset: int, word_count: int) -> list[int] | None: def read_channel_word(self, channel: int, error_value: int = -99999) -> int:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
""" """
Read a single word from the terminal. Read a single word from the terminal.
Args: Args:
word_offset: The word offset (0 based index) to read from. channel: The channel number (1 based index) to read from.
Returns: Returns:
The read word value. The read word value.
Raises:
Exception: If the word offset or count is out of range.
""" """
val = self.read_words(word_offset, 1) assert 1 <= channel <= self.parameters['input_word_width'], \
if val: f"channel out of range, must be between {1} and {self.parameters['input_word_width']}"
return val[0]
else: value = self.bus_coupler.modbus.read_input_registers(self._input_word_addresses[channel - 1], 1)
return -999
return value[0] if value else error_value
def read_normalized(self, channel: int) -> float: def read_normalized(self, channel: int) -> float:
""" """
@ -142,64 +130,51 @@ class AnalogInputTerminal(BusTerminal):
Returns: Returns:
The normalized value. The normalized value.
""" """
return self.read_word(channel * 2 - 1) / 0x7FFF return self.read_channel_word(channel) / 0x7FFF
class AnalogOutputTerminal(BusTerminal): class AnalogOutputTerminal(BusTerminal):
""" """
Base class for analog output terminals. Base class for analog output terminals.
""" """
def read_words(self, word_offset: int, word_count: int) -> list[int] | None: def read_channel_word(self, channel: int, error_value: int = -99999) -> int:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
""" """
Read a single word from the terminal. Read a single word from the terminal.
Args: Args:
word_offset: The word offset (0 based index) to read from. channel: The channel number (1 based index) to read from.
Returns: Returns:
The read word value. The read word value.
"""
val = self.read_words(word_offset, 1)
if val:
return val[0]
else:
return -999
def write_word(self, word_offset: int, data: int) -> bool: Raises:
Exception: If the word offset or count is out of range.
"""
assert not self._mixed_mapping, 'Reading of output state is not supported with this Bus Coupler.'
assert 1 <= channel <= self.parameters['output_word_width'], \
f"channel out of range, must be between {1} and {self.parameters['output_word_width']}"
value = self.bus_coupler.modbus.read_holding_registers(self._output_word_addresses[channel - 1], 1)
return value[0] if value else error_value
def write_channel_word(self, channel: int, value: int) -> int:
""" """
Write a word to the terminal. Write a word to the terminal.
Args: Args:
word_offset: The word offset to write to. channel: The channel number (1 based index) to write to.
data: The data to write.
Returns: Returns:
The result of the write operation. True if the write operation succeeded.
Raises: Raises:
Exception: If the word offset is out of range. Exception: If the word offset or count is out of range.
""" """
if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']: assert 1 <= channel <= self.parameters['output_word_width'], \
raise Exception("address out of range") f"channel out of range, must be between {1} and {self.parameters['output_word_width']}"
return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data)
return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value)
def set_normalized(self, channel: int, value: float): def set_normalized(self, channel: int, value: float):
""" """
@ -209,7 +184,7 @@ class AnalogOutputTerminal(BusTerminal):
channel: The channel number to set. channel: The channel number to set.
value: The normalized value to set. value: The normalized value to set.
""" """
self.write_word(channel * 2 - 1, int(value * 0x7FFF)) self.write_channel_word(channel, int(value * 0x7FFF))
class BusCoupler(): class BusCoupler():
@ -242,11 +217,14 @@ class BusCoupler():
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [], def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False): timeout: float = 5, watchdog: float = 0, debug: bool = False):
self.bus_terminals: list[Any] = list() self.bus_terminals: list[BusTerminal] = list()
self._next_output_bit_offset = 0 self._next_output_bit_offset = 0
self._next_input_bit_offset = 0 self._next_input_bit_offset = 0
self._next_output_word_offset = 0 self._next_output_word_offset = 0
self._next_input_word_offset = 0 self._next_input_word_offset = 0
self._channel_spacing = 1
self._channel_offset = 0
self._mixed_mapping = True
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug) self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
self.add_bus_terminals(bus_terminals) self.add_bus_terminals(bus_terminals)
@ -255,7 +233,7 @@ class BusCoupler():
def _init_hardware(self, watchdog: float): def _init_hardware(self, watchdog: float):
pass pass
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]: def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
""" """
Add bus terminals to the bus coupler. Add bus terminals to the bus coupler.
@ -265,18 +243,36 @@ class BusCoupler():
Returns: Returns:
The corresponding list of bus terminal objects. The corresponding list of bus terminal objects.
""" """
for terminal_class in bus_terminals: for terminal_class in bus_terminals:
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal' assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
new_terminal = terminal_class(self,
self._next_output_bit_offset,
self._next_input_bit_offset,
self._next_output_word_offset,
self._next_input_word_offset)
self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0) def get_para(key: str):
self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0) return terminal_class.parameters.get(key, 0)
self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0) new_terminal = terminal_class(
self,
[i + self._next_output_bit_offset for i in range(get_para('output_bit_width'))],
[i + self._next_input_bit_offset for i in range(get_para('input_bit_width'))],
[i * self._channel_spacing + self._channel_offset + self._next_output_word_offset
for i in range(get_para('output_word_width'))],
[i * self._channel_spacing + self._channel_offset + self._next_input_word_offset
for i in range(get_para('input_word_width'))],
self._mixed_mapping)
output_word_width = get_para('output_word_width')
input_word_width = get_para('input_word_width')
if self._mixed_mapping:
# Shared mapping for word based inputs and outputs
word_width = max(output_word_width, input_word_width)
output_word_width = word_width
input_word_width = word_width
self._next_output_bit_offset += get_para('output_bit_width')
self._next_input_bit_offset += get_para('input_bit_width')
self._next_output_word_offset += output_word_width * self._channel_spacing
self._next_input_word_offset += input_word_width * self._channel_spacing
self.bus_terminals.append(new_terminal) self.bus_terminals.append(new_terminal)

View File

@ -19,7 +19,11 @@ class BK9000(BusCoupler):
self.modbus.write_single_register(0x1121, 0xAFFE) self.modbus.write_single_register(0x1121, 0xAFFE)
# set process image offset # set process image offset
self.next_output_word_offset = 0x0800 self._next_output_word_offset = 0x0800
# set channel placement for terminal mapping
self._channel_spacing = 2
self._channel_offset = 1
class BK9050(BK9000): class BK9050(BK9000):
@ -36,7 +40,7 @@ class BK9100(BK9000):
pass pass
class WAGO750_352(BusCoupler): class WAGO_750_352(BusCoupler):
""" """
Wago 750-352 ModBus TCP bus coupler Wago 750-352 ModBus TCP bus coupler
""" """
@ -53,7 +57,11 @@ class WAGO750_352(BusCoupler):
self.modbus.write_single_register(0x1001, 0xFFFF) self.modbus.write_single_register(0x1001, 0xFFFF)
# set process image offset # set process image offset
self.next_output_word_offset = 0x0000 self._next_output_word_offset = 0x0000
self._next_output_bit_offset = 512
# set separated input output mapping
self._mixed_mapping = False
class DigitalInputTerminal4Bit(DigitalInputTerminal): class DigitalInputTerminal4Bit(DigitalInputTerminal):
@ -160,14 +168,14 @@ class WAGO_750_530(DigitalOutputTerminal8Bit):
class KL1512(AnalogInputTerminal): class KL1512(AnalogInputTerminal):
""" """
KL1512: 2x digital input, counter, 24 V DC, 1 kHz KL1512: 2x 16 bit counter, 24 V DC, 1 kHz
""" """
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) # Input: 2 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'input_word_width': 2}
def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int): def __init__(self, bus_coupler: BusCoupler, o_b_addr: list[int], i_b_addr: list[int], o_w_addr: list[int], i_w_addr: list[int], mixed_mapping: bool):
super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset) super().__init__(bus_coupler, o_b_addr, i_b_addr, o_w_addr, i_w_addr, mixed_mapping)
self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)] self._last_counter_values = [self.read_channel_word(1), self.read_channel_word(2)]
def read_counter(self, channel: int) -> int: def read_counter(self, channel: int) -> int:
""" """
@ -180,7 +188,7 @@ class KL1512(AnalogInputTerminal):
The counter value. The counter value.
""" """
return self.read_word(channel * 2 - 1) return self.read_channel_word(channel)
def read_delta(self, channel: int) -> int: def read_delta(self, channel: int) -> int:
""" """
@ -192,9 +200,13 @@ class KL1512(AnalogInputTerminal):
Returns: Returns:
The counter value. The counter value.
""" """
# TODO: handel overflow new_count = self.read_channel_word(channel)
new_count = self.read_word(channel * 2 - 1) delta = new_count - self._last_counter_values[channel - 1]
return new_count - self._last_counter_values[channel - 1] if delta > 0x8000:
delta = delta - 0x10000
elif delta < -0x8000:
delta = delta + 0x10000
return delta
class KL3054(AnalogInputTerminal): class KL3054(AnalogInputTerminal):
@ -202,7 +214,7 @@ class KL3054(AnalogInputTerminal):
KL3054: 4x analog input 4...20 mA 12 Bit single-ended KL3054: 4x analog input 4...20 mA 12 Bit single-ended
""" """
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) # Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8} parameters = {'input_word_width': 4}
def read_current(self, channel: int) -> float: def read_current(self, channel: int) -> float:
""" """
@ -214,7 +226,7 @@ class KL3054(AnalogInputTerminal):
Returns: Returns:
The current value. The current value.
""" """
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0 return self.read_normalized(channel) * 16.0 + 4.0
class KL3042(AnalogInputTerminal): class KL3042(AnalogInputTerminal):
@ -222,7 +234,7 @@ class KL3042(AnalogInputTerminal):
KL3042: 2x analog input 0...20 mA 12 Bit single-ended KL3042: 2x analog input 0...20 mA 12 Bit single-ended
""" """
# Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status) # Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'input_word_width': 2}
def read_current(self, channel: int) -> float: def read_current(self, channel: int) -> float:
""" """
@ -234,7 +246,7 @@ class KL3042(AnalogInputTerminal):
Returns: Returns:
The current value. The current value.
""" """
return self.read_normalized(channel * 2 - 1) * 20.0 return self.read_normalized(channel) * 20.0
class KL3202(AnalogInputTerminal): class KL3202(AnalogInputTerminal):
@ -242,7 +254,7 @@ class KL3202(AnalogInputTerminal):
KL3202: 2x analog input PT100 16 Bit 3-wire KL3202: 2x analog input PT100 16 Bit 3-wire
""" """
# Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional) # Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'input_word_width': 2}
def read_temperature(self, channel: int) -> float: def read_temperature(self, channel: int) -> float:
""" """
@ -254,7 +266,7 @@ class KL3202(AnalogInputTerminal):
Returns: Returns:
The temperature value in °C. The temperature value in °C.
""" """
val = self.read_word(channel * 2 - 1) val = self.read_channel_word(channel)
if val > 0x7FFF: if val > 0x7FFF:
return (val - 0x10000) / 10.0 return (val - 0x10000) / 10.0
else: else:
@ -267,7 +279,7 @@ class KL3214(AnalogInputTerminal):
""" """
# inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional) # inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
# out: 4 x 8 Bit Control (optional) # out: 4 x 8 Bit Control (optional)
parameters = {'input_word_width': 8, 'output_word_width': 8} parameters = {'input_word_width': 4}
def read_temperature(self, channel: int) -> float: def read_temperature(self, channel: int) -> float:
""" """
@ -279,7 +291,7 @@ class KL3214(AnalogInputTerminal):
Returns: Returns:
The temperature value. The temperature value.
""" """
val = self.read_word(channel * 2 - 1) val = self.read_channel_word(channel)
if val > 0x7FFF: if val > 0x7FFF:
return (val - 0x10000) / 10.0 return (val - 0x10000) / 10.0
else: else:
@ -291,7 +303,7 @@ class KL4002(AnalogOutputTerminal):
KL4002: 2x analog output 0...10 V 12 Bit differentiell KL4002: 2x analog output 0...10 V 12 Bit differentiell
""" """
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'output_word_width': 2}
def set_voltage(self, channel: int, value: float): def set_voltage(self, channel: int, value: float):
""" """
@ -309,7 +321,7 @@ class KL4132(AnalogOutputTerminal):
KL4002: 2x analog output ±10 V 16 bit differential KL4002: 2x analog output ±10 V 16 bit differential
""" """
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'output_word_width': 2}
def set_normalized(self, channel: int, value: float): def set_normalized(self, channel: int, value: float):
""" """
@ -320,9 +332,9 @@ class KL4132(AnalogOutputTerminal):
value: The normalized value to set. value: The normalized value to set.
""" """
if value >= 0: if value >= 0:
self.write_word(channel - 1, int(value * 0x7FFF)) self.write_channel_word(channel, int(value * 0x7FFF))
else: else:
self.write_word(channel - 1, int(0x10000 + value * 0x7FFF)) self.write_channel_word(channel, int(0x10000 + value * 0x7FFF))
def set_voltage(self, channel: int, value: float): def set_voltage(self, channel: int, value: float):
""" """
@ -340,7 +352,7 @@ class KL4004(AnalogOutputTerminal):
KL4004: 4x analog output 0...10 V 12 Bit differentiell KL4004: 4x analog output 0...10 V 12 Bit differentiell
""" """
# Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status) # Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8} parameters = {'output_word_width': 4}
def set_voltage(self, channel: int, value: float): def set_voltage(self, channel: int, value: float):
""" """