Skip to content

Commit

Permalink
KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup ra…
Browse files Browse the repository at this point in the history
…ce on unassigned task directories (apache#15088)

KAFKA-16025 describes the race condition sequence in detail. When the race occurs, it can cause initialization of the impacted task to block indefinitely, which blocks progress on that task and on any other task assigned to the same stream thread. The fix is simple: after locking a task directory at the start of a rebalance, re-check whether the directory is still non-empty, and if it has become empty in the meantime, unlock it immediately. This preserves the idempotency of the method when it coincides with parallel state store cleanup executions.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
sanepal authored Jan 8, 2024
1 parent b2bfd5d commit f1a0207
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1249,9 +1249,15 @@ private void tryToLockAllNonEmptyTaskDirectories() {
try {
final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology);
if (stateDirectory.lock(id)) {
lockedTaskDirectories.add(id);
if (!allTasks.containsKey(id)) {
log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
// Check again in case the cleaner thread ran and emptied the directory
if (stateDirectory.directoryForTaskIsEmpty(id)) {
log.debug("Releasing lock on empty directory for task {}", id);
stateDirectory.unlock(id);
} else {
lockedTaskDirectories.add(id);
if (!allTasks.containsKey(id)) {
log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
}
}
}
} catch (final TaskIdFormatException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,7 @@ public void shouldNotLockAnythingIfStateDirIsEmpty() {
public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
expectLockObtainedFor(taskId01);
expectLockFailedFor(taskId10);
expectDirectoryNotEmpty(taskId01);

makeTaskFolders(
taskId01.toString(),
Expand All @@ -1918,6 +1919,21 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
}

// Regression test for KAFKA-16025: a task directory that is reported empty after its lock
// was obtained at rebalance start must be unlocked again and must not be tracked as locked.
@Test
public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception {
// Both task directories are expected to be locked (presumably both lock attempts succeed;
// see expectLockObtainedFor).
expectLockObtainedFor(taskId01, taskId10);
// taskId01 is reported as non-empty, so its lock should be kept.
expectDirectoryNotEmpty(taskId01);
// taskId10 is reported as empty when re-checked, simulating the state cleaner having
// emptied the directory in the meantime; an unlock is therefore expected for it.
expect(stateDirectory.directoryForTaskIsEmpty(taskId10)).andReturn(true);
expectUnlockFor(taskId10);

makeTaskFolders(taskId01.toString(), taskId10.toString());
// Switch the mock from recording expectations to replaying them before exercising the code.
replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));

// All recorded expectations (including the unlock of taskId10) must have been met, and
// only the non-empty directory may remain in the set of locked task directories.
verify(stateDirectory);
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
}

@Test
public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
Expand Down Expand Up @@ -1951,6 +1967,7 @@ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
@Test
public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
expectLockObtainedFor(taskId00, taskId01, taskId02);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02);
expectUnlockFor(taskId02);

makeTaskFolders(
Expand Down Expand Up @@ -1993,6 +2010,7 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater()
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
expectUnlockFor(taskId03);
makeTaskFolders(
taskId00.toString(),
Expand Down Expand Up @@ -2121,6 +2139,7 @@ public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,
final Map<TaskId, Long> expectedOffsetSums) throws Exception {
expectLockObtainedFor(taskId00);
expectDirectoryNotEmpty(taskId00);
makeTaskFolders(taskId00.toString());
replay(stateDirectory);

Expand All @@ -2144,6 +2163,7 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception {
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));

expectLockObtainedFor(taskId00);
expectDirectoryNotEmpty(taskId00);
makeTaskFolders(taskId00.toString());
replay(stateDirectory);

Expand Down Expand Up @@ -2243,6 +2263,7 @@ public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
expectDirectoryNotEmpty(taskId00);
expect(stateDirectory.checkpointFileFor(taskId00)).andReturn(getCheckpointFile(taskId00));
replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
Expand Down Expand Up @@ -2350,6 +2371,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {

makeTaskFolders(taskId00.toString(), taskId01.toString());
expectLockObtainedFor(taskId00, taskId01);
expectDirectoryNotEmpty(taskId00, taskId01);

// The second attempt will return empty tasks.
makeTaskFolders();
Expand Down Expand Up @@ -4050,7 +4072,8 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throw
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);

makeTaskFolders(taskId00.toString(), task01.toString());
makeTaskFolders(taskId00.toString(), taskId01.toString());
expectDirectoryNotEmpty(taskId00, taskId01);
expectLockObtainedFor(taskId00, taskId01);
expectRestoreToBeCompleted(consumer);
when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
Expand Down Expand Up @@ -4829,6 +4852,12 @@ private void expectUnlockFor(final TaskId... tasks) throws Exception {
}
}

/**
 * Records, for each given task, an expectation on the {@code stateDirectory} mock that
 * {@code directoryForTaskIsEmpty} is called for that task and answers {@code false}.
 *
 * @param tasks ids of the tasks whose directories should be reported as non-empty
 */
private void expectDirectoryNotEmpty(final TaskId... tasks) {
    // Walk the varargs array by index; one expectation is recorded per task id, in order.
    for (int i = 0; i < tasks.length; i++) {
        expect(stateDirectory.directoryForTaskIsEmpty(tasks[i])).andReturn(false);
    }
}

private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
expect(consumer.assignment()).andReturn(assignment);
Expand Down Expand Up @@ -5119,6 +5148,7 @@ private void writeCheckpointFile(final TaskId task, final Map<TopicPartition, Lo
Files.createFile(checkpointFile.toPath());
new OffsetCheckpoint(checkpointFile).write(offsets);
expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
expectDirectoryNotEmpty(task);
}

private File getCheckpointFile(final TaskId task) {
Expand Down

0 comments on commit f1a0207

Please sign in to comment.