
Live with Kafka

This section explains how to run a beavers application in real time using Kafka.

Count Word Example

Start with a simple "count word" dag that has one source going to one sink:

from beavers import Dag


class CountWords:
    state = {}

    def __call__(self, new_words: list[str]) -> dict[str, int]:
        for word in new_words:
            self.state[word] = self.state.get(word, 0) + 1
        return self.state


def update_stream(
    state: dict[str, int], updated_words: list[str]
) -> list[tuple[str, int]]:
    return [(word, state[word]) for word in set(updated_words)]


dag = Dag()
word_source = dag.source_stream(name="words")
count_state = dag.state(CountWords()).map(word_source)
count_stream = dag.stream(update_stream, []).map(count_state, word_source)
dag.sink("counts", count_stream)

This dag has a source node called words and a sink node called counts.
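
Before going live with Kafka, the dag can be exercised in memory. This is a minimal sketch assuming the set_stream, execute and get_value methods from the beavers core API (check the library documentation for the exact names):

word_source.set_stream(["hello", "world", "hello"])
dag.execute()
print(count_stream.get_value())  # e.g. [("hello", 2), ("world", 1)], order may vary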

Defining Kafka Source

We will be receiving data from Kafka, on a topic called words.

First, we need to define how to deserialize messages coming from Kafka:

import confluent_kafka


def deserialize_messages(messages: list[confluent_kafka.Message]) -> list[str]:
    return [message.value() for message in messages]
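
Note that confluent_kafka.Message.value() returns the raw payload as bytes (or None for tombstones). If the producer writes UTF-8 text, a decoding variant (a sketch, not part of the original example) could look like this:

def deserialize_messages_utf8(
    messages: list[confluent_kafka.Message],
) -> list[str]:
    # Decode each payload to str, skipping tombstone messages with no value
    return [
        message.value().decode("utf-8")
        for message in messages
        if message.value() is not None
    ]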

Then, we put together the SourceTopic with its:

  • topic (words)
  • deserializer (deserialize_messages)
  • replay policy (from_start_of_day)

import pandas as pd
from beavers.kafka import SourceTopic, KafkaDriver

source_topic = SourceTopic.from_start_of_day(
    "words", deserialize_messages, pd.to_timedelta("15min"), "UTC"
)

There are multiple Kafka replay policies available (for example from_latest, which only consumes new messages); see the API doc for the full list.
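
For example, a from_latest source topic could look like the sketch below. The exact signature is an assumption here; refer to the API doc:

# Assumed signature: only consume messages arriving after the application starts
latest_source_topic = SourceTopic.from_latest("words", deserialize_messages)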

Defining Kafka Sink

We will be sending the results to the counts topic. The key will be the word. The value will be the latest count.

First, we need to define a serializer, which converts each count to a KafkaProducerMessage:

from beavers.kafka import KafkaProducerMessage


def serialize_counts(values: list[tuple[str, int]]) -> list[KafkaProducerMessage]:
    return [
        KafkaProducerMessage(
            topic="counts",
            key=word,
            value=str(count),
        )
        for word, count in values
    ]

The serializer is responsible for providing the topic for each outgoing message.
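
Because each KafkaProducerMessage carries its own topic, a single serializer can route messages to more than one topic. A sketch using made-up topic names (rare-counts and common-counts):

def serialize_counts_routed(
    values: list[tuple[str, int]],
) -> list[KafkaProducerMessage]:
    # Route each count to a (hypothetical) topic depending on its size
    return [
        KafkaProducerMessage(
            topic="rare-counts" if count < 10 else "common-counts",
            key=word,
            value=str(count),
        )
        for word, count in values
    ]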

Putting it together with KafkaDriver

The KafkaDriver takes care of creating the Kafka producer and consumer, and of passing messages through:

kafka_driver = KafkaDriver.create(
    dag=dag,
    consumer_config={
        "group.id": "beavers",
        "bootstrap.servers": "localhost:9092",
    },
    producer_config={"bootstrap.servers": "localhost:9092"},
    source_topics={"words": source_topic},
    sink_topics={"counts": serialize_counts},
)
while True:
    kafka_driver.run_cycle()
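
To try the pipeline end to end, test words can be published with a plain confluent_kafka producer (a quick test script, separate from the beavers application):

import confluent_kafka

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
for word in ["hello", "world", "hello"]:
    producer.produce("words", value=word.encode("utf-8"))
producer.flush()  # block until all messages are delivered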

Beavers Kafka Features

  • One consumer: There is only one consumer (rather than one consumer for each topic)
  • One producer: There is only one producer (rather than one producer for each topic)
  • When polling messages, beavers tries to read all available messages, up to a limit of batch_size=5000 (which is configurable on the KafkaDriver, as sketched after this list)
  • When replaying past data, beavers orchestrates topics/partitions so data is replayed in order, across topics, based on each message's timestamp.
  • When replaying past data, some newer messages have to be held back. To avoid memory issues, the number of held messages is capped at batch_size*5. Once the number of held messages gets too high, partitions that are ahead of the watermark are paused. These partitions are un-paused once the application catches up.
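
If the default batch size does not fit, it can be tuned when creating the driver. Passing batch_size as a keyword to KafkaDriver.create is an assumption here; check the KafkaDriver API doc:

kafka_driver = KafkaDriver.create(
    dag=dag,
    consumer_config={"group.id": "beavers", "bootstrap.servers": "localhost:9092"},
    producer_config={"bootstrap.servers": "localhost:9092"},
    source_topics={"words": source_topic},
    sink_topics={"counts": serialize_counts},
    batch_size=1000,  # assumed keyword argument
)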

Beavers Kafka Limitations

  • One beavers application consumes every partition of the requested topics (no load balancing/scaling)