Live with Kafka
This section explains how to run a Beavers application in real time using Kafka.
Count Word Example
We start with a simple "count word" DAG, with one source going to one sink:
```python
from beavers import Dag


class CountWords:
    """Keeps a running count of every word seen so far."""

    def __init__(self) -> None:
        # Per-instance state (a class-level dict would be shared between instances)
        self.state: dict[str, int] = {}

    def __call__(self, new_words: list[str]) -> dict[str, int]:
        for word in new_words:
            self.state[word] = self.state.get(word, 0) + 1
        return self.state


def update_stream(
    state: dict[str, int], updated_words: list[str]
) -> list[tuple[str, int]]:
    # Emit the latest count of each word updated in this cycle
    return [(word, state[word]) for word in set(updated_words)]


dag = Dag()
word_source = dag.source_stream(name="words")
count_state = dag.state(CountWords()).map(word_source)
count_stream = dag.stream(update_stream, []).map(count_state, word_source)
dag.sink("counts", count_stream)
```
This DAG has a source node called `words` and a sink node called `counts`.
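Because `CountWords` and `update_stream` are plain Python callables, their behaviour can be checked without Kafka or a `Dag`. The sketch below calls them by hand on two batches of words, roughly mimicking two DAG cycles; it re-declares both functions (keeping the running count in a per-instance dict) so that it runs on its own.

```python
class CountWords:
    """Accumulates a running count per word (same logic as the DAG's state node)."""

    def __init__(self) -> None:
        self.state: dict[str, int] = {}

    def __call__(self, new_words: list[str]) -> dict[str, int]:
        for word in new_words:
            self.state[word] = self.state.get(word, 0) + 1
        return self.state


def update_stream(
    state: dict[str, int], updated_words: list[str]
) -> list[tuple[str, int]]:
    # One (word, latest_count) pair per distinct word in this batch
    return [(word, state[word]) for word in set(updated_words)]


counter = CountWords()

# First batch: "hello" appears twice
batch_1 = ["hello", "world", "hello"]
updates_1 = update_stream(counter(batch_1), batch_1)

# Second batch: only "world" is updated, so only "world" is emitted
batch_2 = ["world"]
updates_2 = update_stream(counter(batch_2), batch_2)

print(sorted(updates_1))  # [('hello', 2), ('world', 1)]
print(updates_2)          # [('world', 2)]
```

Only words that changed in a cycle are emitted, which keeps the `counts` output proportional to the input rather than to the total number of distinct words.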
Defining Kafka Source
We will receive data from Kafka, on a topic called `words`.
First, we need to define how to deserialize messages coming from Kafka:
```python
import confluent_kafka

def deserialize_messages(messages: list[confluent_kafka.Message]) -> list[str]:
    return [message.value().decode("utf-8") for message in messages]  # bytes -> str
```
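`confluent_kafka.Message.value()` returns the raw payload as `bytes` (or `None` when the message has no payload), so producing `str` words means decoding it. The sketch below exercises that logic with a hypothetical `FakeMessage` stand-in so it runs without a broker. Skipping `None` payloads (for example tombstones) is an assumption about how you may want to treat them, not something Beavers requires.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message, exposing only value()."""

    payload: Optional[bytes]

    def value(self) -> Optional[bytes]:
        return self.payload


def deserialize_messages(messages: list[FakeMessage]) -> list[str]:
    words = []
    for message in messages:
        payload = message.value()
        if payload is not None:  # skip tombstones / empty payloads
            words.append(payload.decode("utf-8"))
    return words


words = deserialize_messages(
    [FakeMessage(b"hello"), FakeMessage(None), FakeMessage("héllo".encode("utf-8"))]
)
print(words)  # ['hello', 'héllo']
```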
Then, we put together the `SourceTopic` with its:
- topic (`words`)
- deserializer (`deserialize_messages`)
- replay policy (`from_start_of_day`, configured with a start-of-day time of `15min` in the `UTC` time zone)
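```python
import pandas as pd
from beavers.kafka import SourceTopic, KafkaDriver

# Replay the "words" topic from the start of the day (15min, UTC)
source_topic = SourceTopic.from_start_of_day(
    "words", deserialize_messages, pd.to_timedelta("15min"), "UTC"
)
```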
Multiple Kafka replay policies are available; see the API documentation for the full list.
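To make the `from_start_of_day` arguments above (`pd.to_timedelta("15min")`, `"UTC"`) more concrete, the sketch below computes the replay start as epoch milliseconds, the unit Kafka uses for message timestamps. It assumes the start is today's midnight in the given time zone plus the offset; `start_of_day_ms` is a hypothetical helper that shows the arithmetic only, and the real policy may treat edge cases (such as a current time earlier than the offset) differently.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def start_of_day_ms(now: datetime, start_offset: timedelta, tz: tzinfo) -> int:
    """Hypothetical: epoch ms of today's midnight in `tz`, shifted by `start_offset`."""
    local_now = now.astimezone(tz)  # `now` must be timezone-aware
    midnight = local_now.replace(hour=0, minute=0, second=0, microsecond=0)
    start = midnight + start_offset
    return int(start.timestamp() * 1000)


now = datetime(2024, 3, 1, 9, 30, tzinfo=timezone.utc)
print(start_of_day_ms(now, timedelta(minutes=15), timezone.utc))  # 1709252100000
```

With a UTC+1 zone the same instant maps to a midnight one hour earlier in UTC, so the replay start moves back by an hour.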
Defining Kafka Sink
We will send the results to the `counts` topic.
The key will be the word and the value will be the latest count.
First, we need to define a serializer, which converts each count to a `KafkaProducerMessage`:
```python
from beavers.kafka import KafkaProducerMessage


def serialize_counts(values: list[tuple[str, int]]) -> list[KafkaProducerMessage]:
    return [
        KafkaProducerMessage(
            topic="counts",
            key=word,
            value=str(count),
        )
        for word, count in values
    ]
```
The serializer is responsible for providing the topic for each outgoing message.
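To see what the serializer emits without installing Beavers, the sketch below swaps `KafkaProducerMessage` for a hypothetical `ProducerMessage` dataclass with the same three fields used above (`topic`, `key`, `value`). Because the topic is set on each message, a serializer could route different values to different topics; here everything goes to `counts`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProducerMessage:
    """Hypothetical stand-in for beavers.kafka.KafkaProducerMessage."""

    topic: str
    key: str
    value: str


def serialize_counts(values: list[tuple[str, int]]) -> list[ProducerMessage]:
    # One message per updated word: key = word, value = latest count as text
    return [
        ProducerMessage(topic="counts", key=word, value=str(count))
        for word, count in values
    ]


messages = serialize_counts([("hello", 2), ("world", 1)])
for message in messages:
    print(message)
# ProducerMessage(topic='counts', key='hello', value='2')
# ProducerMessage(topic='counts', key='world', value='1')
```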
Putting it together with KafkaDriver
The `KafkaDriver` takes care of creating the Kafka producer and consumer, and of passing messages through:
```python
kafka_driver = KafkaDriver.create(
    dag=dag,
    consumer_config={
        "group.id": "beavers",
        "bootstrap.servers": "localhost:9092",
    },
    producer_config={"bootstrap.servers": "localhost:9092"},
    source_topics={"words": source_topic},
    sink_topics={"counts": serialize_counts},
)

# Each cycle polls messages, runs the DAG and produces the sink outputs
while True:
    kafka_driver.run_cycle()
```
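Conceptually, each `run_cycle()` call moves one batch through the pipeline: poll, deserialize per source topic, execute the DAG, serialize the sink outputs, produce. The sketch below expresses that flow with plain callables. Every name in it (`run_cycle_sketch`, `poll`, `execute`, `run_once`) is hypothetical, it is not Beavers' implementation, and it ignores replay ordering and partition pausing; the `batch_size=5000` default simply mirrors the polling limit mentioned in the features list.

```python
from collections import defaultdict
from typing import Any, Callable


def run_cycle_sketch(
    poll: Callable[[int], list[tuple[str, Any]]],
    deserializers: dict[str, Callable[[list[Any]], Any]],
    execute: Callable[[dict[str, Any]], dict[str, Any]],
    serializers: dict[str, Callable[[Any], list[Any]]],
    produce: Callable[[Any], None],
    batch_size: int = 5000,
) -> int:
    """Run one poll -> DAG -> produce cycle; return how many messages were polled."""
    # 1. Poll at most batch_size raw (topic, payload) pairs
    polled = poll(batch_size)

    # 2. Group raw payloads by source topic and deserialize each group
    by_topic: dict[str, list[Any]] = defaultdict(list)
    for topic, payload in polled:
        by_topic[topic].append(payload)
    source_values = {
        topic: deserializers[topic](payloads) for topic, payloads in by_topic.items()
    }

    # 3. Feed the deserialized values to the DAG and collect its sink outputs
    sink_values = execute(source_values)

    # 4. Serialize each sink's output and hand the messages to the producer
    for sink, value in sink_values.items():
        for message in serializers[sink](value):
            produce(message)
    return len(polled)


# In-memory stand-ins for a topic, a producer and the count-word DAG
pending = [("words", b"hello"), ("words", b"world"), ("words", b"hello")]
produced: list[str] = []
counts: dict[str, int] = {}


def poll(limit: int) -> list[tuple[str, bytes]]:
    batch = pending[:limit]
    del pending[:limit]
    return batch


def execute(sources: dict[str, Any]) -> dict[str, Any]:
    words = sources.get("words", [])
    for word in words:
        counts[word] = counts.get(word, 0) + 1
    return {"counts": [(word, counts[word]) for word in sorted(set(words))]}


def run_once() -> int:
    return run_cycle_sketch(
        poll,
        {"words": lambda payloads: [p.decode("utf-8") for p in payloads]},
        execute,
        {"counts": lambda values: [f"{word}={count}" for word, count in values]},
        produced.append,
        batch_size=2,
    )


first, second = run_once(), run_once()
print(first, second, produced)  # 2 1 ['hello=1', 'world=1', 'hello=2']
```

With `batch_size=2`, the three pending messages take two cycles, which shows how the limit bounds the work done per cycle.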
Beavers Kafka Features
- One consumer: there is only one consumer (rather than one consumer for each topic).
- One producer: there is only one producer (rather than one producer for each topic).
- When polling messages, Beavers tries to read all available messages, up to a limit of `batch_size=5000` (which is configurable in the `KafkaDriver`).
- When replaying past data, Beavers orchestrates topics/partitions so that data is replayed in order, across topics, based on each message's timestamp.
- When replaying past data, some newer messages have to be held. To avoid memory issues, the number of held messages is capped at `batch_size*5`. Once the number of held messages gets too high, partitions that are ahead of the watermark are paused. These partitions are resumed once the application catches up.
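The replay behaviour described above can be illustrated with the standard library. The sketch below is hypothetical and much simpler than Beavers' actual logic: `replay_in_order` merges per-partition messages (each already sorted by timestamp) into one timestamp-ordered stream with `heapq.merge`, and `partitions_to_pause` selects the partitions to pause once held messages exceed `batch_size*5`. Taking the watermark to be the lowest latest-polled timestamp across partitions is an assumption, as are the partition names.

```python
import heapq


def replay_in_order(
    partitions: dict[str, list[tuple[int, str]]],
) -> list[tuple[int, str, str]]:
    """Merge (timestamp, value) lists, each sorted by timestamp, into one
    (timestamp, partition, value) list ordered by timestamp across partitions."""
    streams = [
        [(timestamp, name, value) for timestamp, value in messages]
        for name, messages in partitions.items()
    ]
    return list(heapq.merge(*streams))


def partitions_to_pause(
    latest_timestamps: dict[str, int], held_messages: int, batch_size: int
) -> set[str]:
    """Return partitions ahead of the watermark once held messages exceed the cap."""
    if held_messages <= batch_size * 5:  # cap mirrors batch_size*5 from the text
        return set()
    # Hypothetical watermark: the slowest partition's latest polled timestamp
    watermark = min(latest_timestamps.values())
    return {name for name, ts in latest_timestamps.items() if ts > watermark}


merged = replay_in_order(
    {
        "words-0": [(1, "a"), (4, "d")],
        "counts-0": [(2, "b"), (3, "c")],
    }
)
print([value for _, _, value in merged])  # ['a', 'b', 'c', 'd']

print(partitions_to_pause({"words-0": 100, "words-1": 250}, held_messages=30, batch_size=5))
# {'words-1'}
```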
Beavers Kafka Limitations
- A single Beavers application consumes every partition of the requested topics (no load balancing or scaling).