mirror of https://github.com/Nonannet/pyhoff.git
Compare commits
6 Commits
759e1d168d
...
5e52a3f866
Author | SHA1 | Date |
---|---|---|
|
5e52a3f866 | |
|
f34eef3eb4 | |
|
b68b32680a | |
|
2d69ffc7d5 | |
|
6a4aa22575 | |
|
e7c4ad6f7c |
|
@ -0,0 +1,20 @@
|
|||
[flake8]
|
||||
# Specify the maximum allowed line length
|
||||
max-line-length = 88
|
||||
|
||||
# Ignore specific rules
|
||||
# For example, E501: Line too long, W503: Line break before binary operator
|
||||
ignore = E501, W503, W504
|
||||
|
||||
# Exclude specific files or directories
|
||||
exclude =
|
||||
.git,
|
||||
__pycache__,
|
||||
build,
|
||||
dist,
|
||||
.conda
|
||||
.venv
|
||||
|
||||
# Enable specific plugins or options
|
||||
# Example: Enabling flake8-docstrings
|
||||
select = C,E,F,W,D
|
|
@ -82,31 +82,6 @@ target/
|
|||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||
# in version control.
|
||||
# https://pdm.fming.dev/#use-with-ide
|
||||
.pdm.toml
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
|
@ -125,6 +100,7 @@ celerybeat.pid
|
|||
env/
|
||||
venv/
|
||||
ENV/
|
||||
.venv/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
|
@ -152,9 +128,7 @@ dmypy.json
|
|||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
pyModbusTCP_old/
|
||||
test.py
|
||||
test_*.ipynb
|
||||
settings.json
|
|
@ -0,0 +1,15 @@
|
|||
cff-version: 1.2.0
|
||||
title: pyhoff
|
||||
abstract: The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP
|
||||
authors:
|
||||
- family-names: Kruse
|
||||
given-names: Nicolas
|
||||
orcid: "https://orcid.org/0000-0001-6758-2269"
|
||||
version: 1.0.0
|
||||
#date-released: "2025-04-01"
|
||||
#identifiers:
|
||||
# - description: This is the collection of archived snapshots of all versions of My Research Software
|
||||
# type: doi
|
||||
# value: "10.5281/"
|
||||
license: MIT License
|
||||
repository-code: "https://github.com/Nonannet/pyhoff"
|
2
LICENSE
2
LICENSE
|
@ -1,6 +1,6 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2023 Nonannet
|
||||
Copyright (c) 2025 Nicolas Kruse
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
|
89
README.md
89
README.md
|
@ -1,20 +1,40 @@
|
|||
# pyhoff
|
||||
|
||||
This python code allows to read and write the most common
|
||||
Beckhoff bus terminals ("Busklemmen") using the bus coupler
|
||||
BK9050 ("Busskoppler BK9050") over Ethernet TCP/IP.
|
||||
## Description
|
||||
The pyhoff package allows you to read and write the most common
|
||||
Beckhoff and WAGO bus terminals ("Busklemmen") using the Ethernet bus
|
||||
coupler ("Busskoppler") BK9000, BK9050, BK9100, or WAGO 750_352
|
||||
over Ethernet TCP/IP based on ModBus TCP.
|
||||
|
||||
It depends on the package pyModbusTCP. This can be installed with:
|
||||
### Key Features
|
||||
- Supports a wide range of Beckhoff and WAGO analog and digital bus
|
||||
terminals.
|
||||
- Very light weight: no dependencies; compact code base
|
||||
- Easy to extend
|
||||
- Using standardized ModBus TCP.
|
||||
- Provides high-level abstractions for reading and writing data
|
||||
from/to IO-terminals with minimal code
|
||||
|
||||
pip install pyModbusTCP
|
||||
### Usage Scenarios
|
||||
- Industrial test setups.
|
||||
- Research automation setups.
|
||||
- Data acquisition and monitoring.
|
||||
|
||||
## Installation
|
||||
The package has no additional decencies. It can be installed with pip:
|
||||
|
||||
```bash
|
||||
pip install pyhoff
|
||||
```
|
||||
|
||||
## Usage
|
||||
It is easy to use as the following example code shows:
|
||||
|
||||
```python
|
||||
from pyhoff import *
|
||||
from pyhoff.devices import *
|
||||
|
||||
# connect to the BK9050 by tcp/ip on default port 502
|
||||
bus_coupler = ModBusBK9050("172.16.17.1")
|
||||
bus_coupler = BK9050("172.16.17.1")
|
||||
|
||||
# list of all bus terminals connected to the bus coupler
|
||||
# in the order of the physical arrangement
|
||||
|
@ -25,27 +45,54 @@ terminal_list = [KL2404, KL2424, KL9100, KL1104, KL3202,
|
|||
terminals = bus_coupler.add_bus_terminals(terminal_list)
|
||||
|
||||
# Set 1. output of the first bus terminal (KL2404) to hi
|
||||
terminals[0].write_coil(0, True)
|
||||
terminals[0].write_coil(1, True)
|
||||
|
||||
# read the temperature from the 2. channel of the 5. bus
|
||||
# terminal (KL3202)
|
||||
t = terminals[4].read_temperature(1)
|
||||
t = terminals[4].read_temperature(2)
|
||||
print(f"t = {t:.1f} °C")
|
||||
|
||||
# Set 1. output of the 6. bus terminal (KL4002) to 4.2 V
|
||||
terminals[5].set_voltage(0, 4.2)
|
||||
terminals[5].set_voltage(1, 4.2)
|
||||
|
||||
```
|
||||
|
||||
The following terminals are implemented:
|
||||
- KL1104: 4x digital input 24 V
|
||||
- KL2404: 4x digital output with 500 mA
|
||||
- KL2424: 4x digital output with 2000 mA
|
||||
- KL3054: 4x analog input 4...20 mA 12 Bit single-ended
|
||||
- KL3202: 2x analog input PT100 16 Bit 3-wire
|
||||
- KL3214: 4x analog input PT100 16 Bit 3-wire
|
||||
- KL4002: 2x analog output 0...10 V 12 Bit differentiell
|
||||
- KL4004: 4x analog output 0...10 V 12 Bit differentiell
|
||||
- Dummy terminals without io functionality: KL9100, KL9183, KL9188
|
||||
## Contributing
|
||||
Other analog and digital IO terminals are easy to complement. Contributions are welcome!
|
||||
Please open an issue or submit a pull request on GitHub.
|
||||
|
||||
Other analog and digital io-terminals are easy to complement. Pull requests are welcome.
|
||||
## Developer Guide
|
||||
To get started with developing the `pyhoff` package, follow these steps:
|
||||
|
||||
1. **Clone the Repository**
|
||||
First, clone the repository to your local machine using Git:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/Nonannet/pyhoff.git
|
||||
cd pyhoff
|
||||
```
|
||||
|
||||
2. **Set Up a Virtual Environment**
|
||||
It is recommended to use a virtual environment to manage dependencies. You can create one using `venv`:
|
||||
|
||||
```bash
|
||||
python -m venv venv
|
||||
source venv/bin/activate # On Windows use `venv\Scripts\activate`
|
||||
```
|
||||
|
||||
3. **Install Dev Dependencies**
|
||||
Install the dependencies required for development using `pip`:
|
||||
|
||||
```bash
|
||||
pip install -r requirements-dev.txt
|
||||
```
|
||||
|
||||
4. **Run Tests**
|
||||
Ensure that everything is set up correctly by running the tests:
|
||||
|
||||
```bash
|
||||
pytest
|
||||
```
|
||||
|
||||
## License
|
||||
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
|
|
@ -0,0 +1,25 @@
|
|||
# Notes on building the package
|
||||
|
||||
```bash
|
||||
# Get code
|
||||
git clone https://github.com/Nonannet/pyhoff.git
|
||||
cd pyhoff
|
||||
|
||||
# Setup venv
|
||||
python -m venv ./.venv
|
||||
source ./.venv/bin/activate # On Windows use `.\.venv\Scripts\activate`
|
||||
|
||||
# Check code:
|
||||
pip install -r requirements-dev.txt
|
||||
flake8
|
||||
pytest
|
||||
|
||||
# Build package:
|
||||
pip install build
|
||||
python3 -m build
|
||||
|
||||
# Upload
|
||||
pip install twine
|
||||
#python3 -m twine upload dist/*
|
||||
python3 -m twine upload --repository testpypi dist/* # Test repository: https://test.pypi.org/project/example_package_YOUR_USERNAME_HERE
|
||||
```
|
|
@ -0,0 +1,29 @@
|
|||
# WAGO IO-Error LED
|
||||
|
||||
Default credentials are: admin:wago
|
||||
|
||||
Generating a new ea-config.xml based on the currently attached terminals:
|
||||
|
||||
``` js
|
||||
await fetch("http://192.168.178.172/DOCPLFUNC", {
|
||||
"credentials": "include",
|
||||
"headers": {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-US,en;q=0.7,de;q=0.3",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"Sec-GPC": "1",
|
||||
"Authorization": "Basic YWRtaW46d2Fnbw==",
|
||||
"Upgrade-Insecure-Requests": "1",
|
||||
"Priority": "u=4",
|
||||
"Pragma": "no-cache",
|
||||
"Cache-Control": "no-cache"
|
||||
},
|
||||
"referrer": "http://192.168.178.172/webserv/cplcfg/ea-config.ssi",
|
||||
"body": "DO_CREATE_EA_XML=create+ea-config.xml",
|
||||
"method": "POST",
|
||||
"mode": "cors"
|
||||
});
|
||||
```
|
||||
|
||||
Download ea-config.xml under http://192.168.178.172/etc/ea-config.xml (authorization required)
|
328
pyhoff.py
328
pyhoff.py
|
@ -1,328 +0,0 @@
|
|||
from pyModbusTCP.client import ModbusClient
|
||||
from typing import Type
|
||||
|
||||
class BusTerminal():
|
||||
output_bit_width = 0
|
||||
input_bit_width = 0
|
||||
output_word_width = 0
|
||||
input_word_width = 0
|
||||
|
||||
def __init__(self, bus_coupler: 'BusCoupler'):
|
||||
self.output_bit_offset = 0
|
||||
self.input_bit_offset = 0
|
||||
self.output_word_offset = 0
|
||||
self.output_word_offset = 0
|
||||
self.input_word_offset = 0
|
||||
self.bus_coupler = bus_coupler
|
||||
|
||||
class DigitalInputTerminal(BusTerminal):
|
||||
def read_input(self, channel: int):
|
||||
print(self.input_bit_width, channel)
|
||||
if channel < 1 or channel > self.input_bit_width:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.read_discrete_input(self.input_bit_offset + channel - 1)
|
||||
|
||||
class DigitalOutputTerminal(BusTerminal):
|
||||
def write_coil(self, channel: int, value: int):
|
||||
if channel < 1 or channel > self.output_bit_width:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.write_single_coil(self.output_bit_offset + channel - 1, value)
|
||||
|
||||
def read_coil(self, channel: int):
|
||||
if channel < 1 or channel > self.output_bit_width:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.read_coil(self.output_bit_offset + channel - 1)
|
||||
|
||||
class AnalogInputTerminal(BusTerminal):
|
||||
def read_words(self, word_offset: int, word_count: int):
|
||||
if word_offset < 0 or word_offset + word_count > self.input_word_width:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
|
||||
|
||||
def read_words_nocheck(self, word_offset: int, word_count: int):
|
||||
return self.bus_coupler.read_input_registers(self.input_word_offset + word_offset, word_count)
|
||||
|
||||
def read_word(self, word_offset: int):
|
||||
val = self.read_words(word_offset, 1)
|
||||
if val is None:
|
||||
return -999
|
||||
else:
|
||||
return val[0]
|
||||
|
||||
|
||||
class AnalogOutputTerminal(BusTerminal):
|
||||
def read_words(self, word_offset: int, word_count: int):
|
||||
if word_offset < 0 or word_offset + word_count > self.output_word_width:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.read_holding_registers(self.output_word_offset + word_offset, word_count)
|
||||
|
||||
def read_words_nocheck(self, word_offset: int, word_count: int):
|
||||
return self.bus_coupler.read_holding_registers(self.output_word_offset*0 + word_offset, word_count)
|
||||
|
||||
def read_word(self, word_offset: int):
|
||||
val = self.read_words(word_offset, 1)
|
||||
if val is None:
|
||||
return -999
|
||||
else:
|
||||
return val[0]
|
||||
|
||||
def write_word(self, word_offset: int, data: int):
|
||||
if word_offset < 0 or word_offset + 1 > self.output_word_width:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.write_single_register(self.output_word_offset + word_offset, data)
|
||||
|
||||
def write_word_nocheck(self, word_offset: int, data: int):
|
||||
return self.bus_coupler.write_single_register(self.output_word_offset*0 + word_offset, data)
|
||||
|
||||
class KL1104(DigitalInputTerminal):
|
||||
"""KL1104: 4x digital input 24 V"""
|
||||
input_bit_width = 4
|
||||
|
||||
class KL1408(DigitalInputTerminal):
|
||||
"""KL1104: 8x digital input 24 V galvanic isolated"""
|
||||
input_bit_width = 8
|
||||
|
||||
class WAGO_750_1405(DigitalInputTerminal):
|
||||
"""750-1405: 16x digital input 24 V"""
|
||||
input_bit_width = 16
|
||||
|
||||
class KL2404(DigitalOutputTerminal):
|
||||
"""KL2404: 4x digital output with 500 mA"""
|
||||
output_bit_width = 4
|
||||
|
||||
class KL2424(DigitalOutputTerminal):
|
||||
"""KL2424: 4x digital output with 2000 mA"""
|
||||
output_bit_width = 4
|
||||
|
||||
class KL2634(DigitalOutputTerminal):
|
||||
"""KL2634: 4x digital output 250 V AC, 30 V DC, 4 A"""
|
||||
output_bit_width = 4
|
||||
|
||||
class KL2408(DigitalOutputTerminal):
|
||||
"""750-530: 8x digital output with 24 V / 500 mA"""
|
||||
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
|
||||
output_bit_width = 8
|
||||
|
||||
class WAGO_750_530(DigitalOutputTerminal):
|
||||
"""750-530: 8x digital output with 24 V / 500 mA"""
|
||||
#contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8
|
||||
output_bit_width = 8
|
||||
|
||||
class KL3054(AnalogInputTerminal):
|
||||
"""KL3054: 4x analog input 4...20 mA 12 Bit single-ended"""
|
||||
#Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||
input_word_width = 8
|
||||
output_word_width = 8
|
||||
|
||||
def read_normalized(self, channel: int):
|
||||
return self.read_word(channel * 2 - 1) / 0x7FFF
|
||||
|
||||
def read_current(self, channel: int):
|
||||
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
|
||||
|
||||
class KL3042(AnalogInputTerminal):
|
||||
"""KL3042: 2x analog input 0...20 mA 12 Bit single-ended"""
|
||||
#Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
|
||||
input_word_width = 4
|
||||
output_word_width = 4
|
||||
|
||||
def read_normalized(self, channel: int):
|
||||
return self.read_word(channel*2-1) / 0x7FFF
|
||||
|
||||
def read_current(self, channel: int):
|
||||
return self.read_normalized(channel*2-1) * 20.0
|
||||
|
||||
class KL3202(AnalogInputTerminal):
|
||||
"""KL3202: 2x analog input PT100 16 Bit 3-wire"""
|
||||
#Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
|
||||
input_word_width = 4
|
||||
output_word_width = 4
|
||||
|
||||
def read_temperature(self, channel: int):
|
||||
val = self.read_word(channel*2-1)
|
||||
if val > 0x7FFF:
|
||||
return (val - 0x10000) / 10.0
|
||||
else:
|
||||
return val / 10.0
|
||||
|
||||
class KL3214(AnalogInputTerminal):
|
||||
"""KL3214: 4x analog input PT100 16 Bit 3-wire"""
|
||||
#inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
|
||||
#out: 4 x 8 Bit Control (optional)
|
||||
input_word_width = 8
|
||||
output_word_width = 8
|
||||
|
||||
def read_temperature(self, channel: int):
|
||||
val = self.read_word(channel*2-1)
|
||||
if val > 0x7FFF:
|
||||
return (val - 0x10000) / 10.0
|
||||
else:
|
||||
return val / 10.0
|
||||
|
||||
class KL4002(AnalogOutputTerminal):
|
||||
"""KL4002: 2x analog output 0...10 V 12 Bit differentiell"""
|
||||
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||
input_word_width = 4
|
||||
output_word_width = 4
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
self.write_word(channel*2-1, int(value * 0x7FFF))
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
self.set_normalized(channel, value / 10.0)
|
||||
|
||||
class KL4132(AnalogOutputTerminal):
|
||||
"""KL4002: 2x analog output ±10 V 16 bit differential"""
|
||||
#Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||
input_word_width = 4
|
||||
output_word_width = 4
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
if value >= 0:
|
||||
self.write_word(channel-1, int(value * 0x7FFF))
|
||||
else:
|
||||
self.write_word(channel-1, int(0x10000 + value * 0x7FFF ))
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
self.set_normalized(channel, value / 10.0)
|
||||
|
||||
class KL4004(AnalogOutputTerminal):
|
||||
"""KL4004: 4x analog output 0...10 V 12 Bit differentiell"""
|
||||
#Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
|
||||
input_word_width = 8
|
||||
output_word_width = 8
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
self.write_word(channel*2-1, int(value * 0x7FFF))
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
self.set_normalized(channel, value / 10.0)
|
||||
|
||||
class KL9000(BusTerminal):
|
||||
"""Dummy, no I/O function"""
|
||||
pass
|
||||
|
||||
class KL9010(BusTerminal):
|
||||
"""Dummy, no I/O function"""
|
||||
pass
|
||||
|
||||
class KL9100(BusTerminal):
|
||||
"""Dummy, no I/O function"""
|
||||
pass
|
||||
|
||||
class KL9183(BusTerminal):
|
||||
"""Dummy, no I/O function"""
|
||||
pass
|
||||
|
||||
class KL9188(BusTerminal):
|
||||
"""Dummy, no I/O function"""
|
||||
pass
|
||||
|
||||
class WAGO_750_600(BusTerminal):
|
||||
"""Dummy, no I/O function"""
|
||||
pass
|
||||
|
||||
class WAGO_750_602(BusTerminal):
|
||||
"""Dummy, no I/O function"""
|
||||
pass
|
||||
|
||||
class BusCoupler(ModbusClient):
|
||||
"""BusCoupler: Busskoppler ModBus TCP"""
|
||||
|
||||
def __init__(self, host: str, port: int = 502, timeout: int = 5, watchdog: float = 0):
|
||||
"""
|
||||
Constructs all the necessary attributes for the person object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
host : str
|
||||
ip or hostname of the BK9050
|
||||
port : int
|
||||
port of the modbus host
|
||||
debug : bool
|
||||
outputs modbus debug information
|
||||
timeout : float
|
||||
timeout for waiting for the device response
|
||||
watchdog : float
|
||||
time in secounds after the device sets all outputs to
|
||||
default state. A value of 0 deactivates the watchdog.
|
||||
"""
|
||||
|
||||
ModbusClient.__init__(self, host, port, auto_open=True, timeout=timeout)
|
||||
self.bus_terminals: list[BusTerminal] = list()
|
||||
self.next_output_bit_offset = 0
|
||||
self.next_input_bit_offset = 0
|
||||
self.next_output_word_offset = 0
|
||||
self.next_input_word_offset = 0
|
||||
self.init_hardware(watchdog)
|
||||
|
||||
def init_hardware(self, watchdog: float):
|
||||
pass
|
||||
|
||||
def read_discrete_input(self, address: int):
|
||||
value = self.read_discrete_inputs(address)
|
||||
if value is None:
|
||||
return False
|
||||
else:
|
||||
return value[0]
|
||||
|
||||
def read_coil(self, address):
|
||||
value = self.read_coils(address)
|
||||
if value is None:
|
||||
return False
|
||||
else:
|
||||
return value[0]
|
||||
|
||||
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
|
||||
for terminal_class in bus_terminals:
|
||||
nterm = terminal_class(self)
|
||||
|
||||
nterm.output_bit_offset = self.next_output_bit_offset
|
||||
nterm.input_bit_offset = self.next_input_bit_offset
|
||||
nterm.output_word_offset = self.next_output_word_offset
|
||||
nterm.input_word_offset = self.next_input_word_offset
|
||||
|
||||
self.next_output_bit_offset += terminal_class.output_bit_width
|
||||
self.next_input_bit_offset += terminal_class.input_bit_width
|
||||
self.next_output_word_offset += terminal_class.output_word_width
|
||||
self.next_input_word_offset += terminal_class.input_word_width
|
||||
|
||||
self.bus_terminals.append(nterm)
|
||||
|
||||
return self.bus_terminals
|
||||
|
||||
class ModBusBK9050(BusCoupler):
|
||||
"""BK9050: Busskoppler ModBus TCP"""
|
||||
|
||||
def init_hardware(self, watchdog: float):
|
||||
#https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
|
||||
#config watchdog on page 58
|
||||
|
||||
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
||||
self.write_single_register(0x1120, watchdog * 1000) #ms
|
||||
|
||||
#reset watchdog timer:
|
||||
self.write_single_register(0x1121, 0xBECF)
|
||||
self.write_single_register(0x1121, 0xAFFE)
|
||||
|
||||
#set process image offset
|
||||
self.next_output_word_offset = 0x0800
|
||||
|
||||
class WAGO750_352(BusCoupler):
|
||||
"""Wago 750-352: Busskoppler ModBus TCP"""
|
||||
|
||||
def init_hardware(self, watchdog: float):
|
||||
#deactivate/reset watchdog timer:
|
||||
self.write_single_register(0x1005, 0xAAAA)
|
||||
self.write_single_register(0x1005, 0x5555)
|
||||
|
||||
#set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
||||
self.write_single_register(0x1000, int(watchdog * 10))
|
||||
|
||||
if watchdog:
|
||||
#configure watchdog to reset on all functions codes
|
||||
self.write_single_register(0x1001, 0xFFFF)
|
||||
|
||||
#set process image offset
|
||||
self.next_output_word_offset = 0x0000
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
[project]
|
||||
name = "pyhoff"
|
||||
version = "1.0.0"
|
||||
authors = [
|
||||
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
|
||||
]
|
||||
description = "The pyhoff package allows easy accessing of Beckhoff and Wago terminals with python over ModBus TCP"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
classifiers = [
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Operating System :: OS Independent",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/Nonannet/pyhoff"
|
||||
Issues = "https://github.com/Nonannet/pyhoff/issues"
|
||||
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
minversion = "6.0"
|
||||
addopts = "-ra -q"
|
||||
testpaths = ["tests"]
|
||||
pythonpath = ["src"]
|
|
@ -0,0 +1,2 @@
|
|||
pytest
|
||||
flake8
|
|
@ -0,0 +1,292 @@
|
|||
from .modbus import SimpleModbusClient
|
||||
from typing import Type, Any
|
||||
|
||||
|
||||
class BusTerminal():
|
||||
"""
|
||||
Base class for all bus terminals.
|
||||
|
||||
Args:
|
||||
bus_coupler: The bus coupler to which this terminal is connected.
|
||||
output_bit_offset: The offset of the output bits.
|
||||
input_bit_offset: The offset of the input bits.
|
||||
output_word_offset: The offset of the output words.
|
||||
input_word_offset: The offset of the input words.
|
||||
|
||||
Attributes:
|
||||
bus_coupler: The bus coupler to which this terminal is connected.
|
||||
parameters: The parameters of the terminal.
|
||||
"""
|
||||
parameters: dict[str, int] = {}
|
||||
|
||||
def __init__(self, bus_coupler: 'BusCoupler',
|
||||
output_bit_offset: int,
|
||||
input_bit_offset: int,
|
||||
output_word_offset: int,
|
||||
input_word_offset: int):
|
||||
|
||||
self._output_bit_offset = output_bit_offset
|
||||
self._input_bit_offset = input_bit_offset
|
||||
self._output_word_offset = output_word_offset
|
||||
self._input_word_offset = input_word_offset
|
||||
self.bus_coupler = bus_coupler
|
||||
|
||||
|
||||
class DigitalInputTerminal(BusTerminal):
|
||||
"""
|
||||
Base class for digital input terminals.
|
||||
"""
|
||||
def read_input(self, channel: int) -> bool | None:
|
||||
"""
|
||||
Read the input from a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number (start counting from 1) to read from.
|
||||
|
||||
Returns:
|
||||
The input value of the specified channel or None if the read operation failed.
|
||||
|
||||
Raises:
|
||||
Exception: If the channel number is out of range.
|
||||
"""
|
||||
if channel < 1 or channel > self.parameters['input_bit_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_discrete_input(self._input_bit_offset + channel - 1)
|
||||
|
||||
|
||||
class DigitalOutputTerminal(BusTerminal):
|
||||
"""
|
||||
Base class for digital output terminals.
|
||||
"""
|
||||
def write_coil(self, channel: int, value: bool) -> bool:
|
||||
"""
|
||||
Write a value to a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number (start counting from 1) to write to.
|
||||
value: The value to write.
|
||||
|
||||
Returns:
|
||||
True if the write operation succeeded, otherwise False.
|
||||
|
||||
Raises:
|
||||
Exception: If the channel number is out of range.
|
||||
"""
|
||||
if channel < 1 or channel > self.parameters['output_bit_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.write_single_coil(self._output_bit_offset + channel - 1, value)
|
||||
|
||||
def read_coil(self, channel: int) -> bool | None:
|
||||
"""
|
||||
Read the coil value back from a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number (start counting from 1) to read from.
|
||||
|
||||
Returns:
|
||||
The coil value of the specified channel or None if the read operation failed.
|
||||
|
||||
Raises:
|
||||
Exception: If the channel number is out of range.
|
||||
"""
|
||||
if channel < 1 or channel > self.parameters['output_bit_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_coil(self._output_bit_offset + channel - 1)
|
||||
|
||||
|
||||
class AnalogInputTerminal(BusTerminal):
|
||||
"""
|
||||
Base class for analog input terminals.
|
||||
"""
|
||||
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
|
||||
"""
|
||||
Read a list of words from the terminal.
|
||||
|
||||
Args:
|
||||
word_offset: The starting word offset (0 based index).
|
||||
word_count: The number of words to read.
|
||||
|
||||
Returns:
|
||||
The read words.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset or count is out of range.
|
||||
"""
|
||||
if word_offset < 0 or word_offset + word_count > self.parameters['input_word_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_input_registers(self._input_word_offset + word_offset, word_count)
|
||||
|
||||
def read_word(self, word_offset: int) -> int:
|
||||
"""
|
||||
Read a single word from the terminal.
|
||||
|
||||
Args:
|
||||
word_offset: The word offset (0 based index) to read from.
|
||||
|
||||
Returns:
|
||||
The read word value.
|
||||
"""
|
||||
val = self.read_words(word_offset, 1)
|
||||
if val:
|
||||
return val[0]
|
||||
else:
|
||||
return -999
|
||||
|
||||
def read_normalized(self, channel: int) -> float:
|
||||
"""
|
||||
Read a normalized value (0...1) from a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to read from.
|
||||
|
||||
Returns:
|
||||
The normalized value.
|
||||
"""
|
||||
return self.read_word(channel * 2 - 1) / 0x7FFF
|
||||
|
||||
|
||||
class AnalogOutputTerminal(BusTerminal):
|
||||
"""
|
||||
Base class for analog output terminals.
|
||||
"""
|
||||
def read_words(self, word_offset: int, word_count: int) -> list[int] | None:
|
||||
"""
|
||||
Read a list of words from the terminal.
|
||||
|
||||
Args:
|
||||
word_offset: The starting word offset (0 based index).
|
||||
word_count: The number of words to read.
|
||||
|
||||
Returns:
|
||||
The read words.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset or count is out of range.
|
||||
"""
|
||||
if word_offset < 0 or word_offset + word_count > self.parameters['output_word_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.read_holding_registers(self._output_word_offset + word_offset, word_count)
|
||||
|
||||
def read_word(self, word_offset: int) -> int:
|
||||
"""
|
||||
Read a single word from the terminal.
|
||||
|
||||
Args:
|
||||
word_offset: The word offset (0 based index) to read from.
|
||||
|
||||
Returns:
|
||||
The read word value.
|
||||
"""
|
||||
val = self.read_words(word_offset, 1)
|
||||
if val:
|
||||
return val[0]
|
||||
else:
|
||||
return -999
|
||||
|
||||
def write_word(self, word_offset: int, data: int) -> bool:
|
||||
"""
|
||||
Write a word to the terminal.
|
||||
|
||||
Args:
|
||||
word_offset: The word offset to write to.
|
||||
data: The data to write.
|
||||
|
||||
Returns:
|
||||
The result of the write operation.
|
||||
|
||||
Raises:
|
||||
Exception: If the word offset is out of range.
|
||||
"""
|
||||
if word_offset < 0 or word_offset + 1 > self.parameters['output_word_width']:
|
||||
raise Exception("address out of range")
|
||||
return self.bus_coupler.modbus.write_single_register(self._output_word_offset + word_offset, data)
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
"""
|
||||
Set a normalized value between 0 and 1 to a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to set.
|
||||
value: The normalized value to set.
|
||||
"""
|
||||
self.write_word(channel * 2 - 1, int(value * 0x7FFF))
|
||||
|
||||
|
||||
class BusCoupler():
|
||||
"""
|
||||
Base class for ModBus TCP bus coupler
|
||||
|
||||
Args:
|
||||
host: ip or hostname of the bus coupler
|
||||
port: port of the modbus host
|
||||
debug: outputs modbus debug information
|
||||
timeout: timeout for waiting for the device response
|
||||
watchdog: time in seconds after the device sets all outputs to
|
||||
default state. A value of 0 deactivates the watchdog.
|
||||
debug: If True, debug information is printed.
|
||||
|
||||
Attributes:
|
||||
bus_terminals: A list of bus terminal classes according to the
|
||||
connected terminals.
|
||||
modbus: The underlying modbus client used for the connection.
|
||||
|
||||
Examples:
|
||||
>>> from pyhoff.devices import *
|
||||
>>> bk = BK9000('192.168.0.23', bus_terminals=[KL3202, KL9010])
|
||||
>>> t1 = bk.terminals[0].read_temperature(1)
|
||||
>>> t2 = bk.terminals[0].read_temperature(2)
|
||||
>>> print(f"Temperature ch1: {t1:.1f} °C, Temperature ch2: {t2:.1f} °C")
|
||||
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
|
||||
"""
|
||||
|
||||
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
|
||||
timeout: float = 5, watchdog: float = 0, debug: bool = False):
|
||||
|
||||
self.bus_terminals: list[Any] = list()
|
||||
self._next_output_bit_offset = 0
|
||||
self._next_input_bit_offset = 0
|
||||
self._next_output_word_offset = 0
|
||||
self._next_input_word_offset = 0
|
||||
self.modbus = SimpleModbusClient(host, port, timeout=timeout, debug=debug)
|
||||
|
||||
self.add_bus_terminals(bus_terminals)
|
||||
self._init_hardware(watchdog)
|
||||
|
||||
def _init_hardware(self, watchdog: float):
|
||||
pass
|
||||
|
||||
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[Any]:
|
||||
"""
|
||||
Add bus terminals to the bus coupler.
|
||||
|
||||
Args:
|
||||
bus_terminals: A list of bus terminal classes to add.
|
||||
|
||||
Returns:
|
||||
The corresponding list of bus terminal objects.
|
||||
"""
|
||||
for terminal_class in bus_terminals:
|
||||
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
|
||||
new_terminal = terminal_class(self,
|
||||
self._next_output_bit_offset,
|
||||
self._next_input_bit_offset,
|
||||
self._next_output_word_offset,
|
||||
self._next_input_word_offset)
|
||||
|
||||
self._next_output_bit_offset += terminal_class.parameters.get('output_bit_width', 0)
|
||||
self._next_input_bit_offset += terminal_class.parameters.get('input_bit_width', 0)
|
||||
self._next_output_word_offset += terminal_class.parameters.get('output_word_width', 0)
|
||||
self._next_input_word_offset += terminal_class.parameters.get('input_word_width', 0)
|
||||
|
||||
self.bus_terminals.append(new_terminal)
|
||||
|
||||
return self.bus_terminals
|
||||
|
||||
def get_error(self) -> str:
|
||||
"""
|
||||
Get the last error message.
|
||||
|
||||
Returns:
|
||||
The last error message.
|
||||
"""
|
||||
return self.modbus.last_error
|
|
@ -0,0 +1,395 @@
|
|||
from . import DigitalInputTerminal, DigitalOutputTerminal
|
||||
from . import AnalogInputTerminal, AnalogOutputTerminal
|
||||
from . import BusTerminal, BusCoupler
|
||||
|
||||
|
||||
class BK9000(BusCoupler):
|
||||
"""
|
||||
BK9000 ModBus TCP bus coupler
|
||||
"""
|
||||
def _init_hardware(self, watchdog: float):
|
||||
# https://download.beckhoff.com/download/document/io/bus-terminals/bk9000_bk9050_bk9100de.pdf
|
||||
# config watchdog on page 58
|
||||
|
||||
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
||||
self.modbus.write_single_register(0x1120, int(watchdog * 1000)) # ms
|
||||
|
||||
# reset watchdog timer:
|
||||
self.modbus.write_single_register(0x1121, 0xBECF)
|
||||
self.modbus.write_single_register(0x1121, 0xAFFE)
|
||||
|
||||
# set process image offset
|
||||
self.next_output_word_offset = 0x0800
|
||||
|
||||
|
||||
class BK9050(BK9000):
|
||||
"""
|
||||
BK9050 ModBus TCP bus coupler
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class BK9100(BK9000):
|
||||
"""
|
||||
BK9100 ModBus TCP bus coupler
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class WAGO750_352(BusCoupler):
|
||||
"""
|
||||
Wago 750-352 ModBus TCP bus coupler
|
||||
"""
|
||||
def _init_hardware(self, watchdog: float):
|
||||
# deactivate/reset watchdog timer:
|
||||
self.modbus.write_single_register(0x1005, 0xAAAA)
|
||||
self.modbus.write_single_register(0x1005, 0x5555)
|
||||
|
||||
# set time-out/deactivate watchdog timer (deactivate: timeout = 0):
|
||||
self.modbus.write_single_register(0x1000, int(watchdog * 10))
|
||||
|
||||
if watchdog:
|
||||
# configure watchdog to reset on all functions codes
|
||||
self.modbus.write_single_register(0x1001, 0xFFFF)
|
||||
|
||||
# set process image offset
|
||||
self.next_output_word_offset = 0x0000
|
||||
|
||||
|
||||
class DigitalInputTerminal4Bit(DigitalInputTerminal):
|
||||
"""
|
||||
Generic 4 bit input terminal
|
||||
"""
|
||||
parameters = {'input_bit_width': 4}
|
||||
|
||||
|
||||
class DigitalInputTerminal8Bit(DigitalInputTerminal):
|
||||
"""
|
||||
Generic 8 bit input terminal
|
||||
"""
|
||||
parameters = {'input_bit_width': 8}
|
||||
|
||||
|
||||
class DigitalInputTerminal16Bit(DigitalInputTerminal):
|
||||
"""
|
||||
Generic 16 bit input terminal
|
||||
"""
|
||||
parameters = {'input_bit_width': 16}
|
||||
|
||||
|
||||
class DigitalOutputTerminal4Bit(DigitalOutputTerminal):
|
||||
"""
|
||||
Generic 4 bit output terminal
|
||||
"""
|
||||
parameters = {'output_bit_width': 4}
|
||||
|
||||
|
||||
class DigitalOutputTerminal8Bit(DigitalOutputTerminal):
|
||||
"""
|
||||
Generic 8 bit output terminal
|
||||
"""
|
||||
parameters = {'output_bit_width': 8}
|
||||
|
||||
|
||||
class DigitalOutputTerminal16Bit(DigitalOutputTerminal):
|
||||
"""
|
||||
Generic 16 bit output terminal
|
||||
"""
|
||||
parameters = {'output_bit_width': 16}
|
||||
|
||||
|
||||
class KL1104(DigitalInputTerminal4Bit):
|
||||
"""
|
||||
KL1104: 4x digital input 24 V
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL1408(DigitalInputTerminal8Bit):
|
||||
"""
|
||||
KL1104: 8x digital input 24 V galvanic isolated
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class WAGO_750_1405(DigitalInputTerminal16Bit):
|
||||
"""
|
||||
750-1405: 16x digital input 24 V
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL2404(DigitalOutputTerminal4Bit):
|
||||
"""
|
||||
KL2404: 4x digital output with 500 mA
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL2424(DigitalOutputTerminal4Bit):
|
||||
"""
|
||||
KL2424: 4x digital output with 2000 mA
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL2634(DigitalOutputTerminal4Bit):
|
||||
"""
|
||||
KL2634: 4x digital output 250 V AC, 30 V DC, 4 A
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL2408(DigitalOutputTerminal8Bit):
|
||||
"""
|
||||
750-530: 8x digital output with 24 V / 500 mA
|
||||
|
||||
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class WAGO_750_530(DigitalOutputTerminal8Bit):
|
||||
"""
|
||||
750-530: 8x digital output with 24 V / 500 mA
|
||||
|
||||
Contact order for DO1 to DO8 is: 1, 5, 2, 6, 3, 7, 4, 8.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL1512(AnalogInputTerminal):
|
||||
"""
|
||||
KL1512: 2x digital input, counter, 24 V DC, 1 kHz
|
||||
"""
|
||||
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def __init__(self, bus_coupler: BusCoupler, output_bit_offset: int, input_bit_offset: int, output_word_offset: int, input_word_offset: int):
|
||||
super().__init__(bus_coupler, output_bit_offset, input_bit_offset, output_word_offset, input_word_offset)
|
||||
self._last_counter_values = [self.read_word(1 * 2 - 1), self.read_word(2 * 2 - 1)]
|
||||
|
||||
def read_counter(self, channel: int) -> int:
|
||||
"""
|
||||
Read the absolut counter value of a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to read from.
|
||||
|
||||
Returns:
|
||||
The counter value.
|
||||
"""
|
||||
|
||||
return self.read_word(channel * 2 - 1)
|
||||
|
||||
def read_delta(self, channel: int) -> int:
|
||||
"""
|
||||
Read the counter change since last read of a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to read from.
|
||||
|
||||
Returns:
|
||||
The counter value.
|
||||
"""
|
||||
# TODO: handel overflow
|
||||
new_count = self.read_word(channel * 2 - 1)
|
||||
return new_count - self._last_counter_values[channel - 1]
|
||||
|
||||
|
||||
class KL3054(AnalogInputTerminal):
|
||||
"""
|
||||
KL3054: 4x analog input 4...20 mA 12 Bit single-ended
|
||||
"""
|
||||
# Input: 4 x 16 Bit Daten (optional 4x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||
|
||||
def read_current(self, channel: int) -> float:
|
||||
"""
|
||||
Read the current value from a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to read from.
|
||||
|
||||
Returns:
|
||||
The current value.
|
||||
"""
|
||||
return self.read_normalized(channel * 2 - 1) * 16.0 + 4.0
|
||||
|
||||
|
||||
class KL3042(AnalogInputTerminal):
|
||||
"""
|
||||
KL3042: 2x analog input 0...20 mA 12 Bit single-ended
|
||||
"""
|
||||
# Input: 2 x 16 Bit Daten (optional 2x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def read_current(self, channel: int) -> float:
|
||||
"""
|
||||
Read the current value from a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to read from.
|
||||
|
||||
Returns:
|
||||
The current value.
|
||||
"""
|
||||
return self.read_normalized(channel * 2 - 1) * 20.0
|
||||
|
||||
|
||||
class KL3202(AnalogInputTerminal):
|
||||
"""
|
||||
KL3202: 2x analog input PT100 16 Bit 3-wire
|
||||
"""
|
||||
# Input: 2 x 16 Bit Daten (2 x 8 Bit Control/Status optional)
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def read_temperature(self, channel: int) -> float:
|
||||
"""
|
||||
Read the temperature value from a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to read from.
|
||||
|
||||
Returns:
|
||||
The temperature value in °C.
|
||||
"""
|
||||
val = self.read_word(channel * 2 - 1)
|
||||
if val > 0x7FFF:
|
||||
return (val - 0x10000) / 10.0
|
||||
else:
|
||||
return val / 10.0
|
||||
|
||||
|
||||
class KL3214(AnalogInputTerminal):
|
||||
"""
|
||||
KL3214: 4x analog input PT100 16 Bit 3-wire
|
||||
"""
|
||||
# inp: 4 x 16 Bit Daten, 4 x 8 Bit Status (optional)
|
||||
# out: 4 x 8 Bit Control (optional)
|
||||
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||
|
||||
def read_temperature(self, channel: int) -> float:
|
||||
"""
|
||||
Read the temperature value from a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to read from.
|
||||
|
||||
Returns:
|
||||
The temperature value.
|
||||
"""
|
||||
val = self.read_word(channel * 2 - 1)
|
||||
if val > 0x7FFF:
|
||||
return (val - 0x10000) / 10.0
|
||||
else:
|
||||
return val / 10.0
|
||||
|
||||
|
||||
class KL4002(AnalogOutputTerminal):
|
||||
"""
|
||||
KL4002: 2x analog output 0...10 V 12 Bit differentiell
|
||||
"""
|
||||
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
"""
|
||||
Set a voltage value to a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to set.
|
||||
value: The voltage value to set.
|
||||
"""
|
||||
self.set_normalized(channel, value / 10.0)
|
||||
|
||||
|
||||
class KL4132(AnalogOutputTerminal):
|
||||
"""
|
||||
KL4002: 2x analog output ±10 V 16 bit differential
|
||||
"""
|
||||
# Output: 2 x 16 Bit Daten (optional 2 x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 4, 'output_word_width': 4}
|
||||
|
||||
def set_normalized(self, channel: int, value: float):
|
||||
"""
|
||||
Set a normalized value between -1 and +1 to a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to set.
|
||||
value: The normalized value to set.
|
||||
"""
|
||||
if value >= 0:
|
||||
self.write_word(channel - 1, int(value * 0x7FFF))
|
||||
else:
|
||||
self.write_word(channel - 1, int(0x10000 + value * 0x7FFF))
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
"""
|
||||
Set a voltage value between -10 and +10 V to a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to set.
|
||||
value: The voltage value to set.
|
||||
"""
|
||||
self.set_normalized(channel, value / 10.0)
|
||||
|
||||
|
||||
class KL4004(AnalogOutputTerminal):
|
||||
"""
|
||||
KL4004: 4x analog output 0...10 V 12 Bit differentiell
|
||||
"""
|
||||
# Output: 4 x 16 Bit Daten (optional 4 x 8 Bit Control/Status)
|
||||
parameters = {'input_word_width': 8, 'output_word_width': 8}
|
||||
|
||||
def set_voltage(self, channel: int, value: float):
|
||||
"""
|
||||
Set a voltage value to a specific channel.
|
||||
|
||||
Args:
|
||||
channel: The channel number to set.
|
||||
value: The voltage value to set.
|
||||
"""
|
||||
self.set_normalized(channel, value / 10.0)
|
||||
|
||||
|
||||
class KL9010(BusTerminal):
|
||||
"""
|
||||
End terminal, no I/O function
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL9100(BusTerminal):
|
||||
"""
|
||||
Potential supply terminal, no I/O function
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL9183(BusTerminal):
|
||||
"""
|
||||
Potential distribution terminal, no I/O function
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class KL9188(BusTerminal):
|
||||
"""
|
||||
Potential distribution terminal, no I/O function
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class WAGO_750_600(BusTerminal):
|
||||
"""
|
||||
End terminal, no I/O function
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class WAGO_750_602(BusTerminal):
|
||||
"""
|
||||
Potential supply terminal, no I/O function
|
||||
"""
|
||||
pass
|
|
@ -0,0 +1,517 @@
|
|||
import socket
|
||||
import struct
|
||||
import random
|
||||
|
||||
_READ_COILS = 0x01
|
||||
_READ_DISCRETE_INPUTS = 0x02
|
||||
_READ_HOLDING_REGISTERS = 0x03
|
||||
_READ_INPUT_REGISTERS = 0x04
|
||||
_WRITE_SINGLE_COIL = 0x05
|
||||
_WRITE_SINGLE_REGISTER = 0x06
|
||||
_WRITE_MULTIPLE_COILS = 0x0F
|
||||
_WRITE_MULTIPLE_REGISTERS = 0x10
|
||||
|
||||
_modbus_exceptions = {
|
||||
0x01: 'illegal function',
|
||||
0x02: 'illegal data address',
|
||||
0x03: 'illegal data value',
|
||||
0x04: 'slave device failure',
|
||||
0x05: 'acknowledge',
|
||||
0x06: 'slave device busy',
|
||||
0x07: 'negative acknowledge',
|
||||
0x08: 'memory parity error',
|
||||
0x0A: 'gateway path unavailable',
|
||||
0x0B: 'gateway target device failed to respond'
|
||||
}
|
||||
|
||||
|
||||
def _get_bits(data: bytes, bit_number: int) -> list[bool]:
|
||||
return [bool(data[i // 8] >> (i % 8) & 0x01)
|
||||
for i in range(bit_number)]
|
||||
|
||||
|
||||
def _get_words(data: bytes) -> list[int]:
|
||||
return [(data[i * 2] << 8) + data[i * 2 + 1]
|
||||
for i in range(len(data) // 2)]
|
||||
|
||||
|
||||
def _from_bits(values: list[bool]) -> bytes:
|
||||
return bytes(sum(((1 << j) * bool(values[8 * i + j]))
|
||||
for j in range(8)) for i in range(len(values) // 8))
|
||||
|
||||
|
||||
def _from_words(values: list[int]) -> bytes:
|
||||
return b''.join(word.to_bytes(2, byteorder='big') for word in values)
|
||||
|
||||
|
||||
class SimpleModbusClient:
|
||||
"""
|
||||
A simple Modbus TCP client
|
||||
|
||||
Args:
|
||||
host: hostname or IP address
|
||||
port: server port
|
||||
unit_id: ModBus id
|
||||
timeout: socket timeout in seconds
|
||||
debug: if True prints out transmitted and received bytes in hex
|
||||
|
||||
Attributes:
|
||||
host: hostname or IP address
|
||||
port: server port
|
||||
unit_id: ModBus id
|
||||
timeout: socket timeout in seconds
|
||||
last_error: contains last error message or empty string if no error occurred
|
||||
debug: if True prints out transmitted and received bytes in hex
|
||||
|
||||
Example:
|
||||
>>> client = SimpleModbusClient('localhost', port = 502, unit_id = 1)
|
||||
>>> print(client.read_coils(0, 10))
|
||||
>>> print(client.read_discrete_inputs(0, 10))
|
||||
>>> print(client.read_holding_registers(0, 10))
|
||||
>>> print(client.read_input_registers(0, 10))
|
||||
>>> print(client.write_single_coil(0, True))
|
||||
>>> print(client.write_single_register(0, 1234))
|
||||
>>> print(client.write_multiple_coils(0, [True, False, True]))
|
||||
>>> print(client.write_multiple_registers(0, [1234, 5678]))
|
||||
>>> client.close()
|
||||
"""
|
||||
def __init__(self, host: str, port: int = 502, unit_id: int = 1, timeout: float = 5, debug: bool = False):
|
||||
assert 0 <= unit_id < 256
|
||||
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.unit_id = unit_id
|
||||
self.timeout = timeout
|
||||
self.last_error = ''
|
||||
self._transaction_id = random.randint(0, 0xFFFF)
|
||||
self._socket: None | socket.socket = None
|
||||
self.debug = debug
|
||||
|
||||
def connect(self):
|
||||
for af, st, pr, _, sa in socket.getaddrinfo(self.host, self.port,
|
||||
socket.AF_UNSPEC,
|
||||
socket.SOCK_STREAM):
|
||||
try:
|
||||
self._socket = socket.socket(af, st, pr)
|
||||
except socket.error:
|
||||
self.close()
|
||||
continue
|
||||
try:
|
||||
self._socket.settimeout(self.timeout)
|
||||
self._socket.connect(sa)
|
||||
except socket.error:
|
||||
self.close()
|
||||
continue
|
||||
break
|
||||
|
||||
if self._socket:
|
||||
return True
|
||||
else:
|
||||
self.last_error = 'connection failed'
|
||||
return False
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close connection
|
||||
|
||||
Returns:
|
||||
empty bytes object
|
||||
"""
|
||||
if self._socket:
|
||||
self._socket.close()
|
||||
self._socket = None
|
||||
|
||||
return bytes()
|
||||
|
||||
def read_coils(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
|
||||
"""
|
||||
ModBus function for reading coils (0x01)
|
||||
|
||||
Args:
|
||||
bit_address: Bit address (0 to 0xffff)
|
||||
bit_lengths: Number of bits to read (1 to 2000)
|
||||
|
||||
Returns:
|
||||
list of bool or None: Bits list or None if error
|
||||
"""
|
||||
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
|
||||
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
|
||||
|
||||
if not self.send_modbus_data(_READ_COILS, _from_words([bit_address, bit_lengths])):
|
||||
return None
|
||||
|
||||
rx_data = self.receive_modbus_data()
|
||||
if not rx_data:
|
||||
return None
|
||||
|
||||
if len(rx_data) < 2:
|
||||
self.last_error = 'received frame under min size'
|
||||
return None
|
||||
|
||||
byte_count = rx_data[0]
|
||||
bit_data = rx_data[1:]
|
||||
|
||||
if not (byte_count * 8 >= bit_lengths and
|
||||
byte_count == len(bit_data)):
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return None
|
||||
|
||||
return _get_bits(bit_data, bit_lengths)
|
||||
|
||||
def read_discrete_inputs(self, bit_address: int, bit_lengths: int = 1) -> list[bool] | None:
|
||||
"""
|
||||
ModBus function for reading discrete inputs (0x02)
|
||||
|
||||
Args:
|
||||
bit_address: Bit address (0 to 0xffff)
|
||||
bit_lengths: Number of bits to read (1 to 2000)
|
||||
|
||||
Returns:
|
||||
list of bool or None: Bits list or None if error
|
||||
"""
|
||||
assert 1 <= bit_lengths <= 2000, 'bit_lengths out of range'
|
||||
assert bit_address + bit_lengths <= 0xffff, 'read after address 0xffff'
|
||||
|
||||
if not self.send_modbus_data(_READ_DISCRETE_INPUTS, _from_words([bit_address, bit_lengths])):
|
||||
return None
|
||||
|
||||
rx_data = self.receive_modbus_data()
|
||||
if not rx_data:
|
||||
return None
|
||||
|
||||
if len(rx_data) < 2:
|
||||
self.last_error = 'received frame under minimum size'
|
||||
return None
|
||||
|
||||
byte_count = rx_data[0]
|
||||
bit_data = rx_data[1:]
|
||||
|
||||
if not (byte_count * 8 >= bit_lengths and
|
||||
byte_count == len(bit_data)):
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return None
|
||||
|
||||
return _get_bits(bit_data, bit_lengths)
|
||||
|
||||
def read_holding_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
|
||||
"""
|
||||
ModBus function for reading holding registers (0x03)
|
||||
|
||||
Args:
|
||||
register_address: Register address (0 to 0xffff)
|
||||
word_lengths: Number of registers to read (1 to 125)
|
||||
|
||||
Returns:
|
||||
list of int or None: Registers list or None if error
|
||||
"""
|
||||
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
|
||||
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
|
||||
|
||||
if not self.send_modbus_data(_READ_HOLDING_REGISTERS, _from_words([register_address, word_lengths])):
|
||||
return None
|
||||
|
||||
rx_data = self.receive_modbus_data()
|
||||
if not rx_data:
|
||||
return None
|
||||
|
||||
if len(rx_data) < 2:
|
||||
self.last_error = 'received frame under minimum size'
|
||||
return None
|
||||
|
||||
byte_count = rx_data[0]
|
||||
reg_data = rx_data[1:]
|
||||
|
||||
if not (byte_count == 2 * word_lengths and
|
||||
byte_count == len(reg_data)):
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return None
|
||||
|
||||
return _get_words(reg_data)
|
||||
|
||||
def read_input_registers(self, register_address: int, word_lengths: int = 1) -> list[int] | None:
|
||||
"""
|
||||
ModBus function for reading input registers (0x04)
|
||||
|
||||
Args:
|
||||
register_address: Register address (0 to 0xffff)
|
||||
word_lengths: Number of registers to read (1 to 125)
|
||||
|
||||
Returns:
|
||||
list of int or None: Registers list or None if error
|
||||
"""
|
||||
assert 1 <= word_lengths <= 125, 'word_lengths out of range'
|
||||
assert register_address + word_lengths <= 0xffff, 'read after address 0xffff'
|
||||
|
||||
if not self.send_modbus_data(_READ_INPUT_REGISTERS, _from_words([register_address, word_lengths])):
|
||||
return None
|
||||
|
||||
rx_data = self.receive_modbus_data()
|
||||
if not rx_data:
|
||||
return None
|
||||
|
||||
if len(rx_data) < 2:
|
||||
self.last_error = 'received frame under minimum size'
|
||||
return None
|
||||
|
||||
byte_count = rx_data[0]
|
||||
reg_data = rx_data[1:]
|
||||
|
||||
if not (byte_count == 2 * word_lengths and
|
||||
byte_count == len(reg_data)):
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return None
|
||||
|
||||
return _get_words(reg_data)
|
||||
|
||||
def write_single_coil(self, bit_address: int, value: bool) -> bool:
|
||||
"""
|
||||
ModBus function for writing a single coil (0x05)
|
||||
|
||||
Args:
|
||||
bit_address: Bit address (0 to 0xffff)
|
||||
value: Value to write (single bit)
|
||||
|
||||
Returns:
|
||||
True if write succeeded or False if failed
|
||||
"""
|
||||
assert 0 <= bit_address <= 0xffff, 'bit_address out of range'
|
||||
|
||||
tx_data = _from_words([bit_address, 0xFF00 * bool(value)])
|
||||
if not self.send_modbus_data(_WRITE_SINGLE_COIL, tx_data):
|
||||
return False
|
||||
|
||||
data = self.receive_modbus_data()
|
||||
if not data:
|
||||
return False
|
||||
|
||||
if len(data) != 4:
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return False
|
||||
|
||||
return data == tx_data
|
||||
|
||||
def read_discrete_input(self, address: int) -> bool | None:
|
||||
"""
|
||||
Read a discrete input from the given register address.
|
||||
|
||||
Args:
|
||||
address: The register address to read from.
|
||||
|
||||
Returns:
|
||||
The value of the discrete input.
|
||||
"""
|
||||
value = self.read_discrete_inputs(address)
|
||||
if value:
|
||||
return value[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def read_coil(self, address: int) -> bool | None:
|
||||
"""
|
||||
Read a coil from the given register address.
|
||||
|
||||
Args:
|
||||
address: The register address to read from.
|
||||
|
||||
Returns:
|
||||
The value of the coil.
|
||||
"""
|
||||
value = self.read_coils(address)
|
||||
if value:
|
||||
return value[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def write_single_register(self, register_address: int, value: int) -> bool:
|
||||
"""
|
||||
ModBus function for writing a single register (0x06)
|
||||
|
||||
Args:
|
||||
register_address: Register address (0 to 0xffff)
|
||||
value: Value to write (0 to 0xffff)
|
||||
|
||||
Returns:
|
||||
True if write succeeded or False if failed
|
||||
"""
|
||||
assert 0 <= register_address <= 0xffff, 'register_address out of range'
|
||||
assert 0 <= value <= 0xffff, 'value out of range 0 to 0xffff'
|
||||
|
||||
tx_data = _from_words([register_address, value])
|
||||
if not self.send_modbus_data(_WRITE_SINGLE_REGISTER, tx_data):
|
||||
return False
|
||||
|
||||
data = self.receive_modbus_data()
|
||||
if not data:
|
||||
return False
|
||||
|
||||
if len(data) != 4:
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return False
|
||||
|
||||
return data == tx_data
|
||||
|
||||
def write_multiple_coils(self, bit_address: int, values: list[bool]) -> bool:
|
||||
"""
|
||||
ModBus function for writing multiple coils (0x0F)
|
||||
|
||||
Args:
|
||||
bit_address: Bit address (0 to 0xffff)
|
||||
values: List of bit values to write
|
||||
|
||||
Returns:
|
||||
True if write succeeded or False if failed
|
||||
"""
|
||||
assert bit_address + len(values) <= 0xffff, 'bit_address out of range'
|
||||
assert 1 <= len(values) <= 2000, 'number values must be from 1 to 2000'
|
||||
|
||||
byte_count = (len(values) + 7) // 8
|
||||
tx_data = struct.pack('>HHB', bit_address, len(values), byte_count) + _from_bits(values)
|
||||
if not self.send_modbus_data(_WRITE_MULTIPLE_COILS, tx_data):
|
||||
return False
|
||||
|
||||
data = self.receive_modbus_data()
|
||||
if not data:
|
||||
return False
|
||||
|
||||
if len(data) != 4:
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return False
|
||||
|
||||
return _get_words(data[0:1])[0] == bit_address
|
||||
|
||||
def write_multiple_registers(self, register_address: int, values: list[int]) -> bool:
|
||||
"""
|
||||
ModBus function for writing multiple registers (0x10)
|
||||
|
||||
Args:
|
||||
register_address: Register address (0 to 0xffff)
|
||||
values: List of 16 bit register values to write
|
||||
|
||||
Returns:
|
||||
True if write succeeded or False if failed
|
||||
"""
|
||||
assert register_address + len(values) <= 0xffff, 'register_address out of range'
|
||||
assert max(values) <= 0xffff, 'value out of range 0 to 0xffff'
|
||||
assert min(values) >= 0, 'value out of range 0 to 0xffff'
|
||||
|
||||
byte_count = len(values) * 2
|
||||
tx_data = struct.pack('>HHB', register_address, len(values), byte_count) + _from_words(values)
|
||||
if not self.send_modbus_data(_WRITE_MULTIPLE_REGISTERS, tx_data):
|
||||
return False
|
||||
|
||||
data = self.receive_modbus_data()
|
||||
if not data:
|
||||
return False
|
||||
|
||||
if len(data) != 4:
|
||||
self.last_error = 'received frame size mismatch'
|
||||
return False
|
||||
|
||||
return _get_words(data[0:1])[0] == register_address
|
||||
|
||||
def _recv(self, number_of_bytes: int) -> bytes:
|
||||
"""
|
||||
Receive data over tcp, wait until all specified bytes are received
|
||||
|
||||
Args:
|
||||
number_of_bytes: Number of bytes to receive
|
||||
|
||||
Returns:
|
||||
returns received bytes or empty an bytes object if an error occurred
|
||||
"""
|
||||
if not self._socket:
|
||||
return bytes()
|
||||
|
||||
buffer = bytes()
|
||||
while len(buffer) < number_of_bytes:
|
||||
try:
|
||||
buffer += self._socket.recv(number_of_bytes - len(buffer))
|
||||
except socket.error:
|
||||
return bytes()
|
||||
|
||||
if self.debug:
|
||||
print(f'<- Received: {' '.join(hex(b) for b in buffer)}')
|
||||
|
||||
return buffer
|
||||
|
||||
def _send(self, data: bytes) -> int:
|
||||
"""
|
||||
Send data over tcp
|
||||
|
||||
Args:
|
||||
data: data to send
|
||||
|
||||
Returns:
|
||||
number of transmitted bytes or 0 if transmission failed
|
||||
"""
|
||||
|
||||
for _ in range(2):
|
||||
if self._socket:
|
||||
try:
|
||||
self._socket.sendall(data)
|
||||
if self.debug:
|
||||
print(f'-> Send: {' '.join(hex(b) for b in data)}')
|
||||
return len(data)
|
||||
except socket.error:
|
||||
self.last_error = 'sending data failed'
|
||||
self.close()
|
||||
self.connect()
|
||||
else:
|
||||
self.connect()
|
||||
|
||||
return 0
|
||||
|
||||
def send_modbus_data(self, function_code: int, body: bytes) -> int:
|
||||
"""
|
||||
Send raw ModBus TCP frame
|
||||
|
||||
Args:
|
||||
unction_code: ModBus function code
|
||||
body: data
|
||||
|
||||
Returns:
|
||||
number of transmitted bytes or 0 if transmission failed
|
||||
"""
|
||||
self._transaction_id = (self._transaction_id + 1) % 0x10000
|
||||
protocol_identifier = 0
|
||||
length = len(body) + 2
|
||||
header = struct.pack('>HHHBB', self._transaction_id,
|
||||
protocol_identifier, length, self.unit_id,
|
||||
function_code)
|
||||
frame = header + body
|
||||
|
||||
return self._send(frame)
|
||||
|
||||
def receive_modbus_data(self) -> bytes:
|
||||
"""
|
||||
Receive a ModBus frame
|
||||
|
||||
Returns:
|
||||
bytes received or empty bytes object if an error occurred
|
||||
"""
|
||||
header = self._recv(7)
|
||||
if not header:
|
||||
self.last_error = 'receiving return frame failed'
|
||||
return self.close()
|
||||
|
||||
transaction_id, protocol_identifier, length, unit_id =\
|
||||
struct.unpack('>HHHB', header)
|
||||
|
||||
if not ((transaction_id == self._transaction_id) and
|
||||
(protocol_identifier == 0) and
|
||||
(unit_id == self.unit_id) and
|
||||
(length <= 0xFF) and
|
||||
(transaction_id == self._transaction_id)):
|
||||
self.last_error = 'received frame is invalid'
|
||||
return self.close()
|
||||
|
||||
data = self._recv(length - 1)
|
||||
if not data:
|
||||
self.last_error = 'receiving data payload failed'
|
||||
return self.close()
|
||||
|
||||
if data[0] > 0x80:
|
||||
self.last_error = f'return error: {_modbus_exceptions.get(data[1], '')} ({data[1]})'
|
||||
return bytes()
|
||||
|
||||
return data[1:]
|
|
@ -0,0 +1,36 @@
|
|||
import unittest
|
||||
from pyhoff.modbus import _get_bits, _get_words, _from_bits, _from_words
|
||||
|
||||
|
||||
class TestModbusFunctions(unittest.TestCase):
|
||||
|
||||
def test_get_bits(self):
|
||||
data = bytes([0b11101010, 0b11010101])
|
||||
bit_number = 16
|
||||
expected = [False, True, False, True, False, True, True, True,
|
||||
True, False, True, False, True, False, True, True]
|
||||
result = _get_bits(data, bit_number)
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
def test_get_words(self):
|
||||
data = bytes([0x12, 0x34, 0x56, 0x78])
|
||||
expected = [0x1234, 0x5678]
|
||||
result = _get_words(data)
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
def test_from_bits(self):
|
||||
values = [False, True, False, True, False, True, True, True,
|
||||
True, False, True, False, True, False, True, True]
|
||||
expected = bytes([0b11101010, 0b11010101])
|
||||
result = _from_bits(values)
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
def test_from_words(self):
|
||||
values = [0x1234, 0x5678]
|
||||
expected = bytes([0x12, 0x34, 0x56, 0x78])
|
||||
result = _from_words(values)
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -0,0 +1,61 @@
|
|||
import inspect
|
||||
import pyhoff as pyhoff
|
||||
import pyhoff.devices as devices
|
||||
from pyhoff.devices import DigitalInputTerminal, DigitalOutputTerminal, AnalogInputTerminal, AnalogOutputTerminal
|
||||
|
||||
|
||||
def test_terminal_plausib():
|
||||
"""
|
||||
Test if all implemented BusTerminal classes in devices
|
||||
have the plausible parameters
|
||||
"""
|
||||
|
||||
for n, o in inspect.getmembers(devices):
|
||||
if inspect.isclass(o) and o not in [DigitalInputTerminal,
|
||||
DigitalOutputTerminal,
|
||||
AnalogInputTerminal,
|
||||
AnalogOutputTerminal]:
|
||||
print('Terminal: ' + n)
|
||||
if issubclass(o, DigitalInputTerminal):
|
||||
assert o.parameters.get('input_bit_width', 0) > 0
|
||||
assert o.parameters.get('output_bit_width', 0) == 0
|
||||
assert o.parameters.get('input_word_width', 0) == 0
|
||||
assert o.parameters.get('output_word_width', 0) == 0
|
||||
|
||||
if issubclass(o, DigitalOutputTerminal):
|
||||
assert o.parameters.get('input_bit_width', 0) == 0
|
||||
assert o.parameters.get('output_bit_width', 0) > 0
|
||||
assert o.parameters.get('input_word_width', 0) == 0
|
||||
assert o.parameters.get('output_word_width', 0) == 0
|
||||
|
||||
if issubclass(o, AnalogInputTerminal):
|
||||
assert o.parameters.get('input_bit_width', 0) == 0
|
||||
assert o.parameters.get('output_bit_width', 0) == 0
|
||||
assert o.parameters.get('input_word_width', 0) > 0
|
||||
|
||||
if issubclass(o, AnalogOutputTerminal):
|
||||
assert o.parameters.get('input_bit_width', 0) == 0
|
||||
assert o.parameters.get('output_bit_width', 0) == 0
|
||||
assert o.parameters.get('output_word_width', 0) > 0
|
||||
|
||||
|
||||
def test_terminal_setup():
|
||||
"""
|
||||
Test if all implemented BusTerminal classes in devices can
|
||||
be instantiated and connected to a bus coupler
|
||||
"""
|
||||
|
||||
terminal_classes: list[type[pyhoff.BusTerminal]] = []
|
||||
for n, o in inspect.getmembers(devices):
|
||||
if inspect.isclass(o) and o not in [DigitalInputTerminal,
|
||||
DigitalOutputTerminal,
|
||||
AnalogInputTerminal,
|
||||
AnalogOutputTerminal]:
|
||||
if issubclass(o, pyhoff.BusTerminal):
|
||||
print(n)
|
||||
terminal_classes.append(o)
|
||||
|
||||
bus_cupler = devices.BK9050('localhost', 11255, terminal_classes, timeout=0.001)
|
||||
|
||||
assert len(terminal_classes) == len(bus_cupler.bus_terminals)
|
||||
assert bus_cupler.get_error() == 'connection failed', bus_cupler.get_error()
|
Loading…
Reference in New Issue