Compare commits

..

6 Commits

9 changed files with 236 additions and 118 deletions

View File

@ -5,7 +5,7 @@ authors:
- family-names: Kruse - family-names: Kruse
given-names: Nicolas given-names: Nicolas
orcid: "https://orcid.org/0000-0001-6758-2269" orcid: "https://orcid.org/0000-0001-6758-2269"
version: 1.0.0 version: 1.0.1
#date-released: "2025-04-01" #date-released: "2025-04-01"
#identifiers: #identifiers:
# - description: This is the collection of archived snapshots of all versions of My Research Software # - description: This is the collection of archived snapshots of all versions of My Research Software

View File

@ -9,7 +9,7 @@ over Ethernet TCP/IP based on ModBus TCP.
### Key Features ### Key Features
- Supports a wide range of Beckhoff and WAGO analog and digital bus - Supports a wide range of Beckhoff and WAGO analog and digital bus
terminals. terminals.
- Very light weight: no dependencies; compact code base - Very lightweight: no dependencies; compact code base
- Easy to extend - Easy to extend
- Using standardized ModBus TCP. - Using standardized ModBus TCP.
- Provides high-level abstractions for reading and writing data - Provides high-level abstractions for reading and writing data

View File

@ -9,6 +9,10 @@ cd pyhoff
python -m venv ./.venv python -m venv ./.venv
source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate` source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate`
# Update version number in
# - pyproject.toml
# - CITATION.cff
# Check code: # Check code:
pip install -r requirements-dev.txt pip install -r requirements-dev.txt
flake8 flake8

View File

@ -1,6 +1,6 @@
[project] [project]
name = "pyhoff" name = "pyhoff"
version = "1.0.0" version = "1.0.1"
authors = [ authors = [
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" }, { name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
] ]

View File

