Skip to content

Commit

Permalink
Fix issue where multiple execution window pre-stages are added to par…
Browse files Browse the repository at this point in the history
…allel stages
  • Loading branch information
robfletcher committed Dec 9, 2016
1 parent 68f4b44 commit a228e4b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package com.netflix.spinnaker.orca.pipeline;

import java.util.*;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import com.netflix.spinnaker.orca.pipeline.TaskNode.TaskDefinition;
Expand Down Expand Up @@ -145,21 +148,12 @@ protected <T extends Execution<T>> void planBeforeOrAfterStages(
}

private <T extends Execution<T>> void planExecutionWindow(Stage<T> stage, Consumer<Stage<T>> callback) {
Optional<Stage<T>> parentStage = stage
.getExecution()
.getStages()
.stream()
.filter(it -> it.getId().equals(stage.getParentStageId()))
.findFirst();
boolean hasExecutionWindow = (Boolean) stage
.getContext()
.getOrDefault("restrictExecutionDuringTimeWindow", false);
boolean isNonSynthetic = stage.getSyntheticStageOwner() == null &&
stage.getParentStageId() == null;
boolean parentIsInitializingStage = parentStage
.map(Stage::isInitializationStage)
.orElse(false);
if (hasExecutionWindow && (isNonSynthetic || parentIsInitializingStage)) {
if (hasExecutionWindow && isNonSynthetic) {
Stage<T> syntheticStage = newStage(
stage.getExecution(),
RestrictExecutionDuringTimeWindow.TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ default boolean isImmutable() {
/**
* Returns a flag indicating if the stage is a parallel initialization stage
*/
boolean isInitializationStage();
@Deprecated boolean isInitializationStage();

void setInitializationStage(boolean initializationStage);
@Deprecated void setInitializationStage(boolean initializationStage);

AtomicInteger getTaskCounter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package com.netflix.spinnaker.orca.pipeline

import java.util.function.BiFunction
import java.util.function.Consumer
import groovy.transform.CompileStatic
import com.netflix.spinnaker.orca.DefaultTaskResult
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.listeners.StageListener
Expand All @@ -27,6 +26,7 @@ import com.netflix.spinnaker.orca.pipeline.parallel.WaitForRequisiteCompletionSt
import com.netflix.spinnaker.orca.pipeline.parallel.WaitForRequisiteCompletionTask
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.pipeline.util.StageNavigator
import groovy.transform.CompileStatic
import org.spockframework.spring.xml.SpockMockFactoryBean
import org.springframework.beans.factory.FactoryBean
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -410,7 +410,8 @@ abstract class ExecutionRunnerSpec<R extends ExecutionRunner> extends Specificat
getType() >> endStage.type
buildTaskGraph(_) >> new TaskNode.TaskGraph(FULL, [new TaskDefinition("test", TestTask)])
}
@Subject runner = create(startStageDefinitionBuilder, branchAStageDefinitionBuilder, branchBStageDefinitionBuilder, endStageDefinitionBuilder)
@Subject
runner = create(startStageDefinitionBuilder, branchAStageDefinitionBuilder, branchBStageDefinitionBuilder, endStageDefinitionBuilder)

and:
def executedStageTypes = []
Expand Down Expand Up @@ -672,6 +673,47 @@ abstract class ExecutionRunnerSpec<R extends ExecutionRunner> extends Specificat
contexts = [[region: "a"], [region: "b"]]
}

def "applies execution windows correctly to internal parallel stages"() {
given:
def stage = new PipelineStage(execution, "branching")
stage.initializationStage = true
stage.context.restrictExecutionDuringTimeWindow = true
stage.context.restrictedExecutionWindow = [
whitelist : [[startHour: 0, startMin: 0, endHour: 23, endMin: 59]],
days : [1, 2, 3, 4, 5, 6, 7],
skipRemainingWait: true
]
execution.stages << stage

executionRepository.retrievePipeline(execution.id) >> execution

and:
def stageDefinitionBuilder = stageDefinition("branching", contexts, { builder ->
builder.withTask("start", StartLoopTask)
}, { builder ->
builder.withTask("branch", TestTask)
}, { builder ->
builder.withTask("end", EndLoopTask)
})

@Subject runner = create(stageDefinitionBuilder)

and:
startLoopTask.execute(_) >> new DefaultTaskResult(SUCCEEDED)
testTask.execute(_) >> new DefaultTaskResult(SUCCEEDED)
endLoopTask.execute(_) >> new DefaultTaskResult(SUCCEEDED)

when:
runner.start(execution)

then:
execution.stages.findAll { it.type == RestrictExecutionDuringTimeWindow.TYPE }.size() == 1

where:
execution = Pipeline.builder().withId("1").withParallel(true).build()
contexts = [[region: "a"], [region: "b"]]
}

def "if a stage is allowed to fail it is marked as FAILED_CONTINUE"() {
given:
def stage = new PipelineStage(execution, stageType, [continuePipeline: true])
Expand Down

0 comments on commit a228e4b

Please sign in to comment.