Skip to content

Commit

Permalink
[atlas][cql] putUnlessExists
Browse files Browse the repository at this point in the history
Change-Id: I74b63c5c2ec4d8841f24ff216ab438e99dc191d8
  • Loading branch information
clockfort authored and rjullman committed Oct 2, 2015
1 parent 360ea85 commit 228d3ba
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
import com.palantir.atlasdb.keyvalue.impl.Cells;
import com.palantir.atlasdb.keyvalue.impl.KeyValueServices;
import com.palantir.atlasdb.table.description.TableMetadata;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.common.annotation.Idempotent;
import com.palantir.common.base.ClosableIterator;
import com.palantir.common.base.ClosableIterators;
Expand Down Expand Up @@ -153,6 +152,11 @@ public PreparedStatement load(String query) {

private static final long TRANSACTION_TS = 0L;

private static enum TransactionType {
NONE,
LIGHTWEIGHT_TRANSACTION_REQUIRED
}

public static CQLKeyValueService create(CassandraKeyValueServiceConfig config) {
Preconditions.checkArgument(!config.servers().isEmpty(), "servers set was empty");
final CQLKeyValueService ret = new CQLKeyValueService(config);
Expand Down Expand Up @@ -677,10 +681,10 @@ public Map<Cell, Long> call() throws Exception {
@Override
public void put(final String tableName, final Map<Cell, byte[]> values, final long timestamp) {
try {
putInternal(
tableName,
KeyValueServices.toConstantTimestampValues(values.entrySet(), timestamp),
false);
putInternal(
tableName,
KeyValueServices.toConstantTimestampValues(values.entrySet(), timestamp),
TransactionType.NONE);
} catch (Throwable t) {
throw Throwables.throwUncheckedException(t);
}
Expand All @@ -689,7 +693,7 @@ public void put(final String tableName, final Map<Cell, byte[]> values, final lo
@Override
public void putWithTimestamps(String tableName, Multimap<Cell, Value> values) {
try {
putInternal(tableName, values.entries(), false);
putInternal(tableName, values.entries(), TransactionType.NONE);
} catch (Throwable t) {
throw Throwables.throwUncheckedException(t);
}
Expand Down Expand Up @@ -727,7 +731,7 @@ public Long apply(Entry<Cell, byte[]> entry) {
public Entry<Cell, Value> apply(Entry<Cell, byte[]> input) {
return Maps.immutableEntry(input.getKey(), Value.create(input.getValue(), timestamp));
}});
resultSetFutures.put(getPutPartitionResultSetFuture(table, partition, false), table);
resultSetFutures.put(getPutPartitionResultSetFuture(table, partition, TransactionType.NONE), table);
}
}

Expand All @@ -739,7 +743,7 @@ public Entry<Cell, Value> apply(Entry<Cell, byte[]> input) {
} catch (Throwable t) {
throw Throwables.throwUncheckedException(t);
}
logTracedQuery(getPutQuery(resultSetFutures.get(result.getValue()), false), resultSet);
logTracedQuery(getPutQuery(resultSetFutures.get(result.getValue())), resultSet);
}
}

Expand All @@ -753,19 +757,20 @@ public Long apply(Entry<Cell, Value> input) {
}
};