@ -1,5 +1,5 @@
from .modbus import SimpleModbusClient from .modbus import SimpleModbusClient
from typing import Type, Any from typing import Type
class BusTerminal(): class BusTerminal():
@ -8,10 +8,10 @@ class BusTerminal():
Args: Args:
bus_coupler: The bus coupler to which this terminal is connected. bus_coupler: The bus coupler to which this terminal is connected.
output_bit_offset: The offset of the output bits. output_bit_addresses: List of addresses of the output bits.
input_bit_offset: The offset of the input bits. input_bit_addresses: List of addresses of input bits.
output_word_offset: The offset of the output words. output_word_addresses: List of addresses of output words.
input_word_offset: The offset of the input words. input_word_addresses: List of addresses of input words.
Attributes: Attributes:
bus_coupler: The bus coupler to which this terminal is connected. bus_coupler: The bus coupler to which this terminal is connected.
@ -20,16 +20,18 @@ class BusTerminal():
parameters: dict[str, int] = {} parameters: dict[str, int] = {}
def __init__(self, bus_coupler: 'BusCoupler', def __init__(self, bus_coupler: 'BusCoupler',
output_bit_offset: int, output_bit_addresses: list[int],
input_bit_offset: int, input_bit_addresses: list[int],
output_word_offset: int, output_word_addresses: list[int],
input_word_offset: int): input_word_addresses: list[int],
mixed_mapping: bool):
self._output_bit_offset = output_bit_offset
self._input_bit_offset = input_bit_offset
self._output_word_offset = output_word_offset
self._input_word_offset = input_word_offset
self.bus_coupler = bus_coupler self.bus_coupler = bus_coupler
self._output_bit_addresses = output_bit_addresses
self._input_bit_addresses = input_bit_addresses
self._output_word_addresses = output_word_addresses
self._input_word_addresses = input_word_addresses
self._mixed_mapping = mixed_mapping
class DigitalInputTerminal(BusTerminal): class DigitalInputTerminal(BusTerminal):
@ -51,7 +53,7 @@ class DigitalInputTerminal(BusTerminal):
""" """
if channel < 1 or channel > self.parameters['input_bit_width']: if channel < 1 or channel > self.parameters['input_bit_width']:
raise Exception("address out of range") raise Exception("address out of range")
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1) return self.bus_coupler.modbus.read_discrete_input(self._input_bit_addresses[channel - 1])
class DigitalOutputTerminal(BusTerminal): class DigitalOutputTerminal(BusTerminal):
@ -74,7 +76,7 @@ class DigitalOutputTerminal(BusTerminal):
""" """
if channel < 1 or channel > self.parameters['output_bit_width']: if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range") raise Exception("address out of range")
return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value) return self.bus_coupler.modbus.write_single_coil(self._output_bit_addresses[channel - 1], value)
def read_coil(self, channel: int) -> bool | None: def read_coil(self, channel: int) -> bool | None:
""" """
@ -91,46 +93,32 @@ class DigitalOutputTerminal(BusTerminal):
""" """
if channel < 1 or channel > self.parameters['output_bit_width']: if channel < 1 or channel > self.parameters['output_bit_width']:
raise Exception("address out of range") raise Exception("address out of range")
return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1) return self.bus_coupler.modbus.read_coil(self._output_bit_addresses[channel - 1])
class AnalogInputTerminal(BusTerminal): class AnalogInputTerminal(BusTerminal):
""" """
Base class for analog input terminals. Base class for analog input terminals.
""" """
def read_words(self, word_offset: int, word_count: int) -> list[int] | None: def read_channel_word(self, channel: int, error_value: int = -99999) -> int:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
""" """
Read a single word from the terminal. Read a single word from the terminal.
Args: Args:
word_offset: The word offset (0 based index) to read from. channel: The channel number (1 based index) to read from.
Returns: Returns:
The read word value. The read word value.
Raises:
Exception: If the word offset or count is out of range.
""" """
val = self.read_words(word_offset, 1) assert 1 <= channel <= self.parameters['input_word_width'], \
if val: f"channel out of range, must be between {1} and {self.parameters['input_word_width']}"
return val[0]
else: value = self.bus_coupler.modbus.read_input_registers(self._input_word_addresses[channel - 1], 1)
return -999
return value[0] if value else error_value
def read_normalized(self, channel: int) -> float: def read_normalized(self, channel: int) -> float:
""" """
@ -142,64 +130,51 @@ class AnalogInputTerminal(BusTerminal):
Returns: Returns:
The normalized value. The normalized value.
""" """
return self.read_word(channel * 2 - 1) / 0x7FFF return self.read_channel_word(channel) / 0x7FFF
class AnalogOutputTerminal(BusTerminal): class AnalogOutputTerminal(BusTerminal):
""" """
Base class for analog output terminals. Base class for analog output terminals.
""" """
def read_words(self, word_offset: int, word_count: int) -> list[int] | None: def read_channel_word(self, channel: int, error_value: int = -99999) -> int:
"""
Read a list of words from the terminal.
Args:
word_offset: The starting word offset (0 based index).
word_count: The number of words to read.
Returns:
The read words.
Raises:
Exception: If the word offset or count is out of range.
"""
if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']:
raise Exception("address out of range")
return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count)
def read_word(self, word_offset: int) -> int:
""" """
Read a single word from the terminal. Read a single word from the terminal.
Args: Args:
word_offset: The word offset (0 based index) to read from. channel: The channel number (1 based index) to read from.
Returns: Returns:
The read word value. The read word value.
"""
val = self.read_words(word_offset, 1)
if val:
return val[0]
else:
return -999
def write_word(self, word_offset: int, data: int) -> bool: Raises:
Exception: If the word offset or count is out of range.
"""
assert not self._mixed_mapping, 'Reading of output state is not supported with this Bus Coupler.'
assert 1 <= channel <= self.parameters['output_word_width'], \
f"channel out of range, must be between {1} and {self.parameters['output_word_width']}"
value = self.bus_coupler.modbus.read_holding_registers(self._output_word_addresses[channel - 1], 1)
return value[0] if value else error_value
def write_channel_word(self, channel: int, value: int) -> int:
""" """
Write a word to the terminal. Write a word to the terminal.
Args: Args:
word_offset: The word offset to write to. channel: The channel number (1 based index) to write to.
data: The data to write.
Returns: Returns:
The result of the write operation. True if the write operation succeeded.
Raises: Raises:
Exception: If the word offset is out of range. Exception: If the word offset or count is out of range.
""" """
if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']: assert 1 <= channel <= self.parameters['output_word_width'], \
raise Exception("address out of range") f"channel out of range, must be between {1} and {self.parameters['output_word_width']}"
return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data)
return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value)
def set_normalized(self, channel: int, value: float): def set_normalized(self, channel: int, value: float):
""" """
@ -209,7 +184,7 @@ class AnalogOutputTerminal(BusTerminal):
channel: The channel number to set. channel: The channel number to set.
value: The normalized value to set. value: The normalized value to set.
""" """
self.write_word(channel * 2 - 1, int(value * 0x7FFF)) self.write_channel_word(channel, int(value * 0x7FFF))
class BusCoupler(): class BusCoupler():
@ -242,11 +217,14 @@ class BusCoupler():
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [], def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
timeout: float = 5, watchdog: float = 0, debug: bool = False): timeout: float = 5, watchdog: float = 0, debug: bool = False):
self.bus_terminals: list[Any] = list() self.bus_terminals: list[BusTerminal] = list()
self._next_output_bit_offset = 0 self._next_output_bit_offset = 0
self._next_input_bit_offset = 0 self._next_input_bit_offset = 0
self._next_output_word_offset = 0 self._next_output_word_offset = 0
self._next_input_word_offset = 0 self._next_input_word_offset = 0
self._channel_spacing = 1
self._channel_offset = 0
self._mixed_mapping = True
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug) self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
self.add_bus_terminals(bus_terminals) self.add_bus_terminals(bus_terminals)
@ -255,7 +233,7 @@ class BusCoupler():
def _init_hardware(self, watchdog: float): def _init_hardware(self, watchdog: float):
pass pass
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]: def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
""" """
Add bus terminals to the bus coupler. Add bus terminals to the bus coupler.
@ -265,18 +243,36 @@ class BusCoupler():
Returns: Returns:
The corresponding list of bus terminal objects. The corresponding list of bus terminal objects.
""" """
for terminal_class in bus_terminals: for terminal_class in bus_terminals:
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal' assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
new_terminal = terminal_class(self,
self._next_output_bit_offset,
self._next_input_bit_offset,
self._next_output_word_offset,
self._next_input_word_offset)
self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0) def get_para(key: str):
self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0) return terminal_class.parameters.get(key, 0)
self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0) new_terminal = terminal_class(
self,
[i + self._next_output_bit_offset for i in range(get_para('output_bit_width'))],
[i + self._next_input_bit_offset for i in range(get_para('input_bit_width'))],
[i * self._channel_spacing + self._channel_offset + self._next_output_word_offset
for i in range(get_para('output_word_width'))],
[i * self._channel_spacing + self._channel_offset + self._next_input_word_offset
for i in range(get_para('input_word_width'))],
self._mixed_mapping)
output_word_width = get_para('output_word_width')
input_word_width = get_para('input_word_width')
if self._mixed_mapping:
# Shared mapping for word based inputs and outputs
word_width = max(output_word_width, input_word_width)
output_word_width = word_width
input_word_width = word_width
self._next_output_bit_offset += get_para('output_bit_width')
self._next_input_bit_offset += get_para('input_bit_width')
self._next_output_word_offset += output_word_width * self._channel_spacing
self._next_input_word_offset += input_word_width * self._channel_spacing
self.bus_terminals.append(new_terminal) self.bus_terminals.append(new_terminal)

