
The outbox pattern

Problem

Let’s picture a common scenario: we have a service that manages Products, and a customer wants to change a property of a given product, say its price. Several other systems might be interested in the price change; for instance, the marketing team could have triggers configured to send e-mails to certain customers about offers.

To solve this, we can use Kafka as an event bus: when the price is updated by the product-service, it publishes a new event to a Kafka topic, and the marketing-service consumes it to get the new price of that product.

    graph LR

    database[(product-database)]
    kafka("Kafka[price-change-topic]")

    User -- changes price --> product-service 
    product-service -- updates price --> database    
    product-service -- publishes new price --> kafka
    kafka -- consumes price changes --> marketing-service

Ok, but the product-database and the price-change-topic are external systems, separate from the product-service. They are reached over a network connection, and we have one piece of state that we need to propagate to both of them; in most cases we want to ensure that they agree with each other, even if not immediately.

Writing to both inside a database transaction is the first thought that might come to mind, but that can be misleading. There is no shared transaction boundary between the database and the Kafka topic; there are two separate ones. Our product-service would need to coordinate between these two systems and perform some form of two-phase commit to keep them in sync. Let’s look at the scenarios that can happen:

|          | database ✅     | database ❌          |
|----------|-----------------|----------------------|
| Kafka ✅ | happy path      | send rollback event  |
| Kafka ❌ | rollback in DB  | noop                 |

If the database transaction is rolled back after the message to Kafka has already been flushed, you end up with a ghost event on the topic that has to be compensated by sending a rollback event. Depending on the publisher’s configuration, the event could also still sit in the send buffer after the transaction is committed; if the system is interrupted at that point, those events are lost.

The problem lies in the delivery guarantees for the data. There are a few options:

  • At most once delivery - best effort, but no durability guarantees. In edge cases some data may be lost. This is a common pattern for storing logs.
  • Exactly once delivery - the data is not duplicated and the producer guarantees full synchronization between Kafka and the database. This is very hard to achieve.
  • At least once delivery - the data may be duplicated, but the producer guarantees delivery at least once. Usually events are not duplicated, but in edge case scenarios it can happen (e.g. a network partition). This is the usual compromise, and the consumers of the data need to compensate by applying a deduplication window based on their use case (a sketch is shown below).

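As an illustration of that consumer-side compensation, here is a minimal Java sketch of a time-based deduplication window. The `DeduplicationWindow` class and the 10-minute window are assumptions made up for this example, not part of any particular library.

    import java.time.Duration;
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch of a consumer-side deduplication window (hypothetical class).
    // Events carry a unique id; the consumer only processes an id once within the window.
    public class DeduplicationWindow {

        private final Duration window;
        private final Map<String, Instant> seen = new HashMap<>();

        public DeduplicationWindow(Duration window) {
            this.window = window;
        }

        /** Returns true if the event id was not seen within the window and marks it as seen. */
        public synchronized boolean firstTimeSeen(String eventId) {
            Instant now = Instant.now();
            // Evict entries older than the window so the map does not grow forever.
            seen.values().removeIf(ts -> ts.isBefore(now.minus(window)));
            return seen.putIfAbsent(eventId, now) == null;
        }

        public static void main(String[] args) {
            DeduplicationWindow dedup = new DeduplicationWindow(Duration.ofMinutes(10));
            System.out.println(dedup.firstTimeSeen("price-change-42")); // true: process it
            System.out.println(dedup.firstTimeSeen("price-change-42")); // false: duplicate, skip
        }
    }
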
How can we ensure that the state of the two storage systems is consistent? Meaning: when the price is stored in the database, it ends up on the topic as well.

Implementing the Outbox Pattern

The Outbox Pattern is a trick where we use the database transaction to ensure no events are lost, guaranteeing at-least-once delivery. You can implement it by adding a second transaction: first, store the event in the database with a `pending` status, inside the same transaction as the state change itself; then, once the event is published to the topic, update its status to `completed` in a second transaction. If something goes wrong, events can be retried, and the worst inconsistency that can happen is duplicated events on the topic. Let’s see all the scenarios below.
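
Here is a minimal sketch of that flow in Java, using plain JDBC and the Kafka producer client. The table and column names (`product`, `outbox_event`) and the `updatePrice` entry point are assumptions made up for this example, not a prescribed schema.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PriceChangeHandler {

        private final Connection db;                          // connection to product-database
        private final KafkaProducer<String, String> producer; // Kafka producer client

        public PriceChangeHandler(Connection db, KafkaProducer<String, String> producer) {
            this.db = db;
            this.producer = producer;
        }

        public void updatePrice(String sku, double newPrice) throws Exception {
            String eventId = java.util.UUID.randomUUID().toString();
            String payload = "{\"sku\":\"" + sku + "\",\"price\":" + newPrice + "}";

            // 1. Store the price change and the pending event in ONE database transaction.
            db.setAutoCommit(false);
            try (PreparedStatement update = db.prepareStatement(
                     "UPDATE product SET price = ? WHERE sku = ?");
                 PreparedStatement outbox = db.prepareStatement(
                     "INSERT INTO outbox_event (id, payload, status) VALUES (?, ?, 'pending')")) {
                update.setDouble(1, newPrice);
                update.setString(2, sku);
                update.executeUpdate();
                outbox.setString(1, eventId);
                outbox.setString(2, payload);
                outbox.executeUpdate();
                db.commit();
            } catch (Exception e) {
                db.rollback();
                throw e;
            }

            // 2. Publish to Kafka; if this fails the event stays 'pending' and can be retried.
            producer.send(new ProducerRecord<>("price-change-topic", sku, payload)).get();

            // 3. Second transaction: mark the event as completed.
            try (PreparedStatement done = db.prepareStatement(
                     "UPDATE outbox_event SET status = 'completed' WHERE id = ?")) {
                done.setString(1, eventId);
                done.executeUpdate();
                db.commit();
            }
        }
    }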

Happy Path

    sequenceDiagram
        actor User
        User->>product-service: calls post endpoint
        product-service->>+product-database: store the price change
        product-service->>product-database: store the event with status `pending`
        product-database->>-product-service: committed
        product-service->>kafka:publish
        kafka->>product-service: ok
        product-service->>+product-database: update the event with status `completed`
        product-database->>-product-service: committed
        product-service->>User: success response


Failure to publish to Kafka

When there is a failure to publish to Kafka, the service can retry from the database and make sure that no messages are lost. Furthermore, if you want to ensure that the order of the events is preserved, you can always publish the events through a secondary process that selects them from the database and publishes them to Kafka.

The downside of this approach is a potential increase in the delivery latency of the event. That’s the trade-off for gaining eventual consistency.

    sequenceDiagram
        actor User
        User->>product-service: calls post endpoint
        product-service->>+product-database: store the price change
        product-service->>product-database: store the event with status `pending`
        product-database->>-product-service: commit
        product-service->>User: success response
        loop
            product-service->>+product-database: find pending events
            product-database->>-product-service: pending events
            product-service->>kafka:publish
            kafka-->>product-service: |network exception|
            product-service->>kafka:publish
            kafka->>product-service: ok 
            product-service->>+product-database: update the event with status `completed`
            product-database->>-product-service: committed
        end
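
A background relay process, as mentioned above, could look roughly like the sketch below (Java, plain JDBC; the `outbox_event` table and column names are the same assumptions as before). Selecting in insertion order and stopping on the first failure preserves the order of the events.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class OutboxRelay {

        private final Connection db;
        private final KafkaProducer<String, String> producer;

        public OutboxRelay(Connection db, KafkaProducer<String, String> producer) {
            this.db = db;
            this.producer = producer;
        }

        /** Called periodically (e.g. by a scheduler); publishes pending events in insertion order. */
        public void publishPendingEvents() {
            try (PreparedStatement select = db.prepareStatement(
                     "SELECT id, payload FROM outbox_event WHERE status = 'pending' ORDER BY created_at");
                 ResultSet rs = select.executeQuery()) {
                while (rs.next()) {
                    String id = rs.getString("id");
                    String payload = rs.getString("payload");
                    // Wait for the ack before marking completed; on failure we stop and retry on the
                    // next run, so the worst case is a duplicate on the topic, never a lost event.
                    producer.send(new ProducerRecord<>("price-change-topic", payload)).get();
                    try (PreparedStatement done = db.prepareStatement(
                             "UPDATE outbox_event SET status = 'completed' WHERE id = ?")) {
                        done.setString(1, id);
                        done.executeUpdate();
                    }
                }
            } catch (Exception e) {
                // Network exception or similar: leave the remaining events pending and retry later.
            }
        }
    }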

If you have multiple nodes of the product-service, you still need to take care of the order of the events.

  • One solution is to use leader election and publish from a single node, following the Single Producer Principle. However, if you have high throughput, that can become a bottleneck and slow your business down.
  • Another solution is to assign a specific partition, or group of partitions, to a specific node; each node then selects its own events from the database using the partition key, which still keeps the order across multiple nodes (see the sketch below). The downside is a bit of configuration, and you lock the number of nodes to the number of partitions, which makes scaling harder.
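
A very small sketch of that second option: each node only relays the outbox rows whose partition key hashes to it. The `nodeIndex` and `nodeCount` parameters are assumptions for the example, presumably coming from configuration or a coordination mechanism.

    // Hypothetical helper: decide whether this node owns a given partition key.
    public final class PartitionOwnership {

        public static boolean ownedByThisNode(String partitionKey, int nodeIndex, int nodeCount) {
            // floorMod keeps the result non-negative even for negative hash codes.
            return Math.floorMod(partitionKey.hashCode(), nodeCount) == nodeIndex;
        }

        public static void main(String[] args) {
            // Node 0 of 3 only publishes the outbox rows it owns, keeping per-key ordering.
            System.out.println(ownedByThisNode("SKU-123", 0, 3));
        }
    }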

Change Data Capture

Change Data Capture (CDC) connects directly to your database, reads its transaction log, infers the changes from it, and publishes the events directly to Kafka. If any error occurs on publish, it takes care of the retry for you, and it is all transparent. One popular CDC implementation is the Debezium project.

    graph LR

    database[(product-database)]
    kafka("Kafka[product-topic]")
    User -- changes price --> product-service 
    product-service -- updates price --> database    
    Debezium -- publishes new product version --> kafka
    kafka -- consumes product version --> marketing-service
    database --- Debezium

    sequenceDiagram
        actor User
        User->>product-service: calls post endpoint
        product-service->>+product-database: store the price change
        product-database->>-product-service: commit
        product-database->>Debezium: observes the change
        Debezium->>kafka:publish
        product-service->>User: success response

Outbox Table Pattern

Now let’s put it all together!

CDC is great for Event Sourcing, where we publish the state of the entity every time, which is great to replicate data across systems, as in the case where we wanted to propagate the price. But what if we have an Event Streaming use case, where we want to expose a fact that happened, for instance a product-sold-event? In this scenario we would not have an entity that we want to propagate, but a new event that we want to publish. Still, that can follow the same pattern as the CDC: you just need to create a table that matches your new event and store the events there instead of publishing them directly (a sketch follows below). Eventually, you would need to clean up this table so it does not grow forever.
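
As an illustration, the outbox table for a product-sold-event could be shaped like the Java sketch below. The class, the column names and the clean-up query in the comment are assumptions for the example, not a prescribed schema.

    import java.time.Instant;

    // Hypothetical shape of a row in a product_sold_outbox table. The service inserts a row
    // in the same transaction as the sale itself; Debezium (or a relay) publishes it, and a
    // periodic job deletes old rows so the table does not grow forever, for example:
    //   DELETE FROM product_sold_outbox WHERE created_at < now() - interval '7 days'
    public class ProductSoldEvent {
        public String id;          // unique event id, used by consumers for deduplication
        public String sku;         // product identifier
        public int quantity;       // how many units were sold
        public double price;       // price at the moment of the sale
        public Instant createdAt;  // insertion time, used for ordering and clean-up
    }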

This approach can be very powerful, because it also allows you to denormalize data from different sources and then provide it as events downstream, giving this data a new purpose.
Let’s consider the simple example below (ignore the overly simple and unrealistic modeling):

    erDiagram
    
    Product {
        string SKU PK "Identifier of the product"
        string name
        double price
    }
    Stock ||--|| Product : ""
    Stock {
        string id PK "stock id"
        string productId FK "id of the product"
        int quantity
    }

Depending on your domain, you might have multiple bounded contexts that would benefit from an event that already groups both entities, Product and Stock. So the product-service domain can emit two distinct events, one for each entity; we can then join them and use CDC again to publish the result downstream (a sketch of the join follows after the diagrams). Also note that I’m picturing Product and Stock here as two entities in my database, but they could also be events coming from different systems.

        graph LR

        database[(product-database)] --> Debezium
        Debezium --> kafkaProduct("Kafka[product-topic]")
        Debezium --> kafkaStock("Kafka[stock-topic]")
        kafkaProduct -->  composer-service
        kafkaStock -->  composer-service
        composer-service -- Joins to database --> composerdb[(composer-database)] 
        composerdb --- Debezium2
        Debezium2 --> kafkaProduct2("Kafka[productEnhanced-topic]")


    erDiagram
  
    ProductStock {
        string name
        double price
        string stockId 
        string productId 
        int quantity
    }
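
A minimal sketch of what the composer-service join could look like in Java. The event classes and the in-memory maps are assumptions for the example; a real implementation would typically keep this state in composer-database (or a stream-processing state store) so that Debezium can pick up the joined rows.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Hypothetical composer: consumes product and stock events, joins them by product id and
    // produces the denormalized ProductStock row to be upserted into composer-database.
    public class ProductStockComposer {

        record Product(String sku, String name, double price) {}
        record Stock(String id, String productId, int quantity) {}
        record ProductStock(String name, double price, String stockId, String productId, int quantity) {}

        private final Map<String, Product> products = new HashMap<>();
        private final Map<String, Stock> stocks = new HashMap<>();

        /** Called for every event from product-topic. */
        public Optional<ProductStock> onProduct(Product p) {
            products.put(p.sku(), p);
            return join(p.sku());
        }

        /** Called for every event from stock-topic. */
        public Optional<ProductStock> onStock(Stock s) {
            stocks.put(s.productId(), s);
            return join(s.productId());
        }

        // Emit a joined row only when both sides have been seen; the caller would upsert it into
        // composer-database, where CDC publishes it to productEnhanced-topic.
        private Optional<ProductStock> join(String productId) {
            Product p = products.get(productId);
            Stock s = stocks.get(productId);
            if (p == null || s == null) return Optional.empty();
            return Optional.of(new ProductStock(p.name(), p.price(), s.id(), p.sku(), s.quantity()));
        }
    }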

NoSQL? No problem

Oh, you are not using a SQL database and still want to benefit from this pattern? Confluent provides a long list of connectors that let you connect your storage to Kafka, making data replication easy. The list ranges from Elasticsearch, Kinesis, and MongoDB to many more.

Summary

Data replication across the organization, and sometimes even beyond it, is a challenge and an opportunity that we face every day. By using these simple tricks we can achieve the right consistency without trading off too much latency. Of course, if absolute latency is your concern and you are working with p999 < 100 millisecond SLAs, maybe you should consider other communication patterns.

This post is licensed under CC BY 4.0 by the author.
