Skip to content

Commit

Permalink
refactor(automation): align contexts and run sync
Browse files Browse the repository at this point in the history
First, we changed which methods are responsible for ensuring that we have a long-running context for the "run".

Then, we adjusted how the `ApplyWorkflow` handled the `wait` param. Since we need the `runLock` to be locked during the entire run, we need more fine grained control over when the `ApplyWorkflow` method returns. We now have a `applyWorkflow` method, which is run in a go routine when `wait == false`, and is run directly when otherwise.

This now means we can trust that functions run when we `defer` in `o.applyWorkflow` will trigger after the entire run of the transform.
  • Loading branch information
ramfox committed Oct 29, 2021
1 parent 851e7bd commit 63e7b2f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
19 changes: 17 additions & 2 deletions automation/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,21 @@ func (o *Orchestrator) runWorkflow(ctx context.Context, wf *workflow.Workflow, r

// ApplyWorkflow runs the given workflow, but does not record the output
func (o *Orchestrator) ApplyWorkflow(ctx context.Context, wait bool, scriptOutput io.Writer, wf *workflow.Workflow, ds *dataset.Dataset, secrets map[string]string) (string, error) {
runID := run.NewID()
if wait {
return runID, o.applyWorkflow(ctx, scriptOutput, wf, ds, secrets, runID)
}

go o.applyWorkflow(ctx, scriptOutput, wf, ds, secrets, runID)
return runID, nil
}

func (o *Orchestrator) applyWorkflow(ctx context.Context, scriptOutput io.Writer, wf *workflow.Workflow, ds *dataset.Dataset, secrets map[string]string, runID string) error {
o.runLock.Lock()
defer o.runLock.Unlock()
log.Debugw("ApplyWorkflow, workflow", "id", wf.ID)
apply := o.applyFactory(ctx)

runID := run.NewID()
if scriptOutput != nil {
o.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
log.Debugw("apply transform event", "type", e.Type, "payload", e.Payload)
Expand All @@ -401,7 +410,13 @@ func (o *Orchestrator) ApplyWorkflow(ctx context.Context, wait bool, scriptOutpu

// TODO (ramfox): when we understand what it means to dryrun a hook, this should wait for the err, iterator thought the hooks
// for this workflow, and emit the events for hooks that this orchestrator understands
return runID, apply(ctx, wait, runID, wf, ds, secrets)
return apply(ctx, true, runID, wf, ds, secrets)
}

// CancelRun cancels the run of the given runID
func (o *Orchestrator) CancelRun(ctx context.Context, runID string) {
log.Debugw("orchestrator.CancelRun", "runID", runID)
o.cancelRunCh <- runID
}

// SaveWorkflow creates a new workflow if the workflow id is empty, or updates
Expand Down
16 changes: 9 additions & 7 deletions lib/automation.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,10 @@ type automationImpl struct{}

// Apply runs a transform script
func (automationImpl) Apply(scope scope, p *ApplyParams) (*ApplyResult, error) {
ctx := scope.Context()

var err error
ref := dsref.Ref{}
if p.Ref != "" {
ref, _, err = scope.ParseAndResolveRef(ctx, p.Ref)
ref, _, err = scope.ParseAndResolveRef(scope.Context(), p.Ref)
if err != nil {
return nil, err
}
Expand All @@ -219,7 +217,7 @@ func (automationImpl) Apply(scope scope, p *ApplyParams) (*ApplyResult, error) {
}
if p.Transform != nil {
ds.Transform = p.Transform
ds.Transform.OpenScriptFile(ctx, scope.Filesystem())
ds.Transform.OpenScriptFile(scope.Context(), scope.Filesystem())
}

wf := &workflow.Workflow{
Expand All @@ -229,14 +227,19 @@ func (automationImpl) Apply(scope scope, p *ApplyParams) (*ApplyResult, error) {
wf.Hooks = p.Hooks
}

ctx := scope.Context()
if !p.Wait {
ctx = scope.AppContext()
}

runID, err := scope.AutomationOrchestrator().ApplyWorkflow(ctx, p.Wait, p.ScriptOutput, wf, ds, p.Secrets)
if err != nil {
return nil, err
}

res := &ApplyResult{}
if p.Wait {
ds, err := preview.Create(ctx, ds)
ds, err := preview.Create(scope.Context(), ds)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -471,9 +474,8 @@ func (inst *Instance) apply(ctx context.Context, wait bool, runID string, wf *wo
return err
}

ctx = profile.AddIDToContext(scope.AppContext(), scope.ActiveProfile().ID.Encode())
transformer := transform.NewTransformer(ctx, scope.Filesystem(), scope.Loader(), scope.Bus())
return transformer.Apply(ctx, ds, runID, wait, secrets)
return transformer.Apply(scope.Context(), ds, runID, wait, secrets)
}

// AnalyzeTransform runs analysis on a transform script
Expand Down

0 comments on commit 63e7b2f

Please sign in to comment.