Spring Webflux, Reactive Kafka, Cassandra. Complete Reactive Spring Apps

Spring Webflux, Reactive Kafka, Cassandra. Complete Reactive Spring Apps

Consume and produce message to topics using Spring Webflux and Reactive Kafka.

·

5 min read

Introduction

Building modern, scalable web applications requires technologies that can handle large amounts of data and high volumes of traffic. The combination of Spring WebFlux, Reactive Kafka, and Cassandra can provide a powerful stack for building such applications. In this blog post, let's explore how to integrate these technologies and build a reactive, event-driven system.

The Architecture of the Apps

We'll need:

  • Java > 8.

  • Cassandra (you can use AstraDB like me if you don't want to install it locally)

  • Apache Kafka.

Configure Kafka on your machine:

  1. Download the binary file and extract it.

  2. Open the command prompt in the extracted folder.

  3. Start Zookeeper:

    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

  4. Start Kafka: In another command prompt window

    .\bin\windows\kafka-server-start.bat .\config\server.properties

  5. Create topics app_updates & emp_updates: In another command prompt window

    .\bin\windows\kafka-topics.bat --create --topic app_updates--bootstrap-server localhost:9092

    .\bin\windows\kafka-topics.bat --create --topic emp_updates--bootstrap-server localhost:9092

  6. For listening to a topic directly from command prompt

    .\bin\windows\kafka-console-consumer.bat -bootstrap-server localhost:9092 -topic topic_name . Replace topic_name with your topic name(Eg: app_updates, emp_updates)

App-1: https://github.com/utronics/Kafka-App-1

App-2: https://github.com/utronics/Kafka-App-2

App-1

Dependencies required:

Add the following dependencies while generating the project using Spring Initializer.

Additionally, add the following dependencies in your pom.xml

        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-webflux-core</artifactId>
            <version>1.6.14</version>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-webflux-ui</artifactId>
            <version>1.6.14</version>
        </dependency>

The first dependency, io.projectreactor.kafka:reactor-kafka, is the reactive Kafka client library that provides a set of abstractions for building reactive Kafka consumers and producers.

The next two dependencies are for Swagger.

I am using a previous app I build which performs CRUD operations to the AstraDb-Cassandra database and added the functionality to produce Kafka messages to a topic. (check out the previous app blog here https://utronics.hashnode.dev/connect-spring-webflux-with-astradb-cassandra-database-crud-operations)

Application properties

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  data:
    cassandra:
      keyspace-name:
      username: #Client ID - get from Token
      password: #Client Secret - get from Token
      schema-action: create-if-not-exists
      request:
        timeout: 10s
      connection:
        connect-timeout: 10s
        init-query-timeout: 10s
datastax.astra:
  secure-connect-bundle: #Path of the connect bundle zip file.
astra.db:
  id: #Datacenter ID
  region: #region
  keyspace: #keyspace-name
  application.token: #Token
PRODUCER_TOPIC: app_updates

Config class

@Configuration
public class EmpKafkaProducerConfig {

    @Bean
    public SenderOptions<String, Employee> producerProps(KafkaProperties kafkaProperties) {
        return SenderOptions.create(kafkaProperties.buildProducerProperties());
    }
    @Bean
    public ReactiveKafkaProducerTemplate<String, Employee> reactiveKafkaProducerTemplate(
            SenderOptions<String, Employee> producerProps) {
        return new ReactiveKafkaProducerTemplate<>(producerProps);
    }
}

The producerProps method does some behind-the-scenes work. It creates a set of properties, known as producerProps, that defines how the Kafka producer should behave. These properties include things like which server to connect to, how to serialize the data, and more. It will take into consideration the kafka properties mentioned in application.properties file

The reactiveKafkaProducerTemplate method does something similar, but it sets up a special template that wraps around the Kafka producer. This template provides an easier and more reactive way to send messages to Kafka.

@Component
@Slf4j
public class EmpKafkaProducer {

    @Value(value = "${PRODUCER_TOPIC}")
    private String topic;
    @Autowired
    private ReactiveKafkaProducerTemplate<String, Employee> reactiveKafkaProducerTemplate;

    public void sendMessages(Employee employee) {
        log.info("send to topic={}, {}={},", topic, Employee.class.getSimpleName(), employee);
        reactiveKafkaProducerTemplate.send(topic, String.valueOf(employee.getId()), employee)
                .doOnSuccess(senderResult -> log.info("sent {} offset : {}",
                        employee,
                        senderResult.recordMetadata().offset()))
                .subscribe();
    }

}

The reactiveKafkaProducerTemplate.send method is used to send the message to the Kafka topic. It specifies the topic, a key (converted to a string using the employee's ID), and the employee object as the value.

This method is being called from the controller for update and insert endpoints.

APP 2:

Properties

server:
  port: 8081
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: group-update-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            use:
              type:
                headers: false
            value:
              default:
                type: com.updateservice.model.EmployeeRequest
#
CONSUMER_TOPIC: app_updates
EMP_UPDATES_TOPIC: emp_updates

The consumer subsection configures the Kafka consumer. It sets the consumer group ID, which is used to coordinate the consumption of messages among multiple consumers. It also specifies the auto-offset-reset property as "earliest", meaning that the consumer will start consuming from the earliest available offset if no committed offset is available for a partition.

The key-deserializer and value-deserializer properties specify the deserializers to be used for the key and value.

By setting spring.json.use.type.headers to false, it disables the inclusion of type information in JSON headers. Additionally, it specifies the default type to be used for deserializing JSON values of the EmployeeRequest class.

Config:

@Configuration
@Slf4j
public class EmpKafkaConsumerConfig {

    @Bean
    public ReceiverOptions<String, EmployeeRequest> kafkaReceiverOptions(@Value(value = "${CONSUMER_TOPIC}") String topic,
                                                                         KafkaProperties kafkaProperties) {
        ReceiverOptions<String, EmployeeRequest> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic))
                .addAssignListener(partitions -> log.info("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.info("onPartitionsRevoked {}", partitions));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, EmployeeRequest> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, EmployeeRequest> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}

The kafkaReceiverOptions method creates a bean called kafkaReceiverOptions that provides the configuration options for the Kafka consumer. It takes the topic name and Kafka properties as inputs, from properties file.

The reactiveKafkaConsumerTemplate method creates a bean named reactiveKafkaConsumerTemplate that represents a reactive wrapper around the Kafka consumer. It takes the kafkaReceiverOptions bean as an input and creates a new instance of ReactiveKafkaConsumerTemplate using the provided options.

@Configuration
public class EmpKafkaProducerConfig {

    @Bean
    public SenderOptions<String, EmployeeRequest> producerProps(KafkaProperties kafkaProperties) {
        return SenderOptions.create(kafkaProperties.buildProducerProperties());
    }
    @Bean
    public ReactiveKafkaProducerTemplate<String, EmployeeRequest> reactiveKafkaProducerTemplate(
            SenderOptions<String, EmployeeRequest> producerProps) {
        return new ReactiveKafkaProducerTemplate<>(producerProps);
    }
}

Service:

@Service
@Slf4j
public class EmpKafkaConsumerService {

    @Autowired
    private EmpKafkaProducerService empKafkaProducerService;
    @Autowired
    private ReactiveKafkaConsumerTemplate<String, EmployeeRequest> reactiveKafkaConsumerTemplate;

    @Value(value = "${EMP_UPDATES_TOPIC}")
    private String empTopic;
    public Flux<EmployeeRequest> consumeAppUpdates() {
        log.info("In consumeAppUpdates()");
        return reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .doOnNext(employeeRequest -> {
                    log.info("successfully consumed {}={}", EmployeeRequest.class.getSimpleName(), employeeRequest);
                    empKafkaProducerService.sendMessages(employeeRequest,empTopic);
                })
                .doOnError(throwable -> log.error("something went wrong while consuming : {}", throwable.getMessage()));
    }

    @PostConstruct
    public void init() {
        log.info("In init()");
        this.consumeAppUpdates().subscribe();
    }

}

The init method is annotated with @PostConstruct, which means it will be executed after the object is constructed and its dependencies are injected.

Inside the init method, the consumeAppUpdates method is called, and the resulting Flux is subscribed to. This initiates the consumption of messages from the Kafka topic.

In consume consumeAppUpdates methods we are calling empKafkaProducerService.sendMessages(employeeRequest,empTopic); which sends message to emp_updates topic.

@Service
@Slf4j
public class EmpKafkaProducerService {

    @Autowired
    private ReactiveKafkaProducerTemplate<String, EmployeeRequest> reactiveKafkaProducerTemplate;

    public void sendMessages(EmployeeRequest employeeRequest, String topic) {
        log.info("send to topic={}, {}={},", topic, EmployeeRequest.class.getSimpleName(), employeeRequest);
        reactiveKafkaProducerTemplate.send(topic, String.valueOf(employeeRequest.getId()), employeeRequest)
                .doOnSuccess(senderResult -> log.info("sent {} offset : {}", employeeRequest,
                        senderResult.recordMetadata().offset()))
                .subscribe();
    }

}

Result:

Console consumer:

Messages are produced to app_updates topic by App1

The messages from the app_updates topic are consumed by App2 and produced again to the emp_updates topic