chore(atomix): use mock implementation of snapshot store in tests
This is in preparation for refactoring snapshot implementations and moving the concrete implementation out of atomix.
deepthidevaki committed Sep 1, 2020
1 parent e7266a2 commit 218e93b
Showing 6 changed files with 456 additions and 76 deletions.
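The new TestSnapshotStore and InMemorySnapshot classes referenced below are presumably among the three changed files not shown in this view. As a rough sketch of the idea only — the class and method shapes here are assumptions for illustration, not code from this commit — an in-memory store reduces a snapshot to the metadata the tests assert on and persists it into a caller-supplied AtomicReference:

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; the real TestSnapshotStore implements atomix's
// PersistedSnapshotStore interface, which is not part of this diff.
final class InMemorySnapshotStoreSketch {

  // A snapshot reduced to the metadata the tests below assert on.
  record Snapshot(long index, long term, String id) {}

  // Shared mutable cell: the test rule can hand the same AtomicReference to
  // every store it creates for a member, so a snapshot outlives a restart.
  private final AtomicReference<Snapshot> latest;

  InMemorySnapshotStoreSketch(final AtomicReference<Snapshot> latest) {
    this.latest = latest;
  }

  // Stands in for InMemorySnapshot.newPersistedSnapshot(index, term, size, store).
  void persist(final long index, final long term) {
    latest.set(new Snapshot(index, term, index + "-" + term));
  }

  Optional<Snapshot> getLatestSnapshot() {
    return Optional.ofNullable(latest.get());
  }

  long getCurrentSnapshotIndex() {
    return getLatestSnapshot().map(Snapshot::index).orElse(0L);
  }
}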
@@ -188,8 +188,6 @@ public void shouldReplicateSnapshotWithManyFilesOnJoin() throws Exception {
 
     assertThat(snapshot.getIndex()).isEqualTo(leaderSnapshot.getIndex()).isEqualTo(snapshotIndex);
     assertThat(snapshot.getTerm()).isEqualTo(leaderSnapshot.getTerm());
-    assertThat(snapshot.getPath().toFile().list())
-        .containsExactlyInAnyOrder(leaderSnapshot.getPath().toFile().list());
   }
 
   @Test
@@ -251,7 +249,8 @@ public void shouldReplicateSnapshotAfterDataLoss() throws Exception {
 
     assertThat(snapshot.getIndex()).isEqualTo(leaderSnapshot.getIndex()).isEqualTo(100);
     assertThat(snapshot.getTerm()).isEqualTo(leaderSnapshot.getTerm());
-    assertThat(snapshot.getId()).isEqualTo(leaderSnapshot.getId());
+    assertThat(snapshot.getId().getSnapshotIdAsString())
+        .isEqualTo(leaderSnapshot.getId().getSnapshotIdAsString());
   }
 
   @Test
@@ -269,7 +268,6 @@ public void shouldReplicateSnapshotMultipleTimesAfterMultipleDataLoss() throws Exception {
     // when another data loss happens
     raftRule.shutdownServer(follower);
     raftRule.triggerDataLossOnNode(follower);
-    assertThat(firstSnapshot.getPath()).doesNotExist();
     raftRule.bootstrapNode(follower);
 
     // then snapshot is replicated again
@@ -278,9 +276,9 @@ public void shouldReplicateSnapshotMultipleTimesAfterMultipleDataLoss() throws Exception {
 
     assertThat(newSnapshot.getIndex()).isEqualTo(leaderSnapshot.getIndex()).isEqualTo(100);
     assertThat(newSnapshot.getTerm()).isEqualTo(leaderSnapshot.getTerm());
-    assertThat(newSnapshot.getId()).isEqualTo(leaderSnapshot.getId());
+    assertThat(newSnapshot.getId().getSnapshotIdAsString())
+        .isEqualTo(leaderSnapshot.getId().getSnapshotIdAsString());
     assertThat(newSnapshot).isEqualTo(firstSnapshot);
-    assertThat(newSnapshot.getPath()).exists();
   }
 
   @Test
@@ -347,7 +345,7 @@ public void shouldReplicateSnapshotToOldLeaderAfterRestart() throws Exception {
     raftRule.joinCluster(leader);
 
     // then
-    assertThat(raftRule.allNodesHaveSnapshotWithIndex(200)).isTrue();
+    raftRule.assertAllNodesHaveSnapshotWithIndex(200);
     final var snapshot = raftRule.getSnapshotOnNode(leader);
 
     assertThat(snapshot.getIndex()).isEqualTo(leaderSnapshot.getIndex()).isEqualTo(200);
124 changes: 58 additions & 66 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftRule.java
@@ -15,6 +15,7 @@
  */
 package io.atomix.raft;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -30,10 +31,11 @@
 import io.atomix.raft.protocol.TestRaftProtocolFactory;
 import io.atomix.raft.protocol.TestRaftServerProtocol;
 import io.atomix.raft.roles.LeaderRole;
+import io.atomix.raft.snapshot.InMemorySnapshot;
 import io.atomix.raft.snapshot.PersistedSnapshot;
 import io.atomix.raft.snapshot.PersistedSnapshotListener;
 import io.atomix.raft.snapshot.PersistedSnapshotStore;
-import io.atomix.raft.snapshot.impl.FileBasedSnapshotStoreFactory;
+import io.atomix.raft.snapshot.TestSnapshotStore;
 import io.atomix.raft.storage.RaftStorage;
 import io.atomix.raft.storage.log.entry.RaftLogEntry;
@@ -46,14 +48,11 @@
 import io.atomix.utils.AbstractIdentifier;
 import io.atomix.utils.concurrent.SingleThreadContext;
 import io.atomix.utils.concurrent.ThreadContext;
-import io.atomix.utils.time.WallClockTimestamp;
 import io.zeebe.util.FileUtil;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -71,7 +70,6 @@
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
-import org.agrona.IoUtil;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
@@ -95,6 +93,9 @@ public final class RaftRule extends ExternalResource {
   private final Map<String, AtomicReference<CountDownLatch>> compactAwaiters = new HashMap<>();
   private long position;
   private EntryValidator entryValidator = new NoopEntryValidator();
+  // Keep a reference to the snapshots to ensure they are persisted across restarts.
+  private Map<String, AtomicReference<InMemorySnapshot>> snapshots;
+  private Map<String, TestSnapshotStore> snapshotStores;

   private RaftRule(final int nodeCount) {
     this.nodeCount = nodeCount;
@@ -129,6 +130,8 @@ protected void before() throws Throwable {
     position = 0;
     members = new ArrayList<>();
     memberLog = new ConcurrentHashMap<>();
+    snapshotStores = new HashMap<>();
+    snapshots = new HashMap<>();
     nextId = 0;
     context = new SingleThreadContext("raft-test-messaging-%d");
     protocolFactory = new TestRaftProtocolFactory(context);
@@ -258,6 +261,7 @@ public void shutdownServer(final String nodeName) throws Exception {
     servers.remove(nodeName).shutdown().get(30, TimeUnit.SECONDS);
     compactAwaiters.remove(nodeName);
     memberLog.remove(nodeName);
+    snapshotStores.remove(nodeName);
   }

   private RaftMember getRaftMember(final String memberId) {
@@ -281,10 +285,11 @@ public void doSnapshot(final long index, final int size) throws Exception {
     for (final RaftServer raftServer : servers.values()) {
       if (raftServer.isRunning()) {
         final var raftContext = raftServer.getContext();
-        final var snapshotStore = raftContext.getPersistedSnapshotStore();
+        final var snapshotStore =
+            getSnapshotStore(raftServer.cluster().getMember().memberId().id());
 
         compactAwaiters.get(raftServer.name()).set(new CountDownLatch(1));
-        writeSnapshot(index, raftContext.getTerm(), snapshotStore, size);
+        InMemorySnapshot.newPersistedSnapshot(index, raftContext.getTerm(), size, snapshotStore);
       }
     }

@@ -299,6 +304,26 @@ public void doSnapshot(final long index, final int size) throws Exception {
     }
   }
 
+  private TestSnapshotStore getSnapshotStore(final String memberId) {
+    return snapshotStores.get(memberId);
+  }
+
+  private AtomicReference<InMemorySnapshot> getOrCreatePersistedSnapshot(final String memberId) {
+    return snapshots.computeIfAbsent(memberId, i -> new AtomicReference<>());
+  }
+
+  public void assertAllNodesHaveSnapshotWithIndex(final long index) {
+
+    assertThat(
+            servers.values().stream()
+                .map(RaftServer::getContext)
+                .map(RaftContext::getPersistedSnapshotStore)
+                .map(PersistedSnapshotStore::getCurrentSnapshotIndex)
+                .distinct()
+                .collect(Collectors.toList()))
+        .containsExactlyInAnyOrderElementsOf(List.of(index));
+  }
+
   public boolean allNodesHaveSnapshotWithIndex(final long index) {
     return servers.values().stream()
         .map(RaftServer::getContext)
@@ -323,36 +348,6 @@ public PersistedSnapshot getSnapshotOnNode(final String nodeId) {
     return snapshotStore.getLatestSnapshot().orElseThrow();
   }
 
-  private void writeSnapshot(
-      final long index,
-      final long term,
-      final PersistedSnapshotStore persistedSnapshotStore,
-      final int size) {
-
-    final var transientSnapshot =
-        persistedSnapshotStore.newTransientSnapshot(index, term, new WallClockTimestamp());
-    transientSnapshot.take(
-        path -> {
-          IoUtil.ensureDirectoryExists(path.toFile(), "snapshot dir should exist");
-          for (int i = 0; i < size; i++) {
-            final var snapshotFile = path.resolve("snapshot-" + i + ".file");
-            try {
-              Files.write(
-                  snapshotFile,
-                  RandomStringUtils.random(128).getBytes(),
-                  StandardOpenOption.CREATE_NEW,
-                  StandardOpenOption.WRITE);
-            } catch (final IOException e) {
-              e.printStackTrace();
-              return false;
-            }
-          }
-          return true;
-        });
-    transientSnapshot.persist();
-  }
-

   public void awaitNewLeader() {
     waitUntil(() -> getLeader().isPresent(), 100);
   }
@@ -480,21 +475,13 @@ private RaftStorage createStorage(final MemberId memberId) {
   public void copySnapshotOffline(final String sourceNode, final String targetNode)
       throws Exception {
     final var snapshotOnNode = getSnapshotOnNode(sourceNode);
-
-    final var memberDirectory = getMemberDirectory(directory, targetNode);
-    final var snapshotStore =
-        new FileBasedSnapshotStoreFactory().createSnapshotStore(memberDirectory.toPath(), "1");
-
-    try (final var snapshotChunkReader = snapshotOnNode.newChunkReader()) {
-      final var receivedSnapshot =
-          snapshotStore.newReceivedSnapshot(snapshotOnNode.getId().getSnapshotIdAsString());
-
-      while (snapshotChunkReader.hasNext()) {
-        final var chunk = snapshotChunkReader.next();
-        receivedSnapshot.apply(chunk);
-      }
-      receivedSnapshot.persist();
-    }
+    final var targetSnapshotStore = new TestSnapshotStore(getOrCreatePersistedSnapshot(targetNode));
+    final var receivedSnapshot =
+        targetSnapshotStore.newReceivedSnapshot(snapshotOnNode.getId().getSnapshotIdAsString());
+    for (final var reader = snapshotOnNode.newChunkReader(); reader.hasNext(); ) {
+      receivedSnapshot.apply(reader.next());
+    }
+    receivedSnapshot.persist();
   }

private RaftStorage createStorage(
Expand All @@ -510,8 +497,9 @@ private RaftStorage createStorage(
.withMaxSegmentSize(1024 * 10)
.withFreeDiskSpace(100)
.withSnapshotStore(
new FileBasedSnapshotStoreFactory()
.createSnapshotStore(memberDirectory.toPath(), "1"))
snapshotStores.compute(
memberId.id(),
(k, v) -> new TestSnapshotStore(getOrCreatePersistedSnapshot(memberId.id()))))
.withNamespace(RaftNamespaces.RAFT_STORAGE);
return configurator.apply(defaults).build();
}
@@ -598,6 +586,8 @@ public void triggerDataLossOnNode(final String node) throws IOException {
 
     final var memberDirectory = getMemberDirectory(directory, member);
     FileUtil.deleteFolder(memberDirectory.toPath());
+    // Clear the in-memory snapshots so the node truly loses its data.
+    snapshots.remove(node);
   }

   public PersistedSnapshotStore getPersistedSnapshotStore(final String followerB) {
@@ -664,19 +654,21 @@ public RaftSnapshotListener(final MemberId memberId) {
     @Override
     public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) {
       final var raftServer = servers.get(memberId.id());
-      final var raftContext = raftServer.getContext();
-      final var serviceManager = raftContext.getLogCompactor();
-      serviceManager.setCompactableIndex(persistedSnapshot.getIndex());
-
-      raftServer
-          .compact()
-          .whenComplete(
-              (v, t) -> {
-                final var latch = compactAwaiters.get(memberId.id()).get();
-                if (latch != null) {
-                  latch.countDown();
-                }
-              });
+      if (raftServer != null) {
+        final var raftContext = raftServer.getContext();
+        final var serviceManager = raftContext.getLogCompactor();
+        serviceManager.setCompactableIndex(persistedSnapshot.getIndex());
+
+        raftServer
+            .compact()
+            .whenComplete(
+                (v, t) -> {
+                  final var latch = compactAwaiters.get(memberId.id()).get();
+                  if (latch != null) {
+                    latch.countDown();
+                  }
+                });
+      }
     }
   }
 }
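The member-keyed maps above are what let snapshots survive restarts: shutdownServer removes the store but keeps the snapshots entry, while triggerDataLossOnNode removes the snapshots entry as well. A hedged walk-through of that lifecycle, using the sketch from the top of this page rather than the real RaftRule API:

import java.util.concurrent.atomic.AtomicReference;

final class RestartBehaviourSketch {
  public static void main(final String[] args) {
    // One cell per member, kept by the test rule across restarts.
    final var cell = new AtomicReference<InMemorySnapshotStoreSketch.Snapshot>();

    final var storeBeforeRestart = new InMemorySnapshotStoreSketch(cell);
    storeBeforeRestart.persist(100, 2);

    // Restart: a fresh store over the same cell still sees the snapshot.
    final var storeAfterRestart = new InMemorySnapshotStoreSketch(cell);
    assert storeAfterRestart.getCurrentSnapshotIndex() == 100;

    // Data loss: the cell itself is dropped (snapshots.remove(node)), so the
    // next store starts empty and the leader must replicate the snapshot again.
    final var freshCell = new AtomicReference<InMemorySnapshotStoreSketch.Snapshot>();
    assert new InMemorySnapshotStoreSketch(freshCell).getLatestSnapshot().isEmpty();
  }
}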
5 changes: 2 additions & 3 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftTest.java
@@ -40,7 +40,7 @@
 import io.atomix.raft.protocol.TestRaftProtocolFactory;
 import io.atomix.raft.protocol.TestRaftServerProtocol;
 import io.atomix.raft.roles.LeaderRole;
-import io.atomix.raft.snapshot.impl.FileBasedSnapshotStoreFactory;
+import io.atomix.raft.snapshot.TestSnapshotStore;
 import io.atomix.raft.storage.RaftStorage;
 import io.atomix.raft.storage.log.RaftLogReader;
 import io.atomix.raft.storage.log.entry.InitializeEntry;
@@ -198,8 +198,7 @@ private RaftStorage createStorage(
         .withStorageLevel(StorageLevel.DISK)
         .withDirectory(directory)
         .withMaxEntriesPerSegment(10)
-        .withSnapshotStore(
-            new FileBasedSnapshotStoreFactory().createSnapshotStore(directory.toPath(), "1"))
+        .withSnapshotStore(new TestSnapshotStore(new AtomicReference<>()))
         .withMaxSegmentSize(1024 * 10)
         .withNamespace(RaftNamespaces.RAFT_STORAGE);
     return configurator.apply(defaults).build();
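RaftTest, by contrast, hands each store a throwaway new AtomicReference<>(): none of these tests assert that a snapshot survives a restart, so no per-member bookkeeping is needed. Only RaftRule keeps the member-keyed maps, roughly like this hypothetical helper (again built on the sketch above, not the real classes):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical mirror of RaftRule's per-member bookkeeping.
final class SnapshotCellsSketch {
  private final Map<String, AtomicReference<InMemorySnapshotStoreSketch.Snapshot>> cells =
      new HashMap<>();

  // Same member id, same cell: a store rebuilt after a restart keeps the snapshot.
  AtomicReference<InMemorySnapshotStoreSketch.Snapshot> forMember(final String memberId) {
    return cells.computeIfAbsent(memberId, id -> new AtomicReference<>());
  }

  // Data loss forgets the cell, so the next store for this member starts empty.
  void wipe(final String memberId) {
    cells.remove(memberId);
  }
}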
