Compare commits

...

2 Commits

Author SHA1 Message Date
Nicolas 46085a34a4 memory compile flags for runner for bare metal updated 2026-03-20 16:40:23 +01:00
Nicolas 534f0ac793 serial transfer channel added 2026-03-20 16:20:25 +01:00
7 changed files with 161 additions and 23 deletions

View File

@ -4,6 +4,7 @@ from coparun_module import coparun, read_data_mem, create_target, clear_target
import struct
from ._basic_types import value, Net, Node, Store, NumLike, ArrayType, stencil_db_from_package
from ._compiler import compile_to_dag
from ._transfer_channel import TransferChannel
T = TypeVar("T", int, float)
Values: TypeAlias = 'Iterable[NumLike] | NumLike'
@ -59,7 +60,7 @@ def jit(func: Callable[..., TRet]) -> Callable[..., TRet]:
class Target():
"""Target device for compiling for and running on copapy code.
"""
def __init__(self, arch: str = 'native', optimization: str = 'O3') -> None:
def __init__(self, arch: str = 'native', interface_name: str = '', optimization: str = 'O3') -> None:
"""Initialize Target object
Arguments:
@ -68,10 +69,14 @@ class Target():
"""
self.sdb = stencil_db_from_package(arch, optimization)
self._values: dict[Net, tuple[int, int, str]] = {}
self._context = create_target()
if interface_name:
self._context: TransferChannel | int = TransferChannel(interface_name)
else:
self._context = create_target()
def __del__(self) -> None:
clear_target(self._context)
if isinstance(self._context, int):
clear_target(self._context)
def compile(self, *values: NumLike | value[T] | ArrayType[T] | Iterable[T | value[T]]) -> None:
"""Compiles the code to compute the given values.
@ -94,7 +99,11 @@ class Target():
dw, self._values = compile_to_dag(nodes, self.sdb)
dw.write_com(binw.Command.END_COM)
assert coparun(self._context, dw.get_data()) > 0
if isinstance(self._context, TransferChannel):
TransferChannel.send(self._context, dw.get_data())
else:
assert coparun(self._context, dw.get_data()) > 0
def run(self) -> None:
"""Runs the compiled code on the target device.
@ -102,7 +111,11 @@ class Target():
dw = binw.data_writer(self.sdb.byteorder)
dw.write_com(binw.Command.RUN_PROG)
dw.write_com(binw.Command.END_COM)
assert coparun(self._context, dw.get_data()) > 0
if isinstance(self._context, TransferChannel):
TransferChannel.send(self._context, dw.get_data())
else:
assert coparun(self._context, dw.get_data()) > 0
@overload
def read_value(self, variables: value[T]) -> T: ...
@ -135,7 +148,11 @@ class Target():
addr, lengths, _ = self._values[variables.net]
var_type = variables.dtype
assert lengths > 0
data = read_data_mem(self._context, addr, lengths)
if isinstance(self._context, TransferChannel):
raise NotImplementedError("Reading values is not implemented for TransferChannel targets yet. Use read_value_remote to read the raw data of a value by the runner.")
else:
data = read_data_mem(self._context, addr, lengths)
assert data is not None and len(data) == lengths, f"Failed to read value {variables}"
en = {'little': '<', 'big': '>'}[self.sdb.byteorder]
if var_type == 'float':
@ -191,10 +208,18 @@ class Target():
raise ValueError(f"Unsupported value type: {var_type}")
dw.write_com(binw.Command.END_COM)
assert coparun(self._context, dw.get_data()) > 0
if isinstance(self._context, TransferChannel):
TransferChannel.send(self._context, dw.get_data())
else:
assert coparun(self._context, dw.get_data()) > 0
def read_value_remote(self, variable: value[Any]) -> None:
"""Reads the raw data of a value by the runner."""
dw = binw.data_writer(self.sdb.byteorder)
add_read_value_remote(dw, self._values, variable.net)
assert coparun(self._context, dw.get_data()) > 0
if isinstance(self._context, TransferChannel):
TransferChannel.send(self._context, dw.get_data())
else:
assert coparun(self._context, dw.get_data()) > 0

