shaikezam.com

Spring Cloud Stream Imperative Programming

13-12-2021 - Shai Zambrovski


Microservices Asynchronous Communication

There are two concepts regarding communication between microservices:

With event driven communication, we can send from P2P or via pub\sub any events we want, for example: state changed or content.

Spring Cloud Stream

Among the many projects within the Spring Cloud Project, the Spring Cloud Stream library provided the developer an abstraction\wrapper implementation on top of the messaging brokers. That means that Spring application could:

RabbitMQ

We will not cover and learn about the RabbitMQ message broker, but it widely popular lightweight messaging platform.

We will install via docker command (if you don’t want use docker, feel free to install it as you wish)

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

Using this command, we installed (as container) a RabbitMQ docker image with management GUI and it will be accessible via localhost:15672 with username guest and password guest (those are the defaults).

Spring Cloud Stream Concepts

Our simple application will be a Spring Boot Application that publish and consume tasks.

The maven project will be defined in the pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.spring.cloud.stream</groupId>
    <artifactId>rabbitmq-imperative-programming</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>>rabbitmq-imperative-programming</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2020.0.4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Then, let’s create our model: Task:

import org.apache.commons.lang3.RandomStringUtils;

public class Task {

    private String id;
    private String title;
    private String description;

    public Task(String id, String title, String description) {
        this.id = id;
        this.title = title;
        this.description = description;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "Task{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", description='" + description + '\'' +
                '}';
    }

    public static class TaskBuilder {

        private String id = RandomStringUtils.randomAlphanumeric(10);
        private String title = RandomStringUtils.randomAlphanumeric(10);
        private String description = RandomStringUtils.randomAlphanumeric(10);

        public TaskBuilder() {
        }

        public TaskBuilder withId(String id) {
            this.id = id;
            return this;
        }

        public TaskBuilder withTitle(String title) {
            this.title = title;
            return this;
        }

        public TaskBuilder withDescription(String description) {
            this.description = description;
            return this;
        }

        public Task build() {
            return new Task(id, title, description);
        }

    }
}

Now we will define our bindings interface (which responsible the Input\Output methods):

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface TaskBinding {
    String TASK_CHANNEL_INPUT = "task-channel-in";
    String TASK_CHANNEL_OUT = "task-channel-out";

    @Input(TASK_CHANNEL_INPUT)
    SubscribableChannel inboundTasks();
    @Output(TASK_CHANNEL_OUT)
    MessageChannel outboundTasks();
}

We define the output channel named as task-channel-out and the input channel named as task-channel-in

Then, we will define our application.properties file:

server.port=333
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.task-channel-in.destination=imperative-programming-tasks
spring.cloud.stream.bindings.task-channel-out.destination=imperative-programming-tasks

logging.level.org.springframework.cloud.stream=debug

We set our RabbitMQ server properties and associate between the channel and the queue to be used

Last step, we will create our Spring Boot Application class with the relevant annotations:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableBinding(TaskBinding.class)
@EnableScheduling
public class SpringCloudStreamRabbitMQImperativeProgramming {

    private MessageChannel messageChannel;

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamRabbitMQImperativeProgramming.class, args);
    }

    public SpringCloudStreamRabbitMQImperativeProgramming(TaskBinding taskBinding) {
        this.messageChannel = taskBinding.outboundTasks();
    }

    @Scheduled(fixedDelay = 1000)
    public void publishTask() {
        Task task = new Task.TaskBuilder().build();
        System.out.println("producing task: " + task);
        messageChannel.send(MessageBuilder.withPayload(task).build());
    }

    @StreamListener(TaskBinding.TASK_CHANNEL_INPUT)
    public void consumeTask(String msg) {
        System.out.println("consumed task: " + msg);
    }

}

Basically, our application run as a scheduled app in which each 1 seconds, it will publish to the queue a random Task instance. let’s do quick explanation on the new annotations.

Now, if we run the application we will see in the console the prints upon tasks producing and consuming:

producing task: Task{id=NAk32IfCnB, title='jO5Be3cXmW', description='TslmrawNbQ'}
consumed task: {"id":"NAk32IfCnB","title":"jO5Be3cXmW","description":"TslmrawNbQ"}

In new versions of Spring Cloud Stream we noticed that some of the annotations are deprecated.

In the next artical we will using Spring Cloud Stream with Functional Programing

Feel free to look in the source code and try it your own.