Skip to content

Commit

Permalink
Clarify taskQueue Name
Browse files Browse the repository at this point in the history
Signed-off-by: Drayton Munster <[email protected]>
  • Loading branch information
dwmunster authored and samuell committed Jun 15, 2019
1 parent 1acbf9d commit 1e91142
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,16 @@ func (p *Process) Run() {
// for all Tasks to be spawned before processing any can cause deadlock
// under certain workflow architectures when there are more than BUFSIZE
// Tasks per process, see #81.
tasksToBeProcessed := taskQueue{}
startedTasks := taskQueue{}

var nextTask *Task
tasks := p.createTasks()
for tasks != nil || len(tasksToBeProcessed) > 0 {
for tasks != nil || len(startedTasks) > 0 {
select {
case t, ok := <-tasks:
if !ok {
tasks = nil
} else {
tasksToBeProcessed = append(tasksToBeProcessed, t)
// Sending FIFOs for the task
for oname, oip := range t.OutIPs {
if oip.doStream {
Expand All @@ -346,9 +345,11 @@ func (p *Process) Run() {

// Execute task in separate go-routine
go t.Execute()

startedTasks = append(startedTasks, t)
}
case <-tasksToBeProcessed.NextTaskDone():
nextTask, tasksToBeProcessed = tasksToBeProcessed[0], tasksToBeProcessed[1:]
case <-startedTasks.NextTaskDone():
nextTask, startedTasks = startedTasks[0], startedTasks[1:]
for oname, oip := range nextTask.OutIPs {
if !oip.doStream { // Streaming (FIFO) outputs have been sent earlier
p.Out(oname).Send(oip)
Expand Down

0 comments on commit 1e91142

Please sign in to comment.