Kafka Delay Queue: When Messages Need a Nap Before They Work
Do you ever want to stop time? Have you ever wanted to hold back an email instead of sending it immediately, or delay an embarrassing text message so that you can reconsider your life choices? In the distributed universe of Kafka, a message becomes available for consumption as soon as it is produced. But this isn't always ideal; sometimes we want a delay between the production and consumption of a message. There is no built-in way to do this in Kafka, but we can build our own solution with Kafka itself, without using any external tools.
What is Delayed Message Processing?
It is a concept in which a message is processed not as soon as it is produced, but after some delay. Kafka, by default, makes messages available to consumers as soon as they are produced.
Why is it not in Kafka?
There is no built-in mechanism for delaying the processing of messages, because at its core Kafka is designed for high throughput. That design favors fast message delivery, not intentionally delaying messages.
Use Cases for Delayed Processing
✅ A user is onboarded on your application, and you want to send them a tutorial notification after some hours.
✅ You are calling third-party APIs, but they are responding with failures, so you may need to delay the process for some time.
✅ Schedule some tasks for the future.
✅ Call microservices at different times.
✅ And so on...
How to Make a Delay Queue in Kafka?
Kafka client frameworks such as Spring for Apache Kafka provide a retry mechanism that redelivers failed messages after some time. We can configure both the delay and the number of attempts.
This will be the base concept of our delay queue implementation. We are going to intentionally throw an exception while processing a message, which will trigger the retry mechanism, and it will make sure that the message is reprocessed after a predefined delay.
For this mechanism to work, we need to track how many times a message has been delivered. We can use Kafka headers for that. When a message is produced, metadata can be attached to it alongside the payload in the form of headers. The retry mechanism adds a header that tells us which delivery attempt this is.
If the message is being processed for the first time, we stop it by throwing an exception, which triggers the retry mechanism.
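To make the idea concrete before bringing in any framework, here is a minimal plain-Java simulation of the flow described above. It involves no broker and no Spring; the names `Message`, `ATTEMPTS_HEADER`, `handle`, and `deliverWithOneRetry` are hypothetical and exist only for this illustration, and the real delay is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the "fail on first delivery, process on retry" idea.
// Nothing here talks to Kafka; every name is hypothetical.
class DelayQueueSimulation {

    // Stand-in for the header that a real retry mechanism would set.
    static final String ATTEMPTS_HEADER = "attempts";

    // A message is a payload plus headers, as in Kafka.
    static final class Message {
        final String payload;
        final Map<String, Integer> headers;

        Message(String payload, Map<String, Integer> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Throws on the first delivery (header absent), returns true on a redelivery.
    static boolean handle(Message message) {
        Integer attempt = message.headers.get(ATTEMPTS_HEADER);
        if (attempt == null) {
            // First delivery: fail on purpose so the retry mechanism kicks in.
            throw new IllegalStateException("Delaying message: " + message.payload);
        }
        // Redelivery: the real work would happen here.
        return true;
    }

    // Mimics the retry mechanism: on failure, redeliver once with the attempt header set.
    // Returns the attempt number on which the message was processed.
    static int deliverWithOneRetry(Message message) {
        try {
            handle(message);
            return 1;
        } catch (IllegalStateException firstAttemptFailed) {
            // A real retry mechanism would wait for the configured delay at this point.
            Map<String, Integer> headers = new HashMap<>(message.headers);
            headers.put(ATTEMPTS_HEADER, 2);
            handle(new Message(message.payload, headers));
            return 2;
        }
    }

    public static void main(String[] args) {
        int attempt = deliverWithOneRetry(new Message("welcome-email", new HashMap<>()));
        System.out.println("Processed on attempt " + attempt);
    }
}
```

The only thing the handler needs to know is whether the attempt header is present; the waiting itself is entirely the job of the retry mechanism.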
Code Snippet
I am going to use Spring Boot for this implementation, but you can do it in almost every programming language if you follow the above-mentioned mechanism. In the upcoming days, I will also provide a Golang implementation for this.
We annotate the listener with @RetryableTopic, with a backoff delay of 60000 ms, and we specify that we only want 2 consume attempts in total.
Then we use Spring Kafka's @KafkaListener annotation to listen on a topic.
In the method parameters, we receive the actual payload data and a header variable, consumeAttempt.
There is no need to explicitly assign this header while producing a message, because the retry mechanism handles it internally. On the first delivery, consumeAttempt will be null, so we throw an exception. After 60 seconds, the method is executed again with the same payload, and this time the value of consumeAttempt will be 2, because we are consuming the message for the 2nd time. Hence it is processed normally.
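A minimal sketch of a listener matching the description above could look like the following. It assumes a Spring for Apache Kafka version in which @RetryableTopic takes spring-retry's @Backoff and exposes the attempt count through `RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS`; the class name, topic name `delayed-topic`, and group id `delay-demo` are placeholders of mine, not values from a real project. It needs a Spring Boot application and a running broker, so treat it as a starting point rather than a finished implementation.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageListener {

    // Two attempts in total: the original delivery plus one retry 60000 ms later.
    @RetryableTopic(attempts = "2", backoff = @Backoff(delay = 60000))
    @KafkaListener(topics = "delayed-topic", groupId = "delay-demo") // placeholder names
    public void listen(
            @Payload String payload,
            // Absent (null) on the first delivery; set by the retry mechanism afterwards.
            @Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false)
            Integer consumeAttempt) {

        if (consumeAttempt == null) {
            // First delivery: fail on purpose so the message goes to the retry topic.
            throw new IllegalStateException("Delaying first delivery of: " + payload);
        }

        // Second delivery, roughly 60 seconds later: do the real work.
        System.out.println("Processing after delay: " + payload);
    }
}
```

Throwing a dedicated exception type of your own instead of `IllegalStateException` would make it easier to tell intentional delays apart from genuine failures in the logs.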
Varied Delays
If you want more dynamic delays instead of always waiting for 60 seconds, you will need to create another topic with a different @Backoff value and send your messages to that topic. But you can also create several partitions for one topic, assign a listener with a different @Backoff value to each partition, and send your messages to a particular partition.
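On the producer side, the topic-per-delay approach needs some way to pick the right topic for a requested delay. The sketch below is one possible helper in plain Java; `DelayTopicRouter`, the topic names `delay-1m` and `delay-5m`, and the delay values are all hypothetical, and it assumes each registered topic already has a listener configured with the matching @Backoff delay.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: maps a requested delay to one of a few fixed-delay topics.
class DelayTopicRouter {

    // Delay in milliseconds -> topic whose listener uses that backoff delay.
    private final TreeMap<Long, String> topicsByDelay = new TreeMap<>();

    DelayTopicRouter register(long delayMs, String topic) {
        topicsByDelay.put(delayMs, topic);
        return this;
    }

    // Picks the topic with the smallest configured delay that is at least the requested one.
    String topicFor(long requestedDelayMs) {
        Map.Entry<Long, String> entry = topicsByDelay.ceilingEntry(requestedDelayMs);
        if (entry == null) {
            throw new IllegalArgumentException(
                    "No topic configured for a delay of " + requestedDelayMs + " ms");
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        DelayTopicRouter router = new DelayTopicRouter()
                .register(60_000L, "delay-1m")
                .register(300_000L, "delay-5m");

        System.out.println(router.topicFor(60_000L));  // exact match
        System.out.println(router.topicFor(120_000L)); // rounded up to the next delay
    }
}
```

Rounding up rather than down is a deliberate choice here: a message may wait longer than requested, but it is never processed earlier than the caller asked for.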
Just a min….
I am very early in my career, and I am constantly learning and improving. If you have any suggestions related to whatever we discussed in this blog, I will be very happy to hear them. You can use the comment section to share your opinion.