From e7c4ad6f7cd1caa9f9788b5b2523d1ad2e67fcd7 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 16 Feb 2025 23:49:16 +0100 Subject: [PATCH] full refactored, minimal modbus tcp implementation integrated --- .flake8 | 24 +++ .gitignore | 35 +-- CITATION.cff | 15 ++ LICENSE | 2 +- README.md | 7 +- notes/wago.md | 29 +++ pyhoff.py | 328 ---------------------------- pyproject.toml | 31 +++ src/pyhoff/__init__.py | 306 ++++++++++++++++++++++++++ src/pyhoff/devices.py | 392 +++++++++++++++++++++++++++++++++ src/pyhoff/modbus.py | 467 ++++++++++++++++++++++++++++++++++++++++ tests/test_modbus.py | 36 ++++ tests/test_terminals.py | 34 +++ 13 files changed, 1343 insertions(+), 363 deletions(-) create mode 100644 .flake8 create mode 100644 CITATION.cff create mode 100644 notes/wago.md delete mode 100644 pyhoff.py create mode 100644 pyproject.toml create mode 100644 src/pyhoff/__init__.py create mode 100644 src/pyhoff/devices.py create mode 100644 src/pyhoff/modbus.py create mode 100644 tests/test_modbus.py create mode 100644 tests/test_terminals.py diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..a783942 --- /dev/null +++ b/.flake8 @@ -0,0 +1,24 @@ +[flake8] +# Specify the maximum allowed line length +max-line-length = 88 + +# Ignore specific rules +# For example, E501: Line too long, W503: Line break before binary operator +ignore = E501, W503, W504 + +# Exclude specific files or directories +exclude = + .git, + __pycache__, + build, + dist, + .conda + test.py + +# Enable specific plugins or options +# Example: Enabling flake8-docstrings +select = C,E,F,W,D + +# Specify custom error codes to ignore or enable +per-file-ignores = + tests/*: D \ No newline at end of file diff --git a/.gitignore b/.gitignore index 68bc17f..538fe7c 100644 --- a/.gitignore +++ b/.gitignore @@ -82,31 +82,6 @@ target/ profile_default/ ipython_config.py -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/#use-with-ide .pdm.toml # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm @@ -152,9 +127,7 @@ dmypy.json # Cython debug symbols cython_debug/ -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +pyModbusTCP_old/ +test.py +test_*.ipynb +settings.json \ No newline at end of file diff --git a/CITATION.cff b/CITATION.cff new file mode 100644 index 0000000..5d59904 --- /dev/null +++ b/CITATION.cff @@ -0,0 +1,15 @@ +cff-version: 1.2.0 +title: pyhoff +abstract: The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP +authors: + - family-names: Kruse + given-names: Nicolas + orcid: "https://orcid.org/0000-0001-6758-2269" +version: 1.0.0 +#date-released: "2025-04-01" +#identifiers: +# - description: This is the collection of archived snapshots of all versions of My Research Software +# type: doi +# value: "10.5281/" +license: MIT License +repository-code: "https://github.com/Nonannet/pyhoff" \ No newline at end of file diff --git a/LICENSE b/LICENSE index e659ddd..2d84a95 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2023 Nonannet +Copyright (c) 2025 Nicolas Kruse Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 339c697..f3b7a45 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,9 @@ # pyhoff -This python code allows to read and write the most common -Beckhoff bus terminals ("Busklemmen") using the bus coupler -BK9050 ("Busskoppler BK9050") over Ethernet TCP/IP. +The pyhoff package allows to read and write the most common +Beckhoff and WAGO bus terminals ("Busklemmen") using the bus +coupler ("Busskoppler") BK9000, BK9050, BK9100 or WAGO 750_352 +over Ethernet TCP/IP based on ModBus TCP. It depends on the package pyModbusTCP. This can be installed with: diff --git a/notes/wago.md b/notes/wago.md new file mode 100644 index 0000000..e648c84 --- /dev/null +++ b/notes/wago.md @@ -0,0 +1,29 @@ +# WAGO IO-Error LED + +Default credentials are: admin:wago + +Generating a new ea-config.xml based on the currently attached terminals: + +``` js +await fetch("http://192.168.178.172/DOCPLFUNC", { + "credentials": "include", + "headers": { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.7,de;q=0.3", + "Content-Type": "application/x-www-form-urlencoded", + "Sec-GPC": "1", + "Authorization": "Basic YWRtaW46d2Fnbw==", + "Upgrade-Insecure-Requests": "1", + "Priority": "u=4", + "Pragma": "no-cache", + "Cache-Control": "no-cache" + }, + "referrer": "http://192.168.178.172/webserv/cplcfg/ea-config.ssi", + "body": "DO_CREATE_EA_XML=create+ea-config.xml", + "method": "POST", + "mode": "cors" +}); +``` + +Download ea-config.xml under http://192.168.178.172/etc/ea-config.xml (authorization required) \ No newline at end of file diff --git a/pyhoff.py b/pyhoff.py deleted file mode 100644 index 218965a..0000000 --- a/pyhoff.py +++ /dev/null @@ -1,328 +0,0 @@ -from pyModbusTCP.client import ModbusClient -from typing import Type - -class BusTerminal(): - output_bit_width = 0 - input_bit_width = 0 - output_word_width = 0 - input_word_width = 0 - - def __init__(self, bus_coupler: 'BusCoupler'): - self.output_bit_offset = 0 - self.input_bit_offset = 0 - self.output_word_offset = 0 - self.output_word_offset = 0 - self.input_word_offset = 0 - self.bus_coupler = bus_coupler - -class DigitalInputTerminal(BusTerminal): - def read_input(self, channel: int): - print(self.input_bit_width, channel) - if channel < 1 or channel > self.input_bit_width: - raise Exception("address out of range") - return self.bus_coupler.read_discrete_input(self.input_bit_offset + channel - 1) - -class DigitalOutputTerminal(BusTerminal): - def write_coil(self, channel: int, value: int): - if channel < 1 or channel > self.output_bit_width: - raise Exception("address out of range") - return self.bus_coupler.write_single_coil(self.output_bit_offset + channel - 1, value) - - def read_coil(self, channel: int): - if channel < 1 or channel > self.output_bit_width: - raise Exception("address out of range") - return self.bus_coupler.read_coil(self.output_bit_offset + channel - 1) - -class AnalogInputTerminal(BusTerminal): - def read_words(self, word_offset: int, word_count: int): - if word_offset < 0 or word_offset + word_count > self.input_word_width: - raise Exception("address out of range") - return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count) - - def read_words_nocheck(self, word_offset: int, word_count: int): - return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count) - - def read_word(self, word_offset: int): - val = self.read_words(word_offset, 1) - if val is None: - return -999 - else: - return val[0] - - -class AnalogOutputTerminal(BusTerminal): - def read_words(self, word_offset: int, word_count: int): - if word_offset < 0 or word_offset + word_count > self.output_word_width: - raise Exception("address out of range") - return self.bus_coupler.read_holding_registers(self.output_word_offset + word_offset, word_count) - - def read_words_nocheck(self, word_offset: int, word_count: int): - return self.bus_coupler.read_holding_registers(self.output_word_offset*0 + word_offset, word_count) - - def read_word(self, word_offset: int): - val = self.read_words(word_offset, 1) - if val is None: - return -999 - else: - return val[0] - - def write_word(self, word_offset: int, data: int): - if word_offset < 0 or word_offset + 1 > self.output_word_width: - raise Exception("address out of range") - return self.bus_coupler.write_single_register(self.output_word_offset + word_offset, data) - - def write_word_nocheck(self, word_offset: int, data: int): - return self.bus_coupler.write_single_register(self.output_word_offset*0 + word_offset, data) - -class KL1104(DigitalInputTerminal): - """KL1104: 4x digital input 24 V""" - input_bit_width = 4 - -class KL1408(DigitalInputTerminal): - """KL1104: 8x digital input 24 V galvanic isolated""" - input_bit_width = 8 - -class WAGO_750_1405(DigitalInputTerminal): - """750-1405: 16x digital input 24 V""" - input_bit_width = 16 - -class KL2404(DigitalOutputTerminal): - """KL2404: 4x digital output with 500 mA""" - output_bit_width = 4 - -class KL2424(DigitalOutputTerminal): - """KL2424: 4x digital output with 2000 mA""" - output_bit_width = 4 - -class KL2634(DigitalOutputTerminal): - """KL2634: 4x digital output 250 V AC, 30 V DC, 4 A""" - output_bit_width = 4 - -class KL2408(DigitalOutputTerminal): - """750-530: 8x digital output with 24 V / 500 mA""" - #contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8 - output_bit_width = 8 - -class WAGO_750_530(DigitalOutputTerminal): - """750-530: 8x digital output with 24 V / 500 mA""" - #contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8 - output_bit_width = 8 - -class KL3054(AnalogInputTerminal): - """KL3054: 4x analog input 4...20 mA 12 Bit single-ended""" - #Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) - input_word_width = 8 - output_word_width = 8 - - def read_normalized(self, channel: int): - return self.read_word(channel * 2 - 1) / 0x7FFF - - def read_current(self, channel: int): - return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0 - -class KL3042(AnalogInputTerminal): - """KL3042: 2x analog input 0...20 mA 12 Bit single-ended""" - #Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status) - input_word_width = 4 - output_word_width = 4 - - def read_normalized(self, channel: int): - return self.read_word(channel*2-1) / 0x7FFF - - def read_current(self, channel: int): - return self.read_normalized(channel*2-1) * 20.0 - -class KL3202(AnalogInputTerminal): - """KL3202: 2x analog input PT100 16 Bit 3-wire""" - #Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional) - input_word_width = 4 - output_word_width = 4 - - def read_temperature(self, channel: int): - val = self.read_word(channel*2-1) - if val > 0x7FFF: - return (val - 0x10000) / 10.0 - else: - return val / 10.0 - -class KL3214(AnalogInputTerminal): - """KL3214: 4x analog input PT100 16 Bit 3-wire""" - #inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional) - #out: 4 x 8 Bit Control (optional) - input_word_width = 8 - output_word_width = 8 - - def read_temperature(self, channel: int): - val = self.read_word(channel*2-1) - if val > 0x7FFF: - return (val - 0x10000) / 10.0 - else: - return val / 10.0 - -class KL4002(AnalogOutputTerminal): - """KL4002: 2x analog output 0...10 V 12 Bit differentiell""" - #Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) - input_word_width = 4 - output_word_width = 4 - - def set_normalized(self, channel: int, value: float): - self.write_word(channel*2-1, int(value * 0x7FFF)) - - def set_voltage(self, channel: int, value: float): - self.set_normalized(channel, value / 10.0) - -class KL4132(AnalogOutputTerminal): - """KL4002: 2x analog output ±10 V 16 bit differential""" - #Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) - input_word_width = 4 - output_word_width = 4 - - def set_normalized(self, channel: int, value: float): - if value >= 0: - self.write_word(channel-1, int(value * 0x7FFF)) - else: - self.write_word(channel-1, int(0x10000 + value * 0x7FFF )) - - def set_voltage(self, channel: int, value: float): - self.set_normalized(channel, value / 10.0) - -class KL4004(AnalogOutputTerminal): - """KL4004: 4x analog output 0...10 V 12 Bit differentiell""" - #Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status) - input_word_width = 8 - output_word_width = 8 - - def set_normalized(self, channel: int, value: float): - self.write_word(channel*2-1, int(value * 0x7FFF)) - - def set_voltage(self, channel: int, value: float): - self.set_normalized(channel, value / 10.0) - -class KL9000(BusTerminal): - """Dummy, no I/O function""" - pass - -class KL9010(BusTerminal): - """Dummy, no I/O function""" - pass - -class KL9100(BusTerminal): - """Dummy, no I/O function""" - pass - -class KL9183(BusTerminal): - """Dummy, no I/O function""" - pass - -class KL9188(BusTerminal): - """Dummy, no I/O function""" - pass - -class WAGO_750_600(BusTerminal): - """Dummy, no I/O function""" - pass - -class WAGO_750_602(BusTerminal): - """Dummy, no I/O function""" - pass - -class BusCoupler(ModbusClient): - """BusCoupler: Busskoppler ModBus TCP""" - - def __init__(self, host: str, port: int = 502, timeout: int = 5, watchdog: float = 0): - """ - Constructs all the necessary attributes for the person object. - - Parameters - ---------- - host : str - ip or hostname of the BK9050 - port : int - port of the modbus host - debug : bool - outputs modbus debug information - timeout : float - timeout for waiting for the device response - watchdog : float - time in secounds after the device sets all outputs to - default state. A value of 0 deactivates the watchdog. - """ - - ModbusClient.__init__(self, host, port, auto_open=True, timeout=timeout) - self.bus_terminals: list[BusTerminal] = list() - self.next_output_bit_offset = 0 - self.next_input_bit_offset = 0 - self.next_output_word_offset = 0 - self.next_input_word_offset = 0 - self.init_hardware(watchdog) - - def init_hardware(self, watchdog: float): - pass - - def read_discrete_input(self, address: int): - value = self.read_discrete_inputs(address) - if value is None: - return False - else: - return value[0] - - def read_coil(self, address): - value = self.read_coils(address) - if value is None: - return False - else: - return value[0] - - def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]: - for terminal_class in bus_terminals: - nterm = terminal_class(self) - - nterm.output_bit_offset = self.next_output_bit_offset - nterm.input_bit_offset = self.next_input_bit_offset - nterm.output_word_offset = self.next_output_word_offset - nterm.input_word_offset = self.next_input_word_offset - - self.next_output_bit_offset += terminal_class.output_bit_width - self.next_input_bit_offset += terminal_class.input_bit_width - self.next_output_word_offset += terminal_class.output_word_width - self.next_input_word_offset += terminal_class.input_word_width - - self.bus_terminals.append(nterm) - - return self.bus_terminals - -class ModBusBK9050(BusCoupler): - """BK9050: Busskoppler ModBus TCP""" - - def init_hardware(self, watchdog: float): - #https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf - #config watchdog on page 58 - - #set time-out/deactivate watchdog timer (deactivate: timeout = 0): - self.write_single_register(0x1120, watchdog * 1000) #ms - - #reset watchdog timer: - self.write_single_register(0x1121, 0xBECF) - self.write_single_register(0x1121, 0xAFFE) - - #set process image offset - self.next_output_word_offset = 0x0800 - -class WAGO750_352(BusCoupler): - """Wago 750-352: Busskoppler ModBus TCP""" - - def init_hardware(self, watchdog: float): - #deactivate/reset watchdog timer: - self.write_single_register(0x1005, 0xAAAA) - self.write_single_register(0x1005, 0x5555) - - #set time-out/deactivate watchdog timer (deactivate: timeout = 0): - self.write_single_register(0x1000, int(watchdog * 10)) - - if watchdog: - #configure watchdog to reset on all functions codes - self.write_single_register(0x1001, 0xFFFF) - - #set process image offset - self.next_output_word_offset = 0x0000 - \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..16668a8 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,31 @@ +[project] +name = "pyhoff" +version = "1.0.0" +authors = [ + { name="Nicolas Kruse", email="nicolas.kruse@nonan.net" }, +] +description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP" +readme = "README.md" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +[project.urls] +Homepage = "https://github.com/Nonannet/pyhoff" +Issues = "https://github.com/Nonannet/pyhoff/issues" + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +minversion = "6.0" +addopts = "-ra -q" +testpaths = ["tests"] +pythonpath = ["src"] \ No newline at end of file diff --git a/src/pyhoff/__init__.py b/src/pyhoff/__init__.py new file mode 100644 index 0000000..e84e770 --- /dev/null +++ b/src/pyhoff/__init__.py @@ -0,0 +1,306 @@ +from .modbus import SimpleModbusClient +from typing import Type, Any + + +class BusTerminal(): + """ + Base class for all bus terminals. + """ + parameters: dict[str, int] = {} + + def __init__(self, bus_coupler: 'BusCoupler', + output_bit_offset: int, + input_bit_offset: int, + output_word_offset: int, + input_word_offset: int): + """ + Initialize a BusTerminal. + + Args: + bus_coupler: The bus coupler to which this terminal is connected. + """ + self._output_bit_offset = output_bit_offset + self._input_bit_offset = input_bit_offset + self._output_word_offset = output_word_offset + self._input_word_offset = input_word_offset + self.bus_coupler = bus_coupler + + +class DigitalInputTerminal(BusTerminal): + """ + Represents a digital input terminal. + """ + def read_input(self, channel: int) -> bool | None: + """ + Read the input from a specific channel. + + Args: + channel: The channel number (start counting from 1) to read from. + + Returns: + The input value of the specified channel. + + Raises: + Exception: If the channel number is out of range. + """ + if channel < 1 or channel > self.parameters['input_bit_width']: + raise Exception("address out of range") + return self.bus_coupler.read_discrete_input(self._input_bit_offset + channel - 1) + + +class DigitalOutputTerminal(BusTerminal): + """ + Represents a digital output terminal. + """ + def write_coil(self, channel: int, value: bool) -> bool: + """ + Write a value to a specific channel. + + Args: + channel: The channel number (start counting from 1) to write to. + value: The value to write. + + Returns: + True if the write operation succeeded, otherwise False. + + Raises: + Exception: If the channel number is out of range. + """ + if channel < 1 or channel > self.parameters['output_bit_width']: + raise Exception("address out of range") + return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value) + + def read_coil(self, channel: int) -> bool | None: + """ + Read the coil value back from a specific channel. + + Args: + channel: The channel number (start counting from 1) to read from. + + Returns: + The coil value of the specified channel. + + Raises: + Exception: If the channel number is out of range. + """ + if channel < 1 or channel > self.parameters['output_bit_width']: + raise Exception("address out of range") + return self.bus_coupler.read_coil(self._output_bit_offset + channel - 1) + + +class AnalogInputTerminal(BusTerminal): + """ + Represents an analog input terminal. + """ + def read_words(self, word_offset: int, word_count: int) -> list[int] | None: + """ + Read a list of words from the terminal. + + Args: + word_offset: The starting word offset (0 based index). + word_count: The number of words to read. + + Returns: + The read words. + + Raises: + Exception: If the word offset or count is out of range. + """ + if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']: + raise Exception("address out of range") + return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count) + + def read_word(self, word_offset: int) -> int: + """ + Read a single word from the terminal. + + Args: + word_offset: The word offset (0 based index) to read from. + + Returns: + The read word value. + """ + val = self.read_words(word_offset, 1) + if val: + return val[0] + else: + return -999 + + def read_normalized(self, channel: int) -> float: + """ + Read a normalized value (0...1) from a specific channel. + + Args: + channel: The channel number to read from. + + Returns: + The normalized value. + """ + return self.read_word(channel * 2 - 1) / 0x7FFF + + +class AnalogOutputTerminal(BusTerminal): + """ + Represents an analog output terminal. + """ + def read_words(self, word_offset: int, word_count: int) -> list[int] | None: + """ + Read a list of words from the terminal. + + Args: + word_offset: The starting word offset (0 based index). + word_count: The number of words to read. + + Returns: + The read words. + + Raises: + Exception: If the word offset or count is out of range. + """ + if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']: + raise Exception("address out of range") + return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count) + + def read_word(self, word_offset: int) -> int: + """ + Read a single word from the terminal. + + Args: + word_offset: The word offset (0 based index) to read from. + + Returns: + The read word value. + """ + val = self.read_words(word_offset, 1) + if val: + return val[0] + else: + return -999 + + def write_word(self, word_offset: int, data: int) -> bool: + """ + Write a word to the terminal. + + Args: + word_offset: The word offset to write to. + data: The data to write. + + Returns: + The result of the write operation. + + Raises: + Exception: If the word offset is out of range. + """ + if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']: + raise Exception("address out of range") + return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data) + + def set_normalized(self, channel: int, value: float): + """ + Set a normalized value between 0 and 1 to a specific channel. + + Args: + channel: The channel number to set. + value: The normalized value to set. + """ + self.write_word(channel * 2 - 1, int(value * 0x7FFF)) + + +class BusCoupler(): + """BusCoupler: Busskoppler ModBus TCP""" + + def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [], + timeout: int = 5, watchdog: float = 0, debug: bool = False): + """ + Constructs a Bus Coupler connected over ModBus TCP. + + Args: + host: ip or hostname of the BK9050 + port: port of the modbus host + debug: outputs modbus debug information + timeout: timeout for waiting for the device response + watchdog: time in seconds after the device sets all outputs to + default state. A value of 0 deactivates the watchdog. + debug: If True, debug information is printed. + """ + + self.bus_terminals: list[Any] = list() + self.next_output_bit_offset = 0 + self.next_input_bit_offset = 0 + self.next_output_word_offset = 0 + self.next_input_word_offset = 0 + self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug) + + self.add_bus_terminals(bus_terminals) + self._init_hardware(watchdog) + + def _init_hardware(self, watchdog: float): + pass + + def read_discrete_input(self, address: int) -> bool | None: + """ + Read a discrete input from the given register address. + + Args: + address: The register address to read from. + + Returns: + The value of the discrete input. + """ + value = self.modbus.read_discrete_inputs(address) + if value: + return value[0] + else: + return None + + def read_coil(self, address: int) -> bool | None: + """ + Read a coil from the given register address. + + Args: + address: The register address to read from. + + Returns: + The value of the coil. + """ + value = self.modbus.read_coils(address) + if value: + return value[0] + else: + return None + + def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]: + """ + Add bus terminals to the bus coupler. + + Args: + bus_terminals: A list of bus terminal classes to add. + + Returns: + The corresponding list of bus terminal objects. + """ + for terminal_class in bus_terminals: + assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal' + new_terminal = terminal_class(self, + self.next_output_bit_offset, + self.next_input_bit_offset, + self.next_output_word_offset, + self.next_input_word_offset) + + self.next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0) + self.next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0) + self.next_output_word_offset += terminal_class.parameters.get('output_word_width', 0) + self.next_input_word_offset += terminal_class.parameters.get('input_word_width', 0) + + self.bus_terminals.append(new_terminal) + + return self.bus_terminals + + def get_error(self) -> str: + """ + Get the last error message. + + Returns: + The last error message. + """ + return self.modbus.last_error diff --git a/src/pyhoff/devices.py b/src/pyhoff/devices.py new file mode 100644 index 0000000..451bfdf --- /dev/null +++ b/src/pyhoff/devices.py @@ -0,0 +1,392 @@ +from . import DigitalInputTerminal, DigitalOutputTerminal +from . import AnalogInputTerminal, AnalogOutputTerminal +from . import BusTerminal, BusCoupler + + +class BK9000(BusCoupler): + """ + BK9000: Busskoppler ModBus TCP + """ + def _init_hardware(self, watchdog: float): + # https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf + # config watchdog on page 58 + + # set time-out/deactivate watchdog timer (deactivate: timeout = 0): + self.modbus.write_single_register(0x1120, int(watchdog * 1000)) # ms + + # reset watchdog timer: + self.modbus.write_single_register(0x1121, 0xBECF) + self.modbus.write_single_register(0x1121, 0xAFFE) + + # set process image offset + self.next_output_word_offset = 0x0800 + + +class BK9050(BK9000): + """ + BK9050: Busskoppler ModBus TCP + """ + pass + + +class BK9100(BK9000): + """ + BK9100: Busskoppler ModBus TCP + """ + pass + + +class WAGO750_352(BusCoupler): + """ + Wago 750-352: Busskoppler ModBus TCP + """ + def _init_hardware(self, watchdog: float): + # deactivate/reset watchdog timer: + self.modbus.write_single_register(0x1005, 0xAAAA) + self.modbus.write_single_register(0x1005, 0x5555) + + # set time-out/deactivate watchdog timer (deactivate: timeout = 0): + self.modbus.write_single_register(0x1000, int(watchdog * 10)) + + if watchdog: + # configure watchdog to reset on all functions codes + self.modbus.write_single_register(0x1001, 0xFFFF) + + # set process image offset + self.next_output_word_offset = 0x0000 + + +class DigitalInputTerminal4Bit(DigitalInputTerminal): + """ + Generic 4 bit input terminal + """ + parameters = {'input_bit_width': 4} + + +class DigitalInputTerminal8Bit(DigitalInputTerminal): + """ + Generic 8 bit input terminal + """ + parameters = {'input_bit_width': 8} + + +class DigitalInputTerminal16Bit(DigitalInputTerminal): + """ + Generic 16 bit input terminal + """ + parameters = {'input_bit_width': 16} + + +class DigitalOutputTerminal4Bit(DigitalOutputTerminal): + """ + Generic 4 bit output terminal + """ + parameters = {'output_bit_width': 4} + + +class DigitalOutputTerminal8Bit(DigitalOutputTerminal): + """ + Generic 8 bit output terminal + """ + parameters = {'output_bit_width': 8} + + +class DigitalOutputTerminal16Bit(DigitalOutputTerminal): + """ + Generic 16 bit output terminal + """ + parameters = {'output_bit_width': 16} + + +class KL1104(DigitalInputTerminal4Bit): + """ + KL1104: 4x digital input 24 V + """ + pass + + +class KL1408(DigitalInputTerminal8Bit): + """ + KL1104: 8x digital input 24 V galvanic isolated + """ + pass + + +class WAGO_750_1405(DigitalInputTerminal16Bit): + """ + 750-1405: 16x digital input 24 V + """ + pass + + +class KL2404(DigitalOutputTerminal4Bit): + """ + KL2404: 4x digital output with 500 mA + """ + pass + + +class KL2424(DigitalOutputTerminal4Bit): + """ + KL2424: 4x digital output with 2000 mA + """ + pass + + +class KL2634(DigitalOutputTerminal4Bit): + """ + KL2634: 4x digital output 250 V AC, 30 V DC, 4 A + """ + pass + + +class KL2408(DigitalOutputTerminal8Bit): + """ + 750-530: 8x digital output with 24 V / 500 mA + + Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8. + """ + pass + + +class WAGO_750_530(DigitalOutputTerminal8Bit): + """ + 750-530: 8x digital output with 24 V / 500 mA + + Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8. + """ + pass + + +class KL1512(AnalogInputTerminal): + """ + KL1512: 2x digital input, counter, 24 V DC, 1 kHz + """ + # Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) + parameters = {'input_word_width': 4, 'output_word_width': 4} + + _last_counter_values = [0, 0] + + def read_counter(self, channel: int) -> int: + """ + Read the absolut counter value of a specific channel. + + Args: + channel: The channel number to read from. + + Returns: + The counter value. + """ + + return self.read_word(channel * 2 - 1) + + def read_delta(self, channel: int) -> int: + """ + Read the counter change since last read of a specific channel. + + Args: + channel: The channel number to read from. + + Returns: + The counter value. + """ + new_count = self.read_word(channel * 2 - 1) + return new_count - self._last_counter_values[channel - 1] + + +class KL3054(AnalogInputTerminal): + """ + KL3054: 4x analog input 4...20 mA 12 Bit single-ended + """ + # Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status) + parameters = {'input_word_width': 8, 'output_word_width': 8} + + def read_current(self, channel: int) -> float: + """ + Read the current value from a specific channel. + + Args: + channel: The channel number to read from. + + Returns: + The current value. + """ + return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0 + + +class KL3042(AnalogInputTerminal): + """ + KL3042: 2x analog input 0...20 mA 12 Bit single-ended + """ + # Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status) + parameters = {'input_word_width': 4, 'output_word_width': 4} + + def read_current(self, channel: int) -> float: + """ + Read the current value from a specific channel. + + Args: + channel: The channel number to read from. + + Returns: + The current value. + """ + return self.read_normalized(channel * 2 - 1) * 20.0 + + +class KL3202(AnalogInputTerminal): + """ + KL3202: 2x analog input PT100 16 Bit 3-wire + """ + # Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional) + parameters = {'input_word_width': 4, 'output_word_width': 4} + + def read_temperature(self, channel: int) -> float: + """ + Read the temperature value from a specific channel. + + Args: + channel: The channel number to read from. + + Returns: + The temperature value in °C. + """ + val = self.read_word(channel * 2 - 1) + if val > 0x7FFF: + return (val - 0x10000) / 10.0 + else: + return val / 10.0 + + +class KL3214(AnalogInputTerminal): + """ + KL3214: 4x analog input PT100 16 Bit 3-wire + """ + # inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional) + # out: 4 x 8 Bit Control (optional) + parameters = {'input_word_width': 8, 'output_word_width': 8} + + def read_temperature(self, channel: int) -> float: + """ + Read the temperature value from a specific channel. + + Args: + channel: The channel number to read from. + + Returns: + The temperature value. + """ + val = self.read_word(channel * 2 - 1) + if val > 0x7FFF: + return (val - 0x10000) / 10.0 + else: + return val / 10.0 + + +class KL4002(AnalogOutputTerminal): + """ + KL4002: 2x analog output 0...10 V 12 Bit differentiell + """ + # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) + parameters = {'input_word_width': 4, 'output_word_width': 4} + + def set_voltage(self, channel: int, value: float): + """ + Set a voltage value to a specific channel. + + Args: + channel: The channel number to set. + value: The voltage value to set. + """ + self.set_normalized(channel, value / 10.0) + + +class KL4132(AnalogOutputTerminal): + """ + KL4002: 2x analog output ±10 V 16 bit differential + """ + # Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status) + parameters = {'input_word_width': 4, 'output_word_width': 4} + + def set_normalized(self, channel: int, value: float): + """ + Set a normalized value between -1 and +1 to a specific channel. + + Args: + channel: The channel number to set. + value: The normalized value to set. + """ + if value >= 0: + self.write_word(channel - 1, int(value * 0x7FFF)) + else: + self.write_word(channel - 1, int(0x10000 + value * 0x7FFF)) + + def set_voltage(self, channel: int, value: float): + """ + Set a voltage value between -10 and +10 V to a specific channel. + + Args: + channel: The channel number to set. + value: The voltage value to set. + """ + self.set_normalized(channel, value / 10.0) + + +class KL4004(AnalogOutputTerminal): + """ + KL4004: 4x analog output 0...10 V 12 Bit differentiell + """ + # Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status) + parameters = {'input_word_width': 8, 'output_word_width': 8} + + def set_voltage(self, channel: int, value: float): + """ + Set a voltage value to a specific channel. + + Args: + channel: The channel number to set. + value: The voltage value to set. + """ + self.set_normalized(channel, value / 10.0) + + +class KL9010(BusTerminal): + """ + End terminal, no I/O function + """ + pass + + +class KL9100(BusTerminal): + """ + Potential supply terminal, no I/O function + """ + pass + + +class KL9183(BusTerminal): + """ + Potential distribution terminal, no I/O function + """ + pass + + +class KL9188(BusTerminal): + """ + Potential distribution terminal, no I/O function + """ + pass + + +class WAGO_750_600(BusTerminal): + """ + End nodule, no I/O function + """ + pass + + +class WAGO_750_602(BusTerminal): + """ + Potential supply terminal, no I/O function + """ + pass diff --git a/src/pyhoff/modbus.py b/src/pyhoff/modbus.py new file mode 100644 index 0000000..a2760d3 --- /dev/null +++ b/src/pyhoff/modbus.py @@ -0,0 +1,467 @@ +import socket +import struct +import random + +_READ_COILS = 0x01 +_READ_DISCRETE_INPUTS = 0x02 +_READ_HOLDING_REGISTERS = 0x03 +_READ_INPUT_REGISTERS = 0x04 +_WRITE_SINGLE_COIL = 0x05 +_WRITE_SINGLE_REGISTER = 0x06 +_WRITE_MULTIPLE_COILS = 0x0F +_WRITE_MULTIPLE_REGISTERS = 0x10 + +_modbus_exceptions = { + 0x01: 'illegal function', + 0x02: 'illegal data address', + 0x03: 'illegal data value', + 0x04: 'slave device failure', + 0x05: 'acknowledge', + 0x06: 'slave device busy', + 0x07: 'negative acknowledge', + 0x08: 'memory parity error', + 0x0A: 'gateway path unavailable', + 0x0B: 'gateway target device failed to respond' +} + + +def _get_bits(data: bytes, bit_number: int) -> list[bool]: + return [bool(data[i // 8] >> (i % 8) & 0x01) + for i in range(bit_number)] + + +def _get_words(data: bytes) -> list[int]: + return [(data[i * 2] << 8) + data[i * 2 + 1] + for i in range(len(data) // 2)] + + +def _from_bits(values: list[bool]) -> bytes: + return bytes(sum(((1 << j) * bool(values[8 * i + j])) + for j in range(8)) for i in range(len(values) // 8)) + + +def _from_words(values: list[int]) -> bytes: + return b''.join(word.to_bytes(2, byteorder='big') for word in values) + + +class SimpleModbusClient: + """ + Modbus TCP client + """ + def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False): + """ + Constructs a Modbus client + + Args: + host: hostname or IPv4/IPv6 address server address + port: server port + unit_id: ModBus id + timeout: socket timeout in seconds + """ + assert 0 <= unit_id < 256 + + self.host = host + self.port = port + self.unit_id = unit_id + self.timeout = timeout + self.last_error = '' + self._transaction_id = random.randint(0, 0xFFFF) + self._socket: None | socket.socket = None + self.debug = debug + + def connect(self): + for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port, + socket.AF_UNSPEC, + socket.SOCK_STREAM): + try: + self._socket = socket.socket(af, st, pr) + except socket.error: + self.close() + continue + try: + self._socket.settimeout(self.timeout) + self._socket.connect(sa) + except socket.error: + self.close() + continue + break + + if self._socket: + return True + else: + self.last_error = 'connection failed' + return False + + def close(self): + """ + Close connection + + Returns: + empty bytes object + """ + if self._socket: + self._socket.close() + self._socket = None + + return bytes() + + def read_coils(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None: + """ + ModBus function for reading coils (0x01) + + Args: + bit_address: Bit address (0 to 0xffff) + bit_lengths: Number of bits to read (1 to 2000) + + Returns: + list of bool or None: Bits list or None if error + """ + assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range' + assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff' + + if not self.send_modbus_data(_READ_COILS, _from_words([bit_address, bit_lengths])): + return None + + rx_data = self.receive_modbus_data() + if not rx_data: + return None + + if len(rx_data) < 2: + self.last_error = 'received frame under min size' + return None + + byte_count = rx_data[0] + bit_data = rx_data[1:] + + if not (byte_count * 8 >= bit_lengths and + byte_count == len(bit_data)): + self.last_error = 'received frame size mismatch' + return None + + return _get_bits(bit_data, bit_lengths) + + def read_discrete_inputs(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None: + """ + ModBus function for reading discrete inputs (0x02) + + Args: + bit_address: Bit address (0 to 0xffff) + bit_lengths: Number of bits to read (1 to 2000) + + Returns: + list of bool or None: Bits list or None if error + """ + assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range' + assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff' + + if not self.send_modbus_data(_READ_DISCRETE_INPUTS, _from_words([bit_address, bit_lengths])): + return None + + rx_data = self.receive_modbus_data() + if not rx_data: + return None + + if len(rx_data) < 2: + self.last_error = 'received frame under minimum size' + return None + + byte_count = rx_data[0] + bit_data = rx_data[1:] + + if not (byte_count * 8 >= bit_lengths and + byte_count == len(bit_data)): + self.last_error = 'received frame size mismatch' + return None + + return _get_bits(bit_data, bit_lengths) + + def read_holding_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None: + """ + ModBus function for reading holding registers (0x03) + + Args: + register_address: Register address (0 to 0xffff) + word_lengths: Number of registers to read (1 to 125) + + Returns: + list of int or None: Registers list or None if error + """ + assert 1 <= word_lengths <= 125, 'word_lengths out of range' + assert register_address + word_lengths <= 0xffff, 'read after address 0xffff' + + if not self.send_modbus_data(_READ_HOLDING_REGISTERS, _from_words([register_address, word_lengths])): + return None + + rx_data = self.receive_modbus_data() + if not rx_data: + return None + + if len(rx_data) < 2: + self.last_error = 'received frame under minimum size' + return None + + byte_count = rx_data[0] + reg_data = rx_data[1:] + + if not (byte_count == 2 * word_lengths and + byte_count == len(reg_data)): + self.last_error = 'received frame size mismatch' + return None + + return _get_words(reg_data) + + def read_input_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None: + """ + ModBus function for reading input registers (0x04) + + Args: + register_address: Register address (0 to 0xffff) + word_lengths: Number of registers to read (1 to 125) + + Returns: + list of int or None: Registers list or None if error + """ + assert 1 <= word_lengths <= 125, 'word_lengths out of range' + assert register_address + word_lengths <= 0xffff, 'read after address 0xffff' + + if not self.send_modbus_data(_READ_INPUT_REGISTERS, _from_words([register_address, word_lengths])): + return None + + rx_data = self.receive_modbus_data() + if not rx_data: + return None + + if len(rx_data) < 2: + self.last_error = 'received frame under minimum size' + return None + + byte_count = rx_data[0] + reg_data = rx_data[1:] + + if not (byte_count == 2 * word_lengths and + byte_count == len(reg_data)): + self.last_error = 'received frame size mismatch' + return None + + return _get_words(reg_data) + + def write_single_coil(self, bit_address: int, value: bool) -> bool: + """ + ModBus function for writing a single coil (0x05) + + Args: + bit_address: Bit address (0 to 0xffff) + value: Value to write (single bit) + + Returns: + True if write succeeded or False if failed + """ + assert 0 <= bit_address <= 0xffff, 'bit_address out of range' + + tx_data = _from_words([bit_address, 0xFF00 * bool(value)]) + if not self.send_modbus_data(_WRITE_SINGLE_COIL, tx_data): + return False + + data = self.receive_modbus_data() + if not data: + return False + + if len(data) != 4: + self.last_error = 'received frame size mismatch' + return False + + return data == tx_data + + def write_single_register(self, register_address: int, value: int) -> bool: + """ + ModBus function for writing a single register (0x06) + + Args: + register_address: Register address (0 to 0xffff) + value: Value to write (0 to 0xffff) + + Returns: + True if write succeeded or False if failed + """ + assert 0 <= register_address <= 0xffff, 'register_address out of range' + assert 0 <= value <= 0xffff, 'value out of range 0 to 0xffff' + + tx_data = _from_words([register_address, value]) + if not self.send_modbus_data(_WRITE_SINGLE_REGISTER, tx_data): + return False + + data = self.receive_modbus_data() + if not data: + return False + + if len(data) != 4: + self.last_error = 'received frame size mismatch' + return False + + return data == tx_data + + def write_multiple_coils(self, bit_address: int, values: list[bool]) -> bool: + """ + ModBus function for writing multiple coils (0x0F) + + Args: + bit_address: Bit address (0 to 0xffff) + values: List of bit values to write + + Returns: + True if write succeeded or False if failed + """ + assert bit_address + len(values) <= 0xffff, 'bit_address out of range' + assert 1 <= len(values) <= 2000, 'number values must be from 1 to 2000' + + byte_count = (len(values) + 7) // 8 + tx_data = struct.pack('>HHB', bit_address, len(values), byte_count) + _from_bits(values) + if not self.send_modbus_data(_WRITE_MULTIPLE_COILS, tx_data): + return False + + data = self.receive_modbus_data() + if not data: + return False + + if len(data) != 4: + self.last_error = 'received frame size mismatch' + return False + + return _get_words(data[0:1])[0] == bit_address + + def write_multiple_registers(self, register_address: int, values: list[int]) -> bool: + """ + ModBus function for writing multiple registers (0x10) + + Args: + register_address: Register address (0 to 0xffff) + values: List of 16 bit register values to write + + Returns: + True if write succeeded or False if failed + """ + assert register_address + len(values) <= 0xffff, 'register_address out of range' + assert max(values) <= 0xffff, 'value out of range 0 to 0xffff' + assert min(values) >= 0, 'value out of range 0 to 0xffff' + + byte_count = len(values) * 2 + tx_data = struct.pack('>HHB', register_address, len(values), byte_count) + _from_words(values) + if not self.send_modbus_data(_WRITE_MULTIPLE_REGISTERS, tx_data): + return False + + data = self.receive_modbus_data() + if not data: + return False + + if len(data) != 4: + self.last_error = 'received frame size mismatch' + return False + + return _get_words(data[0:1])[0] == register_address + + def _recv(self, number_of_bytes: int) -> bytes: + """ + Receive data over tcp, wait until all specified bytes are received + + Args: + number_of_bytes: Number of bytes to receive + + Returns: + returns received bytes or empty an bytes object if an error occurred + """ + if not self._socket: + return bytes() + + buffer = bytes() + while len(buffer) < number_of_bytes: + try: + buffer += self._socket.recv(number_of_bytes - len(buffer)) + except socket.error: + return bytes() + + if self.debug: + print(f'<- Received: {' '.join(hex(b) for b in buffer)}') + + return buffer + + def _send(self, data: bytes) -> int: + """ + Send data over tcp + + Args: + data: data to send + + Returns: + number of transmitted bytes or 0 if transmission failed + """ + + for _ in range(2): + if self._socket: + try: + self._socket.sendall(data) + if self.debug: + print(f'-> Send: {' '.join(hex(b) for b in data)}') + return len(data) + except socket.error: + self.last_error = 'sending data failed' + self.close() + self.connect() + else: + self.connect() + + return 0 + + def send_modbus_data(self, function_code: int, body: bytes) -> int: + """ + Send raw ModBus TCP frame + + Args: + unction_code: ModBus function code + body: data + + Returns: + number of transmitted bytes or 0 if transmission failed + """ + self._transaction_id = (self._transaction_id + 1) % 0x10000 + protocol_identifier = 0 + length = len(body) + 2 + header = struct.pack('>HHHBB', self._transaction_id, + protocol_identifier, length, self.unit_id, + function_code) + frame = header + body + + return self._send(frame) + + def receive_modbus_data(self) -> bytes: + """ + Receive a ModBus frame + + Returns: + bytes received or empty bytes object if an error occurred + """ + header = self._recv(7) + if not header: + self.last_error = 'receiving return frame failed' + return self.close() + + transaction_id, protocol_identifier, length, unit_id =\ + struct.unpack('>HHHB', header) + + if not ((transaction_id == self._transaction_id) and + (protocol_identifier == 0) and + (unit_id == self.unit_id) and + (length <= 0xFF) and + (transaction_id == self._transaction_id)): + self.last_error = 'received frame is invalid' + return self.close() + + data = self._recv(length - 1) + if not data: + self.last_error = 'receiving data payload failed' + return self.close() + + if data[0] > 0x80: + self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})' + return bytes() + + return data[1:] diff --git a/tests/test_modbus.py b/tests/test_modbus.py new file mode 100644 index 0000000..117e762 --- /dev/null +++ b/tests/test_modbus.py @@ -0,0 +1,36 @@ +import unittest +from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words + + +class TestModbusFunctions(unittest.TestCase): + + def test_get_bits(self): + data = bytes([0b11101010, 0b11010101]) + bit_number = 16 + expected = [False, True, False, True, False, True, True, True, + True, False, True, False, True, False, True, True] + result = _get_bits(data, bit_number) + self.assertEqual(result, expected) + + def test_get_words(self): + data = bytes([0x12, 0x34, 0x56, 0x78]) + expected = [0x1234, 0x5678] + result = _get_words(data) + self.assertEqual(result, expected) + + def test_from_bits(self): + values = [False, True, False, True, False, True, True, True, + True, False, True, False, True, False, True, True] + expected = bytes([0b11101010, 0b11010101]) + result = _from_bits(values) + self.assertEqual(result, expected) + + def test_from_words(self): + values = [0x1234, 0x5678] + expected = bytes([0x12, 0x34, 0x56, 0x78]) + result = _from_words(values) + self.assertEqual(result, expected) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_terminals.py b/tests/test_terminals.py new file mode 100644 index 0000000..a13be9f --- /dev/null +++ b/tests/test_terminals.py @@ -0,0 +1,34 @@ +import inspect +import src.pyhoff.devices as devices +from src.pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal + + +def test_terminal_plausib(): + + for n, o in inspect.getmembers(devices): + if inspect.isclass(o) and o not in [DigitalInputTerminal, + DigitalOutputTerminal, + AnalogInputTerminal, + AnalogOutputTerminal]: + print('Terminal: ' + n) + if issubclass(o, DigitalInputTerminal): + assert o.parameters.get('input_bit_width', 0) > 0 + assert o.parameters.get('output_bit_width', 0) == 0 + assert o.parameters.get('input_word_width', 0) == 0 + assert o.parameters.get('output_word_width', 0) == 0 + + if issubclass(o, DigitalOutputTerminal): + assert o.parameters.get('input_bit_width', 0) == 0 + assert o.parameters.get('output_bit_width', 0) > 0 + assert o.parameters.get('input_word_width', 0) == 0 + assert o.parameters.get('output_word_width', 0) == 0 + + if issubclass(o, AnalogInputTerminal): + assert o.parameters.get('input_bit_width', 0) == 0 + assert o.parameters.get('output_bit_width', 0) == 0 + assert o.parameters.get('input_word_width', 0) > 0 + + if issubclass(o, AnalogOutputTerminal): + assert o.parameters.get('input_bit_width', 0) == 0 + assert o.parameters.get('output_bit_width', 0) == 0 + assert o.parameters.get('output_word_width', 0) > 0