Skip to content

Commit

Permalink
Merge branch 'cassandra-3.0' into cassandra-3.11
Browse files Browse the repository at this point in the history
  • Loading branch information
ifesdjeen committed Oct 1, 2021
2 parents 2614f7e + 13632e9 commit 32a15f0
Show file tree
Hide file tree
Showing 21 changed files with 933 additions and 70 deletions.
7 changes: 7 additions & 0 deletions NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ Upgrading
for further details. You also need to regenerate passwords for users for who the password
was created while the above property was set to be more than 30 otherwise they will not be able to log in.

Statement re-prepare storms
---------------------------
- CASSANDRA-15252 has changed how prepared statement ids are computed in order to avoid infinite re-prepare
loops caused by the driver. This new behaviour will be picked up only when the entire cluster has been updated
to 3.0.26 or higher. In case of a mixed version cluster, different major versions will be taken as a minimum
required version.

3.11.11
=======

Expand Down
53 changes: 48 additions & 5 deletions src/java/org/apache/cassandra/cql3/QueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.metrics.CQLMetrics;
import org.apache.cassandra.service.*;
import org.apache.cassandra.service.pager.QueryPager;
Expand All @@ -62,6 +63,17 @@ public class QueryProcessor implements QueryHandler
{
public static final CassandraVersion CQL_VERSION = new CassandraVersion("3.4.4");

/**
* If a query is prepared with a fully qualified name, but the user also uses USE (specifically when USE keyspace
* is different) then the IDs generated could change over time; invalidating the assumption that IDs won't ever
* change. In the version defined below, the USE keyspace is ignored when a fully-qualified name is used as an
* attempt to make IDs stable.
*/
private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_30 = new CassandraVersion("3.0.26");
private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_3X = new CassandraVersion("3.11.12");
private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_40 = new CassandraVersion("4.0.1");


public static final QueryProcessor instance = new QueryProcessor();

private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
Expand Down Expand Up @@ -425,7 +437,29 @@ public static ResultMessage.Prepared prepare(String queryString, ClientState cli
throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
assert boundTerms == prepared.boundNames.size();

return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
if (prepared.keyspace != null)
{
// Edge-case of CASSANDRA-15252 in mixed-mode cluster. We accept that 15252 itself can manifest in a
// cluster that has both old and new nodes, but we would like to avoid a situation when the fix adds
// a new behaviour that can break which, in addition, can get triggered more frequently.
// If statement ID was generated on the old node _with_ use, when attempting to execute on the new node,
// we may fall into infinite loop. To break out of this loop, we put a prepared statement that client
// expects into cache, so that it could get PREPARED response on the second try.
ResultMessage.Prepared newBehavior = storePreparedStatement(queryString, null, prepared, forThrift);
ResultMessage.Prepared oldBehavior = clientState.getRawKeyspace() != null ? storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift) : newBehavior;
CassandraVersion minVersion = Gossiper.instance.getMinVersion(20, TimeUnit.MILLISECONDS);

// Default to old behaviour in case we're not sure about the version. Even if we ever flip back to the old
// behaviour due to the gossip bug or incorrect version string, we'll end up with two re-prepare round-trips.
return minVersion != null &&
((minVersion.major == 3 && minVersion.minor == 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_30) >= 0) ||
(minVersion.major == 3 && minVersion.minor != 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_3X) >= 0) ||
(minVersion.major == 4 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_40) >= 0)) ? newBehavior : oldBehavior;
}
else
{
return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
}
}

private static MD5Digest computeId(String queryString, String keyspace)
Expand All @@ -440,12 +474,13 @@ private static Integer computeThriftId(String queryString, String keyspace)
return toHash.hashCode();
}

private static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String keyspace, boolean forThrift)
@VisibleForTesting
public static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String clientKeyspace, boolean forThrift)
throws InvalidRequestException
{
if (forThrift)
{
Integer thriftStatementId = computeThriftId(queryString, keyspace);
Integer thriftStatementId = computeThriftId(queryString, clientKeyspace);
ParsedStatement.Prepared existing = thriftPreparedStatements.get(thriftStatementId);
if (existing == null)
return null;
Expand All @@ -456,7 +491,7 @@ private static ResultMessage.Prepared getStoredPreparedStatement(String queryStr
}
else
{
MD5Digest statementId = computeId(queryString, keyspace);
MD5Digest statementId = computeId(queryString, clientKeyspace);
ParsedStatement.Prepared existing = preparedStatements.get(statementId);
if (existing == null)
return null;
Expand All @@ -467,7 +502,8 @@ private static ResultMessage.Prepared getStoredPreparedStatement(String queryStr
}
}

private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared, boolean forThrift)
@VisibleForTesting
public static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared, boolean forThrift)
throws InvalidRequestException
{
// Concatenate the current keyspace so we don't mix prepared statements between keyspace (#5352).
Expand Down Expand Up @@ -620,6 +656,13 @@ public static void clearInternalStatementsCache()
internalStatements.clear();
}

