Skip to content

Commit

Permalink
Refactor TaskGraph construction (vercel#2546)
Browse files Browse the repository at this point in the history
* Invert condition for early exit
* filter tasksOnly consistently and add code comments
* Build PackageTaskDeps as a map instead of converting later
* Improve struct names for options
  • Loading branch information
mehulkar authored Nov 2, 2022
1 parent 652408b commit 97904c1
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 98 deletions.
163 changes: 83 additions & 80 deletions cli/internal/core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Engine struct {
TaskGraph *dag.AcyclicGraph
// Tasks are a map of tasks in the engine
Tasks map[string]*Task
PackageTaskDeps [][]string
PackageTaskDeps map[string][]string
rootEnabledTasks util.Set
}

Expand All @@ -39,13 +39,13 @@ func NewEngine(topologicalGraph *dag.AcyclicGraph) *Engine {
Tasks: make(map[string]*Task),
TopologicGraph: topologicalGraph,
TaskGraph: &dag.AcyclicGraph{},
PackageTaskDeps: [][]string{},
PackageTaskDeps: map[string][]string{},
rootEnabledTasks: make(util.Set),
}
}

// EngineExecutionOptions are options for a single engine execution
type EngineExecutionOptions struct {
// EngineBuildingOptions help construct the TaskGraph
type EngineBuildingOptions struct {
// Packages in the execution scope, if nil, all packages will be considered in scope
Packages []string
// TaskNames in the execution scope, if nil, all tasks will be executed
Expand All @@ -55,7 +55,7 @@ type EngineExecutionOptions struct {
}

// Prepare constructs the Task Graph for a list of packages and tasks
func (e *Engine) Prepare(options *EngineExecutionOptions) error {
func (e *Engine) Prepare(options *EngineBuildingOptions) error {
pkgs := options.Packages
tasks := options.TaskNames
if len(tasks) == 0 {
Expand All @@ -72,16 +72,16 @@ func (e *Engine) Prepare(options *EngineExecutionOptions) error {
return nil
}

// ExecOpts controls a single walk of the task graph
type ExecOpts struct {
// EngineExecutionOptions controls a single walk of the task graph.
// It is passed by value to Engine.Execute.
type EngineExecutionOptions struct {
// Parallel is whether to run tasks in parallel.
// NOTE(review): how Execute consumes this flag is not visible in this view — confirm against the walk callback.
Parallel bool
// Concurrency is the number of concurrent tasks that can be executed.
// Execute hands this value to util.NewSemaphore.
Concurrency int
}

// Execute executes the pipeline, constructing an internal task graph and walking it accordingly.
func (e *Engine) Execute(visitor Visitor, opts ExecOpts) []error {
func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error {
var sema = util.NewSemaphore(opts.Concurrency)
return e.TaskGraph.Walk(func(v dag.Vertex) error {
// Always return if it is the root node
Expand Down Expand Up @@ -109,12 +109,6 @@ func (e *Engine) getTaskDefinition(pkg string, taskName string, taskID string) (
}

func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly bool) error {
if e.PackageTaskDeps == nil {
e.PackageTaskDeps = [][]string{}
}

packageTasksDepsMap := getPackageTaskDepsMap(e.PackageTaskDeps)

traversalQueue := []string{}
for _, pkg := range pkgs {
isRootPkg := pkg == util.RootPkgName
Expand Down Expand Up @@ -146,89 +140,92 @@ func (e *Engine) generateTaskGraph(pkgs []string, taskNames []string, tasksOnly
if err != nil {
return err
}
if !visited.Includes(taskID) {
visited.Add(taskID)
deps := task.Deps

if tasksOnly {
deps = deps.Filter(func(d interface{}) bool {
for _, target := range taskNames {
return fmt.Sprintf("%v", d) == target
}
return false
})
task.TopoDeps = task.TopoDeps.Filter(func(d interface{}) bool {
for _, target := range taskNames {
return fmt.Sprintf("%v", d) == target
}
return false
})
}

toTaskID := taskID
hasTopoDeps := task.TopoDeps.Len() > 0 && e.TopologicGraph.DownEdges(pkg).Len() > 0
hasDeps := deps.Len() > 0
hasPackageTaskDeps := false
if _, ok := packageTasksDepsMap[toTaskID]; ok {
hasPackageTaskDeps = true
}
// Skip this iteration of the loop if we've already seen this taskID
if visited.Includes(taskID) {
continue
}

visited.Add(taskID)

if hasTopoDeps {
depPkgs := e.TopologicGraph.DownEdges(pkg)
for _, from := range task.TopoDeps.UnsafeListOfStrings() {
// add task dep from all the package deps within repo
for depPkg := range depPkgs {
fromTaskID := util.GetTaskId(depPkg, from)
e.TaskGraph.Add(fromTaskID)
e.TaskGraph.Add(toTaskID)
e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
traversalQueue = append(traversalQueue, fromTaskID)
}
// Filter down the tasks if there's a filter in place
// https://turbo.build/repo/docs/reference/command-line-reference#--only
if tasksOnly {
task.Deps = task.Deps.Filter(func(d interface{}) bool {
for _, target := range taskNames {
return fmt.Sprintf("%v", d) == target
}
}
return false
})
task.TopoDeps = task.TopoDeps.Filter(func(d interface{}) bool {
for _, target := range taskNames {
return fmt.Sprintf("%v", d) == target
}
return false
})
}

toTaskID := taskID

// hasTopoDeps will be true if the task depends on any tasks from dependency packages
// E.g. `dev: { dependsOn: [^dev] }`
hasTopoDeps := task.TopoDeps.Len() > 0 && e.TopologicGraph.DownEdges(pkg).Len() > 0

if hasDeps {
for _, from := range deps.UnsafeListOfStrings() {
fromTaskID := util.GetTaskId(pkg, from)
// hasDeps will be true if the task depends on any tasks from its own package
// E.g. `build: { dependsOn: [dev] }`
hasDeps := task.Deps.Len() > 0

// hasPackageTaskDeps will be true if this is a workspace-specific task, and
// it depends on other workspace-specific tasks
// E.g. `my-package#build: { dependsOn: [my-package#beforebuild] }`.
hasPackageTaskDeps := false
if _, ok := e.PackageTaskDeps[toTaskID]; ok {
hasPackageTaskDeps = true
}

if hasTopoDeps {
depPkgs := e.TopologicGraph.DownEdges(pkg)
for _, from := range task.TopoDeps.UnsafeListOfStrings() {
// add task dep from all the package deps within repo
for depPkg := range depPkgs {
fromTaskID := util.GetTaskId(depPkg, from)
e.TaskGraph.Add(fromTaskID)
e.TaskGraph.Add(toTaskID)
e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
traversalQueue = append(traversalQueue, fromTaskID)
}
}
}

if hasPackageTaskDeps {
if pkgTaskDeps, ok := packageTasksDepsMap[toTaskID]; ok {
for _, fromTaskID := range pkgTaskDeps {
e.TaskGraph.Add(fromTaskID)
e.TaskGraph.Add(toTaskID)
e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
traversalQueue = append(traversalQueue, fromTaskID)
}
}
if hasDeps {
for _, from := range task.Deps.UnsafeListOfStrings() {
fromTaskID := util.GetTaskId(pkg, from)
e.TaskGraph.Add(fromTaskID)
e.TaskGraph.Add(toTaskID)
e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
traversalQueue = append(traversalQueue, fromTaskID)
}
}

if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps {
e.TaskGraph.Add(ROOT_NODE_NAME)
e.TaskGraph.Add(toTaskID)
e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME))
if hasPackageTaskDeps {
if pkgTaskDeps, ok := e.PackageTaskDeps[toTaskID]; ok {
for _, fromTaskID := range pkgTaskDeps {
e.TaskGraph.Add(fromTaskID)
e.TaskGraph.Add(toTaskID)
e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
traversalQueue = append(traversalQueue, fromTaskID)
}
}
}
}
return nil
}

func getPackageTaskDepsMap(packageTaskDeps [][]string) map[string][]string {
depMap := make(map[string][]string)
for _, packageTaskDep := range packageTaskDeps {
from := packageTaskDep[0]
to := packageTaskDep[1]
if _, ok := depMap[to]; !ok {
depMap[to] = []string{}
if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps {
e.TaskGraph.Add(ROOT_NODE_NAME)
e.TaskGraph.Add(toTaskID)
e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME))
}
depMap[to] = append(depMap[to], from)
}
return depMap

return nil
}

// AddTask adds a task to the Engine so it can be looked up later.
Expand All @@ -251,6 +248,12 @@ func (e *Engine) AddDep(fromTaskID string, toTaskID string) error {
if fromPkg != ROOT_NODE_NAME && fromPkg != util.RootPkgName && !e.TopologicGraph.HasVertex(fromPkg) {
return fmt.Errorf("found reference to unknown package: %v in task %v", fromPkg, fromTaskID)
}
e.PackageTaskDeps = append(e.PackageTaskDeps, []string{fromTaskID, toTaskID})

if _, ok := e.PackageTaskDeps[fromTaskID]; !ok {
e.PackageTaskDeps[toTaskID] = []string{}
}

e.PackageTaskDeps[toTaskID] = append(e.PackageTaskDeps[toTaskID], fromTaskID)

return nil
}
30 changes: 15 additions & 15 deletions cli/internal/core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestEngineDefault(t *testing.T) {
t.Fatal("AddTask is not adding tasks (test)")
}

err := p.Prepare(&EngineExecutionOptions{
err := p.Prepare(&EngineBuildingOptions{
Packages: []string{"a", "b", "c"},
TaskNames: []string{"test"},
TasksOnly: false,
Expand All @@ -65,7 +65,7 @@ func TestEngineDefault(t *testing.T) {
t.Fatalf("%v", err)
}

errs := p.Execute(testVisitor, ExecOpts{
errs := p.Execute(testVisitor, EngineExecutionOptions{
Concurrency: 10,
})

Expand Down Expand Up @@ -135,14 +135,14 @@ func TestDependenciesOnUnspecifiedPackages(t *testing.T) {
// but the combination of that package and task causes
// dependencies to also get run. This is the equivalent of
// turbo run test --filter=app2
err := p.Prepare(&EngineExecutionOptions{
err := p.Prepare(&EngineBuildingOptions{
Packages: []string{"app2"},
TaskNames: []string{"test"},
})
if err != nil {
t.Fatalf("failed to prepare engine: %v", err)
}
errs := p.Execute(testVisitor, ExecOpts{
errs := p.Execute(testVisitor, EngineExecutionOptions{
Concurrency: 10,
})
for _, err := range errs {
Expand Down Expand Up @@ -188,12 +188,12 @@ func TestRunPackageTask(t *testing.T) {
})
// equivalent to "turbo run special", without an entry for
// "special" in turbo.json. Only "app1#special" is defined.
err := p.Prepare(&EngineExecutionOptions{
err := p.Prepare(&EngineBuildingOptions{
Packages: []string{"app1", "libA"},
TaskNames: []string{"special"},
})
assert.NilError(t, err, "Prepare")
errs := p.Execute(testVisitor, ExecOpts{
errs := p.Execute(testVisitor, EngineExecutionOptions{
Concurrency: 10,
})
for _, err := range errs {
Expand All @@ -219,7 +219,7 @@ func TestRunWithNoTasksFound(t *testing.T) {
dependOnBuild := make(util.Set)
dependOnBuild.Add("build")

err := p.Prepare(&EngineExecutionOptions{
err := p.Prepare(&EngineBuildingOptions{
Packages: []string{"app", "lib"},
TaskNames: []string{"build"},
})
Expand Down Expand Up @@ -251,14 +251,14 @@ func TestIncludeRootTasks(t *testing.T) {
TopoDeps: make(util.Set),
Deps: make(util.Set),
})
err := p.Prepare(&EngineExecutionOptions{
err := p.Prepare(&EngineBuildingOptions{
Packages: []string{util.RootPkgName, "app1", "libA"},
TaskNames: []string{"build", "test"},
})
if err != nil {
t.Fatalf("failed to prepare engine: %v", err)
}
errs := p.Execute(testVisitor, ExecOpts{
errs := p.Execute(testVisitor, EngineExecutionOptions{
Concurrency: 10,
})
for _, err := range errs {
Expand Down Expand Up @@ -307,12 +307,12 @@ func TestDependOnRootTask(t *testing.T) {
err := p.AddDep("//#root-task", "libA#build")
assert.NilError(t, err, "AddDep")

err = p.Prepare(&EngineExecutionOptions{
err = p.Prepare(&EngineBuildingOptions{
Packages: []string{"app1"},
TaskNames: []string{"build"},
})
assert.NilError(t, err, "Prepare")
errs := p.Execute(testVisitor, ExecOpts{
errs := p.Execute(testVisitor, EngineExecutionOptions{
Concurrency: 10,
})
for _, err := range errs {
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestDependOnMissingRootTask(t *testing.T) {
err := p.AddDep("//#root-task", "libA#build")
assert.NilError(t, err, "AddDep")

err = p.Prepare(&EngineExecutionOptions{
err = p.Prepare(&EngineBuildingOptions{
Packages: []string{"app1"},
TaskNames: []string{"build"},
})
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestDependOnUnenabledRootTask(t *testing.T) {
err := p.AddDep("//#foo", "libA#build")
assert.NilError(t, err, "AddDep")

err = p.Prepare(&EngineExecutionOptions{
err = p.Prepare(&EngineBuildingOptions{
Packages: []string{"app1"},
TaskNames: []string{"build"},
})
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestEngineTasksOnly(t *testing.T) {
t.Fatal("AddTask is not adding tasks (test)")
}

err := p.Prepare(&EngineExecutionOptions{
err := p.Prepare(&EngineBuildingOptions{
Packages: []string{"a", "b", "c"},
TaskNames: []string{"test"},
TasksOnly: true,
Expand All @@ -433,7 +433,7 @@ func TestEngineTasksOnly(t *testing.T) {
t.Fatalf("%v", err)
}

errs := p.Execute(testVisitor, ExecOpts{
errs := p.Execute(testVisitor, EngineExecutionOptions{
Concurrency: 10,
})

Expand Down
6 changes: 3 additions & 3 deletions cli/internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func buildTaskGraphEngine(topoGraph *dag.AcyclicGraph, pipeline fs.Pipeline, rs
})
}

if err := engine.Prepare(&core.EngineExecutionOptions{
if err := engine.Prepare(&core.EngineBuildingOptions{
Packages: rs.FilteredPkgs.UnsafeListOfStrings(),
TaskNames: rs.Targets,
TasksOnly: rs.Opts.runOpts.only,
Expand Down Expand Up @@ -776,7 +776,7 @@ func (r *run) executeTasks(ctx gocontext.Context, g *completeGraph, rs *runSpec,
}

// run the thing
execOpts := core.ExecOpts{
execOpts := core.EngineExecutionOptions{
Parallel: rs.Opts.runOpts.parallel,
Concurrency: rs.Opts.runOpts.concurrency,
}
Expand Down Expand Up @@ -934,7 +934,7 @@ func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Engine, g *compl
})

return nil
}), core.ExecOpts{
}), core.EngineExecutionOptions{
Concurrency: 1,
Parallel: false,
})
Expand Down

0 comments on commit 97904c1

Please sign in to comment.