Skip to content

Commit

Permalink
Correctly handle pending ranges with adjacent range movements
Browse files Browse the repository at this point in the history
Patch by Aleksandr Sorokoumov; reviewed by Sam Tunnicliffe and Marcus Eriksson for CASSANDRA-14801
  • Loading branch information
Gerrrr authored and krummas committed Sep 14, 2020
1 parent 54d297a commit 8ba163f
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.0-beta3
* Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
* Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
* Add nodetool getfullquerylog (CASSANDRA-15988)
* Fix yaml format and alignment in tpstats (CASSANDRA-11402)
Expand Down
12 changes: 8 additions & 4 deletions src/java/org/apache/cassandra/locator/TokenMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ public RangesAtEndpoint getPendingRanges(String keyspaceName, InetAddressAndPort
Replica replica = entry.getValue();
if (replica.endpoint().equals(endpoint))
{
builder.add(replica);
builder.add(replica, Conflict.DUPLICATE);
}
}
return builder.build();
Expand Down Expand Up @@ -899,11 +899,15 @@ private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrate
{
EndpointsForRange currentReplicas = strategy.calculateNaturalReplicas(range.right, metadata);
EndpointsForRange newReplicas = strategy.calculateNaturalReplicas(range.right, allLeftMetadata);
for (Replica replica : newReplicas)
for (Replica newReplica : newReplicas)
{
if (currentReplicas.endpoints().contains(replica.endpoint()))
if (currentReplicas.endpoints().contains(newReplica.endpoint()))
continue;
newPendingRanges.addPendingRange(range, replica);

// we calculate pending replicas for leave- and move- affected ranges in the same way to avoid
// a possible conflict when 2 pending replicas have the same endpoint and different ranges.
for (Replica pendingReplica : newReplica.subtractSameReplication(addressRanges.get(newReplica.endpoint())))
newPendingRanges.addPendingRange(range, pendingReplica);
}
}

Expand Down
293 changes: 289 additions & 4 deletions test/unit/org/apache/cassandra/locator/PendingRangesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@
package org.apache.cassandra.locator;

import java.net.UnknownHostException;
import java.util.Collections;
import java.util.*;
import java.util.stream.Collectors;

import com.google.common.collect.*;
import org.junit.BeforeClass;
import org.junit.Test;


import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.quicktheories.core.Gen;
import org.quicktheories.generators.Generate;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.SourceDSL.integers;

public class PendingRangesTest
{
Expand Down Expand Up @@ -170,6 +175,282 @@ public void calculatePendingRangesForConcurrentReplacements()
assertPendingRanges(tm.getPendingRangesMM(KEYSPACE), expected);
}

/**
 * Builds a five-node ring with a replication factor of 3, marks n5 as leaving and n3 as moving
 * from token -1 to token 0, and verifies the pending ranges reported for each node.
 */
@Test
public void testConcurrentAdjacentLeaveAndMove()
{
    TokenMetadata metadata = new TokenMetadata();
    AbstractReplicationStrategy replication = simpleStrategy(metadata, 3);

    InetAddressAndPort n1 = peer(1);
    InetAddressAndPort n2 = peer(2);
    InetAddressAndPort n3 = peer(3);
    InetAddressAndPort n4 = peer(4);
    InetAddressAndPort n5 = peer(5);

    Token t1 = token(-9);
    Token t2 = token(-5);
    Token t3 = token(-1);
    Token t4 = token(1);
    Token t5 = token(5);
    Token moveTarget = token(0);

    // initial ring: one token per node
    addNode(metadata, n1, t1);
    addNode(metadata, n2, t2);
    addNode(metadata, n3, t3);
    addNode(metadata, n4, t4);
    addNode(metadata, n5, t5);

    // two range movements in flight at the same time
    metadata.addLeavingEndpoint(n5);
    metadata.addMovingEndpoint(moveTarget, n3);
    metadata.calculatePendingRanges(replication, KEYSPACE);

    RangesAtEndpoint expectedAtN1 = RangesAtEndpoint.of(new Replica(n1, new Range<>(t2, t3), true));
    RangesAtEndpoint expectedAtN2 = RangesAtEndpoint.of(new Replica(n2, new Range<>(t3, t4), true));
    RangesAtEndpoint expectedAtN3 = RangesAtEndpoint.of(new Replica(n3, new Range<>(t3, moveTarget), true),
                                                        new Replica(n3, new Range<>(t4, t5), true));

    assertRangesAtEndpoint(expectedAtN1, metadata.getPendingRanges(KEYSPACE, n1));
    assertRangesAtEndpoint(expectedAtN2, metadata.getPendingRanges(KEYSPACE, n2));
    assertRangesAtEndpoint(expectedAtN3, metadata.getPendingRanges(KEYSPACE, n3));

    // n4 and the leaving n5 are expected to have no pending ranges
    assertRangesAtEndpoint(RangesAtEndpoint.empty(n4), metadata.getPendingRanges(KEYSPACE, n4));
    assertRangesAtEndpoint(RangesAtEndpoint.empty(n5), metadata.getPendingRanges(KEYSPACE, n5));
}

