sharing the constant for scalar/vector and scalar/

matrix operations;
volatile property for net objects added
This commit is contained in:
Nicolas Kruse 2025-12-14 18:08:37 +01:00
parent a0ab604aad
commit 247fc1a28f
5 changed files with 132 additions and 41 deletions

View File

@ -61,7 +61,6 @@ class Node:
return hash(self.name) ^ hash(frozenset(a.source.node_hash for a in self.args)) return hash(self.name) ^ hash(frozenset(a.source.node_hash for a in self.args))
return hash(self.name) ^ hash(tuple(a.source.node_hash for a in self.args)) return hash(self.name) ^ hash(tuple(a.source.node_hash for a in self.args))
def __hash__(self) -> int: def __hash__(self) -> int:
return self.node_hash return self.node_hash
@ -77,6 +76,7 @@ class Net:
def __init__(self, dtype: str, source: Node): def __init__(self, dtype: str, source: Node):
self.dtype = dtype self.dtype = dtype
self.source = source self.source = source
self.volatile = False
def __repr__(self) -> str: def __repr__(self) -> str:
names = get_var_name(self) names = get_var_name(self)
@ -93,7 +93,7 @@ class value(Generic[TNum], Net):
Attributes: Attributes:
dtype (str): Data type of this value. dtype (str): Data type of this value.
""" """
def __init__(self, source: TNum | Node, dtype: str | None = None): def __init__(self, source: TNum | Node, dtype: str | None = None, volatile: bool = True):
"""Instance a value. """Instance a value.
Args: Args:
@ -113,6 +113,7 @@ class value(Generic[TNum], Net):
else: else:
self.source = CPConstant(source) self.source = CPConstant(source)
self.dtype = 'int' self.dtype = 'int'
self.volatile = volatile
@overload @overload
def __add__(self: 'value[TNum]', other: 'value[TNum] | TNum') -> 'value[TNum]': ... def __add__(self: 'value[TNum]', other: 'value[TNum] | TNum') -> 'value[TNum]': ...
@ -220,33 +221,33 @@ class value(Generic[TNum], Net):
return add_op('floordiv', [other, self]) return add_op('floordiv', [other, self])
def __neg__(self: TCPNum) -> TCPNum: def __neg__(self: TCPNum) -> TCPNum:
if self.dtype == 'int': if self.dtype == 'float':
return cast(TCPNum, add_op('sub', [value(0), self])) return cast(TCPNum, add_op('sub', [value(0.0, volatile=False), self]))
return cast(TCPNum, add_op('sub', [value(0.0), self])) return cast(TCPNum, add_op('sub', [value(0, volatile=False), self]))
def __gt__(self, other: TVarNumb) -> 'value[int]': def __gt__(self, other: TVarNumb) -> 'value[int]':
ret = add_op('gt', [self, other]) ret = add_op('gt', [self, other])
return value(ret.source, dtype='bool') return value(ret.source, dtype='bool', volatile=False)
def __lt__(self, other: TVarNumb) -> 'value[int]': def __lt__(self, other: TVarNumb) -> 'value[int]':
ret = add_op('gt', [other, self]) ret = add_op('gt', [other, self])
return value(ret.source, dtype='bool') return value(ret.source, dtype='bool', volatile=False)
def __ge__(self, other: TVarNumb) -> 'value[int]': def __ge__(self, other: TVarNumb) -> 'value[int]':
ret = add_op('ge', [self, other]) ret = add_op('ge', [self, other])
return value(ret.source, dtype='bool') return value(ret.source, dtype='bool', volatile=False)
def __le__(self, other: TVarNumb) -> 'value[int]': def __le__(self, other: TVarNumb) -> 'value[int]':
ret = add_op('ge', [other, self]) ret = add_op('ge', [other, self])
return value(ret.source, dtype='bool') return value(ret.source, dtype='bool', volatile=False)
def __eq__(self, other: TVarNumb) -> 'value[int]': # type: ignore def __eq__(self, other: TVarNumb) -> 'value[int]': # type: ignore
ret = add_op('eq', [self, other], True) ret = add_op('eq', [self, other], True)
return value(ret.source, dtype='bool') return value(ret.source, dtype='bool', volatile=False)
def __ne__(self, other: TVarNumb) -> 'value[int]': # type: ignore def __ne__(self, other: TVarNumb) -> 'value[int]': # type: ignore
ret = add_op('ne', [self, other], True) ret = add_op('ne', [self, other], True)
return value(ret.source, dtype='bool') return value(ret.source, dtype='bool', volatile=False)
@overload @overload
def __mod__(self: 'value[TNum]', other: 'value[TNum] | TNum') -> 'value[TNum]': ... def __mod__(self: 'value[TNum]', other: 'value[TNum] | TNum') -> 'value[TNum]': ...
@ -358,7 +359,7 @@ class Op(Node):
def net_from_value(val: Any) -> value[Any]: def net_from_value(val: Any) -> value[Any]:
vi = CPConstant(val) vi = CPConstant(val)
return value(vi, vi.dtype) return value(vi, vi.dtype, False)
@overload @overload

