Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: proposal to implement a subscription factory with relative offset. #886

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.scaladsl

import akka.kafka.RestrictedConsumer
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

object RelativeOffsetAssignmentHandler {
def apply(backOffset: Long): RelativeOffsetAssignmentHandler =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seeks all assigned partitions to offset = endOffset - backOffset.

new RelativeOffsetAssignmentHandler(None, Some(backOffset))

def apply(topicPartition: TopicPartition, backOffset: Long): RelativeOffsetAssignmentHandler =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seeks given partition to offset = end offset - backOffset and all others are assigned to beginning offset.

new RelativeOffsetAssignmentHandler(Some(Map(topicPartition -> backOffset)))

def apply(topicPartitionBackOffset: Map[TopicPartition, Long]): RelativeOffsetAssignmentHandler =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It allows to seek multiple partitions to offset = end offset - specified offset. All others assigned partitions are seeking to beginning offset.

new RelativeOffsetAssignmentHandler(Some(topicPartitionBackOffset))

def apply(tps: Set[TopicPartition], backOffset: Long): RelativeOffsetAssignmentHandler = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is intended to seek back all the given partitions with the same offset. All others assigned partitions are seeking to beginning offset.

val partitionsOffset = tps.map(tp => (tp, backOffset)).toMap
new RelativeOffsetAssignmentHandler(Some(partitionsOffset))
}
}

class RelativeOffsetAssignmentHandler private (topicPartitionsBackOffset: Option[Map[TopicPartition, Long]],
backOffset: Option[Long] = None)
extends PartitionAssignmentHandler {

override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()

override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = {
topicPartitionsBackOffset match {
case Some(partitionOffset) =>
val tps = partitionOffset.keys.filter(assignedTps.contains)
consumer.endOffsets(tps.toSet.asJava).asScala.foreach { tpOffset =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be better expressed with pattern matching on the tuple, and perhaps a for expression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e.

for {
  (partition, offset) <- consumer.endOffsets(tps.toSet.asJava).asScala
} {
  // ...
}

val partition = tpOffset._1
val endOffset = tpOffset._2
val offset = endOffset - partitionOffset.getOrElse(partition, 0L)
consumer.seek(partition, offset)
}
case None =>
backOffset.foreach { offset =>
assignedTps.foreach { tp =>
Comment on lines +46 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A for expression would be nice here to reduce the nesting. i.e.

for (offset <- backOffset; tp <- assignedTps) {
  ...

val endOffset = consumer.endOffsets(assignedTps.asJava).get(tp)
val expectedOffset = endOffset - offset
consumer.seek(tp, expectedOffset)
}
}
}
}

override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()

override def onStop(currentTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
}