/**
 * Builds a four-node ring with a replication factor of 2, marks the two neighbouring nodes
 * n2 and n3 as leaving, and verifies the pending ranges reported for each node.
 */
@Test
public void testConcurrentAdjacentLeavingNodes()
{
    TokenMetadata metadata = new TokenMetadata();
    AbstractReplicationStrategy replication = simpleStrategy(metadata, 2);

    InetAddressAndPort n1 = peer(1);
    InetAddressAndPort n2 = peer(2);
    InetAddressAndPort n3 = peer(3);
    InetAddressAndPort n4 = peer(4);

    Token t1 = token(-9);
    Token t2 = token(-4);
    Token t3 = token(0);
    Token t4 = token(4);

    // initial ring: one token per node
    addNode(metadata, n1, t1);
    addNode(metadata, n2, t2);
    addNode(metadata, n3, t3);
    addNode(metadata, n4, t4);

    // both leaving nodes sit next to each other on the ring
    metadata.addLeavingEndpoint(n2);
    metadata.addLeavingEndpoint(n3);
    metadata.calculatePendingRanges(replication, KEYSPACE);

    RangesAtEndpoint expectedAtN1 = RangesAtEndpoint.of(new Replica(n1, new Range<>(t1, t3), true));
    RangesAtEndpoint expectedAtN4 = RangesAtEndpoint.of(new Replica(n4, new Range<>(t4, t1), true),
                                                        new Replica(n4, new Range<>(t1, t2), true));

    assertRangesAtEndpoint(expectedAtN1, metadata.getPendingRanges(KEYSPACE, n1));
    // the leaving nodes themselves are expected to have no pending ranges
    assertRangesAtEndpoint(RangesAtEndpoint.empty(n2), metadata.getPendingRanges(KEYSPACE, n2));
    assertRangesAtEndpoint(RangesAtEndpoint.empty(n3), metadata.getPendingRanges(KEYSPACE, n3));
    assertRangesAtEndpoint(expectedAtN4, metadata.getPendingRanges(KEYSPACE, n4));
}

/**
 * Property test over single-token clusters: for generated combinations of leaving, bootstrapping
 * and moving nodes, calculating and reading pending ranges must complete without throwing.
 */
@Test
public void testBootstrapLeaveAndMovePermutationsWithoutVnodes()
{
    // With tokensPerNode == 1 (no vnodes) nodes can be added, removed and moved.
    final int maxRf = 5;
    final int nodes = 50;

    Gen<Input> inputs = rf(maxRf).flatMap(replicationFactor -> input(replicationFactor, nodes));
    Gen<Cluster> changedClusters = inputs.flatMap(this::clustersWithChangedTopology);

    qt().forAll(changedClusters)
        .checkAssert(Cluster::calculateAndGetPendingRanges);
}

/**
 * Property test over clusters with up to 16 tokens per node: for generated combinations of
 * leaving and bootstrapping nodes, calculating and reading pending ranges must complete
 * without throwing.
 */
@Test
public void testBootstrapAndLeavePermutationsWithVnodes()
{
    // With tokensPerNode > 1 (vnodes) move is not supported, so only leave and
    // bootstrap operations are generated.
    final int maxRf = 5;
    final int nodes = 50;
    final int maxTokensPerNode = 16;

    Gen<Input> inputs = rf(maxRf).flatMap(replicationFactor -> input(replicationFactor, nodes, maxTokensPerNode));
    Gen<Cluster> changedClusters = inputs.flatMap(this::clustersWithChangedTopology);

    qt().forAll(changedClusters)
        .checkAssert(Cluster::calculateAndGetPendingRanges);
}

/** Generates replication factors between 1 and {@code maxRf}. */
private Gen<Integer> rf(int maxRf)
{
    final int minRf = 1;
    return integers().between(minRf, maxRf);
}

/**
 * Generates single-token inputs: the node count ranges between {@code rf} and {@code maxNodes},
 * and tokensPerNode is fixed at 1.
 */
private Gen<Input> input(int rf, int maxNodes)
{
    Gen<Integer> nodeCounts = integers().between(rf, maxNodes);
    return nodeCounts.map(count -> new Input(rf, count, 1));
}

