Skip to content

Commit

Permalink
KAFKA-12185: fix ConcurrentModificationException in newly added Tasks…
Browse files Browse the repository at this point in the history
… container class (apache#9940)

Reviewers: Guozhang Wang <[email protected]>, A. Sophie Blee-Goldman <[email protected]>
  • Loading branch information
mjsax authored Jan 21, 2021
1 parent 24e7e81 commit 92e72f7
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ void convertActiveToStandby(final StreamTask activeTask,
if (activeTasksPerId.remove(activeTask.id()) == null) {
throw new IllegalStateException("Attempted to convert unknown active task to standby task: " + activeTask.id());
}
activeTasksPerPartition.entrySet().stream()
final Set<TopicPartition> toBeRemoved = activeTasksPerPartition.entrySet().stream()
.filter(e -> e.getValue().id().equals(activeTask.id()))
.forEach(e -> activeTasksPerPartition.remove(e.getKey()));
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
toBeRemoved.forEach(activeTasksPerPartition::remove);

cleanUpTaskProducerAndRemoveTask(activeTask.id(), taskCloseExceptions);

Expand Down
Loading

0 comments on commit 92e72f7

Please sign in to comment.