Skip to content

Commit

Permalink
Added code for multinode support for cassandra
Browse files Browse the repository at this point in the history
And Fixed transient issue of hibernate and
half fixes for client factory and metadata changes
  • Loading branch information
kkmishra committed Jun 7, 2013
1 parent 4888a6f commit d0c343e
Show file tree
Hide file tree
Showing 89 changed files with 1,482 additions and 1,211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ else if (getCqlVersion().equalsIgnoreCase(CassandraConstants.CQL_VERSION_3_0)
return getCqlVersion().equalsIgnoreCase(CassandraConstants.CQL_VERSION_3_0);
}


/**
* Returns true in case of, composite Id and if cql3 opted and not a
* embedded entity.
Expand All @@ -657,9 +656,9 @@ else if (getCqlVersion().equalsIgnoreCase(CassandraConstants.CQL_VERSION_3_0)
*/
public boolean isCql3Enabled()
{
return isCql3Enabled(null);
return isCql3Enabled(null);
}

/**
* Find.
*
Expand Down Expand Up @@ -1410,7 +1409,10 @@ public int executeBatch()
finally
{
clear();
releaseConnection(pooledConnection);
if (pooledConnection != null)
{
releaseConnection(pooledConnection);
}
}

return recordsExecuted;
Expand Down Expand Up @@ -1683,6 +1685,7 @@ protected CqlResult executeCQLQuery(String cqlQuery) throws InvalidRequestExcept
return conn.execute_cql3_query(ByteBufferUtil.bytes(cqlQuery),
org.apache.cassandra.thrift.Compression.NONE, consistencyLevel);
}
// conn.set_cql_version(getCqlVersion());
return conn.execute_cql_query(ByteBufferUtil.bytes(cqlQuery), org.apache.cassandra.thrift.Compression.NONE);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,25 @@ public Table getColumnFamily(String schemaName, String cfName)
public Properties getConnectionProperties()
{
DataStore ds = getDataStore();
Properties properties = new Properties();
if (ds != null && ds.getConnection() != null)
{
return ds.getConnection().getProperties();
properties = ds.getConnection().getProperties();
return properties;
}
return new Properties();
return properties;
}

public List<Server> getConnectionServers()
{
DataStore ds = getDataStore();
List<Server> servers = new ArrayList<Server>();
if (ds != null && ds.getConnection() != null)
{
return ds.getConnection().getServers();
servers = ds.getConnection().getServers();
return servers;
}
return new ArrayList<Server>();
return servers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public interface CassandraDataHandler
* the exception
*/
List<Object> fromThriftRow(Class<?> clazz, EntityMetadata m, List<String> relationNames, boolean isWrapReq,
ConsistencyLevel consistencyLevel, boolean isCql3Enabled, Object... rowIds) throws Exception;
ConsistencyLevel consistencyLevel, boolean isCql3Enabled,Object conn, Object... rowIds) throws Exception;

/**
* From thrift row.
Expand All @@ -91,7 +91,7 @@ List<Object> fromThriftRow(Class<?> clazz, EntityMetadata m, List<String> relati
* the exception
*/
Object fromThriftRow(Class<?> clazz, EntityMetadata m, Object rowKey, List<String> relationNames,
boolean isWrapReq, ConsistencyLevel consistencyLevel, boolean isCql3Enabled) throws Exception;
boolean isWrapReq, ConsistencyLevel consistencyLevel, boolean isCql3Enabled, Object conn) throws Exception;

