Synchronous requests in a kafka world

In designing scalable architectures, Command Query Responsibility Segregation (CQRS) and Event Sourcing is a common pattern used. A typical implementation of CQRS would have a component like kafka residing in the middle of everything. The system is then coordinated by the movement of events as services publish and consume from kafka. With kafka as the source of truth, it can be used to populate data stores and support polyglot views on the frontend. You can read more about such patterns in Ben Stopford’s book Designing Event-Driven Systems.

One of the patterns discussed in the book is “CQRS with blocking reads”, which he describes as …collapsing the asynchronicity of the CQRS pattern so that it appears synchronous to the client. This post explores my implementation of this pattern and how I solved the challenges I encountered.

I am working on a side project in which I implemented this pattern, you can check out the site here. Below is a cross section of the process and components involved in creating a new user in the system.

Figure 1: create-user flow
Figure 1: create-user flow
---> produces to
---> consumes from

  1. graphql service receives a “create user” request and produces a “create user” message unto the create-user-topic.
  2. “create user” message is consumed by the user microservice.
  3. user microservice creates user by writing to database.
  4. user microservice produces “user created” message unto user-created-topic.
  5. graphql service consumes user created message and responds to waiting client.
  6. user microservice produces “send welcome email” message unto send-email-topic.
  7. emailer microservice consumes “send welcome email” message and sends verification email to client.
  8. elasticsearch kafka connect sink consumes from user-created-topic and writes new user document to elasticsearch

This post is going to focus on steps 1-5

Figure 2: Request/Response
Figure 2: Request/Response

I will present the problem by means of a scenario.

  • Imagine you have 3 instances of the graphql service (call them graphql-service-1, graphql-service-2, graphql-service-3) sitting behind a loadbalancer.
  • A client from their browser establishes a TCP connection through the loadbalancer and gets routed to graphql-service-1.
  • graphql-service-1 recieves the mutation request and produces a “create user” message to the create-user-topic. The socket the mutation request came on is left open for the response to be sent back on.
  • This “create user” message makes its way through the system until it finds its way back to the user-created-topic.
  • But remember you have 3 instances of the graphql service, you have to ensure that graphql-service-1 consumes this specific message from a topic which has a myriad of “user created” messages.
  • To further complicate matters, graphql-service-1 is also handling multiple “create user” requests at the same time, you would have to ensure this specific message finds its way back to the specific waiting socket on graphql-service-1 on which the request arrived through.

Approaches I have tried previously

  • Use a unique key (uuid) for each request message, then scan through the messages in the results topic until you find the response you are looking for.
    The drawback to this approach is that you no longer get the strong ordering guarantees you get by using the same key for related data
  • Use a unique topic for each request.
    The drawback to this approach is that you end up with an insane amount of topics, and you are pretty much using kafka as a key value store, which is not its intended use.

I decided to go with a solution with no blocking on the main thread and which embraces kafka’s use patterns and asynchronousity .

Below is a distributed trace timeline (Click on diagram to zoom) of the solution, showing the request moving through the system.

Figure 3: Distributed trace timeline
Figure 3: Distributed trace timeline

graphql service emailer microservice

My implementation was done using Clojure, a functional JVM based programming language. Functional programming tends to go well with async, but I am sure the ideas can easily be replicated in any programming language.

From here on out, I assume you have a working understanding of kafka and clojure. For brevity, only relevant code segments are included.

From a high level, what I was after was semantics which combines the publishing and subscribtion (PubScribe :-) ) to kafka as a single transaction. Which would read as “publish a value to topic-1 and expect a result from topic-2”

(defrecord KafKaWaitService [producer-props consumer-props val check-fn?])
;; producer-props: a map with keys to initialize producer
;; e.g {:k-ser string-ser :v-ser json-ser}

;; example consumer-props: {:c-topic user-created-topic
;;                          :k-des string-des :v-des avro-des}

;; val: a map representing the message which needs travel the system
;; example val: {:key email :value arguments :topic create-user-topic}

;; check-fn?: a single arity predicate.
;;            Determines if that is the response you are waiting for
;; example check-fn?: (fn [msg]
;;                      (let [{:keys [value]} msg]
;;                        (= (:email value) (:email arguments))))

I then defined a protocol which would be used to interact with the record.

(defprotocol IWaitService
  (start [this]
     "Setup components")
  (get-value!! [this timeout-ms timeout-val]
   "Returns the response.
    Blocks if response is not ready.
    Returns `timeout-val` if the result is not delivered within `timeout-ms`."))

Below is the record implementation of the protocol. (Again, only relevant code included)

(defrecord KafKaWaitService [producer-props consumer-props val check-fn?]
  ws/IWaitService
  (start [this]
    (let [f (CompletableFuture.) ;; 1
          c-conf {:group.id (uuid-str) ;;2
                  :auto.offset.reset "latest" ;; 3
                  :enable.auto.commit "false"}
          consumers (kc/start-consumers
                      [(kc/map->WaitConsumer
                        {:partitions-assigned #(with-open [producer (kp/tracing-producer producer-props)]
                                                 (kp/send! producer val) ;;4
                                                )
                         :processor (kc/processor-consumer #(when (check-fn? %) ;; 5
                                                             (.complete f %) ;; 6
                                                             ))})])]
      (assoc this :consumer consumer :result f) ;; 7
     ))
  (get-value!! [this timeout-ms timeout-val]
    (try
      (let [{:keys [result]} this]
        (.get result timeout-ms (TimeUnit/MILLISECONDS)) ;; 8
       )
      (catch TimeoutException _
        timeout-val ;; 9
       )))
  AutoCloseable ;; 10
  (close [this]
    (let [{:keys [consumer]} this]
      (kc/stop-consumers consumer) ;; 11
   )))


