Skip to content

Commit

Permalink
solver: update exec ops to separate functions
Browse files Browse the repository at this point in the history
Signed-off-by: Tonis Tiigi <[email protected]>
  • Loading branch information
tonistiigi committed Jun 19, 2017
1 parent 4ed7fa3 commit 0a7a2c1
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 54 deletions.
1 change: 1 addition & 0 deletions examples/llbout/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func main() {
alpine := llb.Image("docker.io/library/alpine:latest")
mod3 := mod2.Run(llb.Meta{Args: []string{"/bin/cp", "-a", "/alpine/etc/passwd", "baz"}, Cwd: "/"})
mod3.AddMount("/alpine", alpine)
mod3.AddMount("/redis", busybox)
mod4 := mod3.Run(llb.Meta{Args: []string{"/bin/ls", "-l", "/"}, Cwd: "/"})

res := mod4
Expand Down
121 changes: 68 additions & 53 deletions solver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (g *opVertex) release(ctx context.Context) (retErr error) {
return retErr
}

func (g *opVertex) getInputRef(i int) cache.ImmutableRef {
func (g *opVertex) getInputRefForIndex(i int) cache.ImmutableRef {
input := g.op.Inputs[i]
for _, v := range g.inputs {
if v.dgst == digest.Digest(input.Digest) {
Expand Down Expand Up @@ -143,86 +143,101 @@ func (g *opVertex) solve(ctx context.Context, opt Opt) (retErr error) {
}
}

g.notifyStarted(pw)
defer g.notifyComplete(pw)
g.notifyStarted(ctx)
defer g.notifyCompleted(ctx)

switch op := g.op.Op.(type) {
case *pb.Op_Source:
id, err := source.FromString(op.Source.Identifier)
if err != nil {
if err := g.runSourceOp(ctx, opt.SourceManager, op); err != nil {
return err
}
ref, err := opt.SourceManager.Pull(ctx, id)
if err != nil {
case *pb.Op_Exec:
if err := g.runExecOp(ctx, opt.CacheManager, opt.Worker, op); err != nil {
return err
}
g.refs = []cache.ImmutableRef{ref}
case *pb.Op_Exec:
default:
return errors.Errorf("invalid op type")
}
return nil
}

mounts := make(map[string]cache.Mountable)
func (g *opVertex) runSourceOp(ctx context.Context, sm *source.Manager, op *pb.Op_Source) error {
id, err := source.FromString(op.Source.Identifier)
if err != nil {
return err
}
ref, err := sm.Pull(ctx, id)
if err != nil {
return err
}
g.refs = []cache.ImmutableRef{ref}
return nil
}

var outputs []cache.MutableRef
func (g *opVertex) runExecOp(ctx context.Context, cm cache.Manager, w worker.Worker, op *pb.Op_Exec) error {
mounts := make(map[string]cache.Mountable)

defer func() {
for _, o := range outputs {
if o != nil {
s, err := o.Freeze() // TODO: log error
if err == nil {
s.Release(ctx)
}
}
}
}()

for _, m := range op.Exec.Mounts {
var mountable cache.Mountable
ref := g.getInputRef(int(m.Input))
mountable = ref
if m.Output != -1 {
active, err := opt.CacheManager.New(ctx, ref) // TODO: should be method
if err != nil {
return err
var outputs []cache.MutableRef

defer func() {
for _, o := range outputs {
if o != nil {
s, err := o.Freeze() // TODO: log error
if err == nil {
s.Release(ctx)
}
outputs = append(outputs, active)
mountable = active
}
mounts[m.Dest] = mountable
}
}()

meta := worker.Meta{
Args: op.Exec.Meta.Args,
Env: op.Exec.Meta.Env,
Cwd: op.Exec.Meta.Cwd,
for _, m := range op.Exec.Mounts {
var mountable cache.Mountable
ref := g.getInputRefForIndex(int(m.Input))
mountable = ref
if m.Output != -1 {
active, err := cm.New(ctx, ref) // TODO: should be method
if err != nil {
return err
}
outputs = append(outputs, active)
mountable = active
}
mounts[m.Dest] = mountable
}

if err := opt.Worker.Exec(ctx, meta, mounts, os.Stderr, os.Stderr); err != nil {
return errors.Wrapf(err, "worker failed running %v", meta.Args)
}
meta := worker.Meta{
Args: op.Exec.Meta.Args,
Env: op.Exec.Meta.Env,
Cwd: op.Exec.Meta.Cwd,
}

g.refs = []cache.ImmutableRef{}
if err := w.Exec(ctx, meta, mounts, os.Stderr, os.Stderr); err != nil {
return errors.Wrapf(err, "worker failed running %v", meta.Args)
}

for i, o := range outputs {
ref, err := o.ReleaseAndCommit(ctx)
if err != nil {
return errors.Wrapf(err, "error committing %s", ref.ID())
}
g.refs = append(g.refs, ref)
outputs[i] = nil
g.refs = []cache.ImmutableRef{}
for i, o := range outputs {
ref, err := o.ReleaseAndCommit(ctx)
if err != nil {
return errors.Wrapf(err, "error committing %s", o.ID())
}

default:
return errors.Errorf("invalid op type")
g.refs = append(g.refs, ref)
outputs[i] = nil
}
return nil
}

func (g *opVertex) notifyStarted(pw progress.Writer) {
func (g *opVertex) notifyStarted(ctx context.Context) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
g.vtx.Started = &now
pw.Write(g.dgst.String(), g.vtx)
}

func (g *opVertex) notifyComplete(pw progress.Writer) {
func (g *opVertex) notifyCompleted(ctx context.Context) {
pw, _, _ := progress.FromContext(ctx)
defer pw.Close()
now := time.Now()
g.vtx.Completed = &now
pw.Write(g.dgst.String(), g.vtx)
Expand Down
2 changes: 1 addition & 1 deletion util/flightcontrol/flightcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newCall(fn func(ctx context.Context) (interface{}, error)) *call {
c.ctx = ctx
c.closeProgressWriter = closeProgressWriter

go c.progressState.run(pr)
go c.progressState.run(pr) // TODO: remove this, wrap writer instead

return c
}
Expand Down

0 comments on commit 0a7a2c1

Please sign in to comment.