/**
* Populate entity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import com.impetus.kundera.property.PropertyAccessor;
import com.impetus.kundera.property.PropertyAccessorFactory;
import com.impetus.kundera.property.PropertyAccessorHelper;
import com.impetus.kundera.property.accessor.BigDecimalAccessor;
import com.impetus.kundera.property.accessor.DoubleAccessor;
import com.impetus.kundera.property.accessor.IntegerAccessor;
import com.impetus.kundera.property.accessor.LongAccessor;
Expand Down Expand Up @@ -204,14 +203,14 @@ public <E> E fromThriftRow(Class<E> clazz, EntityMetadata m, DataRow<SuperColumn
* the exception
*/
public List<Object> fromThriftRow(Class<?> clazz, EntityMetadata m, List<String> relationNames, boolean isWrapReq,
ConsistencyLevel consistencyLevel, boolean isCql3Enabled, Object... rowIds) throws Exception
ConsistencyLevel consistencyLevel, boolean isCql3Enabled,Object conn, Object... rowIds) throws Exception
{
List<Object> entities = new ArrayList<Object>();
if (rowIds != null)
{
for (Object rowKey : rowIds)
{
Object e = fromThriftRow(clazz, m, rowKey, relationNames, isWrapReq, consistencyLevel, isCql3Enabled);
Object e = fromThriftRow(clazz, m, rowKey, relationNames, isWrapReq, consistencyLevel, isCql3Enabled, conn);
if (e != null)
{
entities.add(e);
Expand Down Expand Up @@ -241,7 +240,7 @@ public List<Object> fromThriftRow(Class<?> clazz, EntityMetadata m, List<String>
* the exception
*/
public abstract Object fromThriftRow(Class<?> clazz, EntityMetadata m, Object rowKey, List<String> relationNames,
boolean isWrapReq, ConsistencyLevel consistencyLevel, boolean isCqlEnabled) throws Exception;
boolean isWrapReq, ConsistencyLevel consistencyLevel, boolean isCqlEnabled, Object conn) throws Exception;

/**
* Populate embedded object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface InvertedIndexHandler
* @param cdHandler
*/
void write(Node node, EntityMetadata entityMetadata, String persistenceUnit, ConsistencyLevel consistencyLevel,
CassandraDataHandler cdHandler);
CassandraDataHandler cdHandler, Object conn);

/**
* Searches records from Inverted index table.
Expand All @@ -56,7 +56,7 @@ void write(Node node, EntityMetadata entityMetadata, String persistenceUnit, Con
* @return
*/
List<SearchResult> search(EntityMetadata m, String persistenceUnit, ConsistencyLevel consistencyLevel,
Map<Boolean, List<IndexClause>> indexClauseMap);
Map<Boolean, List<IndexClause>> indexClauseMap, Object conn);

/**
* Deletes a record from inverted index table.
Expand All @@ -65,6 +65,6 @@ List<SearchResult> search(EntityMetadata m, String persistenceUnit, ConsistencyL
* @param metadata
* @param consistencyLevel
*/
void delete(Object entity, EntityMetadata metadata, ConsistencyLevel consistencyLevel);
void delete(Object entity, EntityMetadata metadata, ConsistencyLevel consistencyLevel, Object conn);

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class InvertedIndexHandlerBase
private static Log log = LogFactory.getLog(InvertedIndexHandlerBase.class);

public List<SearchResult> search(EntityMetadata m, String persistenceUnit, ConsistencyLevel consistencyLevel,
Map<Boolean, List<IndexClause>> indexClauseMap)
Map<Boolean, List<IndexClause>> indexClauseMap, Object conn)
{
String columnFamilyName = m.getTableName() + Constants.INDEX_TABLE_SUFFIX;

Expand All @@ -69,7 +69,7 @@ public List<SearchResult> search(EntityMetadata m, String persistenceUnit, Consi
for (IndexExpression expression : o.getExpressions())
{
searchAndAddToResults(m, persistenceUnit, consistencyLevel, columnFamilyName, searchResults,
expression, isRowKeyQuery);
expression, isRowKeyQuery, conn);
}

}
Expand All @@ -81,7 +81,8 @@ public List<SearchResult> search(EntityMetadata m, String persistenceUnit, Consi
* search result to <code>searchResults</code>
*/
private void searchAndAddToResults(EntityMetadata m, String persistenceUnit, ConsistencyLevel consistencyLevel,
String columnFamilyName, List<SearchResult> searchResults, IndexExpression expression, boolean isRowKeyQuery)
String columnFamilyName, List<SearchResult> searchResults, IndexExpression expression,
boolean isRowKeyQuery, Object conn)
{
SearchResult searchResult = new SearchResult();

Expand Down Expand Up @@ -130,7 +131,7 @@ private void searchAndAddToResults(EntityMetadata m, String persistenceUnit, Con
// EQUAL Operator
case EQ:
SuperColumn thriftSuperColumn = getSuperColumnForRow(consistencyLevel, columnFamilyName, rowKey,
superColumnName, persistenceUnit);
superColumnName, persistenceUnit, conn);

if (thriftSuperColumn != null)
thriftSuperColumns.add(thriftSuperColumn);
Expand All @@ -146,22 +147,22 @@ private void searchAndAddToResults(EntityMetadata m, String persistenceUnit, Con
// Greater than operator
case GT:
searchSuperColumnsInRange(columnFamilyName, consistencyLevel, persistenceUnit, rowKey, superColumnName,
thriftSuperColumns, superColumnName, new byte[0]);
thriftSuperColumns, superColumnName, new byte[0], conn);
break;
// Less than Operator
case LT:
searchSuperColumnsInRange(columnFamilyName, consistencyLevel, persistenceUnit, rowKey, superColumnName,
thriftSuperColumns, new byte[0], superColumnName);
thriftSuperColumns, new byte[0], superColumnName, conn);
break;
// Greater than-equals to operator
case GTE:
searchSuperColumnsInRange(columnFamilyName, consistencyLevel, persistenceUnit, rowKey, superColumnName,
thriftSuperColumns, superColumnName, new byte[0]);
thriftSuperColumns, superColumnName, new byte[0], conn);
break;
// Less than equal to operator
case LTE:
searchSuperColumnsInRange(columnFamilyName, consistencyLevel, persistenceUnit, rowKey, superColumnName,
thriftSuperColumns, new byte[0], superColumnName);
thriftSuperColumns, new byte[0], superColumnName, conn);
break;

