[atlas][cql] add batched schema mutation operations and schema version checking

Change-Id: I72c332d7d821991bee37d439627f3e58e0f16a5a
clockfort authored and rjullman committed Oct 2, 2015
1 parent 6488641 commit 924a490
Showing 1 changed file with 147 additions and 83 deletions.
@@ -69,6 +69,7 @@
 import com.datastax.driver.core.policies.RoundRobinPolicy;
 import com.datastax.driver.core.policies.TokenAwarePolicy;
 import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -77,6 +78,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -337,7 +339,7 @@ private void initializeFromFreshInstance(List<String> hosts, int replicationFact
                     safetyDisabled,
                     socketTimeoutMillis,
                     socketQueryTimeoutMillis);
-            createTableInternal(CassandraConstants.METADATA_TABLE);
+            createTables(ImmutableMap.of(CassandraConstants.METADATA_TABLE, Integer.MAX_VALUE));
             return;
         }
         ks = new KsDef(
@@ -362,7 +364,7 @@ private void initializeFromFreshInstance(List<String> hosts, int replicationFact
                     safetyDisabled,
                     socketTimeoutMillis,
                     socketQueryTimeoutMillis);
-            createTableInternal(CassandraConstants.METADATA_TABLE);
+            createTables(ImmutableMap.of(CassandraConstants.METADATA_TABLE, Integer.MAX_VALUE));
             return;
         } catch (TException e) {
             log.warn("failed to connect to host: " + host, e);
@@ -823,23 +825,34 @@ private ResultSetFuture getPutPartitionResultSetFuture(String tableName,
 
     @Override
     public void truncateTable(final String tableName) {
-        String truncateQuery = "TRUNCATE " + getFullTableName(tableName);
-        PreparedStatement preparedStatement = getPreparedStatement(truncateQuery);
-        preparedStatement.setConsistencyLevel(deleteConsistency);
-        BoundStatement boundStatement = preparedStatement.bind();
-        if (shouldTraceQuery(tableName)) {
-            boundStatement.enableTracing();
-        }
-        ResultSet resultSet;
-        try {
-            resultSet = longRunningQuerySession.executeAsync(boundStatement).getUninterruptibly();
-            resultSet.all();
-        } catch (com.datastax.driver.core.exceptions.UnavailableException e) {
-            throw new InsufficientConsistencyException("Truncate requires all Cassandra nodes to be up and available.", e);
-        } catch (Throwable t) {
-            throw Throwables.throwUncheckedException(t);
-        }
-        logTracedQuery(truncateQuery, resultSet);
+        truncateTables(ImmutableSet.of(tableName));
     }
 
+    @Override
+    public void truncateTables(final Set<String> tablesToTruncate) {
+        String truncateQuery = "TRUNCATE %s"; // full table name (ks.cf)
+
+        for (List<String> batchOfRawTableNames : Iterables.partition(tablesToTruncate, 250)) {
+            BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+
+            for (String tableName : batchOfRawTableNames) {
+                PreparedStatement createStatement = getPreparedStatement(String.format(truncateQuery, getFullTableName(tableName)));
+                createStatement.setConsistencyLevel(ConsistencyLevel.ALL);
+                BoundStatement boundStatement = createStatement.bind();
+
+                if (shouldTraceQuery(tableName)) {
+                    batch.enableTracing();
+                }
+                batch.add(boundStatement);
+            }
+            try {
+                ResultSet resultSet = session.execute(batch);
+                logTracedQuery(truncateQuery, resultSet);
+            } catch (com.datastax.driver.core.exceptions.UnavailableException e) {
+                throw new InsufficientConsistencyException("Truncating tables requires all Cassandra nodes to be up and available.", e);
+            }
+        }
+        waitForSchemaVersionsToCoalesce("truncateTables(" + tablesToTruncate.size() + " tables)");
+    }
+
     @Override
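For reference, the batching above relies on Guava's Iterables.partition, which splits an iterable into consecutive chunks of at most the given size, so truncating, say, 600 tables issues three unlogged batches of at most 250 statements each. A minimal self-contained sketch of that chunking behavior (the table names are hypothetical, not from this diff):

    import java.util.Arrays;
    import java.util.List;

    import com.google.common.collect.Iterables;

    public class PartitionDemo {
        public static void main(String[] args) {
            List<String> tables = Arrays.asList("t1", "t2", "t3", "t4", "t5");
            // Prints [t1, t2], [t3, t4], [t5]: chunks of at most 2, in input order.
            for (List<String> chunk : Iterables.partition(tables, 2)) {
                System.out.println(chunk);
            }
        }
    }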
@@ -1037,74 +1050,87 @@ TokenBackedBasicResultsPage<RowResult<U>, byte[]> getPage(final byte[] startKey)
 
     @Override
     public void dropTable(final String tableName) {
-        String dropQuery = "DROP TABLE IF EXISTS " + getFullTableName(tableName);
-        PreparedStatement preparedStatement = getPreparedStatement(dropQuery);
-        preparedStatement.setConsistencyLevel(ConsistencyLevel.ALL);
-        ResultSet resultSet;
-        try {
-            resultSet = longRunningQuerySession.executeAsync(preparedStatement.bind()).getUninterruptibly();
-            resultSet.all();
-        } catch (com.datastax.driver.core.exceptions.UnavailableException e) {
-            throw new InsufficientConsistencyException("Drop table requires all Cassandra nodes to be up and available.", e);
-        } catch (Throwable t) {
-            throw Throwables.throwUncheckedException(t);
-        }
-        logTracedQuery(dropQuery, resultSet);
-        putMetadataWithoutChangingSettings(tableName, PtBytes.EMPTY_BYTE_ARRAY);
+        dropTables(ImmutableSet.of(tableName));
     }
 
+    @Override
+    public void dropTables(final Set<String> tablesToDrop) {
+        String dropQuery = "DROP TABLE IF EXISTS %s"; // full table name (ks.cf)
+
+        for (List<String> batchOfRawTableNames : Iterables.partition(tablesToDrop, 250)) {
+            BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+
+            for (String tableName : batchOfRawTableNames) {
+                PreparedStatement createStatement = getPreparedStatement(String.format(dropQuery, getFullTableName(tableName)));
+                createStatement.setConsistencyLevel(ConsistencyLevel.ALL);
+                BoundStatement boundStatement = createStatement.bind();
+
+                if (shouldTraceQuery(tableName)) {
+                    batch.enableTracing();
+                }
+                batch.add(boundStatement);
+            }
+            try {
+                ResultSet resultSet = session.execute(batch);
+                logTracedQuery(dropQuery, resultSet);
+                put(CassandraConstants.METADATA_TABLE, Maps.toMap(
+                        Lists.transform(batchOfRawTableNames, new Function<String, Cell>() {
+                            @Override
+                            public Cell apply(String tableName) {
+                                return getMetadataCell(tableName);
+                            }}),
+                        Functions.constant(PtBytes.EMPTY_BYTE_ARRAY)),
+                        System.currentTimeMillis());
+            } catch (com.datastax.driver.core.exceptions.UnavailableException e) {
+                throw new InsufficientConsistencyException("Dropping tables requires all Cassandra nodes to be up and available.", e);
+            }
+        }
+        waitForSchemaVersionsToCoalesce("dropTables(" + tablesToDrop.size() + " tables)");
+    }
 
 
     @Override
     public void createTable(final String tableName, final int maxValueSizeInBytes) {
-        createTableInternal(tableName);
+        createTables(ImmutableMap.of(tableName, maxValueSizeInBytes));
     }
 
-    private void createTableInternal(final String tableName) {
-        for (String name : getAllTableNamesInternal()) {
-            if (name.equalsIgnoreCase(tableName)) {
-                return;
-            }
-        }
-        String createQuery = "CREATE TABLE "
-                + getFullTableName(tableName)
-                + " ( "
-                + ROW_NAME
-                + " blob, "
-                + COL_NAME_COL
-                + " blob, "
-                + TS_COL
-                + " bigint, "
-                + VALUE_COL
-                + " blob, "
+    @Override
+    public void createTables(final Map<String, Integer> tableNamesToMaxValueSizeInBytes) {
+        String createQuery = "CREATE TABLE IF NOT EXISTS %s ( " // full table name (ks.cf)
+                + ROW_NAME + " blob, "
+                + COL_NAME_COL + " blob, "
+                + TS_COL + " bigint, "
+                + VALUE_COL + " blob, "
                 + "PRIMARY KEY ("
-                + ROW_NAME
-                + ", "
-                + COL_NAME_COL
-                + ", "
-                + TS_COL
-                + ")) "
+                + ROW_NAME + ", "
+                + COL_NAME_COL + ", "
+                + TS_COL + ")) "
                 + "WITH COMPACT STORAGE AND CLUSTERING ORDER BY ("
-                + COL_NAME_COL
-                + " ASC, "
-                + TS_COL
-                + " ASC) "
+                + COL_NAME_COL + " ASC, "
+                + TS_COL + " ASC) "
                 + "AND compaction = {'sstable_size_in_mb': '80', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}";
-        PreparedStatement preparedStatement = getPreparedStatement(createQuery);
-        preparedStatement.setConsistencyLevel(ConsistencyLevel.ALL);
-        BoundStatement boundStatement = preparedStatement.bind();
-        if (shouldTraceQuery(tableName)) {
-            boundStatement.enableTracing();
-        }
-        ResultSet resultSet;
-        try {
-            resultSet = session.executeAsync(boundStatement).getUninterruptibly();
-            resultSet.all();
-        } catch (com.datastax.driver.core.exceptions.UnavailableException e) {
-            throw new InsufficientConsistencyException("Create table requires all Cassandra nodes to be up and available.", e);
-        } catch (Throwable t) {
-            throw Throwables.throwUncheckedException(t);
-        }
-        logTracedQuery(createQuery, resultSet);
-        return;
+
+        for (List<String> batchOfRawTableNames : Iterables.partition(tableNamesToMaxValueSizeInBytes.keySet(), 250)) {
+            BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+
+            for (String tableName : batchOfRawTableNames) {
+                PreparedStatement createStatement = getPreparedStatement(String.format(createQuery, getFullTableName(tableName)));
+                createStatement.setConsistencyLevel(ConsistencyLevel.ALL);
+                BoundStatement boundStatement = createStatement.bind();
+
+                if (shouldTraceQuery(tableName)) {
+                    batch.enableTracing();
+                }
+                batch.add(boundStatement);
+            }
+            try {
+                ResultSet resultSet = session.execute(batch);
+                logTracedQuery(createQuery, resultSet);
+            } catch (com.datastax.driver.core.exceptions.UnavailableException e) {
+                throw new InsufficientConsistencyException("Creating tables requires all Cassandra nodes to be up and available.", e);
+            }
+        }
+        waitForSchemaVersionsToCoalesce("createTables(" + tableNamesToMaxValueSizeInBytes.size() + " tables)");
     }
 
     @Override
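A note on the metadata-clearing put inside dropTables: it combines three Guava helpers, with Lists.transform mapping each dropped table name to its metadata Cell, Maps.toMap pairing every key with a computed value, and Functions.constant supplying the same empty byte array for all keys. A standalone sketch of that idiom under simplified types (String keys stand in for Cell, and the key derivation is hypothetical, not the real getMetadataCell):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import com.google.common.base.Function;
    import com.google.common.base.Functions;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;

    public class MetadataBlankingDemo {
        public static void main(String[] args) {
            List<String> droppedTables = Arrays.asList("table_a", "table_b");
            // Derive one key per table name, then assign every key the same constant value.
            Map<String, byte[]> cellToEmptyMetadata = Maps.toMap(
                    Lists.transform(droppedTables, new Function<String, String>() {
                        @Override
                        public String apply(String tableName) {
                            return "_metadata/" + tableName; // stand-in for getMetadataCell(tableName)
                        }
                    }),
                    Functions.constant(new byte[0]));
            System.out.println(cellToEmptyMetadata.keySet()); // [_metadata/table_a, _metadata/table_b]
        }
    }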
@@ -1148,15 +1174,18 @@ private Cell getMetadataCell(String tableName)
 
     @Override
     public void putMetadataForTable(final String tableName, final byte[] meta) {
-        putMetadataWithoutChangingSettings(tableName, meta);
-        setSettingsForTable(tableName, meta);
+        putMetadataForTables(ImmutableMap.of(tableName, meta));
     }
 
-    private void putMetadataWithoutChangingSettings(final String tableName, final byte[] meta) {
-        put(
-                CassandraConstants.METADATA_TABLE,
-                ImmutableMap.of(getMetadataCell(tableName), meta),
-                System.currentTimeMillis());
+    @Override
+    public void putMetadataForTables(final Map<String, byte[]> tableNameToMetadata) {
+        Map<Cell, byte[]> cellToMetadata = Maps.newHashMap();
+        for (Entry<String, byte[]> tableEntry : tableNameToMetadata.entrySet()) {
+            cellToMetadata.put(getMetadataCell(tableEntry.getKey()), tableEntry.getValue());
+            setSettingsForTable(tableEntry.getKey(), tableEntry.getValue());
+        }
+        put(CassandraConstants.METADATA_TABLE, cellToMetadata, System.currentTimeMillis());
+        waitForSchemaVersionsToCoalesce("putMetadataForTables(" + tableNameToMetadata.size() + " tables)");
     }
 
     private void setSettingsForTable(String tableName, byte[] rawMetadata) {
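Callers that previously looped over putMetadataForTable can now hand the key value service one map and pay the metadata write, and the new schema-agreement wait, once. A hedged usage sketch, assuming these methods are declared on the KeyValueService interface this class implements, and with invented table names and metadata bytes:

    // Sketch only: batch two tables' metadata into a single call.
    static void putAllMetadata(KeyValueService kvs, byte[] serializedMetadataA, byte[] serializedMetadataB) {
        Map<String, byte[]> metadataByTable = ImmutableMap.of(
                "my_table_a", serializedMetadataA,   // hypothetical table names and bytes
                "my_table_b", serializedMetadataB);
        kvs.putMetadataForTables(metadataByTable);   // one put + one schema-agreement wait
    }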
@@ -1405,4 +1434,39 @@ private void alterTombstoneThreshold(final String tableName, float tombstoneThre
         logTracedQuery(setTombStoneQuery, resultSet);
         return;
     }
+
+    private void waitForSchemaVersionsToCoalesce(String encapsulatingOperationDescription) {
+        PreparedStatement peerInfoQuery = getPreparedStatement("select peer, schema_version from system.peers;");
+        peerInfoQuery.setConsistencyLevel(ConsistencyLevel.ALL);
+
+        Multimap<UUID, InetAddress> peerInfo = ArrayListMultimap.create();
+        long start = System.currentTimeMillis();
+        long sleepTime = 100;
+        do {
+            peerInfo.clear();
+            for (Row row : session.execute(peerInfoQuery.bind()).all()) {
+                peerInfo.put(row.getUUID("schema_version"), row.getInet("peer"));
+            }
+
+            if (peerInfo.keySet().size() <= 1) { // full schema agreement
+                return;
+            }
+            sleepTime = Math.min(sleepTime * 2, 5000);
+        } while (System.currentTimeMillis() < start + CassandraConstants.SECONDS_WAIT_FOR_VERSIONS * 1000);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("Cassandra cluster cannot come to agreement on schema versions, during operation: %s.", encapsulatingOperationDescription));
+
+        for (Entry<UUID, Collection<InetAddress>> versionToPeer : peerInfo.asMap().entrySet()) {
+            sb.append(String.format("\nAt schema version %s:", versionToPeer.getKey()));
+            for (InetAddress peer : versionToPeer.getValue()) {
+                sb.append(String.format("\n\tNode: %s", peer));
+            }
+        }
+        sb.append("\nFind the nodes above that diverge from the majority schema " +
+                "(or have schema 'UNKNOWN', which likely means they are down/unresponsive) " +
+                "and examine their logs to determine the issue. Fixing the underlying issue and restarting Cassandra " +
+                "should resolve the problem. You can quick-check this with 'nodetool describecluster'.");
+        throw new IllegalStateException(sb.toString());
+    }
 }
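One behavioral note on waitForSchemaVersionsToCoalesce as committed: the loop doubles sleepTime (capped at 5000 ms) on every pass but never actually sleeps on it, so until CassandraConstants.SECONDS_WAIT_FOR_VERSIONS elapses it re-queries system.peers as fast as the driver responds. If a true backoff was intended, the natural reading is that a sleep belongs just before the doubling; a sketch of the presumed intent, reusing the Throwables helper already imported by this class:

    // Presumed intent (sketch): pause before polling system.peers again.
    try {
        Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
        throw Throwables.throwUncheckedException(e);
    }
    sleepTime = Math.min(sleepTime * 2, 5000);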
