Pandas integration

This section explains how to use beavers with pandas.

ETF value calculation example

In this example, we calculate the value of ETFs. If you are not familiar with ETFs, think of each one as a basket of shares.

Starting with a table of individual share prices:

import pandas as pd

price_table = pd.DataFrame.from_records(
    [
        {"ticker": "AAPL", "price": 174.79},
        {"ticker": "GOOGL", "price": 130.25},
        {"ticker": "MSFT", "price": 317.01},
        {"ticker": "F", "price": 12.43},
        {"ticker": "GM", "price": 35.28},
    ],
)

price_dtypes = price_table.dtypes

ticker   price
AAPL    174.79
GOOGL   130.25
MSFT    317.01
F        12.43
GM       35.28
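The `price_dtypes` value captured above is reused later to describe the shape of the price stream. As a small sketch on a shortened copy of the table, this shows what `DataFrame.dtypes` holds: a pandas Series indexed by column name. The dtype reported for the string column can vary with the pandas version, so only the numeric column is inspected here.

```python
import pandas as pd

price_table = pd.DataFrame.from_records(
    [
        {"ticker": "AAPL", "price": 174.79},
        {"ticker": "F", "price": 12.43},
    ]
)

# dtypes is a pandas Series: index = column names, values = column dtypes.
price_dtypes = price_table.dtypes

column_names = list(price_dtypes.index)  # ["ticker", "price"]
price_dtype = str(price_dtypes["price"])  # "float64"
print(column_names, price_dtype)
```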

And another table containing the composition of each ETF:

etf_composition_table = pd.DataFrame.from_records(
    [
        {"etf": "TECH", "ticker": "AAPL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "GOOGL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "MSFT", "quantity": 1.0},
        {"etf": "CARS", "ticker": "F", "quantity": 3.0},
        {"etf": "CARS", "ticker": "GM", "quantity": 1.0},
    ],
)

etf_composition_dtypes = etf_composition_table.dtypes

etf   ticker  quantity
TECH  AAPL         2.0
TECH  GOOGL        2.0
TECH  MSFT         1.0
CARS  F            3.0
CARS  GM           1.0

In a few lines of pandas we can derive the value of each ETF:

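def calculate_etf_value(
    etf_composition: pd.DataFrame, price: pd.DataFrame
) -> pd.DataFrame:
    return (
        # Attach each holding's share price; tickers without a price get NaN.
        etf_composition.merge(price, on="ticker", how="left")
        # Value of each holding: price times quantity held.
        .assign(value=lambda x: x["price"] * x["quantity"])
        # One row per ETF, in order of first appearance, keeping "etf" as a column.
        .groupby("etf", as_index=False, sort=False)
        .aggregate({"value": "sum"})
    )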


etf_value_table = calculate_etf_value(
    etf_composition=etf_composition_table, price=price_table
)

etf    value
TECH  927.09
CARS   72.57
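As a sanity check, the same figures can be reproduced without pandas. The sketch below uses the prices and quantities from the two tables shown above, multiplies each holding's quantity by its price, and sums per ETF. The dictionary names are illustrative only.

```python
# Share prices and ETF holdings, copied from the tables above.
prices = {"AAPL": 174.79, "GOOGL": 130.25, "MSFT": 317.01, "F": 12.43, "GM": 35.28}
composition = {
    "TECH": {"AAPL": 2.0, "GOOGL": 2.0, "MSFT": 1.0},
    "CARS": {"F": 3.0, "GM": 1.0},
}

# ETF value = sum over holdings of (price * quantity), rounded to cents.
etf_values = {
    etf: round(sum(prices[ticker] * qty for ticker, qty in holdings.items()), 2)
    for etf, holdings in composition.items()
}
print(etf_values)  # TECH -> 927.09, CARS -> 72.57
```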

ETF value calculation DAG

Once the business logic of the calculation is written and tested, it can be added to a Dag. We'll use the Dag's pd helper (dag.pd), which makes it easier to work with pandas tables in beavers.

First we define two source streams, made of pandas.DataFrame:

from beavers import Dag

dag = Dag()
price_source = dag.pd.source_df(dtypes=price_dtypes, name="price")
etf_composition_source = dag.pd.source_df(
    dtypes=etf_composition_dtypes, name="etf_composition"
)
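Each source is declared with a `dtypes` series rather than with data. One plausible reason, stated here as an assumption and not as documented beavers behavior, is that a dtypes series is enough to build an empty but correctly typed table for cycles where no data arrives. The helper `empty_frame` below is hypothetical and only illustrates that idea in plain pandas.

```python
import pandas as pd


def empty_frame(dtypes: pd.Series) -> pd.DataFrame:
    """Hypothetical helper: build a zero-row frame with the given column dtypes."""
    return pd.DataFrame(
        {name: pd.Series(dtype=dtype) for name, dtype in dtypes.items()}
    )


# Stand-in for price_table.dtypes (assumed dtypes, for illustration).
price_dtypes = pd.Series({"ticker": "object", "price": "float64"})
empty_prices = empty_frame(price_dtypes)

print(list(empty_prices.columns), len(empty_prices))
```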

Then we keep track of the latest value for each source stream:

price_state = dag.pd.latest_by_keys(price_source, ["ticker"])
etf_composition_state = dag.pd.latest_by_keys(
    etf_composition_source,
    ["etf", "ticker"],
)
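To make "latest value by keys" concrete, here is a minimal plain-pandas sketch of the idea: append the incoming rows to the current state and keep only the last row seen for each key. The function `latest_by_keys_sketch` is hypothetical and is not how beavers implements `latest_by_keys`; it only illustrates the semantics.

```python
import pandas as pd


def latest_by_keys_sketch(
    state: pd.DataFrame, update: pd.DataFrame, keys: list[str]
) -> pd.DataFrame:
    """Hypothetical helper: keep the last row seen for each key combination."""
    return (
        pd.concat([state, update], ignore_index=True)
        .drop_duplicates(subset=keys, keep="last")
        .reset_index(drop=True)
    )


state = pd.DataFrame({"ticker": ["AAPL", "GM"], "price": [174.79, 35.28]})
update = pd.DataFrame({"ticker": ["AAPL"], "price": [175.10]})  # new AAPL price

latest = latest_by_keys_sketch(state, update, ["ticker"])
print(latest)
```

After the update, the state still holds one row per ticker: GM keeps its earlier price and AAPL carries the newer one.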

Lastly we put together the share prices and ETF composition:

etf_value_state = dag.state(calculate_etf_value).map(
    etf_composition_state,
    price_state,
)
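When prices and compositions arrive as separate streams, a composition row may refer to a ticker whose price is not yet known. The sketch below, in plain pandas with illustrative data, shows what a left merge followed by a grouped sum does in that case: the missing price becomes NaN, and the default sum skips it, so the ETF value is understated without raising an error. Flagging ETFs with a missing price is one possible guard.

```python
import pandas as pd

composition = pd.DataFrame(
    {"etf": ["CARS", "CARS"], "ticker": ["F", "GM"], "quantity": [3.0, 1.0]}
)
prices = pd.DataFrame({"ticker": ["F"], "price": [12.43]})  # GM price not received

merged = composition.merge(prices, on="ticker", how="left")
merged["value"] = merged["price"] * merged["quantity"]

# The default grouped sum skips NaN, so CARS is reported without its GM holding.
understated = merged.groupby("etf")["value"].sum()

# Flag every ETF that has at least one holding without a price.
has_gap = merged["price"].isna().groupby(merged["etf"]).any()

print(understated)
print(has_gap)
```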

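And that's it. To check the result, we feed both sources, execute the Dag and compare its output with the table computed earlier: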

price_source.set_stream(price_table)
etf_composition_source.set_stream(etf_composition_table)
dag.execute()
pd.testing.assert_frame_equal(etf_value_state.get_value(), etf_value_table)