Skip to content

Commit

Permalink
decompress style a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Morgahl committed May 6, 2022
1 parent 46855e7 commit b00832b
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 7 deletions.
26 changes: 23 additions & 3 deletions async/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,57 +10,71 @@ import (
// damn near everything in the package as planned lol.
func Map[T any, N any](count, size int, mp func(T) N, in <-chan T) pipes.ChanPull[N] {
	// Results are delivered on a channel buffered to size.
	results := make(chan N, size)

	// The coordinator runs in the background; it starts the workers and closes results
	// when they have all finished.
	go mapCoordinator(count, mp, in, results)

	return results
}

// mapCoordinator runs count workers (at least one) that each apply mp to the values read
// from in and send the results to out. out is closed exactly once, after every worker has
// returned.
func mapCoordinator[T any, N any](count int, mp func(T) N, in <-chan T, out chan<- N) {
	// Exactly one deferred close: closing an already-closed channel panics.
	defer close(out)

	// A non-positive count still gets a single worker.
	if count < 1 {
		count = 1
	}

	wg := &sync.WaitGroup{}
	wg.Add(count)
	for ; count > 1; count-- {
		go mapWorker(wg, mp, in, out)
	}

	// demote to a worker to guarantee there is always one worker running and launch one less
	// goroutine
	mapWorker(wg, mp, in, out)

	wg.Wait()
}

func mapWorker[T any, N any](wg *sync.WaitGroup, mp func(T) N, in <-chan T, out chan<- N) {
defer wg.Done()

for t := range in {
out <- mp(t)
}
}

// MapWithError starts mapWithErrorCoordinator in a goroutine with the requested worker
// count and returns two channels, each buffered to size. The first is for mapped values and
// the second receives the errors mp returns; the coordinator closes both when it finishes.
func MapWithError[T any, N any](count, size int, mp func(T) (N, error), in <-chan T) (pipes.ChanPull[N], pipes.ChanPull[error]) {
	values := make(chan N, size)
	failures := make(chan error, size)

	go mapWithErrorCoordinator(count, mp, in, values, failures)

	return values, failures
}

// mapWithErrorCoordinator runs count workers (at least one) over in, each handed mp, out
// and err. Both out and err are closed exactly once, after every worker has returned.
func mapWithErrorCoordinator[T any, N any](count int, mp func(T) (N, error), in <-chan T, out chan<- N, err chan<- error) {
	// Exactly one deferred close of each channel: closing a closed channel panics.
	defer func() { close(out); close(err) }()

	// A non-positive count still gets a single worker.
	if count < 1 {
		count = 1
	}

	wg := &sync.WaitGroup{}
	wg.Add(count)
	for ; count > 1; count-- {
		go mapWithErrorWorker(wg, mp, in, out, err)
	}

	// demote to a worker to guarantee there is always one worker running and launch one less
	// goroutine
	mapWithErrorWorker(wg, mp, in, out, err)

	wg.Wait()
}

func mapWithErrorWorker[T any, N any](wg *sync.WaitGroup, mp func(T) (N, error), in <-chan T, out chan<- N, err chan<- error) {
defer wg.Done()

for t := range in {
if n, er := mp(t); er != nil {
err <- er
Expand All @@ -72,23 +86,29 @@ func mapWithErrorWorker[T any, N any](wg *sync.WaitGroup, mp func(T) (N, error),

// MapWithErrorSink starts mapWithErrorSinkCoordinator in a goroutine with the requested
// worker count and returns a channel buffered to size, which the coordinator closes when it
// finishes. mp and sink are passed through to the workers (presumably sink is called with
// each error mp returns; confirm against mapWithErrorSinkWorker).
func MapWithErrorSink[T any, N any](count, size int, mp func(T) (N, error), sink func(error), in <-chan T) pipes.ChanPull[N] {
	results := make(chan N, size)

	go mapWithErrorSinkCoordinator(count, mp, sink, in, results)

	return results
}

// mapWithErrorSinkCoordinator runs count workers (at least one) over in, each handed mp,
// sink and out. out is closed exactly once, after every worker has returned.
func mapWithErrorSinkCoordinator[T any, N any](count int, mp func(T) (N, error), sink func(error), in <-chan T, out chan<- N) {
	// Exactly one deferred close: closing an already-closed channel panics.
	defer close(out)

	// A non-positive count still gets a single worker.
	if count < 1 {
		count = 1
	}

	wg := &sync.WaitGroup{}
	wg.Add(count)
	for ; count > 1; count-- {
		go mapWithErrorSinkWorker(wg, mp, sink, in, out)
	}

	// demote to a worker to guarantee there is always one worker running and launch only `count`
	// goroutines
	mapWithErrorSinkWorker(wg, mp, sink, in, out)

	wg.Wait()
}

Expand Down
23 changes: 22 additions & 1 deletion example/dir_scan/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ func main() {
if err != nil {
log.Fatalln(err)
}

filePipe := pipeline(true, dir)
filePipe = async.MapWithErrorSink(Workers, ChanSize, openFile, logError("error opening file"), filePipe)
filePipe = async.MapWithErrorSink(Workers, ChanSize, multiHash, logError("error multi hashing file"), filePipe)
filePipe = async.MapWithErrorSink(Workers, ChanSize, closeFile, logError("error closing file"), filePipe)
// filePipe = pipes.Tap(ChanSize, logFileFound, filePipe)

resultPipe := pipes.Window(ChanSize, time.Second, compileResult, newResults, filePipe)
resultPipe = pipes.Tap(ChanSize, logAny[*Results], resultPipe)

log.Println(pipes.Reduce(compileResults, &Results{}, resultPipe))
}

Expand All @@ -66,24 +69,29 @@ func walkFunc(dir string, recurse bool, out chan<- *FileInfo) func(string, fs.Di
log.Printf("path=%s, err=%s\n", path, err)
return fs.SkipDir
}

// If we have a Directory that is not the starting dir and recurse is disabled: SkipDir
if !recurse && dir != path {
return fs.SkipDir
}

// If we are a Directory that hasn't errored: don't emit; continue walking
return nil
}

// If we are a normal file that has errored: don't emit; log error; continue walking
if err != nil {
log.Printf("path=%s, err=%s\n", path, err)
return nil
}

// We have a file and we don't seem to have angered the powers that be: emit; continue walking
out <- &FileInfo{
Path: path,
Entry: d,
Start: time.Now(),
}

return nil
}
}
Expand All @@ -92,15 +100,19 @@ func getDir(args []string) (string, error) {
if len(args) < 2 {
return "", errors.New("must pass directory path after binary name")
}

dir := args[1]
if !fs.ValidPath(dir) {
return "", fmt.Errorf("must pass valid path: %s", dir)
}

dir, err := filepath.Abs(dir)
if err != nil {
return "", fmt.Errorf("must pass valid path: %s", err)
}

dir = strings.Trim(dir, "\"")

return dir, nil
}

Expand Down Expand Up @@ -142,7 +154,9 @@ func (r Results) String() string {
if r.Found == 0 {
count++
}

avg := time.Duration(float64(r.TotalDuration) / float64(count))

return fmt.Sprintf("Processed: %d, Avg: %s, Tot: %s", r.Found, avg, r.TotalDuration)
}

Expand All @@ -157,18 +171,23 @@ func openFile(fi *FileInfo) (*FileInfo, error) {
if err != nil {
return &FileInfo{}, err
}

fi.File = f

bf := fileBuffers.Get().(*bufio.Reader)
bf.Reset(f)
fi.Buffer = bf

return fi, nil
}

// closeFile hands fi.Buffer back to the fileBuffers pool, closes fi.File, and clears both
// fields. It returns fi together with whatever error Close reported.
func closeFile(fi *FileInfo) (*FileInfo, error) {
	// Return the reader to the pool before dropping this reference to it.
	fileBuffers.Put(fi.Buffer)
	fi.Buffer = nil

	// The File field is cleared whether or not Close succeeded.
	closeErr := fi.File.Close()
	fi.File = nil

	return fi, closeErr
}

Expand All @@ -180,9 +199,9 @@ var buffers = sync.Pool{
}

func multiHash(fi *FileInfo) (*FileInfo, error) {
defer fi.File.Seek(0, 0)
buf := buffers.Get().(*[]byte)
defer buffers.Put(buf)

md5 := md5.New()
sha1 := sha1.New()
sha256 := sha256.New()
Expand All @@ -191,10 +210,12 @@ func multiHash(fi *FileInfo) (*FileInfo, error) {
if _, err := io.CopyBuffer(w, fi.Buffer, *buf); err != nil {
return &FileInfo{}, err
}

fi.MD5 = md5.Sum(nil)
fi.SHA1 = sha1.Sum(nil)
fi.SHA256 = sha256.Sum(nil)
fi.SHA512 = sha512.Sum(nil)

return fi, nil
}

Expand Down
11 changes: 9 additions & 2 deletions fan_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,36 @@ import "sync"
// input channel is closed and emptied. The last goroutine will close the returned channel to signal
// completion of work.
func FanIn[T any](size int, ins ...<-chan T) ChanPull[T] {
	out := make(chan T, size)
	if len(ins) < 1 {
		// Nothing to merge: hand back an already-closed channel so a caller ranging over the
		// result finishes immediately. A nil channel would block a receiver forever.
		close(out)
		return out
	}

	// The coordinator forwards every input into out and closes out when all inputs are
	// closed and drained.
	go fanInCoordinator(ins, out)

	return out
}

// fanInCoordinator runs one fanInWorker per channel in ins, all writing to out, and closes
// out once every worker has returned. ins must contain at least one channel.
func fanInCoordinator[T any](ins []<-chan T, out chan<- T) {
	defer close(out)

	pending := new(sync.WaitGroup)
	pending.Add(len(ins))

	// Every input after the first gets its own goroutine.
	rest := ins[1:]
	for i := range rest {
		go fanInWorker(pending, rest[i], out)
	}

	// The first input is handled on this goroutine, so one worker is always running and one
	// fewer goroutine is launched.
	fanInWorker(pending, ins[0], out)

	pending.Wait()
}

func fanInWorker[T any](wg *sync.WaitGroup, in <-chan T, out chan<- T) {
defer wg.Done()

for t := range in {
out <- t
}
Expand Down
3 changes: 3 additions & 0 deletions fan_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ func FanOut[T any](count, size int, in <-chan T) []ChanPull[T] {
outs[i] = ch
fan[i] = ch
}

go fanOutWorker(fan, in)

return outs
}

Expand All @@ -18,6 +20,7 @@ func fanOutWorker[T any](fan []chan<- T, in <-chan T) {
close(out)
}
}()

for t := range in {
for _, out := range fan {
out <- t
Expand Down
9 changes: 9 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package pipes

// Filter starts filterWorker in a goroutine and returns a channel buffered to size. The
// worker sends the values from in for which filter returns true and closes the channel when
// it returns.
func Filter[T any](size int, filter func(T) bool, in <-chan T) ChanPull[T] {
	kept := make(chan T, size)

	go filterWorker(filter, in, kept)

	return kept
}

func filterWorker[T any](filter func(T) bool, in <-chan T, out ChanPush[T]) {
defer close(out)

for t := range in {
if filter(t) {
out <- t
Expand All @@ -17,12 +20,15 @@ func filterWorker[T any](filter func(T) bool, in <-chan T, out ChanPush[T]) {

// FilterWithError starts filterWithErrorWorker in a goroutine and returns two channels,
// each buffered to size: one for values and one that receives the errors filter returns.
// The worker closes both channels when it returns.
func FilterWithError[T any](size int, filter func(T) (bool, error), in <-chan T) (ChanPull[T], ChanPull[error]) {
	kept := make(chan T, size)
	failures := make(chan error, size)

	go filterWithErrorWorker(filter, in, kept, failures)

	return kept, failures
}

func filterWithErrorWorker[T any](filter func(T) (bool, error), in <-chan T, out chan<- T, err chan<- error) {
defer func() { close(out); close(err) }()

for t := range in {
if keep, er := filter(t); er != nil {
err <- er
Expand All @@ -34,12 +40,15 @@ func filterWithErrorWorker[T any](filter func(T) (bool, error), in <-chan T, out

// FilterWithErrorSink starts filterWithErrorSinkWorker in a goroutine and returns a channel
// buffered to size. Errors returned by filter are passed to sink instead of being sent on a
// channel; the worker closes the returned channel when it returns.
func FilterWithErrorSink[T any](size int, filter func(T) (bool, error), sink func(error), in <-chan T) ChanPull[T] {
	kept := make(chan T, size)

	go filterWithErrorSinkWorker(filter, sink, in, kept)

	return kept
}

func filterWithErrorSinkWorker[T any](filter func(T) (bool, error), sink func(error), in <-chan T, out chan<- T) {
defer close(out)

for t := range in {
if keep, er := filter(t); er != nil {
sink(er)
Expand Down
9 changes: 9 additions & 0 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@ package pipes

// Map starts mapWorker in a goroutine and returns a channel buffered to size. The worker
// sends mp applied to each value read from in and closes the channel once in is closed and
// drained.
func Map[T any, N any](size int, mp func(T) N, in <-chan T) ChanPull[N] {
	results := make(chan N, size)

	go mapWorker(mp, in, results)

	return results
}

// mapWorker applies mp to each value received from in and sends the result to out. It
// returns once in is closed and drained, closing out as it leaves.
func mapWorker[T any, N any](mp func(T) N, in <-chan T, out chan<- N) {
	defer close(out)

	for {
		item, open := <-in
		if !open {
			return
		}
		out <- mp(item)
	}
}

// MapWithError starts mapWithErrorWorker in a goroutine and returns two channels, each
// buffered to size: one for mapped values and one that receives the errors mp returns. The
// worker closes both channels when it returns.
func MapWithError[T any, N any](size int, mp func(T) (N, error), in <-chan T) (ChanPull[N], ChanPull[error]) {
	values := make(chan N, size)
	failures := make(chan error, size)

	go mapWithErrorWorker(mp, in, values, failures)

	return values, failures
}

func mapWithErrorWorker[T any, N any](mp func(T) (N, error), in <-chan T, out chan<- N, err chan<- error) {
defer func() { close(out); close(err) }()

for t := range in {
if n, er := mp(t); er != nil {
err <- er
Expand All @@ -32,12 +38,15 @@ func mapWithErrorWorker[T any, N any](mp func(T) (N, error), in <-chan T, out ch

// MapWithErrorSink starts mapWithErrorSinkWorker in a goroutine and returns a channel
// buffered to size. Errors returned by mp are passed to sink instead of being sent on a
// channel; the worker closes the returned channel when it returns.
func MapWithErrorSink[T any, N any](size int, mp func(T) (N, error), sink func(error), in <-chan T) ChanPull[N] {
	results := make(chan N, size)

	go mapWithErrorSinkWorker(mp, sink, in, results)

	return results
}

func mapWithErrorSinkWorker[T any, N any](mp func(T) (N, error), sink func(error), in <-chan T, out chan<- N) {
defer close(out)

for t := range in {
if n, er := mp(t); er != nil {
sink(er)
Expand Down
Loading

0 comments on commit b00832b

Please sign in to comment.