Skip to content

Commit

Permalink
[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…
Browse files Browse the repository at this point in the history
…ult maxRatePerPartition setting of 0

Author: cody koeninger <[email protected]>

Closes apache#8413 from koeninger/backpressure-testing-master.
  • Loading branch information
koeninger authored and tdas committed Aug 25, 2015
1 parent 5175ca0 commit d9c25de
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[

val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
.map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
.getOrElse(maxRateLimitPerPartition)
.map { limit =>
if (maxRateLimitPerPartition > 0) {
Math.min(maxRateLimitPerPartition, (limit / numPartitions))
} else {
limit / numPartitions
}
}.getOrElse(maxRateLimitPerPartition)

if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Expand Down

0 comments on commit d9c25de

Please sign in to comment.