This tutorial helps you to understand how to consume Kafka JSON messages from spring boot application.

Spring Boot Kafka Consume JSON Messages:

As part of this example, I am going to create a Kafka integrated spring boot application and publish JSON messages from Kafka producer console and read these messages from the application using Spring Boot Kakfka Listener.

Technologies:

  • Spring Boot 2.1.3 RELEASE
  • Spring Kafka
  • Kafka 2.11
  • Java 8
  • Maven

Prerequisites:

To run the application, you have to have Apache Kafka installed on your operating system. I provided a couple of articles which helps you to install Kafka on Windows and Ubuntu operating systems.

Install Apache Kafka on Windows 10 Operating System

Install Apache Kafka On Ubuntu Operating System

After successful installation, you should have to start the zookeeper, Kafka servers to connect from spring boot application.

Start Zookeeper Server:

Terminal
root@work:/usr/local/kafka/bin# ./zookeeper-server-start.sh ../config/zookeeper.properties
[2019-03-30 01:55:58,567] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-03-30 01:55:58,584] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-03-30 01:55:58,585] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-03-30 01:55:58,585] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-03-30 01:55:58,585] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2019-03-30 01:55:58,640] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-03-30 01:55:58,641] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2019-03-30 01:55:58,725] INFO Server environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT (org.apache.zookeeper.server.ZooKeeperServer)
.....
.....

Start Kafka Server:

Terminal
root@work:/usr/local/kafka/bin# ./kafka-server-start.sh ../config/server.properties
[2019-03-30 01:56:19,474] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-03-30 01:56:20,461] INFO starting (kafka.server.KafkaServer)
[2019-03-30 01:56:20,462] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-03-30 01:56:20,523] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-03-30 01:56:20,556] INFO Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT (org.apache.zookeeper.ZooKeeper)
[2019-03-30 01:56:20,556] INFO Client environment:host.name=work (org.apache.zookeeper.ZooKeeper)
[2019-03-30 01:56:20,556] INFO Client environment:java.version=11.0.2 (org.apache.zookeeper.ZooKeeper)
[2019-03-30 01:56:20,557] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-03-30 01:56:20,557] INFO Client environment:java.home=/usr/lib/jvm/java-11-oracle (org.apache.zookeeper.ZooKeeper)

Create a Kafka topic:

Terminal
root@work:/usr/local/kafka/bin# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic items-topic
Created topic "items-topic".

Start Kafka Producer Console:

It is used to publish messages to a Kafka topic.

Terminal
root@work:/usr/local/kafka/bin# ./kafka-console-producer.sh --broker-list localhost:9092 --topic items-topic

Spring Boot Kafka Consume JSON Messages Example:

On the above we have created an items-topic from Kafka cli, now we are going to send some JSON messages from Kafka producer console and listen the items-topic from Spring boot application to read messages as soon as producer publishes the messages.

Application Structure:

Spring Boot Kafka Consume JSON Messages Example-min

Dependencies:

pom.xml
<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.8</version>
    </dependency>
</dependencies>

Item Model:

Representing the JSON message structure.

Item.java
package com.onlinetutorialspoint.model;

public class Item {
    private int id;
    private String name;
    private String category;

    public Item(int id, String name, String category) {
        this.id = id;
        this.name = name;
        this.category = category;
    }

    public Item() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    @Override
    public String toString() {
        return "Item{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", category='" + category + '\'' +
                '}';
    }
}

Kafka Configuration:

@EnableKafka annotation makes the application to listen on given Kafka topic.

KafkaConfig.java
package com.onlinetutorialspoint.config;

import com.onlinetutorialspoint.model.Item;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String,Item> consumerFactory(){
        Map<String,Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,"sample-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaConsumerFactory<>(config,new StringDeserializer(),
                new JsonDeserializer<>(Item.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Item> kafkaListener(){
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG – “127.0.0.1:9092” – Registering Kafka consumer with Kafka server. Currently, I am running my Kafka server in my local machine so that it is 127.0.0.1.

ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG – StringSerializer.class – tells the type of key which we are going to receive messages from a Kafka topic StringSerializer.class.

ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG – JsonSerializer.class – tells the type of value which we are going to read messages from a Kafka topic JsonSerializer.class

Kafka Service:

A typical service class is acting as a KafkaListener which listens on items-topic.

KafkaConsumerService.java
package com.onlinetutorialspoint.service;

import com.onlinetutorialspoint.model.Item;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "items-topic", groupId = "sample-group",containerFactory = "kafkaListener")
    public void consume(Item item){
        System.out.println("Consumed Message :"+item);
    }
}

Spring Boot Main:

SpringBootKafkaConsumerApplication.java
package com.onlinetutorialspoint;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class SpringBootKafkaConsumerApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringBootKafkaConsumerApplication.class, args);
  }

}

Run it.

Terminal
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-30 02:39:10.760  INFO 31760 --- [           main] c.o.SpringBootKafkaConsumerApplication   : Starting SpringBootKafkaConsumerApplication on work with PID 31760 (/home/cgoka/Documents/Work/Spring_Examples/SpringBoot-Kafka-Consumer/target/classes started by cgoka in /home/cgoka/Documents/Work/Spring_Examples/SpringBoot-Kafka-Consumer)
2019-03-30 02:39:10.770  INFO 31760 --- [           main] c.o.SpringBootKafkaConsumerApplication   : No active profile set, falling back to default profiles: default
2019-03-30 02:39:12.675  INFO 31760 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$98cbb433] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-03-30 02:39:13.870  INFO 31760 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
  auto.commit.interval.ms = 5000
  auto.offset.reset = latest
  bootstrap.servers = [127.0.0.1:9092]
  check.crcs = true
  client.id = 
....
....

Publish Kafka JSON Message from Kafka Producer Console:

Spring Boot Kafka Consumer Example-min

Output:

Now you can see the published Kafka JSON message consumed by the Spring Boot application.

Spring Boot Kafka Consumer Example output(1)-min

References:

Happy Learning 🙂