The Lost Tutorial of Kafka 🎏 Pt. 4

📥 The Consumer

In the previous part, we talked about the Producer in Kafka and how to send events to a topic. We saw that Kafka ships with a simple console consumer you can run from the terminal to listen to events being produced to our topics. But we can also set up a consumer programmatically with Spring to listen to a specific topic. We'll be doing that now.

For this part, the Consumer will be a little less involved than the Producer, and a lot of the steps will mirror what we did when building the Producer. Since we've already worked through the producer, this part will be more straightforward and quicker to develop. Let's get started!

Configuration

Configuring the consumer is much like configuring the producer. To summarize:

  • Kotlin: delete the main Java file and create a Kotlin main class (just copy and paste from the producer).

    • Run a Maven clean install to make sure everything is working smoothly: mvn clean install.
  • Resources: The application.yaml file will be identical for the consumer; just make sure you change the server port. Let’s say 9070 or something.
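As a sketch, the consumer’s application.yaml might look like the fragment below. The property name kafka.bootstrapAddress and the port are assumptions based on the config class in this part; match them to whatever your producer’s file uses.

```yaml
server:
  port: 9070                            # anything that doesn't clash with the producer

kafka:
  bootstrapAddress: "localhost:9092"    # address of your Kafka broker
```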

Consumer Configuration

The configuration for the consumer will be similar to the producer’s, but under the config folder we’ll be creating just one class:

@EnableKafka
@Configuration
class KafkaConsumerConfig(
    @Value("\${kafka.bootstrapAddress}")
    private val servers: String
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String?, Any?> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        // A consumer group id is required for @KafkaListener; any stable name works
        props[ConsumerConfig.GROUP_ID_CONFIG] = "driver-consumer-group"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = DriverDeserializer::class.java
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        factory.containerProperties.isSyncCommits = true
        return factory
    }
}
  • Constructor: we inject the Kafka bootstrap address defined in the application.yaml file.

  • Consumer Factory: here lives the main configuration, such as the deserializers for the event’s key and value.

  • Listener Factory: this creates the containers for methods annotated with @KafkaListener.

Model

Well, I suppose you know what to do by now, right? Yep! C & P. Nothing different here either. We need the Driver model to be identical so events can be mapped correctly.

Driver

data class Driver(
    val driverID: String,
    val latitude: Double,
    val longitude: Double
)
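For reference, a serialized Driver event on the topic is just JSON bytes. Assuming Jackson’s default field naming, a payload would look something like this (the values are made up for illustration):

```json
{
  "driverID": "driver-1",
  "latitude": 40.7128,
  "longitude": -74.0060
}
```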

Deserializer

In the producer, we focused on serializing the data so that Kafka could send events as bytes. Now the consumer needs to handle those messages, which arrive as bytes. Below is the same image from the Producer article for a quick refresher on how data is managed in Kafka.

As you may have guessed, we can also build our own custom Deserializer!

class DriverDeserializer : Deserializer<Driver> {
    // jacksonObjectMapper() registers the Kotlin module so Jackson can
    // instantiate the Driver data class through its constructor
    private val objectMapper = jacksonObjectMapper()
    private val log = LoggerFactory.getLogger(javaClass)

    override fun deserialize(topic: String?, data: ByteArray?): Driver? {
        log.info("Deserializing...")
        return objectMapper.readValue(
            String(
                data ?: throw SerializationException("Error deserializing Driver: null data"), UTF_8
            ), Driver::class.java
        )
    }

    override fun close() {}

}
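As a quick sanity check outside of Spring, you can round-trip a Driver through Jackson and the deserializer. This is just a sketch, and it assumes jackson-module-kotlin is on the classpath so Jackson can construct the data class:

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

fun main() {
    // Serialize a Driver the same way the producer would...
    val bytes = jacksonObjectMapper()
        .writeValueAsBytes(Driver("driver-1", 40.7128, -74.0060))

    // ...then feed the raw bytes to our custom deserializer
    val driver = DriverDeserializer().deserialize("driver_location", bytes)
    println("round-tripped: $driver")
}
```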

Now that that's out of the way, we can finally move on to the fun part of the consumer.

Listener

The listener layer is where we get to play around with the consumer. Here we’ll simply listen for incoming events from the topic.

@Component
class KafkaListeners {

    // groupId here overrides any group id set in the consumer config
    @KafkaListener(topics = ["driver_location"], groupId = "driver-consumer-group")
    fun listener(data: Driver, ack: Acknowledgment) {
        println("listener received: $data")
        // Ack mode is MANUAL_IMMEDIATE, so we commit the offset explicitly
        ack.acknowledge()
    }
}

Example

Testing the consumer is similar to testing the producer: just run this app and the producer at the same time. When the consumer starts, it will first read the events already stored in the topic (that's the auto.offset.reset = "earliest" setting at work). If you hit the producer's request a second time, the consumer will pick up the new event as it comes in.
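If you don’t want to touch the producer app, you can also push a test event straight into the topic with Kafka’s console producer. This assumes the Kafka CLI scripts are on your PATH and the broker is at localhost:9092; the consumer should print the event as soon as you send it:

```shell
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic driver_location
# then type a JSON payload and press Enter:
# {"driverID":"driver-1","latitude":40.7128,"longitude":-74.0060}
```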