Refactor Dry Run so that output is printed by Run Summaries (vercel#4403)

The goal is to make RunSummary responsible for all the JSON, text, and
saved-to-disk versions of what happened in a Run, via the `.Close()`
method. This commit brings DryRun output into that mold. Specifically:

- Pass all of RunOpts to RunSummary.
    Many of the pieces were already being passed in, and this work also needs
    the dry run flag. Moves runOpts into the util package to avoid circular
    imports.
- Use `.Close()` to print dry run output.
- Refactor DryRun execution to look more like Real Run. (A minimal sketch of
  the resulting `.Close()` flow follows below.)
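To make the new flow easier to follow before reading the diff, here is a minimal, self-contained Go sketch of the control flow that moves into the summary's `Close()` method. All type and field names in the sketch are illustrative stand-ins rather than the real `runsummary` types; in the actual diff, rendering goes through `FormatJSON` and `FormatAndPrintText` (see `dry_run.go` below).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative stand-ins only; the real code lives in the runsummary package.
type runOpts struct {
	DryRun     bool
	DryRunJSON bool
}

type summary struct {
	opts  runOpts
	Tasks []string
}

// Close is the single exit point for a run's output: dry runs render JSON or
// text, real runs record the exit code (and, in the real code, persist or
// send the summary).
func (s *summary) Close(exitCode int) error {
	if s.opts.DryRun {
		if s.opts.DryRunJSON {
			// JSON rendering stand-in for FormatJSON.
			rendered, err := json.MarshalIndent(s, "", "  ")
			if err != nil {
				return err
			}
			fmt.Println(string(rendered))
			return nil
		}
		// Text rendering stand-in for FormatAndPrintText.
		fmt.Println("Tasks to Run:", s.Tasks)
		return nil
	}
	fmt.Println("run finished with exit code", exitCode)
	return nil
}

func main() {
	s := &summary{opts: runOpts{DryRun: true, DryRunJSON: true}, Tasks: []string{"web#build"}}
	if err := s.Close(0); err != nil {
		fmt.Println("error:", err)
	}
}
```

With this shape, both DryRun and RealRun end by calling `Close()`, which is exactly what the changes in `dry_run.go` and `real_run.go` below do.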
mehulkar authored Mar 31, 2023
1 parent dbe5b2c commit ec70b15
Showing 7 changed files with 160 additions and 136 deletions.
71 changes: 24 additions & 47 deletions cli/internal/run/dry_run.go
@@ -23,53 +23,17 @@ func DryRun(
g *graph.CompleteGraph,
rs *runSpec,
engine *core.Engine,
taskHashTracker *taskhash.Tracker,
_ *taskhash.Tracker, // unused, but keep here for parity with RealRun method signature
turboCache cache.Cache,
base *cmdutil.CmdBase,
summary runsummary.Meta,
) error {
defer turboCache.Shutdown()

dryRunJSON := rs.Opts.runOpts.dryRunJSON
taskSummaries := []*runsummary.TaskSummary{}

taskSummaries, err := executeDryRun(
ctx,
engine,
g,
taskHashTracker,
rs,
base,
)

if err != nil {
return err
}

// We walk the graph with no concurrency.
// Populating the cache state is parallelizable.
// Do this _after_ walking the graph.
populateCacheState(turboCache, taskSummaries)

// Assign the Task Summaries to the main summary
summary.RunSummary.Tasks = taskSummaries

// Render the dry run as json
if dryRunJSON {
rendered, err := summary.FormatJSON()
if err != nil {
return err
}
base.UI.Output(string(rendered))
return nil
}

return summary.FormatAndPrintText(g.WorkspaceInfos)
}

func executeDryRun(ctx gocontext.Context, engine *core.Engine, g *graph.CompleteGraph, taskHashTracker *taskhash.Tracker, rs *runSpec, base *cmdutil.CmdBase) ([]*runsummary.TaskSummary, error) {
taskIDs := []*runsummary.TaskSummary{}

dryRunExecFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
mu := sync.Mutex{}
execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
// Assign some fallbacks if they were missing
if taskSummary.Command == "" {
taskSummary.Command = runsummary.MissingTaskLabel
@@ -79,8 +43,11 @@ func executeDryRun(ctx gocontext.Context, engine *core.Engine, g *graph.Complete
taskSummary.Framework = runsummary.MissingFrameworkLabel
}

taskIDs = append(taskIDs, taskSummary)

// This mutex is not _really_ required, since we are using Concurrency: 1 as an execution
// option, but we add it here to match the shape of RealRun's execFunc.
mu.Lock()
defer mu.Unlock()
taskSummaries = append(taskSummaries, taskSummary)
return nil
}

@@ -91,21 +58,31 @@ func executeDryRun(ctx gocontext.Context, engine *core.Engine, g *graph.Complete
getArgs := func(taskID string) []string {
return rs.ArgsForTask(taskID)
}
visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, getArgs, base.Logger, dryRunExecFunc)

visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, getArgs, base.Logger, execFunc)
execOpts := core.EngineExecutionOptions{
Concurrency: 1,
Parallel: false,
}
errs := engine.Execute(visitorFn, execOpts)

if len(errs) > 0 {
if errs := engine.Execute(visitorFn, execOpts); len(errs) > 0 {
for _, err := range errs {
base.UI.Error(err.Error())
}
return nil, errors.New("errors occurred during dry-run graph traversal")
return errors.New("errors occurred during dry-run graph traversal")
}

return taskIDs, nil
// We walk the graph with no concurrency.
// Populating the cache state is parallelizable.
// Do this _after_ walking the graph.
populateCacheState(turboCache, taskSummaries)

// Assign the Task Summaries to the main summary
summary.RunSummary.Tasks = taskSummaries

// The exitCode isn't really used by the Run Summary Close() method for dry runs
// but we pass in a successful value to match Real Runs.
return summary.Close(0, g.WorkspaceInfos)
}

func populateCacheState(turboCache cache.Cache, taskSummaries []*runsummary.TaskSummary) {
6 changes: 3 additions & 3 deletions cli/internal/run/graph_run.go
@@ -13,15 +13,15 @@ import (
// GraphRun generates a visualization of the task graph rather than executing it.
func GraphRun(ctx gocontext.Context, rs *runSpec, engine *core.Engine, base *cmdutil.CmdBase) error {
graph := engine.TaskGraph
if rs.Opts.runOpts.singlePackage {
if rs.Opts.runOpts.SinglePackage {
graph = filterSinglePackageGraphForDisplay(engine.TaskGraph)
}
visualizer := graphvisualizer.New(base.RepoRoot, base.UI, graph)

if rs.Opts.runOpts.graphDot {
if rs.Opts.runOpts.GraphDot {
visualizer.RenderDotGraph()
} else {
err := visualizer.GenerateGraphFile(rs.Opts.runOpts.graphFile)
err := visualizer.GenerateGraphFile(rs.Opts.runOpts.GraphFile)
if err != nil {
return err
}
20 changes: 12 additions & 8 deletions cli/internal/run/real_run.go
@@ -45,7 +45,7 @@ func RealRun(
packageManager *packagemanager.PackageManager,
processes *process.Manager,
) error {
singlePackage := rs.Opts.runOpts.singlePackage
singlePackage := rs.Opts.runOpts.SinglePackage

if singlePackage {
base.UI.Output(fmt.Sprintf("%s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", ")))))
@@ -85,8 +85,8 @@ func RealRun(

// run the thing
execOpts := core.EngineExecutionOptions{
Parallel: rs.Opts.runOpts.parallel,
Concurrency: rs.Opts.runOpts.concurrency,
Parallel: rs.Opts.runOpts.Parallel,
Concurrency: rs.Opts.runOpts.Concurrency,
}

mu := sync.Mutex{}
@@ -149,7 +149,7 @@ func RealRun(

// When continue on error is enabled don't register failed tasks as errors
// and instead must inspect the task summaries.
if ec.rs.Opts.runOpts.continueOnError {
if ec.rs.Opts.runOpts.ContinueOnError {
for _, summary := range runSummary.RunSummary.Tasks {
if childExit := summary.Execution.ExitCode(); childExit != nil {
childExit := *childExit
@@ -163,7 +163,7 @@ func RealRun(
}
}

runSummary.Close(exitCode)
if err := runSummary.Close(exitCode, g.WorkspaceInfos); err != nil {
// We don't need to throw an error, but we can warn on this.
// Note: this method doesn't actually return an error for Real Runs at the time of writing.
base.UI.Info(fmt.Sprintf("Failed to close Run Summary %v", err))
}

if exitCode != 0 {
return &process.ChildExit{
@@ -225,7 +229,7 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas

var prefix string
var prettyPrefix string
if ec.rs.Opts.runOpts.logPrefix == "none" {
if ec.rs.Opts.runOpts.LogPrefix == "none" {
prefix = ""
} else {
prefix = packageTask.OutputPrefix(ec.isSinglePackage)
@@ -274,7 +278,7 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas
tracer(runsummary.TargetBuildFailed, err, nil)

ec.logError(prettyPrefix, err)
if !ec.rs.Opts.runOpts.continueOnError {
if !ec.rs.Opts.runOpts.ContinueOnError {
return nil, errors.Wrapf(err, "failed to capture outputs for \"%v\"", packageTask.TaskID)
}
}
@@ -335,7 +339,7 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas
}

progressLogger.Error(fmt.Sprintf("Error: command finished with error: %v", err))
if !ec.rs.Opts.runOpts.continueOnError {
if !ec.rs.Opts.runOpts.ContinueOnError {
prefixedUI.Error(fmt.Sprintf("ERROR: command finished with error: %s", err))
ec.processes.Close()
} else {
63 changes: 30 additions & 33 deletions cli/internal/run/run.go
@@ -59,7 +59,7 @@ func ExecuteRun(ctx gocontext.Context, helper *cmdutil.Helper, signalWatcher *si
return err
}

opts.runOpts.passThroughArgs = passThroughArgs
opts.runOpts.PassThroughArgs = passThroughArgs
run := configureRun(base, opts, signalWatcher)
if err := run.run(ctx, tasks); err != nil {
base.LogError("run failed: %v", err)
@@ -80,9 +80,9 @@ func optsFromArgs(args *turbostate.ParsedArgsFromRust) (*Opts, error) {
opts.cacheOpts.SkipFilesystem = runPayload.RemoteOnly
opts.cacheOpts.OverrideDir = runPayload.CacheDir
opts.cacheOpts.Workers = runPayload.CacheWorkers
opts.runOpts.logPrefix = runPayload.LogPrefix
opts.runOpts.summarize = runPayload.Summarize
opts.runOpts.experimentalSpaceID = runPayload.ExperimentalSpaceID
opts.runOpts.LogPrefix = runPayload.LogPrefix
opts.runOpts.Summarize = runPayload.Summarize
opts.runOpts.ExperimentalSpaceID = runPayload.ExperimentalSpaceID

// Runcache flags
opts.runcacheOpts.SkipReads = runPayload.Force
@@ -101,33 +101,33 @@ func optsFromArgs(args *turbostate.ParsedArgsFromRust) (*Opts, error) {
if err != nil {
return nil, err
}
opts.runOpts.concurrency = concurrency
opts.runOpts.Concurrency = concurrency
}
opts.runOpts.parallel = runPayload.Parallel
opts.runOpts.profile = runPayload.Profile
opts.runOpts.continueOnError = runPayload.ContinueExecution
opts.runOpts.only = runPayload.Only
opts.runOpts.noDaemon = runPayload.NoDaemon
opts.runOpts.singlePackage = args.Command.Run.SinglePackage
opts.runOpts.Parallel = runPayload.Parallel
opts.runOpts.Profile = runPayload.Profile
opts.runOpts.ContinueOnError = runPayload.ContinueExecution
opts.runOpts.Only = runPayload.Only
opts.runOpts.NoDaemon = runPayload.NoDaemon
opts.runOpts.SinglePackage = args.Command.Run.SinglePackage

// See comment on Graph in turbostate.go for an explanation on Graph's representation.
// If flag is passed...
if runPayload.Graph != nil {
// If no value is attached, we print to stdout
if *runPayload.Graph == "" {
opts.runOpts.graphDot = true
opts.runOpts.GraphDot = true
} else {
// Otherwise, we emit to the file name attached as value
opts.runOpts.graphDot = false
opts.runOpts.graphFile = *runPayload.Graph
opts.runOpts.GraphDot = false
opts.runOpts.GraphFile = *runPayload.Graph
}
}

if runPayload.DryRun != "" {
opts.runOpts.dryRunJSON = runPayload.DryRun == _dryRunJSONValue
opts.runOpts.DryRunJSON = runPayload.DryRun == _dryRunJSONValue

if runPayload.DryRun == _dryRunTextValue || runPayload.DryRun == _dryRunJSONValue {
opts.runOpts.dryRun = true
opts.runOpts.DryRun = true
} else {
return nil, fmt.Errorf("invalid dry-run mode: %v", runPayload.DryRun)
}
@@ -169,7 +169,7 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
}

var pkgDepGraph *context.Context
if r.opts.runOpts.singlePackage {
if r.opts.runOpts.SinglePackage {
pkgDepGraph, err = context.SinglePackageGraph(r.base.RepoRoot, rootPackageJSON)
} else {
pkgDepGraph, err = context.BuildPackageGraph(r.base.RepoRoot, rootPackageJSON)
@@ -183,9 +183,9 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
}
}

if ui.IsCI && !r.opts.runOpts.noDaemon {
if ui.IsCI && !r.opts.runOpts.NoDaemon {
r.base.Logger.Info("skipping turbod since we appear to be in a non-interactive context")
} else if !r.opts.runOpts.noDaemon {
} else if !r.opts.runOpts.NoDaemon {
turbodClient, err := daemon.GetClient(ctx, r.base.RepoRoot, r.base.Logger, r.base.TurboVersion, daemon.ClientOpts{})
if err != nil {
r.base.LogWarning("", errors.Wrap(err, "failed to contact turbod. Continuing in standalone mode"))
@@ -211,7 +211,7 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
RepoRoot: r.base.RepoRoot,
}

turboJSON, err := g.GetTurboConfigFromWorkspace(util.RootPkgName, r.opts.runOpts.singlePackage)
turboJSON, err := g.GetTurboConfigFromWorkspace(util.RootPkgName, r.opts.runOpts.SinglePackage)
if err != nil {
return err
}
@@ -279,7 +279,7 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
engine, err := buildTaskGraphEngine(
g,
rs,
r.opts.runOpts.singlePackage,
r.opts.runOpts.SinglePackage,
)

if err != nil {
@@ -298,7 +298,7 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
// CalculateFileHashes assigns PackageInputsExpandedHashes as a side-effect
err = taskHashTracker.CalculateFileHashes(
engine.TaskGraph.Vertices(),
rs.Opts.runOpts.concurrency,
rs.Opts.runOpts.Concurrency,
g.WorkspaceInfos,
g.TaskDefinitions,
r.base.RepoRoot,
@@ -311,7 +311,7 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
// If we are running in parallel, then we remove all the edges in the graph
// except for the root. Rebuild the task graph for backwards compatibility.
// We still use dependencies specified by the pipeline configuration.
if rs.Opts.runOpts.parallel {
if rs.Opts.runOpts.Parallel {
for _, edge := range g.WorkspaceGraph.Edges() {
if edge.Target() != g.RootNode {
g.WorkspaceGraph.RemoveEdge(edge)
@@ -320,15 +320,15 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
engine, err = buildTaskGraphEngine(
g,
rs,
r.opts.runOpts.singlePackage,
r.opts.runOpts.SinglePackage,
)
if err != nil {
return errors.Wrap(err, "error preparing engine")
}
}

// Graph Run
if rs.Opts.runOpts.graphFile != "" || rs.Opts.runOpts.graphDot {
if rs.Opts.runOpts.GraphFile != "" || rs.Opts.runOpts.GraphDot {
return GraphRun(ctx, rs, engine, r.base)
}

@@ -353,9 +353,9 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
startAt,
r.base.UI,
r.base.RepoRoot,
rs.Opts.runOpts.singlePackage,
rs.Opts.runOpts.profile,
r.base.TurboVersion,
r.base.APIClient,
rs.Opts.runOpts,
packagesInScope,
runsummary.NewGlobalHashSummary(
globalHashable.globalFileHashMap,
@@ -364,13 +364,10 @@ func (r *run) run(ctx gocontext.Context, targets []string) error {
globalHashable.globalCacheKey,
globalHashable.pipeline,
),
rs.Opts.runOpts.summarize,
r.base.APIClient,
rs.Opts.runOpts.experimentalSpaceID,
)

// Dry Run
if rs.Opts.runOpts.dryRun {
if rs.Opts.runOpts.DryRun {
return DryRun(
ctx,
g,
@@ -443,7 +440,7 @@ func buildTaskGraphEngine(
if err := engine.Prepare(&core.EngineBuildingOptions{
Packages: rs.FilteredPkgs.UnsafeListOfStrings(),
TaskNames: rs.Targets,
TasksOnly: rs.Opts.runOpts.only,
TasksOnly: rs.Opts.runOpts.Only,
}); err != nil {
return nil, err
}
@@ -454,7 +451,7 @@ func buildTaskGraphEngine(
}

// Check that no tasks would be blocked by a persistent task
if err := engine.ValidatePersistentDependencies(g, rs.Opts.runOpts.concurrency); err != nil {
if err := engine.ValidatePersistentDependencies(g, rs.Opts.runOpts.Concurrency); err != nil {
return nil, fmt.Errorf("Invalid persistent task configuration:\n%v", err)
}
