Apache Kafka - Understanding Producer and Consumer for Beginners (Java)
Introduction
This tutorial assumes that you have already completed the initial Kafka setup on your local machine; if you have not, please refer to the setup guide here first.
In this tutorial we will focus on writing a simple Producer and a Consumer in Java that write and read String data.
Maven Dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
Producer
A Kafka producer acts as a source of data for a Kafka cluster. A producer can publish messages to one or more Kafka topics. The Producer<K,V> class is parameterized with the key and value types that we are going to write to the topics. We need the following mandatory properties to instantiate a producer (a short configuration sketch follows the list):
- bootstrap.servers - a list of broker host:port pairs used to establish the initial connection to the cluster.
- key.serializer and value.serializer - Kafka is agnostic of the data being written to its topics and accepts only byte arrays, so we must provide serializers that convert the key and value into bytes.
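The full producer code later in this post sets these properties with plain string keys. As an equivalent minimal sketch, you could instead use the ProducerConfig and StringSerializer classes that ship with kafka-clients, which avoids typos in the property names:

    // Equivalent configuration using the constants from kafka-clients
    // (org.apache.kafka.clients.producer.ProducerConfig and
    //  org.apache.kafka.common.serialization.StringSerializer).
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());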
ProducerRecord
We need to create ProducerRecord objects in order to publish messages to Kafka topics. The bare-minimum ProducerRecord constructor takes the topic name and the value as parameters; other overloads additionally accept a key, a partition and a timestamp.
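For illustration, assuming the topic "SimpleProducerTopic" used in this post, the two most common constructors look like this:

    // Value only: the partitioner chooses the partition because there is no key.
    ProducerRecord<String, String> noKey =
            new ProducerRecord<>("SimpleProducerTopic", "some value");

    // Key and value: records with the same key always land on the same partition.
    ProducerRecord<String, String> withKey =
            new ProducerRecord<>("SimpleProducerTopic", "some key", "some value");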
Producer.send
The send method of the producer is used to publish messages to Kafka topics, either synchronously or asynchronously. In this example we use an asynchronous send without a callback; we will learn more about the other variants in upcoming blogs.
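As a quick sketch of the variants not used in this post: send returns a Future<RecordMetadata>, so blocking on get() makes the call effectively synchronous, while passing a Callback lets you react to the result asynchronously.

    // Synchronous: block until the broker acknowledges the record.
    // (get() throws InterruptedException/ExecutionException.)
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Written to partition " + metadata.partition()
            + " at offset " + metadata.offset());

    // Asynchronous with a callback: invoked when the send completes or fails.
    producer.send(record, (meta, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Written to offset " + meta.offset());
        }
    });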
producer.close() - closes the producer, blocking until any buffered records have been sent.
If the topic does not already exist, the broker will create it automatically on the first write (as long as auto.create.topics.enable is set to true, which is the default).
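If you would rather create the topic explicitly (for example to control partitions and replication), something along these lines with the kafka-topics.sh tool that ships with Kafka should work; the partition and replication values below are only illustrative:

    bin/kafka-topics.sh --create --topic SimpleProducerTopic \
        --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1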
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        String topicName = "SimpleProducerTopic";
        String key = "Key";
        String value = "Value";

        // Mandatory producer configuration.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i <= 100; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topicName, key + i, value + i);
            // Asynchronous send without a callback.
            producer.send(record);
        }

        // close() flushes any buffered records and releases resources.
        producer.close();
        System.out.println("SimpleProducer Completed.");
    }
}
Consumer
To create a bare-minimum consumer we need to provide the following mandatory properties (a configuration sketch follows the list):
- Topic list - the list of topics the consumer is going to subscribe to.
- key.deserializer and value.deserializer - to deserialize the bytes read from the Kafka topics back into keys and values.
- bootstrap.servers - a list of brokers used for the initial connection.
- group.id - consumers are grouped according to use case or functionality. Each consumer group has a unique id and subscribes to one or more Kafka topics. A record is delivered to only one consumer within a group, so for multiple independent subscribers you need multiple consumer groups. This is how Kafka achieves parallel processing, load balancing and failover.
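As with the producer, a minimal sketch of this configuration can use the ConsumerConfig and StringDeserializer classes from kafka-clients instead of raw property-name strings (the group name below is just an example):

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "SimpleProducerTopicGroup");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());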
A consumer can subscribe to one or more topics using the subscribe method.
Once subscribed, you can start reading from the topics with the consumer.poll(Duration) method, which returns whatever records are available within the given timeout.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "SimpleProducerTopic";
        String groupName = "SimpleProducerTopicGroup";

        // Mandatory consumer configuration.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        // Each poll returns the records received since the previous poll.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Simple String message= " + record.value());
            }
        }
    }
}
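The loop above never exits, so the consumer is never closed. One common way to shut it down cleanly (a sketch added here for completeness, not part of the original example) is to call consumer.wakeup() from a shutdown hook and catch the resulting WakeupException around the poll loop:

    // Hypothetical shutdown handling wrapped around the poll loop shown above.
    final Thread mainThread = Thread.currentThread();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        consumer.wakeup();          // makes the blocked poll() throw WakeupException
        try {
            mainThread.join();      // wait for the poll loop to finish cleanly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Simple String message= " + record.value());
            }
        }
    } catch (org.apache.kafka.common.errors.WakeupException e) {
        // Expected on shutdown; nothing to do.
    } finally {
        consumer.close();           // commits offsets (with auto-commit) and leaves the group
    }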
Running the code
Build your jar and follow the steps below to see the output:
- Start ZooKeeper - bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Kafka broker - bin/kafka-server-start.sh config/server.properties
- Run the producer in one terminal - java -cp basickafka.jar SimpleProducer
- Run the consumer in another terminal - java -cp basickafka.jar SimpleConsumer
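As an optional check (not part of the original steps), you can also watch the topic with the console consumer that ships with Kafka. Note that if SimpleConsumer is started only after SimpleProducer has finished, its consumer group starts from the latest offset by default, so reading from the beginning is a handy way to confirm the records were actually written:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic SimpleProducerTopic --from-beginning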