Start Streaming with Kafka and Spring Cloud

An introduction to building loosely coupled event-driven microservices with Apache Kafka, Spring Cloud Stream and Apache Avro.

REST? Where we’re going, we don’t need REST!

When talking about microservices, most people automatically assume that those services should be REST services, exposing their REST API to consumers. This is logical, because REST is easy. Or at least, it seems to be easy at first. Follow a tutorial and in a couple of lines, you can have a REST service up and running. But doing REST properly is not as easy as it seems, and using REST everywhere has some drawbacks.

REST is not easy

Setting up a Hello World REST API is easy, especially with something like Spring Boot. But then you start thinking about resilience. A client of a REST API (typically another service) needs to be resilient to failure of the service it consumes. That's where circuit breakers come in; Netflix has written a nice implementation of this pattern with Hystrix. But a service must also be resilient to clients behaving badly, so you build bulkheads or add throttling. Throttling can be done with Netflix Zuul. You also want the service to be scalable, so you put a load balancer in front, or you do client-side load balancing with something like Ribbon. When you have multiple instances, and especially when you do client-side load balancing, you need service discovery, with Eureka or Consul. Long story short: REST is not easy.

REST everywhere

When you only use REST to communicate with other services, you end up with a rather tight coupling between services. You end up with service orchestration. In the long run, this becomes hard to maintain and hard to scale. What you usually want in a microservice architecture is a choreography, where services react to events, and emit other events in response. To put it differently, you want to avoid building a distributed monolith.

The purpose of this exploration

The project I'm currently working on has mostly REST services, with some messaging using ActiveMQ. We're feeling the pain described above very clearly. I've always been a big fan of integrating services over a bus, and of event sourcing ever since I learned about it a couple of years ago. Triggered by the problems we've been having over the last 2 years, and by the rising popularity of data streams, reactive programming and things like Kafka, I decided to find out how we could improve our architecture using event streams. I also hoped to prove that event streams are not harder to implement and use than REST APIs. I don't want to abolish REST completely, of course, but I think there are a lot of cases where event streams are a better fit.

What we’ll build

The source code of what we’ll build in this blog post can be found on BitBucket. It’s a very simple setup, containing an event bus (Kafka), a contract for what we send through the channel, an event producer and an event consumer. We’re using tweets as a simple domain. The producer will generate random tweets, and the consumer will display them in the browser. The UI in the browser will be updated using WebSockets.

Here’s what I used:

  • Apache Kafka 0.10.0.1

  • Spring Boot 1.4.0

  • Spring Cloud Stream Brooklyn.RELEASE

  • Apache Avro 1.8.1

  • Java 8

  • Gradle 3.0

  • Docker 1.12.3 beta 29.2

I chose Apache Avro simply because the people at Confluent.io recommend it. It has a schema that you can use to describe the contract of a channel, it's more compact than JSON, and it has support for schema evolution. There are other similar libraries, like Thrift and Protocol Buffers. The nice thing about Apache Avro is that since the Brooklyn release, Spring Cloud Stream supports it out of the box.

Setting up Kafka for development

Setting up Kafka is no walk in the park, and it is not the purpose of this blog post to focus on that. Luckily, these days we have Docker, and the nice people at Spotify have created a Docker image with Kafka and ZooKeeper. Because the included Kafka version is stuck at 0.8, though, we can't use it directly. Spring Cloud Stream throws the following exception when trying to use Kafka 0.8:

org.apache.kafka.common.protocol.types.SchemaException:
  Error reading field 'throttle_time_ms':
  java.nio.BufferUnderflowException

The solution is to upgrade to Kafka 0.10.0.1, which means downloading the source code of docker-kafka, making a small change to the Dockerfile, and building the image.

kafka/Dockerfile
# Kafka and Zookeeper

FROM java:openjdk-8-jre

...

ENV KAFKA_VERSION 0.10.0.1
...

Now you can build and run the new Docker image:

cd kafka
docker build -t beeworks/kafka .
docker run --rm beeworks/kafka

This should result in Zookeeper and Kafka running in a Docker container:

2016-11-16 07:07:08,803 INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-11-16 07:07:08,803 INFO success: kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

For convenience, let’s add a bash script that starts Kafka with the correct settings for this project:

start-kafka.sh
#!/bin/bash
docker run --rm --env ADVERTISED_HOST=kafka --env ADVERTISED_PORT=9092 --hostname kafka --name kafka beeworks/kafka

OK, we have Kafka running. Now it’s time to start building.

Contract first

As mentioned before, we're using Apache Avro as the serialization format for sending our messages over the wire. This means we first need to describe what those messages will look like, in a schema. An Avro schema is a JSON file describing the data structure.

An interesting consequence of having a schema (remember the XML days) is that you can generate classes for your data structure in a number of languages. Avro supports just that, and there’s even a Gradle plugin that can do this code generation in the project build.

For this project, we’re going to create a separate module that contains only the schema definition of our DTOs, the generated Java classes and unit tests that verify our schema works as expected.

Let’s start by describing a tweet message:

dto/tweet/src/main/resources/avro/TweetDto.v1.avsc
{"namespace": "be.beeworks.streaming.tweet",
 "type": "record",
 "name": "TweetDto",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "author", "type": "string", "default": ""},
     {"name": "text", "type": "string", "default": ""},
     {"name": "hashtags", "type": ["null",{"type": "array", "items": "string"}], "default": null},
     {"name": "timestamp", "type": "long"}
 ]
}

This schema defines the data structure for a class (or record) named TweetDto in the package (namespace) be.beeworks.streaming.tweet. It’s a very simple class, with 5 properties:

  • the required field id of type String. Note there is no default value, so if this is left empty an exception will be thrown at serialization time.

  • the required field author of type String. If left empty, this will be set to the default value of empty string.

  • the required field text of type String. Same as author, it has an empty string as default value.

  • the optional field hashtags, which is an array of strings. Note that the type is either a null value or an array type. This is how you can define an optional field. The default is null.

  • the required field timestamp, of type long. This is the time in milliseconds since midnight UTC, January 1st 1970 (the Unix epoch). If left empty, an exception will be thrown.

Note

Finding out how to write this very simple schema turned out to be harder than it should be. I have a similar feeling about Avro now as I had about Maven in 2005: this might become the de facto standard, but they really have to work on the documentation.

Based on this schema, we now want to generate Java classes, and we’ll do that with the Gradle plugin:

dto/tweet/build.gradle
plugins {
    id 'java'
    id "com.commercehub.gradle.plugin.avro-base" version "0.9.0"
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
    jcenter()
}

dependencies {
    compile('org.apache.avro:avro:1.8.1')
    testCompile('junit:junit:4.12')
}

task generateAvro(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
    source("src/main/resources/avro")
    outputDir = file("src/main/java")
    fieldVisibility = "PRIVATE"
}

compileJava.source(generateAvro.outputs)

The main responsibility of this build is to generate the Java source files, compile them, run the unit tests and package everything in a jar file. We need the Avro compile-time dependency, and we need to apply the Avro Gradle plugin. Note that we're using the avro-base plugin in order to define our own generateAvro task. I chose to do it this way so that I could customize the source and output directories. The output directory is src/main/java, which means all Java sources in that directory will be removed before the Avro Java source files are generated. You could use another output directory, but IntelliJ IDEA doesn't seem to like that very much.

Running this build should result in a generated Java source file TweetDto.java, in the package be.beeworks.streaming.tweet under the src/main/java directory.
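
Just to get a feel for the generated API before the full test below, here is a hypothetical smoke test (the class name and values are made up, not part of the project). The generated code itself is a lot more elaborate, but its builder and getters are used like this:

package be.beeworks.streaming.tweet;

import java.util.Arrays;
import java.util.UUID;

// Hypothetical smoke test: build a TweetDto with the generated builder and read it back.
public class TweetDtoSmokeTest {
    public static void main(String[] args) {
        TweetDto tweet = TweetDto.newBuilder()
                .setId(UUID.randomUUID().toString())            // required, no default
                .setAuthor("someone")                           // defaults to "" when not set
                .setText("hello avro")                          // defaults to "" when not set
                .setTimestamp(System.currentTimeMillis())       // required, no default
                .setHashtags(Arrays.asList("spring", "cloud"))  // optional, defaults to null
                .build();

        // the getters mirror the schema fields one to one
        System.out.println(tweet.getAuthor() + ": " + tweet.getText() + " " + tweet.getHashtags());
    }
}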

Let’s write a simple unit test to verify the resulting class can be serialized and deserialized as expected.

dto/tweet/src/test/java/be/beeworks/streaming/tweet/TweetSerializationTest.java
package be.beeworks.streaming.tweet;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import static org.junit.Assert.*;

public class TweetSerializationTest {
    private Random random = new Random();

    @Test
    public void testFullTweet() throws IOException {
        String text = UUID.randomUUID().toString();
        String author = UUID.randomUUID().toString();
        Long timestamp = random.nextLong();

        TweetDto resultingTweet = serializeAndDeserializeTweet(text, author, timestamp, Arrays.asList("spring", "cloud"));
        assertEquals(text, resultingTweet.getText());
        assertEquals(author, resultingTweet.getAuthor());
        assertEquals(timestamp, resultingTweet.getTimestamp());
        assertNotNull(resultingTweet.getId());
        assertNotNull(resultingTweet.getHashtags());
        assertEquals(2, resultingTweet.getHashtags().size());
        assertEquals("spring", resultingTweet.getHashtags().get(0));
        assertEquals("cloud", resultingTweet.getHashtags().get(1));
    }

    @Test
    public void testEmptyTweet() throws IOException {
        TweetDto resultingTweet = serializeAndDeserializeTweet(null, null, null, null);
        assertEquals("", resultingTweet.getText());
        assertEquals("", resultingTweet.getAuthor());
        assertNotNull(resultingTweet.getTimestamp());
        assertNull(resultingTweet.getHashtags());
    }

    private TweetDto serializeAndDeserializeTweet(String text, String author, Long timestamp, List<String> hashtags) throws IOException {
        TweetDto.Builder tweet1Builder = TweetDto.newBuilder().setId(UUID.randomUUID().toString());
        if (text != null) tweet1Builder.setText(text);
        if (author != null) tweet1Builder.setAuthor(author);
        tweet1Builder.setTimestamp(timestamp == null ? System.currentTimeMillis() : timestamp);
        if (hashtags != null) tweet1Builder.setHashtags(hashtags);
        TweetDto tweet1 = tweet1Builder.build();
        DatumWriter<TweetDto> tweetDatumWriter = new SpecificDatumWriter<>(TweetDto.class);
        DataFileWriter<TweetDto> dataFileWriter = new DataFileWriter<>(tweetDatumWriter);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        dataFileWriter.create(tweet1.getSchema(), outputStream);
        dataFileWriter.append(tweet1);
        dataFileWriter.close();

        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
        DatumReader<TweetDto> tweetDatumReader = new SpecificDatumReader<>(TweetDto.class);
        DataFileStream<TweetDto> dataFileReader = new DataFileStream<>(inputStream, tweetDatumReader);

        assertTrue(dataFileReader.hasNext());
        return dataFileReader.next();
    }
}

The private method serializeAndDeserializeTweet first creates an instance of the TweetDto class based on the method arguments. This instance is then serialized to a byte array, which is read back to produce a deserialized TweetDto instance. The tests use this method to verify that a fully populated TweetDto is serialized and deserialized properly, and the same is done for an empty TweetDto.

Note

You can also use Avro without code generation, but I had mixed results with this. A lot of trial and error was needed to get it working, and I struggled with AvroTypeExceptions and ClassCastExceptions. But since you only use these classes as DTOs, the generated code will do nicely.
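
For reference, here is a minimal sketch of the approach without code generation, using Avro's generic API against the same schema file. The class name and values are made up, and it assumes the schema file is on the classpath; this is not code from the project:

package be.beeworks.streaming.tweet;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.UUID;

public class GenericTweetExample {
    public static void main(String[] args) throws IOException {
        // parse the same schema file that drives the code generation
        Schema schema = new Schema.Parser().parse(
                GenericTweetExample.class.getResourceAsStream("/avro/TweetDto.v1.avsc"));

        // build a record by field name instead of using a generated class
        GenericRecord tweet = new GenericData.Record(schema);
        tweet.put("id", UUID.randomUUID().toString());
        tweet.put("author", "someone");
        tweet.put("text", "hello generic world");
        tweet.put("timestamp", System.currentTimeMillis());
        // hashtags is optional (a union with null), so it can be left unset

        // serialize exactly like in the generated-class test above
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(schema, outputStream);
            dataFileWriter.append(tweet);
        }
        System.out.println("serialized " + outputStream.size() + " bytes");
    }
}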

The API

The API module will contain the domain model of both producer and consumer, as well as the basic infrastructure for enabling the binding and the channel adapters of Spring Cloud Stream.

Note

In a real-life microservices architecture you might not want to do this. This API module is a shared library with a shared domain model, which violates the loose coupling of microservices. For this blog post, however, I did not want to repeat myself…

Building with Gradle

Let’s start with the Gradle build file. The microservices that use this API will be based on Spring Boot and Spring Cloud Stream, so we need the Spring Boot Gradle plugin, and the dependencies for Spring Cloud Stream with Kafka (spring-cloud-starter-stream-kafka) and Avro schema support (spring-cloud-stream-schema). We also need the DTO module. Finally, we want to instruct the Spring Boot plugin not to create the fat jar file, because this API won’t run standalone.

api/tweet-streamer/build.gradle
buildscript {
    ext {
        springBootVersion = '1.4.0.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
    maven {
        url 'https://repo.spring.io/libs-milestone'
    }
}


dependencies {
    compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
    compile('org.springframework.cloud:spring-cloud-stream-schema')
    compile project(':dto:tweet')
    testCompile('org.springframework.boot:spring-boot-starter-test')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.RELEASE"
    }
}

bootRepackage {
    enabled = false
}

task wrapper(type: Wrapper) {
    gradleVersion = '3.0'
}

The Model

The Tweet model is a simple immutable class, containing the same 5 properties as the DTO we defined earlier.

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/model/Tweet.java
package be.beeworks.streaming.tweet.model;

import java.util.*;

public class Tweet {
    private final String id;
    private final String text;
    private final String author;
    private final Date timestamp;
    private final Set<String> hashtags;

    public Tweet() {
        this(UUID.randomUUID().toString(),null,null,new Date(),new HashSet<>());
    }

    public Tweet(String id, String text, String author, Date timestamp, Collection<String> hashtags) {
        assert(id != null);
        assert(timestamp != null);
        assert(hashtags != null);
        this.id = id;
        this.text = text;
        this.author = author;
        this.timestamp = timestamp;
        this.hashtags = Collections.unmodifiableSet(new HashSet<>(hashtags));
    }

    public Tweet withHashtag(String hashtag) {
        return withHashtags(Arrays.asList(hashtag));
    }

    public Tweet withHashtags(Collection<String> hashtagsToAdd) {
        Set<String> hashtags = new HashSet<>();
        hashtags.addAll(this.hashtags);
        if (hashtagsToAdd != null) hashtags.addAll(hashtagsToAdd);
        return new Tweet(this.id, this.text, this.author, this.timestamp, hashtags);
    }

    public Tweet withText(String text) {
        return new Tweet(this.id, text, this.author, this.timestamp, hashtags);
    }

    public Tweet withAuthor(String author) {
        return new Tweet(this.id, this.text, author, this.timestamp, hashtags);
    }

    public Tweet withId(String id) {
        return new Tweet(id, this.text, this.author, this.timestamp, hashtags);
    }

    public Tweet withTimestamp(Date timestamp) {
        return new Tweet(this.id, this.text, this.author, timestamp, hashtags);
    }

    public String getId() {
        return id;
    }

    public String getText() {
        return text;
    }

    public String getAuthor() {
        return author;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public Set<String> getHashtags() {
        return hashtags;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Tweet{");
        sb.append("id='").append(id).append('\'');
        sb.append(", text='").append(text).append('\'');
        sb.append(", author='").append(author).append('\'');
        sb.append(", timestamp=").append(timestamp);
        sb.append(", hashtags=").append(hashtags);
        sb.append('}');
        return sb.toString();
    }
}

We also need a mapper for creating TweetDTOs from Tweets and vice versa. Note that we mark this class as a component that can be autowired later.

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/mapper/TweetMapper.java
package be.beeworks.streaming.tweet.mapper;

import be.beeworks.streaming.tweet.TweetDto;
import be.beeworks.streaming.tweet.model.Tweet;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Date;

@Component
public class TweetMapper {
    public TweetDto getTweetDto(Tweet tweet) {
        if (tweet == null) throw new IllegalArgumentException("tweet cannot be null");
        TweetDto.Builder builder = TweetDto.newBuilder();
        builder.setId(tweet.getId());
        builder.setTimestamp(tweet.getTimestamp().getTime());
        builder.setHashtags(new ArrayList<>(tweet.getHashtags()));
        builder.setAuthor(tweet.getAuthor());
        builder.setText(tweet.getText());
        return builder.build();
    }

    public Tweet getTweet(TweetDto tweetDto) {
        if (tweetDto == null) throw new IllegalArgumentException("tweetDto cannot be null");
        return new Tweet()
                .withId(tweetDto.getId())
                .withTimestamp(new Date(tweetDto.getTimestamp()))
                .withHashtags(tweetDto.getHashtags())
                .withAuthor(tweetDto.getAuthor())
                .withText(tweetDto.getText());
    }
}

And of course a little unit test to verify the mapper works correctly.

api/tweet-streamer/src/test/java/be/beeworks/streaming/tweet/mapper/TweetMapperTest.java
package be.beeworks.streaming.tweet.mapper;

import be.beeworks.streaming.tweet.TweetDto;
import be.beeworks.streaming.tweet.model.Tweet;
import org.junit.Before;
import org.junit.Test;

import java.util.*;

import static org.junit.Assert.assertEquals;

public class TweetMapperTest {
    private TweetMapper tweetMapper;
    private String testAuthor;
    private String testText;
    private String testId;
    private Date testDate;
    private List<String> testHashtags;

    @Before
    public void setup() {
        tweetMapper = new TweetMapper();
        testId = UUID.randomUUID().toString();
        testAuthor = UUID.randomUUID().toString();
        testText = UUID.randomUUID().toString();
        testHashtags = Arrays.asList("test","tweet","kafka");
        testDate = new Date();
    }

    @Test
    public void testTweetToDto() {
        TweetDto tweetDto = tweetMapper.getTweetDto(new Tweet(testId, testText, testAuthor, testDate, testHashtags));
        assertEquals(testId, tweetDto.getId());
        assertEquals(testAuthor, tweetDto.getAuthor());
        assertEquals(testText, tweetDto.getText());
        assertEquals((Long)testDate.getTime(), tweetDto.getTimestamp());
        assertEquals(new HashSet<String>(testHashtags), new HashSet<String>(tweetDto.getHashtags()));
    }

    @Test
    public void testDtoToTweet() {
        Tweet tweet = tweetMapper.getTweet(TweetDto.newBuilder().setId(testId).setAuthor(testAuthor).setText(testText).setTimestamp(testDate.getTime()).setHashtags(testHashtags).build());
        assertEquals(testId, tweet.getId());
        assertEquals(testAuthor, tweet.getAuthor());
        assertEquals(testText, tweet.getText());
        assertEquals(testDate, tweet.getTimestamp());
        assertEquals(new HashSet<String>(testHashtags), new HashSet<String>(tweet.getHashtags()));
    }
}

Infrastructure

Output

Let's start with the output of Tweets to a Kafka stream. First, we need an interface to declare an output message channel for our tweets. You could use the provided Source.OUTPUT value to declare the channel, but I prefer to make it a bit more explicit.
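
For comparison, the Source interface that ships with Spring Cloud Stream (org.springframework.cloud.stream.messaging.Source) looks roughly like this:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Simplified sketch of the built-in Source interface, which binds a channel simply named "output".
public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

It works, but the generic channel name output says nothing about what flows through it, so we define our own interface with an explicit channel name instead: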

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/output/TweetOutput.java
package be.beeworks.streaming.tweet.output;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TweetOutput {
    String TWEET_OUTPUT = "tweetOutput";

    @Output(TWEET_OUTPUT)
    MessageChannel tweetOutput();
}

This interface declares the message channel and gives it the name tweetOutput. We can now use that channel name to configure the Kafka topic in tweets.properties:

api/tweet-streamer/src/main/resources/tweets.properties
spring.cloud.stream.bindings.tweetOutput.destination=tweet

This property indicates that the Kafka topic bound to the tweetOutput message channel should be called tweet.

Note

Normally, you would also add the property spring.cloud.stream.bindings.tweetOutput.content-type=application/tweet.v1+avro, so that the Avro message converter can be assigned to this output channel. However, when you do add this property, the consumer service suddenly throws ClassCastExceptions, complaining about Avro's Utf8 string type. This can be fixed by setting the stringType property of the Gradle Avro plugin to Utf8, but then you need to change the mapper as well. This seems to be a bug in Avro. Strangely enough, everything does work correctly when not setting the content-type for the output channel. I need to investigate this further.

The Spring MessageConverter is configured in the TweetStreamerConfiguration class, a Spring configuration that uses the properties file above as a property source. We only need one bean in this configuration: the tweetMessageConverter.

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/infrastructure/TweetStreamerConfiguration.java
package be.beeworks.streaming.tweet.infrastructure;

import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.MimeType;

import java.io.IOException;

@Configuration
@PropertySource("classpath:/tweets.properties")
public class TweetStreamerConfiguration {

    @Bean
    public MessageConverter tweetMessageConverter() throws IOException {
        AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter(MimeType.valueOf("application/tweet.v1+avro"));
        avroSchemaMessageConverter.setSchemaLocation(new ClassPathResource("avro/TweetDto.v1.avsc"));
        return avroSchemaMessageConverter;
    }

}

Finally, in order to send TweetDTOs through the Kafka topic, we need a TweetSender, with an autowired MessageChannel that represents the Kafka stream. We will also autowire the TweetMapper. Note the @Qualifier annotation on the message channel, telling Spring to inject the tweetOutput channel here.

Also note that the TweetSender is not annotated with @Component. We will register this bean in the application context of the producer microservice, not here. If we were to register it here, the consumer microservice would also bind to the output channel, which is not what we want.

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/output/TweetSender.java
package be.beeworks.streaming.tweet.output;

import be.beeworks.streaming.tweet.mapper.TweetMapper;
import be.beeworks.streaming.tweet.model.Tweet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

public class TweetSender {
    private static final Logger logger = LoggerFactory.getLogger(TweetSender.class);

    @Autowired @Qualifier(TweetOutput.TWEET_OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private TweetMapper tweetMapper;

    public void send(Tweet tweet) {
        logger.info("sending tweet: {}", tweet);
        messageChannel.send(MessageBuilder.withPayload(tweetMapper.getTweetDto(tweet)).build());
    }
}

Input

Now that we have the basic setup for sending tweets, let's take care of receiving them. First we need the input counterpart of the TweetOutput interface, aptly named TweetInput. Instead of a MessageChannel, Spring will configure a SubscribableChannel here, which we can use to subscribe to events on the Kafka topic. The name of this channel is tweetInput.

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/input/TweetInput.java
package be.beeworks.streaming.tweet.input;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface TweetInput {
    String TWEET_INPUT = "tweetInput";

    @Input(TWEET_INPUT)
    SubscribableChannel tweetInput();
}

We can now configure this channel in tweets.properties:

api/tweet-streamer/src/main/resources/tweets.properties
spring.cloud.stream.bindings.tweetInput.group=${tweetConsumerGroup}
spring.cloud.stream.kafka.bindings.tweetInput.consumer.resetOffsets=true

tweetConsumerGroup="tweet-api"

The first property specifies the name of the consumer group for this topic. A consumer group in Kafka is used for scaling: Kafka scales topic consumption by distributing partitions among the members of a consumer group. You can read about this in detail on the Confluent.io blog.

The resetOffsets property means that all events on the Kafka topic will be processed again by the consumer when restarting the service. This is perfect for development.

Now all we need is a TweetEventListener, containing a method that is called when a new TweetDTO is received from the Kafka topic. This method is annotated with the @StreamListener annotation, pointing to the tweetInput channel. Note that you should use the @StreamListener annotation rather than the @ServiceActivator annotation. The former uses message converters to process the messages, while the latter does not.

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/input/TweetEventListener.java
package be.beeworks.streaming.tweet.input;

import be.beeworks.streaming.tweet.TweetDto;
import be.beeworks.streaming.tweet.mapper.TweetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;

public class TweetEventListener {
    @Autowired(required = false)
    private TweetReceiver tweetReceiver;

    @Autowired
    private TweetMapper tweetMapper;

    @StreamListener(TweetInput.TWEET_INPUT)
    public void receiveTweet(TweetDto tweetDto) {
        if (tweetReceiver != null) tweetReceiver.receive(tweetMapper.getTweet(tweetDto));
    }
}

As you can see, the receiveTweet method maps the TweetDto to a Tweet, and then delegates the handling of the Tweet to the TweetReceiver, which is an interface we need to implement in the consumer microservices.

api/tweet-streamer/src/main/java/be/beeworks/streaming/tweet/input/TweetReceiver.java
package be.beeworks.streaming.tweet.input;

import be.beeworks.streaming.tweet.model.Tweet;

public interface TweetReceiver {
    void receive(Tweet tweet);
}
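
To make the contract concrete: a consumer only has to provide a TweetReceiver bean. A purely illustrative implementation (not part of this project; the actual consumer below pushes tweets to the browser over WebSockets) could simply log incoming tweets:

package be.beeworks.streaming.tweet.input;

import be.beeworks.streaming.tweet.model.Tweet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

// Illustrative only: a receiver that just logs every tweet handed to it by the TweetEventListener.
@Component
public class LoggingTweetReceiver implements TweetReceiver {
    private static final Logger logger = LoggerFactory.getLogger(LoggingTweetReceiver.class);

    @Override
    public void receive(Tweet tweet) {
        logger.info("received tweet from {}: {}", tweet.getAuthor(), tweet.getText());
    }
}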

What Kafka?

We need to configure one last thing: where to find Kafka. Because we'll be running the producer and consumer microservices in Docker containers, we can use container links. When we set up the Kafka container earlier, we named it kafka. In the microservice containers, we can now link this kafka container to the hostname kafka. So we need to point Spring Cloud Stream to the kafka host:

api/tweet-streamer/src/main/resources/tweets.properties
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka

The producer

The actual microservices for producing and consuming tweet messages are very simple to implement, now that we have the API. Let’s start with the producer.

Build with Gradle

The Gradle build file is nothing new, just a basic Spring Boot microservice build file.

impl/producer/random-tweet-producer/build.gradle
buildscript {
    ext {
        springBootVersion = '1.4.0.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
    maven {
        url 'https://repo.spring.io/libs-milestone'
    }
}

dependencies {
    compile project(':api:tweet-streamer')
    compile('com.thedeanda:lorem:2.1')
    testCompile('org.springframework.boot:spring-boot-starter-test')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.RELEASE"
    }
}


task wrapper(type: Wrapper) {
    gradleVersion = '3.0'
}

You will notice 2 important dependencies: the API project, and the Lorem library, which will allow us to generate random tweets and names.

Generating tweets

The producer will generate random tweets using the Lorem library, and publish them as messages on the Kafka topic. Generating the tweets is the responsibility of the TweetGenerator, and generating authors is the responsibility of the AuthorGenerator. We will limit the number of authors so we can display them in columns in the consumer UI.

impl/producer/random-tweet-producer/src/main/java/be/beeworks/streaming/tweet/TweetGenerator.java
package be.beeworks.streaming.tweet;

import be.beeworks.streaming.tweet.model.Tweet;
import be.beeworks.streaming.tweet.output.TweetSender;
import com.thedeanda.lorem.Lorem;
import com.thedeanda.lorem.LoremIpsum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class TweetGenerator {
    private static final Logger logger = LoggerFactory.getLogger(TweetGenerator.class);

    private Lorem lorem = LoremIpsum.getInstance();

    @Autowired
    private TweetSender tweetWriter;

    @Autowired
    private AuthorGenerator authorGenerator;

    public void generateTweet() {
        Tweet tweet = new Tweet()
                .withAuthor(authorGenerator.randomAuthor())
                .withText(lorem.getWords(20))
                .withHashtags(Arrays.asList(lorem.getWords(1,5).split(" ")));
        logger.info("sending random tweet: {}", tweet);
        tweetWriter.send(tweet);
    }
}

impl/producer/random-tweet-producer/src/main/java/be/beeworks/streaming/tweet/AuthorGenerator.java
package be.beeworks.streaming.tweet;

import com.thedeanda.lorem.Lorem;
import com.thedeanda.lorem.LoremIpsum;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;

@Component
public class AuthorGenerator {
    private final List<String> authors;
    private final Random random = new Random();

    public AuthorGenerator() {
        authors = new ArrayList<>();
        final Lorem lorem = LoremIpsum.getInstance();
        IntStream.range(0, 9).forEach(number -> authors.add(lorem.getName()));
    }

    public String randomAuthor() {
        return authors.get(random.nextInt(authors.size()));
    }
}

Putting it all together

The final piece of the producer puzzle is the Application. Apart from the usual Spring Boot stuff, 3 things need to be done to make the producer do its thing.

  1. The @EnableBinding annotation triggers the configuration of the Spring Cloud Stream infrastructure, and by providing the TweetOutput class, it sets up the message channel for connecting to Kafka.

  2. We create an instance of the TweetSender class from the API and add it as a bean in the application context.

  3. By annotating the class with @EnableScheduling and the generateTweet method with @Scheduled, we make sure that a new tweet is sent every second.

impl/producer/random-tweet-producer/src/main/java/be/beeworks/streaming/tweet/Application.java
package be.beeworks.streaming.tweet;

import be.beeworks.streaming.tweet.output.TweetOutput;
import be.beeworks.streaming.tweet.output.TweetSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@ComponentScan("be.beeworks.streaming.tweet")
@EnableBinding(TweetOutput.class)
@EnableScheduling
public class Application {
    @Bean
    public TweetSender tweetWriter() {
        return new TweetSender();
    }

    @Autowired
    private TweetGenerator tweetGenerator;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Scheduled(initialDelay = 5000, fixedDelay = 1000)
    public void generateTweet() {
        tweetGenerator.generateTweet();
    }
}

Running in a Docker container

Since we have Kafka running in Docker, it’s convenient to also run the services in containers.

There are 2 ways we could run this application in Docker: either build the jar file and use java -jar random-tweet-producer.jar as the entry point for the container, or use ./gradlew clean bootRun as the entry point. In either case, we need a Docker image with Java 8 installed. Let's use the first option:

/impl/producer/random-tweet-producer/run.sh
#!/bin/bash
# start a Docker container with JDK 8,
# mount the project build directory to its working directory
# and link to the Kafka container,
# then run the application as standalone jar
docker run --rm -v "$PWD":/usr/src/app -w /usr/src/app/build/libs --link kafka:kafka openjdk:8 java -jar random-tweet-producer.jar

Build the service with gradle clean build, make sure the Kafka container is running, and start the service container with ./run.sh. After a couple of seconds you should see something like this:

[Image: Producer log]

The Kafka topic is now being populated with tweets.

The consumer

Now we need to do about the same for the consumer. As a bonus, we will also add a web UI, and WebSockets to push the new tweets to the browser.

Build with Gradle

The build file is once again the same thing, except for the dependencies:

impl/consumer/websocket-tweet-consumer/build.gradle
...
dependencies {
    compile project(':api:tweet-streamer')

    compile("org.springframework.boot:spring-boot-starter-websocket")
    compile("org.webjars:webjars-locator")
    compile("org.webjars:sockjs-client:1.0.2")
    compile("org.webjars:stomp-websocket:2.3.3")
    compile("org.webjars:bootstrap:3.3.7")
    compile("org.webjars:jquery:3.1.0")
    compile("org.webjars:jquery-ui:1.12.1")

    testCompile('org.springframework.boot:spring-boot-starter-test')
}
...

Apart from the API, we're adding WebSocket support and a number of WebJars for the frontend: SockJS, STOMP over WebSocket, Twitter Bootstrap, jQuery and jQuery UI.

The Application

Let's start with the Application. Instead of binding to the output channel, we point the @EnableBinding annotation to the TweetInput interface, for consuming the Kafka topic. Then we register the TweetEventListener as a Spring bean.

impl/consumer/websocket-tweet-consumer/src/main/java/be/beeworks/streaming/tweet/Application.java
package be.beeworks.streaming.tweet;


import be.beeworks.streaming.tweet.input.TweetEventListener;
import be.beeworks.streaming.tweet.input.TweetInput;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@EnableBinding({TweetInput.class})
@ComponentScan("be.beeworks.streaming.tweet")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public TweetEventListener tweetEventListener() {
        return new TweetEventListener();
    }
}

The TweetEventListener has a dependency on an instance of TweetReceiver, so we’ll write this next.

impl/consumer/websocket-tweet-consumer/src/main/java/be/beeworks/streaming/tweet/input/WebSocketTweetReceiver.java
package be.beeworks.streaming.tweet.input;

import be.beeworks.streaming.tweet.model.Tweet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class WebSocketTweetReceiver implements TweetReceiver {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketTweetReceiver.class);

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public void receive(Tweet tweet) {
        logger.info("Received tweet: {}", tweet);
        simpMessagingTemplate.convertAndSend("/topic/tweets", tweet);
    }
}

When a tweet is received from the Kafka topic, this class converts it to JSON and pushes it to the frontend via WebSockets, on the /topic/tweets destination. We use the SimpMessagingTemplate for this.

But we haven’t configured WebSockets yet:

impl/consumer/websocket-tweet-consumer/src/main/java/be/beeworks/streaming/tweet/infrastructure/WebSocketConfig.java
package be.beeworks.streaming.tweet.infrastructure;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/tweet-websocket").withSockJS();
    }
}

The @EnableWebSocketMessageBroker annotation configures the Spring infrastructure for WebSocket messaging. We then configure the message broker: the frontend will subscribe to topics under the /topic path. We also register a STOMP endpoint at /tweet-websocket, with SockJS fallback enabled.

That’s all for the backend, now all we need is a simple frontend.

Frontend

The frontend consists of a simple HTML page and some JavaScript that subscribes to the /topic/tweets topic and reacts to the messages it receives.

The web page displays all tweets in columns, grouped by author. The layout is made possible with this CSS.

impl/consumer/websocket-tweet-consumer/src/main/resources/static/index.html
<!DOCTYPE html>
<html>
<head>
    <title>Streaming Tweet Consumer</title>
    <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
    <link href="/bootstrap-horizon.css" rel="stylesheet">
    <link href="/main.css" rel="stylesheet">
    <script src="/webjars/jquery/jquery.min.js"></script>
    <script src="/webjars/jquery-ui/jquery-ui.min.js"></script>
    <script src="/webjars/sockjs-client/sockjs.min.js"></script>
    <script src="/webjars/stomp-websocket/stomp.min.js"></script>
    <script src="/app.js"></script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being
    enabled. Please enable
    Javascript and reload this page!</h2></noscript>
<div id="main-title" class="container-fluid">

    <div class="page-header">
        <h1>Streaming Tweets <small>Consume tweets from Kafka and display with WebSockets</small></h1>
    </div>
    <div id="deck" class="row row-horizon">
    </div>
</div>
</body>
</html>

impl/consumer/websocket-tweet-consumer/src/main/resources/static/app.js
var stompClient = null;

function connect() {
    var socket = new SockJS('/tweet-websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        console.log('Connected: ' + frame);
        stompClient.subscribe('/topic/tweets', function (entry) {
            showEntry(JSON.parse(entry.body));
        });
    });
}

function showEntry(entry) {
    var authorId = toId(entry.author);
    var panelId = toId(entry.text);
    addColumn(authorId, entry.author);
    $("#" + authorId).prepend("<div id='" + panelId + "' class='panel panel-default'><div class='panel-heading'>" + entry.author + "</div><div class='panel-body'>" + entry.text + "</div><div class='panel-footer' id='" + panelId + "Footer'> </div> </div>");
    $("#" + panelId).effect("highlight","slow");
}

function addColumn(authorId, title) {
    if ( !$( "#" + authorId ).length ) {
        $("#deck").append("<div class='col-xs-4'><h2>"+ title + "</h2><div id='" + authorId + "'></div></div>");
    }
}

function toId(text) {
    return text.replace(/\W+/g, "");
}

$(function () {
    connect();
});

Running in a Docker container

Running the consumer in a Docker container is mostly the same as with the producer, except that we need to expose port 8080. In the script below we map the container port 8080 to port 80 on the host.

impl/consumer/websocket-tweet-consumer/run.sh
#!/bin/bash
docker run --rm -v "$PWD":/usr/src/app -w /usr/src/app/build/libs --link kafka:kafka -p 80:8080 openjdk:8 java -jar websocket-tweet-consumer.jar

Build the consumer with gradle clean build, make sure the Kafka container and the producer are still running, and start the consumer container with ./run.sh. When the service has started successfully, point your browser to http://localhost/, and you should see the tweets popping up.

[Image: Consumer UI]

Conclusion

It took some time to figure everything out, but in the end, building event-driven microservices with Spring Cloud Stream is quite straightforward. I still have mixed feelings about Avro. The library itself works quite well, but the documentation isn’t good enough.