Skip to content

Commit

Permalink
Fix issue with overwriting tags.
Browse files Browse the repository at this point in the history
  • Loading branch information
dwmunster authored and samuell committed Jun 19, 2019
1 parent f4afc03 commit 2edf250
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
53 changes: 53 additions & 0 deletions components/maptotags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package components

import (
"fmt"
"os"
"testing"

"log"

"github.com/scipipe/scipipe"
)

func TestMapToTags(t *testing.T) {
var numbers = []string{"1", "2", "3", "4"}

// Create workflow
wf := scipipe.NewWorkflow("wf", 4)
numbersSource := NewParamSource(wf, "number_source", numbers...)

numberFiles := wf.NewProc("make_files", "echo {p:number} > {o:out}")
numberFiles.InParam("number").From(numbersSource.Out())
numberFiles.SetOut("out", "{p:number}.txt")

tagger := NewMapToTags(wf, "tagger", func(ip *scipipe.FileIP) map[string]string {
tags := make(map[string]string)
tags["file"] = ip.Path()
return tags
})
tagger.In().From(numberFiles.Out("out"))

catenator := wf.NewProc("cat", "cat {i:numbers} ../{t:numbers.file}")
catenator.In("numbers").From(tagger.Out())

wf.Run()

// Clean Files
filePaths := []string{}
for _, n := range numbers {
filePaths = append(filePaths, fmt.Sprintf("%s.txt", n))
filePaths = append(filePaths, filePaths[len(filePaths)-1]+".audit.json")
}
for _, filePath := range filePaths {
err := os.Remove(filePath)
if err != nil {
log.Fatal("Could not delete file:", filePath, "\n", err)
}
}
err := os.RemoveAll("log")
if err != nil {
log.Fatal("Could not remove log directory\n", err)
}

}
3 changes: 2 additions & 1 deletion process.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,12 @@ func (p *Process) createTasks() (ch chan *Task) {

inIPs := map[string]*FileIP{}
params := map[string]string{}
tags := map[string]string{}

inPortsOpen := true
paramPortsOpen := true
for {
// Tags need to be per Task, otherwise they are overwritten by future IPs
tags := map[string]string{}
// Only read on in-ports if we have any
if len(p.inPorts) > 0 {
inIPs, inPortsOpen = p.receiveOnInPorts()
Expand Down

0 comments on commit 2edf250

Please sign in to comment.