Parsing Apache Kafka __consumer_offsets using the Kafka command line and the Java API
__consumer_offsets is the internal topic where Apache Kafka stores committed consumer offsets. Since Kafka moved offset storage out of ZooKeeper to avoid scalability problems, __consumer_offsets has taken center stage in managing the offsets for all consumers.
By default, exclude.internal.topics is true, which keeps internal topics such as __consumer_offsets from being exposed to consumers. Therefore, we should set exclude.internal.topics to false before consuming this topic.
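For a Java consumer, the same setting goes into the consumer properties. The following is a minimal sketch, assuming a broker at localhost:9092 and a hypothetical group id offsets-reader; the class and method names are illustrative only, and the configuration keys are written as plain strings so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

final class OffsetsConsumerConfig {
    // Builds properties for a consumer that reads __consumer_offsets as raw bytes.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId); // hypothetical group id, pick your own
        // Expose internal topics such as __consumer_offsets to this consumer.
        props.setProperty("exclude.internal.topics", "false");
        // Keys and values are binary, so keep them as byte arrays and decode later.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Start from the oldest retained record when the group has no committed position.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        build("localhost:9092", "offsets-reader")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object can be passed to a KafkaConsumer<byte[], byte[]> that subscribes to __consumer_offsets.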
To consume the messages with the Kafka command-line scripts, we can use the commands below:
echo "exclude.internal.topics=false" > consumer.config
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config consumer.config --from-beginning
We cannot read the data directly because Kafka stores it in a binary format. To read it, we need the formatter class "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter", which converts the binary data to a human-readable format.
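To get a feel for what the formatter is decoding, here is a minimal sketch that reads a record key by hand using only the JDK. It assumes the key layout used by the Kafka versions that ship GroupMetadataManager: a 2-byte schema version, then (for versions 0 and 1) the group and the topic as length-prefixed UTF-8 strings, followed by a 4-byte partition number. Keys with version 2 are assumed to be group metadata keys and are skipped. The class OffsetKeySketch and its helpers are hypothetical; in real code GroupMetadataManager.readMessageKey is the safer choice because it tracks the actual schema.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OffsetKeySketch {
    // Reads a string encoded as a 2-byte length followed by that many UTF-8 bytes.
    static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns "group:topic:partition" for offset-commit keys, or null for other key types.
    static String decode(ByteBuffer buf) {
        short version = buf.getShort();
        if (version > 1) {
            return null; // assumed: version 2 is a group metadata key, not an offset commit
        }
        String group = readString(buf);
        String topic = readString(buf);
        int partition = buf.getInt();
        return group + ":" + topic + ":" + partition;
    }

    // Builds a key in the assumed layout, only so the sketch can be exercised offline.
    static ByteBuffer encode(short version, String group, String topic, int partition) {
        byte[] g = group.getBytes(StandardCharsets.UTF_8);
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + g.length + 2 + t.length + 4);
        buf.putShort(version)
           .putShort((short) g.length).put(g)
           .putShort((short) t.length).put(t)
           .putInt(partition);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer key = encode((short) 1, "my-group", "orders", 3);
        System.out.println(decode(key)); // prints my-group:orders:3
    }
}
```

The value side of a record follows the same idea (a version followed by versioned fields), which is why the formatter class or the GroupMetadataManager helpers are needed to interpret it reliably.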
Similarly, I have written a utility class that parses the __consumer_offsets topic data using the Kafka client Consumer API. Its core decoding step looks like this:
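// Package locations below match the Kafka core versions that ship GroupMetadataManager; adjust for your version.
import java.nio.ByteBuffer;
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.group.OffsetKey;

// consumerRecord is a ConsumerRecord<byte[], byte[]> read from __consumer_offsets
byte[] key = consumerRecord.key();
byte[] value = consumerRecord.value();
if (key != null) {
    // Decode the binary key; only OffsetKey records carry committed offsets
    Object o = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key));
    if (o instanceof OffsetKey && value != null) { // a null value is a tombstone for a removed offset
        OffsetKey offsetKey = (OffsetKey) o;
        OffsetAndMetadata offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
    }
}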
You can find the complete code, which converts the __consumer_offsets metadata to JSON format, at the GitHub link below.
https://github.com/wandermonk/KafkaConsumerOffsetsParser
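The repository's actual output format is not reproduced here. As a rough, hypothetical illustration of the idea, a decoded offset record could be rendered as JSON along these lines. The field names and the OffsetJson class are assumptions made for this sketch, and a real project would more likely rely on a JSON library than on string concatenation.

```java
final class OffsetJson {
    // Escapes backslashes and double quotes only; control characters are not handled.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders one committed offset as a single-line JSON object (hypothetical field names).
    static String toJson(String group, String topic, int partition, long offset) {
        return "{\"group\":\"" + escape(group) + "\","
             + "\"topic\":\"" + escape(topic) + "\","
             + "\"partition\":" + partition + ","
             + "\"offset\":" + offset + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("my-group", "orders", 3, 42L));
        // prints {"group":"my-group","topic":"orders","partition":3,"offset":42}
    }
}
```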