diff --git a/cli/internal/core/engine.go b/cli/internal/core/engine.go index 7a37a723b19ed..3867d5e597ea1 100644 --- a/cli/internal/core/engine.go +++ b/cli/internal/core/engine.go @@ -29,7 +29,7 @@ type Engine struct { TaskGraph *dag.AcyclicGraph // Tasks are a map of tasks in the engine Tasks map[string]*Task - PackageTaskDeps [][]string + PackageTaskDeps map[string][]string rootEnabledTasks util.Set } @@ -39,13 +39,13 @@ func NewEngine(topologicalGraph *dag.AcyclicGraph) *Engine { Tasks: make(map[string]*Task), TopologicGraph: topologicalGraph, TaskGraph: &dag.AcyclicGraph{}, - PackageTaskDeps: [][]string{}, + PackageTaskDeps: map[string][]string{}, rootEnabledTasks: make(util.Set), } } -// EngineExecutionOptions are options for a single engine execution -type EngineExecutionOptions struct { +// EngineBuildingOptions help construct the TaskGraph +type EngineBuildingOptions struct { // Packages in the execution scope, if nil, all packages will be considered in scope Packages []string // TaskNames in the execution scope, if nil, all tasks will be executed @@ -55,7 +55,7 @@ type EngineExecutionOptions struct { } // Prepare constructs the Task Graph for a list of packages and tasks -func (e *Engine) Prepare(options *EngineExecutionOptions) error { +func (e *Engine) Prepare(options *EngineBuildingOptions) error { pkgs := options.Packages tasks := options.TaskNames if len(tasks) == 0 { @@ -72,8 +72,8 @@ func (e *Engine) Prepare(options *EngineExecutionOptions) error { return nil } -// ExecOpts controls a single walk of the task graph -type ExecOpts struct { +// EngineExecutionOptions controls a single walk of the task graph +type EngineExecutionOptions struct { // Parallel is whether to run tasks in parallel Parallel bool // Concurrency is the number of concurrent tasks that can be executed @@ -81,7 +81,7 @@ type ExecOpts struct { } // Execute executes the pipeline, constructing an internal task graph and walking it accordingly. -func (e *Engine) Execute(visitor Visitor, opts ExecOpts) []error { +func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error { var sema = util.NewSemaphore(opts.Concurrency) return e.TaskGraph.Walk(func(v dag.Vertex) error { // Always return if it is the root node @@ -109,12 +109,6 @@ func (e *Engine) getTaskDefinition(pkg string, taskName string, taskID string) ( } func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly bool) error { - if e.PackageTaskDeps == nil { - e.PackageTaskDeps = [][]string{} - } - - packageTasksDepsMap := getPackageTaskDepsMap(e.PackageTaskDeps) - traversalQueue := []string{} for _, pkg := range pkgs { isRootPkg := pkg == util.RootPkgName @@ -146,89 +140,92 @@ func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly if err != nil { return err } - if !visited.Includes(taskID) { - visited.Add(taskID) - deps := task.Deps - - if tasksOnly { - deps = deps.Filter(func(d interface{}) bool { - for _, target := range taskNames { - return fmt.Sprintf("%v", d) == target - } - return false - }) - task.TopoDeps = task.TopoDeps.Filter(func(d interface{}) bool { - for _, target := range taskNames { - return fmt.Sprintf("%v", d) == target - } - return false - }) - } - toTaskID := taskID - hasTopoDeps := task.TopoDeps.Len() > 0 && e.TopologicGraph.DownEdges(pkg).Len() > 0 - hasDeps := deps.Len() > 0 - hasPackageTaskDeps := false - if _, ok := packageTasksDepsMap[toTaskID]; ok { - hasPackageTaskDeps = true - } + // Skip this iteration of the loop if we've already seen this taskID + if visited.Includes(taskID) { + continue + } + + visited.Add(taskID) - if hasTopoDeps { - depPkgs := e.TopologicGraph.DownEdges(pkg) - for _, from := range task.TopoDeps.UnsafeListOfStrings() { - // add task dep from all the package deps within repo - for depPkg := range depPkgs { - fromTaskID := util.GetTaskId(depPkg, from) - e.TaskGraph.Add(fromTaskID) - e.TaskGraph.Add(toTaskID) - e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) - traversalQueue = append(traversalQueue, fromTaskID) - } + // Filter down the tasks if there's a filter in place + // https: //turbo.build/repo/docs/reference/command-line-reference#--only + if tasksOnly { + task.Deps = task.Deps.Filter(func(d interface{}) bool { + for _, target := range taskNames { + return fmt.Sprintf("%v", d) == target } - } + return false + }) + task.TopoDeps = task.TopoDeps.Filter(func(d interface{}) bool { + for _, target := range taskNames { + return fmt.Sprintf("%v", d) == target + } + return false + }) + } + + toTaskID := taskID + + // hasTopoDeps will be true if the task depends on any tasks from dependency packages + // E.g. `dev: { dependsOn: [^dev] }` + hasTopoDeps := task.TopoDeps.Len() > 0 && e.TopologicGraph.DownEdges(pkg).Len() > 0 - if hasDeps { - for _, from := range deps.UnsafeListOfStrings() { - fromTaskID := util.GetTaskId(pkg, from) + // hasDeps will be true if the task depends on any tasks from its own package + // E.g. `build: { dependsOn: [dev] }` + hasDeps := task.Deps.Len() > 0 + + // hasPackageTaskDeps will be true if this is a workspace-specific task, and + // it depends on another workspace-specific tasks + // E.g. `my-package#build: { dependsOn: [my-package#beforebuild] }`. + hasPackageTaskDeps := false + if _, ok := e.PackageTaskDeps[toTaskID]; ok { + hasPackageTaskDeps = true + } + + if hasTopoDeps { + depPkgs := e.TopologicGraph.DownEdges(pkg) + for _, from := range task.TopoDeps.UnsafeListOfStrings() { + // add task dep from all the package deps within repo + for depPkg := range depPkgs { + fromTaskID := util.GetTaskId(depPkg, from) e.TaskGraph.Add(fromTaskID) e.TaskGraph.Add(toTaskID) e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) traversalQueue = append(traversalQueue, fromTaskID) } } + } - if hasPackageTaskDeps { - if pkgTaskDeps, ok := packageTasksDepsMap[toTaskID]; ok { - for _, fromTaskID := range pkgTaskDeps { - e.TaskGraph.Add(fromTaskID) - e.TaskGraph.Add(toTaskID) - e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) - traversalQueue = append(traversalQueue, fromTaskID) - } - } + if hasDeps { + for _, from := range task.Deps.UnsafeListOfStrings() { + fromTaskID := util.GetTaskId(pkg, from) + e.TaskGraph.Add(fromTaskID) + e.TaskGraph.Add(toTaskID) + e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) + traversalQueue = append(traversalQueue, fromTaskID) } + } - if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps { - e.TaskGraph.Add(ROOT_NODE_NAME) - e.TaskGraph.Add(toTaskID) - e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME)) + if hasPackageTaskDeps { + if pkgTaskDeps, ok := e.PackageTaskDeps[toTaskID]; ok { + for _, fromTaskID := range pkgTaskDeps { + e.TaskGraph.Add(fromTaskID) + e.TaskGraph.Add(toTaskID) + e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) + traversalQueue = append(traversalQueue, fromTaskID) + } } } - } - return nil -} -func getPackageTaskDepsMap(packageTaskDeps [][]string) map[string][]string { - depMap := make(map[string][]string) - for _, packageTaskDep := range packageTaskDeps { - from := packageTaskDep[0] - to := packageTaskDep[1] - if _, ok := depMap[to]; !ok { - depMap[to] = []string{} + if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps { + e.TaskGraph.Add(ROOT_NODE_NAME) + e.TaskGraph.Add(toTaskID) + e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME)) } - depMap[to] = append(depMap[to], from) } - return depMap + + return nil } // AddTask adds a task to the Engine so it can be looked up later. @@ -251,6 +248,12 @@ func (e *Engine) AddDep(fromTaskID string, toTaskID string) error { if fromPkg != ROOT_NODE_NAME && fromPkg != util.RootPkgName && !e.TopologicGraph.HasVertex(fromPkg) { return fmt.Errorf("found reference to unknown package: %v in task %v", fromPkg, fromTaskID) } - e.PackageTaskDeps = append(e.PackageTaskDeps, []string{fromTaskID, toTaskID}) + + if _, ok := e.PackageTaskDeps[fromTaskID]; !ok { + e.PackageTaskDeps[toTaskID] = []string{} + } + + e.PackageTaskDeps[toTaskID] = append(e.PackageTaskDeps[toTaskID], fromTaskID) + return nil } diff --git a/cli/internal/core/engine_test.go b/cli/internal/core/engine_test.go index 8fc2650eb9162..3ca2c35b649dc 100644 --- a/cli/internal/core/engine_test.go +++ b/cli/internal/core/engine_test.go @@ -55,7 +55,7 @@ func TestEngineDefault(t *testing.T) { t.Fatal("AddTask is not adding tasks (test)") } - err := p.Prepare(&EngineExecutionOptions{ + err := p.Prepare(&EngineBuildingOptions{ Packages: []string{"a", "b", "c"}, TaskNames: []string{"test"}, TasksOnly: false, @@ -65,7 +65,7 @@ func TestEngineDefault(t *testing.T) { t.Fatalf("%v", err) } - errs := p.Execute(testVisitor, ExecOpts{ + errs := p.Execute(testVisitor, EngineExecutionOptions{ Concurrency: 10, }) @@ -135,14 +135,14 @@ func TestDependenciesOnUnspecifiedPackages(t *testing.T) { // but the combination of that package and task causes // dependencies to also get run. This is the equivalent of // turbo run test --filter=app2 - err := p.Prepare(&EngineExecutionOptions{ + err := p.Prepare(&EngineBuildingOptions{ Packages: []string{"app2"}, TaskNames: []string{"test"}, }) if err != nil { t.Fatalf("failed to prepare engine: %v", err) } - errs := p.Execute(testVisitor, ExecOpts{ + errs := p.Execute(testVisitor, EngineExecutionOptions{ Concurrency: 10, }) for _, err := range errs { @@ -188,12 +188,12 @@ func TestRunPackageTask(t *testing.T) { }) // equivalent to "turbo run special", without an entry for // "special" in turbo.json. Only "app1#special" is defined. - err := p.Prepare(&EngineExecutionOptions{ + err := p.Prepare(&EngineBuildingOptions{ Packages: []string{"app1", "libA"}, TaskNames: []string{"special"}, }) assert.NilError(t, err, "Prepare") - errs := p.Execute(testVisitor, ExecOpts{ + errs := p.Execute(testVisitor, EngineExecutionOptions{ Concurrency: 10, }) for _, err := range errs { @@ -219,7 +219,7 @@ func TestRunWithNoTasksFound(t *testing.T) { dependOnBuild := make(util.Set) dependOnBuild.Add("build") - err := p.Prepare(&EngineExecutionOptions{ + err := p.Prepare(&EngineBuildingOptions{ Packages: []string{"app", "lib"}, TaskNames: []string{"build"}, }) @@ -251,14 +251,14 @@ func TestIncludeRootTasks(t *testing.T) { TopoDeps: make(util.Set), Deps: make(util.Set), }) - err := p.Prepare(&EngineExecutionOptions{ + err := p.Prepare(&EngineBuildingOptions{ Packages: []string{util.RootPkgName, "app1", "libA"}, TaskNames: []string{"build", "test"}, }) if err != nil { t.Fatalf("failed to prepare engine: %v", err) } - errs := p.Execute(testVisitor, ExecOpts{ + errs := p.Execute(testVisitor, EngineExecutionOptions{ Concurrency: 10, }) for _, err := range errs { @@ -307,12 +307,12 @@ func TestDependOnRootTask(t *testing.T) { err := p.AddDep("//#root-task", "libA#build") assert.NilError(t, err, "AddDep") - err = p.Prepare(&EngineExecutionOptions{ + err = p.Prepare(&EngineBuildingOptions{ Packages: []string{"app1"}, TaskNames: []string{"build"}, }) assert.NilError(t, err, "Prepare") - errs := p.Execute(testVisitor, ExecOpts{ + errs := p.Execute(testVisitor, EngineExecutionOptions{ Concurrency: 10, }) for _, err := range errs { @@ -347,7 +347,7 @@ func TestDependOnMissingRootTask(t *testing.T) { err := p.AddDep("//#root-task", "libA#build") assert.NilError(t, err, "AddDep") - err = p.Prepare(&EngineExecutionOptions{ + err = p.Prepare(&EngineBuildingOptions{ Packages: []string{"app1"}, TaskNames: []string{"build"}, }) @@ -379,7 +379,7 @@ func TestDependOnUnenabledRootTask(t *testing.T) { err := p.AddDep("//#foo", "libA#build") assert.NilError(t, err, "AddDep") - err = p.Prepare(&EngineExecutionOptions{ + err = p.Prepare(&EngineBuildingOptions{ Packages: []string{"app1"}, TaskNames: []string{"build"}, }) @@ -423,7 +423,7 @@ func TestEngineTasksOnly(t *testing.T) { t.Fatal("AddTask is not adding tasks (test)") } - err := p.Prepare(&EngineExecutionOptions{ + err := p.Prepare(&EngineBuildingOptions{ Packages: []string{"a", "b", "c"}, TaskNames: []string{"test"}, TasksOnly: true, @@ -433,7 +433,7 @@ func TestEngineTasksOnly(t *testing.T) { t.Fatalf("%v", err) } - errs := p.Execute(testVisitor, ExecOpts{ + errs := p.Execute(testVisitor, EngineExecutionOptions{ Concurrency: 10, }) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index 61fda3244cc62..eb33cead0d09e 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -500,7 +500,7 @@ func buildTaskGraphEngine(topoGraph *dag.AcyclicGraph, pipeline fs.Pipeline, rs }) } - if err := engine.Prepare(&core.EngineExecutionOptions{ + if err := engine.Prepare(&core.EngineBuildingOptions{ Packages: rs.FilteredPkgs.UnsafeListOfStrings(), TaskNames: rs.Targets, TasksOnly: rs.Opts.runOpts.only, @@ -776,7 +776,7 @@ func (r *run) executeTasks(ctx gocontext.Context, g *completeGraph, rs *runSpec, } // run the thing - execOpts := core.ExecOpts{ + execOpts := core.EngineExecutionOptions{ Parallel: rs.Opts.runOpts.parallel, Concurrency: rs.Opts.runOpts.concurrency, } @@ -934,7 +934,7 @@ func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Engine, g *compl }) return nil - }), core.ExecOpts{ + }), core.EngineExecutionOptions{ Concurrency: 1, Parallel: false, })