In designing scalable architectures, Command Query Responsibility Segregation (CQRS) combined with Event Sourcing is a common pattern. A typical implementation of CQRS has a component like Kafka sitting at the centre of everything. The system is then coordinated by the movement of events as services publish to and consume from Kafka. With Kafka as the source of truth, it can be used to populate data stores and support polyglot views on the frontend. You can read more about such patterns in Ben Stopford’s book Designing Event-Driven Systems.
One of the patterns discussed in the book is “CQRS with blocking reads”, which he describes as “…collapsing the asynchronicity of the CQRS pattern so that it appears synchronous to the client”. This post explores my implementation of this pattern and how I solved the challenges I encountered.
I am working on a side project in which I implemented this pattern; you can check out the site here. Below is a cross-section of the process and components involved in creating a new user in the system.
[Diagram: creating a new user; arrows labelled “produces to” and “consumes from” show how each component interacts with Kafka topics.]
This post will focus on steps 1-5.
I will present the problem by means of a scenario.
Approaches I have tried previously
I decided to go with a solution that avoids blocking the main thread and embraces Kafka’s use patterns and asynchronicity.
Below is a distributed trace timeline of the solution (Figure 1), showing the request moving through the system.
[Figure 1: distributed trace timeline with spans for the graphql service and the emailer microservice.]
My implementation was done in Clojure, a functional JVM-based programming language. Functional programming tends to go well with async, but I am sure the ideas can easily be replicated in any programming language.
From here on out, I assume you have a working understanding of Kafka and Clojure. For brevity, only relevant code segments are included.
At a high level, what I was after were semantics that combine publishing and subscription (PubScribe :-) ) to Kafka into a single transaction, which would read as “publish a value to topic-1 and expect a result from topic-2”.
(defrecord KafKaWaitService [producer-props consumer-props val check-fn?])
;; producer-props: a map with keys to initialize the producer
;; e.g. {:k-ser string-ser :v-ser json-ser}
;; example consumer-props: {:c-topic user-created-topic
;; :k-des string-des :v-des avro-des}
;; val: a map representing the message which needs to travel through the system
;; example val: {:key email :value arguments :topic create-user-topic}
;; check-fn?: a single-arity predicate.
;; Determines whether a message is the response you are waiting for
;; example check-fn?: (fn [msg]
;; (let [{:keys [value]} msg]
;; (= (:email value) (:email arguments))))
I then defined a protocol which would be used to interact with the record.
(defprotocol IWaitService
(start [this]
"Setup components")
(get-value!! [this timeout-ms timeout-val]
"Returns the response.
Blocks if response is not ready.
Returns `timeout-val` if the result is not delivered within `timeout-ms`."))
Below is the record implementation of the protocol. (Again, only relevant code included)
;; ns :import (not shown) includes java.util.concurrent's
;; CompletableFuture, TimeUnit and TimeoutException
(defrecord KafKaWaitService [producer-props consumer-props val check-fn?]
ws/IWaitService
(start [this]
(let [f (CompletableFuture.) ;; 1
c-conf {:group.id (uuid-str) ;;2
:auto.offset.reset "latest" ;; 3
:enable.auto.commit "false"}
consumers (kc/start-consumers
[(kc/map->WaitConsumer
{:partitions-assigned #(with-open [producer (kp/tracing-producer producer-props)]
(kp/send! producer val) ;;4
)
:processor (kc/processor-consumer #(when (check-fn? %) ;; 5
(.complete f %) ;; 6
))})])]
(assoc this :consumer consumers :result f) ;; 7
))
(get-value!! [this timeout-ms timeout-val]
(try
(let [{:keys [result]} this]
(.get result timeout-ms (TimeUnit/MILLISECONDS)) ;; 8
)
(catch TimeoutException _
timeout-val ;; 9
)))
AutoCloseable ;; 10
(close [this]
(let [{:keys [consumer]} this]
(kc/stop-consumers consumer) ;; 11
)))
(defn new-kafka-wait-service [p-props c-props val check-fn?] ;; 12
(map->KafKaWaitService {:producer-props p-props :consumer-props c-props :val val :check-fn? check-fn?}))
A few notes on the numbered comments: a CompletableFuture holds the eventual response (1); each wait service uses a unique consumer group id (2), starts consuming from latest (3), and only produces the message once partitions are assigned (4); the processor completes the future when check-fn? matches a message (5, 6). The started consumers and the result future are then assoc-ed in (7). get-value!! delegates to the future’s .get with timeout-ms (8), blocking if the result is not yet ready; if timeout-ms elapses first, the timeout exception is caught and the timeout-val is returned (9). I also implemented the java.lang.AutoCloseable interface (10) so I could use the record with the with-open macro to ensure resource cleanup and prevent memory leaks, with close stopping the consumers (11). new-kafka-wait-service (12) is a convenience constructor. Below is an example of how it all comes together.
I use go blocks and channels to prevent blocking the main thread:
(let [ret-chan (async/chan)
wait-time 20]
(async/go
(with-open [ws (-> (kw/new-kafka-wait-service producer-props consumer-props
{:key email :value arguments :topic create-user-topic}
#(let [{:keys [value]} %]
(= (:email value) (:email arguments))))
kw/start)]
(let [user (kw/get-value!! ws (* wait-time 1000) :timeout)]
(if-not (= user :timeout)
(async/>! ret-chan (:value user))
(async/>! ret-chan (Exception.
(format "Timed out after %d seconds" wait-time)))))))
ret-chan)
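For completeness, here is what the caller side could look like. create-user! is a hypothetical name, assumed to wrap the go-block snippet above and return ret-chan; the caller blocks only at the very edge (e.g. a GraphQL resolver thread) and rethrows on timeout:
(require '[clojure.core.async :as async])

(defn create-user-sync
  [producer-props consumer-props arguments]
  ;; `create-user!` is hypothetical: assume it wraps the snippet above
  (let [result (async/<!! (create-user! producer-props consumer-props arguments))]
    (if (instance? Throwable result)
      (throw result) ;; the go block put an Exception on the channel on timeout
      result)))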
I thought I was done, but then I noticed I was getting timeouts once in a while. I dove deeper into Kafka’s documentation.
Some background: when a group of consumers subscribes to a topic as part of a Consumer Group, the consumer group coordinator divides the topic’s partitions among the consumer instances in the group. When these consumer instances receive their assignments, they can be configured to start consuming from the partition in one of three ways: earliest (the beginning of the partition), latest (the end of the partition), or a specific offset that you seek to manually.
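For illustration, here is what those options look like on a plain Kafka consumer via Clojure interop. This is a minimal sketch under my own assumptions (a local broker, string deserializers), not code from the project:
(import '(org.apache.kafka.clients.consumer KafkaConsumer)
        '(org.apache.kafka.common.serialization StringDeserializer))

;; a throwaway consumer configured for `latest`; swap in "earliest" to
;; start from the beginning of each partition instead
(def props
  {"bootstrap.servers"  "localhost:9092" ;; assumption: local broker
   "group.id"           (str (java.util.UUID/randomUUID))
   "auto.offset.reset"  "latest"
   "enable.auto.commit" "false"
   "key.deserializer"   (.getName StringDeserializer)
   "value.deserializer" (.getName StringDeserializer)})

(def consumer (KafkaConsumer. ^java.util.Map props))

;; the third option is taking control yourself after assignment:
;; (.seek consumer topic-partition desired-offset)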
Imagine we have one partition on a topic, this partition has 10 messages, and a new consumer group subscribes, configured to latest. But latest is not a specific offset; it is a moving reference. At this point latest would point to offset 10. The consumer group coordinator then assigns this single partition to a single consumer instance, so the assignment is of the form “consumer-0, consume from partition-0 starting from latest”. Once partitions have been assigned, from Kafka’s point of view this consumer group is considered started, even though the consumer instance has not yet received a fixed offset to start consuming from; it has yet to determine what latest points to.
While all of this is happening asynchronously, let’s say 2 new messages are sent to that topic. That would mean that, by the time the consumer instance starts consuming, latest has changed from 10 to 12.
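To make the “moving reference” concrete: latest is simply the partition’s current end offset, and Kafka lets you observe it directly. A sketch, reusing the consumer from the earlier snippet and a hypothetical partition:
(import '(org.apache.kafka.common TopicPartition))

(def tp (TopicPartition. "user-created-topic" 0)) ;; hypothetical partition

(.endOffsets consumer [tp]) ;; => {tp 10} with 10 messages on the partition
;; ... two new messages are produced ...
(.endOffsets consumer [tp]) ;; => {tp 12}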
In my case, I would start the consumer and see the partition assignment happen, which I assumed meant the consumer was ready, and then start the request chain. If the response arrived before the consumer instance had determined its fixed starting point, the response would be missed, because it fell behind the new latest.
What I learned from this is that with Kafka consumers, started != ready (which makes sense for what they were designed for).
As you can see from the distributed trace timeline (Figure 1), some time elapses between when the consumer starts and when the partitions are assigned. Below is the area of interest.
So the next problem to solve was figuring out when a “started” consumer was “ready”. I solved this by getting the fixed offset at assignment time, manually setting the starting point of each consumer, and then invoking a partitions-assigned callback which I defined.
;; I implement Kafka's ConsumerRebalanceListener interface
(reify ConsumerRebalanceListener
  (onPartitionsRevoked [this topic-partitions]) ;; nothing to do on revocation here
  (onPartitionsAssigned [this topic-partitions])) ;; forwards to on-partitions-assigned below (plumbing elided)
tracer/IOnPartitionsAssigned
(on-partitions-assigned [this group-id opts topic-partitions]
  (let [{:keys [::kafka-consumer]} this
        _ (.assignment kafka-consumer)
        assignments (map map->TopicPartition topic-partitions)
        ;; .position blocks until the consumer has resolved what `latest`
        ;; points to, yielding a fixed offset per partition
        positions (map (fn [a]
                         {:assignment a
                          :position (.position kafka-consumer a)})
                       assignments)]
    ;; pin each partition to its now-fixed starting point
    (doseq [p positions]
      (.seek kafka-consumer (:assignment p) (:position p)))
    ;; only now is the "started" consumer actually "ready"
    (partitions-assigned)))
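For context, here is the same idea as a self-contained sketch using plain Kafka interop rather than my kc wrapper. subscribe-when-ready! and the ready latch are illustrative names of my own, not the project’s API:
(import '(org.apache.kafka.clients.consumer KafkaConsumer ConsumerRebalanceListener)
        '(java.util.concurrent CountDownLatch))

(defn subscribe-when-ready!
  [^KafkaConsumer consumer topics ^CountDownLatch ready]
  (.subscribe consumer topics
              (reify ConsumerRebalanceListener
                (onPartitionsRevoked [_ _])
                (onPartitionsAssigned [_ topic-partitions]
                  (doseq [tp topic-partitions]
                    ;; .position blocks until the consumer has resolved what
                    ;; `latest` points to for this partition...
                    (.seek consumer tp (.position consumer tp)))
                  ;; ...so by this point every starting offset is fixed and
                  ;; it is safe to signal readiness
                  (.countDown ready)))))
The caller would then (.await ready) (ideally with a timeout) before producing the request, instead of treating subscription itself as readiness.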
To conclude: Kafka’s default consumption use case is long-running asynchronous processes, but I was trying to use it for short-lived synchronous requests. This was achieved by combining production and consumption into a single transaction, and configuring the internal consumer within the transaction to first consume automatically from latest, then manually (seek) after partition assignment. As expected, trying to use something outside its defaults entails going a little under the hood. Luckily, Kafka exposes all the pieces needed to make the necessary tweaks within the respective callbacks.