Skip to content

Pyarrow integration

This section explains how to use beavers with pyarrow.

ETF value calculation example

In this example we want to calculate the value of ETFs. If you are not familiar with ETFs, think about them as just a basket of shares.

Starting with a table of individual share prices:

import pyarrow as pa

# Schema of a share price table: one row per ticker.
PRICE_SCHEMA = pa.schema(
    [
        ("ticker", pa.string()),
        ("price", pa.float64()),
    ]
)

# Example share prices, given column by column.
price_table = pa.Table.from_pydict(
    {
        "ticker": ["AAPL", "GOOGL", "MSFT", "F", "GM"],
        "price": [174.79, 130.25, 317.01, 12.43, 35.28],
    },
    schema=PRICE_SCHEMA,
)

ticker price
AAPL 174.79
GOOGL 130.25
MSFT 317.01
F 12.43
GM 35.28

And another table containing the composition of each ETF:

# Schema of an ETF composition table: one row per (etf, ticker) holding.
ETF_COMPOSITION_SCHEMA = pa.schema(
    [
        pa.field("etf", pa.string()),
        pa.field("ticker", pa.string()),
        pa.field("quantity", pa.float64()),
    ]
)


# Example compositions: the quantity of each share held by each ETF.
etf_composition_table = pa.Table.from_pylist(
    [
        {"etf": "TECH", "ticker": "AAPL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "GOOGL", "quantity": 2.0},
        {"etf": "TECH", "ticker": "MSFT", "quantity": 1.0},
        {"etf": "CARS", "ticker": "F", "quantity": 3.0},
        {"etf": "CARS", "ticker": "GM", "quantity": 1.0},
    ],
    schema=ETF_COMPOSITION_SCHEMA,
)

etf ticker quantity
TECH AAPL 2.0
TECH GOOGL 2.0
TECH MSFT 1.0
CARS F 3.0
CARS GM 1.0

In a few lines of pyarrow we can derive the value of each ETF:

import pyarrow.compute as pc

# Schema of the calculation result: one row per ETF with its total value.
ETF_VALUE_SCHEMA = pa.schema(
    [
        ("etf", pa.string()),
        ("value", pa.float64()),
    ]
)


def calculate_etf_value(etf_composition: pa.Table, price: pa.Table) -> pa.Table:
    """Compute the total value of each ETF from its composition and share prices.

    Args:
        etf_composition: Table with ``etf``, ``ticker`` and ``quantity`` columns.
        price: Table with ``ticker`` and ``price`` columns.

    Returns:
        A table with one row per ETF and the column names of
        ``ETF_VALUE_SCHEMA`` (``etf``, ``value``), where ``value`` is the sum
        of ``price * quantity`` over the ETF's components.
    """
    # NOTE(review): this relies on pyarrow's default join type; confirm how
    # components without a matching price should be treated.
    positions_with_prices = etf_composition.join(price, keys=["ticker"])
    values = pc.multiply(
        positions_with_prices["price"], positions_with_prices["quantity"]
    )
    positions_with_prices = positions_with_prices.append_column("value", values)
    return (
        positions_with_prices.group_by("etf")
        .aggregate([("value", "sum")])
        # Pick the columns by name before renaming: the relative order of key
        # and aggregate columns in the output is not the same in every
        # pyarrow release, and rename_columns is purely positional.
        .select(["etf", "value_sum"])
        .rename_columns(ETF_VALUE_SCHEMA.names)
    )


# ETF values computed directly from the two example tables.
etf_value_table = calculate_etf_value(etf_composition_table, price_table)

etf value
TECH 927.09
CARS 72.57

ETF value calculation DAG

Once the business logic of the calculation is written and tested, it can be added to a Dag. We'll be using the Dag pa helper, which makes it easier to deal with pyarrow tables in beavers.

First we define two source streams, made of pyarrow.Table:

from beavers import Dag

dag = Dag()
# One source node per input table; each is declared with the schema of the
# tables it will receive and a name.
price_source = dag.pa.source_table(schema=PRICE_SCHEMA, name="price")
etf_composition_source = dag.pa.source_table(
    schema=ETF_COMPOSITION_SCHEMA, name="etf_composition"
)

Then we keep track of the latest value for each source stream:

# Latest price, keyed by ticker.
price_state = dag.pa.latest_by_keys(price_source, ["ticker"])
# Latest composition, keyed by the (etf, ticker) pair.
etf_composition_state = dag.pa.latest_by_keys(
    etf_composition_source,
    ["etf", "ticker"],
)

Lastly we put together the share prices and ETF composition:

# State node computed by calculate_etf_value; the inputs are listed in the
# same order as the function's parameters (etf_composition, then price).
etf_value_state = dag.state(calculate_etf_value).map(
    etf_composition_state,
    price_state,
)

And that's it:

# Feed both sources with the example tables and run one DAG cycle.
price_source.set_stream(price_table)
etf_composition_source.set_stream(etf_composition_table)
dag.execute()
# The DAG output matches the table computed directly with calculate_etf_value.
assert etf_value_state.get_value() == etf_value_table

Taming updates

This simple dag does the job of calculating the ETF value in real time. But there is one issue: the value of every ETF updates every time either price or etf_composition updates, even if the update is for a ticker that is not relevant to the ETFs we are tracking.

In the example below, when the price of GameStop updates, we recalculate the value of every ETF, even though their values haven't changed:

# GME is not a component of either ETF in the example composition.
new_price_updates = pa.Table.from_pylist(
    [{"ticker": "GME", "price": 123.0}],
    PRICE_SCHEMA,
)
price_source.set_stream(new_price_updates)
dag.execute()
# Both ETF values are still present...
assert len(etf_value_state.get_value()) == 2
# ...and the node's cycle id matches the DAG's current cycle id.
assert etf_value_state.get_cycle_id() == dag.get_cycle_id()

To tame updates we need to identify which ETFs need updating.

ETF values can update because their composition has changed:

# The "etf" column of the incoming composition updates: the ETFs whose
# composition has just changed.
updated_because_of_composition = dag.pa.get_column(
    etf_composition_source,
    "etf",
)

Or because one of their components has updated:

def get_etf_to_update_because_of_price(
    etf_composition_state: pa.Table, price_update: pa.Table
) -> pa.Array:
    """Return the distinct ETFs holding at least one ticker present in ``price_update``.

    Args:
        etf_composition_state: Table with ``etf`` and ``ticker`` columns.
        price_update: Table with a ``ticker`` column.

    Returns:
        The unique values of ``etf`` over the composition rows whose
        ``ticker`` appears in ``price_update``.
    """
    repriced_tickers = pc.unique(price_update["ticker"])
    holds_repriced_ticker = pc.is_in(
        etf_composition_state["ticker"], repriced_tickers
    )
    impacted_rows = etf_composition_state.filter(holds_repriced_ticker)
    return pc.unique(impacted_rows["etf"])


# Stream node applying get_etf_to_update_because_of_price to the composition
# state and the incoming price updates. The second argument to dag.stream is
# an empty string array, presumably the node's value when there is nothing to
# report (TODO confirm against the beavers documentation).
updated_because_of_price = dag.stream(
    get_etf_to_update_because_of_price, pa.array([], pa.string())
).map(etf_composition_state, price_source)

We can then put it back together and only calculate updates for relevant ETFs:

# ETFs to recalculate: those impacted by a price update together with those
# whose composition changed.
stale_etfs = dag.pa.concat_arrays(
    updated_because_of_price, updated_because_of_composition
)


def get_composition_for_etfs(
    etf_composition_state: pa.Table, etfs: pa.Array
) -> pa.Table:
    """Return the composition rows that belong to the given ETFs.

    Args:
        etf_composition_state: Table with an ``etf`` column.
        etfs: Array of ETF names to keep.

    Returns:
        The rows of ``etf_composition_state`` whose ``etf`` is in ``etfs``.
    """
    belongs_to_requested_etf = pc.is_in(etf_composition_state["etf"], etfs)
    return etf_composition_state.filter(belongs_to_requested_etf)


# Composition rows restricted to the ETFs that need recalculating.
stale_etf_compositions = dag.pa.table_stream(
    get_composition_for_etfs, ETF_COMPOSITION_SCHEMA
).map(etf_composition_state, stale_etfs)

# Values of the stale ETFs only, computed with the same calculate_etf_value
# function against the full price state.
updated_etf = dag.pa.table_stream(calculate_etf_value, ETF_VALUE_SCHEMA).map(
    stale_etf_compositions, price_state
)

We can see that only the value of the "TECH" ETF updates when a tech stock updates:

# Send a price update for MSFT only; MSFT is a component of TECH, not CARS.
price_source.set_stream(
    pa.Table.from_pylist(
        [{"ticker": "MSFT", "price": 317.05}],
        PRICE_SCHEMA,
    )
)
dag.execute()
# Exactly one ETF value was produced on this cycle.
assert len(updated_etf.get_value()) == 1

etf value
TECH 927.13