Skip to content

Commit

Permalink
Merge branch 'main' of github.com:Netflix/conductor
Browse files Browse the repository at this point in the history
  • Loading branch information
aravindanr committed Sep 29, 2022
2 parents 343db8c + 984a655 commit 41e9c2a
Show file tree
Hide file tree
Showing 70 changed files with 1,052 additions and 975 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/stale.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
pull-requests: write # for actions/stale to close stale PRs
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v5
- uses: actions/stale@v6
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
stale-issue-message: 'This issue is stale, because it has been open for 45 days with no activity. Remove the stale label or comment, or this will be closed in 7 days.'
Expand Down
2 changes: 1 addition & 1 deletion annotations-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dependencies {
api 'com.google.guava:guava:31.1-jre'
api 'com.squareup:javapoet:1.13.+'
api 'com.github.jknack:handlebars:4.3.+'
api 'com.google.protobuf:protobuf-java:3.21.5'
api 'com.google.protobuf:protobuf-java:3.21.6'
api 'javax.annotation:javax.annotation-api:1.3.2'
api gradleApi()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.common.metadata;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

import com.netflix.conductor.common.metadata.acl.Permission;

/**
* A base class for {@link com.netflix.conductor.common.metadata.workflow.WorkflowDef} and {@link
* com.netflix.conductor.common.metadata.tasks.TaskDef}.
*/
public abstract class BaseDef extends Auditable {

private final Map<Permission, String> accessPolicy = new EnumMap<>(Permission.class);

public void addPermission(Permission permission, String allowedAuthority) {
this.accessPolicy.put(permission, allowedAuthority);
}

public void removePermission(Permission permission) {
this.accessPolicy.remove(permission);
}

public String getAllowedAuthority(Permission permission) {
return this.accessPolicy.get(permission);
}

public void clearAccessPolicy() {
this.accessPolicy.clear();
}

public Map<Permission, String> getAccessPolicy() {
return Collections.unmodifiableMap(this.accessPolicy);
}

public void setAccessPolicy(Map<Permission, String> accessPolicy) {
this.accessPolicy.putAll(accessPolicy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.common.metadata.acl;

import com.netflix.conductor.annotations.protogen.ProtoEnum;

@ProtoEnum
public enum Permission {
OWNER,
OPERATOR
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import com.netflix.conductor.annotations.protogen.ProtoMessage;
import com.netflix.conductor.common.constraints.OwnerEmailMandatoryConstraint;
import com.netflix.conductor.common.constraints.TaskTimeoutConstraint;
import com.netflix.conductor.common.metadata.Auditable;
import com.netflix.conductor.common.metadata.BaseDef;

@ProtoMessage
@TaskTimeoutConstraint
@Valid
public class TaskDef extends Auditable {
public class TaskDef extends BaseDef {

@ProtoEnum
public enum TimeoutPolicy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import com.netflix.conductor.common.constraints.NoSemiColonConstraint;
import com.netflix.conductor.common.constraints.OwnerEmailMandatoryConstraint;
import com.netflix.conductor.common.constraints.TaskReferenceNameUniqueConstraint;
import com.netflix.conductor.common.metadata.Auditable;
import com.netflix.conductor.common.metadata.BaseDef;
import com.netflix.conductor.common.metadata.tasks.TaskType;

@ProtoMessage
@TaskReferenceNameUniqueConstraint
public class WorkflowDef extends Auditable {
public class WorkflowDef extends BaseDef {

@ProtoEnum
public enum TimeoutPolicy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void refreshEventQueues() {
try {
Set<String> events =
eventHandlerDAO.getAllEventHandlers().stream()
.filter(EventHandler::isActive)
.map(EventHandler::getEvent)
.collect(Collectors.toSet());

Expand All @@ -151,6 +152,22 @@ public void refreshEventQueues() {
.peek(Lifecycle::start)
.forEach(this::listen);

Set<String> removed = new HashSet<>(eventToQueueMap.keySet());
removed.removeAll(events);
removed.forEach(
key -> {
ObservableQueue queue = eventToQueueMap.remove(key);
try {
queue.stop();
} catch (Exception e) {
LOGGER.error("Failed to stop queue: " + queue, e);
}
});

LOGGER.debug("Event queues: {}", eventToQueueMap.keySet());
LOGGER.debug("Stored queue: {}", events);
LOGGER.debug("Removed queue: {}", removed);

} catch (Exception e) {
Monitors.error(getClass().getSimpleName(), "refresh");
LOGGER.error("refresh event queues failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.conductor.common.metadata.events.EventHandler.TaskDetails;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.execution.StartWorkflowInput;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.JsonUtils;
import com.netflix.conductor.core.utils.ParametersUtils;
Expand Down Expand Up @@ -157,8 +158,8 @@ private Map<String, Object> completeTask(
taskModel.setStatus(status);
taskModel.setOutputData(replaced);
taskModel.setOutputMessage(taskDetails.getOutputMessage());
taskModel.getOutputData().put("conductor.event.messageId", messageId);
taskModel.getOutputData().put("conductor.event.name", event);
taskModel.addOutput("conductor.event.messageId", messageId);
taskModel.addOutput("conductor.event.name", event);

try {
workflowExecutor.updateTask(new TaskResult(taskModel.toTask()));
Expand Down Expand Up @@ -202,17 +203,19 @@ private Map<String, Object> startWorkflow(
workflowInput.put("conductor.event.messageId", messageId);
workflowInput.put("conductor.event.name", event);

String workflowId =
workflowExecutor.startWorkflow(
params.getName(),
params.getVersion(),
Optional.ofNullable(replaced.get("correlationId"))
.map(Object::toString)
.orElse(params.getCorrelationId()),
workflowInput,
null,
event,
params.getTaskToDomain());
StartWorkflowInput startWorkflowInput = new StartWorkflowInput();
startWorkflowInput.setName(params.getName());
startWorkflowInput.setVersion(params.getVersion());
startWorkflowInput.setCorrelationId(
Optional.ofNullable(replaced.get("correlationId"))
.map(Object::toString)
.orElse(params.getCorrelationId()));
startWorkflowInput.setWorkflowInput(workflowInput);
startWorkflowInput.setEvent(event);
startWorkflowInput.setTaskToDomain(params.getTaskToDomain());

String workflowId = workflowExecutor.startWorkflow(startWorkflowInput);

output.put("workflowId", workflowId);
LOGGER.debug(
"Started workflow: {}/{}/{} for event: {} for message:{}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution;

import java.util.Map;
import java.util.Objects;

import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;

public class StartWorkflowInput {

private String name;
private Integer version;
private WorkflowDef workflowDefinition;
private Map<String, Object> workflowInput;
private String externalInputPayloadStoragePath;
private String correlationId;
private Integer priority;
private String parentWorkflowId;
private String parentWorkflowTaskId;
private String event;
private Map<String, String> taskToDomain;
private String workflowId;

public StartWorkflowInput() {}

public StartWorkflowInput(StartWorkflowRequest startWorkflowRequest) {
this.name = startWorkflowRequest.getName();
this.version = startWorkflowRequest.getVersion();
this.workflowDefinition = startWorkflowRequest.getWorkflowDef();
this.correlationId = startWorkflowRequest.getCorrelationId();
this.priority = startWorkflowRequest.getPriority();
this.workflowInput = startWorkflowRequest.getInput();
this.externalInputPayloadStoragePath =
startWorkflowRequest.getExternalInputPayloadStoragePath();
this.taskToDomain = startWorkflowRequest.getTaskToDomain();
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getVersion() {
return version;
}

public void setVersion(Integer version) {
this.version = version;
}

public WorkflowDef getWorkflowDefinition() {
return workflowDefinition;
}

public void setWorkflowDefinition(WorkflowDef workflowDefinition) {
this.workflowDefinition = workflowDefinition;
}

public Map<String, Object> getWorkflowInput() {
return workflowInput;
}

public void setWorkflowInput(Map<String, Object> workflowInput) {
this.workflowInput = workflowInput;
}

public String getExternalInputPayloadStoragePath() {
return externalInputPayloadStoragePath;
}

public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
}

public String getCorrelationId() {
return correlationId;
}

public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
}

public Integer getPriority() {
return priority;
}

public void setPriority(Integer priority) {
this.priority = priority;
}

public String getParentWorkflowId() {
return parentWorkflowId;
}

public void setParentWorkflowId(String parentWorkflowId) {
this.parentWorkflowId = parentWorkflowId;
}

public String getParentWorkflowTaskId() {
return parentWorkflowTaskId;
}

public void setParentWorkflowTaskId(String parentWorkflowTaskId) {
this.parentWorkflowTaskId = parentWorkflowTaskId;
}

public String getEvent() {
return event;
}

public void setEvent(String event) {
this.event = event;
}

public Map<String, String> getTaskToDomain() {
return taskToDomain;
}

public void setTaskToDomain(Map<String, String> taskToDomain) {
this.taskToDomain = taskToDomain;
}

public String getWorkflowId() {
return workflowId;
}

public void setWorkflowId(String workflowId) {
this.workflowId = workflowId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StartWorkflowInput that = (StartWorkflowInput) o;
return Objects.equals(name, that.name)
&& Objects.equals(version, that.version)
&& Objects.equals(workflowDefinition, that.workflowDefinition)
&& Objects.equals(workflowInput, that.workflowInput)
&& Objects.equals(
externalInputPayloadStoragePath, that.externalInputPayloadStoragePath)
&& Objects.equals(correlationId, that.correlationId)
&& Objects.equals(priority, that.priority)
&& Objects.equals(parentWorkflowId, that.parentWorkflowId)
&& Objects.equals(parentWorkflowTaskId, that.parentWorkflowTaskId)
&& Objects.equals(event, that.event)
&& Objects.equals(taskToDomain, that.taskToDomain)
&& Objects.equals(workflowId, that.workflowId);
}

@Override
public int hashCode() {
return Objects.hash(
name,
version,
workflowDefinition,
workflowInput,
externalInputPayloadStoragePath,
correlationId,
priority,
parentWorkflowId,
parentWorkflowTaskId,
event,
taskToDomain,
workflowId);
}
}
Loading

0 comments on commit 41e9c2a

Please sign in to comment.