Skip to content

Commit

Permalink
Add in storage implementations for graph nodes
Browse files Browse the repository at this point in the history
Summary:
NodeStore is separate interface to LinkStore for now, so that we don't
have to immediately add new methods to all LinkStore implementations.

Have not yet integrated NodeStore into actual workload.

In-memory and my-sql implementations for node storage

Basic unit tests for node storage

Fixed sql inject bugs, handling of binary data

Fix binary data handling, regression tests included

Test Plan:
ant test

Also test loading and requesting data with mysql

Reviewers: dhruba

Reviewed By: dhruba

CC: vamsi

Differential Revision: https://reviews.facebook.net/D4725
  • Loading branch information
Tim Armstrong committed Aug 18, 2012
1 parent 9259d55 commit a08fb64
Show file tree
Hide file tree
Showing 15 changed files with 815 additions and 77 deletions.
9 changes: 9 additions & 0 deletions README_instructions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ Execute the following commands in the database:
`version` bigint(20) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (`id`,`link_type`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

CREATE TABLE `nodetable` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`type` int(10) unsigned NOT NULL,
`version` bigint(20) unsigned NOT NULL,
`time` int(10) unsigned NOT NULL,
`data` mediumtext NOT NULL,
primary key(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Configuring LinkBench:
----------------------
Expand Down
2 changes: 2 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
<include name="**/*Test*.class"/>
<exclude name="**/LinkStoreHBaseGeneralAtomicityTesting.class"/>
<exclude name="**/LinkStoreTestBase.class"/>
<exclude name="**/NodeStoreTestBase.class"/>
<exclude name="**/MySqlTestConfig.class"/>
</fileset>
<formatter type="xml"/>
</batchtest>
Expand Down
2 changes: 2 additions & 0 deletions config/LinkConfigMysql.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ port = 3306

tablename = linktable

nodetable = nodetable

# ignore counttable if not using mysql
counttable = counttable

Expand Down
9 changes: 9 additions & 0 deletions src/java/com/facebook/LinkBench/GraphStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.facebook.LinkBench;

/**
* An abstract class for storing both nodes and edges
* @author tarmstrong
*/
public abstract class GraphStore extends LinkStore implements NodeStore {

}
1 change: 0 additions & 1 deletion src/java/com/facebook/LinkBench/LinkBenchDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ void sendrequests() throws IOException, InterruptedException, Throwable {

// create requesters
for (int i = 0; i < nrequesters; i++) {
final int id = i;
LinkStore store = initStore(Phase.REQUEST, i);
LinkBenchRequest l = new LinkBenchRequest(store, props, latencyStats,
progress, new Random(masterRandom.nextLong()), i, nrequesters);
Expand Down
238 changes: 212 additions & 26 deletions src/java/com/facebook/LinkBench/LinkStoreMysql.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.facebook.LinkBench;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
Expand All @@ -14,7 +17,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LinkStoreMysql extends LinkStore {
public class LinkStoreMysql extends GraphStore {
public static final int MYSQL_DEFAULT_BULKINSERT_SIZE = 1024;

private static final boolean INTERNAL_TESTING = false;
Expand All @@ -23,6 +26,8 @@ public class LinkStoreMysql extends LinkStore {

String assoctable;
String counttable;
String nodetable;

String host;
String user;
String pwd;
Expand Down Expand Up @@ -52,11 +57,21 @@ public void initialize(Properties props, Phase currentPhase,
int threadId) throws IOException, Exception {
counttable = props.getProperty("counttable");
if (counttable == null || counttable.equals("")) {
logger.error("Error! counttable is empty/ not found!"
+ "Please check configuration file.");
System.exit(1);
String msg = "Error! counttable is empty/ not found!"
+ "Please check configuration file.";
logger.error(msg);
throw new RuntimeException(msg);
}


nodetable = props.getProperty("nodetable");
if (nodetable.equals("")) {
// For now, don't assume that nodetable is provided
String msg = "Error! nodetable is empty!"
+ "Please check configuration file.";
logger.error(msg);
throw new RuntimeException(msg);
}

host = props.getProperty("host");
user = props.getProperty("user");
pwd = props.getProperty("password");
Expand Down Expand Up @@ -115,7 +130,8 @@ private void openConnection() throws Exception {
@Override
public void close() {
try {
conn.close();
if (stmt != null) stmt.close();
if (conn != null) conn.close();
} catch (SQLException e) {
logger.error("Error while closing MySQL connection: ", e);
}
Expand Down Expand Up @@ -173,7 +189,7 @@ private void testCount(Statement stmt, String dbid,
public void addLink(String dbid, Link l, boolean noinverse)
throws Exception {

if (Level.DEBUG.isGreaterOrEqual(debuglevel)) {
if (Level.DEBUG.isGreaterOrEqual(debuglevel)) {
logger.debug("addLink " + l.id1 +
"." + l.id2 +
"." + l.link_type);
Expand Down Expand Up @@ -253,12 +269,13 @@ public void addLink(String dbid, Link l, boolean noinverse)

if (update_data) {
// query to update link data (the first query only updates visibility)
// TODO: why is this necessary?
String updatedata = "UPDATE " + dbid + "." + assoctable +
" SET id1_type = " + l.id1_type +
", id2_type = " + l.id2_type +
", visibility = " + LinkStore.VISIBILITY_DEFAULT +
", data = '" + new String(l.data) +
"', time = " + l.time +
", data = " + stringLiteral(l.data)+
", time = " + l.time +
", version = " + l.version +
" WHERE id1 = " + l.id1 +
" AND id2 = " + l.id2 +
Expand Down Expand Up @@ -287,42 +304,41 @@ public void addLink(String dbid, Link l, boolean noinverse)
* @throws SQLException
*/
private int addLinksNoCount(String dbid, List<Link> links)
throws SQLException {
throws SQLException {
if (links.size() == 0)
return 0;

// query to insert a link;
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO " + dbid + "." + assoctable +
"(id1, id1_type, id2, id2_type, link_type, " +
"visibility, data, time, version) VALUES ");
boolean first = true;
for (Link l: links ) {
for (Link l : links) {
if (first) {
first = false;
} else {
sb.append(',');
}
sb.append("(" + l.id1 +
", " + l.id1_type +
", " + l.id2 +
", " + l.id2_type +
", " + l.link_type +
", " + l.visibility +
", '" + new String(l.data) +
"', " + l.time +
", " + l.version + ")");
}
sb.append(" ON DUPLICATE KEY UPDATE visibility = " +
"VALUES(visibility)");
sb.append("(" + l.id1 +
", " + l.id1_type +
", " + l.id2 +
", " + l.id2_type +
", " + l.link_type +
", " + l.visibility +
", " + stringLiteral(l.data) +
", " + l.time + ", " +
l.version + ")");
}
sb.append(" ON DUPLICATE KEY UPDATE visibility = VALUES(visibility)");
String insert = sb.toString();
if (Level.TRACE.isGreaterOrEqual(debuglevel)) {
logger.trace(insert);
}

int nrows = stmt.executeUpdate(insert);
return nrows;
}
}

public void deleteLink(String dbid, long id1, long link_type, long id2,
boolean noinverse, boolean expunge)
Expand Down Expand Up @@ -622,4 +638,174 @@ public void addBulkCounts(String dbid, List<LinkCount> counts)
stmt.executeUpdate(sql);
conn.commit();
}

private void checkNodeTableConfigured() throws Exception {
if (this.nodetable == null) {
throw new Exception("Nodetable not specified: cannot perform node" +
" operation");
}
}

@Override
public void resetNodeStore(String dbid, long startID) throws Exception {
checkNodeTableConfigured();
// Truncate table deletes all data and allows us to reset autoincrement
stmt.execute(String.format("TRUNCATE TABLE `%s`.`%s`;",
dbid, nodetable));
stmt.execute(String.format("ALTER TABLE `%s`.`%s` " +
"AUTO_INCREMENT = %d;", dbid, nodetable, startID));
}

@Override
public long addNode(String dbid, Node node) throws Exception {
checkNodeTableConfigured();
String sql = "INSERT INTO `" + dbid + "`.`" + nodetable + "` " +
"(type, version, time, data) " +
"VALUES (" + node.type + "," + node.version +
"," + node.time + "," + stringLiteral(node.data) + ");";
if (Level.TRACE.isGreaterOrEqual(debuglevel)) {
logger.trace(sql);
}
stmt.executeUpdate(sql, Statement.RETURN_GENERATED_KEYS);
ResultSet rs = stmt.getGeneratedKeys();
conn.commit();

// Find the generated id
if (!rs.next()) {
throw new Exception("Generated key not returned");
}
long newID = rs.getLong(1);
assert(rs.next()); // check done
rs.close();

node.id = newID;
return newID;
}

@Override
public Node getNode(String dbid, int type, long id) throws Exception {
checkNodeTableConfigured();
ResultSet rs = stmt.executeQuery(
"SELECT id, type, version, time, data " +
"FROM `" + dbid + "`.`" + nodetable + "` " +
"WHERE id=" + id + ";");
if (rs.next()) {
Node res = new Node(rs.getLong(1), rs.getInt(2),
rs.getLong(3), rs.getInt(4), rs.getBytes(5));

// Check that multiple rows weren't returned
assert(rs.next() == false);
rs.close();
if (res.type != type) {
return null;
} else {
return res;
}
}
return null;
}

@Override
public boolean updateNode(String dbid, Node node) throws Exception {
checkNodeTableConfigured();
String sql = "UPDATE `" + dbid + "`.`" + nodetable + "`" +
" SET " + "version=" + node.version + ", time=" + node.time
+ ", data=" + stringLiteral(node.data) +
" WHERE id=" + node.id + " AND type=" + node.type;

if (Level.TRACE.isGreaterOrEqual(debuglevel)) {
logger.trace(sql);
}

int rows = stmt.executeUpdate(sql);
if (rows == 1) return true;
else if (rows == 0) return false;
else throw new Exception("Did not expect " + rows + "affected rows: only "
+ "expected update to affect at most one row");
}

@Override
public boolean deleteNode(String dbid, int type, long id) throws Exception {
checkNodeTableConfigured();
int rows = stmt.executeUpdate(
"DELETE FROM `" + dbid + "`.`" + nodetable + "` " +
"WHERE id=" + id + " and type =" + type + ";");
if (rows == 0) {
return false;
} else if (rows == 1) {
return true;
} else {
throw new Exception(rows + " rows modified on delete: should delete " +
"at most one");
}
}

/**
* Convert a byte array into a valid mysql string literal, assuming that
* it will be inserted into a column with latin-1 encoding.
* Based on information at
* http://dev.mysql.com/doc/refman/5.1/en/string-literals.html
* @param arr
* @return
*/
private static String stringLiteral(byte arr[]) {
CharBuffer cb = Charset.forName("ISO-8859-1").decode(ByteBuffer.wrap(arr));
StringBuilder sb = new StringBuilder();
sb.append('\'');
for (int i = 0; i < cb.length(); i++) {
char c = cb.get(i);
switch (c) {
case '\'':
sb.append("\\'");
break;
case '\\':
sb.append("\\\\");
break;
case '\0':
sb.append("\\0");
break;
case '\b':
sb.append("\\b");
break;
case '\n':
sb.append("\\n");
break;
case '\r':
sb.append("\\r");
break;
case '\t':
sb.append("\\t");
break;
default:
if (Character.getNumericValue(c) < 0) {
// Fall back on hex string for values not defined in latin-1
return hexStringLiteral(arr);
} else {
sb.append(c);
}
}
}
sb.append('\'');
return sb.toString();
}

/**
* Create a mysql hex string literal from array:
* E.g. [0xf, bc, 4c, 4] converts to x'0fbc4c03'
* @param arr
* @return the mysql hex literal including quotes
*/
private static String hexStringLiteral(byte[] arr) {
StringBuilder sb = new StringBuilder();
sb.append("x'");
for (int i = 0; i < arr.length; i++) {
byte b = arr[i];
int lo = b & 0xf;
int hi = (b >> 4) & 0xf;
sb.append(Character.forDigit(hi, 16));
sb.append(Character.forDigit(lo, 16));
}
sb.append("'");
return sb.toString();
}
}
Loading

0 comments on commit a08fb64

Please sign in to comment.