Skip to content

Commit

Permalink
Merge pull request brianfrankcooper#488 from allanbank/kudu-cleanup
Browse files Browse the repository at this point in the history
[kudu] Checkstyle updates for the Kudu binding.
  • Loading branch information
allanbank committed Nov 12, 2015
2 parents add8e70 + 23f8f30 commit 4e7295c
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 55 deletions.
24 changes: 24 additions & 0 deletions kudu/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,30 @@ LICENSE file.
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.15</version>
<configuration>
<consoleOutput>true</consoleOutput>
<configLocation>../checkstyle.xml</configLocation>
<failOnViolation>true</failOnViolation>
<failsOnError>true</failsOnError>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>checkstyle</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<releases>
Expand Down
139 changes: 84 additions & 55 deletions kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,68 +37,85 @@
import static org.kududb.Type.STRING;

/**
* Kudu client for YCSB framework
* Example to load:
* $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
* Example to run:
* Kudu client for YCSB framework. Example to load: <blockquote>
*
* <pre>
* <code>
* $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
* </code>
* </pre>
*
* </blockquote> Example to run: <blockquote>
*
* <pre>
* <code>
* ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5
*
* </code>
* </pre>
*
* </blockquote>
*/
public class KuduYCSBClient extends com.yahoo.ycsb.DB {
public static final String KEY = "key";
public static final Status TIMEOUT = new Status("TIMEOUT", "The operation timed out.");
public static final Status TIMEOUT =
new Status("TIMEOUT", "The operation timed out.");
public static final int MAX_TABLETS = 9000;
public static final long DEFAULT_SLEEP = 60000;
private static final String SYNC_OPS_OPT = "kudu_sync_ops";
private static final String DEBUG_OPT = "kudu_debug";
private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors";
private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets";
private static final String PRE_SPLIT_NUM_TABLETS_OPT =
"kudu_pre_split_num_tablets";
private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas";
private static final String BLOCK_SIZE_OPT = "kudu_block_size";
private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
private static final int BLOCK_SIZE_DEFAULT = 4096;
private static final List<String> columnNames = new ArrayList<String>();
private static final List<String> COLUMN_NAMES = new ArrayList<String>();
private static KuduClient client;
private static Schema schema;
private static int fieldCount;
private boolean debug = false;
private boolean printErrors = false;
private String tableName;
private KuduSession session;
private KuduTable table;
private KuduTable kuduTable;

@Override
public void init() throws DBException {
if (getProperties().getProperty(DEBUG_OPT) != null) {
this.debug = getProperties().getProperty(DEBUG_OPT).equals("true");
}
if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
this.printErrors =
getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
}
if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
this.printErrors =
getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
}
this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table;
initClient(debug, tableName, getProperties());
this.session = client.newSession();
if (getProperties().getProperty(SYNC_OPS_OPT) != null &&
getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
if (getProperties().getProperty(SYNC_OPS_OPT) != null
&& getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
this.session.setMutationBufferSpace(100);
} else {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
}

try {
this.table = client.openTable(tableName);
this.kuduTable = client.openTable(tableName);
} catch (Exception e) {
throw new DBException("Could not open a table because of:", e);
}
}

