Skip to content

Commit

Permalink
Change Tap* function signature and always forward the T type
Browse files Browse the repository at this point in the history
  • Loading branch information
Morgahl committed May 2, 2022
1 parent 7ba3d3d commit 53c7886
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
6 changes: 3 additions & 3 deletions chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ func (c Chan[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink func(e
return MapWithErrorSink(size, mp, sink, thunkChanPull(c))
}

func (c Chan[T]) Tap(size int, tap func(T) T) ChanPull[T] {
func (c Chan[T]) Tap(size int, tap func(T)) ChanPull[T] {
return Tap(size, tap, thunkChanPull(c))
}

func (c Chan[T]) TapWithError(size int, tap func(T) (T, error)) (ChanPull[T], ChanPull[error]) {
func (c Chan[T]) TapWithError(size int, tap func(T) error) (ChanPull[T], ChanPull[error]) {
return TapWithError(size, tap, thunkChanPull(c))
}

func (c Chan[T]) TapWithErrorSink(size int, tap func(T) (T, error), sink func(error)) ChanPull[T] {
func (c Chan[T]) TapWithErrorSink(size int, tap func(T) error, sink func(error)) ChanPull[T] {
return TapWithErrorSink(size, tap, sink, thunkChanPull(c))
}

Expand Down
6 changes: 3 additions & 3 deletions chan_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ func (c ChanPull[T]) MapWithErrorSink(size int, mp func(T) (any, error), sink fu
return MapWithErrorSink(size, mp, sink, c)
}

func (c ChanPull[T]) Tap(size int, tap func(T) T) ChanPull[T] {
func (c ChanPull[T]) Tap(size int, tap func(T)) ChanPull[T] {
return Tap(size, tap, c)
}

func (c ChanPull[T]) TapWithError(size int, tap func(T) (T, error)) (ChanPull[T], ChanPull[error]) {
func (c ChanPull[T]) TapWithError(size int, tap func(T) error) (ChanPull[T], ChanPull[error]) {
return TapWithError(size, tap, c)
}

func (c ChanPull[T]) TapWithErrorSink(size int, tap func(T) (T, error), sink func(error)) ChanPull[T] {
func (c ChanPull[T]) TapWithErrorSink(size int, tap func(T) error, sink func(error)) ChanPull[T] {
return TapWithErrorSink(size, tap, sink, c)
}

Expand Down
46 changes: 40 additions & 6 deletions tap.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,47 @@
package pipes

func Tap[T any](size int, tap func(T) T, in <-chan T) ChanPull[T] {
return Map(size, tap, in)
func Tap[T any](size int, tap func(T), in <-chan T) ChanPull[T] {
out := make(chan T, size)
go tapWorker(tap, in, out)
return out
}

func TapWithError[T any](size int, tap func(T) (T, error), in <-chan T) (ChanPull[T], ChanPull[error]) {
return MapWithError(size, tap, in)
func tapWorker[T any](tap func(T), in <-chan T, out chan<- T) {
defer close(out)
for t := range in {
tap(t)
out <- t
}
}

func TapWithErrorSink[T any](size int, tap func(T) (T, error), sink func(error), in <-chan T) ChanPull[T] {
return MapWithErrorSink(size, tap, sink, in)
func TapWithError[T any](size int, tap func(T) error, in <-chan T) (ChanPull[T], ChanPull[error]) {
out, err := make(chan T, size), make(chan error, size)
go tapWithErrorWorker(tap, in, out, err)
return out, err
}

func tapWithErrorWorker[T any](tap func(T) error, in <-chan T, out chan<- T, err chan<- error) {
defer func() { close(out); close(err) }()
for t := range in {
if er := tap(t); er != nil {
err <- er
}
out <- t
}
}

func TapWithErrorSink[T any](size int, tap func(T) error, sink func(error), in <-chan T) ChanPull[T] {
out := make(chan T, size)
go tapWithErrorSinkWorker(tap, sink, in, out)
return out
}

func tapWithErrorSinkWorker[T any](mp func(T) error, sink func(error), in <-chan T, out chan<- T) {
defer close(out)
for t := range in {
if er := mp(t); er != nil {
sink(er)
}
out <- t
}
}

0 comments on commit 53c7886

Please sign in to comment.