mirror of https://github.com/Nonannet/pyhoff.git
Compare commits
No commits in common. "228b1e7a650aaa74be263d12d6d72628640dcdb4" and "5e52a3f866ae79de8c33b7d03d47f5c3e43a35c7" have entirely different histories.
228b1e7a65
...
5e52a3f866
|
@ -5,7 +5,7 @@ authors:
|
|||
- family-names: Kruse
|
||||
given-names: Nicolas
|
||||
orcid: "https://orcid.org/0000-0001-6758-2269"
|
||||
version: 1.0.1
|
||||
version: 1.0.0
|
||||
#date-released: "2025-04-01"
|
||||
#identifiers:
|
||||
# - description: This is the collection of archived snapshots of all versions of My Research Software
|
||||
|
|
|
@ -9,10 +9,6 @@ cd pyhoff
|
|||
python -m venv ./.venv
|
||||
source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate`
|
||||
|
||||
# Update version number in
|
||||
# - pyproject.toml
|
||||
# - CITATION.cff
|
||||
|
||||
# Check code:
|
||||
pip install -r requirements-dev.txt
|
||||
flake8
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[project]
|
||||
name = "pyhoff"
|
||||
version = "1.0.1"
|
||||
version = "1.0.0"
|
||||
authors = [
|
||||
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
|
||||
]
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from .modbus import SimpleModbusClient
|
||||
from typing import Type
|
||||
from typing import Type, Any
|
||||
|
||||
|
||||
class BusTerminal():
|
||||
|
@ -8,10 +8,10 @@ class BusTerminal():
|
|||
|
||||
Args:
|
||||
bus_coupler: The bus coupler to which this terminal is connected.
|
||||
output_bit_addresses: List of addresses of the output bits.
|
||||
input_bit_addresses: List of addresses of input bits.
|
||||
output_word_addresses: List of addresses of output words.
|
||||
input_word_addresses: List of addresses of input words.
|
||||
output_bit_offset: The offset of the output bits.
|
||||
input_bit_offset: The offset of the input bits.
|
||||
output_word_offset: The offset of the output words.
|
||||
input_word_offset: The offset of the input words.
|
||||
|
||||
Attributes:
|
||||
bus_coupler: The bus coupler to which this terminal is connected.
|
||||
|
@ -20,18 +20,16 @@ class BusTerminal():
|
|||
parameters: dict[str, int] = {}
|
||||
|
||||
def __init__(self, bus_coupler: 'BusCoupler',
|
||||
output_bit_addresses: list[int],
|
||||
input_bit_addresses: list[int],
|
||||
output_word_addresses: list[int],
|
||||
input_word_addresses: list[int],
|
||||
mixed_mapping: bool):
|
||||
output_bit_offset: int,
|
||||
input_bit_offset: int,
|
||||
output_word_offset: int,
|
||||
input_word_offset: int):
|
||||
|
||||
self._output_bit_offset = output_bit_offset
|
||||
self._input_bit_offset = input_bit_offset
|
||||
self._output_word_offset = output_word_offset
|
||||
self._input_word_offset = input_word_offset
|
||||
self.bus_coupler = bus_coupler
|
||||
self._output_bit_addresses = output_bit_addresses
|
||||
self._input_bit_addresses = input_bit_addresses
|
||||
self._output_word_addresses = output_word_addresses
|
||||
self._input_word_addresses = input_word_addresses
|
||||
self._mixed_mapping = mixed_mapping
|
||||
|
||||
|
||||
class DigitalInputTerminal(BusTerminal):
|
||||
|
@ -53,7 +51,7 @@ class DigitalInputTerminal(BusTerminal):
|
|||
"""
|
||||
if channel < 1 or channel > self.parameters['input_bit_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_addresses[channel - 1])
|
||||
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1)
|
||||
|
||||
|
||||
class DigitalOutputTerminal(BusTerminal):
|
||||
|
@ -76,7 +74,7 @@ class DigitalOutputTerminal(BusTerminal):
|
|||
"""
|
||||
if channel < 1 or channel > self.parameters['output_bit_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.write_single_coil(self._output_bit_addresses[channel - 1], value)
|
||||
return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value)
|
||||
|
||||
def read_coil(self, channel: int) -> bool | None:
|
||||
"""
|
||||
|
@ -93,32 +91,46 @@ class DigitalOutputTerminal(BusTerminal):
|
|||
"""
|
||||
if channel < 1 or channel > self.parameters['output_bit_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_coil(self._output_bit_addresses[channel - 1])
|
||||
return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1)
|
||||
|
||||
|
||||
class AnalogInputTerminal(BusTerminal):
|
||||
"""
|
||||
Base class for analog input terminals.
|
||||
"""
|
||||
def read_channel_word(self, channel: int, error_value: int = -99999) -> int:
|
||||
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
|
||||
"""
|
||||
Read a single word from the terminal.
|
||||
Read a list of words from the terminal.
|
||||
|
||||
Args:
|
||||
channel: The channel number (1 based index) to read from.
|
||||
word_offset: The starting word offset (0 based index).
|
||||
word_count: The number of words to read.
|
||||
|
||||
Returns:
|
||||
The read word value.
|
||||
The read words.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset or count is out of range.
|
||||
"""
|
||||
assert 1 <= channel <= self.parameters['input_word_width'], \
|
||||
f"channel out of range, must be between {1} and {self.parameters['input_word_width']}"
|
||||
if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count)
|
||||
|
||||
value = self.bus_coupler.modbus.read_input_registers(self._input_word_addresses[channel - 1], 1)
|
||||
def read_word(self, word_offset: int) -> int:
|
||||
"""
|
||||
Read a single word from the terminal.
|
||||
|
||||
return value[0] if value else error_value
|
||||
Args:
|
||||
word_offset: The word offset (0 based index) to read from.
|
||||
|
||||
Returns:
|
||||
The read word value.
|
||||
"""
|
||||
val = self.read_words(word_offset, 1)
|
||||
if val:
|
||||
return val[0]
|
||||
else:
|
||||
return -999
|
||||
|
||||
def read_normalized(self, channel: int) -> float:
|
||||
"""
|
||||
|
@ -130,51 +142,64 @@ class AnalogInputTerminal(BusTerminal):
|
|||
Returns:
|
||||
The normalized value.
|
||||
"""
|
||||
return self.read_channel_word(channel) / 0x7FFF
|
||||
return self.read_word(channel * 2 - 1) / 0x7FFF
|
||||
|
||||
|
||||
class AnalogOutputTerminal(BusTerminal):
|
||||
"""
|
||||
Base class for analog output terminals.
|
||||
"""
|
||||
def read_channel_word(self, channel: int, error_value: int = -99999) -> int:
|
||||
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
|
||||
"""
|
||||
Read a list of words from the terminal.
|
||||
|
||||
Args:
|
||||
word_offset: The starting word offset (0 based index).
|
||||
word_count: The number of words to read.
|
||||
|
||||
Returns:
|
||||
The read words.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset or count is out of range.
|
||||
"""
|
||||
if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count)
|
||||
|
||||
def read_word(self, word_offset: int) -> int:
|
||||
"""
|
||||
Read a single word from the terminal.
|
||||
|
||||
Args:
|
||||
channel: The channel number (1 based index) to read from.
|
||||
word_offset: The word offset (0 based index) to read from.
|
||||
|
||||
Returns:
|
||||
The read word value.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset or count is out of range.
|
||||
"""
|
||||
assert not self._mixed_mapping, 'Reading of output state is not supported with this Bus Coupler.'
|
||||
assert 1 <= channel <= self.parameters['output_word_width'], \
|
||||
f"channel out of range, must be between {1} and {self.parameters['output_word_width']}"
|
||||
val = self.read_words(word_offset, 1)
|
||||
if val:
|
||||
return val[0]
|
||||
else:
|
||||
return -999
|
||||
|
||||
value = self.bus_coupler.modbus.read_holding_registers(self._output_word_addresses[channel - 1], 1)
|
||||
|
||||
return value[0] if value else error_value
|
||||
|
||||
def write_channel_word(self, channel: int, value: int) -> int:
|
||||
def write_word(self, word_offset: int, data: int) -> bool:
|
||||
"""
|
||||
Write a word to the terminal.
|
||||
|
||||
Args:
|
||||
channel: The channel number (1 based index) to write to.
|
||||
word_offset: The word offset to write to.
|
||||
data: The data to write.
|
||||
|
||||
Returns:
|
||||
True if the write operation succeeded.
|
||||
The result of the write operation.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset or count is out of range.
|
||||
Exception: If the word offset is out of range.
|
||||
"""
|
||||
assert 1 <= channel <= self.parameters['output_word_width'], \
|
||||
f"channel out of range, must be between {1} and {self.parameters['output_word_width']}"
|
||||
|
||||
return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value)
|
||||
if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data)
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
"""
|
||||
|
@ -184,7 +209,7 @@ class AnalogOutputTerminal(BusTerminal):
|
|||
channel: The channel number to set.
|
||||
value: The normalized value to set.
|
||||
"""
|
||||
self.write_channel_word(channel, int(value * 0x7FFF))
|
||||
self.write_word(channel * 2 - 1, int(value * 0x7FFF))
|
||||
|
||||
|
||||
class BusCoupler():
|
||||
|
@ -217,14 +242,11 @@ class BusCoupler():
|
|||
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
|
||||
timeout: float = 5, watchdog: float = 0, debug: bool = False):
|
||||
|
||||
self.bus_terminals: list[BusTerminal] = list()
|
||||
self.bus_terminals: list[Any] = list()
|
||||
self._next_output_bit_offset = 0
|
||||
self._next_input_bit_offset = 0
|
||||
self._next_output_word_offset = 0
|
||||
self._next_input_word_offset = 0
|
||||
self._channel_spacing = 1
|
||||
self._channel_offset = 0
|
||||
self._mixed_mapping = True
|
||||
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
|
||||
|
||||
self.add_bus_terminals(bus_terminals)
|
||||
|
@ -233,7 +255,7 @@ class BusCoupler():
|
|||
def _init_hardware(self, watchdog: float):
|
||||
pass
|
||||
|
||||
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
|
||||
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]:
|
||||
"""
|
||||
Add bus terminals to the bus coupler.
|
||||
|
||||
|
@ -243,36 +265,18 @@ class BusCoupler():
|
|||
Returns:
|
||||
The corresponding list of bus terminal objects.
|
||||
"""
|
||||
|
||||
for terminal_class in bus_terminals:
|
||||
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
|
||||
new_terminal = terminal_class(self,
|
||||
self._next_output_bit_offset,
|
||||
self._next_input_bit_offset,
|
||||
self._next_output_word_offset,
|
||||
self._next_input_word_offset)
|
||||
|
||||
def get_para(key: str):
|
||||
return terminal_class.parameters.get(key, 0)
|
||||
|
||||
new_terminal = terminal_class(
|
||||
self,
|
||||
[i + self._next_output_bit_offset for i in range(get_para('output_bit_width'))],
|
||||
[i + self._next_input_bit_offset for i in range(get_para('input_bit_width'))],
|
||||
[i * self._channel_spacing + self._channel_offset + self._next_output_word_offset
|
||||
for i in range(get_para('output_word_width'))],
|
||||
[i * self._channel_spacing + self._channel_offset + self._next_input_word_offset
|
||||
for i in range(get_para('input_word_width'))],
|
||||
self._mixed_mapping)
|
||||
|
||||
output_word_width = get_para('output_word_width')
|
||||
input_word_width = get_para('input_word_width')
|
||||
|
||||
if self._mixed_mapping:
|
||||
# Shared mapping for word based inputs and outputs
|
||||
word_width = max(output_word_width, input_word_width)
|
||||
output_word_width = word_width
|
||||
input_word_width = word_width
|
||||
|
||||
self._next_output_bit_offset += get_para('output_bit_width')
|
||||
self._next_input_bit_offset += get_para('input_bit_width')
|
||||
self._next_output_word_offset += output_word_width * self._channel_spacing
|
||||
self._next_input_word_offset += input_word_width * self._channel_spacing
|
||||
self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0)
|
||||
self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0)
|
||||
self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
|
||||
self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0)
|
||||
|
||||
self.bus_terminals.append(new_terminal)
|
||||
|
||||
|
|
|
@ -19,11 +19,7 @@ class BK9000(BusCoupler):
|
|||
self.modbus.write_single_register(0x1121, 0xAFFE)
|
||||
|
||||
# set process image offset
|
||||
self._next_output_word_offset = 0x0800
|
||||
|
||||
# set channel placement for terminal mapping
|
||||
self._channel_spacing = 2
|
||||
self._channel_offset = 1
|
||||
self.next_output_word_offset = 0x0800
|
||||
|
||||
|
||||
class BK9050(BK9000):
|
||||
|
@ -40,7 +36,7 @@ class BK9100(BK9000):
|
|||
pass
|
||||
|
||||
|
||||
class WAGO_750_352(BusCoupler):
|
||||
class WAGO750_352(BusCoupler):
|
||||
"""
|
||||
Wago 750-352 ModBus TCP bus coupler
|
||||
"""
|
||||
|
@ -57,11 +53,7 @@ class WAGO_750_352(BusCoupler):
|
|||
self.modbus.write_single_register(0x1001, 0xFFFF)
|
||||
|
||||
# set process image offset
|
||||
self._next_output_word_offset = 0x0000
|
||||
self._next_output_bit_offset = 512
|
||||
|
||||
# set separated input output mapping
|
||||
self._mixed_mapping = False
|
||||
self.next_output_word_offset = 0x0000
|
||||
|
||||
|
||||
class DigitalInputTerminal4Bit(DigitalInputTerminal):
|
||||
|
@ -168,14 +160,14 @@ class WAGO_750_530(DigitalOutputTerminal8Bit):
|
|||
|
||||
class KL1512(AnalogInputTerminal):
|
||||
"""
|
||||
KL1512: 2x 16 bit counter, 24 V DC, 1 kHz
|
||||
KL1512: 2x digital input, counter, 24 V DC, 1 kHz
|
||||
"""
|
||||
# Input: 2 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 2}
|
||||
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def __init__(self, bus_coupler: BusCoupler, o_b_addr: list[int], i_b_addr: list[int], o_w_addr: list[int], i_w_addr: list[int], mixed_mapping: bool):
|
||||
super().__init__(bus_coupler, o_b_addr, i_b_addr, o_w_addr, i_w_addr, mixed_mapping)
|
||||
self._last_counter_values = [self.read_channel_word(1), self.read_channel_word(2)]
|
||||
def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int):
|
||||
super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset)
|
||||
self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)]
|
||||
|
||||
def read_counter(self, channel: int) -> int:
|
||||
"""
|
||||
|
@ -188,7 +180,7 @@ class KL1512(AnalogInputTerminal):
|
|||
The counter value.
|
||||
"""
|
||||
|
||||
return self.read_channel_word(channel)
|
||||
return self.read_word(channel * 2 - 1)
|
||||
|
||||
def read_delta(self, channel: int) -> int:
|
||||
"""
|
||||
|
@ -200,13 +192,9 @@ class KL1512(AnalogInputTerminal):
|
|||
Returns:
|
||||
The counter value.
|
||||
"""
|
||||
new_count = self.read_channel_word(channel)
|
||||
delta = new_count - self._last_counter_values[channel - 1]
|
||||
if delta > 0x8000:
|
||||
delta = delta - 0x10000
|
||||
elif delta < -0x8000:
|
||||
delta = delta + 0x10000
|
||||
return delta
|
||||
# TODO: handel overflow
|
||||
new_count = self.read_word(channel * 2 - 1)
|
||||
return new_count - self._last_counter_values[channel - 1]
|
||||
|
||||
|
||||
class KL3054(AnalogInputTerminal):
|
||||
|
@ -214,7 +202,7 @@ class KL3054(AnalogInputTerminal):
|
|||
KL3054: 4x analog input 4...20 mA 12 Bit single-ended
|
||||
"""
|
||||
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 4}
|
||||
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||
|
||||
def read_current(self, channel: int) -> float:
|
||||
"""
|
||||
|
@ -226,7 +214,7 @@ class KL3054(AnalogInputTerminal):
|
|||
Returns:
|
||||
The current value.
|
||||
"""
|
||||
return self.read_normalized(channel) * 16.0 + 4.0
|
||||
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
|
||||
|
||||
|
||||
class KL3042(AnalogInputTerminal):
|
||||
|
@ -234,7 +222,7 @@ class KL3042(AnalogInputTerminal):
|
|||
KL3042: 2x analog input 0...20 mA 12 Bit single-ended
|
||||
"""
|
||||
# Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 2}
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def read_current(self, channel: int) -> float:
|
||||
"""
|
||||
|
@ -246,7 +234,7 @@ class KL3042(AnalogInputTerminal):
|
|||
Returns:
|
||||
The current value.
|
||||
"""
|
||||
return self.read_normalized(channel) * 20.0
|
||||
return self.read_normalized(channel * 2 - 1) * 20.0
|
||||
|
||||
|
||||
class KL3202(AnalogInputTerminal):
|
||||
|
@ -254,7 +242,7 @@ class KL3202(AnalogInputTerminal):
|
|||
KL3202: 2x analog input PT100 16 Bit 3-wire
|
||||
"""
|
||||
# Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
|
||||
parameters = {'input_word_width': 2}
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def read_temperature(self, channel: int) -> float:
|
||||
"""
|
||||
|
@ -266,7 +254,7 @@ class KL3202(AnalogInputTerminal):
|
|||
Returns:
|
||||
The temperature value in °C.
|
||||
"""
|
||||
val = self.read_channel_word(channel)
|
||||
val = self.read_word(channel * 2 - 1)
|
||||
if val > 0x7FFF:
|
||||
return (val - 0x10000) / 10.0
|
||||
else:
|
||||
|
@ -279,7 +267,7 @@ class KL3214(AnalogInputTerminal):
|
|||
"""
|
||||
# inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
|
||||
# out: 4 x 8 Bit Control (optional)
|
||||
parameters = {'input_word_width': 4}
|
||||
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||
|
||||
def read_temperature(self, channel: int) -> float:
|
||||
"""
|
||||
|
@ -291,7 +279,7 @@ class KL3214(AnalogInputTerminal):
|
|||
Returns:
|
||||
The temperature value.
|
||||
"""
|
||||
val = self.read_channel_word(channel)
|
||||
val = self.read_word(channel * 2 - 1)
|
||||
if val > 0x7FFF:
|
||||
return (val - 0x10000) / 10.0
|
||||
else:
|
||||
|
@ -303,7 +291,7 @@ class KL4002(AnalogOutputTerminal):
|
|||
KL4002: 2x analog output 0...10 V 12 Bit differentiell
|
||||
"""
|
||||
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||
parameters = {'output_word_width': 2}
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
"""
|
||||
|
@ -321,7 +309,7 @@ class KL4132(AnalogOutputTerminal):
|
|||
KL4002: 2x analog output ±10 V 16 bit differential
|
||||
"""
|
||||
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||
parameters = {'output_word_width': 2}
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
"""
|
||||
|
@ -332,9 +320,9 @@ class KL4132(AnalogOutputTerminal):
|
|||
value: The normalized value to set.
|
||||
"""
|
||||
if value >= 0:
|
||||
self.write_channel_word(channel, int(value * 0x7FFF))
|
||||
self.write_word(channel - 1, int(value * 0x7FFF))
|
||||
else:
|
||||
self.write_channel_word(channel, int(0x10000 + value * 0x7FFF))
|
||||
self.write_word(channel - 1, int(0x10000 + value * 0x7FFF))
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
"""
|
||||
|
@ -352,7 +340,7 @@ class KL4004(AnalogOutputTerminal):
|
|||
KL4004: 4x analog output 0...10 V 12 Bit differentiell
|
||||
"""
|
||||
# Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
|
||||
parameters = {'output_word_width': 4}
|
||||
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
"""
|
||||
|
|
|
@ -425,15 +425,10 @@ class SimpleModbusClient:
|
|||
buffer = bytes()
|
||||
while len(buffer) < number_of_bytes:
|
||||
try:
|
||||
tx_data = self._socket.recv(number_of_bytes - len(buffer))
|
||||
buffer += self._socket.recv(number_of_bytes - len(buffer))
|
||||
except socket.error:
|
||||
return bytes()
|
||||
|
||||
if tx_data:
|
||||
buffer += tx_data
|
||||
else:
|
||||
return bytes()
|
||||
|
||||
if self.debug:
|
||||
print(f'<- Received: {' '.join(hex(b) for b in buffer)}')
|
||||
|
||||
|
@ -517,8 +512,6 @@ class SimpleModbusClient:
|
|||
|
||||
if data[0] > 0x80:
|
||||
self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})'
|
||||
if self.debug:
|
||||
print(self.last_error)
|
||||
return bytes()
|
||||
|
||||
return data[1:]
|
||||
|
|
|
@ -1,66 +0,0 @@
|
|||
import pyhoff as pyhoff
|
||||
from typing import Type
|
||||
from pyhoff.devices import KL2404, KL2424, KL9100, KL1104, \
|
||||
KL9188, KL3054, KL3214, KL4004, KL9010, BK9050
|
||||
|
||||
|
||||
def test_against_old_traces():
|
||||
"""
|
||||
Test modbus tcp byte streams against data from an old
|
||||
known good implementation for some Beckhoff terminals.
|
||||
"""
|
||||
|
||||
debug_data: list[str] = list()
|
||||
|
||||
# dummy modbus send function
|
||||
def debug_send_dummy(data: bytes) -> int:
|
||||
print(f'-> Send: {' '.join(hex(b) for b in data)}')
|
||||
for b in data:
|
||||
debug_data.append(f"{b:02X}")
|
||||
return len(data)
|
||||
|
||||
terminals_list: list[Type[pyhoff.BusTerminal]] = [KL2404, KL2424, KL2424, KL2424, KL9100, KL1104,
|
||||
KL1104, KL2404, KL9188, KL3054, KL3054, KL3214,
|
||||
KL3214, KL3214, KL4004, KL4004, KL9010]
|
||||
|
||||
bk = BK9050("localhost", 11255, timeout=0.001)
|
||||
|
||||
# injecting debug function
|
||||
bk.modbus._send = debug_send_dummy # type: ignore
|
||||
|
||||
bts = bk.add_bus_terminals(terminals_list)
|
||||
|
||||
terminal1 = bts[15]
|
||||
assert isinstance(terminal1, KL4004)
|
||||
ref_data = ['86', 'E2', '00', '00', '00', '06', '01', '06', '08', '35', '71', 'A9']
|
||||
debug_data.clear()
|
||||
terminal1.set_voltage(3, 8.88)
|
||||
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
|
||||
|
||||
terminal2 = bts[13]
|
||||
assert isinstance(terminal2, KL3214)
|
||||
ref_data = ['8B', '18', '00', '00', '00', '06', '01', '04', '00', '25', '00', '01']
|
||||
debug_data.clear()
|
||||
terminal2.read_temperature(3)
|
||||
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
|
||||
|
||||
ref_data = ['08', 'F8', '00', '00', '00', '06', '01', '04', '00', '27', '00', '01']
|
||||
debug_data.clear()
|
||||
terminal2.read_temperature(4)
|
||||
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
|
||||
|
||||
terminal3 = bts[7]
|
||||
assert isinstance(terminal3, KL2404)
|
||||
ref_data = ['80', '8F', '00', '00', '00', '06', '01', '05', '00', '12', 'FF', '00']
|
||||
debug_data.clear()
|
||||
terminal3.write_coil(3, True)
|
||||
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
|
||||
|
||||
ref_data = ['23', '96', '00', '00', '00', '06', '01', '01', '00', '13', '00', '01']
|
||||
debug_data.clear()
|
||||
terminal3.read_coil(4)
|
||||
assert debug_data[2:] == ref_data[2:], print('test:' + ' '.join(debug_data) + '\nref: ' + ' '.join(ref_data) + '\n')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_against_old_traces()
|
|
@ -39,30 +39,6 @@ def test_terminal_plausib():
|
|||
assert o.parameters.get('output_word_width', 0) > 0
|
||||
|
||||
|
||||
def rw_all_bus_terminals(bus_cupler: pyhoff.BusCoupler):
|
||||
for bt in bus_cupler.bus_terminals:
|
||||
if isinstance(bt, AnalogOutputTerminal):
|
||||
for channel in range(1, bt.parameters.get('output_word_width', 0) + 1):
|
||||
bt.set_normalized(channel, 0)
|
||||
bt.set_normalized(channel, 1)
|
||||
bt.set_normalized(channel, 2)
|
||||
|
||||
if isinstance(bt, AnalogInputTerminal):
|
||||
for channel in range(1, bt.parameters.get('input_word_width', 0) + 1):
|
||||
assert bt.read_channel_word(channel, 1337) == 1337
|
||||
assert bt.read_channel_word(channel, 1337) == 1337
|
||||
assert bt.read_channel_word(channel, 1337) == 1337
|
||||
|
||||
if isinstance(bt, DigitalOutputTerminal):
|
||||
for channel in range(1, bt.parameters.get('output_bit_width', 0) + 1):
|
||||
assert not bt.write_coil(channel, True)
|
||||
assert not bt.write_coil(channel, False)
|
||||
|
||||
if isinstance(bt, DigitalInputTerminal):
|
||||
for channel in range(1, bt.parameters.get('input_bit_width', 0) + 1):
|
||||
assert bt.read_input(channel) is None
|
||||
|
||||
|
||||
def test_terminal_setup():
|
||||
"""
|
||||
Test if all implemented BusTerminal classes in devices can
|
||||
|
@ -79,16 +55,7 @@ def test_terminal_setup():
|
|||
print(n)
|
||||
terminal_classes.append(o)
|
||||
|
||||
# Beckhoff
|
||||
bus_cupler = devices.BK9050('localhost', 11255, terminal_classes, timeout=0.001)
|
||||
|
||||
assert len(terminal_classes) == len(bus_cupler.bus_terminals)
|
||||
assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error()
|
||||
rw_all_bus_terminals(bus_cupler)
|
||||
|
||||
# Wago
|
||||
bus_cupler = devices.WAGO_750_352('localhost', 11255, terminal_classes, timeout=0.001)
|
||||
|
||||
assert len(terminal_classes) == len(bus_cupler.bus_terminals)
|
||||
assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error()
|
||||
rw_all_bus_terminals(bus_cupler)
|
||||
|
|
Loading…
Reference in New Issue