View File

@ -0,0 +1,31 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from serial import Serial
else:
try:
from serial import Serial
except ImportError:
Serial = None
class TransferChannel():
"""Class for handling data transfer between copapy and a target device.
"""
def __init__(self, channel: str):
self.channel = Serial(channel, baudrate=115200, timeout=0.01)
def send(self, data: bytes) -> None:
"""Sends data to the target device in chunks."""
chunk_size = 1024
for i in range(0, len(data), chunk_size):
print(f"> Sending chunk {i//chunk_size + 1} of {(len(data) + chunk_size - 1) // chunk_size}")
self.channel.write(data[i:i+chunk_size])
self.channel.flush()
ret_data = self.channel.read_all()
print(f"> Received response: {ret_data}")
def recv(self, size: int) -> bytes:
"""Receives data from the target device."""
return self.channel.read(size)

View File

@ -9,37 +9,48 @@
#include <stdint.h>
#if defined DATA_MEMORY_ADDR || defined EXECUTABLE_MEMORY_ADDR
#if defined DATA_MEMORY_LEN || defined EXECUTABLE_MEMORY_LEN
/* Bare metal implementations */
#if not defined(EXECUTABLE_MEMORY_ADDR) || not defined(DATA_MEMORY_ADDR)
#error "For bare metal, you must define DATA_MEMORY_ADDR and DATA_EXECUTABLE_MEMORY_ADDR."
#if !defined(EXECUTABLE_MEMORY_LEN) || !defined(DATA_MEMORY_LEN)
#error "For bare metal, you must define DATA_MEMORY_LEN and DATA_EXECUTABLE_MEMORY_LEN."
#endif
uint8_t executable_memory_pool[EXECUTABLE_MEMORY_LEN];
uint8_t data_memory_pool[DATA_MEMORY_LEN];
uint8_t *allocate_executable_memory(uint32_t num_bytes) {
return (uint8_t*)EXECUTABLE_MEMORY_ADDR;
if (num_bytes > EXECUTABLE_MEMORY_LEN) return 0;
return executable_memory_pool;
}
uint8_t *allocate_data_memory(uint32_t num_bytes) {
return (uint8_t*)DATA_MEMORY_ADDR;
if (num_bytes > DATA_MEMORY_LEN) return 0;
return data_memory_pool;
}
int mark_mem_executable(uint8_t *memory, uint32_t memory_len) {
/* No-op for bare metal */
(void)memory; // Mark as used
(void)memory_len; // Mark as used
return 1;
}
void deallocate_memory(uint8_t *memory, uint32_t memory_len) {
(void)memory; // Mark as used
(void)memory_len; // Mark as used
/* No-op for bare metal */
}
void memcpy(void *dest, const void *src, size_t n) {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wbuiltin-declaration-mismatch"
void memcpy(void *dest, const void *src, unsigned int n) {
uint8_t *d = (uint8_t*)dest;
const uint8_t *s = (const uint8_t*)src;
for (size_t i = 0; i < n; i++) {
for (unsigned int i = 0; i < n; i++) {
d[i] = s[i];
}
return 0;
}
#pragma GCC diagnostic pop
#elif defined _WIN32

View File

@ -7,8 +7,11 @@ uint8_t *allocate_buffer_memory(uint32_t num_bytes);
int mark_mem_executable(uint8_t *memory, uint32_t memory_len);
void deallocate_memory(uint8_t *memory, uint32_t memory_len);
#ifdef DATA_MEMORY_ADDR
void memcpy(void *dest, const void *src, size_t n);
#ifdef DATA_MEMORY_LEN
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wbuiltin-declaration-mismatch"
void memcpy(void *dest, const void *src, unsigned int n);
#pragma GCC diagnostic pop
#else
#include <string.h>
#endif

View File

@ -119,9 +119,9 @@ void free_memory(runmem_t *context) {
deallocate_memory(context->data_memory, context->data_memory_len);
context->executable_memory_len = 0;
context->data_memory_len = 0;
context->executable_memory = NULL;
context->data_memory = NULL;
context->entr_point = NULL;
context->executable_memory = 0;
context->data_memory = 0;
context->entr_point = 0;
context->data_offs = 0;
}
@ -129,7 +129,6 @@ int update_data_offs(runmem_t *context) {
if (context->data_memory && context->executable_memory &&
(context->data_memory - context->executable_memory > 0x7FFFFFFF ||
context->executable_memory - context->data_memory > 0x7FFFFFFF)) {
perror("Error: code and data memory to far apart");
return 0;
}
context->data_offs = (int)(context->data_memory - context->executable_memory);

View File

@ -10,7 +10,7 @@
#include <stdint.h>
#ifdef DATA_MEMORY_ADDR
#ifdef DATA_MEMORY_LEN
#define PRINTF(...)
#else
#include <stdio.h>

View File

@ -0,0 +1,69 @@
from copapy import value, Target, NumLike, iif
import pytest
import copapy
def function1(c1: NumLike) -> list[NumLike]:
return [c1 / 4, c1 / -4, c1 // 4, c1 // -4, (c1 * -1) // 4,
c1 * 4, c1 * -4,
c1 + 4, c1 - 4,
c1 > 2, c1 > 100, c1 < 4, c1 < 100]
def function2(c1: NumLike) -> list[NumLike]:
return [c1 / 4.44, c1 / -4.44, c1 // 4.44, c1 // -4.44, (c1 * -1) // 4.44,
c1 * 4.44, c1 * -4.44,
c1 + 4.44, c1 - 4.44,
c1 > 2, c1 > 100.11, c1 < 4.44, c1 < 100.11]
def function3(c1: NumLike) -> list[NumLike]:
return [c1 / 4]
def function4(c1: NumLike) -> list[NumLike]:
return [c1 == 9, c1 == 4, c1 != 9, c1 != 4]
def function5(c1: NumLike) -> list[NumLike]:
return [c1 == True, c1 == False, c1 != True, c1 != False, c1 / 2, c1 + 2]
def function6(c1: NumLike) -> list[NumLike]:
return [c1 == True]
def iiftests(c1: NumLike) -> list[NumLike]:
return [iif(c1 > 5, 8, 9),
iif(c1 < 5, 8.5, 9.5),
iif(1 > 5, 3.3, 8.8) + c1,
iif(1 < 5, c1 * 3.3, 8.8),
iif(c1 < 5, c1 * 3.3, 8.8)]
def test_compile():
c_i = value(9)
c_f = value(1.111)
c_b = value(True)
ret_test = function1(c_i) + function1(c_f) + function2(c_i)# + function2(c_f) + function3(c_i) + function4(c_i) + function5(c_b) + [value(9) % 2] + iiftests(c_i) + iiftests(c_f)
ret_ref = function1(9) + function1(1.111) + function2(9) #+ function2(1.111) + function3(9) + function4(9) + function5(True) + [9 % 2] + iiftests(9) + iiftests(1.111)
tg = Target(interface_name="COM12")
print('* compile and copy ...')
tg.compile(ret_test)
print('* run and copy ...')
tg.run()
print('* finished')
#for test, ref in zip(ret_test, ret_ref):
# assert isinstance(test, copapy.value)
# val = tg.read_value(test)
# print('+', val, ref, test.dtype)
# for t in (int, float, bool):
# assert isinstance(val, t) == isinstance(ref, t), f"Result type does not match for {val} and {ref}"
# assert val == pytest.approx(ref, 1e-5), f"Result does not match: {val} and reference: {ref}" # pyright: ignore[reportUnknownMemberType]
if __name__ == "__main__":
test_compile()