DAG
At its core, beavers executes a Directed Acyclic Graph (DAG), where each node is a Python function.
This section discusses the different types of nodes in the DAG.
Stream Source
A stream source is a node whose value can be set externally.
When Dag.execute is called, the updated value is propagated through the DAG:
from beavers import Dag
dag = Dag()
source_stream = dag.source_stream()
source_stream.set_stream([1, 2, 3])
dag.execute()
assert source_stream.get_value() == [1, 2, 3]
If the DAG is executed again, the value of the source stream will be reset to its empty value.
dag.execute()
assert source_stream.get_value() == []
The default empty value is [], but it can be customized:
dict_source_stream = dag.source_stream(empty_factory=dict)
dict_source_stream.set_stream({"hello": "world"})
dag.execute()
assert dict_source_stream.get_value() == {"hello": "world"}
dag.execute()
assert dict_source_stream.get_value() == {}
A source stream can be given a name, so that it can be retrieved later (and its value set):
my_source_stream = dag.source_stream(name="my_source")
dag.get_sources()["my_source"].set_stream([4, 5, 6])
dag.execute()
assert my_source_stream.get_value() == [4, 5, 6]
Stream Node
A stream node uses the output of other nodes to calculate its updated value.
def multiply_by_2(values: list[int]) -> list[int]:
    return [v * 2 for v in values]
stream_node = dag.stream(multiply_by_2).map(source_stream)
source_stream.set_stream([1, 2, 3])
dag.execute()
assert stream_node.get_value() == [2, 4, 6]
If the DAG is executed again, the value of the stream node will be reset to its empty value.
dag.execute()
assert stream_node.get_value() == []
The default empty value is [], but it can be customized:
set_stream_node = dag.stream(set, empty_factory=set).map(source_stream)
source_stream.set_stream([1, 2, 3, 1, 2, 3])
dag.execute()
assert set_stream_node.get_value() == {1, 2, 3}
dag.execute()
assert set_stream_node.get_value() == set()
The function provided to the node can be any callable, like a lambda:
lambda_stream_node = dag.stream(lambda x: x[:-1]).map(source_stream)
source_stream.set_stream([1, 2, 3])
dag.execute()
assert lambda_stream_node.get_value() == [1, 2]
Or a class defining __call__:
class MultiplyBy:
    def __init__(self, by: int):
        self.by = by
    def __call__(self, values: list[int]) -> list[int]:
        return [v * self.by for v in values]
callable_stream_node = dag.stream(MultiplyBy(3)).map(source_stream)
source_stream.set_stream([1, 2, 3])
dag.execute()
assert callable_stream_node.get_value() == [3, 6, 9]
State Node
A state node retains its value from one DAG execution to the next, even if it didn't update:
class Accumulator:
    def __init__(self):
        self._count = 0
    def __call__(self, values: list[int]) -> int:
        self._count += sum(values)
        return self._count
state_node = dag.state(Accumulator()).map(source_stream)
source_stream.set_stream([1, 2, 3])
dag.execute()
assert state_node.get_value() == 6
dag.execute()
assert state_node.get_value() == 6
Because they retain their value when they are not updated, state nodes don't require an empty value.
Const Node
A const node is a node whose value doesn't change.
const_node = dag.const(2)
assert const_node.get_value() == 2
Const nodes behave like state nodes (their value isn't reset when they don't update).
Connecting Nodes (aka map)
Nodes are connected by calling the map function.
Any stream or state node can be connected to state nodes, stream nodes or const nodes.
The map function doesn't execute the underlying node. Instead, it adds a node to the DAG.
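The distinction between wiring and executing can be illustrated without beavers at all. The sketch below is a toy model, not the library's implementation: ToyNode, its run method, and record_and_double are hypothetical names introduced only for this illustration. Its map method records the inputs and returns immediately; the wrapped function is called only when the node is run.

```python
from typing import Any, Callable


class ToyNode:
    """Hypothetical stand-in for a DAG node; not the beavers implementation."""

    def __init__(self, function: Callable[..., Any]):
        self._function = function
        self._args: tuple = ()
        self._kwargs: dict = {}
        self.value: Any = None

    def map(self, *args: "ToyNode", **kwargs: "ToyNode") -> "ToyNode":
        # Only record the wiring: the function is NOT called here.
        self._args, self._kwargs = args, kwargs
        return self

    def run(self) -> None:
        # Called at execution time, once the input nodes hold values.
        args = [node.value for node in self._args]
        kwargs = {name: node.value for name, node in self._kwargs.items()}
        self.value = self._function(*args, **kwargs)


calls = []


def record_and_double(values: list[int]) -> list[int]:
    calls.append(values)  # keep track of every call
    return [v * 2 for v in values]


source = ToyNode(lambda: [1, 2, 3])
doubled = ToyNode(record_and_double).map(source)
calls_after_map = list(calls)  # snapshot taken before anything is run

# "Executing" the toy graph means running the nodes in dependency order.
source.run()
doubled.run()
```

In this toy, calls_after_map is still empty because map only wired the nodes together, while doubled.value holds the doubled list once both nodes have been run.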
The map function can use positional arguments:
to_append = dag.const([3])
positional_stream = dag.stream(lambda x, y: x + y).map(source_stream, to_append)
source_stream.set_stream([1, 2])
dag.execute()
assert positional_stream.get_value() == [1, 2, 3]
Or keyword arguments:
key_word = dag.stream(lambda x, y: x + y).map(x=source_stream, y=to_append)
State vs Stream
Stream Nodes:
- need their return type to implement collections.abc.Sized
- need an empty value to be specified (which defaults to [])
- have their value reset to empty when they don't update
- are not considered updated if they return empty
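To make the collections.abc.Sized requirement concrete, the standard-library check below shows which common return types qualify. The helper name is_valid_stream_value is hypothetical, and the check only tests whether a value supports len(); it is an assumption that this mirrors how emptiness is detected, not a description of what beavers does internally.

```python
from collections.abc import Sized


def is_valid_stream_value(value: object) -> bool:
    """Hypothetical helper: True if the value supports len()."""
    return isinstance(value, Sized)


sized_results = {
    "list": is_valid_stream_value([1, 2, 3]),
    "dict": is_valid_stream_value({"hello": "world"}),
    "set": is_valid_stream_value({1, 2, 3}),
    "int": is_valid_stream_value(6),
    "generator": is_valid_stream_value(v for v in [1, 2, 3]),
}
```

Lists, dicts and sets are Sized, whereas a bare int or a generator is not, which is why a function returning a running total fits a state node rather than a stream node.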
State Nodes:
- can return any type
- don't require an empty value
- retain their value on cycles where they don't update
- are always considered updated if they are called
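The two sets of rules can be compared side by side with a small toy model. This is a sketch under stated assumptions, not the beavers implementation: ToyStream, ToyState, their cycle method and the updated attribute are hypothetical names, and passing None to cycle stands for "nothing upstream updated during this cycle".

```python
from typing import Any, Callable, Optional


class ToyStream:
    """Toy model of the stream rules above; not the beavers implementation."""

    def __init__(
        self,
        function: Callable[[Any], Any],
        empty_factory: Callable[[], Any] = list,
    ):
        self._function = function
        self._empty_factory = empty_factory
        self.value = empty_factory()
        self.updated = False

    def cycle(self, inputs: Optional[Any] = None) -> None:
        # inputs is None when nothing upstream updated during this cycle.
        if inputs is None:
            self.value = self._empty_factory()  # reset to the empty value
            self.updated = False
        else:
            self.value = self._function(inputs)
            self.updated = len(self.value) > 0  # an empty result is not an update


class ToyState:
    """Toy model of the state rules above; not the beavers implementation."""

    def __init__(self, function: Callable[[Any], Any]):
        self._function = function
        self.value: Any = None
        self.updated = False

    def cycle(self, inputs: Optional[Any] = None) -> None:
        if inputs is None:
            self.updated = False  # not called: the previous value is retained
        else:
            self.value = self._function(inputs)
            self.updated = True  # called: always counts as an update


stream = ToyStream(lambda values: [v for v in values if v > 1])
state = ToyState(sum)
history = []
for inputs in ([1, 2, 3], [1], None):
    stream.cycle(inputs)
    state.cycle(inputs)
    history.append((stream.value, stream.updated, state.value, state.updated))
```

In the second cycle the toy stream filters everything out, so it returns an empty list and is not flagged as updated, while the toy state is flagged as updated simply because it was called. In the third cycle neither is called: the stream stays at its empty value and the state keeps its previous value.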