default:
Expand Down Expand Up @@ -212,7 +213,7 @@ private void searchAndAddToResults(EntityMetadata m, String persistenceUnit, Con
}
}

public void delete(Object entity, EntityMetadata metadata, ConsistencyLevel consistencyLevel)
public void delete(Object entity, EntityMetadata metadata, ConsistencyLevel consistencyLevel, Object conn)
{
MetamodelImpl metaModel = (MetamodelImpl) KunderaMetadata.INSTANCE.getApplicationMetadata().getMetamodel(
metadata.getPersistenceUnit());
Expand Down Expand Up @@ -254,7 +255,7 @@ public void delete(Object entity, EntityMetadata metadata, ConsistencyLevel cons
if (superColumnName != null)
{
deleteColumn(indexColumnFamily, rowKey, superColumnName,
metadata.getPersistenceUnit(), consistencyLevel, columnName);
metadata.getPersistenceUnit(), consistencyLevel, columnName, conn);
}
}
}
Expand All @@ -273,7 +274,7 @@ public void delete(Object entity, EntityMetadata metadata, ConsistencyLevel cons
if (superColumnName != null)
{
deleteColumn(indexColumnFamily, rowKey, superColumnName, metadata.getPersistenceUnit(),
consistencyLevel, columnName);
consistencyLevel, columnName, conn);
}

}
Expand All @@ -291,7 +292,7 @@ public void delete(Object entity, EntityMetadata metadata, ConsistencyLevel cons
* TODO
*/
protected abstract void deleteColumn(String indexColumnFamily, String rowKey, byte[] superColumnName,
String persistenceUnit, ConsistencyLevel consistencyLevel, byte[] columnName);
String persistenceUnit, ConsistencyLevel consistencyLevel, byte[] columnName, Object conn);

/**
* @param consistencyLevel
Expand All @@ -301,10 +302,10 @@ protected abstract void deleteColumn(String indexColumnFamily, String rowKey, by
* @return
*/
protected abstract SuperColumn getSuperColumnForRow(ConsistencyLevel consistencyLevel, String columnFamilyName,
String rowKey, byte[] superColumnName, String persistenceUnit);
String rowKey, byte[] superColumnName, String persistenceUnit, Object conn);

protected abstract void searchSuperColumnsInRange(String columnFamilyName, ConsistencyLevel consistencyLevel,
String persistenceUnit, String rowKey, byte[] searchSuperColumnName, List<SuperColumn> thriftSuperColumns,
byte[] start, byte[] finish);
byte[] start, byte[] finish, Object conn);

}
Loading

0 comments on commit d0c343e

Please sign in to comment.