Replay
This section explains how to run a beavers application using historical data, typically stored in files or databases.
Manual Replay
Let's start with a simple dag, with one source going to one sink:
import beavers

dag = beavers.Dag()
my_source = dag.source_stream(name="my_source")
my_sink = dag.sink("my_sink", my_source)
Assuming your data has this shape:
import dataclasses
import pandas as pd
@dataclasses.dataclass(frozen=True)
class Message:
timestamp: pd.Timestamp
message: str
You could replay the data manually yourself and run the dag at regular intervals:
my_source.set_stream(
[
Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello"),
Message(pd.Timestamp("2023-01-01T00:00:30Z"), "How are you"),
]
)
dag.execute(pd.Timestamp("2023-01-01T00:01:00Z"))
assert my_sink.get_sink_value() == [
Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello"),
Message(pd.Timestamp("2023-01-01T00:00:30Z"), "How are you"),
]
But this requires a lot of boilerplate code and becomes cumbersome very quickly.
Replay Framework
The replay framework uses a few key abstractions to define how the data is loaded and injected into the dag.
DataSource
A DataSource provides a way of streaming data:
import beavers.replay
@dataclasses.dataclass(frozen=True)
class MessageDataSource:
messages: list[Message]
def read_to(self, timestamp: pd.Timestamp) -> list[Message]:
results = []
while self.messages and self.messages[0].timestamp <= timestamp:
results.append(self.messages.pop(0))
return results
def get_next(self) -> pd.Timestamp:
if self.messages:
return self.messages[0].timestamp
else:
return beavers.replay.UTC_MAX
By convention, DataSources:
- return UTC_MAX when there is no more data
- are stateful and need to remember what has already been read
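For example, reading from the MessageDataSource defined above, using the same sample messages as before:
data_source = MessageDataSource(
    [
        Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello"),
        Message(pd.Timestamp("2023-01-01T00:00:30Z"), "How are you"),
    ]
)
assert data_source.get_next() == pd.Timestamp("2023-01-01T00:00:00Z")
assert len(data_source.read_to(pd.Timestamp("2023-01-01T00:01:00Z"))) == 2
# Everything has been read, so the source signals it has no more data:
assert data_source.get_next() == beavers.replay.UTC_MAX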
ReplayContext
The ReplayContext contains timing information:
from beavers.replay import ReplayContext
replay_context = ReplayContext(
start=pd.to_datetime("2023-01-01T00:00:00Z"),
end=pd.to_datetime("2023-01-02T00:00:00Z"),
frequency=pd.to_timedelta("1h"),
)
By convention, all timestamps are UTC.
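Roughly speaking, the driver runs the dag once per frequency interval between start and end. As an illustration (this is just a sketch of the timeline, not the driver's actual scheduling code):
# Illustrative only: the cycle timestamps implied by the context above
cycle_timestamps = pd.date_range(
    replay_context.start,
    replay_context.end,
    freq=replay_context.frequency,
)
# On each cycle, the sources are read up to the cycle timestamp,
# and the dag is executed with that timestamp.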
DataSourceProvider
A DataSourceProvider provides a way of creating a DataSource.
For example, if the data is stored in a csv file:
timestamp,message
2023-01-01 01:00:00+00:00,Hello
2023-01-01 01:01:00+00:00,How are you
Provided with the ReplayContext, our DataSourceProvider will load the data and return a DataSource:
@dataclasses.dataclass(frozen=True)
class CsvDataSourceProvider:
file_name: str
def __call__(
self, replay_context: ReplayContext
) -> beavers.replay.DataSource[list[Message]]:
df = pd.read_csv(self.file_name, parse_dates=["timestamp"])
messages = [Message(*row) for _, row in df.iterrows()]
messages.sort(key=lambda x: x.timestamp)
return MessageDataSource(messages)
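For example, assuming the csv content above is saved as input.csv:
data_source = CsvDataSourceProvider("input.csv")(replay_context)
messages = data_source.read_to(pd.Timestamp("2023-01-01T02:00:00Z"))
assert [message.message for message in messages] == ["Hello", "How are you"]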
DataSink
A DataSink provides a way of capturing the output of nodes and saving the data:
@dataclasses.dataclass(frozen=True)
class CsvDataSink:
destination: str
data: list[Message] = dataclasses.field(default_factory=list)
def append(self, timestamp: pd.Timestamp, data: list[Message]):
self.data.extend(data)
def close(self):
pd.DataFrame([dataclasses.asdict(value) for value in self.data]).to_csv(
self.destination, index=False
)
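For example, writing a single message to output.csv:
data_sink = CsvDataSink("output.csv")
data_sink.append(
    pd.Timestamp("2023-01-01T01:00:00Z"),
    [Message(pd.Timestamp("2023-01-01T01:00:00Z"), "Hello")],
)
data_sink.close()  # writes the accumulated messages to output.csv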
DataSinkProvider
A DataSinkProvider provides a way of creating a DataSink.
In this example, we save the data to a csv file:
@dataclasses.dataclass(frozen=True)
class CsvDataSinkProvider:
destination: str
def __call__(self, replay_context: ReplayContext) -> CsvDataSink:
return CsvDataSink(self.destination)
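The provider is then called with the ReplayContext to build the actual sink:
data_sink = CsvDataSinkProvider("output.csv")(replay_context)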
ReplayDriver
The replay driver is responsible for putting the dag, context, sources and sinks together, and for orchestrating the replay.
from beavers.replay import ReplayDriver
replay_driver = beavers.replay.ReplayDriver.create(
dag=dag,
replay_context=replay_context,
data_source_providers={"my_source": CsvDataSourceProvider("input.csv")},
data_sink_providers={"my_sink": CsvDataSinkProvider("output.csv")},
)
replay_driver.run()
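Under the hood, run() stitches these pieces together. A simplified, illustrative sketch of the replay loop for this one-source, one-sink dag (not beavers' actual implementation) could look like this:
# Simplified sketch, for illustration only
data_source = CsvDataSourceProvider("input.csv")(replay_context)
data_sink = CsvDataSinkProvider("output.csv")(replay_context)
timestamp = replay_context.start
while timestamp < replay_context.end:
    timestamp += replay_context.frequency
    my_source.set_stream(data_source.read_to(timestamp))
    dag.execute(timestamp)
    data_sink.append(timestamp, my_sink.get_sink_value())
data_sink.close()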
Reading Files Partitioned By Time
Assuming:
- you want to replay a dag for a long period of time
- all that historic data doesn't fit in memory
- the data is partitioned by time period, for example one file per day: input_2023-01-01.csv
It's then possible, with the IteratorDataSourceAdapter, to load each file one by one as they are needed.
In this example, the csv data is stored in one file per day. We need to provide:
- a generator that will yield a DataSource for each file, in order
- a way to concatenate the output of two DataSources; in this case we'll use + to merge two lists
- an empty value for the case where there is no more data, or we have reached the last file
import operator

from beavers.replay import IteratorDataSourceAdapter
@dataclasses.dataclass(frozen=True)
class PartitionedCsvDataSourceProvider:
source_format: str
def __call__(self, replay_context: ReplayContext):
file_names = [
self.source_format.format(date=date)
for date in pd.date_range(replay_context.start, replay_context.end)
]
generator = (self._load_one_file(file_name) for file_name in file_names)
return IteratorDataSourceAdapter(
sources=generator,
empty=[],
concatenator=operator.add,
)
def _load_one_file(self, file_name: str) -> MessageDataSource:
return MessageDataSource(
[
Message(*row)
for _, row in pd.read_csv(
file_name, parse_dates=["timestamp"]
).iterrows()
]
)
source_provider = PartitionedCsvDataSourceProvider("input_{date:%Y-%m-%d}.csv")
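The resulting provider plugs into the driver like any other, reusing the dag, context and sink provider from above:
replay_driver = beavers.replay.ReplayDriver.create(
    dag=dag,
    replay_context=replay_context,
    data_source_providers={"my_source": source_provider},
    data_sink_providers={"my_sink": CsvDataSinkProvider("output.csv")},
)
replay_driver.run()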