
Engine

Module for building and representing dags and nodes.

Dag

Main class used for building and executing a dag.

pa: 'ArrowDagWrapper' cached property

Returns the ArrowDagWrapper.

pd: 'PandasWrapper' cached property

Returns the PandasWrapper.

__init__()

Create an empty Dag.

const(value)

Add a Node of constant value to the Dag.

Parameters

value: The value of the constant node. Should be immutable.

cutoff(node, comparator=operator.eq)

Filter the updates propagated from a Node, according to a "cutoff" policy.

Parameters

  • node: The node whose updates need to be cut off.
  • comparator: The cutoff policy. When the comparator returns True, the update is not propagated.
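To make the cutoff semantics concrete, here is a self-contained toy that does not use the library. `ToyCutoff` and `within_one_percent` are hypothetical names. The argument order of the comparator (previous value, then new value) and the rule that the first value always propagates are assumptions of this sketch.

```python
import operator
from typing import Any, Callable

# Sentinel meaning "nothing has been propagated yet".
_NO_VALUE = object()


class ToyCutoff:
    """Hypothetical stand-in for a cutoff node (not the library's class)."""

    def __init__(self, comparator: Callable[[Any, Any], bool] = operator.eq):
        self.comparator = comparator
        self.last = _NO_VALUE  # last value that was propagated downstream

    def update(self, value: Any) -> bool:
        """Return True if this update should be propagated downstream."""
        # Assumption: the first value always goes through.
        if self.last is not _NO_VALUE and self.comparator(self.last, value):
            return False  # comparator returned True: the update is cut off
        self.last = value
        return True


def within_one_percent(previous: float, current: float) -> bool:
    """Custom policy: treat moves of 1% or less as "no change"."""
    return abs(current - previous) <= 0.01 * abs(previous)


prices = ToyCutoff(within_one_percent)
print([prices.update(p) for p in [100.0, 100.5, 102.0, 102.0]])
# [True, False, True, False]
```

This sketch compares each new value against the last propagated one, so a slow drift of small moves eventually gets through. Comparing against the last seen value instead would let such a drift be cut off forever.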

execute(timestamp=None)

Run the dag for a given timestamp.

get_cycle_id()

Return the last cycle id.

get_next_timer()

Return the next timer that needs to be triggered.
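A natural reading is that the dag's next timer is the earliest timer among its nodes. The sketch below assumes exactly that. It uses stdlib `datetime` in place of `pd.Timestamp`, and `next_dag_timer` is a hypothetical helper, not part of the library.

```python
from datetime import datetime, timezone
from typing import Iterable

# Stand-in for UTC_MAX ("no timer"); the docs use pd.Timestamp instead.
UTC_MAX = datetime.max.replace(tzinfo=timezone.utc)


def next_dag_timer(node_timers: Iterable[datetime]) -> datetime:
    """Earliest upcoming timer across all nodes, or UTC_MAX if none is set."""
    return min(node_timers, default=UTC_MAX)


nine_thirty = datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc)
ten = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
print(next_dag_timer([ten, UTC_MAX, nine_thirty]))  # 2024-01-01 09:30:00+00:00
```

Because UTC_MAX is the largest possible value, nodes without a timer never win the `min`, so they need no special-casing.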

get_sinks()

Return the sink Nodes.

get_sources()

Return the source Nodes.

now()

Return a Node whose value is the current time.

The now Node:

  • won't trigger an update every time the time changes.
  • is unique for the Dag.

prune()

Remove any parts of the dag that are not connected to a sink.
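Pruning amounts to a reachability search that walks the inputs backwards from the sinks. The sketch below represents the dag as a plain dict mapping each (hypothetical) node name to its input names. It illustrates the idea and is not the library's implementation.

```python
from typing import Dict, Iterable, List, Set


def connected_to_sinks(inputs: Dict[str, List[str]], sinks: Iterable[str]) -> Set[str]:
    """Return every node that feeds, directly or indirectly, into a sink."""
    keep: Set[str] = set()
    stack = list(sinks)
    while stack:  # iterative depth-first search over the inputs
        node = stack.pop()
        if node in keep:
            continue  # already visited (shared upstream node)
        keep.add(node)
        stack.extend(inputs.get(node, []))
    return keep


inputs = {
    "source": [],
    "double": ["source"],
    "debug": ["source"],  # not connected to any sink
    "out": ["double"],
}
print(sorted(connected_to_sinks(inputs, ["out"])))
# ['double', 'out', 'source']
```

Any node outside the returned set, here "debug", is a candidate for removal.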

silence(node)

Silence a node, so that its updates do not trigger downstream nodes.

sink(name, input_node)

Add a sink.

Parameters

  • name: The name of the sink.
  • input_node: The input node to be connected to the sink.

source_stream(empty=None, empty_factory=None, name=None)

Add a source stream Node.

Parameters

  • empty: The value the stream resets to when there are no updates. Must implement __len__ and be empty.
  • empty_factory: A provider for the empty value: a callable returning an object that implements __len__ and is empty.
  • name: The name of the source.

state(function)

Add a state NodePrototype.

Parameters

function: The processing function of the Node

stream(function, empty=None, empty_factory=None)

Add a stream NodePrototype.

Stream nodes are reset to their empty value after each cycle. Therefore, an empty value must be available, given either as empty or through an empty_factory.

If neither is provided, list is used as the empty_factory.

Parameters

  • function: The processing function of the Node.
  • empty: The value the stream resets to when there are no updates. Must implement __len__ and be empty.
  • empty_factory: A provider for the empty value: a callable returning an object that implements __len__ and is empty.
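The rules for empty and empty_factory can be sketched in plain Python. `resolve_empty_factory` and `ToyStream` are hypothetical names, and rejecting both arguments at once is a choice of this sketch rather than documented behavior.

```python
from typing import Any, Callable, Optional, Sized


def resolve_empty_factory(
    empty: Optional[Sized] = None,
    empty_factory: Optional[Callable[[], Sized]] = None,
) -> Callable[[], Sized]:
    """Collapse the (empty, empty_factory) pair into a single factory."""
    if empty is not None and empty_factory is not None:
        raise ValueError("Provide either empty or empty_factory, not both")
    if empty is not None:
        if len(empty) != 0:
            raise ValueError("empty must have a length of zero")
        return lambda: empty  # the same object is reused every cycle
    factory = empty_factory if empty_factory is not None else list  # default
    if len(factory()) != 0:
        raise ValueError("empty_factory must return an empty value")
    return factory


class ToyStream:
    """Holds the output of one cycle, then resets to a fresh empty value."""

    def __init__(self, function: Callable[..., Sized], empty=None, empty_factory=None):
        self.function = function
        self.empty_factory = resolve_empty_factory(empty, empty_factory)
        self.value = self.empty_factory()

    def run_cycle(self, *inputs: Any) -> Sized:
        self.value = self.function(*inputs)
        return self.value

    def end_cycle(self) -> None:
        # Unlike a state node, a stream does not carry its value over.
        self.value = self.empty_factory()


doubles = ToyStream(lambda xs: [x * 2 for x in xs])
print(doubles.run_cycle([1, 2]))  # [2, 4]
doubles.end_cycle()
print(doubles.value)  # []
```

Since a plain empty object is shared across cycles, an immutable empty value (or a factory such as list) keeps one cycle's mutations from leaking into the next.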

timer_manager()

Create a new TimerManager to be connected to a Node.

Any node that must wake up on a timer should be connected to its own TimerManager.

DagMetrics dataclass

Metrics for the execution of a dag.

Node dataclass

Bases: Generic[T]

Represent an element in a Dag.

Stores all the runtime information about the node. This includes:

  • the underlying processing function
  • the node inputs (upstream nodes)
  • the node observers (downstream nodes)
  • the state of the node (last value, last update cycle id)

Nodes should only be used to build the dag. Don't use them directly to read values; use a sink for this.
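The cycle-id bookkeeping listed above can be sketched with a toy node: a node recomputes only when at least one of its inputs updated in the current cycle. `ToyNode` and `run_cycle` are hypothetical. Treating input-less nodes as sources updated from outside, and pulling from inputs in topological order instead of notifying observers, are simplifications of this sketch.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class ToyNode:
    function: Callable[..., Any]
    inputs: List["ToyNode"] = field(default_factory=list)  # upstream nodes
    value: Any = None  # last value
    cycle_id: int = 0  # cycle at which this node last updated (0 = never)


def run_cycle(nodes: List[ToyNode], cycle_id: int) -> None:
    """Run one cycle (ids start at 1); nodes must be in topological order."""
    for node in nodes:
        if not node.inputs:
            continue  # sources are updated from outside the dag
        if any(i.cycle_id == cycle_id for i in node.inputs):
            node.value = node.function(*(i.value for i in node.inputs))
            node.cycle_id = cycle_id


source = ToyNode(lambda: None)
plus_one = ToyNode(lambda x: x + 1, [source])
times_ten = ToyNode(lambda x: x * 10, [plus_one])
nodes = [source, plus_one, times_ten]

source.value, source.cycle_id = 1, 1  # simulate a source update in cycle 1
run_cycle(nodes, 1)
print(times_ten.value, times_ten.cycle_id)  # 20 1
run_cycle(nodes, 2)  # nothing upstream changed in cycle 2
print(times_ten.cycle_id)  # 1
```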

get_cycle_id()

Return the id of the cycle at which this node last updated.

get_sink_value()

Return the value of a _SinkFunction.

get_value()

Return the value of the output for the last update.

set_stream(value)

Set the value of a _SourceStream.

NodePrototype dataclass

Bases: Generic[T]

A Node that is yet to be added to the Dag.

map(*args, **kwargs)

Add the prototype to the dag and connect it to the given inputs.

SilentUpdate dataclass

Bases: Generic[T]

Wrap a value to make the update silent.

A silent update means the value changed but downstream nodes don't get notified.
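The sketch below shows one way a framework could honor such a wrapper: unwrap the value, and remember whether to notify downstream. The `SilentUpdate` here is a local stand-in whose field name `value` is an assumption; `unwrap` and `running_total` are hypothetical helpers, not library functions.

```python
from dataclasses import dataclass
from typing import Any, Generic, Tuple, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class SilentUpdate(Generic[T]):
    """Local stand-in; the field name `value` is an assumption."""

    value: T


def unwrap(result: Any) -> Tuple[Any, bool]:
    """Split a node function's result into (new_value, notify_downstream)."""
    if isinstance(result, SilentUpdate):
        return result.value, False
    return result, True


def running_total(total: int, increment: int, threshold: int = 10) -> Any:
    """Hypothetical state function: always keeps the new total, but only
    notifies downstream nodes once the total reaches the threshold."""
    new_total = total + increment
    return new_total if new_total >= threshold else SilentUpdate(new_total)


print(unwrap(running_total(0, 4)))  # (4, False)
print(unwrap(running_total(8, 4)))  # (12, True)
```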

TimerManager

API for setting and accessing the timer of a given Node.

  • Timers are represented as UTC pd.Timestamp values.
  • A timer of UTC_MAX means no timer.
  • Each TimerManager is associated with only one Node.
  • Each Node can only set one upcoming timer.

This class only stores data. It is accessed by the framework to decide when to trigger the next timer.

__init__()

Initialize with default values.

clear_next_timer()

Cancel the next timer.

get_next_timer()

Return the next timer for this Node.

has_next_timer()

Return True if there is an upcoming timer.

just_triggered()

Return True if the timer triggered for the current cycle.

set_next_timer(timer)

Set the next timer, cancelling any existing upcoming timer.
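The rules above (one upcoming timer per node, UTC_MAX meaning none, setting a timer cancels the previous one) fit in a small class. This is a sketch, not the library's code: `ToyTimerManager` is hypothetical, stdlib `datetime` stands in for `pd.Timestamp`, and the `advance` hook (called by the framework once per cycle, clearing a timer once it fires) is an assumption.

```python
from datetime import datetime, timezone

# Stand-in for UTC_MAX; the docs use pd.Timestamp instead of datetime.
UTC_MAX = datetime.max.replace(tzinfo=timezone.utc)


class ToyTimerManager:
    """One upcoming timer per node; UTC_MAX means "no timer"."""

    def __init__(self) -> None:
        self._next_timer = UTC_MAX
        self._just_triggered = False

    def set_next_timer(self, timer: datetime) -> None:
        # Overwrites, and so cancels, any timer that was already pending.
        self._next_timer = timer

    def clear_next_timer(self) -> None:
        self._next_timer = UTC_MAX

    def has_next_timer(self) -> bool:
        return self._next_timer != UTC_MAX

    def get_next_timer(self) -> datetime:
        return self._next_timer

    def just_triggered(self) -> bool:
        return self._just_triggered

    def advance(self, now: datetime) -> None:
        """Hypothetical framework hook, called once per cycle."""
        self._just_triggered = self._next_timer <= now
        if self._just_triggered:
            self._next_timer = UTC_MAX  # assumption: a fired timer is consumed


manager = ToyTimerManager()
alarm = datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc)
manager.set_next_timer(alarm)
manager.advance(datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc))
print(manager.just_triggered())  # False
manager.advance(alarm)
print(manager.just_triggered(), manager.has_next_timer())  # True False
```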