View File

@ -78,10 +78,16 @@ class matrix(Generic[TNum]):
tuple(a + b for a, b in zip(row1, row2)) tuple(a + b for a, b in zip(row1, row2))
for row1, row2 in zip(self.values, other.values) for row1, row2 in zip(self.values, other.values)
) )
if isinstance(other, value):
return matrix( return matrix(
tuple(a + other for a in row) tuple(a + other for a in row)
for row in self.values for row in self.values
) )
o = value(other, volatile=False) # Make sure a single constant is allocated
return matrix(
tuple(a + o if isinstance(a, value) else a + other for a in row)
for row in self.values
)
@overload @overload
def __radd__(self: 'matrix[float]', other: MatNumLike) -> 'matrix[float]': ... def __radd__(self: 'matrix[float]', other: MatNumLike) -> 'matrix[float]': ...
@ -106,10 +112,16 @@ class matrix(Generic[TNum]):
tuple(a - b for a, b in zip(row1, row2)) tuple(a - b for a, b in zip(row1, row2))
for row1, row2 in zip(self.values, other.values) for row1, row2 in zip(self.values, other.values)
) )
if isinstance(other, value):
return matrix( return matrix(
tuple(a - other for a in row) tuple(a - other for a in row)
for row in self.values for row in self.values
) )
o = value(other, volatile=False) # Make sure a single constant is allocated
return matrix(
tuple(a - o if isinstance(a, value) else a - other for a in row)
for row in self.values
)
@overload @overload
def __rsub__(self: 'matrix[float]', other: MatNumLike) -> 'matrix[float]': ... def __rsub__(self: 'matrix[float]', other: MatNumLike) -> 'matrix[float]': ...
@ -123,10 +135,16 @@ class matrix(Generic[TNum]):
tuple(b - a for a, b in zip(row1, row2)) tuple(b - a for a, b in zip(row1, row2))
for row1, row2 in zip(self.values, other.values) for row1, row2 in zip(self.values, other.values)
) )
if isinstance(other, value):
return matrix( return matrix(
tuple(other - a for a in row) tuple(other - a for a in row)
for row in self.values for row in self.values
) )
o = value(other, volatile=False) # Make sure a single constant is allocated
return matrix(
tuple(o - a if isinstance(a, value) else other - a for a in row)
for row in self.values
)
@overload @overload
def __mul__(self: 'matrix[int]', other: MatFloatLike) -> 'matrix[float]': ... def __mul__(self: 'matrix[int]', other: MatFloatLike) -> 'matrix[float]': ...
@ -145,10 +163,16 @@ class matrix(Generic[TNum]):
tuple(a * b for a, b in zip(row1, row2)) tuple(a * b for a, b in zip(row1, row2))
for row1, row2 in zip(self.values, other.values) for row1, row2 in zip(self.values, other.values)
) )
if isinstance(other, value):
return matrix( return matrix(
tuple(a * other for a in row) tuple(a * other for a in row)
for row in self.values for row in self.values
) )
o = value(other, volatile=False) # Make sure a single constant is allocated
return matrix(
tuple(a * o if isinstance(a, value) else a * other for a in row)
for row in self.values
)
@overload @overload
def __rmul__(self: 'matrix[float]', other: MatNumLike) -> 'matrix[float]': ... def __rmul__(self: 'matrix[float]', other: MatNumLike) -> 'matrix[float]': ...
@ -166,10 +190,16 @@ class matrix(Generic[TNum]):
tuple(a / b for a, b in zip(row1, row2)) tuple(a / b for a, b in zip(row1, row2))
for row1, row2 in zip(self.values, other.values) for row1, row2 in zip(self.values, other.values)
) )
if isinstance(other, value):
return matrix( return matrix(
tuple(a / other for a in row) tuple(a / other for a in row)
for row in self.values for row in self.values
) )
o = value(other, volatile=False) # Make sure a single constant is allocated
return matrix(
tuple(a / o if isinstance(a, value) else a / other for a in row)
for row in self.values
)
def __rtruediv__(self, other: MatNumLike) -> 'matrix[float]': def __rtruediv__(self, other: MatNumLike) -> 'matrix[float]':
if isinstance(other, matrix): if isinstance(other, matrix):
@ -179,10 +209,16 @@ class matrix(Generic[TNum]):
tuple(b / a for a, b in zip(row1, row2)) tuple(b / a for a, b in zip(row1, row2))
for row1, row2 in zip(self.values, other.values) for row1, row2 in zip(self.values, other.values)
) )
if isinstance(other, value):
return matrix( return matrix(
tuple(other / a for a in row) tuple(other / a for a in row)
for row in self.values for row in self.values
) )
o = value(other, volatile=False) # Make sure a single constant is allocated
return matrix(
tuple(o / a if isinstance(a, value) else other / a for a in row)
for row in self.values
)
@overload @overload
def __matmul__(self: 'matrix[TNum]', other: 'vector[TNum]') -> 'vector[TNum]': ... def __matmul__(self: 'matrix[TNum]', other: 'vector[TNum]') -> 'vector[TNum]': ...
@ -269,7 +305,7 @@ class matrix(Generic[TNum]):
"""Convert all elements to copapy values if any element is a copapy value.""" """Convert all elements to copapy values if any element is a copapy value."""
if any(isinstance(val, value) for row in self.values for val in row): if any(isinstance(val, value) for row in self.values for val in row):
return matrix( return matrix(
tuple(value(val) if not isinstance(val, value) else val for val in row) tuple(value(val, volatile=False) if not isinstance(val, value) else val for val in row)
for row in self.values for row in self.values
) )
else: else:

View File

@ -19,6 +19,6 @@ def mixed_sum(scalars: Iterable[int | float | value[Any]]) -> Any:
def mixed_homogenize(scalars: Iterable[T | value[T]]) -> Iterable[T] | Iterable[value[T]]: def mixed_homogenize(scalars: Iterable[T | value[T]]) -> Iterable[T] | Iterable[value[T]]:
if any(isinstance(val, value) for val in scalars): if any(isinstance(val, value) for val in scalars):
return (value(val) if not isinstance(val, value) else val for val in scalars) return (value(val, volatile=False) if not isinstance(val, value) else val for val in scalars)
else: else:
return (val for val in scalars if not isinstance(val, value)) return (val for val in scalars if not isinstance(val, value))

View File

@ -1,6 +1,6 @@
from . import value from . import value
from ._mixed import mixed_sum, mixed_homogenize from ._mixed import mixed_sum, mixed_homogenize
from typing import TypeVar, Iterable, Any, overload, TypeAlias, Callable, Iterator, Generic from typing import Sequence, TypeVar, Iterable, Any, overload, TypeAlias, Callable, Iterator, Generic
import copapy as cp import copapy as cp
from ._helper_types import TNum from ._helper_types import TNum
@ -57,7 +57,10 @@ class vector(Generic[TNum]):
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a + b for a, b in zip(self.values, other.values)) return vector(a + b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a + other for a in self.values) return vector(a + other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a + o if isinstance(a, value) else a + other for a in self.values)
@overload @overload
def __radd__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ... def __radd__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ...
@ -80,7 +83,10 @@ class vector(Generic[TNum]):
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a - b for a, b in zip(self.values, other.values)) return vector(a - b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a - other for a in self.values) return vector(a - other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a - o if isinstance(a, value) else a - other for a in self.values)
@overload @overload
def __rsub__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ... def __rsub__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ...
@ -92,7 +98,10 @@ class vector(Generic[TNum]):
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(b - a for a, b in zip(self.values, other.values)) return vector(b - a for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(other - a for a in self.values) return vector(other - a for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(o - a if isinstance(a, value) else other - a for a in self.values)
@overload @overload
def __mul__(self: 'vector[int]', other: VecFloatLike) -> 'vector[float]': ... def __mul__(self: 'vector[int]', other: VecFloatLike) -> 'vector[float]': ...
@ -106,7 +115,10 @@ class vector(Generic[TNum]):
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a * b for a, b in zip(self.values, other.values)) return vector(a * b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a * other for a in self.values) return vector(a * other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a * o if isinstance(a, value) else a * other for a in self.values)
@overload @overload
def __rmul__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ... def __rmul__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ...
@ -129,7 +141,10 @@ class vector(Generic[TNum]):
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a ** b for a, b in zip(self.values, other.values)) return vector(a ** b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a ** other for a in self.values) return vector(a ** other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a ** o if isinstance(a, value) else a ** other for a in self.values)
@overload @overload
def __rpow__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ... def __rpow__(self: 'vector[float]', other: VecNumLike) -> 'vector[float]': ...
@ -138,19 +153,31 @@ class vector(Generic[TNum]):
@overload @overload
def __rpow__(self, other: VecNumLike) -> 'vector[Any]': ... def __rpow__(self, other: VecNumLike) -> 'vector[Any]': ...
def __rpow__(self, other: VecNumLike) -> Any: def __rpow__(self, other: VecNumLike) -> Any:
return self ** other if isinstance(other, vector):
assert len(self.values) == len(other.values)
return vector(b ** a for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(other ** a for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(o ** a if isinstance(a, value) else other ** a for a in self.values)
def __truediv__(self, other: VecNumLike) -> 'vector[float]': def __truediv__(self, other: VecNumLike) -> 'vector[float]':
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a / b for a, b in zip(self.values, other.values)) return vector(a / b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a / other for a in self.values) return vector(a / other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a / o if isinstance(a, value) else a / other for a in self.values)
def __rtruediv__(self, other: VecNumLike) -> 'vector[float]': def __rtruediv__(self, other: VecNumLike) -> 'vector[float]':
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(b / a for a, b in zip(self.values, other.values)) return vector(b / a for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(other / a for a in self.values) return vector(other / a for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(o / a if isinstance(a, value) else other / a for a in self.values)
@overload @overload
def dot(self: 'vector[int]', other: 'vector[int]') -> int | value[int]: ... def dot(self: 'vector[int]', other: 'vector[int]') -> int | value[int]: ...
@ -191,37 +218,55 @@ class vector(Generic[TNum]):
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a > b for a, b in zip(self.values, other.values)) return vector(a > b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a > other for a in self.values) return vector(a > other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a > o if isinstance(a, value) else a > other for a in self.values)
def __lt__(self, other: VecNumLike) -> 'vector[int]': def __lt__(self, other: VecNumLike) -> 'vector[int]':
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a < b for a, b in zip(self.values, other.values)) return vector(a < b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a < other for a in self.values) return vector(a < other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a < o if isinstance(a, value) else a < other for a in self.values)
def __ge__(self, other: VecNumLike) -> 'vector[int]': def __ge__(self, other: VecNumLike) -> 'vector[int]':
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a >= b for a, b in zip(self.values, other.values)) return vector(a >= b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a >= other for a in self.values) return vector(a >= other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a >= o if isinstance(a, value) else a >= other for a in self.values)
def __le__(self, other: VecNumLike) -> 'vector[int]': def __le__(self, other: VecNumLike) -> 'vector[int]':
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a <= b for a, b in zip(self.values, other.values)) return vector(a <= b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a <= other for a in self.values) return vector(a <= other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a <= o if isinstance(a, value) else a <= other for a in self.values)
def __eq__(self, other: VecNumLike) -> 'vector[int]': # type: ignore def __eq__(self, other: VecNumLike | Sequence[int | float]) -> 'vector[int]': # type: ignore
if isinstance(other, vector): if isinstance(other, vector | Sequence):
assert len(self.values) == len(other.values) assert len(self) == len(other)
return vector(a == b for a, b in zip(self.values, other.values)) return vector(a == b for a, b in zip(self.values, other))
if isinstance(other, value):
return vector(a == other for a in self.values) return vector(a == other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a == o if isinstance(a, value) else a == other for a in self.values)
def __ne__(self, other: VecNumLike) -> 'vector[int]': # type: ignore def __ne__(self, other: VecNumLike) -> 'vector[int]': # type: ignore
if isinstance(other, vector): if isinstance(other, vector):
assert len(self.values) == len(other.values) assert len(self.values) == len(other.values)
return vector(a != b for a, b in zip(self.values, other.values)) return vector(a != b for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(a != other for a in self.values) return vector(a != other for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(a != o if isinstance(a, value) else a != other for a in self.values)
@property @property
def shape(self) -> tuple[int]: def shape(self) -> tuple[int]:
@ -256,6 +301,15 @@ class vector(Generic[TNum]):
"""Applies a function to each element of the vector and returns a new vector.""" """Applies a function to each element of the vector and returns a new vector."""
return vector(func(x) for x in self.values) return vector(func(x) for x in self.values)
def _map2(self, other: VecNumLike, func: Callable[[Any, Any], value[int | float]]) -> 'vector[Any]':
if isinstance(other, vector):
assert len(self.values) == len(other.values)
return vector(func(a, b) for a, b in zip(self.values, other.values))
if isinstance(other, value):
return vector(func(a, other) for a in self.values)
o = value(other, volatile=False) # Make sure a single constant is allocated
return vector(func(a, o) if isinstance(a, value) else a + other for a in self.values)
def cross_product(v1: vector[float], v2: vector[float]) -> vector[float]: def cross_product(v1: vector[float], v2: vector[float]) -> vector[float]:
"""Calculate the cross product of two 3D vectors.""" """Calculate the cross product of two 3D vectors."""

View File

@ -103,8 +103,8 @@ def test_matrix_scalar_division():
m1 = cp.matrix([[6.0, 8.0], [12.0, 16.0]]) m1 = cp.matrix([[6.0, 8.0], [12.0, 16.0]])
m2 = m1 / 2.0 m2 = m1 / 2.0
assert m2[0] == pytest.approx((3.0, 4.0)) # pyright: ignore[reportUnknownMemberType] assert list(m2[0]) == pytest.approx((3.0, 4.0)) # pyright: ignore[reportUnknownMemberType]
assert m2[1] == pytest.approx((6.0, 8.0)) # pyright: ignore[reportUnknownMemberType] assert list(m2[1]) == pytest.approx((6.0, 8.0)) # pyright: ignore[reportUnknownMemberType]
def test_matrix_vector_multiplication(): def test_matrix_vector_multiplication():