Skip to content

Commit

Permalink
ATLAS-3477: Add entity purge API in Admin Resource
Browse files Browse the repository at this point in the history
  • Loading branch information
sidharthkmishra authored and sarathsubramanian committed Nov 14, 2019
1 parent 714423c commit 09118ca
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public enum AtlasPrivilege {

RELATIONSHIP_ADD("add-relationship"),
RELATIONSHIP_UPDATE("update-relationship"),
RELATIONSHIP_REMOVE("remove-relationship");
RELATIONSHIP_REMOVE("remove-relationship"),
ADMIN_PURGE("admin-purge");

private final String type;

Expand Down
11 changes: 11 additions & 0 deletions client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AtlasClientV2 extends AtlasBaseClient {
// Type APIs
Expand All @@ -63,6 +64,11 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String GET_BY_NAME_TEMPLATE = TYPES_API + "%s/name/%s";
private static final String GET_BY_GUID_TEMPLATE = TYPES_API + "%s/guid/%s";
private static final String ENTITY_BULK_API = ENTITY_API + "bulk/";

//Admin Entity Purge
private static final String ADMIN_API = BASE_URI + "admin/";
private static final String ENTITY_PURGE_API = ADMIN_API + "purge/";

// Lineage APIs
private static final String LINEAGE_URI = BASE_URI + "v2/lineage/";

Expand Down Expand Up @@ -362,6 +368,10 @@ public EntityMutationResponse deleteEntitiesByGuids(List<String> guids) throws A
return callAPI(API_V2.DELETE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, "guid", guids);
}

public EntityMutationResponse purgeEntitiesByGuids(Set<String> guids) throws AtlasServiceException {
return callAPI(API_V2.PURGE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, guids);
}

public AtlasClassifications getClassifications(String guid) throws AtlasServiceException {
return callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, null);
}
Expand Down Expand Up @@ -555,6 +565,7 @@ public static class API_V2 extends API {
public static final API_V2 CREATE_ENTITIES = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 UPDATE_ENTITIES = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 DELETE_ENTITIES_BY_GUIDS = new API_V2(ENTITY_BULK_API, HttpMethod.DELETE, Response.Status.OK);
public static final API_V2 PURGE_ENTITIES_BY_GUIDS = new API_V2(ENTITY_PURGE_API, HttpMethod.DELETE, Response.Status.OK);
public static final API_V2 GET_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.GET, Response.Status.OK);
public static final API_V2 ADD_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.POST, Response.Status.NO_CONTENT);
public static final API_V2 UPDATE_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.PUT, Response.Status.NO_CONTENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public interface EntityChangeListenerV2 {
*/
void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException;


/**
* This is upon purging entities from the repository.
*
* @param entities the purged entities
*/
void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException;

/**
* This is upon adding new classifications to an entity.
*
Expand Down Expand Up @@ -123,6 +131,13 @@ public interface EntityChangeListenerV2 {
*/
void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException;

/**
* This is upon purging relationships from the repository.
*
* @param relationships the purged relationships
*/
void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException;

/**
* This is upon add new labels to an entity.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public enum EntityAuditActionV2 {
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE;
TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE;

public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) {
Expand All @@ -60,6 +60,8 @@ public static EntityAuditActionV2 fromString(String strValue) {
return ENTITY_UPDATE;
case "ENTITY_DELETE":
return ENTITY_DELETE;
case "ENTITY_PURGE":
return ENTITY_PURGE;
case "ENTITY_IMPORT_CREATE":
return ENTITY_IMPORT_CREATE;
case "ENTITY_IMPORT_UPDATE":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ public List<AtlasEntityHeader> getDeletedEntities() {
return null;
}

@JsonIgnore
public List<AtlasEntityHeader> getPurgedEntities() {
if ( mutatedEntities != null) {
return mutatedEntities.get(EntityOperation.PURGE);
}
return null;
}

@JsonIgnore
public AtlasEntityHeader getFirstEntityCreated() {
final List<AtlasEntityHeader> entitiesByOperation = getEntitiesByOperation(EntityOperation.CREATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public enum EntityOperation {
CREATE,
UPDATE,
PARTIAL_UPDATE,
DELETE
DELETE,
PURGE
}

public static final class EntityMutation implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_PURGE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE;
Expand Down Expand Up @@ -132,6 +133,23 @@ public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) thro
RequestContext.get().endMetricRecord(metric);
}

@Override
public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");

List<EntityAuditEventV2> events = new ArrayList<>();

for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE, "Purged entity");

events.add(event);
}

auditRepository.putEventsV2(events);

RequestContext.get().endMetricRecord(metric);
}

@Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
Expand Down Expand Up @@ -470,6 +488,9 @@ private String getV2AuditPrefix(EntityAuditActionV2 action) {
case ENTITY_DELETE:
ret = "Deleted: ";
break;
case ENTITY_PURGE:
ret = "Purged: ";
break;
case CLASSIFICATION_ADD:
ret = "Added classification: ";
break;
Expand Down Expand Up @@ -521,4 +542,11 @@ public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolea
LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
}
}

@Override
public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,17 @@ EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<
*/

String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException;

/*
* Return list of deleted entity guids
*/
EntityMutationResponse deleteByIds(List<String> guid) throws AtlasBaseException;

/*
* Return list of purged entity guids
*/
EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException;

/**
* Add classification(s)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,16 @@ public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws Atla
String guid = AtlasGraphUtilsV2.getIdFromVertex(instanceVertex);
AtlasEntity.Status state = getState(instanceVertex);

if (state == DELETED || requestContext.isDeletedEntity(guid)) {
boolean needToSkip = requestContext.isPurgeRequested() ? (state == ACTIVE || requestContext.isPurgedEntity(guid)) :
(state == DELETED || requestContext.isDeletedEntity(guid));

if (needToSkip) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping deletion of {} as it is already deleted", guid);
if(RequestContext.get().isPurgeRequested()) {
LOG.debug("Skipping purging of {} as it is active or already purged", guid);
} else {
LOG.debug("Skipping deletion of {} as it is already deleted", guid);
}
}

continue;
Expand Down Expand Up @@ -149,10 +156,15 @@ public void deleteRelationship(AtlasEdge edge) throws AtlasBaseException {
public void deleteRelationships(Collection<AtlasEdge> edges, final boolean forceDelete) throws AtlasBaseException {
for (AtlasEdge edge : edges) {
boolean isInternal = isInternalType(edge.getInVertex()) && isInternalType(edge.getOutVertex());
boolean needToSkip = !isInternal && (RequestContext.get().isPurgeRequested() ? getState(edge) == ACTIVE : getState(edge) == DELETED);

This comment has been minimized.

Copy link
@bolkedebruin

bolkedebruin Nov 29, 2019

Contributor

@sidharthkmishra This is creating a NullPointerException for us

Screenshot 2019-11-29 at 19 06 47


if (!isInternal && getState(edge) == DELETED) {
if (needToSkip) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
if(RequestContext.get().isPurgeRequested()) {
LOG.debug("Skipping purging of {} as it is active or already purged", getIdFromEdge(edge));
} else{
LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
}
}

continue;
Expand Down Expand Up @@ -183,8 +195,10 @@ public Collection<GraphHelper.VertexInfo> getOwnedVertices(AtlasVertex entityVer
AtlasVertex vertex = vertices.pop();
AtlasEntity.Status state = getState(vertex);

if (state == DELETED) {
//If the reference vertex is marked for deletion, skip it
//In case of purge If the reference vertex is active then skip it or else
//If the vertex marked for deletion, skip it
boolean needToSkip = RequestContext.get().isPurgeRequested() ? (state == ACTIVE) : (state == DELETED);
if (needToSkip) {
continue;
}

Expand Down Expand Up @@ -221,7 +235,9 @@ public Collection<GraphHelper.VertexInfo> getOwnedVertices(AtlasVertex entityVer
} else {
AtlasEdge edge = graphHelper.getEdgeForLabel(vertex, edgeLabel);

if (edge == null || getState(edge) == DELETED) {
needToSkip = (edge == null || RequestContext.get().isPurgeRequested() ?
getState(edge) == ACTIVE : getState(edge) == DELETED);
if (needToSkip) {
continue;
}

Expand Down Expand Up @@ -274,7 +290,9 @@ public Collection<GraphHelper.VertexInfo> getOwnedVertices(AtlasVertex entityVer

if (CollectionUtils.isNotEmpty(edges)) {
for (AtlasEdge edge : edges) {
if (edge == null || getState(edge) == DELETED) {
needToSkip = (edge == null || RequestContext.get().isPurgeRequested() ?
getState(edge) == ACTIVE : getState(edge) == DELETED);
if (needToSkip) {
continue;
}

Expand Down Expand Up @@ -838,8 +856,10 @@ protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVe
final String outId = GraphHelper.getGuid(outVertex);
final Status state = getState(outVertex);

if (state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId))) {
//If the reference vertex is marked for deletion, skip updating the reference
boolean needToSkip = RequestContext.get().isPurgeRequested() ? state == ACTIVE || (outId != null && RequestContext.get().isPurgedEntity(outId)) :
state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId));

if (needToSkip) {
return;
}

Expand Down Expand Up @@ -954,7 +974,8 @@ protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws At
for (AtlasEdge edge : incomingEdges) {
Status edgeState = getState(edge);

if (edgeState == ACTIVE) {
boolean isProceed = RequestContext.get().isPurgeRequested()? edgeState == DELETED : edgeState == ACTIVE;
if (isProceed) {
if (isRelationshipEdge(edge)) {
deleteRelationship(edge);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boo
List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities();
List<AtlasEntityHeader> purgedEntities = entityMutationResponse.getPurgedEntities();

// complete full text mapping before calling toReferenceables(), from notifyListners(), to
// include all vertex updates in the current graph-transaction
Expand All @@ -113,6 +114,7 @@ public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boo
notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport);
notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport);
notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
notifyListeners(purgedEntities, EntityOperation.PURGE, isImport);

notifyPropagatedEntities();
}
Expand Down Expand Up @@ -340,7 +342,7 @@ private void notifyRelationshipListeners(List<AtlasRelationship> relationships,


private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
if (instanceConverter != null) {
if (operation != EntityOperation.PURGE && instanceConverter != null) {
List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);

for (EntityChangeListener listener : entityChangeListeners) {
Expand Down Expand Up @@ -381,6 +383,10 @@ private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, EntityOper
case DELETE:
listener.onEntitiesDeleted(entities, isImport);
break;

case PURGE:
listener.onEntitiesPurged(entities);
break;
}
}
}
Expand All @@ -399,6 +405,9 @@ private void notifyV2RelationshipListeners(List<AtlasRelationship> relationships
case DELETE:
listener.onRelationshipsDeleted(relationships, isImport);
break;
case PURGE:
listener.onRelationshipsPurged(relationships);
break;
}
}
}
Expand Down Expand Up @@ -485,7 +494,7 @@ private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> entityHeaders,
final AtlasEntity entity;

// delete notifications don't need all attributes. Hence the special handling for delete operation
if (operation == EntityOperation.DELETE) {
if (operation == EntityOperation.DELETE || operation == EntityOperation.PURGE) {
entity = new AtlasEntity(entityHeader);
} else {
String entityGuid = entityHeader.getGuid();
Expand Down Expand Up @@ -586,12 +595,23 @@ private void pruneResponse(EntityMutationResponse resp) {
List<AtlasEntityHeader> updatedEntities = resp.getUpdatedEntities();
List<AtlasEntityHeader> partialUpdatedEntities = resp.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = resp.getDeletedEntities();
List<AtlasEntityHeader> purgedEntities = resp.getPurgedEntities();

// remove entities with DELETED status from created & updated lists
purgeDeletedEntities(createdEntities);
purgeDeletedEntities(updatedEntities);
purgeDeletedEntities(partialUpdatedEntities);

// remove entities purged in this mutation from created & updated lists
if (purgedEntities != null) {
for (AtlasEntityHeader entity : purgedEntities) {
purgeEntity(entity.getGuid(), deletedEntities);
purgeEntity(entity.getGuid(), createdEntities);
purgeEntity(entity.getGuid(), updatedEntities);
purgeEntity(entity.getGuid(), partialUpdatedEntities);
}
}

// remove entities deleted in this mutation from created & updated lists
if (deletedEntities != null) {
for (AtlasEntityHeader entity : deletedEntities) {
Expand Down
Loading

0 comments on commit 09118ca

Please sign in to comment.