The Lost Tutorial of Kafka 🎏 Pt. 5

🏪 The Store Service


Welcome to the final part! I appreciate you sticking around. Let’s finish this assignment with the Store Service. This part is pretty simple, like the consumer. It’ll have more layers, but if you're used to building REST APIs then this should be easy enough. Also, we’re going back to some good old Java. Sorry to all my Kotlin lovers, but feel free to keep using it if you want! You're not missing out, I just like using Java for regular REST APIs.

Configuration

As usual, let’s do some quick configuration. Here we’ll just deal with the resources folder and the pom.xml, so there’s no Kafka configuration in this part.

Pom

We’ll need some new dependencies, so make sure you add them to the service pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

  • Spring Data JPA: this API will help us work with the database.

  • Lombok: annotation library to make code cleaner and ditch boilerplate code.

  • MySQL Connector: the JDBC driver the service uses to talk to MySQL at runtime.

Resources

server:
  port: 9080

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/db
    username: user
    password: user123
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    show-sql: false
    hibernate:
      ddl-auto: create
    generate-ddl: true
    database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect

Let’s go over this quickly to highlight important points:

  • Port: the assignment specified to use port 9080 so let's use that one.

  • Spring Data: this service will be connecting to a database, so we need to configure how Spring will handle that with Spring Data.

    • Data source: here we’ll put the URL and credentials to connect to the DB.

    • JPA: configuration to specify the DB dialect

💡 After you run the app, Spring will create the DB objects (schema, tables, etc.). This is due to jpa.hibernate.ddl-auto: create. After the first run, make sure you change it to update; otherwise the tables are dropped and recreated on every start.

With these properties, the service is ready to roll. Let’s move on to the model.

Model

In this service we’ll be introducing two new models: Store and DriverDistance. The Store model will be similar to the Driver in the producer and consumer. On the other hand, DriverDistance will be based on Driver, but with an additional distance field.

Store

Let’s start with the new model. The store will be an Entity class for our database. After creating it, we will run the app to see if our configuration worked and created the table. Note: make sure Docker is running.

@Entity
@Table(name = "store")
@Data
public class Store {

    // Primary key of the store table.
    @Id
    @Column(name = "store_id")
    private String storeId;

    @Column(name = "store_latitude")
    private double latitude;

    @Column(name = "store_longitude")
    private double longitude;
}

Additionally, let’s make sure our main class is updated with the @EntityScan annotation.

@SpringBootApplication
@EntityScan("com.luismir.kafka.model")
public class StoreServiceApplication {
    public static void main(String[] args) { SpringApplication.run(StoreServiceApplication.class, args); }
}

Now we have everything we need to run the app. Before starting it, you can connect to the DB to check that it's reachable; at this point it should be empty. If the connection works, we're good to go. Once you spin up the app, you should see the store table created.

DriverDistance

Now we can finish off the models with the new DriverDistance.

@Data
public class DriverDistance {

    private String driverID;

    private double distance;

    public DriverDistance(String driverID, double distance) {
        this.driverID = driverID;
        this.distance = distance;
    }
}

This model is similar to the Driver in the producer and consumer, but it adds a new field, distance, which simply holds how far a driver is from a specific store. Note that, unlike Driver, it doesn't carry the latitude and longitude; it holds only the driver ID and the distance.
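To make Lombok's role a bit more concrete, here is roughly what @Data saves us from writing for DriverDistance. Treat it as a hand-written sketch rather than Lombok's actual generated code; the class name DriverDistancePlain is made up for this example.

```java
import java.util.Objects;

// Hypothetical plain-Java counterpart of DriverDistance, written without Lombok.
class DriverDistancePlain {

    private String driverID;
    private double distance;

    DriverDistancePlain(String driverID, double distance) {
        this.driverID = driverID;
        this.distance = distance;
    }

    // Getters and setters: @Data generates these for every field.
    public String getDriverID() { return driverID; }
    public void setDriverID(String driverID) { this.driverID = driverID; }
    public double getDistance() { return distance; }
    public void setDistance(double distance) { this.distance = distance; }

    // Value-based equality over both fields.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DriverDistancePlain)) return false;
        DriverDistancePlain other = (DriverDistancePlain) o;
        return Double.compare(distance, other.distance) == 0
                && Objects.equals(driverID, other.driverID);
    }

    @Override
    public int hashCode() {
        return Objects.hash(driverID, distance);
    }

    @Override
    public String toString() {
        return "DriverDistancePlain(driverID=" + driverID + ", distance=" + distance + ")";
    }

    public static void main(String[] args) {
        // prints DriverDistancePlain(driverID=d1, distance=1.5)
        System.out.println(new DriverDistancePlain("d1", 1.5));
    }
}
```

That's around forty lines replaced by a single annotation, which is why I like keeping Lombok around for simple models.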

Repo

The repository layer will be fairly simple for now: we just need to get a store based on its ID. For the other CRUD operations, we can use the default methods provided by the JpaRepository API.

import com.luismir.kafka.model.Store;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface StoreRepo extends JpaRepository<Store, String> {

    Store findStoreByStoreId(String id);
}

Service

Now it’s time for the service layer, the middle man (if you like to think of it that way). I’ll be following the DAO design pattern.

Interface

As in the repo layer, I’ll create an interface first and then implement it. Let’s start with the interface to see how this will work.

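import com.luismir.kafka.model.DriverDistance;
import com.luismir.kafka.model.Store;

import java.util.List;

// Note: @Service goes on the implementing class (StoreServiceImpl), not on the
// interface, since Spring only registers concrete classes as beans.
public interface StoreService {

    void saveStore(Store store);

    Store getStoreById(String id);

    List<Store> getAllStore();

    // Returns up to n drivers with their distance to the given store.
    List<DriverDistance> getDriversAroundStore(String id, int n);
}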

The first three methods are some basic CRUD functionalities for the store. Here we can save a store, get one by its ID, or get all of them. For now, that’s all we need; we can add other functionalities later if needed. The last method, getDriversAroundStore(String id, int n), is the main functionality we return to a client.

Implementation

Let’s start with the basic functionalities, not much to explain, so let's go over it quickly.

  1. Autowiring
private final StoreRepo storeRepo;

@Autowired
public StoreServiceImpl(StoreRepo storeRepo) {
    this.storeRepo = storeRepo;
}
  2. Basic CRUD functionality
@Override
public void saveStore(Store store) {
    storeRepo.save(store);
}

@Override
public Store getStoreById(String id) {
    return storeRepo.findStoreByStoreId(id);
}

@Override
public List<Store> getAllStore() {
    return storeRepo.findAll();
}

Easy enough, right? Now for the fun part: calculating the distance between the store and the driver. The calculation itself is easy enough, but how are we going to get the events from the consumer?

The consumer is a different module and, while it lives under the same parent pom, we can’t directly import its classes. What we need to do is add it as a dependency in the service pom file. Under the dependencies tag add:

<dependency>
    <groupId>com.luismir.kafka</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

Before we implement the getDriversAroundStore method, we need to add something to the consumer.

Modifying the consumer

Currently, there's not much going on in the consumer. The only functionality it has is logging the events. So we need to think of a way to hold a certain number of events so that the service can pull from it and get N drivers. What we could do is persist the drivers in a database, but for simplicity, I'll just hold them in memory for now. Ok, so here is the game plan:

  • Create a map of events

  • While events are coming in, store them on the map

  • Create another method to return a list of N drivers from the map

Pretty straightforward, let's see what that would look like:

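import java.util.concurrent.ConcurrentHashMap
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class KafkaListeners {

    // Latest event per driver, keyed by driver ID. A ConcurrentHashMap is used here
    // because the Kafka listener thread writes while request threads read.
    private val events: MutableMap<String, Driver> = ConcurrentHashMap()

    @KafkaListener(topics = ["driver_location"], groupId = "default")
    fun listener(data: Driver) {
        println(data)

        // A newer event for the same driver replaces the previous one.
        events[data.driverID] = data
    }

    // Returns up to n of the stored drivers; the map has no meaningful ordering.
    fun fetchEvents(n: Int): List<Driver> {
        return events.values.toList().takeLast(n)
    }
}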

As you can see, we created an events map. The reason for using a map is so we don't store the same driver more than once. Since map keys are unique, if a new event arrives for the same driver, it replaces the existing Driver object in the map. Additionally, there's the fetchEvents method, which will be used in the service. Now let's jump back over there.
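Since the service side is in Java, here is the same "latest event per driver" idea as a small standalone Java sketch, with no Kafka or Spring involved. The class name DriverEventStoreSketch, the onEvent method and the nested Driver class are stand-ins I made up for illustration; only fetchEvents mirrors the consumer's method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DriverEventStoreSketch {

    // Hypothetical stand-in for the tutorial's Driver model.
    static final class Driver {
        final String driverID;
        final double latitude;
        final double longitude;

        Driver(String driverID, double latitude, double longitude) {
            this.driverID = driverID;
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    // Keyed by driver ID, so a newer event overwrites the older one.
    private final Map<String, Driver> events = new ConcurrentHashMap<>();

    // Plays the role of the @KafkaListener method in the consumer.
    void onEvent(Driver driver) {
        events.put(driver.driverID, driver);
    }

    // Returns at most n of the stored drivers, in no particular order.
    List<Driver> fetchEvents(int n) {
        List<Driver> all = new ArrayList<>(events.values());
        int count = Math.max(0, Math.min(n, all.size()));
        return new ArrayList<>(all.subList(0, count));
    }

    public static void main(String[] args) {
        DriverEventStoreSketch store = new DriverEventStoreSketch();
        store.onEvent(new Driver("d1", 10.0, 20.0));
        store.onEvent(new Driver("d2", 11.0, 21.0));
        store.onEvent(new Driver("d1", 12.0, 22.0)); // replaces the first d1 event

        System.out.println(store.fetchEvents(10).size()); // prints 2
    }
}
```

Three events go in, but only two drivers come out, because the second d1 event replaced the first one.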

Implementing the final method

To implement getDriversAroundStore we first need to retrieve a store based on its ID and the list of drivers from the consumer. Then, for each driver, we can take its coordinates and calculate the distance from the store. Let's implement it.

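// Assumes a KafkaListeners field (kafkaListeners) is injected through the
// constructor, the same way as storeRepo.
@Override
public List<DriverDistance> getDriversAroundStore(String id, int n) {
    Store store = getStoreById(id);
    if (store == null) {
        // Unknown store ID: there is nothing to measure against.
        return new ArrayList<>();
    }
    List<Driver> drivers = kafkaListeners.fetchEvents(n);

    List<DriverDistance> driverDistances = new ArrayList<>();
    for (Driver driver : drivers) {
        // Straight-line distance between the two coordinate pairs.
        double a = store.getLatitude() - driver.getLatitude();
        double b = store.getLongitude() - driver.getLongitude();
        double distance = Math.sqrt(a * a + b * b);

        driverDistances.add(new DriverDistance(driver.getDriverID(), distance));
    }

    // Closest drivers first.
    driverDistances.sort(Comparator.comparingDouble(DriverDistance::getDistance));

    return driverDistances;
}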

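Note that we also sort the drivers by how close they are to the store, so the nearest ones come first. Keep in mind that this is a straight-line distance on the raw latitude and longitude values, so the result is in degrees rather than kilometers or miles.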
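If you ever need the distance in real-world units, one common option is the haversine formula. The sketch below puts it next to the Euclidean calculation from the service so you can compare them. To be clear, haversine is not what this project uses; the class name DistanceSketch, both method names and the Earth radius constant are my own choices for the example.

```java
class DistanceSketch {

    // Straight-line distance in degrees, same formula as getDriversAroundStore.
    static double euclidean(double lat1, double lon1, double lat2, double lon2) {
        double a = lat1 - lat2;
        double b = lon1 - lon2;
        return Math.sqrt(a * a + b * b);
    }

    // Haversine great-circle distance in kilometers (an alternative, not the tutorial's method).
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        final double earthRadiusKm = 6371.0; // mean Earth radius, an approximation
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double h = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * earthRadiusKm * Math.asin(Math.sqrt(h));
    }

    public static void main(String[] args) {
        // A store at (0, 0) and a driver at (3, 4): a classic 3-4-5 triangle.
        System.out.println(euclidean(0, 0, 3, 4)); // prints 5.0

        // One degree of longitude along the equator is roughly 111 km.
        System.out.println(haversineKm(0, 0, 0, 1));
    }
}
```

For sorting drivers that are all near the same store, the Euclidean version is usually good enough, which is why I kept it simple here.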

Controller

Now we are in the final stage of the implementation. All we need to do is expose the same methods found in the service, like so:

package com.luismir.kafka.controller;

import com.luismir.kafka.model.DriverDistance;
import com.luismir.kafka.model.Store;
import com.luismir.kafka.service.StoreService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
public class StoreController {

    private final StoreService storeService;

    @Autowired
    public StoreController(StoreService storeService) {
        this.storeService = storeService;
    }

    @PostMapping("/stores")
    public void saveStore(@RequestBody Store store) {
        storeService.saveStore(store);
    }

    @GetMapping("/stores/{id}")
    public Store getStoreById(@PathVariable String id) {
        return storeService.getStoreById(id);
    }

    @GetMapping("/stores")
    public List<Store> getAllStores() {
        return storeService.getAllStore();
    }

    @GetMapping("/stores/{id}/drivers")
    public List<DriverDistance> getDriversAroundStore(@PathVariable String id, @RequestParam int n) {
        return storeService.getDriversAroundStore(id, n);
    }
}

Testing

Nah, I ain't talking about JUnit (please write tests while developing and don't be a chum like me). We are going to send a request to the service and hopefully get some responses back (also feel free to add better responses than mine). All you need to do is spin up the producer and the store service. Don't worry about the consumer: because we are using it as a dependency in the service, it will also be running and you'll be able to see all its logs from there.
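If you'd rather not reach for Postman or curl, here is a minimal sketch that hits the endpoints with Java's built-in HttpClient (Java 11+). It assumes the service is running locally on port 9080 and that the Store JSON fields are storeId, latitude and longitude, as in the model above. The class name StoreServiceRequests, the store ID "store-1" and the coordinates are made up for the example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Locale;

class StoreServiceRequests {

    // Assumes the store service runs locally on the port from application.yml.
    static final String BASE_URL = "http://localhost:9080";

    // Builds the POST /stores request with a JSON body matching the Store model.
    static HttpRequest saveStore(String storeId, double latitude, double longitude) {
        String json = String.format(Locale.ROOT,
                "{\"storeId\":\"%s\",\"latitude\":%f,\"longitude\":%f}",
                storeId, latitude, longitude);
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/stores"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Builds the GET /stores/{id}/drivers?n=... request.
    static HttpRequest driversAroundStore(String storeId, int n) {
        return HttpRequest.newBuilder(
                        URI.create(BASE_URL + "/stores/" + storeId + "/drivers?n=" + n))
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            // Save a made-up store, then ask for up to 5 drivers around it.
            client.send(saveStore("store-1", 40.0, -74.0), HttpResponse.BodyHandlers.discarding());
            HttpResponse<String> response =
                    client.send(driversAroundStore("store-1", 5), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode());
            System.out.println(response.body());
        } catch (IOException e) {
            System.out.println("Could not reach the store service: " + e.getMessage());
        }
    }
}
```

If the producer is running, the second response should be a JSON list of drivers with their distances, closest first.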

Wrapping this up

I hope you enjoyed this tutorial and were able to learn a couple of things. For now, this is just a basic implementation of the project and there's a lot of room for improvement. I also need to add some JUnit tests. Feel free to follow along on GitHub if you would like to contribute, and please share any feedback on how to improve this project.
