fix(MPT): Fixes triggering for template triggers. (spinnaker#504)
Echo has its own cache of pipeline configs, but it
does not hydrate templated pipeline configs into
the full runnable pipeline. Hence, if you define
a trigger in the template, Echo will not trigger
the pipeline when it receives a matching trigger event.
jtk54 authored Mar 26, 2019
1 parent 54486ae commit 4091f1d
Showing 5 changed files with 78 additions and 24 deletions.
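
To make the commit message above concrete: a v2 templated pipeline config, as Front50 stores it, identifies itself by its type and schema fields and defers most of its content, including any triggers, to the referenced template. The sketch below is illustrative only; the field values, the template reference, and the trigger shape are hypothetical, and the point is simply that the raw config carries no triggers until Orca plans it into a full pipeline.

import java.util.List;
import java.util.Map;

class HydrationSketch {
  public static void main(String[] args) {
    // Raw config as Front50 returns it: the trigger lives in the template,
    // so there is nothing here for Echo's trigger matching to act on.
    Map<String, Object> rawConfig = Map.of(
        "type", "templatedPipeline",            // checked by isV2Pipeline in PipelineCache below
        "schema", "v2",                          // checked by isV2Pipeline in PipelineCache below
        "application", "myapp",                  // hypothetical values
        "name", "Deploy",
        "template", Map.of("reference", "example-template"));

    // Roughly what planning the config via Orca yields: the template-defined
    // trigger is now present, so Echo can match incoming events against it.
    Map<String, Object> plannedPipeline = Map.of(
        "application", "myapp",
        "name", "Deploy",
        "triggers", List.of(Map.of("type", "pubsub", "enabled", true)));

    System.out.println("raw config has triggers? " + rawConfig.containsKey("triggers"));             // false
    System.out.println("planned pipeline has triggers? " + plannedPipeline.containsKey("triggers")); // true
  }
}
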
Front50Service.java
@@ -1,14 +1,15 @@
package com.netflix.spinnaker.echo.services;

import com.netflix.spinnaker.echo.model.Pipeline;
import java.util.Map;
import retrofit.http.*;

import java.util.List;

public interface Front50Service {
@GET("/pipelines?restricted=false")
@Headers("Accept: application/json")
List<Pipeline> getPipelines();
List<Map<String, Object>> getPipelines(); // Return Map here so we don't throw away MPT attributes.

@GET("/pipelines/{application}?refresh=false")
@Headers("Accept: application/json")
Pipeline.java
@@ -72,6 +72,12 @@ public class Pipeline {
@JsonProperty
String type;

@JsonProperty
String schema;

@JsonProperty
Object template;

@JsonProperty
List<Map<String, Object>> stages;

PipelineCache.java
@@ -16,41 +16,47 @@

package com.netflix.spinnaker.echo.pipelinetriggers;

import static java.time.Instant.now;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.patterns.PolledMeter;
import com.netflix.spinnaker.echo.model.Pipeline;
import com.netflix.spinnaker.echo.model.Trigger;
import com.netflix.spinnaker.echo.pipelinetriggers.orca.OrcaService;
import com.netflix.spinnaker.echo.services.Front50Service;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.time.Instant.now;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class PipelineCache implements MonitoredPoller {
private final int pollingIntervalMs;
private final int pollingSleepMs;
private final Front50Service front50;
private final OrcaService orca;
private final Registry registry;
private final ScheduledExecutorService executorService;
private final ObjectMapper objectMapper;

private transient Boolean running;
private transient Instant lastPollTimestamp;
@@ -61,21 +67,27 @@ public class PipelineCache implements MonitoredPoller {
@Autowired
public PipelineCache(@Value("${front50.pollingIntervalMs:10000}") int pollingIntervalMs,
@Value("${front50.pollingSleepMs:100}") int pollingSleepMs,
ObjectMapper objectMapper,
@NonNull Front50Service front50,
@NonNull OrcaService orca,
@NonNull Registry registry) {
this(Executors.newSingleThreadScheduledExecutor(), pollingIntervalMs, pollingSleepMs, front50, registry);
this(Executors.newSingleThreadScheduledExecutor(), pollingIntervalMs, pollingSleepMs, objectMapper, front50, orca, registry);
}

// VisibleForTesting
public PipelineCache(ScheduledExecutorService executorService,
int pollingIntervalMs,
int pollingSleepMs,
ObjectMapper objectMapper,
@NonNull Front50Service front50,
@NonNull OrcaService orca,
@NonNull Registry registry) {
this.objectMapper = objectMapper;
this.executorService = executorService;
this.pollingIntervalMs = pollingIntervalMs;
this.pollingSleepMs = pollingSleepMs;
this.front50 = front50;
this.orca = orca;
this.registry = registry;
this.running = false;
this.pipelines = null;
@@ -121,7 +133,7 @@ void pollPipelineConfigs() {
try {
log.debug("Getting pipelines from Front50...");
long start = System.currentTimeMillis();
pipelines = decorateTriggers(front50.getPipelines());
pipelines = decorateTriggers(fetchHydratedPipelines());

lastPollTimestamp = now();
registry.counter("front50.requests").increment();
@@ -132,6 +144,36 @@ void pollPipelineConfigs() {
}
}

private List<Pipeline> fetchHydratedPipelines() {
List<Map<String, Object>> rawPipelines = front50.getPipelines();
if (rawPipelines == null) {
return Collections.emptyList();
}

Predicate<Map<String, Object>> isV2Pipeline = p -> {
return p.getOrDefault("type", "").equals("templatedPipeline") &&
p.getOrDefault("schema", "").equals("v2");
};

return rawPipelines.stream()
.map((Map<String, Object> p) -> {
if (isV2Pipeline.test(p)) {
try {
return orca.v2Plan(p);
} catch (Exception e) {
// Don't fail the entire cache cycle if we fail a plan.
log.error("Caught exception while planning templated pipeline: {}", p, e);
return Collections.emptyMap();
}
} else {
return p;
}
})
.filter(m -> !m.isEmpty())
.map(m -> objectMapper.convertValue(m, Pipeline.class))
.collect(Collectors.toList());
}

@Override
public boolean isRunning() {
return running;
OrcaService.java
@@ -35,6 +35,9 @@ public interface OrcaService {
@POST("/plan")
Map plan(@Body Map pipelineConfig, @Query("resolveArtifacts") boolean resolveArtifacts);

@POST("/v2/pipelineTemplates/plan")
Map<String, Object> v2Plan(@Body Map pipelineConfig);

@POST("/orchestrate")
Observable<TriggerResponse> trigger(@Body Pipeline pipeline, @Header(AuthenticatedRequest.SPINNAKER_USER) String runAsUser);

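As a usage note for the new endpoint declaration above: this module talks to Orca through Retrofit 1.x (the same retrofit.http annotations used throughout these interfaces), which turns an annotated interface into an HTTP client. The snippet below is a minimal, self-contained sketch under assumptions: the base URL and the local interface copy are illustrative, and in the actual change the declaration lives on OrcaService and is called from PipelineCache.fetchHydratedPipelines.

import java.util.Map;
import retrofit.RestAdapter;
import retrofit.http.Body;
import retrofit.http.POST;

public class V2PlanClientSketch {
  // Trimmed-down copy of the endpoint under discussion; the real declaration
  // is on com.netflix.spinnaker.echo.pipelinetriggers.orca.OrcaService.
  interface OrcaPlanClient {
    @POST("/v2/pipelineTemplates/plan")
    Map<String, Object> v2Plan(@Body Map pipelineConfig);
  }

  public static void main(String[] args) {
    // The Orca base URL is an assumption for illustration only.
    OrcaPlanClient orca = new RestAdapter.Builder()
        .setEndpoint("http://localhost:8083")
        .build()
        .create(OrcaPlanClient.class);

    // Send a raw v2 templated config; the response is the planned ("hydrated")
    // pipeline that PipelineCache then converts into its Pipeline model.
    Map<String, Object> planned = orca.v2Plan(Map.of(
        "type", "templatedPipeline",
        "schema", "v2",
        "application", "myapp"));
    System.out.println(planned.keySet());
  }
}
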
PipelineCacheSpec.groovy
@@ -22,24 +22,20 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.echo.model.Pipeline
import com.netflix.spinnaker.echo.model.Trigger
import com.netflix.spinnaker.echo.pipelinetriggers.orca.OrcaService
import com.netflix.spinnaker.echo.services.Front50Service
import com.netflix.spinnaker.echo.test.RetrofitStubs
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject
import spock.lang.Unroll

import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

import static java.util.concurrent.TimeUnit.SECONDS

class PipelineCacheSpec extends Specification implements RetrofitStubs {
def front50 = Mock(Front50Service)
def orca = Mock(OrcaService)
def registry = new NoopRegistry()
def objectMapper = new ObjectMapper()

@Shared
def interval = 30
@@ -48,13 +44,19 @@ class PipelineCacheSpec extends Specification implements RetrofitStubs {
def sleepMs = 100

@Subject
def pipelineCache = new PipelineCache(Mock(ScheduledExecutorService), interval, sleepMs, front50, registry)
def pipelineCache = new PipelineCache(Mock(ScheduledExecutorService), interval, sleepMs, objectMapper, front50, orca, registry)

def "keeps polling if Front50 returns an error"() {
given:
def pipelineMap = [
application: 'application',
name : 'Pipeline',
id : 'P1'
]
def pipeline = Pipeline.builder().application('application').name('Pipeline').id('P1').build()

def initialLoad = []
front50.getPipelines() >> initialLoad >> { throw unavailable() } >> [pipeline]
front50.getPipelines() >> initialLoad >> { throw unavailable() } >> [pipelineMap]
pipelineCache.start()

expect: 'null pipelines when we have not polled yet'
