WIP: proposal to implement a subscription factory with relative offset. #886
Hi @jewertow, thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement.
@ennru I would be grateful if you could tell me whether this is the right approach.
Hi @jewertow
I will continue working on this PR when I finish #900.
c4e764c to 1becd6a
@ennru
Thank you for getting back to this. I think it would be interesting to explore how the new PartitionAssignmentHandler can be used to implement any kind of non-standard seeking. The idea would be to provide a handler which would react to assignments and apply custom seeking via the provided
…ed on PartitionAssignmentHandler
import scala.jdk.CollectionConverters._

object RelativeOffsetAssignmentHandler {
  def apply(backOffset: Long): RelativeOffsetAssignmentHandler =
It seeks all assigned partitions to offset = endOffset - backOffset.
  def apply(backOffset: Long): RelativeOffsetAssignmentHandler =
    new RelativeOffsetAssignmentHandler(None, Some(backOffset))

  def apply(topicPartition: TopicPartition, backOffset: Long): RelativeOffsetAssignmentHandler =
It seeks the given partition to offset = endOffset - backOffset; all other assigned partitions seek to their beginning offset.
  def apply(topicPartition: TopicPartition, backOffset: Long): RelativeOffsetAssignmentHandler =
    new RelativeOffsetAssignmentHandler(Some(Map(topicPartition -> backOffset)))

  def apply(topicPartitionBackOffset: Map[TopicPartition, Long]): RelativeOffsetAssignmentHandler =
It allows seeking multiple partitions to offset = endOffset - the offset specified for each partition. All other assigned partitions seek to their beginning offset.
  def apply(topicPartitionBackOffset: Map[TopicPartition, Long]): RelativeOffsetAssignmentHandler =
    new RelativeOffsetAssignmentHandler(Some(topicPartitionBackOffset))

  def apply(tps: Set[TopicPartition], backOffset: Long): RelativeOffsetAssignmentHandler = {
This method is intended to seek all the given partitions back by the same offset. All other assigned partitions seek to their beginning offset.
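The seek rule described in the comments above can be sketched as a pure function, without a Kafka client. This is only an illustration: `RelativeSeek`, `seekTargets`, and the `Tp` alias are hypothetical names, plain tuples stand in for `TopicPartition`, and clamping the result to the beginning offset is an assumption that the PR does not spell out.

```scala
object RelativeSeek {
  // Stand-in for org.apache.kafka.common.TopicPartition: (topic, partition).
  type Tp = (String, Int)

  /** Computes the offset each assigned partition should seek to.
    *
    * Partitions listed in `backOffsets` go to endOffset - backOffset,
    * clamped so the result never precedes the beginning offset (an assumption).
    * All other assigned partitions go to their beginning offset.
    */
  def seekTargets(
      assigned: Set[Tp],
      beginningOffsets: Map[Tp, Long],
      endOffsets: Map[Tp, Long],
      backOffsets: Map[Tp, Long]
  ): Map[Tp, Long] =
    assigned.iterator.map { tp =>
      val begin = beginningOffsets.getOrElse(tp, 0L)
      val target = backOffsets.get(tp) match {
        case Some(back) => math.max(begin, endOffsets.getOrElse(tp, begin) - back)
        case None       => begin
      }
      tp -> target
    }.toMap
}
```

In a real handler the two offset maps would presumably come from the consumer's `beginningOffsets` and `endOffsets` calls, and each computed target would be passed to `seek`.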
@ennru
…e offset." This reverts commit 1becd6a.
@seglo what do you think about it?
I would expect relative offset seeking to be interesting only when a partition is assigned for the first time after an Alpakka Kafka consumer is started.
I forgot about rebalances. I will investigate it again.
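One possible way to restrict relative seeking to the first assignment is to remember which partitions have already been handled since the consumer started. The sketch below is hypothetical and not taken from the PR: `FirstAssignment`, `Tracker`, and `firstTimeAssigned` are illustrative names, tuples stand in for `TopicPartition`, and the class is not thread-safe.

```scala
object FirstAssignment {
  // Stand-in for org.apache.kafka.common.TopicPartition: (topic, partition).
  type Tp = (String, Int)

  /** Remembers which partitions were already handled since the consumer started. */
  final class Tracker {
    private var alreadyHandled = Set.empty[Tp]

    /** Returns only the partitions seen for the first time and records them,
      * so a later rebalance that re-assigns them does not trigger another seek.
      */
    def firstTimeAssigned(assigned: Set[Tp]): Set[Tp] = {
      val fresh = assigned -- alreadyHandled
      alreadyHandled ++= fresh
      fresh
    }
  }
}
```

An assignment handler could call `firstTimeAssigned` on every assignment and apply the relative seek only to the returned partitions.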
Looking good so far. I think it would be nice for this feature to include support for a time offset as Martynas originally suggested in #538. i.e. "I want to resume streaming from 10 minutes in the past"
There has been a need to query certain Kafka topics with relative offsets, for example last offset, or last offset minus 100, or last offset minus 10 minutes.
This can be done by looking up the offset for wallclock time - time interval to go back. Then you can look up the earliest offset(s) around that time with KafkaConsumer.offsetsForTimes.
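The time-based lookup can be sketched in two steps: compute the target timestamp, then find the earliest offset whose record timestamp is at or after it. The names `TimeOffset`, `targetTimestamp`, and `earliestOffsetAtOrAfter` are hypothetical, and the in-memory `log` only models what the broker-side lookup is documented to return; a real implementation would pass the timestamp to the consumer instead.

```scala
import java.time.{Duration, Instant}

object TimeOffset {
  /** Epoch-millisecond timestamp for "now minus the interval to go back". */
  def targetTimestamp(now: Instant, goBack: Duration): Long =
    now.minus(goBack).toEpochMilli

  /** Models the lookup: the first offset whose record timestamp is at or after `timestamp`.
    * `log` holds (offset, timestamp) pairs in offset order; None means no such record exists.
    */
  def earliestOffsetAtOrAfter(log: Seq[(Long, Long)], timestamp: Long): Option[Long] =
    log.collectFirst { case (offset, ts) if ts >= timestamp => offset }
}
```

For "resume from 10 minutes in the past", `targetTimestamp(Instant.now(), Duration.ofMinutes(10))` would give the timestamp to look up for each assigned partition. A partition with no record at or after that timestamp needs a fallback, for example seeking to the end offset.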
topicPartitionsBackOffset match {
  case Some(partitionOffset) =>
    val tps = partitionOffset.keys.filter(assignedTps.contains)
    consumer.endOffsets(tps.toSet.asJava).asScala.foreach { tpOffset =>
This could be better expressed with pattern matching on the tuple, and perhaps a for expression.
i.e.
for {
(partition, offset) <- consumer.endOffsets(tps.toSet.asJava).asScala
} {
// ...
}
backOffset.foreach { offset =>
  assignedTps.foreach { tp =>
A for expression would be nice here to reduce the nesting. i.e.
for (offset <- backOffset; tp <- assignedTps) {
...
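To illustrate why the for expression is a drop-in replacement, here is a small self-contained sketch. The names `ForExpressionSketch`, `nested`, and `flat` are hypothetical, and strings stand in for `TopicPartition`. Without `yield`, a for expression with two generators desugars to the same nested `foreach` calls.

```scala
object ForExpressionSketch {
  /** Nested foreach calls, in the shape of the reviewed code. */
  def nested(backOffset: Option[Long], assigned: Set[String]): List[(String, Long)] = {
    val out = List.newBuilder[(String, Long)]
    backOffset.foreach { offset =>
      assigned.foreach { tp => out += (tp -> offset) }
    }
    out.result()
  }

  /** The same traversal as a single for expression with two generators. */
  def flat(backOffset: Option[Long], assigned: Set[String]): List[(String, Long)] = {
    val out = List.newBuilder[(String, Long)]
    for (offset <- backOffset; tp <- assigned) out += (tp -> offset)
    out.result()
  }
}
```

Both versions do nothing when `backOffset` is `None`, which is the behaviour the nested form relied on.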
@jewertow Just curious why you closed the PR. Are you still interested in pursuing this?
@seglo Yes, I want to implement it in the near future, but first I want to finish a few other tasks in other projects that I opened. An open but inactive PR is annoying and obscures the status of work in the project.
Alright. Hope to see you back soon :)
Purpose
Introduction of a subscription factory with relative offset definitions.
References
See #538
Changes
RelativeOffsetSubscriptions.assignmentForLastN
Background Context
There has been a need to query certain Kafka topics with relative offsets, for example last offset, or last offset minus 100, or last offset minus 10 minutes.