Skip to content

Commit

Permalink
[hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove
Browse files Browse the repository at this point in the history
Additionally, remove dead code and check that an
exception was thrown.
  • Loading branch information
rkhachatryan committed Feb 22, 2021
1 parent bd8e406 commit bd91b6c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,20 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;

import org.junit.Test;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mock;

/** Tests for basic {@link CompletedCheckpointStore} contract. */
public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
Expand Down Expand Up @@ -100,24 +97,28 @@ public void testAddCheckpointWithFailedRemove() throws Exception {
CompletedCheckpointStore store =
createCompletedCheckpoints(numCheckpointsToRetain, Executors.directExecutor());

for (long i = 0; i <= numCheckpointsToRetain; ++i) {
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
doReturn(i).when(checkpointToAdd).getCheckpointID();
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();

try {
store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});

// The checkpoint should be in the store if we successfully add it into the store.
List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
assertTrue(addedCheckpoints.contains(checkpointToAdd));
} catch (Exception e) {
// The checkpoint should not be in the store if any exception is thrown.
List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
assertFalse(addedCheckpoints.contains(checkpointToAdd));
}
CountDownLatch discardAttempted = new CountDownLatch(1);
for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
CompletedCheckpoint checkpointToAdd =
new CompletedCheckpoint(
new JobID(),
i,
i,
i,
Collections.emptyMap(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
new TestCompletedCheckpointStorageLocation()) {
@Override
public boolean discardOnSubsume() {
discardAttempted.countDown();
throw new RuntimeException();
}
};
// should fail despite the exception
store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});
}
discardAttempted.await();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -363,70 +358,4 @@ public Void answer(InvocationOnMock invocation)
// are subsumed should they be discarded.
verify(failingRetrievableStateHandle, never()).discardState();
}

/**
* Tests that the checkpoint does not exist in the store when we fail to add it into the store
* (i.e., there exists an exception thrown by the method).
*/
@Test
public void testAddCheckpointWithFailedRemove() throws Exception {
final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
mock(RetrievableStateStorageHelper.class);

ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));

doAnswer(
new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
@Override
public RetrievableStateHandle<CompletedCheckpoint> answer(
InvocationOnMock invocationOnMock) throws Throwable {
CompletedCheckpoint checkpoint =
(CompletedCheckpoint) invocationOnMock.getArguments()[1];

RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle =
mock(RetrievableStateHandle.class);
when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);

return retrievableStateHandle;
}
})
.when(zookeeperStateHandleStoreMock)
.addAndLock(anyString(), any(CompletedCheckpoint.class));

doThrow(new Exception())
.when(zookeeperStateHandleStoreMock)
.releaseAndTryRemove(anyString());

final int numCheckpointsToRetain = 1;

CompletedCheckpointStore zooKeeperCompletedCheckpointStore =
new DefaultCompletedCheckpointStore<>(
numCheckpointsToRetain,
zookeeperStateHandleStoreMock,
zooKeeperCheckpointStoreUtil,
Executors.directExecutor());

for (long i = 0; i <= numCheckpointsToRetain; ++i) {
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
doReturn(i).when(checkpointToAdd).getCheckpointID();
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();

try {
zooKeeperCompletedCheckpointStore.addCheckpoint(
checkpointToAdd, new CheckpointsCleaner(), () -> {});

// The checkpoint should be in the store if we successfully add it into the store.
List<CompletedCheckpoint> addedCheckpoints =
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
assertTrue(addedCheckpoints.contains(checkpointToAdd));
} catch (Exception e) {
// The checkpoint should not be in the store if any exception is thrown.
List<CompletedCheckpoint> addedCheckpoints =
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
assertFalse(addedCheckpoints.contains(checkpointToAdd));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
Expand All @@ -26,6 +27,7 @@
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
Expand All @@ -42,10 +44,13 @@
import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.rethrow;
Expand Down Expand Up @@ -296,4 +301,43 @@ public long getStateSize() {
return 0;
}
}

/**
* Tests that the checkpoint does not exist in the store when we fail to add it into the store
* (i.e., there exists an exception thrown by the method).
*/
@Test
public void testAddCheckpointWithFailedRemove() throws Exception {

final int numCheckpointsToRetain = 1;
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
final CompletedCheckpointStore store = createZooKeeperCheckpointStore(client);

CountDownLatch discardAttempted = new CountDownLatch(1);
for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
CompletedCheckpoint checkpointToAdd =
new CompletedCheckpoint(
new JobID(),
i,
i,
i,
Collections.emptyMap(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
new TestCompletedCheckpointStorageLocation());
// shouldn't fail despite the exception
store.addCheckpoint(
checkpointToAdd,
new CheckpointsCleaner(),
() -> {
discardAttempted.countDown();
throw new RuntimeException();
});
}
discardAttempted.await();
}
}

0 comments on commit bd91b6c

Please sign in to comment.