Replay

This section explains how to run a beavers application using historical data, typically stored in files or databases.

Manual Replay

Starting with a simple dag with one source going to one sink:

import beavers

dag = beavers.Dag()
my_source = dag.source_stream(name="my_source")
my_sink = dag.sink("my_sink", my_source)

Assuming your data has this shape:

import dataclasses
import pandas as pd


@dataclasses.dataclass(frozen=True)
class Message:
    timestamp: pd.Timestamp
    message: str

You could replay the data manually yourself and run the dag at regular intervals:

my_source.set_stream(
    [
        Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello"),
        Message(pd.Timestamp("2023-01-01T00:00:30Z"), "How are you"),
    ]
)
dag.execute(pd.Timestamp("2023-01-01T00:01:00Z"))
assert my_sink.get_sink_value() == [
    Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello"),
    Message(pd.Timestamp("2023-01-01T00:00:30Z"), "How are you"),
]
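
To replay a longer period you would have to batch the messages and run the dag at each interval yourself. A minimal sketch of such a loop, assuming messages holds a time-sorted list of Message objects:

timestamp = pd.Timestamp("2023-01-01T00:00:00Z")
end = pd.Timestamp("2023-01-01T00:05:00Z")
interval = pd.Timedelta("1min")
while timestamp <= end:
    # Take the messages that happened up to this cycle's timestamp:
    batch = [m for m in messages if m.timestamp <= timestamp]
    messages = [m for m in messages if m.timestamp > timestamp]
    if batch:
        my_source.set_stream(batch)
    dag.execute(timestamp)
    timestamp += interval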

But this requires a lot of boilerplate code and becomes cumbersome very quickly.

Replay Framework

The replay framework uses a few key abstractions to define how the data is loaded and injected into the dag.

DataSource

A DataSource provides a way of streaming data.

import beavers.replay


@dataclasses.dataclass(frozen=True)
class MessageDataSource:
    messages: list[Message]

    def read_to(self, timestamp: pd.Timestamp) -> list[Message]:
        results = []
        while self.messages and self.messages[0].timestamp <= timestamp:
            results.append(self.messages.pop(0))
        return results

    def get_next(self) -> pd.Timestamp:
        if self.messages:
            return self.messages[0].timestamp
        else:
            return beavers.replay.UTC_MAX

By convention, DataSources:

  • return UTC_MAX when there is no more data
  • are stateful and need to remember what has already been read.
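
To illustrate these conventions, here is how the MessageDataSource above behaves when read by hand (a small sketch reusing the Message class defined earlier):

source = MessageDataSource([Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello")])
assert source.get_next() == pd.Timestamp("2023-01-01T00:00:00Z")
assert source.read_to(pd.Timestamp("2023-01-01T01:00:00Z")) == [
    Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello")
]
# Everything has been read, so the source now returns UTC_MAX:
assert source.get_next() == beavers.replay.UTC_MAX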

ReplayContext

The ReplayContext contains timing information:

from beavers.replay import ReplayContext

replay_context = ReplayContext(
    start=pd.to_datetime("2023-01-01T00:00:00Z"),
    end=pd.to_datetime("2023-01-02T00:00:00Z"),
    frequency=pd.to_timedelta("1h"),
)

⚠️ By convention, all timestamps are UTC.
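
With this context, the driver will run the dag roughly once per hour between start and end. A rough sketch of the cycle timestamps, using pandas directly (the exact boundary handling is an implementation detail of the driver):

cycle_timestamps = pd.date_range(
    replay_context.start, replay_context.end, freq=replay_context.frequency
)
assert len(cycle_timestamps) == 25  # hourly steps, both boundaries included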

DataSourceProvider

A DataSourceProvider provides a way of creating a DataSource.

For example, if the data is stored in a csv file:

timestamp,message
2023-01-01 01:00:00+00:00,Hello
2023-01-01 01:01:00+00:00,How are you

Provided with the ReplayContext, our DataSourceProvider will load the file and return a DataSource:

@dataclasses.dataclass(frozen=True)
class CsvDataSourceProvider:
    file_name: str

    def __call__(
        self, replay_context: ReplayContext
    ) -> beavers.replay.DataSource[list[Message]]:
        df = pd.read_csv(self.file_name, parse_dates=["timestamp"])
        messages = [Message(*row) for _, row in df.iterrows()]
        messages.sort(key=lambda x: x.timestamp)
        return MessageDataSource(messages)
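
For example, applying this provider to the csv file above (assuming it is saved as input.csv):

data_source = CsvDataSourceProvider("input.csv")(replay_context)
assert data_source.get_next() == pd.Timestamp("2023-01-01 01:00:00+00:00")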

DataSink

A DataSink provides a way of capturing the output of nodes and saving the data:

@dataclasses.dataclass(frozen=True)
class CsvDataSink:
    destination: str
    data: list[Message] = dataclasses.field(default_factory=list)

    def append(self, timestamp: pd.Timestamp, data: list[Message]):
        self.data.extend(data)

    def close(self):
        pd.DataFrame([dataclasses.asdict(value) for value in self.data]).to_csv(
            self.destination, index=False
        )
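
During a replay, the framework feeds the sink node's output to append and calls close at the end. Used by hand, this looks like (a small sketch):

data_sink = CsvDataSink("output.csv")
data_sink.append(
    pd.Timestamp("2023-01-01T00:01:00Z"),
    [Message(pd.Timestamp("2023-01-01T00:00:00Z"), "hello")],
)
data_sink.close()  # writes the accumulated messages to output.csv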

DataSinkProvider

A DataSinkProvider provides a way of creating a DataSink.

In this example we save the data to a csv file:

@dataclasses.dataclass(frozen=True)
class CsvDataSinkProvider:
    destination: str

    def __call__(self, replay_context: ReplayContext) -> CsvDataSink:
        return CsvDataSink(self.destination)
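
As with the DataSourceProvider, the framework calls the provider with the ReplayContext to get a fresh DataSink for the run:

data_sink = CsvDataSinkProvider("output.csv")(replay_context)
assert data_sink.destination == "output.csv"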

ReplayDriver

The replay driver is responsible for putting the dag, context, sources and sinks together, and for orchestrating the replay.

from beavers.replay import ReplayDriver

replay_driver = ReplayDriver.create(
    dag=dag,
    replay_context=replay_context,
    data_source_providers={"my_source": CsvDataSourceProvider("input.csv")},
    data_sink_providers={"my_sink": CsvDataSinkProvider("output.csv")},
)
replay_driver.run()
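
Conceptually, run ties everything back to the manual replay above: on each cycle the driver reads every source up to the cycle timestamp, pushes the data into the dag, executes it, hands the sink node's output to the DataSink, and closes the sinks at the end. A simplified sketch of that loop, not the actual implementation (here data_source and data_sink stand for the objects created by the providers):

timestamp = replay_context.start
while timestamp <= replay_context.end:
    my_source.set_stream(data_source.read_to(timestamp))
    dag.execute(timestamp)
    data_sink.append(timestamp, my_sink.get_sink_value())
    timestamp += replay_context.frequency
data_sink.close()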

Reading Files Partitioned By Time

Assuming:

  • you want to replay a dag for a long period of time.
  • all that historic data doesn't fit in memory.
  • the data is partitioned by time period, for example one file per day: input_2023-01-01.csv.

It's then possible, with the IteratorDataSourceAdapter, to load each file one by one as it is needed.

In this example, csv files are stored under date-based names such as input_2023-01-01.csv. We need to provide:

  • a generator that will yield a DataSource for each file, in order
  • a way to concatenate the output of two DataSources. In this case we'll use + to merge two lists
  • an empty value for when there is no more data, or we have reached the last file.

import operator

from beavers.replay import IteratorDataSourceAdapter


@dataclasses.dataclass(frozen=True)
class PartitionedCsvDataSourceProvider:
    source_format: str

    def __call__(self, replay_context: ReplayContext):
        file_names = [
            self.source_format.format(date=date)
            for date in pd.date_range(replay_context.start, replay_context.end)
        ]
        generator = (self._load_one_file(file_name) for file_name in file_names)
        return IteratorDataSourceAdapter(
            sources=generator,
            empty=[],
            concatenator=operator.add,
        )

    def _load_one_file(self, file_name: str) -> MessageDataSource:
        return MessageDataSource(
            [
                Message(*row)
                for _, row in pd.read_csv(
                    file_name, parse_dates=["timestamp"]
                ).iterrows()
            ]
        )


source_provider = PartitionedCsvDataSourceProvider("input_{date:%Y-%m-%d}.csv")
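
The partitioned provider then plugs into the ReplayDriver exactly like the single-file one:

partitioned_driver = ReplayDriver.create(
    dag=dag,
    replay_context=replay_context,
    data_source_providers={"my_source": source_provider},
    data_sink_providers={"my_sink": CsvDataSinkProvider("output.csv")},
)
partitioned_driver.run()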