private void putInternal(final String tableName, final Iterable<Map.Entry<Cell, Value>> values, boolean addNotExists)
throws KeyAlreadyExistsException {
private void putInternal(final String tableName, final Iterable<Map.Entry<Cell, Value>> values, TransactionType transactionType)
throws Exception {
List<ResultSetFuture> resultSetFutures = Lists.newArrayList();
int mutationBatchCount = config.mutationBatchCount();
int mutationBatchSizeBytes = config.mutationBatchSizeBytes();
List<ResultSetFuture> resultSetFutures = Lists.newArrayList();
for (List<Entry<Cell, Value>> partition : partitionByCountAndBytes(
values,
mutationBatchCount,
mutationBatchSizeBytes,
tableName,
SIZING_FUNCTION)) {
resultSetFutures.add(getPutPartitionResultSetFuture(tableName, partition, addNotExists));
resultSetFutures.add(getPutPartitionResultSetFuture(tableName, partition, transactionType));
}

for (ResultSetFuture resultSetFuture : resultSetFutures) {
ResultSet resultSet;
try {
Expand All @@ -779,23 +784,26 @@ private void putInternal(final String tableName, final Iterable<Map.Entry<Cell,
} catch (Throwable t) {
throw Throwables.throwUncheckedException(t);
}
logTracedQuery(getPutQuery(tableName, addNotExists), resultSet);
logTracedQuery(getPutQuery(tableName), resultSet);
}
}

private String getPutQuery(String tableName, boolean addNotExists) {
StringBuilder sb = new StringBuilder("INSERT INTO " + getFullTableName(tableName) + " (" + ROW_NAME + ", " + COL_NAME_COL
+ ", " + TS_COL + ", " + VALUE_COL + ") VALUES (?, ?, ?, ?)");
if (addNotExists) {
sb.append(" IF NOT EXISTS");
}
return sb.toString();
private String getPutQueryForPossibleTransaction(String tableName, TransactionType transactionType) {
return transactionType.equals(TransactionType.LIGHTWEIGHT_TRANSACTION_REQUIRED)? getPutUnlessExistsQuery(tableName) : getPutQuery(tableName);
}

private String getPutUnlessExistsQuery(String tableName) {
return getPutQuery(tableName) + " IF NOT EXISTS";
}

private String getPutQuery(String tableName) {
return "INSERT INTO " + getFullTableName(tableName) + " (" + ROW_NAME + ", " + COL_NAME_COL + ", " + TS_COL + ", " + VALUE_COL + ") VALUES (?, ?, ?, ?)";
}

private ResultSetFuture getPutPartitionResultSetFuture(String tableName,
List<Entry<Cell, Value>> partition,
boolean addNotExists) {
PreparedStatement preparedStatement = getPreparedStatement(getPutQuery(tableName, addNotExists));
TransactionType transactionType) {
PreparedStatement preparedStatement = getPreparedStatement(getPutQueryForPossibleTransaction(tableName, transactionType));
preparedStatement.setConsistencyLevel(writeConsistency);

// Be mindful when using the atomicity semantics of UNLOGGED batch statements.
Expand Down Expand Up @@ -1234,7 +1242,7 @@ public void addGarbageCollectionSentinelValues(String tableName, Set<Cell> cells
public Entry<Cell, Value> apply(Cell cell) {
return Maps.immutableEntry(cell, value);
}
}), false);
}), TransactionType.NONE);
} catch (Throwable t) {
throw Throwables.throwUncheckedException(t);
}
Expand All @@ -1256,10 +1264,14 @@ public Multimap<Cell, Long> getAllTimestamps(String tableName, Set<Cell> cells,
@Override
public void putUnlessExists(String tableName, Map<Cell, byte[]> values)
throws KeyAlreadyExistsException {
Validate.isTrue(TransactionConstants.TRANSACTION_TABLE.equals(tableName));
putInternal(tableName,
KeyValueServices.toConstantTimestampValues(values.entrySet(), TRANSACTION_TS),
true);
try {
putInternal(
tableName,
KeyValueServices.toConstantTimestampValues(values.entrySet(), TRANSACTION_TS),
TransactionType.LIGHTWEIGHT_TRANSACTION_REQUIRED);
} catch (Throwable t) {
throw Throwables.throwUncheckedException(t);
}
}

private String getFullTableName(String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class AtlasDbConstants {

public static final long NULL_HORIZON_DATA_EVENT_ID = -1L;

public static final long TRANSACTION_TS = 0L;

public static final String RELATIONAL_TABLE_PREFIX = "pt_met_";
public static final String TEMP_TABLE_PREFIX = "_t";
public static final String INDEX_SUFFIX = "idx";
Expand Down

0 comments on commit 228d3ba

Please sign in to comment.