Skip to content

Commit

Permalink
Fix breaking changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev committed Apr 6, 2024
1 parent d3a091d commit 837947c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.RangeAssignor
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -195,7 +194,7 @@ public fun <K, V> List<ConsumerRecords<K, V>>.offsets(
public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
name: String,
dispatcher: CoroutineDispatcher = IO,
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener(),
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener,
timeout: kotlin.time.Duration = 500.milliseconds,
): Flow<ConsumerRecord<K, V>> = flatMapConcat { consumer ->
consumer.subscribeTo(name, dispatcher, listener, timeout)
Expand All @@ -212,7 +211,7 @@ public fun <K, V> Flow<KafkaConsumer<K, V>>.subscribeTo(
public fun <K, V> KafkaConsumer<K, V>.subscribeTo(
name: String,
dispatcher: CoroutineDispatcher = IO,
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener(),
listener: ConsumerRebalanceListener = NoOpConsumerRebalanceListener,
timeout: kotlin.time.Duration = 500.milliseconds,
): Flow<ConsumerRecord<K, V>> = flow {
subscribe(listOf(name), listener)
Expand Down Expand Up @@ -365,3 +364,8 @@ public data class ConsumerSettings<K, V>(
put(EXCLUDE_INTERNAL_TOPICS_CONFIG, excludeInternalTopics)
}
}

private object NoOpConsumerRebalanceListener : ConsumerRebalanceListener {
override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>?) {}
override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>?) {}
}
3 changes: 3 additions & 0 deletions src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.common.Metric
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.AfterAll
Expand Down Expand Up @@ -295,6 +296,8 @@ abstract class KafkaSpec {
{
val producer = KafkaProducer(it.properties(), it.keySerializer, it.valueSerializer)
object : Producer<String, String> {
override fun clientInstanceId(p0: Duration?): Uuid =
producer.clientInstanceId(p0)
override fun close() {}

override fun close(timeout: Duration?) {}
Expand Down

0 comments on commit 837947c

Please sign in to comment.