Skip to content

Commit

Permalink
SAMZA-720: Fix BootstrapChooser hanging issue. Backport to 0.9.1
Browse files Browse the repository at this point in the history
  • Loading branch information
yanfang724 authored and nickpan47 committed Jun 23, 2015
1 parent 176b9ab commit 72c2905
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,22 @@ class BootstrappingChooser(
}
.toSet

/**
* Store all the systemStreamPartitions registered
*/
var registeredSystemStreamPartitions = Set[SystemStreamPartition]()

/**
* The number of lagging partitions that the underlying wrapped chooser has
* been updated with, grouped by SystemStream.
*/
var updatedSystemStreams = Map[SystemStream, Int]()

def start = {
// remove the systemStreamPartitions not registered.
laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}

debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
info("Got lagging partition counts for bootstrap streams: %s" format systemStreamLagCounts)
metrics.setLaggingSystemStreams(() => laggingSystemStreamPartitions.size)
Expand All @@ -118,6 +127,8 @@ class BootstrappingChooser(
checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)

wrapped.register(systemStreamPartition, offset)

registeredSystemStreamPartitions += systemStreamPartition
}

def update(envelope: IncomingMessageEnvelope) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,24 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
assertNull(chooser.choose)
// Fin.
}

@Test
def testChooserRegisteredCorrectSsps {
val mock = new MockMessageChooser
val metadata1 = getMetadata(envelope1, "123")
val metadata2 = getMetadata(envelope2, "321")
val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2))

chooser.register(envelope1.getSystemStreamPartition, "1")
chooser.register(envelope2.getSystemStreamPartition, "1")
chooser.start

// it should only contain stream partition 0 and stream1 partition 1
val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition)
assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
}
}

object TestBootstrappingChooser {
Expand Down

0 comments on commit 72c2905

Please sign in to comment.