Skip to content

Commit

Permalink
ATLAS-4008: Cache getGuid and getStatus in GraphTransactionInterceptor
Browse files Browse the repository at this point in the history
Signed-off-by: Nikhil Bonte <[email protected]>
  • Loading branch information
nikhilbonte committed Oct 30, 2020
1 parent d4b15c7 commit 7c68048
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
Expand Down Expand Up @@ -54,6 +55,30 @@ public class GraphTransactionInterceptor implements MethodInterceptor {

private final AtlasGraph graph;

private static final ThreadLocal<Map<Object, String>> vertexGuidCache =
new ThreadLocal<Map<Object, String>>() {
@Override
public Map<Object, String> initialValue() {
return new HashMap<Object, String>();
}
};

private static final ThreadLocal<Map<Object, AtlasEntity.Status>> vertexStateCache =
new ThreadLocal<Map<Object, AtlasEntity.Status>>() {
@Override
public Map<Object, AtlasEntity.Status> initialValue() {
return new HashMap<Object, AtlasEntity.Status>();
}
};

private static final ThreadLocal<Map<Object, AtlasEntity.Status>> edgeStateCache =
new ThreadLocal<Map<Object, AtlasEntity.Status>>() {
@Override
public Map<Object, AtlasEntity.Status> initialValue() {
return new HashMap<Object, AtlasEntity.Status>();
}
};

@Inject
public GraphTransactionInterceptor(AtlasGraph graph) {
this.graph = graph;
Expand Down Expand Up @@ -117,7 +142,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
// Reset the boolean flags
isTxnOpen.set(Boolean.FALSE);
innerFailure.set(Boolean.FALSE);
guidVertexCache.get().clear();
clearCache();

List<PostTransactionHook> trxHooks = postTransactionHooks.get();

Expand Down Expand Up @@ -201,6 +226,9 @@ public static AtlasVertex getVertexFromCache(String guid) {

public static void clearCache() {
guidVertexCache.get().clear();
vertexGuidCache.get().clear();
vertexStateCache.get().clear();
edgeStateCache.get().clear();
}

boolean logException(Throwable t) {
Expand All @@ -214,6 +242,72 @@ boolean logException(Throwable t) {
}
}

public static void addToVertexGuidCache(Object vertexId, String guid) {

if (guid == null) {
removeFromVertexGuidCache(vertexId);
} else {
Map<Object, String> cache = vertexGuidCache.get();
cache.put(vertexId, guid);
}
}

public static void removeFromVertexGuidCache(Object vertexId) {
Map<Object, String> cache = vertexGuidCache.get();

cache.remove(vertexId);
}

public static String getVertexGuidFromCache(Object vertexId) {
Map<Object, String> cache = vertexGuidCache.get();

return cache.get(vertexId);
}

public static void addToVertexStateCache(Object vertexId, AtlasEntity.Status status) {

if (status == null) {
removeFromVertexStateCache(vertexId);
} else {
Map<Object, AtlasEntity.Status> cache = vertexStateCache.get();
cache.put(vertexId, status);
}
}

public static void removeFromVertexStateCache(Object vertexId) {
Map<Object, AtlasEntity.Status> cache = vertexStateCache.get();

cache.remove(vertexId);
}

public static AtlasEntity.Status getVertexStateFromCache(Object vertexId) {
Map<Object, AtlasEntity.Status> cache = vertexStateCache.get();

return cache.get(vertexId);
}

public static void addToEdgeStateCache(Object edgeId, AtlasEntity.Status status) {

if (status == null) {
removeFromEdgeStateCache(edgeId);
} else {
Map<Object, AtlasEntity.Status> cache = edgeStateCache.get();
cache.put(edgeId, status);
}
}

public static void removeFromEdgeStateCache(Object edgeId) {
Map<Object, AtlasEntity.Status> cache = edgeStateCache.get();

cache.remove(edgeId);
}

public static AtlasEntity.Status getEdgeStateFromCache(Object edgeId) {
Map<Object, AtlasEntity.Status> cache = edgeStateCache.get();

return cache.get(edgeId);
}

public static abstract class PostTransactionHook {
protected PostTransactionHook() {
List<PostTransactionHook> trxHooks = postTransactionHooks.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
Expand Down Expand Up @@ -815,8 +816,17 @@ public static String getRelationshipGuid(AtlasElement element) {
return element.getProperty(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
}

public static String getGuid(AtlasElement element) {
return element.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class);
public static String getGuid(AtlasVertex vertex) {
Object vertexId = vertex.getId();
String ret = GraphTransactionInterceptor.getVertexGuidFromCache(vertexId);

if (ret == null) {
ret = vertex.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class);

GraphTransactionInterceptor.addToVertexGuidCache(vertexId, ret);
}

return ret;
}

public static String getHomeId(AtlasElement element) {
Expand Down Expand Up @@ -870,10 +880,33 @@ public static String getStateAsString(AtlasElement element) {
return element.getProperty(STATE_PROPERTY_KEY, String.class);
}

public static Status getStatus(AtlasElement element) {
return (getState(element) == Id.EntityState.DELETED) ? Status.DELETED : Status.ACTIVE;
public static Status getStatus(AtlasVertex vertex) {
Object vertexId = vertex.getId();
Status ret = GraphTransactionInterceptor.getVertexStateFromCache(vertexId);

if (ret == null) {
ret = (getState(vertex) == Id.EntityState.DELETED) ? Status.DELETED : Status.ACTIVE;

GraphTransactionInterceptor.addToVertexStateCache(vertexId, ret);
}

return ret;
}

public static Status getStatus(AtlasEdge edge) {
Object edgeId = edge.getId();
Status ret = GraphTransactionInterceptor.getEdgeStateFromCache(edgeId);

if (ret == null) {
ret = (getState(edge) == Id.EntityState.DELETED) ? Status.DELETED : Status.ACTIVE;

GraphTransactionInterceptor.addToEdgeStateCache(edgeId, ret);
}

return ret;
}


public static AtlasRelationship.Status getEdgeStatus(AtlasElement element) {
return (getState(element) == Id.EntityState.DELETED) ? AtlasRelationship.Status.DELETED : AtlasRelationship.Status.ACTIVE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,17 +961,33 @@ protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws At
deleteRelationship(edge);
} else {
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
AtlasAttribute attribute = getAttributeForEdge(edge.getLabel());

deleteEdgeBetweenVertices(outVertex, inVertex, attribute);
if (!isDeletedEntity(outVertex)) {
AtlasVertex inVertex = edge.getInVertex();
AtlasAttribute attribute = getAttributeForEdge(edge.getLabel());

deleteEdgeBetweenVertices(outVertex, inVertex, attribute);
}
}
}
}

_deleteVertex(instanceVertex, force);
}

private boolean isDeletedEntity(AtlasVertex entityVertex) {
boolean ret = false;
String outGuid = GraphHelper.getGuid(entityVertex);
AtlasEntity.Status outState = GraphHelper.getStatus(entityVertex);

//If the reference vertex is marked for deletion, skip updating the reference
if (outState == AtlasEntity.Status.DELETED || (outGuid != null && RequestContext.get().isDeletedEntity(outGuid))) {
ret = true;
}

return ret;
}

public void deleteClassificationVertex(AtlasVertex classificationVertex, boolean force) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting classification vertex", string(classificationVertex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.store.graph.v2;

import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.model.instance.AtlasEntity;
Expand Down Expand Up @@ -645,6 +646,7 @@ public void testDeleteEntityRemoveReferences() throws Exception {

AtlasEntityHeader entityDeleted = response.getFirstDeletedEntityByTypeName(ENTITY_TYPE_WITH_COMPLEX_COLLECTION_ATTR);

GraphTransactionInterceptor.clearCache();
AtlasEntityWithExtInfo deletedEntityWithExtInfo = entityStore.getById(entityDeleted.getGuid());
AtlasVertex deletedEntityVertex = AtlasGraphUtilsV2.findByGuid(entityDeleted.getGuid());
Iterator<AtlasEdge> edges = deletedEntityVertex.getEdges(AtlasEdgeDirection.OUT).iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
Expand Down Expand Up @@ -859,6 +860,7 @@ public void testSetObjectIdAttrToNull() throws Exception {

entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
entityStore.createOrUpdate(new AtlasEntityStream(db2Entity), false);
GraphTransactionInterceptor.clearCache();

final AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);

Expand All @@ -873,6 +875,8 @@ public void testSetObjectIdAttrToNull() throws Exception {
createdTblEntity.setAttribute("databaseComposite", null);

final EntityMutationResponse tblUpdateResponse = entityStore.createOrUpdate(new AtlasEntityStream(createdTblEntity), true);
GraphTransactionInterceptor.clearCache();

final AtlasEntityHeader updatedTblHeader = tblUpdateResponse.getFirstEntityPartialUpdated();
final AtlasEntity updatedTblEntity = getEntityFromStore(updatedTblHeader);
final AtlasEntity deletedDb2Entity = getEntityFromStore(db2Entity.getGuid());
Expand Down

0 comments on commit 7c68048

Please sign in to comment.