Pandas integration
This section explains how to use beavers with pandas.
ETF value calculation example
In this example, we want to calculate the value of ETFs. If you are not familiar with ETFs, think of them as a basket of shares.
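Put as a formula, the value of an ETF $e$ is the quantity-weighted sum of the prices of the tickers $t$ it holds. Here $q_{e,t}$ is the quantity of $t$ held in $e$ and $p_t$ is the price of $t$:

```latex
V_e = \sum_{t \in e} q_{e,t} \, p_t
```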
Starting with a table of individual share prices:
```python
import pandas as pd

price_table = pd.DataFrame.from_records(
    [
        {"ticker": "AAPL", "price": 174.79},
        {"ticker": "GOOGL", "price": 130.25},
        {"ticker": "MSFT", "price": 317.01},
        {"ticker": "F", "price": 12.43},
        {"ticker": "GM", "price": 35.28},
    ],
)
price_dtypes = price_table.dtypes
```
ticker | price |
---|---|
AAPL | 174.79 |
GOOGL | 130.25 |
MSFT | 317.01 |
F | 12.43 |
GM | 35.28 |
And another table containing the composition of each ETF:
```python
etf_composition_table = pd.DataFrame.from_records(
    [
        {"etf": "TECH", "ticker": "AAPL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "GOOGL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "MSFT", "quantity": 1.0},
        {"etf": "CARS", "ticker": "F", "quantity": 3.0},
        {"etf": "CARS", "ticker": "GM", "quantity": 1.0},
    ],
)
etf_composition_dtypes = etf_composition_table.dtypes
```
etf | ticker | quantity |
---|---|---|
TECH | AAPL | 2.0 |
TECH | GOOGL | 2.0 |
TECH | MSFT | 1.0 |
CARS | F | 3.0 |
CARS | GM | 1.0 |
In a few lines of pandas, we can derive the value of each ETF:
```python
def calculate_etf_value(
    etf_composition: pd.DataFrame, price: pd.DataFrame
) -> pd.DataFrame:
    return (
        # Attach the price to each holding (NaN if the ticker has no price)
        etf_composition.merge(price, on="ticker", how="left")
        # Value of each holding: price times quantity
        .assign(value=lambda x: x["price"] * x["quantity"])
        # Sum the holding values per ETF: one "value" column indexed by "etf"
        .groupby("etf")[["value"]]
        .sum()
    )


etf_value_table = calculate_etf_value(
    etf_composition=etf_composition_table, price=price_table
)
```
etf | value |
---|---|
CARS | 72.57 |
TECH | 927.09 |
ETF value calculation DAG
Once the business logic of the calculation is written and tested, it can be added to a Dag.
We'll use the `dag.pd` helper, which makes it easier to work with pandas tables in beavers.
First, we define two source streams of `pandas.DataFrame`, each declared with the dtypes of the table it will receive:
```python
from beavers import Dag

dag = Dag()
price_source = dag.pd.source_df(dtypes=price_dtypes, name="price")
etf_composition_source = dag.pd.source_df(
    dtypes=etf_composition_dtypes, name="etf_composition"
)
```
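`source_df` is given the table's dtypes rather than a sample table. A plausible reason, which is an assumption here and not something this page states, is that a typed source can start from an empty frame with the right columns and dtypes before any data arrives. The helper `empty_frame` below is hypothetical (not a beavers API) and only shows how such a frame can be built from a dtypes Series in plain pandas:

```python
import pandas as pd


def empty_frame(dtypes: pd.Series) -> pd.DataFrame:
    # Hypothetical helper: zero rows, same columns and dtypes as `dtypes`
    return pd.DataFrame(columns=dtypes.index).astype(dtypes)


price_dtypes = pd.DataFrame.from_records(
    [{"ticker": "AAPL", "price": 174.79}]
).dtypes
empty_prices = empty_frame(price_dtypes)

print(len(empty_prices))  # 0
print(empty_prices.dtypes)
```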
Then we keep track of the latest row for each key of each source stream (ticker for prices; ETF and ticker for compositions):
```python
price_state = dag.pd.latest_by_keys(price_source, ["ticker"])
etf_composition_state = dag.pd.latest_by_keys(
    etf_composition_source,
    ["etf", "ticker"],
)
```
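To make the "latest by keys" idea concrete, here is a pandas-only sketch of the semantics, assuming each update replaces any earlier row with the same key values. The function `latest_by_keys` below is a hypothetical stand-in written for illustration; it is not beavers' implementation, which may differ in details such as row order:

```python
import pandas as pd


def latest_by_keys(current: pd.DataFrame, update: pd.DataFrame, keys: list[str]) -> pd.DataFrame:
    # Hypothetical sketch: append the update, then keep only the last row per key
    combined = pd.concat([current, update], ignore_index=True)
    return combined.drop_duplicates(subset=keys, keep="last").reset_index(drop=True)


prices = pd.DataFrame.from_records(
    [
        {"ticker": "AAPL", "price": 174.79},
        {"ticker": "GOOGL", "price": 130.25},
    ]
)
update = pd.DataFrame.from_records([{"ticker": "AAPL", "price": 175.00}])

state = latest_by_keys(prices, update, ["ticker"])
print(state)  # GOOGL keeps 130.25, AAPL now 175.0
```

In this sketch an updated key moves to the end of the frame. That does not matter for `calculate_etf_value`, which groups by ETF before producing its result.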
Finally, we combine the share prices and ETF compositions by applying `calculate_etf_value` to the two states:
```python
etf_value_state = dag.state(calculate_etf_value).map(
    etf_composition_state,
    price_state,
)
```
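The state node presumably recomputes `calculate_etf_value` from the latest inputs whenever either state changes. The arithmetic of such an update can be sanity-checked in plain pandas, without beavers. The sketch below repeats the same merge, multiply, group-and-sum steps, bumps the AAPL price by 1.00, and confirms that TECH rises by 2.00, since the basket holds two AAPL shares:

```python
import pandas as pd


def calculate_etf_value(etf_composition: pd.DataFrame, price: pd.DataFrame) -> pd.DataFrame:
    # Same steps as above: left-merge prices, value each holding, sum per ETF
    return (
        etf_composition.merge(price, on="ticker", how="left")
        .assign(value=lambda x: x["price"] * x["quantity"])
        .groupby("etf")[["value"]]
        .sum()
    )


composition = pd.DataFrame.from_records(
    [
        {"etf": "TECH", "ticker": "AAPL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "GOOGL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "MSFT", "quantity": 1.0},
    ]
)
prices = pd.DataFrame.from_records(
    [
        {"ticker": "AAPL", "price": 174.79},
        {"ticker": "GOOGL", "price": 130.25},
        {"ticker": "MSFT", "price": 317.01},
    ]
)

before = calculate_etf_value(composition, prices)

# Simulate a +1.00 price update for AAPL; other tickers keep their prices
updated_prices = prices.assign(
    price=prices["price"].where(prices["ticker"] != "AAPL", prices["price"] + 1.0)
)
after = calculate_etf_value(composition, updated_prices)

change = after.loc["TECH", "value"] - before.loc["TECH", "value"]
print(round(change, 2))  # 2.0
```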
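And that's it. We can now feed both tables into the sources, run the Dag, and check that it produces the same result as calling `calculate_etf_value` directly: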
```python
price_source.set_stream(price_table)
etf_composition_source.set_stream(etf_composition_table)
dag.execute()
pd.testing.assert_frame_equal(etf_value_state.get_value(), etf_value_table)
```