Skip to content

Commit

Permalink
Merge pull request apache#1454 from MabelYC/fixDupLog
Browse files Browse the repository at this point in the history
SAMZA-2614:Fix NPE at KafkaCheckpointManager.start in standby containers
  • Loading branch information
sborya authored Jan 5, 2021
2 parents 5748fa6 + f013058 commit f7f9b90
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory

var systemConsumer: SystemConsumer = _
var systemAdmin: SystemAdmin = _
val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)

var taskNames: Set[TaskName] = Set[TaskName]()
var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _

var producerRef: AtomicReference[SystemProducer] = _
val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
val producerCreationLock: Object = new Object

// if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
Expand All @@ -86,21 +86,20 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
*
*/
override def createResources(): Unit = {
val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
Preconditions.checkNotNull(systemAdmin)

systemAdmin.start()
val createResourcesSystemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName + "createResource")
Preconditions.checkNotNull(createResourcesSystemAdmin)
createResourcesSystemAdmin.start()
try {
info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
s"partition count: ${checkpointSpec.getPartitionCount}")
systemAdmin.createStream(checkpointSpec)
createResourcesSystemAdmin.createStream(checkpointSpec)

if (validateCheckpoint) {
info(s"Validating checkpoint stream")
systemAdmin.validateStream(checkpointSpec)
createResourcesSystemAdmin.validateStream(checkpointSpec)
}
} finally {
systemAdmin.stop()
createResourcesSystemAdmin.stop()
}
}

Expand All @@ -124,10 +123,6 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
* @inheritdoc
*/
override def register(taskName: TaskName) {
systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
producerRef = new AtomicReference[SystemProducer](getSystemProducer())

debug(s"Registering taskName: $taskName")
producerRef.get().register(taskName.getTaskName)
taskNames += taskName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry))
val newKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])

Mockito.when(checkPointManager.getSystemProducer()).thenReturn(mockKafkaProducer).thenReturn(newKafkaProducer)
Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()

checkPointManager.register(taskName)
checkPointManager.start
Expand Down

0 comments on commit f7f9b90

Please sign in to comment.