
DAG

At its core, beavers executes a Directed Acyclic Graph (DAG), where each node is a Python function.
This section discusses the different types of nodes in the DAG.

Stream Source

A stream source is a node whose value can be set externally.

When Dag.execute is called, the updated value is propagated through the DAG:

from beavers import Dag

dag = Dag()

source_stream = dag.source_stream()

source_stream.set_stream([1, 2, 3])
dag.execute()
assert source_stream.get_value() == [1, 2, 3]

If the DAG is executed again without a new value being set, the source stream is reset to its empty value.

dag.execute()
assert source_stream.get_value() == []

The default empty value is set to [], but it can be customized:

dict_source_stream = dag.source_stream(empty_factory=dict)
dict_source_stream.set_stream({"hello": "world"})
dag.execute()
assert dict_source_stream.get_value() == {"hello": "world"}
dag.execute()
assert dict_source_stream.get_value() == {}
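The parameter name empty_factory suggests a zero-argument callable that is invoked to produce a fresh empty value on each reset, rather than one shared empty object. The sketch below assumes that behaviour; ToySourceStream is a hypothetical stand-in written for illustration, not the beavers implementation. It shows why a factory is the safer design: every reset yields a new container, so mutating one cycle's empty value cannot leak into the next.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class ToySourceStream(Generic[T]):
    """Hypothetical model of a source stream (not the beavers implementation)."""

    def __init__(self, empty_factory: Callable[[], T] = list):
        self._empty_factory = empty_factory
        self._has_pending = False
        self._pending = None
        self._value: T = empty_factory()

    def set_stream(self, value: T) -> None:
        # Stage a value; it becomes visible on the next execute().
        self._pending = value
        self._has_pending = True

    def execute(self) -> None:
        if self._has_pending:
            # Publish the staged value.
            self._value = self._pending
            self._pending, self._has_pending = None, False
        else:
            # Nothing was set this cycle: reset to a *fresh* empty value.
            self._value = self._empty_factory()

    def get_value(self) -> T:
        return self._value


stream = ToySourceStream(empty_factory=dict)
stream.set_stream({"hello": "world"})
stream.execute()
assert stream.get_value() == {"hello": "world"}

stream.execute()
first_empty = stream.get_value()
first_empty["leak"] = 1  # a downstream consumer mutates the empty value

stream.execute()
assert stream.get_value() == {}  # a new dict, unaffected by the mutation
```

Had the stream stored a single {} and reused it on every reset, the final assertion would fail because the "leak" key would still be there.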

A source stream can be given a name, so that it can be retrieved by name (and its value set):

my_source_stream = dag.source_stream(name="my_source")
dag.get_sources()["my_source"].set_stream([4, 5, 6])
dag.execute()
assert my_source_stream.get_value() == [4, 5, 6]

Stream Node

A stream node uses the output of other nodes to calculate its updated value.

def multiply_by_2(values: list[int]) -> list[int]:
    return [v * 2 for v in values]


stream_node = dag.stream(multiply_by_2).map(source_stream)

source_stream.set_stream([1, 2, 3])
dag.execute()
assert stream_node.get_value() == [2, 4, 6]

If the DAG is executed again without a new value being set on its input, the stream node is reset to its empty value.

dag.execute()
assert stream_node.get_value() == []

The default empty value is set to [], but it can be customized:

set_stream_node = dag.stream(set, empty_factory=set).map(source_stream)
source_stream.set_stream([1, 2, 3, 1, 2, 3])
dag.execute()
assert set_stream_node.get_value() == {1, 2, 3}
dag.execute()
assert set_stream_node.get_value() == set()

The function provided to the node can be any callable, like a lambda:

lambda_stream_node = dag.stream(lambda x: x[:-1]).map(source_stream)
source_stream.set_stream([1, 2, 3])
dag.execute()
assert lambda_stream_node.get_value() == [1, 2]

Or an instance of a class that defines __call__:

class MultiplyBy:
    def __init__(self, by: int):
        self.by = by

    def __call__(self, values: list[int]) -> list[int]:
        return [v * self.by for v in values]


callable_stream_node = dag.stream(MultiplyBy(3)).map(source_stream)
source_stream.set_stream([1, 2, 3])
dag.execute()
assert callable_stream_node.get_value() == [3, 6, 9]
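Since any callable is accepted, functools.partial is a lighter-weight alternative to a class like MultiplyBy when the only thing to capture is a parameter. The snippet only checks the partial in isolation and does not import beavers; passing it as dag.stream(multiply_by_3).map(source_stream) is an assumption based on the "any callable" statement above, not something verified here. The helper multiply_by is hypothetical.

```python
from functools import partial


def multiply_by(values: list[int], by: int) -> list[int]:
    # Same logic as MultiplyBy.__call__, written as a plain function.
    return [v * by for v in values]


# Bind `by` up front; the result takes a single `values` argument,
# just like an instance of MultiplyBy(3).
multiply_by_3 = partial(multiply_by, by=3)

assert multiply_by_3([1, 2, 3]) == [3, 6, 9]
```

A class is still the better fit when the callable needs mutable state between calls, as with the Accumulator in the next section.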

State Node

A state node retains its value from one DAG execution to the next, even if it didn't update:

class Accumulator:
    def __init__(self):
        self._count = 0

    def __call__(self, values: list[int]) -> int:
        self._count += sum(values)
        return self._count


state_node = dag.state(Accumulator()).map(source_stream)
source_stream.set_stream([1, 2, 3])
dag.execute()
assert state_node.get_value() == 6
dag.execute()
assert state_node.get_value() == 6

Because they retain their value when they are not updated, state nodes don't require an empty value.
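The difference between stream and state nodes comes down to what happens on a cycle where the node's function is not called. The sketch below replays the examples above over two cycles using a hypothetical ToyNode class and a `kind` flag, both illustrative only and not part of the beavers API; make_accumulator is a closure standing in for the Accumulator class.

```python
from typing import Any, Callable, Optional


class ToyNode:
    """Illustrative node: 'stream' resets when idle, 'state' keeps its value."""

    def __init__(self, kind: str, func: Callable[..., Any],
                 empty_factory: Optional[Callable[[], Any]] = None):
        if kind == "stream" and empty_factory is None:
            empty_factory = list  # stream nodes need an empty value; default []
        self.kind = kind
        self.func = func
        self.empty_factory = empty_factory
        # State nodes have no empty value, so this sketch starts them at None.
        self.value = empty_factory() if empty_factory is not None else None

    def cycle(self, inputs_updated: bool, *args: Any) -> None:
        if inputs_updated:
            self.value = self.func(*args)  # recompute from the inputs
        elif self.kind == "stream":
            self.value = self.empty_factory()  # idle stream node: reset to empty
        # Idle state node: nothing to do, the previous value is kept.


def make_accumulator() -> Callable[[list[int]], int]:
    # Closure equivalent of the Accumulator class above.
    count = 0

    def add(values: list[int]) -> int:
        nonlocal count
        count += sum(values)
        return count

    return add


toy_stream = ToyNode("stream", lambda values: [v * 2 for v in values])
toy_state = ToyNode("state", make_accumulator())

# Cycle 1: the source updated with [1, 2, 3].
toy_stream.cycle(True, [1, 2, 3])
toy_state.cycle(True, [1, 2, 3])
assert toy_stream.value == [2, 4, 6]
assert toy_state.value == 6

# Cycle 2: the source did not update, so neither function is called.
toy_stream.cycle(False)
toy_state.cycle(False)
assert toy_stream.value == []  # reset to empty
assert toy_state.value == 6    # retained
```

The state node never needs empty_factory because the reset branch is never taken for it.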

Const Node

A const node is a node whose value doesn't change.

const_node = dag.const(2)
assert const_node.get_value() == 2

Const nodes behave like state nodes (their value isn't reset when they don't update).

Connecting Nodes (aka map)

Nodes are connected by calling the map function. The inputs passed to a stream or state node's map can be stream nodes (including source streams), state nodes or const nodes.

⚠️ The map function doesn't execute the underlying node. Instead, it adds a node to the DAG; the function is only called later, by Dag.execute.

The map function can use positional arguments:

to_append = dag.const([3])
positional_stream = dag.stream(lambda x, y: x + y).map(source_stream, to_append)
source_stream.set_stream([1, 2])
dag.execute()
assert positional_stream.get_value() == [1, 2, 3]

Or keyword arguments:

key_word = dag.stream(lambda x, y: x + y).map(x=source_stream, y=to_append)
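The warning that map does not execute anything can be made concrete with a deferred-evaluation sketch. ToyDag and LazyNode below are hypothetical and simplified: beavers splits the call into dag.stream(func).map(...), whereas ToyDag collapses it into dag.map(func, ...), and ToyDag re-runs every node on each execute without tracking updates. What it does show is that map only records the function and its positional or keyword inputs, and that the function runs when execute walks the recorded nodes.

```python
from typing import Any, Callable, Optional


class LazyNode:
    def __init__(self, func: Optional[Callable[..., Any]] = None,
                 args: tuple = (), kwargs: Optional[dict] = None,
                 value: Any = None):
        self.func = func
        self.args = args
        self.kwargs = kwargs or {}
        self.value = value

    def get_value(self) -> Any:
        return self.value


class ToyDag:
    """Hypothetical deferred DAG: map() only records nodes, execute() runs them."""

    def __init__(self):
        self._nodes: list[LazyNode] = []
        self.calls = 0  # number of times a node function actually ran

    def const(self, value: Any) -> LazyNode:
        return LazyNode(value=value)  # nothing to run, the value is fixed

    def map(self, func: Callable[..., Any], *args: LazyNode,
            **kwargs: LazyNode) -> LazyNode:
        node = LazyNode(func, args, kwargs)
        self._nodes.append(node)  # recorded for later; func is NOT called here
        return node

    def execute(self) -> None:
        # Insertion order is a valid topological order: a node's inputs
        # must already exist when map() is called.
        for node in self._nodes:
            args = [a.value for a in node.args]
            kwargs = {name: n.value for name, n in node.kwargs.items()}
            node.value = node.func(*args, **kwargs)
            self.calls += 1


dag = ToyDag()
left = dag.const([1, 2])
to_append = dag.const([3])
positional = dag.map(lambda x, y: x + y, left, to_append)
key_word = dag.map(lambda x, y: x + y, x=left, y=to_append)
assert dag.calls == 0  # wiring the graph ran nothing

dag.execute()
assert dag.calls == 2
assert positional.get_value() == [1, 2, 3]
assert key_word.get_value() == [1, 2, 3]
```

Because a node can only be wired to inputs that already exist, the graph cannot contain a cycle, which is what makes a single pass in insertion order sufficient.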

State vs Stream

Stream Nodes:

  • need their return type to implement collections.abc.Sized
  • need an empty value to be specified (defaults to [])
  • have their value reset to empty when they don't update
  • are not considered updated if they return an empty value

State Nodes:

  • can return any type
  • don't require an empty value
  • retain their value on cycles where they don't update
  • are always considered updated if they are called
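The last rule in each list decides whether a node counts as updated for the current cycle. A stream node that returns an empty value is not considered updated, while a state node is considered updated whenever it is called, even if its value did not change. The Sized requirement is presumably what lets emptiness be checked with len(). The helper below is a hypothetical distillation of these rules for illustration, not code from beavers.

```python
from collections.abc import Sized
from typing import Any


def is_updated(kind: str, was_called: bool, result: Any) -> bool:
    """Hypothetical update rule distilled from the two lists above."""
    if not was_called:
        return False
    if kind == "stream":
        if not isinstance(result, Sized):
            raise TypeError("stream node results must implement Sized")
        return len(result) > 0  # empty result: not considered updated
    return True  # state node: always updated when called


# A filtering stream node that drops every value produces no update.
filtered = [v for v in [1, 2, 3] if v > 10]
assert is_updated("stream", True, filtered) is False
assert is_updated("stream", True, [11]) is True
# A state node that is called counts as updated, even with an unchanged value.
assert is_updated("state", True, 6) is True
assert is_updated("state", False, 6) is False
```

This also explains why a plain int cannot be the output of a stream node: isinstance(6, Sized) is False, so there is no way to tell an empty result from a non-empty one.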