In this blog, we are going to explore how Spring Cloud Stream and Kafka Streams can be used
to write basic stateless stream processing applications. Kafka Streams is a great library for writing very powerful
streaming applications and Spring Cloud Stream provides a very familiar programming model to making the
task of writing such applications easier.
The application that we develop is a very trivial one, but it will
introduce the basic programming model that can be used for advanced applications.
This is a stateless application that simply prints the incoming stream of records
from a Kafka topic on the console and then return that same stream unchanged to another Kafka topic.
First lets bootstrap the application. An easier way to do so is to use the Spring Initializr.
Go to https://start.spring.io and then search for "Cloud Stream" and "Kafka Streams". Also provide appropriate maven coordinates.
Here is an example.

The artifact is named as stream-processing-basic.
Hit the "Generate Project" button.
Extract the zip archive file and then open the project in your favorite IDE.
If you examine the dependencies for the project, you can see that the
project contains all the necessary components such as core spring cloud stream (spring-cloud-stream),
kafka streams binder (spring-cloud-stream-binder-kafkap-streams), spring kafka (spring-kafka) and other Spring Boot components.
Open the class StreamProcessingBasicApplication.
At this point, this is a basic bare bone class that is only annotated with a Spring Boot annotation.
You can run it as a boot application right now, but it won’t do much.
We are going to incrementally add content to this class and make it a stateless stream processing application.
The very first thing that we need to do is to add bindings for the input and output.
Kafka Streams binder provides a basic interface that contains a singe input and output as KStream types.
If your application only needs a single KStream input and a single KStream output, you can use this interface.
After adding the binding, this is how the code looks like. Note the use of the EnableBinding annotation.
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class StreamProcessingBasicApplication {
public static void main(String[] args) {
SpringApplication.run(StreamProcessingBasicApplication.class, args);
}
}
Now that the bindings are provided to the application, we should start using them.
We need to write a processor using these bindings. For that, we can use the StreamListener and SendTo annotations.
Add the following method to the application.
@StreamListener("input")
@SendTo("output")
public KStream<?, String> process(KStream<?, String> input) {
return input.peek((k, v) -> System.out.println("Value received: " + v));
}
As mentioned before, it doesn’t do much, but worth mentioning a few details.
StreamListener listens for any records coming through the input binding and convert them as KStream objects
and pass them down to the method. The method peeks at the records and prints them on the console.
Then it returns the record unchanged to a Kafka topic through the output binding.
The SendTo annotation is used to specify the name of the output binding.
By default, Spring Cloud Stream will expect the same name for Kafka topics as the bindings, i.e. input and output.
In most cases, you want to change this though.
For e.g. if you want to receive data from a topic named foo and send data to another topic bar, you can do that by providing the following configuration.
spring:
cloud:
stream:
bindings:
input.destination: foo
output.destination: bar
Kafka Stream applications require an application id. There are multiple ways you can provide the application id through Spring Cloud Stream.
However, for the purposes of this simple application, we can just provide the property spring.application.name
which will be used by the binder as the application id.
spring.application.name: basic-stateless-stream-processing
Try running the application either on your IDE or from CLI. If you are running from CLI, build the project first by running ./mvnw clean package. Then run
java -jar <path-to-the-jar.
Send some data to the topic foo and watch the application console. You should see the value getting logged.
You can also watch the output topic bar and see the same data over there.
There it is. In this article, went through the basic steps involved in creating a stateless stream processing application using Spring Cloud Stream and Kafka Streams.
The code used in this blog can be found here.