
Perspective Integration

This section explains how to build a live web dashboard with Perspective and Beavers.

In Beavers, you can connect any node that produces a pyarrow.Table to a Perspective table. All you need to do is call dag.psp.to_perspective and pass it a PerspectiveTableDefinition.

Key Value Example

We'll write a super simple key-value store application. It listens to a Kafka topic and displays the latest value for each message key, along with its timestamp.

Install

pip install "beavers[pyarrow,perspective-python]"
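To check that the dependencies are importable after installing, a quick stdlib-only sketch can look each module up with importlib.util.find_spec. The module names are assumptions about what this example needs: beavers, pyarrow, perspective (provided by perspective-python) and confluent_kafka (used by the Kafka code below, which may need to be installed separately).

```python
import importlib.util

# Assumed import names for the modules this example relies on.
REQUIRED_MODULES = ("beavers", "pyarrow", "perspective", "confluent_kafka")


def check_dependencies(modules=REQUIRED_MODULES) -> dict:
    """Map each top-level module name to True if it can be found, without importing it."""
    return {name: importlib.util.find_spec(name) is not None for name in modules}


if __name__ == "__main__":
    for name, available in check_dependencies().items():
        print(f"{name}: {'ok' if available else 'missing'}")
```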

Define the schema of incoming messages

First we define a schema for the incoming "key value" messages:

  • a timestamp, in milliseconds (UTC)
  • the Kafka topic, partition and offset the message was read from
  • a key (string)
  • a value (string)

import pyarrow as pa


KEY_VALUE_SCHEMA = pa.schema(
    [
        pa.field("timestamp", pa.timestamp("ms", "UTC")),
        pa.field("topic", pa.string()),
        pa.field("partition", pa.int32()),
        pa.field("offset", pa.int64()),
        pa.field("key", pa.string()),
        pa.field("value", pa.string()),
    ]
)
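The timestamp column is declared as pa.timestamp("ms", "UTC"), i.e. an integer count of milliseconds since the Unix epoch, interpreted in UTC. A stdlib-only sketch of that interpretation follows; millis_to_utc is a hypothetical helper, not part of Beavers or pyarrow.

```python
from datetime import datetime, timezone


def millis_to_utc(millis: int) -> datetime:
    """Interpret an integer as milliseconds since the Unix epoch, in UTC."""
    # Split into whole seconds and leftover milliseconds to avoid float rounding.
    seconds, ms = divmod(millis, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=ms * 1000)


print(millis_to_utc(1500))
```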

Convert Kafka messages to an Arrow table

Then we write a function that converts a batch of Kafka messages into an Apache Arrow table of "key value" messages. Keys and values are decoded from UTF-8, and null keys or values are kept as nulls:

from typing import Sequence

import confluent_kafka
import pyarrow as pa


def kafka_messages_to_pyarrow(
    messages: Sequence[confluent_kafka.Message],
) -> pa.Table:
    return pa.table(
        [
            # Message.timestamp() returns a (timestamp_type, timestamp) tuple
            [m.timestamp()[1] for m in messages],
            [m.topic() for m in messages],
            [m.partition() for m in messages],
            [m.offset() for m in messages],
            # Keys and values are raw bytes (or None): decode them as UTF-8
            [None if m.key() is None else m.key().decode("utf-8") for m in messages],
            [
                None if m.value() is None else m.value().decode("utf-8")
                for m in messages
            ],
        ],
        schema=KEY_VALUE_SCHEMA,
    )
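For a quick check without a Kafka broker, one option is a small stand-in object that exposes the same accessor methods as confluent_kafka.Message. The FakeMessage class and the extract_columns helper below are hypothetical and stdlib-only; extract_columns mirrors the list comprehensions above but returns plain lists instead of building a pyarrow table.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple


@dataclass
class FakeMessage:
    """Hypothetical stand-in exposing the accessors used by kafka_messages_to_pyarrow."""

    timestamp_ms: int
    topic_name: str
    partition_id: int
    offset_value: int
    raw_key: Optional[bytes]
    raw_value: Optional[bytes]

    def timestamp(self) -> Tuple[int, int]:
        # Mimics the (timestamp_type, timestamp) tuple; the type code is not
        # used by the conversion, so 1 is just a placeholder here.
        return (1, self.timestamp_ms)

    def topic(self) -> str:
        return self.topic_name

    def partition(self) -> int:
        return self.partition_id

    def offset(self) -> int:
        return self.offset_value

    def key(self) -> Optional[bytes]:
        return self.raw_key

    def value(self) -> Optional[bytes]:
        return self.raw_value


def _decode(raw: Optional[bytes]) -> Optional[str]:
    # Null keys/values stay null; everything else is decoded as UTF-8.
    return None if raw is None else raw.decode("utf-8")


def extract_columns(messages: Sequence[FakeMessage]) -> Dict[str, List]:
    """Hypothetical helper: the same column extraction, as plain Python lists."""
    return {
        "timestamp": [m.timestamp()[1] for m in messages],
        "topic": [m.topic() for m in messages],
        "partition": [m.partition() for m in messages],
        "offset": [m.offset() for m in messages],
        "key": [_decode(m.key()) for m in messages],
        "value": [_decode(m.value()) for m in messages],
    }


messages = [
    FakeMessage(1_000, "key-value", 0, 0, b"a", b"1"),
    FakeMessage(2_000, "key-value", 0, 1, b"a", None),  # null value, e.g. a tombstone
]
columns = extract_columns(messages)
```

If pyarrow is installed, the same FakeMessage objects can also be passed straight to kafka_messages_to_pyarrow, since that function only calls these accessor methods.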

Create a dag

We create a super simple dag. It has one source, called key_value, which emits tables of "key value" messages. The source is plugged into a Perspective table, also called key_value, whose index is the key column: a new message for an existing key replaces that key's row.

from beavers import Dag
from beavers.perspective_wrapper import PerspectiveTableDefinition


def create_test_dag() -> Dag:
    dag = Dag()
    stream = dag.pa.source_table(
        name="key_value",
        schema=KEY_VALUE_SCHEMA,
    )
    dag.psp.to_perspective(
        stream,
        PerspectiveTableDefinition(
            name="key_value",
            index_column="key",
        ),
    )
    return dag
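Because the Perspective table is indexed on key, an update whose key already exists replaces that row instead of appending a new one (without an index, updates are appended as new rows), so the dashboard always shows the latest value per key. A plain-Python sketch of that upsert behaviour follows, using a dict as a hypothetical stand-in for the table; it models the semantics only, not Perspective's API.

```python
from typing import Dict, Iterable, Mapping, Optional

Row = Mapping[str, Optional[str]]


def apply_updates(
    table: Dict[Optional[str], Row],
    rows: Iterable[Row],
    index_column: str = "key",
) -> Dict[Optional[str], Row]:
    """Upsert rows into a dict keyed on index_column; later rows replace earlier ones."""
    for row in rows:
        table[row[index_column]] = row
    return table


view: Dict[Optional[str], Row] = {}
apply_updates(view, [{"key": "a", "value": "1"}, {"key": "b", "value": "2"}])
apply_updates(view, [{"key": "a", "value": "3"}])
# view now holds two rows, and the row for "a" has value "3"
```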

Run the dashboard

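Lastly, we put everything together in an application. A KafkaDriver consumes the topic from the earliest offset, converts each batch of messages with kafka_messages_to_pyarrow and feeds the result into the key_value source, and run_web_application serves the dashboard on the given port: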

from beavers.kafka import KafkaDriver, SourceTopic
from beavers.perspective_wrapper import run_web_application


def run_dashboard(
    topic: str = "key-value",
    port: int = 8082,
    consumer_config: dict | None = None,
):
    if consumer_config is None:
        consumer_config = {"bootstrap.servers": "localhost:9092", "group.id": "beavers"}

    dag = create_test_dag()

    kafka_driver = KafkaDriver.create(
        dag=dag,
        producer_config={},
        consumer_config=consumer_config,
        source_topics={
            "key_value": SourceTopic.from_earliest(topic, kafka_messages_to_pyarrow)
        },
        sink_topics={},
    )

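    run_web_application(kafka_driver=kafka_driver, port=port)


if __name__ == "__main__":
    # Start the dashboard with the default topic, port and consumer config
    run_dashboard()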

Once it is running, the dashboard should be available at http://localhost:8082/key_value.