View File

@ -19,7 +19,11 @@ class BK9000(BusCoupler):
self.modbus.write_single_register(0x1121, 0xAFFE) self.modbus.write_single_register(0x1121, 0xAFFE)
# set process image offset # set process image offset
self.next_output_word_offset = 0x0800 self._next_output_word_offset = 0x0800
# set channel placement for terminal mapping
self._channel_spacing = 2
self._channel_offset = 1
class BK9050(BK9000): class BK9050(BK9000):
@ -36,7 +40,7 @@ class BK9100(BK9000):
pass pass
class WAGO750_352(BusCoupler): class WAGO_750_352(BusCoupler):
""" """
Wago 750-352 ModBus TCP bus coupler Wago 750-352 ModBus TCP bus coupler
""" """
@ -53,7 +57,11 @@ class WAGO750_352(BusCoupler):
self.modbus.write_single_register(0x1001, 0xFFFF) self.modbus.write_single_register(0x1001, 0xFFFF)
# set process image offset # set process image offset
self.next_output_word_offset = 0x0000 self._next_output_word_offset = 0x0000
self._next_output_bit_offset = 512
# set separated input output mapping
self._mixed_mapping = False
class DigitalInputTerminal4Bit(DigitalInputTerminal): class DigitalInputTerminal4Bit(DigitalInputTerminal):
@ -160,14 +168,14 @@ class WAGO_750_530(DigitalOutputTerminal8Bit):
class KL1512(AnalogInputTerminal): class KL1512(AnalogInputTerminal):
""" """
KL1512: 2x digital input, counter, 24 V DC, 1 kHz KL1512: 2x 16 bit counter, 24 V DC, 1 kHz
""" """
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) # Input: 2 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'input_word_width': 2}
def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int): def __init__(self, bus_coupler: BusCoupler, o_b_addr: list[int], i_b_addr: list[int], o_w_addr: list[int], i_w_addr: list[int], mixed_mapping: bool):
super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset) super().__init__(bus_coupler, o_b_addr, i_b_addr, o_w_addr, i_w_addr, mixed_mapping)
self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)] self._last_counter_values = [self.read_channel_word(1), self.read_channel_word(2)]
def read_counter(self, channel: int) -> int: def read_counter(self, channel: int) -> int:
""" """
@ -180,7 +188,7 @@ class KL1512(AnalogInputTerminal):
The counter value. The counter value.
""" """
return self.read_word(channel * 2 - 1) return self.read_channel_word(channel)
def read_delta(self, channel: int) -> int: def read_delta(self, channel: int) -> int:
""" """
@ -192,9 +200,13 @@ class KL1512(AnalogInputTerminal):
Returns: Returns:
The counter value. The counter value.
""" """
# TODO: handel overflow new_count = self.read_channel_word(channel)
new_count = self.read_word(channel * 2 - 1) delta = new_count - self._last_counter_values[channel - 1]
return new_count - self._last_counter_values[channel - 1] if delta > 0x8000:
delta = delta - 0x10000
elif delta < -0x8000:
delta = delta + 0x10000
return delta
class KL3054(AnalogInputTerminal): class KL3054(AnalogInputTerminal):
@ -202,7 +214,7 @@ class KL3054(AnalogInputTerminal):
KL3054: 4x analog input 4...20 mA 12 Bit single-ended KL3054: 4x analog input 4...20 mA 12 Bit single-ended
""" """
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) # Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8} parameters = {'input_word_width': 4}
def read_current(self, channel: int) -> float: def read_current(self, channel: int) -> float:
""" """
@ -214,7 +226,7 @@ class KL3054(AnalogInputTerminal):
Returns: Returns:
The current value. The current value.
""" """
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0 return self.read_normalized(channel) * 16.0 + 4.0
class KL3042(AnalogInputTerminal): class KL3042(AnalogInputTerminal):
@ -222,7 +234,7 @@ class KL3042(AnalogInputTerminal):
KL3042: 2x analog input 0...20 mA 12 Bit single-ended KL3042: 2x analog input 0...20 mA 12 Bit single-ended
""" """
# Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status) # Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'input_word_width': 2}
def read_current(self, channel: int) -> float: def read_current(self, channel: int) -> float:
""" """
@ -234,7 +246,7 @@ class KL3042(AnalogInputTerminal):
Returns: Returns:
The current value. The current value.
""" """
return self.read_normalized(channel * 2 - 1) * 20.0 return self.read_normalized(channel) * 20.0
class KL3202(AnalogInputTerminal): class KL3202(AnalogInputTerminal):
@ -242,7 +254,7 @@ class KL3202(AnalogInputTerminal):
KL3202: 2x analog input PT100 16 Bit 3-wire KL3202: 2x analog input PT100 16 Bit 3-wire
""" """
# Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional) # Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'input_word_width': 2}
def read_temperature(self, channel: int) -> float: def read_temperature(self, channel: int) -> float:
""" """
@ -254,7 +266,7 @@ class KL3202(AnalogInputTerminal):
Returns: Returns:
The temperature value in °C. The temperature value in °C.
""" """
val = self.read_word(channel * 2 - 1) val = self.read_channel_word(channel)
if val > 0x7FFF: if val > 0x7FFF:
return (val - 0x10000) / 10.0 return (val - 0x10000) / 10.0
else: else:
@ -267,7 +279,7 @@ class KL3214(AnalogInputTerminal):
""" """
# inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional) # inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
# out: 4 x 8 Bit Control (optional) # out: 4 x 8 Bit Control (optional)
parameters = {'input_word_width': 8, 'output_word_width': 8} parameters = {'input_word_width': 4}
def read_temperature(self, channel: int) -> float: def read_temperature(self, channel: int) -> float:
""" """
@ -279,7 +291,7 @@ class KL3214(AnalogInputTerminal):
Returns: Returns:
The temperature value. The temperature value.
""" """
val = self.read_word(channel * 2 - 1) val = self.read_channel_word(channel)
if val > 0x7FFF: if val > 0x7FFF:
return (val - 0x10000) / 10.0 return (val - 0x10000) / 10.0
else: else:
@ -291,7 +303,7 @@ class KL4002(AnalogOutputTerminal):
KL4002: 2x analog output 0...10 V 12 Bit differentiell KL4002: 2x analog output 0...10 V 12 Bit differentiell
""" """
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'output_word_width': 2}
def set_voltage(self, channel: int, value: float): def set_voltage(self, channel: int, value: float):
""" """
@ -309,7 +321,7 @@ class KL4132(AnalogOutputTerminal):
KL4002: 2x analog output ±10 V 16 bit differential KL4002: 2x analog output ±10 V 16 bit differential
""" """
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
parameters = {'input_word_width': 4, 'output_word_width': 4} parameters = {'output_word_width': 2}
def set_normalized(self, channel: int, value: float): def set_normalized(self, channel: int, value: float):
""" """
@ -320,9 +332,9 @@ class KL4132(AnalogOutputTerminal):
value: The normalized value to set. value: The normalized value to set.
""" """
if value >= 0: if value >= 0:
self.write_word(channel - 1, int(value * 0x7FFF)) self.write_channel_word(channel, int(value * 0x7FFF))
else: else:
self.write_word(channel - 1, int(0x10000 + value * 0x7FFF)) self.write_channel_word(channel, int(0x10000 + value * 0x7FFF))
def set_voltage(self, channel: int, value: float): def set_voltage(self, channel: int, value: float):
""" """
@ -340,7 +352,7 @@ class KL4004(AnalogOutputTerminal):
KL4004: 4x analog output 0...10 V 12 Bit differentiell KL4004: 4x analog output 0...10 V 12 Bit differentiell
""" """
# Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status) # Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
parameters = {'input_word_width': 8, 'output_word_width': 8} parameters = {'output_word_width': 4}
def set_voltage(self, channel: int, value: float): def set_voltage(self, channel: int, value: float):
""" """

