Skip to content

Commit

Permalink
Use Channels to Handle Task Order.
Browse files Browse the repository at this point in the history
Fixes: scipipe#81
  • Loading branch information
dwmunster authored and samuell committed Jun 14, 2019
1 parent 4383fc5 commit edbb6d5
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,29 +316,31 @@ func (p *Process) Run() {
Failf("%s: CoresPerTask (%d) can't be greater than maxConcurrentTasks of workflow (%d)\n", p.Name(), p.CoresPerTask, cap(p.workflow.concurrentTasks))
}

tasks := []*Task{}
for t := range p.createTasks() {
// Collect tasks so we can later wait for their done-signal before sending outputs
tasks = append(tasks, t)

// Sending FIFOs for the task
for oname, oip := range t.OutIPs {
if oip.doStream {
if oip.FifoFileExists() {
Fail("Fifo file exists, so exiting (clean up fifo files before restarting the workflow): ", oip.FifoPath())
tasks := make(chan *Task, BUFSIZE)
go func() {
defer close(tasks)
for t := range p.createTasks() {
tasks <- t

// Sending FIFOs for the task
for oname, oip := range t.OutIPs {
if oip.doStream {
if oip.FifoFileExists() {
Fail("Fifo file exists, so exiting (clean up fifo files before restarting the workflow): ", oip.FifoPath())
}
oip.CreateFifo()
p.Out(oname).Send(oip)
}
oip.CreateFifo()
p.Out(oname).Send(oip)
}
}

// Execute task in separate go-routine
go t.Execute()
}
// Execute task in separate go-routine
go t.Execute()
}
}()

// Wait for tasks to finish, in the order they were started (thus maintaining
// order of IPs), and then sending output IPs
for _, t := range tasks {
for t := range tasks {
<-t.Done
for oname, oip := range t.OutIPs {
if !oip.doStream { // Streaming (FIFO) outputs have been sent earlier
Expand Down

0 comments on commit edbb6d5

Please sign in to comment.