diff --git a/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepositoryTck.groovy b/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepositoryTck.groovy index f728af4df8..2dce5f03a2 100644 --- a/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepositoryTck.groovy +++ b/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepositoryTck.groovy @@ -22,10 +22,10 @@ import com.netflix.spinnaker.orca.pipeline.model.Execution import com.netflix.spinnaker.orca.pipeline.model.JenkinsTrigger import com.netflix.spinnaker.orca.pipeline.model.PipelineTrigger import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria -import rx.schedulers.Schedulers import spock.lang.Specification import spock.lang.Subject import spock.lang.Unroll + import static com.netflix.spinnaker.orca.ExecutionStatus.* import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE @@ -66,7 +66,7 @@ abstract class ExecutionRepositoryTck extends Spe repository.store(succeededExecution) def pipelines = repository.retrievePipelinesForPipelineConfigId( "pipeline-1", new ExecutionCriteria(limit: 5, statuses: ["RUNNING", "SUCCEEDED", "TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).toList() then: pipelines*.id.sort() == [runningExecution.id, succeededExecution.id].sort() @@ -74,7 +74,7 @@ abstract class ExecutionRepositoryTck extends Spe when: pipelines = repository.retrievePipelinesForPipelineConfigId( "pipeline-1", new ExecutionCriteria(limit: 5, statuses: ["RUNNING"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).toList() then: pipelines*.id.sort() == [runningExecution.id].sort() @@ -82,7 +82,7 @@ abstract class ExecutionRepositoryTck extends Spe when: pipelines = repository.retrievePipelinesForPipelineConfigId( "pipeline-1", new ExecutionCriteria(limit: 5, statuses: ["TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).toList() then: pipelines.isEmpty() @@ -106,7 +106,7 @@ abstract class ExecutionRepositoryTck extends Spe repository.store(succeededExecution) def orchestrations = repository.retrieveOrchestrationsForApplication( runningExecution.application, new ExecutionCriteria(limit: 5, statuses: ["RUNNING", "SUCCEEDED", "TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).toList() then: orchestrations*.id.sort() == [runningExecution.id, succeededExecution.id].sort() @@ -114,7 +114,7 @@ abstract class ExecutionRepositoryTck extends Spe when: orchestrations = repository.retrieveOrchestrationsForApplication( runningExecution.application, new ExecutionCriteria(limit: 5, statuses: ["RUNNING"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).toList() then: orchestrations*.id.sort() == [runningExecution.id].sort() @@ -122,7 +122,7 @@ abstract class ExecutionRepositoryTck extends Spe when: orchestrations = repository.retrieveOrchestrationsForApplication( runningExecution.application, new ExecutionCriteria(limit: 5, statuses: ["TERMINAL"]) - ).subscribeOn(Schedulers.io()).toList().toBlocking().single() + ).toList() then: orchestrations.isEmpty() @@ -154,7 +154,7 @@ abstract class ExecutionRepositoryTck extends Spe repository.store(pipeline) expect: - repository.retrieve(PIPELINE).toBlocking().first().id == pipeline.id + repository.retrieve(PIPELINE).first().id == pipeline.id with(repository.retrieve(pipeline.type, pipeline.id)) { id == pipeline.id @@ -223,7 +223,7 @@ abstract class ExecutionRepositoryTck extends Spe thrown ExecutionNotFoundException and: - repository.retrieve(PIPELINE).toList().toBlocking().first() == [] + repository.retrieve(PIPELINE).toList() == [] } def "updateStatus sets startTime to current time if new status is RUNNING"() { diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 39ae1160cb..fb94581767 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -19,11 +19,12 @@ import com.netflix.spinnaker.orca.pipeline.model.Execution; import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType; import com.netflix.spinnaker.orca.pipeline.model.Stage; -import rx.Observable; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.*; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static java.util.stream.Collectors.toList; @@ -56,14 +57,14 @@ public interface ExecutionRepository { void delete(@Nonnull ExecutionType type, @Nonnull String id); - @Nonnull Observable retrieve(ExecutionType type); + @Nonnull Iterable retrieve(ExecutionType type); - @Nonnull Observable retrievePipelinesForApplication(@Nonnull String application); + @Nonnull Iterable retrievePipelinesForApplication(@Nonnull String application); - @Nonnull Observable retrievePipelinesForPipelineConfigId( + @Nonnull Iterable retrievePipelinesForPipelineConfigId( @Nonnull String pipelineConfigId, @Nonnull ExecutionCriteria criteria); - @Nonnull Observable retrieveOrchestrationsForApplication( + @Nonnull Iterable retrieveOrchestrationsForApplication( @Nonnull String application, @Nonnull ExecutionCriteria criteria); @Nonnull Execution retrieveOrchestrationForCorrelationId( @@ -116,4 +117,11 @@ public ExecutionCriteria setStatuses(ExecutionStatus... statuses) { return Objects.hash(limit, statuses); } } + + final class IterableUtil { + + public static Stream toStream(Iterable iterable) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterable.iterator(), Spliterator.ORDERED), false); + } + } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactResolver.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactResolver.java index 7bea6f126a..1b073ed4ad 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactResolver.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/util/ArtifactResolver.java @@ -30,23 +30,15 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import rx.functions.Func2; -import rx.schedulers.Schedulers; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; +import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.IterableUtil.toStream; import static java.lang.String.format; import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; @@ -139,15 +131,10 @@ List getArtifactsForPipelineId( @Nonnull String pipelineId, @Nonnull ExecutionCriteria criteria ) { - Execution execution = executionRepository - .retrievePipelinesForPipelineConfigId(pipelineId, criteria) - .subscribeOn(Schedulers.io()) - .toSortedList(startTimeOrId) - .toBlocking() - .single() - .stream() - .findFirst() - .orElse(null); + Execution execution = toStream(executionRepository.retrievePipelinesForPipelineConfigId(pipelineId, criteria)) + .sorted(startTimeOrId) + .findFirst() + .orElse(null); return execution == null ? Collections.emptyList() : getAllArtifacts(execution); } @@ -266,7 +253,7 @@ public static class ResolveResult { Set unresolvedExpectedArtifacts = new HashSet<>(); } - private static Func2 startTimeOrId = (a, b) -> { + private static Comparator startTimeOrId = (a, b) -> { Long aStartTime = Optional.ofNullable(a.getStartTime()).orElse(0L); Long bStartTime = Optional.ofNullable(b.getStartTime()).orElse(0L); diff --git a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java index 60b55c8e8f..bccdf4a96f 100644 --- a/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java +++ b/orca-pipelinetemplate/src/main/java/com/netflix/spinnaker/orca/pipelinetemplate/PipelineTemplateService.java @@ -15,10 +15,6 @@ */ package com.netflix.spinnaker.orca.pipelinetemplate; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper; import com.netflix.spinnaker.orca.pipeline.model.Execution; @@ -33,6 +29,13 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE; @Component @@ -85,15 +88,12 @@ public Execution retrievePipelineOrNewestExecution(@Nullable String executionId, } else if (pipelineConfigId != null) { // No executionId set - use last execution ExecutionRepository.ExecutionCriteria criteria = new ExecutionRepository.ExecutionCriteria().setLimit(1); - try { - return executionRepository.retrievePipelinesForPipelineConfigId(pipelineConfigId, criteria) - .toSingle() - .toBlocking() - .value(); - } catch (NoSuchElementException e) { - throw new ExecutionNotFoundException("No pipeline execution could be found for config id " + - pipelineConfigId + ": " + e.getMessage()); + + Iterator executions = executionRepository.retrievePipelinesForPipelineConfigId(pipelineConfigId, criteria).iterator(); + if (!executions.hasNext()) { + throw new ExecutionNotFoundException("No pipeline execution could be found for config id " + pipelineConfigId); } + return executions.next(); } else { throw new IllegalArgumentException("Either executionId or pipelineConfigId have to be set."); } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt index 829da6cd49..6ed3033fdf 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt @@ -96,9 +96,8 @@ internal interface OrcaMessageHandler : MessageHandler { val criteria = ExecutionRepository.ExecutionCriteria().setLimit(1).setStatuses(ExecutionStatus.RUNNING) !repository .retrievePipelinesForPipelineConfigId(configId, criteria) - .isEmpty - .toBlocking() - .first() + .iterator() + .hasNext() } == true } } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt index e16f25693f..86be3c91e8 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt @@ -38,7 +38,6 @@ import org.jetbrains.spek.api.dsl.* import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP import org.jetbrains.spek.subject.SubjectSpek import org.springframework.context.ApplicationEventPublisher -import rx.Observable.just import java.util.* object StartExecutionHandlerTest : SubjectSpek({ @@ -276,10 +275,7 @@ object StartExecutionHandlerTest : SubjectSpek({ pipeline.isLimitConcurrent = true runningPipeline.isLimitConcurrent = true - whenever( - repository - .retrievePipelinesForPipelineConfigId(configId, ExecutionCriteria().setLimit(1).setStatuses(RUNNING)) - ) doReturn just(runningPipeline) + whenever(pendingExecutionService.depth(configId)) doReturn 1 whenever( repository.retrieve(message.executionType, message.executionId) ) doReturn pipeline @@ -310,10 +306,7 @@ object StartExecutionHandlerTest : SubjectSpek({ pipeline.isLimitConcurrent = false runningPipeline.isLimitConcurrent = false - whenever( - repository - .retrievePipelinesForPipelineConfigId(configId, ExecutionCriteria().setLimit(1).setStatuses(RUNNING)) - ) doReturn just(runningPipeline) + whenever(pendingExecutionService.depth(configId)) doReturn 1 whenever( repository.retrieve(message.executionType, message.executionId) ) doReturn pipeline diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisOrchestrationMigrationNotificationAgent.groovy b/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisOrchestrationMigrationNotificationAgent.groovy index 14f5960e09..041819aeba 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisOrchestrationMigrationNotificationAgent.groovy +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisOrchestrationMigrationNotificationAgent.groovy @@ -122,10 +122,8 @@ class MultiRedisOrchestrationMigrationNotificationAgent extends AbstractPollingN def applicationName = application.name.toLowerCase() def unmigratedOrchestrations = executionRepositoryPrevious .retrieveOrchestrationsForApplication(applicationName, executionCriteria) - .filter({ orchestration -> !previouslyMigratedOrchestrationIds.contains(orchestration.id) }) .toList() - .toBlocking() - .single() + .findAll { orchestration -> !previouslyMigratedOrchestrationIds.contains(orchestration.id) } def migratableOrchestrations = unmigratedOrchestrations.findAll { it.status.isComplete() } def pendingOrchestrations = unmigratedOrchestrations.findAll { !it.status.isComplete() } diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisPipelineMigrationNotificationAgent.groovy b/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisPipelineMigrationNotificationAgent.groovy index 930900975e..3ddc9ff0d8 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisPipelineMigrationNotificationAgent.groovy +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/migrations/MultiRedisPipelineMigrationNotificationAgent.groovy @@ -120,10 +120,8 @@ class MultiRedisPipelineMigrationNotificationAgent extends AbstractPollingNotifi allPipelineConfigIds.eachWithIndex { String pipelineConfigId, int index -> def unmigratedPipelines = executionRepositoryPrevious .retrievePipelinesForPipelineConfigId(pipelineConfigId, executionCriteria) - .filter({ pipeline -> !previouslyMigratedPipelineIds.contains(pipeline.id) }) .toList() - .toBlocking() - .single() + .findAll { pipeline -> !previouslyMigratedPipelineIds.contains(pipeline.id) } def migratablePipelines = unmigratedPipelines.findAll { it.status.isComplete() } def pendingPipelines = unmigratedPipelines.findAll { !it.status.isComplete() } diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java index 167fe3c879..f9473e480a 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgent.java @@ -31,7 +31,6 @@ import rx.Observable; import rx.Scheduler; import rx.Subscription; -import rx.functions.Func1; import rx.schedulers.Schedulers; import javax.annotation.PreDestroy; @@ -40,9 +39,13 @@ import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE; +import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.IterableUtil.toStream; // TODO rz - Remove redis know-how and move back into orca-core @Component @@ -56,9 +59,9 @@ public class OldPipelineCleanupPollingNotificationAgent implements ApplicationLi private Scheduler scheduler = Schedulers.io(); private Subscription subscription; - private Func1 filter = new Func1() { + private Predicate filter = new Predicate() { @Override - public Boolean call(Execution execution) { + public boolean test(Execution execution) { if (!COMPLETED_STATUSES.contains(execution.getStatus().toString())) { return false; } @@ -67,7 +70,7 @@ public Boolean call(Execution execution) { } }; - private Func1 mapper = execution -> new PipelineExecutionDetails( + private Function mapper = execution -> new PipelineExecutionDetails( execution.getId(), execution.getApplication(), execution.getPipelineConfigId() == null ? "ungrouped" : execution.getPipelineConfigId(), @@ -138,7 +141,7 @@ private void tick() { applications.forEach(app -> { log.debug("Cleaning up " + app); - cleanupApp(executionRepository.retrievePipelinesForApplication(app)); + cleanupApp(toStream(executionRepository.retrievePipelinesForApplication(app))); }); } catch (Exception e) { @@ -146,8 +149,8 @@ private void tick() { } } - private void cleanupApp(Observable observable) { - List allPipelines = observable.filter(filter).map(mapper).toList().toBlocking().single(); + private void cleanupApp(Stream executions) { + List allPipelines = executions.filter(filter).map(mapper).collect(Collectors.toList()); Map> groupedPipelines = new HashMap<>(); allPipelines.forEach(p -> { diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java index e0ae867b99..be3634e7ea 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgent.java @@ -37,7 +37,6 @@ import rx.Observable; import rx.Scheduler; import rx.Subscription; -import rx.functions.Func1; import rx.schedulers.Schedulers; import javax.annotation.PreDestroy; @@ -45,9 +44,13 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.netflix.appinfo.InstanceInfo.InstanceStatus.UP; import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.ORCHESTRATION; +import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.IterableUtil.toStream; import static java.lang.String.format; import static java.time.temporal.ChronoUnit.DAYS; import static java.util.Comparator.comparing; @@ -63,9 +66,9 @@ public class TopApplicationExecutionCleanupPollingNotificationAgent implements A private Scheduler scheduler = Schedulers.io(); private Subscription subscription; - private Func1 filter = (Execution execution) -> + private Predicate filter = (Execution execution) -> execution.getStatus().isComplete() || Instant.ofEpochMilli(execution.getBuildTime()).isBefore(Instant.now().minus(31, DAYS)); - private Func1 mapper = (Execution execution) -> { + private Function mapper = (Execution execution) -> { Map builder = new HashMap<>(); builder.put("id", execution.getId()); builder.put("startTime", execution.getStartTime()); @@ -142,7 +145,11 @@ void tick() { ExecutionCriteria executionCriteria = new ExecutionCriteria(); executionCriteria.setLimit(Integer.MAX_VALUE); - cleanup(executionRepository.retrieveOrchestrationsForApplication(application, executionCriteria), application, "orchestration"); + cleanup( + toStream(executionRepository.retrieveOrchestrationsForApplication(application, executionCriteria)), + application, + "orchestration" + ); } else { log.error("Unable to cleanup executions, unsupported type: {}", type); } @@ -158,13 +165,21 @@ private T jedis(Function work) { } } - private void cleanup(Observable observable, String application, String type) { - List executions = observable.filter(filter).map(mapper).toList().toBlocking().single(); + private void cleanup(Stream observable, String application, String type) { + List executions = observable.filter(filter).map(mapper).collect(Collectors.toList()); executions.sort(comparing(a -> (Long) Optional.ofNullable(a.get("startTime")).orElse(0L))); if (executions.size() > threshold) { executions.subList(0, (executions.size() - threshold)).forEach(it -> { Long startTime = Optional.ofNullable((Long) it.get("startTime")).orElseGet(() -> (Long) it.get("buildTime")); - log.info("Deleting {} execution {} (startTime: {}, application: {}, pipelineConfigId: {}, status: {})", type, it.get("id"), startTime != null ? Instant.ofEpochMilli(startTime) : null, application, it.get("pipelineConfigId"), it.get("status")); + log.info( + "Deleting {} execution {} (startTime: {}, application: {}, pipelineConfigId: {}, status: {})", + type, + it.get("id"), + startTime != null ? Instant.ofEpochMilli(startTime) : null, + application, + it.get("pipelineConfigId"), + it.get("status") + ); if (type.equals("orchestration")) { executionRepository.delete(ORCHESTRATION, (String) it.get("id")); } else { diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index 2b34cc1fb4..b48dc518a9 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -300,12 +300,16 @@ Execution retrieve(@Nonnull ExecutionType type, @Nonnull String id) { } @Override - public @Nonnull - Observable retrieve(@Nonnull ExecutionType type) { + public @Nonnull Iterable retrieve(@Nonnull ExecutionType type) { List> observables = allRedisDelegates().stream() .map(d -> all(type, d)) .collect(Collectors.toList()); - return Observable.merge(observables); + + return Observable.from(observables) + .flatMap(task -> task.observeOn(queryAllScheduler)) + .toList() + .toBlocking() + .single(); } @Override @@ -316,17 +320,17 @@ public void delete(@Nonnull ExecutionType type, @Nonnull String id) { @Override public @Nonnull - Observable retrievePipelinesForApplication(@Nonnull String application) { + Iterable retrievePipelinesForApplication(@Nonnull String application) { List> observables = allRedisDelegates().stream() .map(d -> allForApplication(PIPELINE, application, d)) .collect(Collectors.toList()); - return Observable.merge(observables); + return Observable.merge(observables).toList().toBlocking().single(); } @Override public @Nonnull - Observable retrievePipelinesForPipelineConfigId(@Nonnull String pipelineConfigId, - @Nonnull ExecutionCriteria criteria) { + Iterable retrievePipelinesForPipelineConfigId(@Nonnull String pipelineConfigId, + @Nonnull ExecutionCriteria criteria) { /* * Fetch pipeline ids from the primary redis (and secondary if configured) */ @@ -396,15 +400,18 @@ Observable retrievePipelinesForPipelineConfigId(@Nonnull String pipel ); // merge primary + secondary observables - return Observable.merge(currentObservable, previousObservable); + return Observable.merge(currentObservable, previousObservable) + .subscribeOn(queryByAppScheduler) + .toList() + .toBlocking().single(); } - return currentObservable; + return currentObservable.toList().toBlocking().single(); } @Override public @Nonnull - Observable retrieveOrchestrationsForApplication(@Nonnull String application, @Nonnull ExecutionCriteria criteria) { + Iterable retrieveOrchestrationsForApplication(@Nonnull String application, @Nonnull ExecutionCriteria criteria) { String allOrchestrationsKey = appKey(ORCHESTRATION, application); /* @@ -478,10 +485,13 @@ Observable retrieveOrchestrationsForApplication(@Nonnull String appli ); // merge primary + secondary observables - return Observable.merge(currentObservable, previousObservable); + return Observable.merge(currentObservable, previousObservable) + .subscribeOn(queryByAppScheduler) + .toList() + .toBlocking().single(); } - return currentObservable; + return currentObservable.subscribeOn(queryByAppScheduler).toList().toBlocking().single(); } @Override @@ -511,12 +521,10 @@ Execution retrieveOrchestrationForCorrelationId( @Override public @Nonnull List retrieveBufferedExecutions() { // TODO rz - This is definitely not a healthy way to do this. - return Observable.concat( - retrieve(PIPELINE), - retrieve(ORCHESTRATION) - ) + return Observable.merge(Observable.from(retrieve(PIPELINE)), Observable.from(retrieve(ORCHESTRATION))) .filter(execution -> execution.getStatus() == ExecutionStatus.BUFFERED) .toList() + .first() .toBlocking().single(); } diff --git a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy index 7a06eeb881..c6dea40a70 100644 --- a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy +++ b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/OldPipelineCleanupPollingNotificationAgentSpec.groovy @@ -45,15 +45,15 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification { def filter = new OldPipelineCleanupPollingNotificationAgent(clock: clock, thresholdDays: 1).filter expect: - filter.call(pipeline { + filter.test(pipeline { status = ExecutionStatus.SUCCEEDED startTime = Duration.ofDays(1).toMillis() }) == true - filter.call(pipeline { + filter.test(pipeline { status = ExecutionStatus.RUNNING startTime = Duration.ofDays(1).toMillis() }) == false - filter.call(pipeline { + filter.test(pipeline { status = ExecutionStatus.SUCCEEDED startTime = Duration.ofDays(3).toMillis() }) == false @@ -74,7 +74,7 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification { def mapper = new OldPipelineCleanupPollingNotificationAgent().mapper expect: - with(mapper.call(pipeline)) { + with(mapper.apply(pipeline)) { id == "ID1" application == "orca" pipelineConfigId == "P1" @@ -94,7 +94,7 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification { millis() >> { Duration.ofDays(10).toMillis() } } def executionRepository = Mock(ExecutionRepository) { - 1 * retrievePipelinesForApplication("orca") >> rx.Observable.from(pipelines) + 1 * retrievePipelinesForApplication("orca") >> pipelines } def jedisPool = Stub(Pool) { getResource() >> { diff --git a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy index 215772d253..9180b45c61 100644 --- a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy +++ b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/notifications/scheduling/TopApplicationExecutionCleanupPollingNotificationAgentSpec.groovy @@ -48,7 +48,7 @@ class TopApplicationExecutionCleanupPollingNotificationAgentSpec extends Specifi status = s } - filter.call(pipeline) == (s == ExecutionStatus.SUCCEEDED) + filter.test(pipeline) == (s == ExecutionStatus.SUCCEEDED) } } @@ -64,7 +64,7 @@ class TopApplicationExecutionCleanupPollingNotificationAgentSpec extends Specifi def mapper = new TopApplicationExecutionCleanupPollingNotificationAgent().mapper expect: - with(mapper.call(pipeline)) { + with(mapper.apply(pipeline)) { id == "ID1" startTime == 1000 pipelineConfigId == "P1" @@ -88,7 +88,7 @@ class TopApplicationExecutionCleanupPollingNotificationAgentSpec extends Specifi } } agent.executionRepository = Mock(ExecutionRepository) { - retrieveOrchestrationsForApplication("app1", _) >> rx.Observable.from(orchestrations) + retrieveOrchestrationsForApplication("app1", _) >> orchestrations } when: diff --git a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepositorySpec.groovy b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepositorySpec.groovy index fe81aab953..513980cf07 100644 --- a/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepositorySpec.groovy +++ b/orca-redis/src/test/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepositorySpec.groovy @@ -92,7 +92,7 @@ class JedisExecutionRepositorySpec extends ExecutionRepositoryTck list(@PathVariable String application, - @RequestParam(value = "limit", defaultValue = "3500") - int limit, - @RequestParam(value = "statuses", required = false) - String statuses) { + List list(@PathVariable String application, + @RequestParam(value = "limit", defaultValue = "3500") int limit, + @RequestParam(value = "statuses", required = false) String statuses) { statuses = statuses ?: ExecutionStatus.values()*.toString().join(",") def executionCriteria = new ExecutionRepository.ExecutionCriteria( limit: limit, @@ -89,24 +95,13 @@ class TaskController { def orchestrations = executionRepository .retrieveOrchestrationsForApplication(application, executionCriteria) - .filter({ Execution orchestration -> !orchestration.startTime || (orchestration.startTime > startTimeCutoff) }) - .map({ Execution orchestration -> convert(orchestration) }) - .subscribeOn(Schedulers.io()) .toList() - .toBlocking() - .single() - .sort(startTimeOrId) - - orchestrations.subList(0, Math.min(orchestrations.size(), limit)) - } + .findAll({ Execution orchestration -> !orchestration.startTime || (orchestration.startTime > startTimeCutoff) }) + orchestrations.sort(startTimeOrId) - @PreAuthorize("@fiatPermissionEvaluator.storeWholePermission()") - @PostFilter("hasPermission(filterObject.application, 'APPLICATION', 'READ')") - @RequestMapping(value = "/tasks", method = RequestMethod.GET) - List list() { - executionRepository.retrieve(ORCHESTRATION).toBlocking().iterator.collect { - convert it - } + orchestrations + .collect({ Execution orchestration -> convert(orchestration) }) + .subList(0, Math.min(orchestrations.size(), limit)) } // @PostAuthorize("hasPermission(returnObject.application, 'APPLICATION', 'READ')") @@ -170,8 +165,9 @@ class TaskController { def ids = pipelineConfigIds.split(',') def allPipelines = rx.Observable.merge(ids.collect { - executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) - }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) + rx.Observable.from(executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria)) + }).subscribeOn(Schedulers.io()).toList().toBlocking().single() + allPipelines.sort(startTimeOrId) return filterPipelinesByHistoryCutoff(allPipelines, limit) } @@ -331,9 +327,10 @@ class TaskController { def strategyConfigIds = front50Service.getStrategies(application)*.id as List def allIds = pipelineConfigIds + strategyConfigIds - def allPipelines = rx.Observable.merge(allIds.collect { + def allPipelines = allIds.collect { executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) - }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) + }.flatten() + allPipelines.sort(startTimeOrId) if (!expand) { allPipelines.each { pipeline -> @@ -373,7 +370,10 @@ class TaskController { pipelines.groupBy { it.pipelineConfigId }.values().each { List pipelinesGroup -> - def sortedPipelinesGroup = pipelinesGroup.sort(startTimeOrId).reverse() + def sortedPipelinesGroup = pipelinesGroup + sortedPipelinesGroup.sort(startTimeOrId) + sortedPipelinesGroup = sortedPipelinesGroup.reverse() + def recentPipelines = sortedPipelinesGroup.findAll { !it.startTime || it.startTime > cutoffTime } @@ -385,11 +385,12 @@ class TaskController { pipelinesSatisfyingCutoff.addAll(recentPipelines.subList(0, Math.min(recentPipelines.size(), limit))) } + pipelinesSatisfyingCutoff.sort(startTimeOrId) - return pipelinesSatisfyingCutoff.sort(startTimeOrId) + return pipelinesSatisfyingCutoff } - private static Closure startTimeOrId = { a, b -> + private static Comparator startTimeOrId = { a, b -> def aStartTime = a.startTime ?: 0 def bStartTime = b.startTime ?: 0 diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy index b8c2b5eab2..bd8dc3162f 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/OperationsControllerSpec.groovy @@ -16,7 +16,6 @@ package com.netflix.spinnaker.orca.controllers -import javax.servlet.http.HttpServletResponse import com.netflix.spinnaker.kork.web.exceptions.InvalidRequestException import com.netflix.spinnaker.kork.web.exceptions.ValidationException import com.netflix.spinnaker.orca.igor.BuildArtifactFilter @@ -40,10 +39,12 @@ import org.springframework.http.HttpMethod import org.springframework.http.MediaType import org.springframework.mock.env.MockEnvironment import org.springframework.test.web.servlet.setup.MockMvcBuilders -import rx.Observable import spock.lang.Specification import spock.lang.Subject import spock.lang.Unroll + +import javax.servlet.http.HttpServletResponse + import static com.netflix.spinnaker.orca.ExecutionStatus.CANCELED import static com.netflix.spinnaker.orca.ExecutionStatus.SUCCEEDED import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType @@ -471,7 +472,7 @@ class OperationsControllerSpec extends Specification { startedPipeline.id = UUID.randomUUID().toString() startedPipeline } - executionRepository.retrievePipelinesForPipelineConfigId(*_) >> Observable.empty() + executionRepository.retrievePipelinesForPipelineConfigId(*_) >> [] ArtifactResolver realArtifactResolver = new ArtifactResolver(mapper, executionRepository) // can't use @subject, since we need to test the behavior of otherwise mocked-out 'artifactResolver' diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy index 8592a63bc0..1da7e13d2b 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy @@ -16,8 +16,6 @@ package com.netflix.spinnaker.orca.controllers -import java.time.Clock -import java.time.Instant import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spinnaker.orca.front50.Front50Service import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper @@ -28,10 +26,14 @@ import com.netflix.spinnaker.orca.pipeline.model.Task import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import groovy.json.JsonSlurper import org.springframework.http.MediaType -import org.springframework.mock.web.MockHttpServletResponse import org.springframework.test.web.servlet.MockMvc import org.springframework.test.web.servlet.setup.MockMvcBuilders import spock.lang.Specification + +import java.time.Clock +import java.time.Instant + +import static com.netflix.spinnaker.orca.ExecutionStatus.TERMINAL import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.ORCHESTRATION import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.* import static java.time.ZoneOffset.UTC @@ -66,16 +68,6 @@ class TaskControllerSpec extends Specification { ).build() } - void '/tasks returns a list of active tasks'() { - when: - mockMvc.perform(get('/tasks')).andReturn().response - - then: - 1 * executionRepository.retrieve(ORCHESTRATION) >> { - return rx.Observable.empty() - } - } - void 'should cancel a list of tasks by id'() { when: def response = mockMvc.perform( @@ -90,21 +82,24 @@ class TaskControllerSpec extends Specification { void 'step names are properly translated'() { given: - executionRepository.retrieve(ORCHESTRATION) >> rx.Observable.from([orchestration { + executionRepository.retrieve(ORCHESTRATION, _) >> orchestration { id = "1" - application = "covfefe" + application = "test" stage { type = "test" - tasks = [new Task(name: 'jobOne'), new Task(name: 'jobTwo')] + tasks = [ + new Task(id: "1", name: 'jobOne', implementingClass: "foo", status: TERMINAL), + new Task(id: "2", name: 'jobTwo', implementingClass: "foo", status: TERMINAL) + ] } - }]) + } when: - def response = mockMvc.perform(get('/tasks')).andReturn().response + def response = mockMvc.perform(get('/tasks/1')).andReturn().response then: response.status == 200 - with(new JsonSlurper().parseText(response.contentAsString).first()) { + with(new JsonSlurper().parseText(response.contentAsString)) { steps.name == ['jobOne', 'jobTwo'] } } @@ -129,22 +124,12 @@ class TaskControllerSpec extends Specification { ] } - void '/tasks returns [] when there are no tasks'() { - when: - MockHttpServletResponse response = mockMvc.perform(get('/tasks')).andReturn().response - - then: - 1 * executionRepository.retrieve(ORCHESTRATION) >> rx.Observable.from([]) - response.status == 200 - response.contentAsString == '[]' - } - void '/applications/{application}/tasks filters tasks by application'() { when: def response = mockMvc.perform(get("/applications/$app/tasks")).andReturn().response then: - 1 * executionRepository.retrieveOrchestrationsForApplication(app, _) >> rx.Observable.empty() + 1 * executionRepository.retrieveOrchestrationsForApplication(app, _) >> [] where: app = "test" @@ -166,7 +151,7 @@ class TaskControllerSpec extends Specification { } } def app = 'test' - executionRepository.retrieveOrchestrationsForApplication(app, _) >> rx.Observable.from(tasks) + executionRepository.retrieveOrchestrationsForApplication(app, _) >> tasks when: def response = new ObjectMapper().readValue( @@ -193,7 +178,7 @@ class TaskControllerSpec extends Specification { [pipelineConfigId: "2", id: 'older3', application: app, startTime: clock.instant().minus(daysOfExecutionHistory + 1, DAYS).minus(4, HOURS).toEpochMilli()] ] - executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> pipelines.findAll { it.pipelineConfigId == "1" }.collect { config -> pipeline { @@ -202,8 +187,8 @@ class TaskControllerSpec extends Specification { startTime = config.startTime pipelineConfigId = config.pipelineConfigId } - }) - executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> rx.Observable.from(pipelines.findAll { + } + executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> pipelines.findAll { it.pipelineConfigId == "2" }.collect { config -> pipeline { @@ -212,7 +197,7 @@ class TaskControllerSpec extends Specification { startTime = config.startTime pipelineConfigId = config.pipelineConfigId } - }) + } front50Service.getPipelines(app, false) >> [[id: "1"], [id: "2"]] front50Service.getStrategies(app) >> [] @@ -236,7 +221,7 @@ class TaskControllerSpec extends Specification { [pipelineConfigId: "3", id: "started-5", application: "covfefe", startTime: clock.instant().minus(daysOfExecutionHistory, DAYS).minus(2, HOURS).toEpochMilli(), id: 'old-3'] ] - executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> pipelines.findAll { it.pipelineConfigId == "1" }.collect { config -> pipeline { @@ -245,8 +230,8 @@ class TaskControllerSpec extends Specification { startTime = config.startTime pipelineConfigId = config.pipelineConfigId } - }) - executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> rx.Observable.from(pipelines.findAll { + } + executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> pipelines.findAll { it.pipelineConfigId == "2" }.collect { config -> pipeline { @@ -255,8 +240,8 @@ class TaskControllerSpec extends Specification { startTime = config.startTime pipelineConfigId = config.pipelineConfigId } - }) - executionRepository.retrievePipelinesForPipelineConfigId("3", _) >> rx.Observable.from(pipelines.findAll { + } + executionRepository.retrievePipelinesForPipelineConfigId("3", _) >> pipelines.findAll { it.pipelineConfigId == "3" }.collect { config -> pipeline { @@ -265,7 +250,7 @@ class TaskControllerSpec extends Specification { startTime = config.startTime pipelineConfigId = config.pipelineConfigId } - }) + } when: def response = mockMvc.perform(get("/pipelines?pipelineConfigIds=1,2")).andReturn().response