View File

@ -425,10 +425,15 @@ class SimpleModbusClient:
buffer = bytes() buffer = bytes()
while len(buffer) < number_of_bytes: while len(buffer) < number_of_bytes:
try: try:
buffer += self._socket.recv(number_of_bytes - len(buffer)) tx_data = self._socket.recv(number_of_bytes - len(buffer))
except socket.error: except socket.error:
return bytes() return bytes()
if tx_data:
buffer += tx_data
else:
return bytes()
if self.debug: if self.debug:
print(f'<- Received: {' '.join(hex(b) for b in buffer)}') print(f'<- Received: {' '.join(hex(b) for b in buffer)}')
@ -512,6 +517,8 @@ class SimpleModbusClient:
if data[0] > 0x80: if data[0] > 0x80:
self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})' self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})'
if self.debug:
print(self.last_error)
return bytes() return bytes()
return data[1:] return data[1:]

66
tests/test_beckh_tace.py Normal file
View File

@ -0,0 +1,66 @@
import pyhoff as pyhoff
from typing import Type
from pyhoff.devices import KL2404, KL2424, KL9100, KL1104, \
KL9188, KL3054, KL3214, KL4004, KL9010, BK9050
def test_against_old_traces():
"""
Test modbus tcp byte streams against data from an old
known good implementation for some Beckhoff terminals.
"""
debug_data: list[str] = list()
# dummy modbus send function
def debug_send_dummy(data: bytes) -> int:
print(f'-> Send: {' '.join(hex(b) for b in data)}')
for b in data:
debug_data.append(f"{b:02X}")
return len(data)
terminals_list: list[Type[pyhoff.BusTerminal]] = [KL2404, KL2424, KL2424, KL2424, KL9100, KL1104,
KL1104, KL2404, KL9188, KL3054, KL3054, KL3214,
KL3214, KL3214, KL4004, KL4004, KL9010]
bk = BK9050("localhost", 11255, timeout=0.001)
# injecting debug function
bk.modbus._send = debug_send_dummy # type: ignore
bts = bk.add_bus_terminals(terminals_list)
terminal1 = bts[15]
assert isinstance(terminal1, KL4004)
ref_data = ['86', 'E2', '00', '00', '00', '06', '01', '06', '08', '35', '71', 'A9']
debug_data.clear()
terminal1.set_voltage(3, 8.88)
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
terminal2 = bts[13]
assert isinstance(terminal2, KL3214)
ref_data = ['8B', '18', '00', '00', '00', '06', '01', '04', '00', '25', '00', '01']
debug_data.clear()
terminal2.read_temperature(3)
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
ref_data = ['08', 'F8', '00', '00', '00', '06', '01', '04', '00', '27', '00', '01']
debug_data.clear()
terminal2.read_temperature(4)
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
terminal3 = bts[7]
assert isinstance(terminal3, KL2404)
ref_data = ['80', '8F', '00', '00', '00', '06', '01', '05', '00', '12', 'FF', '00']
debug_data.clear()
terminal3.write_coil(3, True)
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
ref_data = ['23', '96', '00', '00', '00', '06', '01', '01', '00', '13', '00', '01']
debug_data.clear()
terminal3.read_coil(4)
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
if __name__ == '__main__':
test_against_old_traces()

