Kafka

The fundamental building block of Kafka is the partition, or topic-partition. A topic contains multiple partitions, and each partition has a leader. Additionally, Kafka elects one of the brokers to act as a Controller, which decides which broker holds which partitions. One broker is the leader for a partition, while the others are followers. The followers that are keeping up with the leader within a configured threshold (a maximum lag of about 4000 messages in older versions; a time-based lag in newer ones) are called In-Sync Replicas (ISR).

Kafka saves all of this information in ZooKeeper so that a broker failure doesn't lose information about topics, partitions, and the leaders for those partitions.

Each message appended to the log increments a counter called the offset. Offsets are a monotonically increasing sequence of numbers that are guaranteed to be unique within a partition.

Brokers

Each Kafka server is referred to as a "broker" -- standard in queue parlance, as the meeting point for producers of messages and the consumers of those messages. What makes Kafka unique is that, unlike other ESBs (TIBCO, SonicMQ) or MQs (RabbitMQ), it doesn't push data to you; it relies on consumers polling for messages. A producer writes to a topic using a "key", which determines the partition the message is assigned to -- and from which it is later consumed. Additionally, Kafka relies on an append-only log-file structure. A lot of the design of Kafka relies on existing operating system primitives rather than inventing new protocols. Kafka also only guarantees at-least-once delivery, not exactly-once, so it is up to the consumer to detect whether it is seeing a duplicate message.
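
To make the write path concrete, here is a minimal producer sketch using the Shopify/sarama library mentioned at the end of this chapter. The broker address, topic name, and key are made-up placeholders:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by the SyncProducer

	// Bootstrap list: any live broker lets the client discover the rest.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Messages with the same key hash to the same partition, which is
	// what gives you per-key ordering.
	msg := &sarama.ProducerMessage{
		Topic: "events",
		Key:   sarama.StringEncoder("user-42"),
		Value: sarama.StringEncoder("hello"),
	}
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("wrote to partition %d at offset %d", partition, offset)
}
```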

Kafka brokers rely on ZooKeeper to keep a registry of all available brokers under the /brokers/ids path. You can connect to any live broker and discover the rest, which is why you will often see the initial server list referred to as a bootstrap list. This is similar to other distributed systems such as Cassandra, even though those systems don't rely on ZooKeeper the way Kafka does.

Kafka Logs

Kafka keeps a series of log files for each topic-partition in the configured data directories. Additionally, it keeps index files to look up offsets in the log quickly: one index file per log segment up to 0.9.x, and two since 0.10.x (an offset index and a time index).
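
The on-disk layout looks roughly like this, assuming a hypothetical topic named events, partition 0 (segment file names are the base offset of the segment):

```
events-0/
  00000000000000000000.log        # the messages themselves
  00000000000000000000.index      # offset -> file-position lookups
  00000000000000000000.timeindex  # timestamp -> offset lookups (0.10.x+)
```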

Replicas and In-Sync Replicas

As with any distributed log system, a set of brokers forms the replica set for each topic-partition. Producers write to the leader, and the followers replicate from the leader. If a follower falls behind by a certain number of messages (or, in newer versions, by a certain amount of time), it is no longer considered "in sync". If the leader fails and no in-sync replica is available to take over, the partition goes offline. An offline partition can neither be written to nor read from.
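
The ISR also matters on the producer side: you choose how many replicas must acknowledge a write. A minimal sketch with sarama -- the trade-off, not the exact numbers, is the point here:

```go
package main

import "github.com/Shopify/sarama"

func main() {
	cfg := sarama.NewConfig()

	// WaitForAll: the leader acknowledges a write only after all in-sync
	// replicas have replicated it -- slower, but the write survives a
	// leader failure. WaitForLocal would acknowledge as soon as the
	// leader alone has the write.
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true

	_ = cfg // pass this config to sarama.NewSyncProducer(...)
}
```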

How Kafka Consumers work

You can have a consumer read from many topics and partitions without joining any consumer group; in that case, no record is kept of where that particular consumer is in terms of offsets. This isn't fault-tolerant if your consumer crashes, so you generally want to run Kafka consumers in a group.

Consumer Groups

A consumer group is a way to run an application across multiple nodes such that the failure of one node leads to the other nodes taking over its partitions. You will often come across the term "High Level Consumer Group" -- this means a consumer can be multi-threaded, consuming from many topics and partitions. A minimal consumer-group sketch follows the notes below.

Notes:

  • You give a unique name to your consumer group
  • A consumer group can listen to more than one topic
  • The mapping of partitions to consumers is done by one of the consumers, chosen as leader by the coordinating broker
    • This mapping is done all at once for all the topics
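
Here is a minimal consumer-group sketch using sarama's consumer-group API (available in newer sarama versions; broker, group, and topic names are placeholders):

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler; ConsumeClaim is called
// once per partition this group member is assigned.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
		// Mark the message as processed; sarama commits marked offsets
		// back to Kafka periodically and on rebalance.
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_0 // consumer groups need a modern broker

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		// Consume blocks for one "generation" and returns when the group
		// rebalances (a member joined or left), so we loop.
		if err := group.Consume(context.Background(), []string{"events"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}
```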

To learn more about the underlying protocol, the protocol guide on Kafka's wiki is a good place to start:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

Not every service needs a consumer group, either. You may have a service that receives commands or configuration updates uniformly on a topic; in that case, you can just consume from the topic without a consumer group, as in the sketch below. This avoids the pause Kafka introduces while partitions are rebalanced when a new consumer joins the group.
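
A standalone-consumer sketch with sarama, where we pick the partition and starting offset ourselves (topic and broker are placeholders):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// No group: we name the partition and offset explicitly, and nothing
	// is recorded on our behalf if we crash.
	pc, err := consumer.ConsumePartition("config-updates", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("offset=%d value=%s", msg.Offset, msg.Value)
	}
}
```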

Lag

The distance (in messages or time) between the most recent message a producer has written to a partition and the consumer's current position in that partition. You can think of it as "how far is the worker from real-time." Numerically, lag = log end offset - the consumer's committed offset.
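
A sketch of computing a group's per-partition lag with sarama (topic and group names are placeholders; monitoring tools usually do this for you):

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	const topic, group = "events", "worker-group" // placeholders

	client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Reads the group's committed offsets back from Kafka.
	mgr, err := sarama.NewOffsetManagerFromClient(group, client)
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	partitions, err := client.Partitions(topic)
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range partitions {
		// Log end offset: where the next produced message will land.
		end, err := client.GetOffset(topic, p, sarama.OffsetNewest)
		if err != nil {
			log.Fatal(err)
		}

		pom, err := mgr.ManagePartition(topic, p)
		if err != nil {
			log.Fatal(err)
		}
		// Next offset the group would read (a sentinel if nothing has
		// been committed yet).
		next, _ := pom.NextOffset()
		pom.Close()

		fmt.Printf("partition %d lag: %d\n", p, end-next)
	}
}
```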

Coordinator

Partition assignments and leadership in Kafka are managed by a broker elected as the controller (often referred to as a coordinator). You can check the current controller's id in ZooKeeper at the /controller znode under Kafka's chroot.

It's this broker's job to check for topic-leadership skew across brokers and rebalance leadership when needed.

Defaults

| Name | Value |
| --- | --- |
| Port | 9092 |
| Number of nodes | At least the replication factor of your partitions; you want this count to be an odd number |

Operationalizing

Fundamentals

Since Kafka writes its logs to disk and uses Linux's sendfile() syscall to send messages, you want to keep a close watch on I/O utilization and memory (the page cache does most of the serving).

Kafka Specific

Watch the in-sync replica / under-replicated partition metrics and the fetcher lag metrics; these are often closely tied to the I/O utilization on the server.

Gathering Metrics

Kafka exposes loads of metrics over JMX. We used the Diamond library for gathering metrics from Kafka, with the mx4j library exposing the JMX metrics over HTTP. With Diamond, you can report them anywhere Diamond can report to -- Graphite and InfluxDB are popular choices. I'll present the Graphite example here because it's the easiest to get started with and maps reasonably well to the JMX metric names.
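
A few of the JMX beans worth graphing (names are from recent-ish broker versions and can vary across releases):

```
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
kafka.controller:type=KafkaController,name=OfflinePartitionsCount
kafka.network:type=RequestChannel,name=RequestQueueSize
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=*,topic=*,partition=*
```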

Readings

Kafka has some excellent documentation on operationalizing it at https://kafka.apache.org/documentation/#operations

Learning Kafka is a good book to start with, too.

Committing Offsets to Kafka

Up until the new consumer API was introduced (around the 0.9/0.10 releases of Kafka), client libraries committed offsets to ZooKeeper. Since then, clients can commit offsets to Kafka directly; the broker stores these commits in an internal topic named __consumer_offsets.
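
A sketch of committing an offset explicitly via sarama's offset manager (this is what MarkMessage in the consumer-group sketch above does for you; group, topic, and offset are placeholders):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_0 // store offsets in Kafka, not ZooKeeper

	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	mgr, err := sarama.NewOffsetManagerFromClient("my-group", client)
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	pom, err := mgr.ManagePartition("events", 0)
	if err != nil {
		log.Fatal(err)
	}
	// By convention you commit the next offset you want to read: if you
	// just processed offset 41, commit 42. The manager flushes marked
	// offsets to __consumer_offsets periodically and on close.
	pom.MarkOffset(42, "")
	pom.Close()
}
```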

KPIs

  • Each consumer group's lag from the head of the topic
  • Number of under-replicated partitions
  • IO Utilization % for the disk
  • ZooKeeper session timeouts
  • Offline Partitions count
  • Request Queue Wait time

Kafka Maintenance Strategies

Adding Partitions

We add partitions with the following strategy:

Use 1 partition for very small topics that will likely not need much parallelism, or that are strictly point-to-point or broadcast.

Beyond 1, if you want parallelism, the next logical step is to make the partition count highly factorable, so that you can handle churn in workers while keeping your utilization high.

If you anticipate any kind of parallelism for your service, starting with 2 partitions is best, even if you start with only one worker. This forces you early into planning for state management and handling rebalancing when one worker goes away. It also gives you higher throughput from the beginning than a single partition would.

We then just keep adding prime factors for increased parallelism.

  • 2^1 * 3^1 = 6 seems a logical place to start, but with even distribution of messages you're limited to worker counts that divide 6 (1, 2, 3, or 6). At that point, we can go one higher with 2^2 * 3^1 = 12 for more scaling options while keeping maximum utilization. When you have to scale even higher, the next logical choice is to add another prime: 2^2 * 3^1 * 5^1 = 60

Often the examples online, and the naive assumption, go 5, 10, 15, 20 -- but that limits your options when you are packing stream-processing workers for maximum utilization.
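
A toy illustration of why the factor-rich counts help -- the worker counts that give even distribution are exactly the divisors of the partition count:

```go
package main

import "fmt"

// evenWorkerCounts returns the worker counts that divide the partition
// count evenly; with any other count, some workers own more partitions
// than others and utilization drops.
func evenWorkerCounts(partitions int) []int {
	var counts []int
	for w := 1; w <= partitions; w++ {
		if partitions%w == 0 {
			counts = append(counts, w)
		}
	}
	return counts
}

func main() {
	for _, p := range []int{6, 12, 60} {
		fmt.Println(p, "partitions:", evenWorkerCounts(p))
	}
	// 6 partitions:  [1 2 3 6]
	// 12 partitions: [1 2 3 4 6 12]
	// 60 partitions: [1 2 3 4 5 6 10 12 15 20 30 60]
}
```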

There can be spikes on individual partitions, but assuming an uneven partition distribution from the beginning is an anti-pattern in Kafka partition design. Stateful workers should have strategies to offload their state into a remote cache and rehydrate it as needed, rather than assume a message will always be in a certain partition.

Partition increases aren't a solution for immediate scaling needs -- they're part of planning for the next scale point. Also, once you increase the partition count, you cannot decrease it without deleting and recreating the topic.

When we get to the scheduling decisions chapter, this distribution of streams makes it easier to pack workers into better bins.

Libraries

Go

https://github.com/Shopify/sarama is a great library for working with Kafka from Go. We built a lot of our production services, including our logging services, on sarama.
