# Kafka Offsets And Statistics

Nov 13th, 2020 - written by Kimserey.

Offsets are a big part of Kafka. They indicate the position of a message in the log of a topic partition, allowing consumers to know what to read and where to start from. In today’s post we will look at how consumers manage offsets, store and commit them, and how brokers maintain them so that a consumer group can survive the failure of a consumer.

## Docker Setup

We start first by setting up Kafka in Docker:

version: '2'
services:

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    build: .
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_CREATE_TOPICS: "kimtopic:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock


This is a simple single broker with a topic kimtopic that has a single partition. We start it via docker-compose:

docker-compose up -d


## General Offset Explanation

For this post, we’ll reuse our previous consumer written in C# dotnet:

public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "my-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false,
        EnableAutoOffsetStore = false
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume();
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                // c.StoreOffset(cr);
                // c.Commit();
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}


This consumer starts an infinite loop which pulls messages out of the topic and writes them to the console. In our consumer configuration, we have disabled both the automatic storing of offsets and the automatic commit of offsets in order to illustrate how offsets are used:

EnableAutoCommit = false,
EnableAutoOffsetStore = false


Now if we start our consumer and inspect the broker (via docker cli as explained in our previous post):

/ # kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups all-topics --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
my-group-1      kimtopic        0          -               0               -               rdkafka-1c17f9cd-d55d-4510-8e17-3008f3a9a608 /172.21.0.1     rdkafka


We have one topic, kimtopic, with a single partition 0, a single consumer group and only one consumer within that group. The consumer is given a member ID rdkafka-1c17f9cd-d55d-4510-8e17-3008f3a9a608 by the broker.

We can see that there are no messages in the topic: the log-end-offset is 0, and since nothing has been committed the current-offset and the lag are both unknown.

Now if we start a producer with Kafkacat -P and add one message:

❯ kafkacat -P -b localhost:9094 -t kimtopic
hello world!


We then look at the consumer group:

/ # kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups all-topics --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
my-group-1      kimtopic        0          -               1               -               rdkafka-71367ca0-7364-49a6-a3ce-a99796a9bb6f /172.21.0.1     rdkafka


As we continue to produce messages, the log-end-offset keeps increasing and we can see the messages being consumed by our consumer. However, the current-offset and the lag remain unknown: because we have disabled auto-commit, no offsets are ever committed.

The side effect of not committing any offset is that if we restart our consumer, the AutoOffsetReset configuration is checked and, depending on whether it is set to Earliest or Latest, the consumer reads again from the beginning or starts reading from the end of the topic. While it is running, even though it is not committing its current offset back to the broker, the consumer keeps track in memory of where it is and pulls the messages following the last one it handled.

But there is a point where we want to commit offsets to the broker in order to survive failures. When a consumer is no longer available, the partitions it owned are rebalanced onto the other consumers of the same consumer group. If no offset has been committed, the consumer receiving the rebalanced partition starts consuming it according to the AutoOffsetReset setting. On the other hand, if an offset was committed, it is shown under current-offset and the new consumer resumes from where the failed consumer left off (or, more precisely, from the last committed offset).
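The restart and rebalance behaviour described above can be summarised in a small model. This is only a sketch and not code taken from the Kafka client: `ResetPolicy`, `StartPosition.Resolve` and its parameters are hypothetical names, and it assumes that a committed offset designates the next message to read.

```csharp
using System;

// Hypothetical stand-in for the client's AutoOffsetReset setting.
public enum ResetPolicy { Earliest, Latest }

public static class StartPosition
{
    // Returns the offset a newly assigned consumer would start reading from.
    // committedOffset: offset committed by the group, or null if there is none.
    // lowOffset / endOffset: first available offset and log end offset of the partition.
    public static long Resolve(long? committedOffset, ResetPolicy policy, long lowOffset, long endOffset)
    {
        // A committed offset always wins: consumption resumes from it.
        if (committedOffset.HasValue)
            return committedOffset.Value;

        // Without a committed offset, the reset policy decides.
        return policy == ResetPolicy.Earliest ? lowOffset : endOffset;
    }
}
```

With 11 messages in the partition and nothing committed, `Resolve(null, ResetPolicy.Earliest, 0, 11)` starts from the beginning while `ResetPolicy.Latest` starts from the end. As soon as an offset is committed, the policy no longer matters.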

## Committed Offset

Now that we understand why offsets are committed, we can look at how to do so. The most common (and default) way of committing offsets is via auto-commit. To re-enable auto-commit, we either remove EnableAutoCommit (default is true) or explicitly set it to true, and do the same for EnableAutoOffsetStore (default is true):

EnableAutoCommit = true,
EnableAutoOffsetStore = true


The consumer does not commit offsets after each message as that would be too inefficient. Instead, it stores the last offset of the handled messages per partition and periodically commits them. This is why EnableAutoOffsetStore has to be true, to mark offsets as “ready to be committed”, and EnableAutoCommit has to be true, to enable the periodic background commit.
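To make the difference between a stored offset and a committed offset concrete, here is a toy in-memory model of that two-step bookkeeping. It is a sketch only: `OffsetBook` and its methods are hypothetical and do not exist in Confluent.Kafka, and it assumes that the offset stored for a message is the message offset plus one (the next offset to read). In the real client, the period of the background commit is controlled by `AutoCommitIntervalMs`.

```csharp
using System;
using System.Collections.Generic;

// Toy model: offsets are first stored locally, then a periodic commit
// publishes whatever is currently stored. Not the real client.
public class OffsetBook
{
    private readonly Dictionary<int, long> _stored = new Dictionary<int, long>();
    private readonly Dictionary<int, long> _committed = new Dictionary<int, long>();

    // Called once a message is handled: remember the next offset to read.
    public void Store(int partition, long messageOffset) =>
        _stored[partition] = messageOffset + 1;

    // Called by the (simulated) background timer: publish all stored offsets.
    public void CommitStored()
    {
        foreach (var entry in _stored)
            _committed[entry.Key] = entry.Value;
    }

    // Null means nothing has been committed for that partition yet.
    public long? Committed(int partition) =>
        _committed.TryGetValue(partition, out var offset) ? offset : (long?)null;
}
```

Storing the offsets of messages 9 and 10 leaves `Committed(0)` at null until `CommitStored()` runs, after which it returns 11. Nothing reaches the broker between two commit ticks, no matter how many offsets are stored.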

Once we have reenabled both, we can see that the current-offset and lag change as we produce more messages:

/ # kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups all-topics --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
my-group-1      kimtopic        0          11              11              0               rdkafka-cb5b1a81-77b2-4cf6-8506-aff53e7032fc /172.21.0.1     rdkafka


As offsets are made available to commit as soon as messages are delivered to the consumer, a failure while processing a message in the consumer might still end up with its offset being committed. To prevent that, we can delay storing the offset until the end of our processing by disabling the auto offset store and manually marking offsets using StoreOffset:

public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "my-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true,
        EnableAutoOffsetStore = false
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume();
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                c.StoreOffset(cr); // here we manually store offsets as we have disabled EnableAutoOffsetStore.
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}
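The ordering that matters here, handle the message first and store its offset only on success, can be isolated in a small helper. This is a minimal sketch under assumptions: `SafeProcessing.ProcessThenStore` and both delegates are hypothetical names, and with Confluent.Kafka the `storeOffset` delegate would typically wrap `c.StoreOffset(cr)`.

```csharp
using System;

public static class SafeProcessing
{
    // Runs the handler first and marks the offset as ready to commit only on success.
    // Returns true when the message was handled and its offset stored.
    public static bool ProcessThenStore<T>(T message, Action<T> handle, Action<T> storeOffset)
    {
        try
        {
            handle(message);
        }
        catch (Exception)
        {
            // The handler failed: the offset of this message is not stored.
            return false;
        }

        storeOffset(message);
        return true;
    }
}
```

The helper does not decide what happens after a failure. If a later message of the same partition is stored, the next commit still moves past the failed one, so the caller has to retry or stop consuming when `false` is returned.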


## Partition EOF

So far we’ve seen what offsets represent and how to commit them, and we have been inspecting the current state of our consumer group using kafka-consumer-groups.sh. Another important aspect is figuring out when our consumer has reached the end of the partition, meaning it has caught up on all messages. The way a consumer can figure this out is via the PARTITION_EOF event.

In order to enable PARTITION_EOF, we need to set EnablePartitionEof on the config:

EnablePartitionEof = true


and check the flag IsPartitionEOF on the consume result:

public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    {
        GroupId = "my-group-1",
        BootstrapServers = "localhost:9094",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true,
        EnableAutoOffsetStore = false,
        EnablePartitionEof = true
    };

    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    using var c = consumerBuilder.Build();
    c.Subscribe("kimtopic");

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume();

                if (cr.IsPartitionEOF)
                {
                    Console.WriteLine($"Consumer reached end of partition '{cr.TopicPartition}'");
                    continue;
                }

                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                c.StoreOffset(cr); // here we manually store offsets as we have disabled EnableAutoOffsetStore.
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        c.Close();
    }
}

This gives us an indication of the health of the consumer, as it is able to reach the end of the partition. Because the end of the partition is a moving target, the log is shown each time the target moves and the consumer catches up again.

## Consumer Lag

The other indication of the health of a consumer is the consumer lag. The consumer lag is the difference between the LOG-END-OFFSET and the CURRENT-OFFSET, which corresponds to the number of messages ahead of the current position of the consumer. It is a good way to figure out how far behind the consumer is. Another way to measure the lag, this time in terms of time, is to use the timestamp of the message:

var lagInMilliseconds =
    (DateTime.UtcNow - cr.Message.Timestamp.UtcDateTime).TotalMilliseconds;
Console.WriteLine(
    $"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}' - lag '{lagInMilliseconds}'.");
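Both ways of measuring lag boil down to a subtraction, which the following sketch isolates so that they can be checked without a broker. `LagMath` and its method names are hypothetical. The offset lag returns null when no offset has been committed, mirroring the `-` shown by kafka-consumer-groups.sh.

```csharp
using System;

public static class LagMath
{
    // Offset lag: number of messages between the committed position and the end of the log.
    // Returns null when nothing has been committed yet.
    public static long? OffsetLag(long logEndOffset, long? currentOffset) =>
        currentOffset.HasValue
            ? Math.Max(0, logEndOffset - currentOffset.Value)
            : (long?)null;

    // Time lag: how long ago the message was produced, relative to 'nowUtc'.
    public static TimeSpan TimeLag(DateTime nowUtc, DateTime messageTimestampUtc) =>
        nowUtc - messageTimestampUtc;
}
```

For instance, a log-end-offset of 14 with a current-offset of 11 gives a lag of 3 messages, while a caught-up consumer (11 and 11) gives 0.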


## Statistics

Instead of using the Docker CLI to interrogate the broker, we can also retrieve the stored offset, committed offset and consumer lag directly from the consumer via statistics. To enable statistics, we add a statistics interval on the config, for example 5 seconds:

StatisticsIntervalMs = 5000


and add a handler on the consumer builder:

var consumerBuilder = new ConsumerBuilder<Null, string>(conf)
    .SetStatisticsHandler((c, json) =>
    {
        var stats = JsonConvert.DeserializeObject<ConsumerStatistics>(json);

        foreach (var (topic, topicStats) in stats.Topics)
        {
            foreach (var (partition, partitionStats) in topicStats.Partitions.Where(p => p.Key != "-1"))
            {
                if (!c.Assignment.Any(
                    a => a.Topic == topic
                        && a.Partition.Value.ToString() == partition))
                    // Only record partition topic metrics for the partition assigned.
                    continue;

                Console.WriteLine($"[{DateTimeOffset.FromUnixTimeSeconds(stats.Time)}] "
                    + $"MemberId: {c.MemberId}\n"
                    + $"Topic: {topic} - Partition: {partition}\n"
                    + $"Lag {partitionStats.ConsumerLag}\n"
                    + $"Hi Offset {partitionStats.HiOffset}\n"
                    + $"Stored Offset {partitionStats.StoredOffset}\n"
                    + $"Committed Offset {partitionStats.CommittedOffset}\n");
            }
        }
    });


SetStatisticsHandler will emit the statistics in JSON format, which we can deserialize following the documentation:

public class ConsumerStatistics
{
    [JsonProperty(PropertyName = "time")]
    public long Time { get; set; }

    [JsonProperty(PropertyName = "topics")]
    public Dictionary<string, TopicStatistics> Topics { get; set; }
}

public class TopicStatistics
{
    [JsonProperty(PropertyName = "partitions")]
    public Dictionary<string, PartitionStatistics> Partitions { get; set; }
}

public class PartitionStatistics
{
    [JsonProperty(PropertyName = "consumer_lag")]
    public long ConsumerLag { get; set; }

    [JsonProperty(PropertyName = "committed_offset")]
    public long CommittedOffset { get; set; }

    [JsonProperty(PropertyName = "stored_offset")]
    public long StoredOffset { get; set; }

    [JsonProperty(PropertyName = "hi_offset")]
    public long HiOffset { get; set; }
}


which we then can print to console:

[11/01/2020 17:18:50 +00:00] MemberId: rdkafka-2a9e513e-65bc-4547-a88d-9f4c6602fc39
Topic: kimtopic - Partition: 0
Lag 0
Hi Offset 14
Stored Offset 14
Committed Offset 14


From the statistics, we look at the high watermark offset. The high watermark offset is the latest offset which has been replicated to all in-sync replicas of the partition. It marks the end of the messages in the partition that are ready to be consumed, and it is actually the offset used to compute the lag (as opposed to the log end offset).
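As an illustration of that relationship, the sketch below recomputes a lag per partition as `hi_offset - committed_offset` from a statistics payload. It is only a sketch under several assumptions: `StatsLag.FromJson` is a hypothetical helper, it uses `System.Text.Json` rather than the Json.NET types used above, it only relies on the `topics`, `partitions`, `hi_offset` and `committed_offset` fields, and it treats a negative `committed_offset` as "nothing committed yet".

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class StatsLag
{
    // Returns hi_offset - committed_offset keyed by "topic/partition".
    // The internal "-1" partition and partitions without a committed offset are skipped.
    public static Dictionary<string, long> FromJson(string json)
    {
        var result = new Dictionary<string, long>();
        using var doc = JsonDocument.Parse(json);

        foreach (var topic in doc.RootElement.GetProperty("topics").EnumerateObject())
        {
            foreach (var partition in topic.Value.GetProperty("partitions").EnumerateObject())
            {
                if (partition.Name == "-1") continue;

                long hi = partition.Value.GetProperty("hi_offset").GetInt64();
                long committed = partition.Value.GetProperty("committed_offset").GetInt64();

                // Assumption: a negative value means no offset was committed.
                if (committed < 0) continue;

                result[$"{topic.Name}/{partition.Name}"] = hi - committed;
            }
        }

        return result;
    }
}
```

Called from the statistics handler with the raw `json` argument, this yields one entry per partition that has a committed offset. For a partition with a high watermark of 14 and a committed offset of 11, the entry is 3.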

And that concludes today’s post!

## Conclusion

In today’s post, we looked at what offsets mean in Kafka, starting with an explanation of how Kafka manages offsets. We then looked at the purpose of committing offsets and at some variations of storing offsets. We completed the post by looking at how to figure out where the consumer is with the partition EOF, the consumer lag, and the consumer statistics used to monitor our consumer. I hope you liked this post and I see you on the next one!
