Skip to content

Commit

Permalink
feat: add func in channel.go
Browse files Browse the repository at this point in the history
  • Loading branch information
duke-git committed Apr 19, 2022
1 parent 980ff2c commit fc6dee9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
33 changes: 33 additions & 0 deletions concurrency/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,36 @@ func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any

return multiplexedStream
}

// Or merge one or more done channels into one done channel, which is closed when any done channel is closed
func (c *Channel) Or(channels ...<-chan any) <-chan any {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}

orDone := make(chan any)

go func() {
defer close(orDone)

switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-c.Or(append(channels[3:], orDone)...):
}
}
}()

return orDone
}
23 changes: 23 additions & 0 deletions concurrency/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package concurrency
import (
"context"
"testing"
"time"

"github.com/duke-git/lancet/v2/internal"
)
Expand Down Expand Up @@ -108,3 +109,25 @@ func TestFanIn(t *testing.T) {

assert.Equal(1, 1)
}

func TestOr(t *testing.T) {
assert := internal.NewAssert(t, "TestOr")

sig := func(after time.Duration) <-chan any {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}

start := time.Now()

c := NewChannel()
<-c.Or(sig(2*time.Hour), sig(5*time.Minute), sig(1*time.Second), sig(1*time.Hour), sig(1*time.Minute))

t.Logf("done after %v", time.Since(start))

assert.Equal(1, 1)
}

0 comments on commit fc6dee9

Please sign in to comment.