@VisibleForTesting
public static void clearPreparedStatementsCache()
{
preparedStatements.clear();
thriftPreparedStatements.clear();
}

private static class MigrationSubscriber extends MigrationListener
{
private static void removeInvalidPreparedStatements(String ksName, String cfName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ else if (!haveMultipleCFs)
short[] partitionKeyBindIndexes = (haveMultipleCFs || batchStatement.statements.isEmpty())? null
: boundNames.getPartitionKeyBindIndexes(batchStatement.statements.get(0).cfm);

return new ParsedStatement.Prepared(batchStatement, boundNames, partitionKeyBindIndexes);
return new ParsedStatement.Prepared(batchStatement, boundNames, partitionKeyBindIndexes, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void prepareKeyspace(ClientState state) throws InvalidRequestException
{
if (!cfName.hasKeyspace())
{
// XXX: We explicitely only want to call state.getKeyspace() in this case, as we don't want to throw
// XXX: We explicitly only want to call state.getKeyspace() in this case, as we don't want to throw
// if not logged in any keyspace but a keyspace is explicitely set on the statement. So don't move
// the call outside the 'if' or replace the method by 'prepareKeyspace(state.getKeyspace())'
cfName.setKeyspace(state.getKeyspace(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ public ParsedStatement.Prepared prepare(ClientState clientState)
{
VariableSpecifications boundNames = getBoundVariables();
ModificationStatement statement = prepare(boundNames, clientState);
return new ParsedStatement.Prepared(statement, boundNames, boundNames.getPartitionKeyBindIndexes(statement.cfm));
return new ParsedStatement.Prepared(statement, boundNames, boundNames.getPartitionKeyBindIndexes(statement.cfm), statement.cfm.ksName);
}

public ModificationStatement prepare(VariableSpecifications boundNames, ClientState clientState)
Expand Down
20 changes: 16 additions & 4 deletions src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Collections;
import java.util.List;

import javax.annotation.Nullable;

import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.exceptions.RequestValidationException;
Expand Down Expand Up @@ -59,24 +61,34 @@ public static class Prepared

public final CQLStatement statement;
public final List<ColumnSpecification> boundNames;

@Nullable
public final short[] partitionKeyBindIndexes;

protected Prepared(CQLStatement statement, List<ColumnSpecification> boundNames, short[] partitionKeyBindIndexes)
@Nullable
public final String keyspace;

protected Prepared(CQLStatement statement, List<ColumnSpecification> boundNames, short[] partitionKeyBindIndexes, String keyspace)
{
this.statement = statement;
this.boundNames = boundNames;
this.partitionKeyBindIndexes = partitionKeyBindIndexes;
this.rawCQLStatement = "";
this.keyspace = keyspace;
}

public Prepared(CQLStatement statement, VariableSpecifications names, short[] partitionKeyBindIndexes)
{
this(statement, names.getSpecifications(), partitionKeyBindIndexes);
this(statement, names, partitionKeyBindIndexes, null);
}

public Prepared(CQLStatement statement, VariableSpecifications names, short[] partitionKeyBindIndexes, String keyspace)
{
this(statement, names.getSpecifications(), partitionKeyBindIndexes, keyspace);
}

public Prepared(CQLStatement statement)
{
this(statement, Collections.<ColumnSpecification>emptyList(), null);
this(statement, Collections.<ColumnSpecification>emptyList(), null, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,12 @@ public ParsedStatement.Prepared prepare(boolean forView, ClientState clientState
prepareLimit(boundNames, limit, keyspace(), limitReceiver()),
prepareLimit(boundNames, perPartitionLimit, keyspace(), perPartitionLimitReceiver()));

return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm));
return prepare(stmt, boundNames, cfm);
}

protected ParsedStatement.Prepared prepare(SelectStatement stmt, VariableSpecifications boundNames, CFMetaData cfm)
{
return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm), cfm.ksName);
}

/**
Expand Down
101 changes: 101 additions & 0 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -58,6 +62,8 @@
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.RecomputingSupplier;

import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
import static org.apache.cassandra.utils.ExecutorUtils.shutdown;

Expand Down Expand Up @@ -253,6 +259,8 @@ public void run()
}
}

private final RecomputingSupplier<CassandraVersion> minVersionSupplier = new RecomputingSupplier<>(this::computeMinVersion, executor);

private Gossiper()
{
// half of QUARATINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip
Expand All @@ -262,6 +270,32 @@ private Gossiper()

// Register this instance with JMX
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);


subscribers.add(new IEndpointStateChangeSubscriber()
{
public void onJoin(InetAddress endpoint, EndpointState state)
{
maybeRecompute(state);
}

public void onAlive(InetAddress endpoint, EndpointState state)
{
maybeRecompute(state);
}

private void maybeRecompute(EndpointState state)
{
if (state.getApplicationState(ApplicationState.RELEASE_VERSION) != null)
minVersionSupplier.recompute();
}

public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
{
if (state == ApplicationState.RELEASE_VERSION)
minVersionSupplier.recompute();
}
});
}

public void setLastProcessedMessageAt(long timeInMillis)
Expand Down Expand Up @@ -1474,6 +1508,7 @@ public void start(int generationNbr, Map<ApplicationState, VersionedValue> prelo
maybeInitializeLocalState(generationNbr);
EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
localState.addApplicationStates(preloadLocalStates);
minVersionSupplier.recompute();

//notify snitches that Gossiper is about to start
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
Expand Down Expand Up @@ -1872,6 +1907,72 @@ public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedE
ExecutorUtils.shutdownAndWait(timeout, unit, executor);
}

@Nullable
public CassandraVersion getMinVersion(int delay, TimeUnit timeUnit)
{
try
{
return minVersionSupplier.get(delay, timeUnit);
}
catch (TimeoutException e)
{
// Timeouts here are harmless: they won't cause reprepares and may only
// cause the old version of the hash to be kept for longer
return null;
}
catch (Throwable e)
{
logger.error("Caught an exception while waiting for min version", e);
return null;
}
}

@Nullable
private String getReleaseVersionString(InetAddress ep)
{
EndpointState state = getEndpointStateForEndpoint(ep);
if (state == null)
return null;

VersionedValue value = state.getApplicationState(ApplicationState.RELEASE_VERSION);
return value == null ? null : value.value;
}

private CassandraVersion computeMinVersion()
{
CassandraVersion minVersion = null;

for (InetAddress addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
Gossiper.instance.getUnreachableMembers()))
{
String versionString = getReleaseVersionString(addr);
// Raced with changes to gossip state, wait until next iteration
if (versionString == null)
return null;

CassandraVersion version;

try
{
version = new CassandraVersion(versionString);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
String message = String.format("Can't parse version string %s", versionString);
logger.warn(message);
if (logger.isDebugEnabled())
logger.debug(message, t);
return null;
}

if (minVersion == null || version.compareTo(minVersion) < 0)
minVersion = version;
}

return minVersion;
}

@VisibleForTesting
public void setAnyNodeOn30(boolean anyNodeOn30)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,23 @@ public interface IEndpointStateChangeSubscriber
* @param endpoint endpoint for which the state change occurred.
* @param epState state that actually changed for the above endpoint.
*/
public void onJoin(InetAddress endpoint, EndpointState epState);
default void onJoin(InetAddress endpoint, EndpointState epState) {}

public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue);
default void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}

public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value);
default void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}

public void onAlive(InetAddress endpoint, EndpointState state);
default void onAlive(InetAddress endpoint, EndpointState state) {}

public void onDead(InetAddress endpoint, EndpointState state);
default void onDead(InetAddress endpoint, EndpointState state) {}

public void onRemove(InetAddress endpoint);
default void onRemove(InetAddress endpoint) {}

/**
* Called whenever a node is restarted.
* Note that there is no guarantee when that happens that the node was
* previously marked down. It will have only if {@code state.isAlive() == false}
* as {@code state} is from before the restarted node is marked up.
*/
public void onRestart(InetAddress endpoint, EndpointState state);
default void onRestart(InetAddress endpoint, EndpointState state) {}
}
Loading

0 comments on commit 32a15f0

Please sign in to comment.