private synchronized static void initClient(boolean debug, String tableName, Properties prop)
throws DBException {
if (client != null) return;
private static synchronized void initClient(boolean debug, String tableName,
Properties prop) throws DBException {
if (client != null) {
return;
}

String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT);
if (masterAddresses == null) {
Expand All @@ -107,8 +124,8 @@ private synchronized static void initClient(boolean debug, String tableName, Pro

int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
if (numTablets > MAX_TABLETS) {
throw new DBException("Specified number of tablets (" + numTablets + ") must be equal " +
"or below " + MAX_TABLETS);
throw new DBException("Specified number of tablets (" + numTablets
+ ") must be equal " + "or below " + MAX_TABLETS);
}

int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3);
Expand All @@ -117,8 +134,7 @@ private synchronized static void initClient(boolean debug, String tableName, Pro

client = new KuduClient.KuduClientBuilder(masterAddresses)
.defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
.defaultOperationTimeoutMs(DEFAULT_SLEEP)
.build();
.defaultOperationTimeoutMs(DEFAULT_SLEEP).build();
if (debug) {
System.out.println("Connecting to the masters at " + masterAddresses);
}
Expand All @@ -129,17 +145,14 @@ private synchronized static void initClient(boolean debug, String tableName, Pro
List<ColumnSchema> columns = new ArrayList<ColumnSchema>(fieldCount + 1);

ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
.key(true)
.desiredBlockSize(blockSize)
.build();
.key(true).desiredBlockSize(blockSize).build();
columns.add(keyColumn);
columnNames.add(KEY);
COLUMN_NAMES.add(KEY);
for (int i = 0; i < fieldCount; i++) {
String name = "field" + i;
columnNames.add(name);
COLUMN_NAMES.add(name);
columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
.desiredBlockSize(blockSize)
.build());
.desiredBlockSize(blockSize).build());
}
schema = new Schema(columns);

Expand All @@ -164,16 +177,17 @@ private synchronized static void initClient(boolean debug, String tableName, Pro
}
}

private static int getIntFromProp(Properties prop, String propName, int defaultValue)
throws DBException {
private static int getIntFromProp(Properties prop, String propName,
int defaultValue) throws DBException {
String intStr = prop.getProperty(propName);
if (intStr == null) {
return defaultValue;
} else {
try {
return Integer.valueOf(intStr);
} catch (NumberFormatException ex) {
throw new DBException("Provided number for " + propName + " isn't a valid integer");
throw new DBException(
"Provided number for " + propName + " isn't a valid integer");
}
}
}
Expand All @@ -189,23 +203,29 @@ public void cleanup() throws DBException {

@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String,ByteIterator> result) {
Vector<HashMap<String, ByteIterator>> results = new Vector<HashMap<String, ByteIterator>>();
HashMap<String, ByteIterator> result) {
Vector<HashMap<String, ByteIterator>> results =
new Vector<HashMap<String, ByteIterator>>();
final Status status = scan(table, key, 1, fields, results);
if (!status.equals(Status.OK)) return status;
if (results.size() != 1) return Status.NOT_FOUND;
if (!status.equals(Status.OK)) {
return status;
}
if (results.size() != 1) {
return Status.NOT_FOUND;
}
result.putAll(results.firstElement());
return Status.OK;
}

@Override
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(this.table);
KuduScanner.KuduScannerBuilder scannerBuilder =
client.newScannerBuilder(this.kuduTable);
List<String> querySchema;
if (fields == null) {
querySchema = columnNames;
querySchema = COLUMN_NAMES;
// No need to set the projected columns with the whole schema.
} else {
querySchema = new ArrayList<String>(fields);
Expand All @@ -222,20 +242,22 @@ public Status scan(String table, String startkey, int recordcount, Set<String> f
scannerBuilder.exclusiveUpperBound(upperBound);
}

KuduScanner scanner = scannerBuilder
.limit(recordcount) // currently noop
KuduScanner scanner = scannerBuilder.limit(recordcount) // currently noop
.build();

while (scanner.hasMoreRows()) {
RowResultIterator data = scanner.nextRows();
addAllRowsToResult(data, recordcount, querySchema, result);
if (recordcount == result.size()) break;
if (recordcount == result.size()) {
break;
}
}
RowResultIterator closer = scanner.close();
addAllRowsToResult(closer, recordcount, querySchema, result);
} catch (TimeoutException te) {
if (printErrors) {
System.err.println("Waited too long for a scan operation with start key=" + startkey);
System.err.println(
"Waited too long for a scan operation with start key=" + startkey);
}
return TIMEOUT;
} catch (Exception e) {
Expand All @@ -247,14 +269,18 @@ public Status scan(String table, String startkey, int recordcount, Set<String> f
}

private void addAllRowsToResult(RowResultIterator it, int recordcount,
List<String> querySchema,
Vector<HashMap<String, ByteIterator>> result)
throws Exception {
List<String> querySchema, Vector<HashMap<String, ByteIterator>> result)
throws Exception {
RowResult row;
HashMap<String, ByteIterator> rowResult = new HashMap<String, ByteIterator>(querySchema.size());
if (it == null) return;
HashMap<String, ByteIterator> rowResult =
new HashMap<String, ByteIterator>(querySchema.size());
if (it == null) {
return;
}
while (it.hasNext()) {
if (result.size() == recordcount) return;
if (result.size() == recordcount) {
return;
}
row = it.next();
int colIdx = 0;
for (String col : querySchema) {
Expand All @@ -266,8 +292,9 @@ private void addAllRowsToResult(RowResultIterator it, int recordcount,
}

@Override
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
Update update = this.table.newUpdate();
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
Update update = this.kuduTable.newUpdate();
PartialRow row = update.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
Expand All @@ -282,20 +309,22 @@ public Status update(String table, String key, HashMap<String, ByteIterator> val
}

@Override
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
Insert insert = this.table.newInsert();
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
Insert insert = this.kuduTable.newInsert();
PartialRow row = insert.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
row.addString(i, new String(values.get(schema.getColumnByIndex(i).getName()).toArray()));
row.addString(i, new String(
values.get(schema.getColumnByIndex(i).getName()).toArray()));
}
apply(insert);
return Status.OK;
}

@Override
public Status delete(String table, String key) {
Delete delete = this.table.newDelete();
Delete delete = this.kuduTable.newDelete();
PartialRow row = delete.getRow();
row.addString(KEY, key);
apply(delete);
Expand Down
22 changes: 22 additions & 0 deletions kudu/src/main/java/com/yahoo/ycsb/db/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2014, Yahoo!, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/

/**
* The YCSB binding for <a href="http://getkudu.io/">Kudu</a>.
*/
package com.yahoo.ycsb.db;

0 comments on commit 4e7295c

Please sign in to comment.