Advanced
This section discuss advanced features that control how updates propagate in the DAG.
How updates propagate in the DAG
- Nodes are notified if any of their input node was updated during the current execution cycle
source_1 = dag.source_stream() source_2 = dag.source_stream() node = dag.stream(lambda x, y: x + y).map(source_1, source_2) source_1.set_stream([1, 2, 3]) dag.execute() assert node.get_value() == [1, 2, 3] # source_1 updated source_2.set_stream([4, 5, 6]) dag.execute() assert node.get_value() == [4, 5, 6] # source_2 updated dag.execute() assert node.get_value() == [] # no updates, reset to empty
- You can check if a node updated by looking at its
cycle_id
source_1.set_stream([1, 2, 3]) dag.execute() assert node.get_value() == [1, 2, 3] assert node.get_cycle_id() == dag.get_cycle_id() dag.execute() assert node.get_value() == [] assert node.get_cycle_id() == dag.get_cycle_id() - 1
- If several inputs of a node get updated during the same cycle, the node will be executed once (and not once per input)
source_1.set_stream([1, 2, 3]) source_2.set_stream([4, 5, 6]) dag.execute() assert node.get_value() == [1, 2, 3, 4, 5, 6] assert node.get_cycle_id() == dag.get_cycle_id()
- Stream nodes (and sources) are not considered updated if their output is empty
def even_only(values: list[int]) -> list[int]: return [v for v in values if (v % 2) == 0] even = dag.stream(even_only).map(source_1) source_1.set_stream([1, 2, 3]) dag.execute() assert even.get_value() == [2] assert even.get_cycle_id() == dag.get_cycle_id() source_1.set_stream([1, 3]) dag.execute() assert even.get_value() == [] assert even.get_cycle_id() == dag.get_cycle_id() - 1
Now node
Beavers can be used in both live
and replay
mode.
In replay
mode, the wall clock isn't relevant.
To access the current time of the replay, you should use the now node:
def get_delay(timestamps: list[pd.Timestamp], now: pd.Timestamp) -> list[pd.Timedelta]:
return [now - timestamp for timestamp in timestamps]
timestamp_stream = dag.source_stream()
delay = dag.stream(get_delay).map(timestamp_stream, dag.now())
timestamp_stream.set_stream(
[
pd.to_datetime("2022-01-01", utc=True),
pd.to_datetime("2022-01-02", utc=True),
pd.to_datetime("2022-01-03", utc=True),
]
)
dag.execute(timestamp=pd.to_datetime("2022-01-04", utc=True))
assert delay.get_value() == [
pd.to_timedelta("3d"),
pd.to_timedelta("2d"),
pd.to_timedelta("1d"),
]
The now node is shared for the whole DAG. Its value gets updated silently.
TimerManager
To be notified when time passes, nodes can subscribe to a TimerManager
node.
from beavers import TimerManager
def get_year(now: pd.Timestamp, timer_manager: TimerManager):
if not timer_manager.has_next_timer():
timer_manager.set_next_timer(
pd.Timestamp(year=now.year + 1, day=1, month=1, tzinfo=now.tzinfo)
)
return now.year
year = dag.state(get_year).map(dag.now(), dag.timer_manager())
dag.execute(pd.to_datetime("2022-01-01", utc=True))
assert year.get_value() == 2022
assert year.get_cycle_id() == dag.get_cycle_id()
dag.execute(pd.to_datetime("2022-01-02", utc=True))
assert year.get_value() == 2022
assert year.get_cycle_id() == dag.get_cycle_id() - 1
dag.execute(pd.to_datetime("2023-01-02", utc=True))
assert year.get_value() == 2023
assert year.get_cycle_id() == dag.get_cycle_id()
Silent updates
Some node may update too often, or their updates may not be relevant to other nodes. In this case it's possible to silence them:
source_1 = dag.source_stream()
source_1_silence = dag.silence(source_1)
source_2 = dag.source_stream()
both = dag.stream(lambda x, y: x + y).map(source_1_silence, source_2)
source_1.set_stream([1, 2, 3])
source_2.set_stream([4, 5, 6])
dag.execute()
assert both.get_value() == [1, 2, 3, 4, 5, 6]
assert both.get_cycle_id() == dag.get_cycle_id()
source_1.set_stream([1, 2, 3])
dag.execute()
assert both.get_value() == []
assert (
both.get_cycle_id() == dag.get_cycle_id() - 1
) # No update because source_1 is silent
silence
returns a new silenced node (rather than modify the existing node)
Value Cutoff
By default, state nodes will update everytime they are notified. The framework doesn't check that their value has changed.
You can add a cutoff, to prevent updates when the value hasn't changed:
class GetMax:
def __init__(self):
self._max = 0.0
def __call__(self, values: list[float]) -> float:
self._max = max(self._max, *values)
return self._max
source = dag.source_stream()
get_max = dag.state(GetMax()).map(source)
get_max_cutoff = dag.cutoff(get_max)
source.set_stream([1.0, 2.0])
dag.execute()
assert get_max.get_value() == 2.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff.get_cycle_id() == dag.get_cycle_id()
source.set_stream([1.0])
dag.execute()
assert get_max.get_value() == 2.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff.get_cycle_id() == dag.get_cycle_id() - 1
source.set_stream([3.0])
dag.execute()
assert get_max.get_value() == 3.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff.get_cycle_id() == dag.get_cycle_id()
You can also provide a custom comparator to allow some tolerance when deciding if a value has changed:
get_max_cutoff_custom = dag.cutoff(get_max, lambda x, y: abs(x - y) < 0.1)
source.set_stream([4.0])
dag.execute()
assert get_max.get_value() == 4.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff_custom.get_cycle_id() == dag.get_cycle_id()
source.set_stream([4.05])
dag.execute()
assert get_max.get_value() == 4.05
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff_custom.get_value() == 4.0
assert get_max_cutoff_custom.get_cycle_id() == dag.get_cycle_id() - 1
source.set_stream([4.11])
dag.execute()
assert get_max.get_value() == 4.11
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff_custom.get_value() == 4.11
assert get_max_cutoff_custom.get_cycle_id() == dag.get_cycle_id()