Skip to content

Commit

Permalink
lib/events: Become a service (fixes syncthing#5372) (syncthing#5373)
Browse files Browse the repository at this point in the history
Here the event Logger is rewritten as a service with a main loop instead
of mutexes. This loop has a select with essentially two legs: incoming
events, and subscription changes. When both are possible select will
chose one randomly, thus ensuring that in practice unsubscribes will
happen timely and not block the system.
  • Loading branch information
calmh authored Dec 13, 2018
1 parent fc860df commit abb3fb8
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 50 deletions.
144 changes: 94 additions & 50 deletions lib/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ type Logger struct {
nextSubscriptionIDs []int
nextGlobalID int
timeout *time.Timer
mutex sync.Mutex
events chan Event
funcs chan func()
stop chan struct{}
}

type Event struct {
Expand All @@ -225,15 +227,24 @@ type Subscription struct {

var Default = NewLogger()

func init() {
// The default logger never stops. To ensure this we nil out the stop
// channel so any attempt to stop it will panic.
Default.stop = nil
go Default.Serve()
}

var (
ErrTimeout = errors.New("timeout")
ErrClosed = errors.New("closed")
)

func NewLogger() *Logger {
l := &Logger{
mutex: sync.NewMutex(),
timeout: time.NewTimer(time.Second),
events: make(chan Event, BufferSize),
funcs: make(chan func()),
stop: make(chan struct{}),
}
// Make sure the timer is in the stopped state and hasn't fired anything
// into the channel.
Expand All @@ -243,20 +254,52 @@ func NewLogger() *Logger {
return l
}

func (l *Logger) Serve() {
loop:
for {
select {
case e := <-l.events:
// Incoming events get sent
l.sendEvent(e)

case fn := <-l.funcs:
// Subscriptions etc are handled here.
fn()

case <-l.stop:
break loop
}
}

// Closing the event channels corresponds to what happens when a
// subscription is unsubscribed; this stops any BufferedSubscription,
// makes Poll() return ErrClosed, etc.
for _, s := range l.subs {
close(s.events)
}
}

func (l *Logger) Stop() {
close(l.stop)
}

func (l *Logger) Log(t EventType, data interface{}) {
l.mutex.Lock()
l.events <- Event{
Time: time.Now(),
Type: t,
Data: data,
// SubscriptionID and GlobalID are set in sendEvent
}
}

func (l *Logger) sendEvent(e Event) {
l.nextGlobalID++
dl.Debugln("log", l.nextGlobalID, t, data)
dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)

e := Event{
GlobalID: l.nextGlobalID,
Time: time.Now(),
Type: t,
Data: data,
}
e.GlobalID = l.nextGlobalID

for i, s := range l.subs {
if s.mask&t != 0 {
if s.mask&e.Type != 0 {
e.SubscriptionID = l.nextSubscriptionIDs[i]
l.nextSubscriptionIDs[i]++

Expand All @@ -278,59 +321,60 @@ func (l *Logger) Log(t EventType, data interface{}) {
}
}
}
l.mutex.Unlock()
}

func (l *Logger) Subscribe(mask EventType) *Subscription {
l.mutex.Lock()
dl.Debugln("subscribe", mask)
res := make(chan *Subscription)
l.funcs <- func() {
dl.Debugln("subscribe", mask)

s := &Subscription{
mask: mask,
events: make(chan Event, BufferSize),
timeout: time.NewTimer(0),
}

s := &Subscription{
mask: mask,
events: make(chan Event, BufferSize),
timeout: time.NewTimer(0),
}
// We need to create the timeout timer in the stopped, non-fired state so
// that Subscription.Poll() can safely reset it and select on the timeout
// channel. This ensures the timer is stopped and the channel drained.
if runningTests {
// Make the behavior stable when running tests to avoid randomly
// varying test coverage. This ensures, in practice if not in
// theory, that the timer fires and we take the true branch of the
// next if.
runtime.Gosched()
}
if !s.timeout.Stop() {
<-s.timeout.C
}

// We need to create the timeout timer in the stopped, non-fired state so
// that Subscription.Poll() can safely reset it and select on the timeout
// channel. This ensures the timer is stopped and the channel drained.
if runningTests {
// Make the behavior stable when running tests to avoid randomly
// varying test coverage. This ensures, in practice if not in
// theory, that the timer fires and we take the true branch of the
// next if.
runtime.Gosched()
l.subs = append(l.subs, s)
l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
res <- s
}
if !s.timeout.Stop() {
<-s.timeout.C
}

l.subs = append(l.subs, s)
l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
l.mutex.Unlock()
return s
return <-res
}

func (l *Logger) Unsubscribe(s *Subscription) {
l.mutex.Lock()
dl.Debugln("unsubscribe")
for i, ss := range l.subs {
if s == ss {
last := len(l.subs) - 1
l.funcs <- func() {
dl.Debugln("unsubscribe")
for i, ss := range l.subs {
if s == ss {
last := len(l.subs) - 1

l.subs[i] = l.subs[last]
l.subs[last] = nil
l.subs = l.subs[:last]
l.subs[i] = l.subs[last]
l.subs[last] = nil
l.subs = l.subs[:last]

l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
l.nextSubscriptionIDs[last] = 0
l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
l.nextSubscriptionIDs[last] = 0
l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]

break
break
}
}
close(s.events)
}
close(s.events)
l.mutex.Unlock()
}

// Poll returns an event from the subscription or an error if the poll times
Expand Down
Loading

0 comments on commit abb3fb8

Please sign in to comment.