/**
 * Generates inputs whose node count ranges between {@code rf} and {@code maxNodes} and whose
 * tokensPerNode ranges between 1 and {@code maxTokensPerNode}.
 */
private Gen<Input> input(int rf, int maxNodes, int maxTokensPerNode)
{
    Gen<Integer> nodeCounts = integers().between(rf, maxNodes);
    Gen<Integer> tokenCounts = integers().between(1, maxTokensPerNode);
    return nodeCounts.zip(tokenCounts, (count, perNode) -> new Input(rf, count, perNode));
}

/** Generates how many nodes to bootstrap: between 0 and the initial node count. */
private Gen<Integer> bootstrappedNodes(Input input)
{
    // capping at the initial size means the cluster can at most double
    int maxBootstrapped = input.nodes;
    return integers().between(0, maxBootstrapped);
}

/** Generates how many nodes to decommission: between 0 and {@code nodes - rf}. */
private Gen<Integer> leftNodes(Input input)
{
    int maxLeaving = input.nodes - input.rf;
    return integers().between(0, maxLeaving);
}

/**
 * Generates how many nodes to move: always 0 when the input uses more than one token per node,
 * otherwise between 0 and the initial node count.
 */
private Gen<Integer> movedNodes(Input input)
{
    // Move is not supported in vnode clusters, so the upper bound collapses to 0 there
    int maxMoved = input.tokensPerNode > 1 ? 0 : input.nodes;
    return integers().between(0, maxMoved);
}

/** Generator that builds a {@link Cluster} from the given input through a supplier. */
private Gen<Cluster> clusters(Input input)
{
    final int rf = input.rf;
    final int nodeCount = input.nodes;
    final int tokensPerNode = input.tokensPerNode;
    return Generate.constant(() -> new Cluster(rf, nodeCount, tokensPerNode));
}

/**
 * Generates clusters for the given input and applies generated numbers of decommissions,
 * bootstraps and moves to each, in that order.
 */
private Gen<Cluster> clustersWithChangedTopology(Input input)
{
    Gen<Cluster> baseClusters = clusters(input);
    Gen<Integer> leaveCounts = leftNodes(input);
    Gen<Integer> bootstrapCounts = bootstrappedNodes(input);
    Gen<Integer> moveCounts = movedNodes(input);

    return baseClusters.zip(leaveCounts, bootstrapCounts, moveCounts,
                            (cluster, leaving, joining, moving) -> {
                                cluster.decommissionNodes(leaving);
                                cluster.bootstrapNodes(joining);
                                return cluster.moveNodes(moving);
                            });
}

/**
 * Immutable parameters describing a generated cluster: replication factor, initial node count
 * and number of tokens per node.
 */
static class Input
{
    final int rf;
    final int nodes;
    final int tokensPerNode;

    Input(int rf, int nodes, int tokensPerNode)
    {
        this.rf = rf;
        this.nodes = nodes;
        this.tokensPerNode = tokensPerNode;
    }

    /** Renders all three parameters in a single readable line. */
    @Override
    public String toString()
    {
        return String.format("Input(rf=%s, nodes=%s, tokensPerNode=%s)", rf, nodes, tokensPerNode);
    }
}

/**
 * Test fixture wrapping a {@link TokenMetadata} populated with randomly generated tokens.
 * The fluent {@code decommissionNodes} / {@code bootstrapNodes} / {@code moveNodes} helpers
 * register topology changes, after which {@link #calculateAndGetPendingRanges()} exercises the
 * pending range calculation.
 */
private static class Cluster
{
    private final TokenMetadata tm;
    private final int tokensPerNode;
    private final AbstractReplicationStrategy strategy;

    // every node registered so far (initial and bootstrapping), in insertion order
    private final List<InetAddressAndPort> nodes;
    // NOTE(review): unseeded, so a generated topology cannot be recreated from a seed
    private final Random random = new Random();

    /**
     * @param rf            replication factor handed to the replication strategy
     * @param initialNodes  number of nodes added with normal tokens up front
     * @param tokensPerNode number of random tokens assigned to each node
     */
    Cluster(int rf, int initialNodes, int tokensPerNode)
    {
        this.tm = new TokenMetadata();
        this.tokensPerNode = tokensPerNode;
        this.strategy = simpleStrategy(tm, rf);

        this.nodes = new ArrayList<>(initialNodes);
        for (int i = 0; i < initialNodes; i++)
            addInitialNode();
    }

