Skip to content

Commit

Permalink
NIFI-8273 Adding Scripted Record processors
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbence authored and markap14 committed Sep 23, 2021
1 parent 4a5fe69 commit db5b618
Show file tree
Hide file tree
Showing 20 changed files with 2,334 additions and 218 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.record.Record;

import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;

/**
 * {@link ScriptEvaluator} that passes the script text itself to the {@link ScriptEngine} on every call.
 * The FlowFile attributes and the logger are bound once, at construction time; the record and its index
 * are rebound before each evaluation.
 */
class InterpretedScriptEvaluator implements ScriptEvaluator {
    private final Bindings bindings;
    private final String scriptToRun;
    private final ScriptEngine scriptEngine;

    /**
     * @param scriptEngine The engine used for evaluating the script.
     * @param scriptToRun The text of the script, evaluated as-is for every record.
     * @param flowFile The FlowFile whose attributes are exposed to the script as "attributes".
     * @param logger The logger exposed to the script as "log".
     */
    InterpretedScriptEvaluator(final ScriptEngine scriptEngine, final String scriptToRun, final FlowFile flowFile, final ComponentLog logger) {
        this.scriptEngine = scriptEngine;
        this.scriptToRun = scriptToRun;

        // Values that stay the same for every record are bound only once
        final Bindings engineBindings = ScriptedTransformRecord.setupBindings(scriptEngine);
        engineBindings.put("attributes", flowFile.getAttributes());
        engineBindings.put("log", logger);
        this.bindings = engineBindings;
    }

    /**
     * Binds the record as "record" and its index as "recordIndex", then evaluates the script text.
     *
     * @param record The record made available to the script.
     * @param index The index of the record.
     *
     * @return Whatever the engine returns for the evaluated script.
     *
     * @throws ScriptException If the engine fails to evaluate the script.
     */
    @Override
    public Object evaluate(final Record record, final long index) throws ScriptException {
        bindings.put("record", record);
        bindings.put("recordIndex", index);

        final Object scriptResult = scriptEngine.eval(scriptToRun, bindings);
        return scriptResult;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.record.Record;

import javax.script.Bindings;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptException;

/**
 * {@link ScriptEvaluator} working with an already compiled script. The FlowFile attributes and the logger
 * are bound once, at construction time; the record and its index are rebound before each evaluation.
 */
class PythonScriptEvaluator implements ScriptEvaluator {
    private final Bindings bindings;
    private final CompiledScript compiledScript;
    private final ScriptEngine scriptEngine;

    /**
     * @param scriptEngine The engine the result of the evaluation is read back from.
     * @param compiledScript The compiled form of the script, evaluated for every record.
     * @param flowFile The FlowFile whose attributes are exposed to the script as "attributes".
     * @param componentLog The logger exposed to the script as "log".
     */
    PythonScriptEvaluator(
        final ScriptEngine scriptEngine,
        final CompiledScript compiledScript,
        final FlowFile flowFile,
        final ComponentLog componentLog
    ) {
        // Working with a pre-compiled script brings significant performance gains. A quick 5-minute benchmark
        // shows gains of about 100x better performance. But even with the compiled script, performance pales
        // in comparison with Groovy.
        this.compiledScript = compiledScript;
        this.scriptEngine = scriptEngine;

        // Values that stay the same for every record are bound only once
        final Bindings engineBindings = ScriptedTransformRecord.setupBindings(scriptEngine);
        engineBindings.put("attributes", flowFile.getAttributes());
        engineBindings.put("log", componentLog);
        this.bindings = engineBindings;
    }

    /**
     * Binds the record as "record" and its index as "recordIndex", runs the compiled script and returns
     * the value the engine holds under the name "_" afterwards.
     *
     * @param record The record made available to the script.
     * @param index The index of the record.
     *
     * @return The value of the engine variable "_" after the script has run.
     *
     * @throws ScriptException If the compiled script fails during evaluation.
     */
    @Override
    public Object evaluate(final Record record, final long index) throws ScriptException {
        bindings.put("record", record);
        bindings.put("recordIndex", index);

        // The return value of eval is intentionally not used; the result is looked up from the engine instead.
        // NOTE(review): presumably "_" is where the Python engine leaves the value of the last expression - confirm.
        compiledScript.eval(bindings);

        final Object scriptResult = scriptEngine.get("_");
        return scriptResult;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;

/**
 * Mutable holder of two independent counters: the number of records and the number of dropped records.
 * Both counters start at zero and can only be increased, one step at a time.
 */
class RecordCounts {
    private long recordCount = 0L;
    private long droppedCount = 0L;

    /**
     * Increases the record counter by one.
     */
    public void incrementRecordCount() {
        this.recordCount += 1;
    }

    /**
     * Increases the dropped counter by one.
     */
    public void incrementDroppedCount() {
        this.droppedCount += 1;
    }

    /**
     * @return The number of times {@link #incrementRecordCount()} was called on this instance.
     */
    public long getRecordCount() {
        return this.recordCount;
    }

    /**
     * @return The number of times {@link #incrementDroppedCount()} was called on this instance.
     */
    public long getDroppedCount() {
        return this.droppedCount;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;

import org.apache.nifi.serialization.record.Record;

import javax.script.ScriptException;

/**
* Used by scripted record processors to enclose script engines for different languages.
* Implementations are called once per record, with the record and its index.
*/
interface ScriptEvaluator {

/**
* Evaluates the enclosed script using the record as argument. Returns with the script's return value.
*
* @param record The record the script is evaluated against.
* @param index The index of the record.
*
* @return The return value of the evaluated script.
*
* @throws ScriptException In case of issues with the evaluation.
*/
Object evaluate(Record record, long index) throws ScriptException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.Relationship;

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

@Tags({"record", "filter", "script", "groovy", "jython", "python"})
@CapabilityDescription(
    "This processor provides the ability to filter records out from FlowFiles using the user-provided script. " +
    "Every record will be evaluated by the script which must return with a boolean value. " +
    "Records with \"true\" result will be routed to the \"success\" relationship in a batch. " +
    "Other records will be filtered out."
)
@SeeAlso(classNames = {
    "org.apache.nifi.processors.script.ScriptedTransformRecord",
    "org.apache.nifi.processors.script.ScriptedValidateRecord",
    "org.apache.nifi.processors.script.ScriptedPartitionRecord"
})
/**
 * Scripted router working with boolean script results: a record is routed to the "success" relationship
 * when the result is {@code true} and is assigned to no relationship otherwise.
 */
public class ScriptedFilterRecord extends ScriptedRouterProcessor<Boolean> {
    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
        .name("success")
        .description(
            "Matching records of the original FlowFile will be routed to this relationship. " +
            "If there are no matching records, no FlowFile will be routed here."
        )
        .build();

    static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description(
            "After successful procession, the incoming FlowFile will be transferred to this relationship. " +
            "This happens regardless the number of filtered or remaining records.")
        .build();

    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("In case of any issue during processing the incoming FlowFile, the incoming FlowFile will be routed to this relationship.")
        .build();

    // Shared by every instance and handed out by getRelationships(), hence final and unmodifiable.
    private static final Set<Relationship> RELATIONSHIPS;

    static {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(RELATIONSHIP_ORIGINAL);
        relationships.add(RELATIONSHIP_FAILURE);
        relationships.add(RELATIONSHIP_SUCCESS);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }

    /**
     * Hands {@code Boolean.class} to the base class.
     * NOTE(review): presumably this is the type the script result is expected to have - confirm in ScriptedRouterProcessor.
     */
    public ScriptedFilterRecord() {
        super(Boolean.class);
    }

    /**
     * @return An unmodifiable set containing the "original", "failure" and "success" relationships.
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return The "original" relationship.
     */
    @Override
    protected Relationship getOriginalRelationship() {
        return RELATIONSHIP_ORIGINAL;
    }

    /**
     * @return The "failure" relationship.
     */
    @Override
    protected Relationship getFailureRelationship() {
        return RELATIONSHIP_FAILURE;
    }

    /**
     * Maps the result of the script to the target relationship of the record.
     *
     * @param scriptResult The value the script returned with for a record.
     *
     * @return The "success" relationship if the result is {@code true}; empty for {@code false} or {@code null}.
     */
    @Override
    protected Optional<Relationship> resolveRelationship(final Boolean scriptResult) {
        // Boolean.TRUE.equals avoids the NullPointerException auto-unboxing would raise for a null result.
        return Boolean.TRUE.equals(scriptResult) ? Optional.of(RELATIONSHIP_SUCCESS) : Optional.empty();
    }
}
Loading

0 comments on commit db5b618

Please sign in to comment.