Tool Guide: Kafka Message Journey


In this article, I will review how a message travels through its journey in the Kafka ecosystem. If you need a refresher on the core concepts of Kafka, you can refer to this blog post. Also, for more information on any of the concepts covered in this article, I would suggest referring to either of the following resources:

The Path

In a Kafka cluster, a couple of components must be initialized before the journey begins. Of course, Kafka handles this itself, but since they are essential to the path, we are going to review them first:


Controller

This component is responsible for coordinating the assignment of partitions and data replicas. The controller is selected in a process called Kafka Controller Election (more details on how the process works can be found in this blog post).

In short, on cluster start, all of the initialized brokers try to register themselves as the controller. Since, by design, only one controller can exist in the cluster, the first one to succeed becomes the controller, and the rest watch. When the controller goes down for whatever reason, ZooKeeper informs the watchers, and the re-election process is triggered (just like the first one).
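As a rough sketch of the first-to-register-wins idea (a pure-Python toy model, not Kafka's actual implementation; the class and method names here are made up for illustration):

```python
class Cluster:
    """Toy model of controller election: the first broker to register wins."""

    def __init__(self):
        self.controller = None
        self.watchers = []

    def register(self, broker_id):
        # Only one controller may exist; later brokers become watchers.
        if self.controller is None:
            self.controller = broker_id
            return True
        self.watchers.append(broker_id)
        return False

    def controller_failed(self):
        # ZooKeeper notifies the watchers; they race to register again,
        # mirroring the initial election.
        self.controller = None
        candidates, self.watchers = self.watchers, []
        for broker_id in candidates:
            self.register(broker_id)


cluster = Cluster()
for broker in ("broker-1", "broker-2", "broker-3"):
    cluster.register(broker)
print(cluster.controller)   # broker-1 won the race
cluster.controller_failed()
print(cluster.controller)   # broker-2, a former watcher, takes over
```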


Partitions

They are the primary concurrency mechanism in Kafka, enabling producers and consumers to scale horizontally (more info). The controller allocates these partitions across the available brokers based on the configuration defined for each topic using the --partitions argument.


Replicas

They are the primary fault-tolerance mechanism in Kafka. Replicas are created by the controller based on the configuration defined for each topic using the --replication-factor argument.
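To make the two settings concrete, here is a toy round-robin placement of each partition's replicas across brokers (a simplified sketch; Kafka's real assignment algorithm also accounts for racks and existing load, and the function name is invented for illustration):

```python
def assign_replicas(brokers, num_partitions, replication_factor):
    """Spread each partition's replicas over the brokers round-robin.

    Returns {partition: [broker, ...]}; by convention here, the first
    broker in each list plays the role of the lead replica.
    """
    assert replication_factor <= len(brokers), "not enough brokers"
    assignment = {}
    for p in range(num_partitions):
        assignment[p] = [
            brokers[(p + r) % len(brokers)] for r in range(replication_factor)
        ]
    return assignment


layout = assign_replicas(["b1", "b2", "b3"], num_partitions=3, replication_factor=2)
print(layout)  # {0: ['b1', 'b2'], 1: ['b2', 'b3'], 2: ['b3', 'b1']}
```

Note how the leaders (first entry of each list) land on different brokers, which is exactly what lets reads and writes scale horizontally.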

Lead Replica

Shown as an orange diamond in the picture above (marked with “L”), this is the primary replica of each partition, responsible for handling read and write requests. It gets selected in a process called Kafka Lead Replica Election (this blog post has an excellent in-depth overview of the process).

For each partition, Kafka tracks a set named In-Sync Replicas (or ISR for short): the replicas that reflect the latest state of the leader. When the lead replica goes down, the next in-sync replica is selected, and if there is no in-sync replica to choose from, Kafka waits (accepting no write actions) until one such replica comes back up. There is a configuration called unclean.leader.election.enable which, when enabled, allows Kafka to elect an out-of-sync replica in such a state so the partition can keep serving requests, at the cost of possible data loss.
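The failover preference order can be sketched as a small pure-Python model (again a toy, not Kafka's code; the function name and the rule of picking the first listed out-of-sync replica are illustration-only assumptions):

```python
def elect_leader(replicas, isr, unclean_leader_election=False):
    """Pick a new leader: prefer an in-sync replica; optionally fall back
    to any live replica (risking data loss) when the ISR is empty."""
    for replica in replicas:
        if replica in isr:
            return replica
    if unclean_leader_election and replicas:
        # Out-of-sync replica: writes it never saw are lost.
        return replicas[0]
    return None  # no eligible leader: the partition refuses writes


replicas = ["r1", "r2", "r3"]
print(elect_leader(replicas, isr={"r2", "r3"}))  # r2
print(elect_leader(replicas, isr=set()))         # None: Kafka waits
print(elect_leader(replicas, isr=set(), unclean_leader_election=True))  # r1
```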


Now let’s review the message journey (you can use the image at the start of the article to help visualize the process):

  1. The producer publishes the message to the cluster.
  2. The target partition is selected, either by hashing the message key or, when no key is provided, in a round-robin fashion.
  3. The message is appended to the end of the lead replica’s log in the selected partition, and a unique ID (offset) is assigned to the message.
  4. The replication mechanism also copies the message to the other replicas (if any are defined by the replication factor).
  5. During the message’s lifetime on the topic, any consumer active in a consumer group will consume the message if it hasn’t already been consumed by another consumer in that group. Note: this only applies when the consumer uses a consumer group; otherwise, the consumer is responsible for tracking the offsets itself.
  6. When the topic’s retention period is exceeded, the message is automatically deleted from the topic. Obvious fact: if no consumer has consumed the message by then, it’s lost forever!
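The steps above can be simulated end to end with a toy single-broker model (pure Python, not real Kafka; the Topic class and its method names are made up for illustration, and replication is left out for brevity):

```python
import itertools


class Topic:
    """Toy topic: round-robin partitioning, per-partition offsets,
    consumer-group offset tracking, and retention cleanup."""

    def __init__(self, partitions):
        self.logs = {p: [] for p in range(partitions)}   # partition -> messages
        self.start = {p: 0 for p in range(partitions)}   # first retained offset
        self._rr = itertools.cycle(range(partitions))
        self.group_offsets = {}                          # (group, partition) -> next offset

    def produce(self, message):
        p = next(self._rr)            # step 2: round-robin partition selection
        self.logs[p].append(message)  # step 3: append to the log
        return p, self.start[p] + len(self.logs[p]) - 1  # assigned offset

    def consume(self, group, partition):
        # step 5: each group consumes every message exactly once
        key = (group, partition)
        offset = self.group_offsets.get(key, self.start[partition])
        index = offset - self.start[partition]
        if index >= len(self.logs[partition]):
            return None  # nothing new for this group
        self.group_offsets[key] = offset + 1
        return self.logs[partition][index]

    def expire(self, partition, up_to_offset):
        # step 6: retention removes old messages, consumed or not
        drop = up_to_offset - self.start[partition]
        self.logs[partition] = self.logs[partition][drop:]
        self.start[partition] = up_to_offset


topic = Topic(partitions=2)
print(topic.produce("m1"))          # (0, 0)
print(topic.produce("m2"))          # (1, 0)
print(topic.produce("m3"))          # (0, 1)
print(topic.consume("group-a", 0))  # m1
print(topic.consume("group-a", 0))  # m3
```

After `expire(0, 2)`, partition 0 is empty: a group that never read `m1` or `m3` can no longer get them, which is the "lost forever" case from step 6.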

Tip: Initialize PySpark session with Delta support

Quick Start

Delta’s documentation on how to enable it with Python is relatively straightforward: you install the delta-spark package using pip, add the Delta-related configuration, wrap the PySpark builder with a call to configure_spark_with_delta_pip, and then call .getOrCreate() on your session builder.

By looking at its code, you’ll find that all it does is add an entry to spark.jars.packages in your session’s configuration, which in turn puts the required Java module on your classpath.
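Put together, the quick start looks roughly like this (a sketch assuming delta-spark is installed via pip; the app name is arbitrary, and the two config values follow Delta’s quick-start guide):

```python
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Delta-related configuration: register the SQL extension and the catalog.
builder = (
    SparkSession.builder.appName("delta-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip appends the matching Delta artifact to
# spark.jars.packages, so Spark fetches the Java side automatically.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```

This is a session-bootstrap fragment: it needs a local JVM and matching Spark installation to actually run.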

AWS Glue

This installation approach works on a typical setup; however, when I tried to use it in a script on AWS Glue, I realized the package was not being placed on the classpath, causing a ClassNotFound exception. To make it work, I had to download the desired delta-core JAR file from the Maven repository, upload it to S3, and pass its path to the Glue job as a Dependent jars path.

PySpark version constraints

At the time of this writing, the Delta package works with PySpark < 3.2. If you try to run it with a newer version, it’ll raise the following exception:

java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.SQLConfHelper

Overall, it’s good to make sure your Spark and PySpark versions match, and that they are compatible with your Delta version.