Engine
Module for building and representing dags and nodes.
Dag
Main class used for building and executing a dag.
pa: 'ArrowDagWrapper' (cached property)
Returns the ArrowDagWrapper.
pd: 'PandasWrapper' (cached property)
Returns the PandasWrapper.
__init__()
Create an empty Dag.
const(value)
Add a Node of constant value to the Dag.
Parameters
value: The value of the constant node. Should be immutable.
cutoff(node, comparator=operator.eq)
Tame the updates from a Node, given a "cutoff" policy.
Parameters
node: The node whose value needs cutoff.
comparator: The policy for the cutoff. When the comparator returns True, no update is propagated.
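The cutoff rule can be illustrated with a small standalone sketch. It does not use the library itself: `should_propagate` and `close_enough` are hypothetical helpers, and the `(previous, current)` argument order of the comparator is an assumption made for this illustration.

```python
import math
import operator


def should_propagate(previous, current, comparator=operator.eq):
    """Toy version of the cutoff decision (not the library's code)."""
    # When the comparator returns True the two values are treated as
    # equivalent, so the update is cut off and nothing is propagated.
    return not comparator(previous, current)


def close_enough(previous, current):
    # Hypothetical tolerance-based policy for floating point values.
    return math.isclose(previous, current, abs_tol=0.01)


print(should_propagate(1.0, 1.0))                  # False: equal values
print(should_propagate(1.0, 1.005, close_enough))  # False: within tolerance
print(should_propagate(1.0, 1.5, close_enough))    # True: a real change
```

A tolerance-based comparator like this one is a common reason to override the default `operator.eq`, since it suppresses downstream work caused by numerical noise.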
execute(timestamp=None)
Run the dag for a given timestamp.
get_cycle_id()
Return the last cycle id.
get_next_timer()
Return the next timer that needs to be triggered.
get_sinks()
Return the sink Nodes.
get_sources()
Return the source Nodes.
now()
Return a Node whose value is the current time.
The now Node:
- won't trigger an update every time the time changes.
- is unique for the Dag.
prune()
Remove any parts of the dag that are not connected to a sink.
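Pruning amounts to keeping only the nodes from which a sink can be reached. The sketch below shows that idea on a plain dictionary; `reachable_from_sinks` and the `inputs` mapping (node name to upstream node names) are hypothetical and are not the library's data structures.

```python
def reachable_from_sinks(inputs, sinks):
    """Return the set of nodes that feed, directly or not, into a sink."""
    keep = set()
    stack = list(sinks)
    while stack:
        node = stack.pop()
        if node in keep:
            continue
        keep.add(node)
        # Walk upstream: every input of a kept node must be kept too.
        stack.extend(inputs.get(node, ()))
    return keep


# "orphan" reads from "a" but no sink depends on it, so it would be pruned.
inputs = {"sink": ["b"], "b": ["a"], "orphan": ["a"]}
kept = reachable_from_sinks(inputs, ["sink"])
print(sorted(kept))               # ['a', 'b', 'sink']
print(sorted(set(inputs) - kept)) # ['orphan']
```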
silence(node)
Silence a node, so that its updates do not trigger downstream nodes.
sink(name, input_node)
Add a sink.
Parameters
name: The name of the sink.
input_node: The input node to be connected to the sink.
source_stream(empty=None, empty_factory=None, name=None)
Add a source stream Node.
Parameters
empty: The value to which the stream resets when there is no update. Must implement __len__ and be empty.
empty_factory: A provider for the empty value: a callable returning an object that implements __len__ and is empty.
name: The name of the source.
state(function)
Add a state NodePrototype.
Parameters
function: The processing function of the Node.
stream(function, empty=None, empty_factory=None)
Add a stream NodePrototype.
Stream nodes are reset to their empty value after each cycle.
Therefore, the user must provide an empty value or an empty_factory.
The default is to use list as the empty_factory.
Parameters
function: The processing function of the Node.
empty: The value to which the stream resets when there is no update. Must implement __len__ and be empty.
empty_factory: A provider for the empty value: a callable returning an object that implements __len__ and is empty.
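The reset-to-empty behaviour and the role of `empty` versus `empty_factory` can be sketched with a toy class. `StreamSlot` and its `end_of_cycle` method are hypothetical names used only for illustration; the length check is an assumption about how the "must be empty" requirement could be enforced, not a description of the library's internals.

```python
class StreamSlot:
    """Toy model of a stream node's value (not the library's code)."""

    def __init__(self, empty=None, empty_factory=None):
        if empty_factory is None:
            if empty is None:
                # Default described above: use list as the factory.
                empty_factory = list
            else:
                # Reuse the same object each time; safe for immutable
                # values such as "" or ().
                empty_factory = lambda: empty
        if len(empty_factory()) != 0:
            raise ValueError("the empty value must have length 0")
        self._empty_factory = empty_factory
        self.value = empty_factory()

    def update(self, value):
        self.value = value

    def end_of_cycle(self):
        # Stream values do not persist: reset after each cycle.
        self.value = self._empty_factory()


slot = StreamSlot()
slot.update([1, 2, 3])
print(slot.value)  # [1, 2, 3]
slot.end_of_cycle()
print(slot.value)  # []
```

A factory is the safer choice for mutable containers, because each reset produces a fresh object instead of sharing one instance across cycles.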
timer_manager()
Create a new TimerManager to be connected to a Node.
Any node that must wake up on a timer should be connected to its own TimerManager.
DagMetrics
dataclass
Metrics for the execution of a dag.
Node
dataclass
Bases: Generic[T]
Represent an element in a Dag.
Stores all the runtime information about the node. This includes:
- the underlying processing function
- the node inputs (upstream nodes)
- the node observers (downstream nodes)
- the state of the node (last value, last update cycle id)
Node objects should only be used to build the dag.
Don't use them directly to read values (use a sink for this).
get_cycle_id()
Return id of the cycle at which this node last updated.
get_sink_value()
Return the value of a _SinkFunction.
get_value()
Return the value of the output for the last update.
set_stream(value)
Set the value of a _SourceStream.
NodePrototype
dataclass
Bases: Generic[T]
A Node that is yet to be added to the Dag.
map(*args, **kwargs)
Add the prototype to the dag and connect it to the given inputs.
SilentUpdate
dataclass
Bases: Generic[T]
Wrap a value to make the update silent.
A silent update means the value changed but downstream nodes don't get notified.
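The wrapping idea can be shown with a minimal standalone sketch. `Silent` and `unwrap` are hypothetical stand-ins written for this illustration; they only mimic the behaviour described above and are not the library's implementation.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Silent(Generic[T]):
    """Toy wrapper marking a result as a silent update."""

    value: T


def unwrap(result):
    """Return (value, notify_downstream) for a node's raw result."""
    if isinstance(result, Silent):
        # The value still changes, but observers are not woken up.
        return result.value, False
    return result, True


print(unwrap(42))          # (42, True)
print(unwrap(Silent(42)))  # (42, False)
```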
TimerManager
API for setting and accessing the timer for a given Node.
- Timers are represented as pd.Timestamp with UTC timestamp.
- A timer of UTC_MAX means no timer.
- Each TimerManager is associated to only one Node.
- Each Node can only set one upcoming timer.
This class only stores data. It is accessed by the framework to decide when the next timer is due.
__init__()
Initialize with default values.
clear_next_timer()
Cancel the next timer.
get_next_timer()
Return the next timer to be triggered for this Node.
has_next_timer()
Return True if there is an upcoming timer.
just_triggered()
Return True if the timer triggered for the current cycle.
set_next_timer(timer)
Set the next timer, cancelling any existing upcoming timer.
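The contract described for TimerManager (a single upcoming timer, with a maximum timestamp meaning "no timer") can be sketched as a toy class. Several things here are assumptions made for the illustration: `ToyTimerManager` is hypothetical, it uses the standard library `datetime` instead of `pd.Timestamp` to stay dependency-free, its `UTC_MAX` is defined locally, and the `flush` method stands in for whatever the framework does at the start of a cycle.

```python
from datetime import datetime, timezone

# Local sentinel for "no timer" (assumption: mirrors the UTC_MAX idea).
UTC_MAX = datetime.max.replace(tzinfo=timezone.utc)


class ToyTimerManager:
    """Toy data holder for one node's timer (not the library's code)."""

    def __init__(self):
        self._next_timer = UTC_MAX
        self._just_triggered = False

    def has_next_timer(self):
        return self._next_timer != UTC_MAX

    def get_next_timer(self):
        return self._next_timer

    def set_next_timer(self, timer):
        # Only one upcoming timer: setting replaces any existing one.
        self._next_timer = timer

    def clear_next_timer(self):
        self._next_timer = UTC_MAX

    def just_triggered(self):
        return self._just_triggered

    def flush(self, now):
        # Hypothetical framework hook: fire the timer once it is due.
        self._just_triggered = self._next_timer <= now
        if self._just_triggered:
            self._next_timer = UTC_MAX


manager = ToyTimerManager()
due = datetime(2024, 1, 1, tzinfo=timezone.utc)
manager.set_next_timer(due)
manager.flush(datetime(2023, 12, 31, tzinfo=timezone.utc))
print(manager.just_triggered(), manager.has_next_timer())  # False True
manager.flush(due)
print(manager.just_triggered(), manager.has_next_timer())  # True False
```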