mirror of https://github.com/Nonannet/pyhoff.git
Compare commits
No commits in common. "25a79dda1b388fe80c90052ffe97d432c937c6f5" and "3f13f8c2be09c4e57d73d7c744d571bd96ce1748" have entirely different histories.
25a79dda1b
...
3f13f8c2be
|
@ -79,10 +79,10 @@ To get started with developing the `pyhoff` package, follow these steps:
|
||||||
```
|
```
|
||||||
|
|
||||||
3. **Install Dev Dependencies**
|
3. **Install Dev Dependencies**
|
||||||
Install pyhoff from source plus the dependencies required for development using `pip`:
|
Install the dependencies required for development using `pip`:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
pip install -e .[dev]
|
pip install -r requirements-dev.txt
|
||||||
```
|
```
|
||||||
|
|
||||||
4. **Run Tests**
|
4. **Run Tests**
|
||||||
|
|
|
@ -6,7 +6,7 @@ authors = [
|
||||||
]
|
]
|
||||||
description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP"
|
description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.9"
|
requires-python = ">=3.8"
|
||||||
classifiers = [
|
classifiers = [
|
||||||
"Programming Language :: Python :: 3",
|
"Programming Language :: Python :: 3",
|
||||||
"License :: OSI Approved :: MIT License",
|
"License :: OSI Approved :: MIT License",
|
||||||
|
@ -24,20 +24,6 @@ build-backend = "setuptools.build_meta"
|
||||||
[tool.setuptools.packages.find]
|
[tool.setuptools.packages.find]
|
||||||
where = ["src"]
|
where = ["src"]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
|
||||||
dev = [
|
|
||||||
"pytest", "flake8", "mypy"
|
|
||||||
]
|
|
||||||
|
|
||||||
[tool.mypy]
|
|
||||||
files = ["src"]
|
|
||||||
strict = true
|
|
||||||
warn_return_any = true
|
|
||||||
warn_unused_configs = true
|
|
||||||
check_untyped_defs = true
|
|
||||||
no_implicit_optional = true
|
|
||||||
show_error_codes = true
|
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
minversion = "6.0"
|
minversion = "6.0"
|
||||||
addopts = "-ra -q"
|
addopts = "-ra -q"
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
pytest
|
||||||
|
flake8
|
|
@ -13,6 +13,13 @@ class BusTerminal():
|
||||||
"""
|
"""
|
||||||
Base class for all bus terminals.
|
Base class for all bus terminals.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bus_coupler: The bus coupler to which this terminal is connected.
|
||||||
|
output_bit_addresses: List of addresses of the output bits.
|
||||||
|
input_bit_addresses: List of addresses of input bits.
|
||||||
|
output_word_addresses: List of addresses of output words.
|
||||||
|
input_word_addresses: List of addresses of input words.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
bus_coupler: The bus coupler to which this terminal is connected.
|
bus_coupler: The bus coupler to which this terminal is connected.
|
||||||
parameters: The parameters of the terminal.
|
parameters: The parameters of the terminal.
|
||||||
|
@ -25,16 +32,7 @@ class BusTerminal():
|
||||||
output_word_addresses: list[int],
|
output_word_addresses: list[int],
|
||||||
input_word_addresses: list[int],
|
input_word_addresses: list[int],
|
||||||
mixed_mapping: bool):
|
mixed_mapping: bool):
|
||||||
"""
|
|
||||||
Instantiate a new BusTerminal base class.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
bus_coupler: The bus coupler to which this terminal is connected.
|
|
||||||
output_bit_addresses: List of addresses of the output bits.
|
|
||||||
input_bit_addresses: List of addresses of input bits.
|
|
||||||
output_word_addresses: List of addresses of output words.
|
|
||||||
input_word_addresses: List of addresses of input words.
|
|
||||||
"""
|
|
||||||
self.bus_coupler = bus_coupler
|
self.bus_coupler = bus_coupler
|
||||||
self._output_bit_addresses = output_bit_addresses
|
self._output_bit_addresses = output_bit_addresses
|
||||||
self._input_bit_addresses = input_bit_addresses
|
self._input_bit_addresses = input_bit_addresses
|
||||||
|
@ -43,7 +41,7 @@ class BusTerminal():
|
||||||
self._mixed_mapping = mixed_mapping
|
self._mixed_mapping = mixed_mapping
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def select(cls, bus_coupler: 'BusCoupler', terminal_number: int = 0) -> 'BusTerminal':
|
def select(cls, bus_coupler: 'BusCoupler', terminal_number: int = 0):
|
||||||
terminal_list = [bt for bt in bus_coupler.bus_terminals if isinstance(bt, cls)]
|
terminal_list = [bt for bt in bus_coupler.bus_terminals if isinstance(bt, cls)]
|
||||||
assert terminal_list, f'No instance of {cls.__name__} configured at this BusCoupler'
|
assert terminal_list, f'No instance of {cls.__name__} configured at this BusCoupler'
|
||||||
assert 0 <= terminal_number < len(terminal_list), f'Out of range, select in range: 0..{len(terminal_list) - 1}'
|
assert 0 <= terminal_number < len(terminal_list), f'Out of range, select in range: 0..{len(terminal_list) - 1}'
|
||||||
|
@ -223,21 +221,6 @@ class BusCoupler():
|
||||||
bus_terminals: A list of bus terminal classes according to the
|
bus_terminals: A list of bus terminal classes according to the
|
||||||
connected terminals.
|
connected terminals.
|
||||||
modbus: The underlying modbus client used for the connection.
|
modbus: The underlying modbus client used for the connection.
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, host: str, port: int = 502, bus_terminals: Iterable[Type[BusTerminal]] = [],
|
|
||||||
timeout: float = 5, watchdog: float = 0, debug: bool = False):
|
|
||||||
"""
|
|
||||||
Instantiate a new bus coupler base class.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
host: ip or hostname of the bus coupler
|
|
||||||
port: port of the modbus host
|
|
||||||
debug: outputs modbus debug information
|
|
||||||
timeout: timeout for waiting for the device response
|
|
||||||
watchdog: time in seconds after the device sets all outputs to
|
|
||||||
default state. A value of 0 deactivates the watchdog.
|
|
||||||
debug: If True, debug information is printed.
|
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
>>> from pyhoff.devices import *
|
>>> from pyhoff.devices import *
|
||||||
|
@ -247,6 +230,10 @@ class BusCoupler():
|
||||||
>>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C")
|
>>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C")
|
||||||
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
|
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def __init__(self, host: str, port: int = 502, bus_terminals: Iterable[Type[BusTerminal]] = [],
|
||||||
|
timeout: float = 5, watchdog: float = 0, debug: bool = False):
|
||||||
|
|
||||||
self.bus_terminals: list[BusTerminal] = list()
|
self.bus_terminals: list[BusTerminal] = list()
|
||||||
self._next_output_bit_offset = 0
|
self._next_output_bit_offset = 0
|
||||||
self._next_input_bit_offset = 0
|
self._next_input_bit_offset = 0
|
||||||
|
@ -260,7 +247,7 @@ class BusCoupler():
|
||||||
self.add_bus_terminals(bus_terminals)
|
self.add_bus_terminals(bus_terminals)
|
||||||
self._init_hardware(watchdog)
|
self._init_hardware(watchdog)
|
||||||
|
|
||||||
def _init_hardware(self, watchdog: float) -> None:
|
def _init_hardware(self, watchdog: float):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def add_bus_terminals(self, *new_bus_terminals: Type[BusTerminal] | Iterable[Type[BusTerminal]]) -> list[BusTerminal]:
|
def add_bus_terminals(self, *new_bus_terminals: Type[BusTerminal] | Iterable[Type[BusTerminal]]) -> list[BusTerminal]:
|
||||||
|
@ -285,7 +272,7 @@ class BusCoupler():
|
||||||
for terminal_class in terminal_classes:
|
for terminal_class in terminal_classes:
|
||||||
assert _is_bus_terminal(terminal_class), f'{terminal_class} is not a bus terminal'
|
assert _is_bus_terminal(terminal_class), f'{terminal_class} is not a bus terminal'
|
||||||
|
|
||||||
def get_para(key: str) -> int:
|
def get_para(key: str):
|
||||||
return terminal_class.parameters.get(key, 0)
|
return terminal_class.parameters.get(key, 0)
|
||||||
|
|
||||||
new_terminal = terminal_class(
|
new_terminal = terminal_class(
|
||||||
|
|
|
@ -7,7 +7,7 @@ class BK9000(BusCoupler):
|
||||||
"""
|
"""
|
||||||
BK9000 ModBus TCP bus coupler
|
BK9000 ModBus TCP bus coupler
|
||||||
"""
|
"""
|
||||||
def _init_hardware(self, watchdog: float) -> None:
|
def _init_hardware(self, watchdog: float):
|
||||||
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
|
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
|
||||||
# config watchdog on page 58
|
# config watchdog on page 58
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ class WAGO_750_352(BusCoupler):
|
||||||
"""
|
"""
|
||||||
Wago 750-352 ModBus TCP bus coupler
|
Wago 750-352 ModBus TCP bus coupler
|
||||||
"""
|
"""
|
||||||
def _init_hardware(self, watchdog: float) -> None:
|
def _init_hardware(self, watchdog: float):
|
||||||
# deactivate/reset watchdog timer:
|
# deactivate/reset watchdog timer:
|
||||||
self.modbus.write_single_register(0x1005, 0xAAAA)
|
self.modbus.write_single_register(0x1005, 0xAAAA)
|
||||||
self.modbus.write_single_register(0x1005, 0x5555)
|
self.modbus.write_single_register(0x1005, 0x5555)
|
||||||
|
|
|
@ -63,19 +63,6 @@ class SimpleModbusClient:
|
||||||
last_error: contains last error message or empty string if no error occurred
|
last_error: contains last error message or empty string if no error occurred
|
||||||
debug: if True prints out transmitted and received bytes in hex
|
debug: if True prints out transmitted and received bytes in hex
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
|
|
||||||
"""
|
|
||||||
Instantiate a Modbus TCP client
|
|
||||||
|
|
||||||
Args:
|
|
||||||
host: hostname or IP address
|
|
||||||
port: server port
|
|
||||||
unit_id: ModBus id
|
|
||||||
timeout: socket timeout in seconds
|
|
||||||
debug: if True prints out transmitted and received bytes in hex
|
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
>>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1)
|
>>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1)
|
||||||
>>> print(client.read_coils(0, 10))
|
>>> print(client.read_coils(0, 10))
|
||||||
|
@ -88,6 +75,7 @@ class SimpleModbusClient:
|
||||||
>>> print(client.write_multiple_registers(0, [1234, 5678]))
|
>>> print(client.write_multiple_registers(0, [1234, 5678]))
|
||||||
>>> client.close()
|
>>> client.close()
|
||||||
"""
|
"""
|
||||||
|
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
|
||||||
assert 0 <= unit_id < 256
|
assert 0 <= unit_id < 256
|
||||||
|
|
||||||
self.host = host
|
self.host = host
|
||||||
|
@ -99,11 +87,7 @@ class SimpleModbusClient:
|
||||||
self._socket: None | socket.socket = None
|
self._socket: None | socket.socket = None
|
||||||
self.debug = debug
|
self.debug = debug
|
||||||
|
|
||||||
def connect(self) -> bool:
|
def connect(self):
|
||||||
"""
|
|
||||||
Connect manual to the configured modbus server. Usually there is
|
|
||||||
no need to call this function since it is handled automatically.
|
|
||||||
"""
|
|
||||||
for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port,
|
for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port,
|
||||||
socket.AF_UNSPEC,
|
socket.AF_UNSPEC,
|
||||||
socket.SOCK_STREAM):
|
socket.SOCK_STREAM):
|
||||||
|
@ -126,7 +110,7 @@ class SimpleModbusClient:
|
||||||
self.last_error = 'connection failed'
|
self.last_error = 'connection failed'
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def close(self) -> bytes:
|
def close(self):
|
||||||
"""
|
"""
|
||||||
Close connection
|
Close connection
|
||||||
|
|
||||||
|
|
|
@ -1,32 +1,36 @@
|
||||||
|
import unittest
|
||||||
from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words
|
from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words
|
||||||
|
|
||||||
|
|
||||||
def test_get_bits():
|
class TestModbusFunctions(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_get_bits(self):
|
||||||
data = bytes([0b11101010, 0b11010101])
|
data = bytes([0b11101010, 0b11010101])
|
||||||
bit_number = 16
|
bit_number = 16
|
||||||
expected = [False, True, False, True, False, True, True, True,
|
expected = [False, True, False, True, False, True, True, True,
|
||||||
True, False, True, False, True, False, True, True]
|
True, False, True, False, True, False, True, True]
|
||||||
result = _get_bits(data, bit_number)
|
result = _get_bits(data, bit_number)
|
||||||
assert result == expected
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_get_words(self):
|
||||||
def test_get_words():
|
|
||||||
data = bytes([0x12, 0x34, 0x56, 0x78])
|
data = bytes([0x12, 0x34, 0x56, 0x78])
|
||||||
expected = [0x1234, 0x5678]
|
expected = [0x1234, 0x5678]
|
||||||
result = _get_words(data)
|
result = _get_words(data)
|
||||||
assert result == expected
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_from_bits(self):
|
||||||
def test_from_bits():
|
|
||||||
values = [False, True, False, True, False, True, True, True,
|
values = [False, True, False, True, False, True, True, True,
|
||||||
True, False, True, False, True, False, True, True]
|
True, False, True, False, True, False, True, True]
|
||||||
expected = bytes([0b11101010, 0b11010101])
|
expected = bytes([0b11101010, 0b11010101])
|
||||||
result = _from_bits(values)
|
result = _from_bits(values)
|
||||||
assert result == expected
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_from_words(self):
|
||||||
def test_from_words():
|
|
||||||
values = [0x1234, 0x5678]
|
values = [0x1234, 0x5678]
|
||||||
expected = bytes([0x12, 0x34, 0x56, 0x78])
|
expected = bytes([0x12, 0x34, 0x56, 0x78])
|
||||||
result = _from_words(values)
|
result = _from_words(values)
|
||||||
assert result == expected
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue