Skip to content

Commit

Permalink
Merge branch 'auth/access_policy' of github.com:Netflix/conductor int…
Browse files Browse the repository at this point in the history
…o auth/access_policy
  • Loading branch information
aravindanr committed Sep 25, 2022
2 parents 1d81d27 + c66f84a commit 2054949
Show file tree
Hide file tree
Showing 18 changed files with 150 additions and 45 deletions.
2 changes: 1 addition & 1 deletion annotations-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dependencies {
api 'com.google.guava:guava:31.1-jre'
api 'com.squareup:javapoet:1.13.+'
api 'com.github.jknack:handlebars:4.3.+'
api 'com.google.protobuf:protobuf-java:3.21.5'
api 'com.google.protobuf:protobuf-java:3.21.6'
api 'javax.annotation:javax.annotation-api:1.3.2'
api gradleApi()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ private Map<String, Object> completeTask(
taskModel.setStatus(status);
taskModel.setOutputData(replaced);
taskModel.setOutputMessage(taskDetails.getOutputMessage());
taskModel.getOutputData().put("conductor.event.messageId", messageId);
taskModel.getOutputData().put("conductor.event.name", event);
taskModel.addOutput("conductor.event.messageId", messageId);
taskModel.addOutput("conductor.event.name", event);

try {
workflowExecutor.updateTask(new TaskResult(taskModel.toTask()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public WorkflowExecutor(
this.systemTaskRegistry = systemTaskRegistry;
}

@PreAuthorize("hasPermission(#input, 'OWNER')")
public String startWorkflow(StartWorkflowInput input) {
WorkflowDef workflowDefinition;

Expand Down Expand Up @@ -773,7 +772,7 @@ public WorkflowModel terminateWorkflow(

String failureWFId = startWorkflow(startWorkflowInput);

workflow.getOutput().put("conductor.failure_workflow", failureWFId);
workflow.addOutput("conductor.failure_workflow", failureWFId);
} catch (Exception e) {
LOGGER.error("Failed to start error workflow", e);
workflow.getOutput()
Expand Down Expand Up @@ -1740,10 +1739,9 @@ private boolean rerunWF(
rerunFromTask.setStartTime(0);
rerunFromTask.setUpdateTime(0);
rerunFromTask.setEndTime(0);
rerunFromTask.getOutputData().clear();
rerunFromTask.clearOutput();
rerunFromTask.setRetried(false);
rerunFromTask.setExecuted(false);
rerunFromTask.setExternalOutputPayloadStoragePath(null);
if (rerunFromTask.getTaskType().equalsIgnoreCase(TaskType.TASK_TYPE_SUB_WORKFLOW)) {
// if task is sub workflow set task as IN_PROGRESS and reset start time
rerunFromTask.setStatus(IN_PROGRESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -94,7 +93,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
switchTask.setTaskType(TaskType.TASK_TYPE_SWITCH);
switchTask.setTaskDefName(TaskType.TASK_TYPE_SWITCH);
switchTask.getInputData().put("case", evalResult);
switchTask.getOutputData().put("evaluationResult", Collections.singletonList(evalResult));
switchTask.addOutput("evaluationResult", List.of(evalResult));
switchTask.setStartTime(System.currentTimeMillis());
switchTask.setStatus(TaskModel.Status.IN_PROGRESS);
tasksToBeScheduled.add(switchTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.springframework.stereotype.Component;

import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.events.ScriptEvaluator;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

Expand All @@ -36,8 +38,11 @@ public class DoWhile extends WorkflowSystemTask {

private static final Logger LOGGER = LoggerFactory.getLogger(DoWhile.class);

public DoWhile() {
private final ParametersUtils parametersUtils;

public DoWhile(ParametersUtils parametersUtils) {
super(TASK_TYPE_DO_WHILE);
this.parametersUtils = parametersUtils;
}

@Override
Expand Down Expand Up @@ -104,9 +109,7 @@ public boolean execute(
break;
}
}
doWhileTaskModel
.getOutputData()
.put(String.valueOf(doWhileTaskModel.getIteration()), output);
doWhileTaskModel.addOutput(String.valueOf(doWhileTaskModel.getIteration()), output);

if (hasFailures) {
LOGGER.debug(
Expand All @@ -133,7 +136,7 @@ public boolean execute(
shouldContinue);
if (shouldContinue) {
doWhileTaskModel.setIteration(doWhileTaskModel.getIteration() + 1);
doWhileTaskModel.getOutputData().put("iteration", doWhileTaskModel.getIteration());
doWhileTaskModel.addOutput("iteration", doWhileTaskModel.getIteration());
return scheduleNextIteration(doWhileTaskModel, workflow, workflowExecutor);
} else {
LOGGER.debug(
Expand Down Expand Up @@ -214,7 +217,14 @@ boolean markTaskSuccess(TaskModel taskModel) {

@VisibleForTesting
boolean evaluateCondition(WorkflowModel workflow, TaskModel task) throws ScriptException {
Map<String, Object> conditionInput = new HashMap<>(task.getInputData());
TaskDef taskDefinition = task.getTaskDefinition().orElse(null);
// Use paramUtils to compute the task input
Map<String, Object> conditionInput =
parametersUtils.getTaskInputV2(
task.getWorkflowTask().getInputParameters(),
workflow,
task.getTaskId(),
taskDefinition);
conditionInput.put(task.getReferenceTaskName(), task.getOutputData());
List<TaskModel> loopOver =
workflow.getTasks().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf
payload.put("correlationId", workflow.getCorrelationId());

task.setStatus(TaskModel.Status.IN_PROGRESS);
task.getOutputData().putAll(payload);
task.addOutput(payload);

try {
task.getOutputData().put(EVENT_PRODUCED, computeQueueName(workflow, task));
task.addOutput(EVENT_PRODUCED, computeQueueName(workflow, task));
} catch (Exception e) {
task.setStatus(TaskModel.Status.FAILED);
task.setReasonForIncompletion(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public Inline(Map<String, Evaluator> evaluators) {
public boolean execute(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
Map<String, Object> taskInput = task.getInputData();
Map<String, Object> taskOutput = task.getOutputData();
String evaluatorType = (String) taskInput.get(QUERY_EVALUATOR_TYPE);
String expression = (String) taskInput.get(QUERY_EXPRESSION_PARAMETER);

Expand All @@ -79,7 +78,7 @@ public boolean execute(
checkExpression(expression);
Evaluator evaluator = evaluators.get(evaluatorType);
Object evalResult = evaluator.evaluate(expression, taskInput);
taskOutput.put("result", evalResult);
task.addOutput("result", evalResult);
task.setStatus(TaskModel.Status.COMPLETED);
} catch (Exception e) {
String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
Expand All @@ -95,7 +94,7 @@ public boolean execute(
? TaskModel.Status.FAILED_WITH_TERMINAL_ERROR
: TaskModel.Status.FAILED);
task.setReasonForIncompletion(errorMessage);
taskOutput.put("error", errorMessage);
task.addOutput("error", errorMessage);
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public Lambda() {
public boolean execute(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
Map<String, Object> taskInput = task.getInputData();
Map<String, Object> taskOutput = task.getOutputData();
String scriptExpression;
try {
scriptExpression = (String) taskInput.get(QUERY_EXPRESSION_PARAMETER);
Expand All @@ -79,7 +78,7 @@ public boolean execute(
scriptExpressionBuilder,
task.getTaskId());
Object returnValue = ScriptEvaluator.eval(scriptExpressionBuilder, taskInput);
taskOutput.put("result", returnValue);
task.addOutput("result", returnValue);
task.setStatus(TaskModel.Status.COMPLETED);
} else {
LOGGER.error("Empty {} in Lambda task. ", QUERY_EXPRESSION_PARAMETER);
Expand All @@ -97,7 +96,7 @@ public boolean execute(
e);
task.setStatus(TaskModel.Status.FAILED);
task.setReasonForIncompletion(e.getMessage());
taskOutput.put(
task.addOutput(
"error", e.getCause() != null ? e.getCause().getMessage() : e.getMessage());
}
return true;
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ && getStatus() == taskModel.getStatus()
&& Objects.equals(getReasonForIncompletion(), taskModel.getReasonForIncompletion())
&& Objects.equals(getWorkerId(), taskModel.getWorkerId())
&& Objects.equals(getWaitTimeout(), taskModel.getWaitTimeout())
&& Objects.equals(getOutputData(), taskModel.getOutputData())
&& Objects.equals(outputData, taskModel.outputData)
&& Objects.equals(outputPayload, taskModel.outputPayload)
&& Objects.equals(getWorkflowTask(), taskModel.getWorkflowTask())
&& Objects.equals(getDomain(), taskModel.getDomain())
&& Objects.equals(getInputMessage(), taskModel.getInputMessage())
Expand Down Expand Up @@ -816,7 +817,8 @@ public int hashCode() {
getCallbackAfterSeconds(),
getWorkerId(),
getWaitTimeout(),
getOutputData(),
outputData,
outputPayload,
getWorkflowTask(),
getDomain(),
getInputMessage(),
Expand Down Expand Up @@ -871,4 +873,10 @@ public void addOutput(Map<String, Object> outputData) {
this.outputData.putAll(outputData);
}
}

public void clearOutput() {
this.outputData.clear();
this.outputPayload.clear();
this.externalOutputPayloadStoragePath = null;
}
}
26 changes: 24 additions & 2 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,8 @@ && getStatus() == that.getStatus()
&& Objects.equals(getParentWorkflowTaskId(), that.getParentWorkflowTaskId())
&& Objects.equals(getTasks(), that.getTasks())
&& Objects.equals(getInput(), that.getInput())
&& Objects.equals(getOutput(), that.getOutput())
&& Objects.equals(output, that.output)
&& Objects.equals(outputPayload, that.outputPayload)
&& Objects.equals(getCorrelationId(), that.getCorrelationId())
&& Objects.equals(getReRunFromWorkflowId(), that.getReRunFromWorkflowId())
&& Objects.equals(getReasonForIncompletion(), that.getReasonForIncompletion())
Expand Down Expand Up @@ -544,7 +545,8 @@ public int hashCode() {
getParentWorkflowTaskId(),
getTasks(),
getInput(),
getOutput(),
output,
outputPayload,
getCorrelationId(),
getReRunFromWorkflowId(),
getReasonForIncompletion(),
Expand Down Expand Up @@ -581,4 +583,24 @@ public Workflow toWorkflow() {
}
return workflow;
}

public void addInput(String key, Object value) {
this.input.put(key, value);
}

public void addInput(Map<String, Object> inputData) {
if (inputData != null) {
this.input.putAll(inputData);
}
}

public void addOutput(String key, Object value) {
this.output.put(key, value);
}

public void addOutput(Map<String, Object> outputData) {
if (outputData != null) {
this.output.putAll(outputData);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,22 @@ class DoWhileSpec extends Specification {
DoWhile doWhile

WorkflowExecutor workflowExecutor
ObjectMapper objectMapper
ParametersUtils parametersUtils
TaskModel doWhileTaskModel

WorkflowTask task1, task2
TaskModel taskModel1, taskModel2

def setup() {
objectMapper = new ObjectMapper();
workflowExecutor = Mock(WorkflowExecutor.class)
parametersUtils = new ParametersUtils(objectMapper)

task1 = new WorkflowTask(name: 'task1', taskReferenceName: 'task1')
task2 = new WorkflowTask(name: 'task2', taskReferenceName: 'task2')

doWhile = new DoWhile()
doWhile = new DoWhile(parametersUtils)
}

def "first iteration"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ public void testGetTaskInput() {
workflow.getInput().put("requestId", "request id 001");
TaskModel task = new TaskModel();
task.setReferenceTaskName("task2");
task.getOutputData().put("location", "http://location");
task.getOutputData().put("isPersonActive", true);
task.addOutput("location", "http://location");
task.addOutput("isPersonActive", true);
workflow.getTasks().add(task);
Map<String, Object> taskInput = parametersUtils.getTaskInput(ip, workflow, null, null);

Expand Down Expand Up @@ -324,8 +324,8 @@ public void testGetTaskInputV1() {
workflow.getInput().put("requestId", "request id 001");
TaskModel task = new TaskModel();
task.setReferenceTaskName("task2");
task.getOutputData().put("location", "http://location");
task.getOutputData().put("isPersonActive", true);
task.addOutput("location", "http://location");
task.addOutput("isPersonActive", true);
workflow.getTasks().add(task);
Map<String, Object> taskInput = parametersUtils.getTaskInput(ip, workflow, null, null);

Expand Down Expand Up @@ -1366,18 +1366,18 @@ private WorkflowModel createDefaultWorkflow() {
names.add(name);
names.add(name2);

workflow.getOutput().put("name", name);
workflow.getOutput().put("names", names);
workflow.getOutput().put("awards", 200);
workflow.addOutput("name", name);
workflow.addOutput("names", names);
workflow.addOutput("awards", 200);

TaskModel task = new TaskModel();
task.setReferenceTaskName("task2");
task.getOutputData().put("location", "http://location");
task.addOutput("location", "http://location");
task.setStatus(TaskModel.Status.COMPLETED);

TaskModel task2 = new TaskModel();
task2.setReferenceTaskName("task3");
task2.getOutputData().put("refId", "abcddef_1234_7890_aaffcc");
task2.addOutput("refId", "abcddef_1234_7890_aaffcc");
task2.setStatus(TaskModel.Status.SCHEDULED);

workflow.getTasks().add(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ public void testSinkParam() {

TaskModel task1 = new TaskModel();
task1.setReferenceTaskName("t1");
task1.getOutputData().put("q", "t1_queue");
task1.addOutput("q", "t1_queue");
workflow.getTasks().add(task1);

TaskModel task2 = new TaskModel();
task2.setReferenceTaskName("t2");
task2.getOutputData().put("q", "task2_queue");
task2.addOutput("q", "task2_queue");
workflow.getTasks().add(task2);

TaskModel task = new TaskModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void testTaskOps() {
for (int i = 0; i < 3; i++) {
TaskModel found = getExecutionDAO().getTask(workflowId + "_t" + i);
assertNotNull(found);
found.getOutputData().put("updated", true);
found.addOutput("updated", true);
found.setStatus(TaskModel.Status.COMPLETED);
getExecutionDAO().updateTask(found);
}
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/gettingstarted/docker.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ This image at `/docker/serverAndUI` is provided to illustrate starting both the

1. When you run Elasticsearch, sometimes the health remains in Yellow state. Conductor server by default requires
Green state to run when indexing is enabled. To work around this, you can use the following property:
`conductor.elasticsearch.clusteHealthColor=yellow` Reference: [Issue 2262](https://github.com/Netflix/conductor/issues/2262)
`conductor.elasticsearch.clusterHealthColor=yellow` Reference: [Issue 2262](https://github.com/Netflix/conductor/issues/2262)



#### Elasticsearch timeout
Standalone(single node) elasticsearch has a yellow status which will cause timeout for conductor server (Required: Green).
Spin up a cluster (more than one) to prevent timeout or use config option `conductor.elasticsearch.clusteHealthColor=yellow`.
Spin up a cluster (more than one) to prevent timeout or use config option `conductor.elasticsearch.clusterHealthColor=yellow`.

See issue: https://github.com/Netflix/conductor/issues/2262

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor execu
}
//noinspection ConstantConditions
if (response != null) {
task.getOutputData().put("response", response.asMap());
task.addOutput("response", response.asMap());
}

} catch (Exception e) {
Expand All @@ -138,7 +138,7 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor execu
task.setStatus(TaskModel.Status.FAILED);
task.setReasonForIncompletion(
"Failed to invoke " + getTaskType() + " task due to: " + e);
task.getOutputData().put("response", e.toString());
task.addOutput("response", e.toString());
}
}

Expand Down
Loading

0 comments on commit 2054949

Please sign in to comment.