mirror of https://github.com/Nonannet/pyhoff.git
Compare commits
8 Commits
228b1e7a65
...
3f13f8c2be
Author | SHA1 | Date |
---|---|---|
|
3f13f8c2be | |
|
7e7be77bac | |
|
7310e13a50 | |
|
258e9e858f | |
|
db3bf0d7de | |
|
27514a8438 | |
|
ad9398e078 | |
|
e82592cc56 |
|
@ -5,7 +5,7 @@ authors:
|
||||||
- family-names: Kruse
|
- family-names: Kruse
|
||||||
given-names: Nicolas
|
given-names: Nicolas
|
||||||
orcid: "https://orcid.org/0000-0001-6758-2269"
|
orcid: "https://orcid.org/0000-0001-6758-2269"
|
||||||
version: 1.0.1
|
version: 1.0.2
|
||||||
#date-released: "2025-04-01"
|
#date-released: "2025-04-01"
|
||||||
#identifiers:
|
#identifiers:
|
||||||
# - description: This is the collection of archived snapshots of all versions of My Research Software
|
# - description: This is the collection of archived snapshots of all versions of My Research Software
|
||||||
|
|
26
README.md
26
README.md
|
@ -34,26 +34,24 @@ It is easy to use as the following example code shows:
|
||||||
from pyhoff.devices import *
|
from pyhoff.devices import *
|
||||||
|
|
||||||
# connect to the BK9050 by tcp/ip on default port 502
|
# connect to the BK9050 by tcp/ip on default port 502
|
||||||
bus_coupler = BK9050("172.16.17.1")
|
bk = BK9050("172.16.17.1")
|
||||||
|
|
||||||
# list of all bus terminals connected to the bus coupler
|
# add all bus terminals connected to the bus coupler
|
||||||
# in the order of the physical arrangement
|
# in the order of the physical arrangement
|
||||||
terminal_list = [KL2404, KL2424, KL9100, KL1104, KL3202,
|
bk.add_bus_terminals(KL2404, KL2424, KL9100, KL1104, KL3202,
|
||||||
KL4002, KL9188, KL3054, KL3214, KL4004,
|
KL3202, KL4002, KL9188, KL3054, KL3214,
|
||||||
KL9010]
|
KL4004, KL9010)
|
||||||
|
|
||||||
terminals = bus_coupler.add_bus_terminals(terminal_list)
|
# Set 1. output of the first KL2404-type bus terminal to hi
|
||||||
|
KL2404.select(bk, 0).write_coil(1, True)
|
||||||
|
|
||||||
# Set 1. output of the first bus terminal (KL2404) to hi
|
# read temperature from the 2. channel of the 2. KL3202-type
|
||||||
terminals[0].write_coil(1, True)
|
# bus terminal
|
||||||
|
t = KL3202.select(bk, 1).read_temperature(2)
|
||||||
# read the temperature from the 2. channel of the 5. bus
|
|
||||||
# terminal (KL3202)
|
|
||||||
t = terminals[4].read_temperature(2)
|
|
||||||
print(f"t = {t:.1f} °C")
|
print(f"t = {t:.1f} °C")
|
||||||
|
|
||||||
# Set 1. output of the 6. bus terminal (KL4002) to 4.2 V
|
# Set 1. output of the 1. KL4002-type bus terminal to 4.2 V
|
||||||
terminals[5].set_voltage(1, 4.2)
|
KL4002.select(bk, 0).set_voltage(1, 4.2)
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -20,10 +20,10 @@ pytest
|
||||||
|
|
||||||
# Build package:
|
# Build package:
|
||||||
pip install build
|
pip install build
|
||||||
python3 -m build
|
python -m build
|
||||||
|
|
||||||
# Upload
|
# Upload
|
||||||
pip install twine
|
pip install twine
|
||||||
#python3 -m twine upload dist/*
|
#python3 -m twine upload dist/*
|
||||||
python3 -m twine upload --repository testpypi dist/* # Test repository: https://test.pypi.org/project/example_package_YOUR_USERNAME_HERE
|
python -m twine upload dist/*
|
||||||
```
|
```
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[project]
|
[project]
|
||||||
name = "pyhoff"
|
name = "pyhoff"
|
||||||
version = "1.0.1"
|
version = "1.0.2"
|
||||||
authors = [
|
authors = [
|
||||||
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
|
{ name="Nicolas Kruse", email="nicolas.kruse@nonan.net" },
|
||||||
]
|
]
|
||||||
|
|
|
@ -1,5 +1,12 @@
|
||||||
from .modbus import SimpleModbusClient
|
from .modbus import SimpleModbusClient
|
||||||
from typing import Type
|
from typing import Type, Iterable
|
||||||
|
|
||||||
|
|
||||||
|
def _is_bus_terminal(bt_type: Type['BusTerminal']) -> bool:
|
||||||
|
if BusTerminal.__name__ == bt_type.__name__:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return any(_is_bus_terminal(b) for b in bt_type.__bases__)
|
||||||
|
|
||||||
|
|
||||||
class BusTerminal():
|
class BusTerminal():
|
||||||
|
@ -33,6 +40,13 @@ class BusTerminal():
|
||||||
self._input_word_addresses = input_word_addresses
|
self._input_word_addresses = input_word_addresses
|
||||||
self._mixed_mapping = mixed_mapping
|
self._mixed_mapping = mixed_mapping
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def select(cls, bus_coupler: 'BusCoupler', terminal_number: int = 0):
|
||||||
|
terminal_list = [bt for bt in bus_coupler.bus_terminals if isinstance(bt, cls)]
|
||||||
|
assert terminal_list, f'No instance of {cls.__name__} configured at this BusCoupler'
|
||||||
|
assert 0 <= terminal_number < len(terminal_list), f'Out of range, select in range: 0..{len(terminal_list) - 1}'
|
||||||
|
return terminal_list[terminal_number]
|
||||||
|
|
||||||
|
|
||||||
class DigitalInputTerminal(BusTerminal):
|
class DigitalInputTerminal(BusTerminal):
|
||||||
"""
|
"""
|
||||||
|
@ -145,7 +159,7 @@ class AnalogOutputTerminal(BusTerminal):
|
||||||
channel: The channel number (1 based index) to read from.
|
channel: The channel number (1 based index) to read from.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The read word value.
|
The read word value or provided error_value if read failed.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
Exception: If the word offset or count is out of range.
|
Exception: If the word offset or count is out of range.
|
||||||
|
@ -158,7 +172,7 @@ class AnalogOutputTerminal(BusTerminal):
|
||||||
|
|
||||||
return value[0] if value else error_value
|
return value[0] if value else error_value
|
||||||
|
|
||||||
def write_channel_word(self, channel: int, value: int) -> int:
|
def write_channel_word(self, channel: int, value: int) -> bool:
|
||||||
"""
|
"""
|
||||||
Write a word to the terminal.
|
Write a word to the terminal.
|
||||||
|
|
||||||
|
@ -176,15 +190,18 @@ class AnalogOutputTerminal(BusTerminal):
|
||||||
|
|
||||||
return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value)
|
return self.bus_coupler.modbus.write_single_register(self._output_word_addresses[channel - 1], value)
|
||||||
|
|
||||||
def set_normalized(self, channel: int, value: float):
|
def set_normalized(self, channel: int, value: float) -> bool:
|
||||||
"""
|
"""
|
||||||
Set a normalized value between 0 and 1 to a specific channel.
|
Set a normalized value between 0 and 1 to a specific channel.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
channel: The channel number to set.
|
channel: The channel number to set.
|
||||||
value: The normalized value to set.
|
value: The normalized value to set.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the write operation succeeded.
|
||||||
"""
|
"""
|
||||||
self.write_channel_word(channel, int(value * 0x7FFF))
|
return self.write_channel_word(channel, int(value * 0x7FFF))
|
||||||
|
|
||||||
|
|
||||||
class BusCoupler():
|
class BusCoupler():
|
||||||
|
@ -214,7 +231,7 @@ class BusCoupler():
|
||||||
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
|
Temperature ch1: 23.2 °C, Temperature ch2: 22.1 °C
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, host: str, port: int = 502, bus_terminals: list[Type[BusTerminal]] = [],
|
def __init__(self, host: str, port: int = 502, bus_terminals: Iterable[Type[BusTerminal]] = [],
|
||||||
timeout: float = 5, watchdog: float = 0, debug: bool = False):
|
timeout: float = 5, watchdog: float = 0, debug: bool = False):
|
||||||
|
|
||||||
self.bus_terminals: list[BusTerminal] = list()
|
self.bus_terminals: list[BusTerminal] = list()
|
||||||
|
@ -233,19 +250,27 @@ class BusCoupler():
|
||||||
def _init_hardware(self, watchdog: float):
|
def _init_hardware(self, watchdog: float):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def add_bus_terminals(self, bus_terminals: list[Type[BusTerminal]]) -> list[BusTerminal]:
|
def add_bus_terminals(self, *new_bus_terminals: Type[BusTerminal] | Iterable[Type[BusTerminal]]) -> list[BusTerminal]:
|
||||||
"""
|
"""
|
||||||
Add bus terminals to the bus coupler.
|
Add bus terminals to the bus coupler.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
bus_terminals: A list of bus terminal classes to add.
|
new_bus_terminals: bus terminal classes to add.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The corresponding list of bus terminal objects.
|
The corresponding list of bus terminal objects.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
for terminal_class in bus_terminals:
|
terminal_classes: list[Type[BusTerminal]] = []
|
||||||
assert issubclass(terminal_class, BusTerminal), f'{terminal_class} is not a bus terminal'
|
for element in new_bus_terminals:
|
||||||
|
if isinstance(element, Iterable):
|
||||||
|
for bt in element:
|
||||||
|
terminal_classes.append(bt)
|
||||||
|
else:
|
||||||
|
terminal_classes.append(element)
|
||||||
|
|
||||||
|
for terminal_class in terminal_classes:
|
||||||
|
assert _is_bus_terminal(terminal_class), f'{terminal_class} is not a bus terminal'
|
||||||
|
|
||||||
def get_para(key: str):
|
def get_para(key: str):
|
||||||
return terminal_class.parameters.get(key, 0)
|
return terminal_class.parameters.get(key, 0)
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -15,15 +15,16 @@ def test_terminal_plausib():
|
||||||
DigitalOutputTerminal,
|
DigitalOutputTerminal,
|
||||||
AnalogInputTerminal,
|
AnalogInputTerminal,
|
||||||
AnalogOutputTerminal]:
|
AnalogOutputTerminal]:
|
||||||
|
|
||||||
print('Terminal: ' + n)
|
print('Terminal: ' + n)
|
||||||
if issubclass(o, DigitalInputTerminal):
|
if issubclass(o, DigitalInputTerminal):
|
||||||
assert o.parameters.get('input_bit_width', 0) > 0
|
assert o.parameters.get('input_bit_width', 0) > 0
|
||||||
assert o.parameters.get('output_bit_width', 0) == 0
|
# assert o.parameters.get('output_bit_width', 0) == 0
|
||||||
assert o.parameters.get('input_word_width', 0) == 0
|
assert o.parameters.get('input_word_width', 0) == 0
|
||||||
assert o.parameters.get('output_word_width', 0) == 0
|
assert o.parameters.get('output_word_width', 0) == 0
|
||||||
|
|
||||||
if issubclass(o, DigitalOutputTerminal):
|
if issubclass(o, DigitalOutputTerminal):
|
||||||
assert o.parameters.get('input_bit_width', 0) == 0
|
# assert o.parameters.get('input_bit_width', 0) == 0
|
||||||
assert o.parameters.get('output_bit_width', 0) > 0
|
assert o.parameters.get('output_bit_width', 0) > 0
|
||||||
assert o.parameters.get('input_word_width', 0) == 0
|
assert o.parameters.get('input_word_width', 0) == 0
|
||||||
assert o.parameters.get('output_word_width', 0) == 0
|
assert o.parameters.get('output_word_width', 0) == 0
|
||||||
|
|
Loading…
Reference in New Issue