Skip to content

Commit

Permalink
flow trigger/dependency instance (azkaban#1611)
Browse files Browse the repository at this point in the history
PR added two core classes for flow trigger feature: TriggerInstance and DependencyInstance.

Conceptually trigger Instance represents an execution of a flow trigger. It holds execution context such as a historically unique trigger execution id, execution status, start/end time, and a list of dependency instances.

Similarly dependency Instance denotes an execution of a dependency performing availability check on a particular data dependency. It holds execution context such as execution status and start/end time, all dependency Instances within the same trigger instance share the same execution id as trigger instance.

A trigger instance and its dependency instances will be created on scheduled trigger start time. The trigger instance will wait until all dependency instances are available, at which point a new flow instance is created, or the maximum allowed wait time is exceeded. Once successfully created, trigger instance will equipped with an UUID to identify itself, initial status as running, and a list of dependency instances whose status are running as well.

Trigger instance can be cancelled manually by user, by timeout when max waiting time is exceeded, or by internal dependency failure.

Trigger/Dependency instance has following status(Running, Cancelling, Cancelled, Succeeded).
Since trigger instance is nothing but a collection of dependency instances, it doesn’t have variables to keep status, start or endtime, all of which can be quickly inferred from its belonging dependencies’ status, start/endtime, which unburdens us from maintaining extra variables.

Cancellation cause will be attached to dependency instance for better user experience when cancelled for whatever reason.

Valid cancellation cause includes:
TIMEOUT, // cancellation is issued due to exceeding max wait time
MANUAL, // cancellation is issued by user
FAILURE, // cancellation is issued by internal dependency instance failure
CASCADING // cancelled by cascading failure(peer dependency is cancelled)
  • Loading branch information
burgerkingeater authored Jan 23, 2018
1 parent e89e7ad commit afb269d
Show file tree
Hide file tree
Showing 5 changed files with 632 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package azkaban.flowtrigger;

public enum CancellationCause {
NONE, //no cancellation occurred
TIMEOUT, // cancellation is issued due to exceeding max wait time
MANUAL, // cancellation is issued by user
FAILURE, // cancellation is issued by dependency instance failure
CASCADING // cancelled by cascading failure
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package azkaban.flowtrigger;

import java.util.Date;

public class DependencyInstance {

private final Date startTime;
private final String depName;
private TriggerInstance triggerInstance;
private DependencyInstanceContext context;
private volatile Date endTime;
private volatile Status status;
private volatile CancellationCause cause;

//todo chengren311: convert it to builder
public DependencyInstance(final String depName, final Date startTime,
final Date endTime, final DependencyInstanceContext context, final Status status,
final CancellationCause cause) {
this.status = status;
this.depName = depName;
this.startTime = startTime;
this.endTime = endTime;
this.context = context;
this.cause = cause;
}

public CancellationCause getCancellationCause() {
return this.cause;
}

public void setCancellationCause(final CancellationCause cancellationCause) {
this.cause = cancellationCause;
}

public TriggerInstance getTriggerInstance() {
return this.triggerInstance;
}

public void setTriggerInstance(final TriggerInstance triggerInstance) {
this.triggerInstance = triggerInstance;
}

public void setDependencyInstanceContext(final DependencyInstanceContext context) {
this.context = context;
}

public Date getStartTime() {
return this.startTime;
}

public Date getEndTime() {
return this.endTime;
}

public void setEndTime(final Date endTime) {
this.endTime = endTime;
}

public String getDepName() {
return this.depName;
}

public DependencyInstanceContext getContext() {
return this.context;
}

public Status getStatus() {
return this.status;
}

public void setStatus(final Status status) {
this.status = status;
}

}
35 changes: 35 additions & 0 deletions azkaban-web-server/src/main/java/azkaban/flowtrigger/Status.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package azkaban.flowtrigger;

import com.google.common.collect.ImmutableSet;
import java.util.Set;

/**
* Represents status for trigger/dependency
*/
public enum Status {
RUNNING, // dependency instance is running
SUCCEEDED, // dependency instance succeeds
CANCELLED, // dependency instance is cancelled
CANCELLING; // dependency instance is being cancelled

public static boolean isDone(final Status status) {
final Set<Status> terminalStatus = ImmutableSet.of(SUCCEEDED, CANCELLED);
return terminalStatus.contains(status);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package azkaban.flowtrigger;

import azkaban.project.FlowTrigger;
import azkaban.project.Project;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TriggerInstance {

private final List<DependencyInstance> depInstances;
private final String id;
private final String submitUser;
private final Project project;
private final String flowId;
private final int flowVersion;
private FlowTrigger flowTrigger;
private volatile int flowExecId; // associated flow execution id

//todo chengren311: convert it to builder
public TriggerInstance(final String id, final FlowTrigger flowTrigger, final String flowId,
final int flowVersion, final String submitUser, final List<DependencyInstance> depInstances,
final int flowExecId, final Project project) {
this.depInstances = ImmutableList.copyOf(depInstances);
this.id = id;
this.flowTrigger = flowTrigger;
this.submitUser = submitUser;
this.flowId = flowId;
this.flowVersion = flowVersion;
this.flowExecId = flowExecId;
this.project = project;
for (final DependencyInstance depInst : this.depInstances) {
depInst.setTriggerInstance(this);
}
}

public Project getProject() {
return this.project;
}

public String getProjectName() {
return this.project.getName();
}

public List<String> getFailureEmails() {
return this.project.getFlow(this.getFlowId()).getFailureEmails();
}

public String getFlowId() {
return this.flowId;
}

public int getFlowVersion() {
return this.flowVersion;
}

public int getFlowExecId() {
return this.flowExecId;
}

public void setFlowExecId(final int flowExecId) {
this.flowExecId = flowExecId;
}

public final FlowTrigger getFlowTrigger() {
return this.flowTrigger;
}

public void setFlowTrigger(final FlowTrigger flowTrigger) {
this.flowTrigger = flowTrigger;
}

public String getSubmitUser() {
return this.submitUser;
}

public void addDependencyInstance(final DependencyInstance depInst) {
this.depInstances.add(depInst);
}

public List<DependencyInstance> getDepInstances() {
return this.depInstances;
}

public String getId() {
return this.id;
}

private boolean isRunning(final Set<Status> statuses) {
if (statuses.contains(Status.RUNNING)) {
for (final Status status : statuses) {
if (!status.equals(Status.SUCCEEDED) && !status.equals(Status.RUNNING)) {
return false;
}
}
return true;
}
return false;
}

private boolean isSucceed(final Set<Status> statuses) {
return statuses.contains(Status.SUCCEEDED) && statuses.size() == 1;
}

private boolean isCancelled(final Set<Status> statuses) {
if (statuses.contains(Status.CANCELLED)) {
for (final Status status : statuses) {
if (!status.equals(Status.SUCCEEDED) && !status.equals(Status.CANCELLED)) {
return false;
}
}
return true;
}
return false;
}

public Status getStatus() {
// no-dependency trigger is always considered as success
if (this.depInstances.isEmpty()) {
return Status.SUCCEEDED;
}
final Set<Status> statusSet = new HashSet<>();

for (final DependencyInstance depInst : this.depInstances) {
statusSet.add(depInst.getStatus());
}

if (isRunning(statusSet)) {
return Status.RUNNING;
} else if (isSucceed(statusSet)) {
return Status.SUCCEEDED;
} else if (isCancelled(statusSet)) {
return Status.CANCELLED;
} else {
return Status.CANCELLING;
}
}

public Date getStartTime() {
final List<Date> startTimeList = this.depInstances.stream()
.map(DependencyInstance::getStartTime).collect(Collectors.toList());
return startTimeList.isEmpty() ? null : Collections.min(startTimeList);
}

public Date getEndTime() {
if (Status.isDone(this.getStatus())) {
final List<Date> endTimeList = this.depInstances.stream()
.map(DependencyInstance::getEndTime).filter(endTime -> endTime != null)
.collect(Collectors.toList());
return endTimeList.isEmpty() ? null : Collections.max(endTimeList);
} else {
return null;
}
}
}
Loading

0 comments on commit afb269d

Please sign in to comment.