Distributed Systems

Handling Kafka messages in microservices

1.0 The CloudCart Incident

The shrill sound of the emergency pager broke the stillness of the CloudCart office. It was Monday morning, and for the last hour, everything had seemed normal. That was, of course, until the alert from the business team hit everyone’s inbox: “URGENT: Orders Not Being Placed!”

Sam, a lead software engineer at CloudCart, glanced at the clock. It was 10:15 AM. Barely an hour into the week, and they already had a crisis on their hands.

He darted to the Operations War Room, which was buzzing with frantic energy. A screen at the front displayed a flurry of error messages and warning signs from the online selling platform. The team lead, Jane, was furiously tapping on her laptop, her face pale with worry. “Alright, people,” she shouted above the noise, “we’ve got a major issue. The business team reports that orders haven’t been processed for the last 20 minutes. Revenue is tanking, and we’re bleeding money every second!”

Sam felt a jolt of adrenaline. The last thing they needed was another production incident. He quickly took a seat and connected to the system logs. His fingers flew over the keyboard, scanning through Kafka error logs and tracing messages between the microservices.

The problem was immediately apparent. The order service wasn’t receiving messages from the inventory service. It was as if the data stream had been severed. He dove deeper, tailing the logs from Kafka. The message broker seemed fine; there were no errors indicating it was down. So, what was blocking the orders?

Then, he saw it — a strange log entry from one of the consumer services. “JSON parse error at line 5: Unexpected property.”

Sam felt his stomach churn. He knew instantly what this meant: someone had changed the data structure in the messages being produced to Kafka. With no schema registry to enforce a consistent data format, a minor change in the message structure could spell disaster across the entire system.

“Who owns the order producer?” Sam shouted, not looking up from his screen. “Who made changes recently?”

In the corner, Kevin, a junior developer on the team, raised his hand, his face a mix of guilt and panic. “Uh, I… I pushed a change to the order producer last night. I added a new field to the JSON payload to support a feature request from marketing. It was a small change. I didn’t think it would break anything.”

Sam felt the tension in the room spike. “Did you notify the consumers? Did you update their parsing logic?”

Kevin shook his head, looking embarrassed. “No… I didn’t think it was necessary. I thought they’d handle it.”

A wave of frustration washed over Sam, but he kept his voice calm. “Alright, let’s not panic. We need to find a solution fast.”

He thought quickly. Without a schema registry, their microservices relied on plain JSON for data interchange. Adding or modifying fields could easily lead to parsing errors if the consumer wasn’t expecting the change. With hundreds of microservices depending on the same data stream, the impact of even a small alteration could be catastrophic.

“Everyone, listen up,” Sam called out. “Here’s the plan. Kevin, roll back your change immediately. Jane, we need to get a hotfix in place for the consumer services to handle the new data gracefully. And I want everyone to start working on setting up a schema registry for Kafka. We can’t afford another incident like this.”

As Kevin scrambled to undo his change, Sam and Jane worked together to patch the consumer services. Minutes ticked by like hours. Sam’s mind raced as he combed through the code, ensuring every consumer could handle the new JSON format without breaking.

Finally, after what felt like an eternity, the alert stopped. The orders started coming in again. The red error messages on the screen began to disappear, replaced by the green checks of a healthy system.

2.0 The Problem

The story above illustrates a hypothetical yet common scenario that engineers often face when managing a microservice architecture. In this case, the order and inventory services communicate with each other through a message broker, exchanging data in JSON format, which lacks a built-in schema.

In large systems, it’s typical for different modules to be broken down into separate, independent applications or services. These services are often developed using various programming languages and frameworks, tailored to specific needs or preferences. Despite their independence, these services must still work together cohesively to achieve the system’s overall objectives, which requires a common communication method that transcends language barriers.

To facilitate this, messages passed between services are typically formatted in a language-agnostic manner. Common formats include:

1. JSON

2. XML

3. Avro

4. Protocol Buffers

While these formats are inherently language and platform-independent, it falls upon the developers to ensure the integrity and consistency of the messages being exchanged. This means that any changes in the message schema need to be consistently reflected across all services that produce or consume those messages. Ensuring this alignment is crucial for maintaining seamless communication and functionality across the entire system.

One way to address this challenge is by implementing data validation in every service that processes a specific message. However, this can lead to significant code duplication. In complex systems with dozens or even hundreds of services, managing data validation consistently across all services becomes a tedious and error-prone task.
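
For example, a consumer of an email topic might guard itself with checks like the sketch below (the field names are assumed for illustration). Every other consumer of the same topic ends up carrying its own copy of this logic, and the copies drift apart as the message evolves.

import json

# Fields this particular consumer happens to rely on (assumed for illustration).
REQUIRED_FIELDS = {"to", "from", "subject", "body"}

def parse_email_message(raw: bytes) -> dict:
    """Ad-hoc validation that each consumer re-implements in its own way."""
    payload = json.loads(raw)
    missing = REQUIRED_FIELDS - payload.keys()
    if missing:
        raise ValueError(f"message is missing fields: {sorted(missing)}")
    return payload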

Additionally, in large systems, different teams of developers are typically responsible for different services. This organizational structure means that any change to a message schema or the addition of new validation rules must be effectively communicated to all teams relying on that message. While this might sound straightforward, in practice, the speed and accuracy of task completion tend to decrease as the number of teams or individuals involved increases.

In this article, we will explore how to handle communication between different services in a microservice architecture. We’ll also delve into how to implement centralized schema validation for all messages produced or consumed by these services to ensure data consistency. Our discussion will focus on leveraging Kafka, an open-source distributed event streaming platform, to achieve this goal.

The complete application is available on GitHub.

3.0 Communication between microservices

There are several ways of handling communication between services in a microservice architecture. Based on the mechanism of data flow, these techniques fall into two types:

1. Synchronous communication

2. Asynchronous communication

In a standard client-server architecture, the client sends a request and waits for a response from the server before continuing. This type of communication is known as synchronous communication. A common example of synchronous communication in a microservice architecture is the use of HTTP REST endpoints. Here, a client (which could be a service within the microservice ecosystem) sends a request to a server (another service) via an HTTP endpoint. The client remains idle, waiting for the server to process the request and return a response. The primary advantage of this approach is its simplicity: synchronous communication is straightforward to implement, and applications are easier to debug since requests are processed sequentially.

However, with the rapid evolution of technology and the increasing demand for high-performing, scalable, and fast online services, synchronous communication often becomes a bottleneck as systems grow larger and more complex. In the example above, the client sits idle while waiting for the server to process its request, which can lead to delays. Other services attempting to send requests to the same server are also forced to wait in a queue, compounding the problem. While this bottleneck could theoretically be addressed by scaling the services horizontally and deploying them behind load balancers, such an approach incurs additional cost, resources, and infrastructure management.

To address this issue, a better design is one in which a service does not remain idle while waiting for another service to process its request. Instead, the requesting service continues executing other tasks while the response is stored in a buffer on its side. Once the response is available in the buffer, the service retrieves it and proceeds with further processing. This method is known as asynchronous communication.
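
As a rough illustration of the difference, the sketch below contrasts a blocking HTTP call with handing an event to a message broker and moving on. The service names, URL, and topic are assumptions made for illustration, not part of the demo built later in this article.

import requests
from confluent_kafka import Producer

# Synchronous: the caller blocks until the inventory service responds.
def reserve_stock_sync(order_id: str) -> dict:
    resp = requests.post("http://inventory-service/reserve", json={"order_id": order_id})
    resp.raise_for_status()
    return resp.json()  # nothing else happens in this flow until the response arrives

# Asynchronous: the caller hands the event to the broker and keeps working.
producer = Producer({"bootstrap.servers": "kafka:9092"})

def reserve_stock_async(order_id: str) -> None:
    producer.produce("reserve-stock", key=order_id, value=order_id.encode("utf-8"))
    producer.poll(0)  # serve any pending delivery callbacks without blocking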

4.0 Kafka and Confluent Schema Registry

Kafka is commonly used for asynchronous communication. If you’re new to Kafka or want to learn more about its various components, I recommend reading the article linked here. In Kafka, producers are services that send messages to consumers through Kafka topics, while consumers are services that subscribe to these topics to receive messages. Kafka supports multiple message formats, such as strings, JSON, Avro, and more. Importantly, messages sent to a topic are not required to have a predefined schema.

To ensure that all messages within a topic conform to a specific schema, a practical approach is to establish a dedicated application where the schemas for messages are defined. When a producer service creates a message, it validates the message against the correct schema before sending it. Similarly, when a consumer receives a message, it can validate the schema using the schema validation application and then parse the message accordingly. This centralized schema validation simplifies management and ensures consistency across the system over time.

One widely used schema validation tool for Kafka is the Confluent Schema Registry. It is a managed schema repository that supports data storage and exchange for both stream processing and data at rest, such as databases, files, and other static data storage.

In this article, we will demonstrate how to implement a system that ensures message schemas remain consistent across all services. To gain a better understanding of the key components, I recommend reading the articles listed in the reference section below to familiarize yourself with Apache Kafka and the Confluent Schema Registry.

5.0 System Overview

We will build a system that sends emails to different recipients. To keep the application straightforward, we will develop two services: Customer Service and Email Service. The Email Service is a NestJS (Node.js) application that contains HTML templates for various types of emails. It functions as a Kafka consumer, subscribing to a Kafka topic named send-email. The Customer Service, on the other hand, is a Django application that provides a user interface with a form for sending emails. Users can select the email category and provide the necessary details to send the email. This service acts as a producer, sending messages to the send-email topic. This topic receives messages in the following format:

    {
    "category": string,
    "to": string,
    "from": string,
    "subject": string,
    "body": string,
    "isHtml": boolean,
    "attachments": string[],
    "cc": string[]
    }

We will define the schema in the Schema Registry and use Avro for our schema. For more information on Avro specifications, I recommend reading this article.

To streamline infrastructure management, we will build the entire system using Docker.

Figure: Communication between services

The diagram above shows the flow of messages and message schemas among the producer, the consumer, Kafka, and the schema registry.

5.1 Message Schema

The Avro schema for the message related to ‘send-email’ is as follows:

{
    "type": "record",
    "name": "EmailCommand",
    "fields": [
        {"name": "to", "type": {"type": "array", "items": "string"}},
        {"name": "from", "type": "string"},
        {"name": "subject", "type": "string"},
        {"name": "body", "type": "string"},
        {"name": "isHtml", "type": "boolean"},
        {"name": "category", "type": "string"},
        {"name": "cc", "type": {"type": "array", "items": "string"}},
        {"name": "attachments", "type": {"type": "array", "items": "string"}}
    ]
}

5.2 Setting up Docker containers

We will use Docker Compose to set up our infrastructure. Docker Compose is a tool for defining and running multi-container applications. The docker-compose.yml file is as follows:

version: "3.8"

services:  
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.6
    hostname: ksr-zookeeper
    container_name: ksr-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_LOCAL_PORT}
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka_schema_registry_net

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka
    container_name: ksr-kafka
    depends_on:
      - zookeeper
    ports:
      - "${KAFKA_LISTENER_HOST_PORT}:${KAFKA_LISTENER_PORT}"

    volumes:
      - ./development/kafka/data:/var/lib/kafka/data

    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:${ZOOKEEPER_LOCAL_PORT}'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:${KAFKA_LISTENER_PORT},PLAINTEXT_HOST://localhost:${KAFKA_INTERNAL_LISTENER_PORT}
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

    networks:
      - kafka_schema_registry_net

  kafdrop:
    image: obsidiandynamics/kafdrop:4.0.2
    container_name: ksr-kafdrop
    restart: "no"
    ports:
      - "${KAFDROP_HOST_PORT}:${KAFDROP_LOCAL_PORT}"
    environment:
      KAFKA_BROKERCONNECT: "kafka:${KAFKA_LISTENER_PORT}"

    depends_on:
      - kafka

    networks:
      - kafka_schema_registry_net

  registry:
    image: confluentinc/cp-schema-registry:7.4.6
    container_name: ksr-schema-registry
    hostname: schema-registry
    depends_on:
      - kafka
      - zookeeper
    ports:
      - "${SCHEMA_REGISTRY_HOST_PORT}:${SCHEMA_REGISTRY_LOCAL_PORT}"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:${ZOOKEEPER_LOCAL_PORT}'
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:${SCHEMA_REGISTRY_LOCAL_PORT}
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:${KAFKA_LISTENER_PORT},PLAINTEXT_HOST://localhost:${KAFKA_INTERNAL_LISTENER_PORT}
      SCHEMA_REGISTRY_DEBUG: 'true'
    networks:
      - kafka_schema_registry_net


  # Services
  email_service:
    build: ./email-service
    container_name: ksr-email-service
    volumes:
      - ./email-service/src:/app/src
      - ./email-service/package.json:/app/package.json
      - ./email-service/node_modules:/app/node_modules
      - ./email-service/tsconfig.json:/app/tsconfig.json

    ports:
      - "${EMAIL_SERVICE_PORT}:${EMAIL_SERVICE_LOCAL_PORT}"

    networks:
      - kafka_schema_registry_net

    environment:
      - DEBUG=true
      - EMAIL_SERVICE_PORT=${EMAIL_SERVICE_LOCAL_PORT}
      - EMAIL_SERVICE_HOST=${EMAIL_SERVICE_HOST}
      - KAFKA_HOST=kafka
      - KAFKA_PORT=${KAFKA_LISTENER_PORT}
      - SCHEMA_REGISTRY_HOST=registry
      - SCHEMA_REGISTRY_PORT=${SCHEMA_REGISTRY_LOCAL_PORT}

    depends_on:
      - kafka
      - registry

  customer_service:
    build: ./customer_service
    container_name: ksr-customer-service

    command: python manage.py runserver 0.0.0.0:${CUSTOMER_SERVICE_LOCAL_PORT}
    volumes:
      - ./customer_service:/app

    ports:
      - "${CUSTOMER_SERVICE_PORT}:${CUSTOMER_SERVICE_LOCAL_PORT}"

    environment:
      - DEBUG=true
      - KAFKA_HOST=kafka
      - KAFKA_PORT=${KAFKA_LISTENER_PORT}
      - SCHEMA_REGISTRY_HOST=schema-registry
      - SCHEMA_REGISTRY_PORT=${SCHEMA_REGISTRY_LOCAL_PORT}
      - SECRET_KEY=${CUSTOMER_SERVICE_SECRET_KEY}

    networks:
      - kafka_schema_registry_net

    depends_on:
      - kafka
      - registry
  
    

networks:
  kafka_schema_registry_net:
    driver: bridge
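
The compose file reads its ports and secrets from environment variables, which Docker Compose typically loads from a .env file placed next to docker-compose.yml. The variable names below come from the compose file above; the values are illustrative assumptions, not the project's actual configuration:

# .env (illustrative values only)
ZOOKEEPER_LOCAL_PORT=2181
KAFKA_LISTENER_PORT=29092
KAFKA_LISTENER_HOST_PORT=29092
KAFKA_INTERNAL_LISTENER_PORT=9092
KAFDROP_LOCAL_PORT=9000
KAFDROP_HOST_PORT=9000
SCHEMA_REGISTRY_LOCAL_PORT=8081
SCHEMA_REGISTRY_HOST_PORT=8081
EMAIL_SERVICE_LOCAL_PORT=3000
EMAIL_SERVICE_PORT=3000
EMAIL_SERVICE_HOST=email_service
CUSTOMER_SERVICE_LOCAL_PORT=8000
CUSTOMER_SERVICE_PORT=8000
CUSTOMER_SERVICE_SECRET_KEY=change-me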

We are using the following services for our application:

1. zookeeper: This is the ZooKeeper instance that manages the Kafka cluster.

2. kafka: A Kafka instance running a single node.

3. kafdrop: A service that provides a UI to manage and monitor Kafka clusters.

4. registry: Confluent schema registry that manages the message schema of Kafka.

5. email_service: A NestJS microservice that receives messages from the customer service application and sends the appropriate email to the customer.

6. customer_service: A Django application that acts as the message producer. It contains a view for submitting an email form, which is forwarded to the email service.

I won’t dive into the specifics of NestJS and Django. Instead, I’ll focus on the critical aspects related to message validation, transmission, and reception.

5.3 The Message

We define a class named ‘EmailCommand’ in both the producer and consumer applications. This class represents the message payload that is transmitted via Kafka. For comparison, the NestJS (TypeScript) definition is shown first, followed by the Django (Python) one.

export interface EmailCommand {
    to: string[];
    from: string;
    subject: string;
    body:string;
    isHtml: boolean;
    cc?: string[]|null;
    category: string;
    attachments?: string[]|null;
}

from typing import List


class EmailCommand:
    GENERAL_EMAIL_CATEGORY = 'general'
    TECHNICAL_EMAIL_CATEGORY = 'technical'
    BILLING_EMAIL_CATEGORY = 'billing'

    def __init__(
        self, 
        category:str, 
        to:List[str], 
        from_email:str, 
        subject:str, 
        body:str, 
        is_html:bool=False,
        cc:List[str]=[],
        attachments:List[str]=[]
    ):
        self.category = category
        self.to = to
        self.from_email = from_email
        self.subject = subject
        self.body = body
        self.is_html = is_html
        self.cc = cc
        self.attachments = attachments
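
The producer code in the next section calls a few helpers on EmailCommand (get_avro_schema, from_json, to_json, and parse_from_schema) that the article does not show. The methods below are a minimal sketch of what they might look like, assuming the Avro schema from section 5.1; they would live inside the EmailCommand class above.

    # Hypothetical helpers assumed by the producer code; not from the original article.
    @staticmethod
    def get_avro_schema() -> dict:
        # The Avro schema from section 5.1 as a Python dict.
        return {
            "type": "record",
            "name": "EmailCommand",
            "fields": [
                {"name": "to", "type": {"type": "array", "items": "string"}},
                {"name": "from", "type": "string"},
                {"name": "subject", "type": "string"},
                {"name": "body", "type": "string"},
                {"name": "isHtml", "type": "boolean"},
                {"name": "category", "type": "string"},
                {"name": "cc", "type": {"type": "array", "items": "string"}},
                {"name": "attachments", "type": {"type": "array", "items": "string"}},
            ],
        }

    @classmethod
    def from_json(cls, data: dict):
        # Build the command from the submitted form data; None signals an invalid payload.
        try:
            return cls(
                category=data["category"],
                to=data["to"],
                from_email=data["from"],
                subject=data["subject"],
                body=data["body"],
                is_html=data.get("isHtml", False),
                cc=data.get("cc") or [],
                attachments=data.get("attachments") or [],
            )
        except (KeyError, TypeError):
            return None

    def to_json(self) -> dict:
        # Keys must match the Avro schema field names, not the Python attribute names.
        return {
            "category": self.category,
            "to": self.to,
            "from": self.from_email,
            "subject": self.subject,
            "body": self.body,
            "isHtml": self.is_html,
            "cc": self.cc,
            "attachments": self.attachments,
        }

    @staticmethod
    def parse_from_schema(obj: dict, ctx) -> dict:
        # Used as the AvroSerializer's to_dict callable; the value is already a dict here.
        return obj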

5.4 The Django Producer

Figure: The Customer Service email form

The Customer Service application is a web-based platform featuring a form on its homepage. Users can enter the details of the email they wish to send and submit the form. Upon submission, the input is converted into a message object, validated against the schema registry, and subsequently sent to a Kafka topic.

Users can choose the type (or category) of email being sent to the consumer, allowing different kinds of emails to be managed through a single message schema. This category is included as the key of the Kafka message. Additionally, users can input relevant details such as the sender, recipient, subject, body, attachments, and CC.

The application includes two distinct commands:

1. Register message schema: This command uses the registry’s REST API to register a message schema for the ‘send-email’ topic. It allows us to create a script that automatically updates the schema prior to deploying the application (a sketch of what this registration might look like is shown after this list).

2. Fetch all message schemas from the registry: This command retrieves all registered message schemas from the registry, which is particularly useful for debugging purposes.
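
The article does not include the body of these commands. A minimal sketch of what they might do with the confluent_kafka client is shown below; the subject name send-email-value is an assumption that follows the registry’s default TopicNameStrategy (topic name plus “-value”), and the URL is illustrative.

import json

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

from .commands import EmailCommand  # same module path as in views.py

# Illustrative URL; the real project reads this from settings.
client = SchemaRegistryClient({"url": "http://schema-registry:8081"})

# Command 1: register the section 5.1 schema for the 'send-email' topic.
schema_str = json.dumps(EmailCommand.get_avro_schema())
schema_id = client.register_schema("send-email-value", Schema(schema_str, "AVRO"))
print(f"Registered schema with id {schema_id}")

# Command 2: list every subject currently known to the registry.
for subject in client.get_subjects():
    print(subject, client.get_latest_version(subject).version)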

Configuring the Kafka producer

To maintain simplicity in the application, we configured the producer within our views.py file. Initially, we set up the configuration for our schema registry, choosing Avro as our schema definition language. We then created a producer object to send messages to Kafka. Depending on the use case, we can either instantiate a producer for each request or establish a global producer connection. Each approach offers its own advantages. In our case, we opted for a globally instantiated producer connection to maintain a continuous link with Kafka.

from django.http import JsonResponse
from django.shortcuts import render
from django.conf import settings

from django.http import HttpRequest, HttpResponse
import json
from typing import List
from .commands import EmailCommand


from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.schema_registry.error import SchemaRegistryError


# Create the configuration for Schema registry. Use Avro as the schema definition format
schema_registry_client = SchemaRegistryClient(settings.SCHEMA_REGISTRY_CONF)
avro_serializer = AvroSerializer(schema_registry_client, json.dumps(EmailCommand.get_avro_schema()), EmailCommand.parse_from_schema)
string_serializer = StringSerializer('utf_8')

# Create the producer instance
producer = Producer({'bootstrap.servers': settings.KAFKA_BROKER_URL})

Producing the message

The following section provides the code for sending a message to Kafka after schema validation. It takes a JSON object, converts it into the appropriate message object, and then validates the schema against the registry. If the validation is successful, the message is sent to the designated Kafka topic.

def send_command(data: dict):
    """
    This function receives a dictionary with the data from the form. It creates a message object and validates the schema against the registry.
    It then sends the message to the Kafka topic.
    @param data: dict
    @return: Tuple[str, bool]
    """
    # Convert the dictionary to message object
    command = EmailCommand.from_json(data)
    if command is None:
        return "Invalid command"

    response = "Email sent successfully"
    is_success = True
    try:
        # Validate the schema and send message to Kafka topic
        producer.produce(
            settings.KAFKA_EMAIL_TOPIC, 
            key=command.category, 
            value=avro_serializer(EmailCommand.to_json(command), SerializationContext(command.category, MessageField.VALUE)),
            on_delivery=delivery_report
        )
        producer.flush()
    except SchemaRegistryError as e:
        # If the schema is invalid, the message will not be sent
        print(f"Failed to send email: {e}")
        response = e.error_message
        is_success = False

    except Exception as e:
        print(f"Failed to send email: {e}")
        response = "Failed to send email"
        is_success = False
    return response, is_success
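
send_command also passes an on_delivery callback named delivery_report, which the article does not show. A minimal version following the confluent_kafka callback signature (err, msg) could look like this:

def delivery_report(err, msg):
    """Invoked by producer.poll()/flush() once per message to report the delivery result."""
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")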

5.5 The NestJS Consumer

The Email Service application is a NestJS-based microservice that subscribes to a Kafka topic. It receives messages from Kafka, validates them against the schema, and parses them into the correct format before displaying the message.

import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';


async function bootstrap() {
  /**
   * This function creates a microservice instance using the NestFactory class and
   * configures the Kafka transport with the broker address from the environment.
   */
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: [`${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`],
        }
      }
    },
  );

  await app.listen();
}
bootstrap();

The code provided above is located in the main.ts file of the main application. It initializes the NestJS application and configures the Kafka client to consume messages.

Consuming the message

Next, we implement a function to consume messages from the Kafka server. This function subscribes to the ‘send-email’ topic and receives the message payload as a byte array. It then validates the schema and parses the message into the appropriate format. The following code is available in the app.controller.ts file.

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { EmailCommand } from './commands/email.command';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

@Controller()
export class AppController {
  private schemaRegistry: SchemaRegistry;
  constructor() {
    /**
     * This function initializes the SchemaRegistry instance with the host and port of the Schema Registry service.
     */
    this.schemaRegistry = new SchemaRegistry({host: `http://${process.env.SCHEMA_REGISTRY_HOST}:${process.env.SCHEMA_REGISTRY_PORT}`});
  }

  @MessagePattern('send-email')
  async sendEmail(@Payload() data: any) {
    /**
     * This function decodes the data from the Kafka message and parses it into the EmailCommand object.
     * @param data The data from the Kafka message received as byte array.
     * @returns boolean
     */
    const parsedData:EmailCommand = await this.schemaRegistry.decode(data);
    console.log(parsedData);
    return true;
  }
}

5.6 The Output

The following section illustrates the message received by the consumer and the output of the decoded message. The data is initially received as a byte array and is subsequently parsed into the appropriate JSON object.

Figure: Console output of the Email Service showing the decoded message

In this article, we explored the challenges involved in designing a microservice architecture. We walked through the main communication styles and focused on Kafka and the Schema Registry, emphasizing their role in keeping message schemas consistent across your message streams. Our hands-on demo showed how to integrate the Schema Registry into a microservice environment, laying the groundwork for a more resilient system.

But the journey doesn’t stop here. In our next installment, we’ll dive into the art of evolving message schemas—an essential aspect of maintaining a dynamic and scalable microservice ecosystem. We’ll explore strategies for updating schemas while ensuring that all interacting services remain aligned and operational. Expect to uncover best practices for schema versioning, address common pitfalls, and gain insights into managing schema changes with minimal disruption. We will refactor our codebase to remove magic strings and dynamically handle incoming messages.

So, gear up for a deep dive into the future-proofing of your message schemas. Together, we’ll master the nuances of schema evolution and empower your microservices to adapt seamlessly to the ever-changing landscape of modern applications.

References
