Skip to content

Commit

Permalink
NIFI-9865: Add Atlas lineage for GCS processors (PutHDFS)
Browse files Browse the repository at this point in the history
This closes apache#5926.

Signed-off-by: Peter Turcsanyi <[email protected]>
  • Loading branch information
Lehel44 authored and turcsanyip committed May 10, 2022
1 parent 8aaa3d2 commit 8247910
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.provenance.analyzer;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.PathExtractorContext;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;

import java.util.Map;

/**
 * Base analyzer that resolves the directory part of a transit URI to an Atlas path entity
 * and reports it as a single data set reference.
 * <p>
 * Subclasses decide how the {@link AtlasEntity} produced by {@link AtlasPathExtractorUtil}
 * is turned into a {@link Referenceable} by implementing
 * {@link #convertToReferenceable(AtlasEntity, Map)}.
 */
public abstract class AbstractDirectoryAnalyzer extends AbstractNiFiProvenanceEventAnalyzer {

    /**
     * Builds the data set reference for the directory that contains the transferred file.
     *
     * @param context analysis context supplying the namespace resolver and the AWS S3 model version
     * @param event   provenance event whose transit URI is analyzed
     * @return the data set reference for the directory, or {@code null} when the event has no
     *         transit URI or the subclass conversion yields {@code null}
     */
    @Override
    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
        final String uri = event.getTransitUri();
        if (uri == null) {
            return null;
        }

        final Path directory = new Path(parentDirectoryOf(uri));
        final String namespace = context.getNamespaceResolver().fromHostNames(directory.toUri().getHost());

        // The extractor context collects every entity created while resolving the path
        // (exposed through getKnownEntities()), which the subclass receives for conversion.
        final PathExtractorContext extractorContext = new PathExtractorContext(namespace, context.getAwsS3ModelVersion());
        final AtlasEntity directoryEntity = AtlasPathExtractorUtil.getPathEntity(directory, extractorContext).getEntity();
        final Map<String, AtlasEntity> knownEntities = extractorContext.getKnownEntities();

        final Referenceable directoryRef = convertToReferenceable(directoryEntity, knownEntities);
        if (directoryRef == null) {
            return null;
        }
        return singleDataSetRef(event.getComponentId(), event.getEventType(), directoryRef);
    }

    /**
     * Converts the resolved path entity into the {@link Referenceable} to report.
     *
     * @param entity        the entity resolved for the directory path
     * @param knownEntities the entities collected by the path extractor context while resolving the path
     * @return the converted reference; a {@code null} result makes {@link #analyze} return {@code null}
     */
    protected abstract Referenceable convertToReferenceable(AtlasEntity entity, Map<String, AtlasEntity> knownEntities);

    /**
     * Drops the file name from a transit URI, keeping everything up to and including the last '/'.
     */
    private static String parentDirectoryOf(String transitUri) {
        return transitUri.substring(0, transitUri.lastIndexOf('/') + 1);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,7 @@
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.PathExtractorContext;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;

import java.util.Map;

Expand Down Expand Up @@ -59,7 +53,7 @@
* <li>name=bucket (example: mybucket)
* </ul>
*/
public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
public class AwsS3Directory extends AbstractDirectoryAnalyzer {

public static final String TYPE_DIRECTORY_V1 = AtlasPathExtractorUtil.AWS_S3_PSEUDO_DIR;
public static final String TYPE_BUCKET_V1 = AtlasPathExtractorUtil.AWS_S3_BUCKET;
Expand All @@ -71,33 +65,12 @@ public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
public static final String ATTR_CONTAINER_V2 = AtlasPathExtractorUtil.ATTRIBUTE_CONTAINER;
public static final String ATTR_OBJECT_PREFIX_V2 = AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX;

@Override
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
String transitUri = event.getTransitUri();
if (transitUri == null) {
return null;
}

String directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') + 1);

Path path = new Path(directoryUri);

String namespace = context.getNamespaceResolver().fromHostNames(path.toUri().getHost());

PathExtractorContext pathExtractorContext = new PathExtractorContext(namespace, context.getAwsS3ModelVersion());
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext);

Referenceable ref = convertToReferenceable(entityWithExtInfo.getEntity(), pathExtractorContext.getKnownEntities());

return ref != null ? singleDataSetRef(event.getComponentId(), event.getEventType(), ref) : null;
}

@Override
public String targetTransitUriPattern() {
return "^s3a://.+/.+$";
}

private Referenceable convertToReferenceable(AtlasEntity entity, Map<String, AtlasEntity> knownEntities) {
protected Referenceable convertToReferenceable(AtlasEntity entity, Map<String, AtlasEntity> knownEntities) {
if (entity == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.provenance.analyzer;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.v1.model.instance.Referenceable;

import java.util.Map;

import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;

/**
 * Analyzes a transit URI as a GCS bucket or directory (skipping the file name).
 * <p>
 * Atlas entity hierarchy: gcp_storage_virtual_directory -> (parent virtual directories) -> gcp_storage_bucket
 * <p>gcp_storage_virtual_directory
 * <ul>
 * <li>qualifiedName=gs://bucket/path/@namespace (example: gs://mybucket/mydir1/mydir2/@ns1)
 * <li>name=last path segment (example: mydir2)
 * <li>object prefix ({@code AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX})=parent path (example: /mydir1/)
 * <li>parent=the enclosing virtual directory, or the bucket for a first-level directory
 * </ul>
 * <p>gcp_storage_bucket
 * <ul>
 * <li>qualifiedName=gs://bucket@namespace (example: gs://mybucket@ns1)
 * <li>name=bucket (example: mybucket)
 * </ul>
 */
public class GCSDirectory extends AbstractDirectoryAnalyzer {

    static final String GCP_STORAGE_VIRTUAL_DIRECTORY = "gcp_storage_virtual_directory";
    static final String REL_PARENT = "parent";

    @Override
    public String targetTransitUriPattern() {
        return "^gs://.+/.+$";
    }

    /**
     * Converts a path entity into a {@link Referenceable}, recursively attaching the parent
     * chain of virtual directories.
     *
     * @param entity        the entity to convert, may be {@code null}
     * @param knownEntities entities collected during path extraction, keyed by the value looked up
     *                      from the parent object id's qualified name attribute
     * @return the converted reference, or {@code null} when {@code entity} is {@code null}
     */
    @Override
    protected Referenceable convertToReferenceable(AtlasEntity entity, Map<String, AtlasEntity> knownEntities) {
        if (entity == null) {
            return null;
        }

        Referenceable ref = createReferenceable(entity);

        // Only virtual directories carry a parent relationship that is followed here;
        // any other entity type is returned without a parent reference.
        if (GCP_STORAGE_VIRTUAL_DIRECTORY.equals(entity.getTypeName())) {
            AtlasObjectId parentId = (AtlasObjectId) entity.getRelationshipAttribute(REL_PARENT);
            if (parentId != null) {
                AtlasEntity parentEntity = knownEntities.get(parentId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME));
                ref.set(REL_PARENT, convertToReferenceable(parentEntity, knownEntities));
            }
        }

        return ref;
    }

    /**
     * Creates a {@link Referenceable} with the entity's type name and a copy of its attributes.
     */
    private Referenceable createReferenceable(AtlasEntity entity) {
        Referenceable ref = new Referenceable(entity.getTypeName());
        ref.setValues(entity.getAttributes());
        return ref;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ org.apache.nifi.atlas.provenance.analyzer.HBaseTable
org.apache.nifi.atlas.provenance.analyzer.FilePath
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory
org.apache.nifi.atlas.provenance.analyzer.AzureADLSDirectory
org.apache.nifi.atlas.provenance.analyzer.GCSDirectory

# By event type, if none of above analyzers matches
org.apache.nifi.atlas.provenance.analyzer.unknown.Create
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.provenance.analyzer;

import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.NamespaceResolver;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.provenance.analyzer.GCSDirectory.GCP_STORAGE_VIRTUAL_DIRECTORY;
import static org.apache.nifi.atlas.provenance.analyzer.GCSDirectory.REL_PARENT;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

/**
 * Verifies that {@link GCSDirectory} is selected for {@code gs://} transit URIs and that the
 * resulting references carry the expected type, qualified name, name, object prefix and parent.
 */
public class TestGCSDirectory {

    private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND;
    private static final String PROCESSOR_NAME = "PutHDFS";
    private static final String ATLAS_NAMESPACE = "namespace1";
    private static final String GCS_BUCKET = "bucket1";
    private static final String GCS_FILENAME = "file1";
    private static final String GCS_QUALIFIED_NAME_FORMAT = "gs://%s%s/@%s";
    private static final String GCS_ROOT_QUALIFIED_NAME_FORMAT = "gs://%s@%s";
    private static final String GCS_TRANSIT_URI_FORMAT = "gs://%s%s/%s";
    private static final String GCS_ROOT_TRANSIT_URI_FORMAT = "gs://%s/%s";

    /** A first-level directory: its parent is the bucket itself. */
    @Test
    public void testSimpleDirectory() {
        final String directory = "/dir1";
        final String uri = String.format(GCS_TRANSIT_URI_FORMAT, GCS_BUCKET, directory, GCS_FILENAME);
        final String qualifiedName = String.format(GCS_QUALIFIED_NAME_FORMAT, GCS_BUCKET, directory, ATLAS_NAMESPACE);

        executeTest(PROCESSOR_NAME, uri, "dir1", "/", GCS_BUCKET,
                GCP_STORAGE_VIRTUAL_DIRECTORY, qualifiedName, AtlasPathExtractorUtil.GCS_BUCKET);
    }

    /** A nested directory: its parent is the enclosing virtual directory. */
    @Test
    public void testCompoundDirectory() {
        final String directory = "/dir1/dir2/dir3/dir4/dir5";
        final String uri = String.format(GCS_TRANSIT_URI_FORMAT, GCS_BUCKET, directory, GCS_FILENAME);
        final String qualifiedName = String.format(GCS_QUALIFIED_NAME_FORMAT, GCS_BUCKET, directory, ATLAS_NAMESPACE);

        executeTest(PROCESSOR_NAME, uri, "dir5", "/dir1/dir2/dir3/dir4/", "dir4",
                GCP_STORAGE_VIRTUAL_DIRECTORY, qualifiedName, AtlasPathExtractorUtil.GCS_VIRTUAL_DIR);
    }

    /** A file directly in the bucket: the bucket entity is reported and no parent is checked. */
    @Test
    public void testRootDirectory() {
        final String uri = String.format(GCS_ROOT_TRANSIT_URI_FORMAT, GCS_BUCKET, GCS_FILENAME);
        final String qualifiedName = String.format(GCS_ROOT_QUALIFIED_NAME_FORMAT, GCS_BUCKET, ATLAS_NAMESPACE);

        executeTest(PROCESSOR_NAME, uri, GCS_BUCKET, null, "/",
                "gcp_storage_bucket", qualifiedName, AtlasPathExtractorUtil.GCS_BUCKET);
    }

    /**
     * Looks up the analyzer for the given processor and transit URI, runs it against mocked
     * inputs and checks the produced references.
     */
    protected void executeTest(String processorName, String transitUri, String lastDirName, String parentPath, String parentName,
                               String directoryType, String expectedDirectoryQualifiedName, String parentType) {
        final ProvenanceEventRecord event = mockProvenanceEvent(processorName, transitUri);
        final AnalysisContext context = mockAnalysisContext();

        final NiFiProvenanceEventAnalyzer analyzer =
                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, PROVENANCE_EVENT_TYPE);
        assertAnalyzer(analyzer);

        final DataSetRefs result = analyzer.analyze(context, event);
        assertAnalysisResult(result, lastDirName, parentPath, parentName, directoryType, expectedDirectoryQualifiedName, parentType);
    }

    /**
     * Asserts that exactly one output reference was produced and that it matches the expected
     * attributes. The parent reference is only inspected when {@code parentPath} is not null.
     */
    protected void assertAnalysisResult(DataSetRefs refs, String lastDirName, String parentPath, String parentName,
                                        String directoryType, String expectedDirectoryQualifiedName, String parentType) {
        Assertions.assertEquals(0, refs.getInputs().size());
        Assertions.assertEquals(1, refs.getOutputs().size());

        final Referenceable directory = refs.getOutputs().iterator().next();

        Assertions.assertEquals(directoryType, directory.getTypeName());
        Assertions.assertEquals(expectedDirectoryQualifiedName, directory.get(ATTR_QUALIFIED_NAME));
        Assertions.assertEquals(lastDirName, directory.get(ATTR_NAME));
        Assertions.assertEquals(parentPath, directory.get(AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX));

        if (parentPath == null) {
            return;
        }

        final Referenceable parent = (Referenceable) directory.get(REL_PARENT);
        Assertions.assertNotNull(parent);
        Assertions.assertEquals(parentType, parent.getTypeName());
        Assertions.assertEquals(parentName, parent.get(ATTR_NAME));
    }

    /** Creates a provenance event mock exposing the component type, transit URI and event type. */
    private ProvenanceEventRecord mockProvenanceEvent(String processorName, String transitUri) {
        final ProvenanceEventRecord event = Mockito.mock(ProvenanceEventRecord.class);

        when(event.getComponentType()).thenReturn(processorName);
        when(event.getTransitUri()).thenReturn(transitUri);
        when(event.getEventType()).thenReturn(PROVENANCE_EVENT_TYPE);

        return event;
    }

    /** Creates an analysis context mock whose namespace resolver always returns the test namespace. */
    private AnalysisContext mockAnalysisContext() {
        final NamespaceResolver resolver = Mockito.mock(NamespaceResolver.class);
        when(resolver.fromHostNames(any())).thenReturn(ATLAS_NAMESPACE);

        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
        when(context.getNamespaceResolver()).thenReturn(resolver);

        return context;
    }

    /** Checks that the factory resolved a {@link GCSDirectory} analyzer. */
    private void assertAnalyzer(NiFiProvenanceEventAnalyzer analyzer) {
        Assertions.assertNotNull(analyzer);
        Assertions.assertEquals(GCSDirectory.class, analyzer.getClass());
    }
}

0 comments on commit 8247910

Please sign in to comment.