Skip to content

Commit

Permalink
core: Add Unsubscribe() to eventer, now Once() works as expected
Browse files Browse the repository at this point in the history
Signed-off-by: deadprogram <[email protected]>
  • Loading branch information
deadprogram committed Sep 12, 2016
1 parent d0a8faa commit 3a60b33
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
23 changes: 16 additions & 7 deletions eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ type eventer struct {
// new events get put in to the event channel
in eventChannel

// slice of out channels used by subscribers
outs []eventChannel
// map of out channels used by subscribers
outs map[eventChannel]eventChannel
}

// Eventer is the interface which describes how a Driver or Adaptor
Expand All @@ -29,12 +29,15 @@ type Eventer interface {
// DeleteEvent removes a previously registered Event name.
DeleteEvent(name string)

// Publish new events to anyone that is subscribed
// Publish new events to any subscriber
Publish(name string, data interface{})

// Subscribe to any events from this eventer
// Subscribe to events
Subscribe() (events eventChannel)

// Unsubscribe from an event channel
Unsubscribe(events eventChannel)

// Event handler
On(name string, f func(s interface{})) (err error)

Expand All @@ -47,15 +50,15 @@ func NewEventer() Eventer {
evtr := &eventer{
eventnames: make(map[string]string),
in: make(eventChannel, 1),
outs: make([]eventChannel, 1),
outs: make(map[eventChannel]eventChannel),
}

// goroutine to cascade "in" events to all "out" event channels
go func() {
for {
select {
case evt := <-evtr.in:
for _, out := range evtr.outs[1:] {
for _, out := range evtr.outs {
out <- evt
}
}
Expand Down Expand Up @@ -95,10 +98,15 @@ func (e *eventer) Publish(name string, data interface{}) {
// Subscribe to any events from this eventer
func (e *eventer) Subscribe() eventChannel {
out := make(eventChannel)
e.outs = append(e.outs, out)
e.outs[out] = out
return out
}

// Unsubscribe from the event channel
func (e *eventer) Unsubscribe(events eventChannel) {
delete(e.outs, events)
}

// On executes the event handler f when e is Published to.
func (e *eventer) On(n string, f func(s interface{})) (err error) {
out := e.Subscribe()
Expand Down Expand Up @@ -126,6 +134,7 @@ func (e *eventer) Once(n string, f func(s interface{})) (err error) {
case evt := <-out:
if evt.Name == n {
f(evt.Data)
e.Unsubscribe(out)
break ProcessEvents
}
}
Expand Down
12 changes: 6 additions & 6 deletions platforms/firmata/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ func TestProcessI2cReply(t *testing.T) {

b.Once(b.Event("I2cReply"), func(data interface{}) {
gobottest.Assert(t, data, I2cReply{
Address: 9,
Register: 0,
Data: []byte{152, 1, 154},
})
Address: 9,
Register: 0,
Data: []byte{152, 1, 154},
})
sem <- true
})

Expand All @@ -242,8 +242,8 @@ func TestProcessFirmwareQuery(t *testing.T) {
sem := make(chan bool)
b := initTestFirmata()
testReadData = []byte{240, 121, 2, 3, 83, 0, 116, 0, 97, 0, 110, 0, 100, 0, 97,
0, 114, 0, 100, 0, 70, 0, 105, 0, 114, 0, 109, 0, 97, 0, 116, 0, 97, 0, 46,
0, 105, 0, 110, 0, 111, 0, 247}
0, 114, 0, 100, 0, 70, 0, 105, 0, 114, 0, 109, 0, 97, 0, 116, 0, 97, 0, 46,
0, 105, 0, 110, 0, 111, 0, 247}

b.Once(b.Event("FirmwareQuery"), func(data interface{}) {
gobottest.Assert(t, data, "StandardFirmata.ino")
Expand Down

0 comments on commit 3a60b33

Please sign in to comment.