mirror of https://github.com/Nonannet/copapy.git
serial transfer channel added
This commit is contained in:
parent
759989ffde
commit
534f0ac793
|
|
@ -4,6 +4,7 @@ from coparun_module import coparun, read_data_mem, create_target, clear_target
|
|||
import struct
|
||||
from ._basic_types import value, Net, Node, Store, NumLike, ArrayType, stencil_db_from_package
|
||||
from ._compiler import compile_to_dag
|
||||
from ._transfer_channel import TransferChannel
|
||||
|
||||
T = TypeVar("T", int, float)
|
||||
Values: TypeAlias = 'Iterable[NumLike] | NumLike'
|
||||
|
|
@ -59,7 +60,7 @@ def jit(func: Callable[..., TRet]) -> Callable[..., TRet]:
|
|||
class Target():
|
||||
"""Target device for compiling for and running on copapy code.
|
||||
"""
|
||||
def __init__(self, arch: str = 'native', optimization: str = 'O3') -> None:
|
||||
def __init__(self, arch: str = 'native', interface_name: str = '', optimization: str = 'O3') -> None:
|
||||
"""Initialize Target object
|
||||
|
||||
Arguments:
|
||||
|
|
@ -68,9 +69,13 @@ class Target():
|
|||
"""
|
||||
self.sdb = stencil_db_from_package(arch, optimization)
|
||||
self._values: dict[Net, tuple[int, int, str]] = {}
|
||||
if interface_name:
|
||||
self._context: TransferChannel | int = TransferChannel(interface_name)
|
||||
else:
|
||||
self._context = create_target()
|
||||
|
||||
def __del__(self) -> None:
|
||||
if isinstance(self._context, int):
|
||||
clear_target(self._context)
|
||||
|
||||
def compile(self, *values: NumLike | value[T] | ArrayType[T] | Iterable[T | value[T]]) -> None:
|
||||
|
|
@ -94,6 +99,10 @@ class Target():
|
|||
|
||||
dw, self._values = compile_to_dag(nodes, self.sdb)
|
||||
dw.write_com(binw.Command.END_COM)
|
||||
|
||||
if isinstance(self._context, TransferChannel):
|
||||
TransferChannel.send(self._context, dw.get_data())
|
||||
else:
|
||||
assert coparun(self._context, dw.get_data()) > 0
|
||||
|
||||
def run(self) -> None:
|
||||
|
|
@ -102,6 +111,10 @@ class Target():
|
|||
dw = binw.data_writer(self.sdb.byteorder)
|
||||
dw.write_com(binw.Command.RUN_PROG)
|
||||
dw.write_com(binw.Command.END_COM)
|
||||
|
||||
if isinstance(self._context, TransferChannel):
|
||||
TransferChannel.send(self._context, dw.get_data())
|
||||
else:
|
||||
assert coparun(self._context, dw.get_data()) > 0
|
||||
|
||||
@overload
|
||||
|
|
@ -135,6 +148,10 @@ class Target():
|
|||
addr, lengths, _ = self._values[variables.net]
|
||||
var_type = variables.dtype
|
||||
assert lengths > 0
|
||||
|
||||
if isinstance(self._context, TransferChannel):
|
||||
raise NotImplementedError("Reading values is not implemented for TransferChannel targets yet. Use read_value_remote to read the raw data of a value by the runner.")
|
||||
else:
|
||||
data = read_data_mem(self._context, addr, lengths)
|
||||
assert data is not None and len(data) == lengths, f"Failed to read value {variables}"
|
||||
en = {'little': '<', 'big': '>'}[self.sdb.byteorder]
|
||||
|
|
@ -191,10 +208,18 @@ class Target():
|
|||
raise ValueError(f"Unsupported value type: {var_type}")
|
||||
|
||||
dw.write_com(binw.Command.END_COM)
|
||||
|
||||
if isinstance(self._context, TransferChannel):
|
||||
TransferChannel.send(self._context, dw.get_data())
|
||||
else:
|
||||
assert coparun(self._context, dw.get_data()) > 0
|
||||
|
||||
def read_value_remote(self, variable: value[Any]) -> None:
|
||||
"""Reads the raw data of a value by the runner."""
|
||||
dw = binw.data_writer(self.sdb.byteorder)
|
||||
add_read_value_remote(dw, self._values, variable.net)
|
||||
|
||||
if isinstance(self._context, TransferChannel):
|
||||
TransferChannel.send(self._context, dw.get_data())
|
||||
else:
|
||||
assert coparun(self._context, dw.get_data()) > 0
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from serial import Serial
|
||||
else:
|
||||
try:
|
||||
from serial import Serial
|
||||
except ImportError:
|
||||
Serial = None
|
||||
|
||||
|
||||
class TransferChannel():
|
||||
"""Class for handling data transfer between copapy and a target device.
|
||||
"""
|
||||
|
||||
def __init__(self, channel: str):
|
||||
self.channel = Serial(channel, baudrate=115200, timeout=0.01)
|
||||
|
||||
def send(self, data: bytes) -> None:
|
||||
"""Sends data to the target device in chunks."""
|
||||
chunk_size = 1024
|
||||
for i in range(0, len(data), chunk_size):
|
||||
print(f"> Sending chunk {i//chunk_size + 1} of {(len(data) + chunk_size - 1) // chunk_size}")
|
||||
self.channel.write(data[i:i+chunk_size])
|
||||
self.channel.flush()
|
||||
ret_data = self.channel.read_all()
|
||||
print(f"> Received response: {ret_data}")
|
||||
|
||||
def recv(self, size: int) -> bytes:
|
||||
"""Receives data from the target device."""
|
||||
return self.channel.read(size)
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
from copapy import value, Target, NumLike, iif
|
||||
import pytest
|
||||
import copapy
|
||||
|
||||
|
||||
def function1(c1: NumLike) -> list[NumLike]:
|
||||
return [c1 / 4, c1 / -4, c1 // 4, c1 // -4, (c1 * -1) // 4,
|
||||
c1 * 4, c1 * -4,
|
||||
c1 + 4, c1 - 4,
|
||||
c1 > 2, c1 > 100, c1 < 4, c1 < 100]
|
||||
|
||||
|
||||
def function2(c1: NumLike) -> list[NumLike]:
|
||||
return [c1 / 4.44, c1 / -4.44, c1 // 4.44, c1 // -4.44, (c1 * -1) // 4.44,
|
||||
c1 * 4.44, c1 * -4.44,
|
||||
c1 + 4.44, c1 - 4.44,
|
||||
c1 > 2, c1 > 100.11, c1 < 4.44, c1 < 100.11]
|
||||
|
||||
|
||||
def function3(c1: NumLike) -> list[NumLike]:
|
||||
return [c1 / 4]
|
||||
|
||||
|
||||
def function4(c1: NumLike) -> list[NumLike]:
|
||||
return [c1 == 9, c1 == 4, c1 != 9, c1 != 4]
|
||||
|
||||
|
||||
def function5(c1: NumLike) -> list[NumLike]:
|
||||
return [c1 == True, c1 == False, c1 != True, c1 != False, c1 / 2, c1 + 2]
|
||||
|
||||
|
||||
def function6(c1: NumLike) -> list[NumLike]:
|
||||
return [c1 == True]
|
||||
|
||||
|
||||
def iiftests(c1: NumLike) -> list[NumLike]:
|
||||
return [iif(c1 > 5, 8, 9),
|
||||
iif(c1 < 5, 8.5, 9.5),
|
||||
iif(1 > 5, 3.3, 8.8) + c1,
|
||||
iif(1 < 5, c1 * 3.3, 8.8),
|
||||
iif(c1 < 5, c1 * 3.3, 8.8)]
|
||||
|
||||
|
||||
def test_compile():
|
||||
c_i = value(9)
|
||||
c_f = value(1.111)
|
||||
c_b = value(True)
|
||||
|
||||
ret_test = function1(c_i) + function1(c_f) + function2(c_i)# + function2(c_f) + function3(c_i) + function4(c_i) + function5(c_b) + [value(9) % 2] + iiftests(c_i) + iiftests(c_f)
|
||||
ret_ref = function1(9) + function1(1.111) + function2(9) #+ function2(1.111) + function3(9) + function4(9) + function5(True) + [9 % 2] + iiftests(9) + iiftests(1.111)
|
||||
|
||||
tg = Target(interface_name="COM12")
|
||||
print('* compile and copy ...')
|
||||
tg.compile(ret_test)
|
||||
print('* run and copy ...')
|
||||
tg.run()
|
||||
print('* finished')
|
||||
|
||||
#for test, ref in zip(ret_test, ret_ref):
|
||||
# assert isinstance(test, copapy.value)
|
||||
# val = tg.read_value(test)
|
||||
# print('+', val, ref, test.dtype)
|
||||
# for t in (int, float, bool):
|
||||
# assert isinstance(val, t) == isinstance(ref, t), f"Result type does not match for {val} and {ref}"
|
||||
# assert val == pytest.approx(ref, 1e-5), f"Result does not match: {val} and reference: {ref}" # pyright: ignore[reportUnknownMemberType]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_compile()
|
||||
Loading…
Reference in New Issue