(defn new-kafka-wait-service [p-props c-props val check-fn?] ;; 12
  (map->KafKaWaitService {:producer-props p-props :consumer-props c-props :val val :check-fn? check-fn?}))
  1. A completable future is defined which would hold the result.
  2. Each consumer group id is unique (uuid), representing each unique request.
  3. The consumer is setup to check for a result in the future, so we begin consuming from the end of the topic
  4. The consumer macro ensures the consumer is ready, then sends the “val”
  5. The “check-fn?” is applied to each message received on the result topic
  6. If the predicate returns true, then the completable future from step 1 is completed with the current message.
  7. The start function returns the created record with references to the kafka consumer and the result future assoc-ed in.
  8. The get-value!! function simply returns the value of the future created in step 1, it blocks for timeout-ms if it is not yet ready.
    This is one of the tenets of reactive programming, operations should be responsive, and you achieve this by making sure you only wait for a definite amount of time.
  9. If it blocks for up to timeout-ms, then catch the timeout exception and return the timeout-val
  10. I also implemented the java.lang.AutoCloseable interface so I could use the record with the with-open macro to ensure resource cleanup and prevent memory leaks.
  11. Cleanup step
  12. Wrapper function used to create an instance of the record.

Below is an example of how it all comes together.
I use go-blocks and channels to prevent blocking the main thread

(let [ret-chan (async/chan)
      wait-time 20]
  (async/go
    (with-open [ws (-> (kw/new-kafka-wait-service producer-props consumer-props
                            {:key email :value arguments :topic create-user-topic}
                            #(let [{:keys [value]} %]
                              (= (:email value) (:email arguments))))
                        kw/start)]
        (let [user (kw/get-value!! ws (* wait-time 1000) :timeout)]
          (if-not (= user :timeout)
            (async/>! ret-chan (:value user))
            (async/>! ret-chan (Exception.
                                  (format "Timed out after %d seconds" wait-time)))))))
   ret-chan)

I thought I was done, but then I realized I was getting timeouts once in a while.

I dived deeper into kafka’s documentation.
Some background … when a group of consumers subscribe to a topic as part of a Consumer Group, the consumer group coordinator divides the partitions among the consumer instances in the group. When these consumer instances receive their assignments, they can be configured to start consuming from the partition in 1 of 3 ways; earliest (beginning of partition), latest (end of partition) or if you choose to manually seek to a specific offset.

Imagine we have 1 partition on a topic, this partition has 10 messages, and a new consumer group subscribes, configured to latest. But latest is not a specific offset, it is a moving reference. At this point latest would point to offset 10. The consumer group coordinator then assigns this single partition to a single consumer instance, at this point the assignment is of the form ”consumer-0 consume from partition-0 starting from latest”. Once partitions have been assigned, from kafka’s point of view this consumer group is considered started, eventhough the consumer instance has not yet received the fixed offset to start consuming from, it is still to determine what latest points to.
While all of this is happening asynchronously, let’s say 2 new messages are sent to that topic. That would mean, by the time the consumer instance starts consuming, latest has changed from 10 to 12.
In my case, I would start the consumer, the partition assignment happens, which I assumed meant the consumer was ready. I would then start the request chain, if the response took less time to get ready than the consumer instance took to get its fixed starting point, the response would be missed because it would fall behind the new latest.
What I learned from this is that with kafka consumers, started != ready (which makes sense for what it was defined for)

As you can see from the distributed trace timeline image (Figure 1), some time is elapsed between the time the consumer starts to when the partitions are assigned. Below is the area of interest.

Figure 4: Partitions assigned asynchronously
Figure 4: Partitions assigned asynchronously

So the next problem to solve was figuring out when a “started” consumer was “ready”. I solved this by getting the fixed offset at the time of assignment, and manually setting the starting point of each consumer, then invoking a partitions-assigned callback which I defined.

;; I implement the kafka ConsumerRebalanceListener Interface
(reify ConsumerRebalanceListener
    (onPartitionsRevoked [this topic-partitions])
    (onPartitionsAssigned [this topic-partitions]))
tracer/IOnPartitionsAssigned
(on-partitions-assigned [this group-id opts topic-partitions]
  (let [{:keys [::kafka-consumer]} this
        _ (.assignment kafka-consumer)
        assignments (map map->TopicPartition topic-partitions)
        positions (map (fn [a]
                          {:assignment a
                           :position (.position kafka-consumer a)})
                    assignments)]
    (doseq [p positions]
      (.seek kafka-consumer (:assignment p) (:position p)))
    (partitions-assigned)))

To conclude, kafka’s consumption default use case is for long running asynchronous processes, but I was trying to use it for short lived synchronous requests. This was achieved by combining production and consumption into a single transaction. Then configuring the internal consumer within the transaction to consume first automatically to latest then manually (seek) after partition assignments. As expected, trying to use something outside its defaults, would entail going a little under the hood. Luckily, kafka exposed all the pieces needed to make the necessary tweaks within the respective callbacks.

Copyright © 2020 Husayn Arrah.