View File

@ -39,6 +39,30 @@ def test_terminal_plausib():
assert o.parameters.get('output_word_width', 0) > 0 assert o.parameters.get('output_word_width', 0) > 0
def rw_all_bus_terminals(bus_cupler: pyhoff.BusCoupler):
for bt in bus_cupler.bus_terminals:
if isinstance(bt, AnalogOutputTerminal):
for channel in range(1, bt.parameters.get('output_word_width', 0) + 1):
bt.set_normalized(channel, 0)
bt.set_normalized(channel, 1)
bt.set_normalized(channel, 2)
if isinstance(bt, AnalogInputTerminal):
for channel in range(1, bt.parameters.get('input_word_width', 0) + 1):
assert bt.read_channel_word(channel, 1337) == 1337
assert bt.read_channel_word(channel, 1337) == 1337
assert bt.read_channel_word(channel, 1337) == 1337
if isinstance(bt, DigitalOutputTerminal):
for channel in range(1, bt.parameters.get('output_bit_width', 0) + 1):
assert not bt.write_coil(channel, True)
assert not bt.write_coil(channel, False)
if isinstance(bt, DigitalInputTerminal):
for channel in range(1, bt.parameters.get('input_bit_width', 0) + 1):
assert bt.read_input(channel) is None
def test_terminal_setup(): def test_terminal_setup():
""" """
Test if all implemented BusTerminal classes in devices can Test if all implemented BusTerminal classes in devices can
@ -55,7 +79,16 @@ def test_terminal_setup():
print(n) print(n)
terminal_classes.append(o) terminal_classes.append(o)
# Beckhoff
bus_cupler = devices.BK9050('localhost', 11255, terminal_classes, timeout=0.001) bus_cupler = devices.BK9050('localhost', 11255, terminal_classes, timeout=0.001)
assert len(terminal_classes) == len(bus_cupler.bus_terminals) assert len(terminal_classes) == len(bus_cupler.bus_terminals)
assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error() assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error()
rw_all_bus_terminals(bus_cupler)
# Wago
bus_cupler = devices.WAGO_750_352('localhost', 11255, terminal_classes, timeout=0.001)
assert len(terminal_classes) == len(bus_cupler.bus_terminals)
assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error()
rw_all_bus_terminals(bus_cupler)