Perspective Integration
This section explains how to build a live web dashboard with Perspective and Beavers.
In Beavers, you can connect any node of type pyarrow.Table to a Perspective table.
All you need to do is call dag.psp.to_perspective and provide a PerspectiveTableDefinition.
Key Value Example
We'll write a super simple key-value store application. It listens to a Kafka topic and displays the latest value for each key, along with its timestamp.
Install
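```shell
pip install "beavers[pyarrow,perspective-python]" confluent-kafka
```
The quotes stop the shell from splitting or globbing the extras list. The confluent-kafka package provides the Kafka client used in the example.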
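Before going further, it can help to confirm that everything is importable. This is a minimal standard-library sketch; the helper missing_modules and the REQUIRED_MODULES tuple are hypothetical, and the import names assume perspective-python installs as perspective and confluent-kafka as confluent_kafka:

```python
import importlib.util
from typing import Iterable, List

# Import names (not PyPI names) of the packages this page relies on
REQUIRED_MODULES = ("beavers", "pyarrow", "perspective", "confluent_kafka")


def missing_modules(modules: Iterable[str] = REQUIRED_MODULES) -> List[str]:
    # find_spec returns None for a top-level module that cannot be found,
    # without actually importing it
    return [name for name in modules if importlib.util.find_spec(name) is None]
```

Once the install above has succeeded, missing_modules() should return an empty list.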
Defining the schema of incoming messages
First we define a schema for the incoming "key value" messages:
- a timestamp, in milliseconds (UTC)
- the topic, partition and offset the message was read from
- a key (string)
- a value (string)
```python
import pyarrow as pa

KEY_VALUE_SCHEMA = pa.schema(
    [
        pa.field("timestamp", pa.timestamp("ms", "UTC")),
        pa.field("topic", pa.string()),
        pa.field("partition", pa.int32()),
        pa.field("offset", pa.int64()),
        pa.field("key", pa.string()),
        pa.field("value", pa.string()),
    ]
)
```
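The timestamp column uses pa.timestamp("ms", "UTC"), which stores each value as an integer count of milliseconds since the Unix epoch. confluent_kafka's Message.timestamp() also reports milliseconds, which is why the raw integer can go straight into this column. A small standard-library sketch of that convention (EPOCH, ms_to_utc and utc_to_ms are hypothetical helpers, not part of Beavers):

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware UTC datetime
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_utc(ms: int) -> datetime:
    # Integer milliseconds since the epoch -> aware UTC datetime (exact, no float rounding)
    return EPOCH + timedelta(milliseconds=ms)


def utc_to_ms(dt: datetime) -> int:
    # Aware datetime -> integer milliseconds since the epoch (floor division of timedeltas)
    return (dt - EPOCH) // timedelta(milliseconds=1)
```

For instance, ms_to_utc(1700000000000) is 2023-11-14 22:13:20 UTC.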
Convert Kafka messages to an Arrow table
Then we write a function that converts Kafka messages to an Apache Arrow table of "key value" messages:
```python
from typing import Sequence

import confluent_kafka
import pyarrow as pa


def kafka_messages_to_pyarrow(
    messages: Sequence[confluent_kafka.Message],
) -> pa.Table:
    return pa.table(
        [
            # Message.timestamp() returns (timestamp_type, milliseconds since epoch)
            [m.timestamp()[1] for m in messages],
            [m.topic() for m in messages],
            [m.partition() for m in messages],
            [m.offset() for m in messages],
            # Keys and values are raw bytes (or None); decode them as UTF-8 strings
            [None if m.key() is None else m.key().decode("utf-8") for m in messages],
            [
                None if m.value() is None else m.value().decode("utf-8")
                for m in messages
            ],
        ],
        schema=KEY_VALUE_SCHEMA,
    )
```
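The conversion turns a batch of messages, which arrive row by row, into one list per column. The sketch below mirrors that logic in plain Python so it can be exercised without a broker or pyarrow. FakeMessage is a hypothetical stand-in that only imitates the accessor methods used above, and messages_to_columns (also hypothetical) returns a dict of lists rather than a pa.Table:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple


@dataclass
class FakeMessage:
    """Hypothetical stand-in exposing the confluent_kafka.Message accessors used above."""

    ts_ms: int
    topic_name: str
    partition_id: int
    offset_value: int
    raw_key: Optional[bytes]
    raw_value: Optional[bytes]

    def timestamp(self) -> Tuple[int, int]:
        # Real messages return (timestamp_type, milliseconds); 1 is only a placeholder type
        return (1, self.ts_ms)

    def topic(self) -> str:
        return self.topic_name

    def partition(self) -> int:
        return self.partition_id

    def offset(self) -> int:
        return self.offset_value

    def key(self) -> Optional[bytes]:
        return self.raw_key

    def value(self) -> Optional[bytes]:
        return self.raw_value


def decode_or_none(raw: Optional[bytes]) -> Optional[str]:
    # Messages may have no key, and tombstones have no value: keep those as None
    return None if raw is None else raw.decode("utf-8")


def messages_to_columns(messages: Sequence[FakeMessage]) -> Dict[str, List]:
    # Same row-to-column transposition as kafka_messages_to_pyarrow, minus pyarrow
    return {
        "timestamp": [m.timestamp()[1] for m in messages],
        "topic": [m.topic() for m in messages],
        "partition": [m.partition() for m in messages],
        "offset": [m.offset() for m in messages],
        "key": [decode_or_none(m.key()) for m in messages],
        "value": [decode_or_none(m.value()) for m in messages],
    }
```

With pyarrow available, pa.table(messages_to_columns(batch), schema=KEY_VALUE_SCHEMA) should build an equivalent table, since pa.table also accepts a dict mapping column names to lists.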
Create a dag
We create a super simple dag.
It has a single source, called key_value, which is a table of "key value" messages.
The source is plugged into a Perspective table, called... key_value, whose index is the key column, so a new message for an existing key replaces that key's row.
```python
from beavers import Dag
from beavers.perspective_wrapper import PerspectiveTableDefinition


def create_test_dag() -> Dag:
    dag = Dag()
    stream = dag.pa.source_table(
        name="key_value",
        schema=KEY_VALUE_SCHEMA,
    )
    dag.psp.to_perspective(
        stream,
        PerspectiveTableDefinition(
            name="key_value",
            index_column="key",
        ),
    )
    return dag
```
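Setting index_column="key" is what turns the table into a key-value view: in Perspective, an update whose index value already exists replaces that row instead of appending a new one. The following is only a conceptual model of that behaviour in plain Python, not how Perspective implements it; apply_updates is a hypothetical helper:

```python
from typing import Dict, Iterable, Mapping


def apply_updates(
    table: Dict[str, Mapping],
    rows: Iterable[Mapping],
    index_column: str = "key",
) -> Dict[str, Mapping]:
    """Upsert rows into a dict keyed by the index column (conceptual model only)."""
    for row in rows:
        # A repeated index value overwrites the earlier row; a new one adds a row
        table[row[index_column]] = row
    return table


# Two batches: the second one updates "alpha" in place
view: Dict[str, Mapping] = {}
apply_updates(view, [{"key": "alpha", "value": "1"}, {"key": "beta", "value": "2"}])
apply_updates(view, [{"key": "alpha", "value": "3"}])
```

After both batches the model holds two rows, with "alpha" showing its latest value, "3".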
Run the dashboard
Lastly, we put everything together in an application:
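```python
from beavers.kafka import KafkaDriver, SourceTopic
from beavers.perspective_wrapper import run_web_application


def run_dashboard(
    topic: str = "key-value",
    port: int = 8082,
    consumer_config: dict | None = None,
):
    if consumer_config is None:
        # Default to a local broker; "group.id" names the consumer group
        consumer_config = {"bootstrap.servers": "localhost:9092", "group.id": "beavers"}
    dag = create_test_dag()
    kafka_driver = KafkaDriver.create(
        dag=dag,
        producer_config={},  # the dashboard only consumes, so no producer settings
        consumer_config=consumer_config,
        source_topics={
            # Feed the "key_value" source node from the topic, starting at the earliest offset
            "key_value": SourceTopic.from_earliest(topic, kafka_messages_to_pyarrow)
        },
        sink_topics={},
    )
    # Serve the Perspective tables over HTTP on the given port
    run_web_application(kafka_driver=kafka_driver, port=port)


if __name__ == "__main__":
    run_dashboard()
```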
You should then be able to see the dashboard at http://localhost:8082/key_value.
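The dashboard only shows rows once something publishes to the topic. Below is a minimal test producer, assuming a broker on localhost:9092 and the default "key-value" topic from run_dashboard (which must exist, or topic auto-creation must be enabled). sample_records and publish are hypothetical helpers, and confluent_kafka is imported inside publish so the data generator can be tried without the client installed:

```python
from typing import List, Tuple


def sample_records(n: int) -> List[Tuple[str, str]]:
    # Hypothetical demo data: cycle over three keys so later values overwrite earlier ones
    keys = ["alpha", "beta", "gamma"]
    return [(keys[i % len(keys)], f"value-{i}") for i in range(n)]


def publish(
    records: List[Tuple[str, str]],
    topic: str = "key-value",
    bootstrap_servers: str = "localhost:9092",
) -> None:
    from confluent_kafka import Producer  # lazy import: only needed when publishing

    producer = Producer({"bootstrap.servers": bootstrap_servers})
    for key, value in records:
        # Encode as UTF-8 so kafka_messages_to_pyarrow can decode both fields back to strings
        producer.produce(topic, key=key.encode("utf-8"), value=value.encode("utf-8"))
    # Wait for all queued messages to be delivered (or to fail)
    producer.flush()
```

Calling publish(sample_records(10)) while run_dashboard is running should, given the key index, leave three rows in the key_value view: alpha with value-9, beta with value-7 and gamma with value-8.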