    /** Registers the next peer with a random host id and random normal tokens. */
    private void addInitialNode()
    {
        InetAddressAndPort node = peer(nodes.size() + 1);
        tm.updateHostId(UUID.randomUUID(), node);
        tm.updateNormalTokens(tokens(), node);
        nodes.add(node);
    }

    /** Registers the next peer with a random host id and random bootstrap tokens. */
    private void bootstrapNode()
    {
        InetAddressAndPort node = peer(nodes.size() + 1);
        tm.updateHostId(UUID.randomUUID(), node);
        tm.addBootstrapTokens(tokens(), node);
        nodes.add(node);
    }

    /** Calculates pending ranges and reads them back for every known node. */
    void calculateAndGetPendingRanges()
    {
        // test that it does not crash
        tm.calculatePendingRanges(strategy, KEYSPACE);
        for (InetAddressAndPort node : nodes)
            tm.getPendingRanges(KEYSPACE, node);
    }

    /**
     * Marks {@code cnt} randomly chosen nodes as leaving. Nodes are drawn with replacement,
     * so the same node may be picked more than once.
     *
     * @return this cluster, for chaining
     */
    Cluster decommissionNodes(int cnt)
    {
        for (int i = 0; i < cnt; i++)
            tm.addLeavingEndpoint(nodes.get(random.nextInt(nodes.size())));
        return this;
    }

    /**
     * Adds {@code cnt} bootstrapping nodes.
     *
     * @return this cluster, for chaining
     */
    Cluster bootstrapNodes(int cnt)
    {
        for (int i = 0; i < cnt; i++)
            bootstrapNode();
        return this;
    }

    /**
     * Marks {@code cnt} distinct nodes as moving to random tokens. Only valid for
     * single-token clusters unless {@code cnt} is 0.
     *
     * @return this cluster, for chaining
     */
    Cluster moveNodes(int cnt)
    {
        assert cnt == 0 || tokensPerNode == 1 : "Moving tokens is not supported when tokensPerNode > 1";

        for (int i = 0; i < cnt; i++)
            moveNode();
        return this;
    }

    /**
     * Marks one randomly chosen, not-yet-moving node as moving to a random token.
     *
     * @throws IllegalStateException if the number of moving endpoints already reaches the node count
     */
    private void moveNode()
    {
        if (tm.getSizeOfMovingEndpoints() >= nodes.size())
            throw new IllegalStateException("Number of movements should not exceed total nodes in the cluster");

        // we want to ensure that any given node is only marked as moving once.
        List<InetAddressAndPort> moveCandidates = nodes.stream()
                                                       .filter(node -> !tm.isMoving(node))
                                                       .collect(Collectors.toList());
        InetAddressAndPort node = moveCandidates.get(random.nextInt(moveCandidates.size()));
        tm.addMovingEndpoint(token(random.nextLong()), node);
    }

    /** Creates {@code tokensPerNode} tokens from random longs. */
    private Collection<Token> tokens()
    {
        Collection<Token> tokens = new ArrayList<>(tokensPerNode);
        for (int i = 0; i < tokensPerNode; i++)
            tokens.add(token(random.nextLong()));
        return tokens;
    }

    /** Reports the node count and the token metadata. */
    @Override
    public String toString()
    {
        return String.format("Nodes: %s\n" +
                             "Metadata: %s",
                             nodes.size(),
                             tm.toString());
    }
}

private void assertPendingRanges(PendingRangeMaps pending, RangesByEndpoint expected)
{
Expand All @@ -188,6 +469,11 @@ private void assertPendingRanges(EndpointsByRange pending, RangesByEndpoint expe
assertRangesByEndpoint(expected, actual.build());
}

/**
 * Asserts that {@code actual} holds the same number of replicas as {@code expected} and
 * contains every replica found in {@code expected}.
 */
private void assertRangesAtEndpoint(RangesAtEndpoint expected, RangesAtEndpoint actual)
{
    assertEquals(expected.size(), actual.size());
    for (Replica wanted : expected)
        assertTrue(actual.contains(wanted));
}

private void assertRangesByEndpoint(RangesByEndpoint expected, RangesByEndpoint actual)
{
Expand All @@ -196,8 +482,7 @@ private void assertRangesByEndpoint(RangesByEndpoint expected, RangesByEndpoint
{
RangesAtEndpoint expectedReplicas = expected.get(endpoint);
RangesAtEndpoint actualReplicas = actual.get(endpoint);
assertEquals(expectedReplicas.size(), actualReplicas.size());
assertTrue(Iterables.all(expectedReplicas, actualReplicas::contains));
assertRangesAtEndpoint(expectedReplicas, actualReplicas);
}
}

Expand Down

0 comments on commit 8ba163f

Please sign in to comment.