Here we will see how to send Spring Boot Kafka JSON Message to Kafka Topic using Kafka Template.

Spring Boot Kafka JSON Message:

We can publish the JSON messages to Apache Kafka through spring boot application, in the previous article we have seen how to send simple string messages to Kafka.

Technologies:

  • Spring Boot 2.1.3.RELEASE
  • Spring Kafka
  • Apache Kafka 2.11
  • Java 8
  • Maven

Prerequisites:

To run the application, you have to have Apache Kafka installed on your machine. I have provided a couple of articles which helps you to install Apache Kafka on Windows and Ubuntu operating systems.

Install Apache Kafka on Windows 10 Operating System

Install Apache Kafka On Ubuntu Operating System

After successful installation, start the zookeeper, Kafka servers to connect from spring boot application.

Start Zookeeper server:

Terminal
root@work:/usr/local/kafka/bin# ./zookeeper-server-start.sh ../config/zookeeper.properties 
[2019-03-28 00:22:52,195] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-03-28 00:22:52,210] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-03-28 00:22:52,210] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-03-28 00:22:52,211] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-03-28 00:22:52,211] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)

Start Kafka Server:

Terminal
root@work:/usr/local/kafka/bin# ./kafka-server-start.sh ../config/server.properties 
[2019-03-28 00:24:08,203] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-03-28 00:24:09,146] INFO starting (kafka.server.KafkaServer)
[2019-03-28 00:24:09,147] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-03-28 00:24:09,219] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-03-28 00:24:09,229] INFO Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT (org.apache.zookeeper.ZooKeeper)

Create a Kafka Topic:

Terminal
root@work:/usr/local/kafka/bin# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic items-topic
Created topic "items-topic".

Start Kafka consumer console:

Terminal
root@work:/usr/local/kafka/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic items-topic --from-beginning

1. Sending Spring Boot JSON Message to Kafka:

On the above we have created an items-topic from Kafka cli, now we are going to send some JSON messages to items-topic using KafkaTemplate through Spring Boot application.

Project Structure:

2. Dependencies:

pom.xml
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
</dependencies>

3. JSON representing class:

This class represents a JSON message to sending messages to Kafka topic.

Item.java
package com.onlinetutorialspoint.model;

public class Item {
    private int id;
    private String name;
    private String category;

    public Item(int id, String name, String category) {
        this.id = id;
        this.name = name;
        this.category = category;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}

4. Kafka JSON Configuration:

Since we are going to send JSON messages to Kafka topic, we have to configure the KafkaProducerFactory with JsonSerializer class. The default configuration for KafkaProducerFactory is StringSerializer, so we don’t have to configure this to send simple messages to Kafka topic.

KafkaConfig.java
package com.onlinetutorialspoint.config;

import com.onlinetutorialspoint.model.Item;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String,Item> producerFactory(){
        Map<String,Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory(config);
    }

    @Bean
    public KafkaTemplate<String, Item> kafkaTemplate(){
        return new KafkaTemplate<String, Item>(producerFactory());
    }
}

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG tells Kafka IP address and port “127.0.0.1:9092. Currently, I am going to use my local Kafka so that it 127.0.0.1 and the port is 9092.

Note: You can find this config information at Kafka/config/server.properties file.

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG tells the type of key which we are going to send messages to a Kafka topic StringSerializer.class.

ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, tells the type of value which we are going to send messages to a Kafka topic JsonSerializer.class

5. Rest Controller:

It a simple rest client having one post method which will send JSON message to Kafka topic (items-topic) using KafkaTemplate.

ItemController.java
package com.onlinetutorialspoint;

import com.onlinetutorialspoint.model.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("producer")
public class ItemController {

    @Autowired
    KafkaTemplate<String, Item> KafkaJsontemplate;
    String TOPIC_NAME = "items-topic";

    @PostMapping(value = "/postItem",consumes = {"application/json"},produces = {"application/json"})
    public String postJsonMessage(@RequestBody Item item){
        KafkaJsontemplate.send(TOPIC_NAME,new Item(1,"Lenovo","Laptop"));
        return "Message published successfully";
    }
}

6. Spring Main:

SpringBootKafkaJsonMessagesApplication.java
package com.onlinetutorialspoint;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaJsonMessagesApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringBootKafkaJsonMessagesApplication.class, args);
  }

}

7. Run it:

Terminal
$mvn clean install
$mvn spring-boot:run
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-29 02:30:43.248  INFO 19700 --- [           main] o.SpringBootKafkaJsonMessagesApplication : Starting SpringBootKafkaJsonMessagesApplication on work with PID 19700 (/home/cgoka/Documents/Work/Spring_Examples/SpringBoot-Kafka-JSON-Messages-Producer/target/classes started by cgoka in /home/cgoka/Documents/Work/Spring_Examples/SpringBoot-Kafka-JSON-Messages-Producer)
2019-03-29 02:30:43.269  INFO 19700 --- [           main] o.SpringBootKafkaJsonMessagesApplication : No active profile set, falling back to default profiles: default
.....
.....

8. Sending JSON messages to Kafka Topic:

Access the application from the postman and make a post request.

Spring Boot Kafka JSON messages Postman-min

9. Kafka Consumer Console:

We can see our JSON message on the Kafka consumer console whether it consumed or not.

Spring Boot Kafka JSON Message Output-min

References:

Happy Learning 🙂

Download Example