Allow blocking the queue and retrying messages from the failed message's offset #103
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Hi there 👋🏻 ! I'd like to implement the option to block the queue and retry failed messages from the failed message's offset. It's the same need as in #30.
Blocking the queue and retrying failed has the advantage that bugs can be detected early and can be fixed on the consumer (and possibly the producer's) side, without losing the order of messages. It has the disadvantage that non-failing messages will be processed many times over. And it can only work with a single
BroadwayKafka.Producer
per partition. But that's a penalty that I'm willing to take to keep a consistent messages order.The flow:
failed
.It will only work if the Producer commits offsets after the queue in its state has been drained.
This PR implements this flow, and on my local environment I can confirm that the failed message is continuously retried. I added a configuration option called
retry_on_failure
with the default set tofalse
.Unfortunately, I'm struggling with the test a bit. It looks like the current implementation of
MessageServer
can't retry messages and only pushes (but maybe I'm mistaken). Ideally, the messages in the queue are retried until the whole queue is empty, and after 2 confirmed retries we stop the process and the test passes.Is it possible to add retry logic to
ProducerTest
without rewritingMessageServer
? Feedback is very welcome 🙏🏻Thanks for writing such great libraries! 🙇🏻