Skip to content

Commit

Permalink
Fix problem with buffer filling up due to sequential file sending
Browse files Browse the repository at this point in the history
  • Loading branch information
samuell committed May 22, 2019
1 parent d365071 commit e4b2baa
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions components/file_combinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package components

import (
"github.com/scipipe/scipipe"
"sync"
)

// FileCombinator takes a set of input streams of FileIPs, and returns the same
Expand Down Expand Up @@ -63,11 +64,21 @@ func (p *FileCombinator) Run() {
outIPs := p.combine(inIPs, keys)

// Send combinations of all IPs
wg := &sync.WaitGroup{}
for pName, ips := range outIPs {
for _, ip := range ips {
p.Out(pName).Send(ip)
}
wg.Add(1)
// Make unique copy of variables for this iteration, so they don't get
// overwritten on the next loop iteration
pName := pName
ips := ips
go func() {
for _, ip := range ips {
p.Out(pName).Send(ip)
}
wg.Done()
}()
}
wg.Wait()
}

// combine is a recursive method that creates combinations of all the IPs in the input IP arrays, such that:
Expand Down

0 comments on commit e4b2baa

Please sign in to comment.