Kafka basics — everything you need to know to start using Kafka

Nicol Leung
5 min read · Nov 21, 2018

Basic concepts

Kafka is yet another implementation of the old-school topic-based publish-subscribe system, but in a fashionable way. It provides high availability, high performance, consumer groups, different levels of consistency, and various acknowledgment modes: features that are essential but missing from other popular systems.

Producers, consumers, topics

Producers can send messages to different topics, and consumers can subscribe to multiple topics to receive messages from them.

Broker

A Kafka broker is a node in a Kafka cluster or a standalone Kafka node.

Log

The append-only, ordered record of messages sent to a Kafka topic; each partition is stored as its own log on disk.

Partition, offset

A topic can be partitioned to balance load across brokers and to enable parallel processing by a group of consumers.

Each partition read by a consumer has a partition offset, which marks the position of the last message that consumer processed.

Consumer group

Consumers can form a group to share the load of a topic: the topic's partitions are distributed evenly among the consumers in the group, and each partition is consumed by exactly one member. For example, a topic with four partitions consumed by a group of two consumers gives each consumer two partitions.

Replication, leader

Each partition can have more than one copy to ensure high availability, and the replication factor can be set per topic. Each partition has one broker acting as its leader, responsible for receiving new messages; the other replicas follow the leader.

Zookeeper

Zookeeper is used to store and synchronize the configuration of a Kafka cluster. Kafka brokers connect to Zookeeper to register themselves and retrieve configurations.

Setup a cluster

First unpack the Kafka tarball to /opt on all nodes.
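
For example, assuming the 2.1.0 release built against Scala 2.12 (adjust the file name to the version you actually downloaded):

tar -xzf kafka_2.12-2.1.0.tgz -C /opt
cd /opt/kafka_2.12-2.1.0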

Update the following in config/server.properties in all nodes:

# Broker id must be unique in a cluster
broker.id=1
# Switch to enable topic deletion; the default value is false.
# We suggest turning it on in case you need to delete data.
delete.topic.enable=true
# Listener address; you may leave it at the default PLAINTEXT://:9092 by commenting out this line.
# Enable encryption by changing the protocol to SSL.
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.1.123:9092
# Directory for topic logs; the default is /tmp/kafka-logs. We suggest moving it out of /tmp, since /tmp may be cleaned up.
log.dir=/var/lib/kafka/logs
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# Zookeeper instance list (standalone or cluster)
zookeeper.connect=localhost:2181

Notice that broker.id must be unique for each node (e.g., the second node would set broker.id=2), and you should set advertised.listeners on each node to its own address in case the hostname is not set or DNS cannot resolve it.

Before you start the cluster, you need a Zookeeper service running. You can start a standalone Zookeeper with:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

For production use, you should deploy a Zookeeper cluster to ensure high availability. For the cluster setup, check the official Zookeeper documentation.

Once Zookeeper is up, start Kafka on each node with:

bin/kafka-server-start.sh -daemon config/server.properties
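
To verify that every broker has registered itself, you can list the recorded broker ids in Zookeeper (the same command is used again later when rebalancing partitions):

bin/zookeeper-shell.sh localhost:2181 <<< "ls /brokers/ids"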

Create a topic

Creating a topic updates the topic metadata stored in Zookeeper. We suggest setting the number of partitions to 3x the cluster size: the number of consumers in a consumer group cannot exceed the number of partitions, and with a generous count you won't need to change it when you expand the cluster. The replication factor, on the other hand, cannot exceed the cluster size.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic my-replicated-topic

Programming producers

When producing new records, you can set a key that is used for partitioning. If you leave the key empty, the record is assigned to a partition on a round-robin or random basis.

https://stackoverflow.com/questions/40872520/whats-the-purpose-of-kafkas-key-value-pair-based-messaging
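
Here is a minimal producer sketch in Java. It assumes the official kafka-clients library is on the classpath, and reuses the broker address and topic name from the examples above; the class name SimpleProducer is just for illustration.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.123:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // acks=all waits for all in-sync replicas to acknowledge each write
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key always land on the same partition
            producer.send(new ProducerRecord<>("my-replicated-topic", "user-42", "hello"));
            // With a null key, the default partitioner spreads records across partitions
            producer.send(new ProducerRecord<>("my-replicated-topic", null, "hello again"));
            producer.flush();
        }
    }
}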

Add more brokers

Start new brokers using bin/kafka-server-start.sh as shown above.

After the new brokers are started, existing topics will NOT be redistributed to them automatically. The following section explains how to rebalance partitions.

Rebalance partitions

You need to rebalance partitions after you add new brokers or before you remove a broker. You can check the existing distribution of partitions with:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
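
The output looks roughly like the following (illustrative values; Leader, Replicas and Isr refer to broker ids):

Topic:my-replicated-topic  PartitionCount:4  ReplicationFactor:2  Configs:
    Topic: my-replicated-topic  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: my-replicated-topic  Partition: 1  Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: my-replicated-topic  Partition: 2  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: my-replicated-topic  Partition: 3  Leader: 2  Replicas: 2,1  Isr: 2,1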

Kafka provides a script to reassign partitions. Before you run the reassignment, you need to prepare a topic list.

We list the topic first:

bin/kafka-topics.sh --zookeeper localhost:2181 --list

Then we create a list as follows; let's name it topics-to-move.json:

{
  "topics": [{"topic": "foo1"}, {"topic": "foo2"}],
  "version": 1
}

We also need to list all the broker ids:

bin/zookeeper-shell.sh localhost:2181 <<< "ls /brokers/ids"

Then we generate the new assignment plan (this does not execute the reassignment yet):

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3" --generate

The command prints both the current assignment (save it in case you need to roll back) and the proposed one in JSON format; copy the proposed plan into expand-cluster-reassignment.json.
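
The proposed plan looks roughly like this (illustrative; the actual topics, partitions and replica lists depend on your cluster):

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[2,3]},
               {"topic":"foo1","partition":1,"replicas":[3,1]},
               {"topic":"foo2","partition":0,"replicas":[1,2]}]}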

Then we use the new scheme to execute reassignment:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

Since the reassignment may take a while to finish, the command will not wait for completion. You can check the progress as follows:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

Once the rebalance starts, consumers will stop fetching until it is completed. After that, consumers will connect to the new leader of each partition.

https://stackoverflow.com/questions/27181693/how-does-consumer-rebalancing-work-in-kafka

Broker fail-over

When a broker in a Kafka cluster goes down, the cluster redistributes the leader role for the partitions hosted on the failed broker evenly among the remaining online brokers. Producers and consumers automatically connect to the new leader of each partition once their connections are interrupted.

Once the broker is up again, it catches up by replicating from the current leaders of its partitions and rejoins their in-sync replica (ISR) sets. With the default auto.leader.rebalance.enable=true, leadership is eventually moved back to the preferred leaders.

https://dzone.com/articles/understanding-kafka-failover

Consumer group rebalance

When you add a new consumer to a consumer group, Kafka reassigns some partitions held by the existing consumers to the new consumer.

When a consumer in a consumer group goes down, the partitions it held are reassigned evenly to the other consumers in the group.

https://dzone.com/articles/understanding-kafka-failover
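
To watch a rebalance happen, you can run two copies of a minimal consumer like the sketch below (assuming the official Java kafka-clients library and the broker address and topic from the examples above; the group name "my-group" and the class name are just for illustration). Starting a second instance triggers a rebalance that splits the partitions between the two, and stopping one hands its partitions back.

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.123:9092");
        // All consumers sharing this group.id form one consumer group
        props.put("group.id", "my-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-replicated-topic"),
                    new ConsumerRebalanceListener() {
                        // Called when partitions are taken away during a rebalance
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            System.out.println("Revoked: " + partitions);
                        }
                        // Called when partitions are (re)assigned to this consumer
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            System.out.println("Assigned: " + partitions);
                        }
                    });
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}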

Rolling upgrade

You can upgrade Kafka without any service impact. Check the official documentation for details.

Decommission brokers

Last but not least, we go through how to decommission brokers.

We need to reassign the partitions of all topics as shown above. The only change is to remove the brokers being decommissioned from the --broker-list argument.

Then shut down the brokers. That's it.

Appendix

Commonly used commands

Check the partition offsets of a topic

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-replicated-topic

Check consumer group partition assignment and offset

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group

Delete a topic

Deleting a topic can free up the disk space it uses immediately. Before you can delete a topic, remember that delete.topic.enable needs to be true in the server configuration.

bin/kafka-topics.sh --delete --topic shiny-new-topic --zookeeper localhost:2181

Common issues

Consumer not working

Remove the broker configurations in Zookeeper (this will also remove the partitioning configurations, so make sure no data will be lost):

bin/zookeeper-shell.sh localhost:2181 <<< "rmr /brokers"

Then restart the Kafka cluster.
