Skip to content

Commit

Permalink
Revert "Merge pull request burke#583 from burke/andrew-debounce-better"
Browse files Browse the repository at this point in the history
The new debouncing strategy is still not quite what we want. We really
want to stop all services until changes have stopped and then start them
all again. This change makes things worse by leaving old code running longer.

This reverts commit f1d59a8, reversing
changes made to c55b133.
  • Loading branch information
andrew-stripe committed Sep 13, 2016
1 parent d3c5d80 commit 2aead8f
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 56 deletions.
2 changes: 1 addition & 1 deletion go/filemonitor/filelistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type fileListener struct {
fileMonitor
gatheringMonitor
netListener net.Listener
connections map[net.Conn]chan string
stop chan struct{}
Expand Down
2 changes: 1 addition & 1 deletion go/filemonitor/filelistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestFileListener(t *testing.T) {
}

slog.SetTraceLogger(slog.NewTraceLogger(os.Stderr))
fl := filemonitor.NewFileListener(fileChangeDelay, ln)
fl := filemonitor.NewFileListener(filemonitor.DefaultFileChangeDelay, ln)
defer fl.Close()

// We should be able to add a file without connecting anything
Expand Down
18 changes: 12 additions & 6 deletions go/filemonitor/filemonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ type FileMonitor interface {
}

type fileMonitor struct {
listeners []chan []string
listenerMutex sync.Mutex
changes chan string
fileChangeDelay time.Duration
listeners []chan []string
listenerMutex sync.Mutex
}

func (f *fileMonitor) Listen() <-chan []string {
Expand All @@ -30,11 +28,17 @@ func (f *fileMonitor) Listen() <-chan []string {
return c
}

type gatheringMonitor struct {
fileMonitor
changes chan string
fileChangeDelay time.Duration
}

// Create the changes channel and serve debounced changes to listeners.
// The changes channel must be created before this is started.
// Closing the changes channel causes this to close all listener
// channels and return.
func (f *fileMonitor) serveListeners() {
func (f *gatheringMonitor) serveListeners() {
never := make(<-chan time.Time)
deadline := never

Expand All @@ -54,7 +58,9 @@ func (f *fileMonitor) serveListeners() {
}

collected[change] = true
deadline = time.After(f.fileChangeDelay)
if deadline == never {
deadline = time.After(f.fileChangeDelay)
}
case <-deadline:
list := make([]string, 0, len(collected))
for f := range collected {
Expand Down
21 changes: 12 additions & 9 deletions go/filemonitor/filemonitor_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ type fsEventsMonitor struct {
func NewFileMonitor(fileChangeDelay time.Duration) (FileMonitor, error) {
f := fsEventsMonitor{
stream: &fsevents.EventStream{
Paths: []string{},
// We want debouncing rather than throttling so we need to handle
// aggregating events ourselves
Latency: 0,
Paths: []string{},
Latency: fileChangeDelay,
Flags: fsevents.FileEvents,
EventID: fsevents.EventIDSinceNow,
},
Expand All @@ -32,10 +30,7 @@ func NewFileMonitor(fileChangeDelay time.Duration) (FileMonitor, error) {
add: make(chan string, 5000),
stop: make(chan struct{}),
}
f.fileChangeDelay = fileChangeDelay
f.changes = make(chan string)

go f.serveListeners()
go f.handleAdd()

return &f, nil
Expand All @@ -62,15 +57,23 @@ func (f *fsEventsMonitor) watch() {
for {
select {
case events := <-f.stream.Events:
paths := make([]string, 0, len(events))
for _, event := range events {
if (event.Flags & (fsevents.ItemIsFile | flagsWorthReloadingFor)) == 0 {
continue
}

f.changes <- event.Path
paths = append(paths, event.Path)
}

if len(paths) == 0 {
continue
}

for _, l := range f.listeners {
l <- paths
}
case <-f.stop:
close(f.changes)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/filemonitor/filemonitor_fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type fsnotifyMonitor struct {
fileMonitor
gatheringMonitor
watcher *fsnotify.Watcher
}

Expand Down
47 changes: 9 additions & 38 deletions go/filemonitor/filemonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import (
"github.com/burke/zeus/go/filemonitor"
)

// Setting a long delay here makes tests slow but improves reliability in Travis CI
const fileChangeDelay = 500 * time.Millisecond

func writeTestFiles(dir string) ([]string, error) {
files := make([]string, 3)

Expand Down Expand Up @@ -50,7 +47,7 @@ func TestFileMonitor(t *testing.T) {
t.Fatal(err)
}

fm, err := filemonitor.NewFileMonitor(fileChangeDelay)
fm, err := filemonitor.NewFileMonitor(filemonitor.DefaultFileChangeDelay)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -88,23 +85,6 @@ func TestFileMonitor(t *testing.T) {
if err := expectChanges(changeCh, watched); err != nil {
t.Fatal(err)
}

// Debouncing waits until no changes have occurred during the debounce
// interval before reporting the change.
for _, v := range [][]byte{{'1'}, {'2'}, {'3'}, {'4'}, {'5'}} {
if err := ioutil.WriteFile(files[0], v, 0); err != nil {
t.Fatal(err)
}
time.Sleep(fileChangeDelay / 3)
}

if err := expectChanges(changeCh, files[0:1]); err != nil {
t.Fatal(err)
}

if changes := awaitChanges(changeCh); changes != nil {
t.Fatalf("should not have any remaining changes, got %v", changes)
}
}

func expectChanges(changeCh <-chan []string, expect []string) error {
Expand All @@ -114,25 +94,16 @@ func expectChanges(changeCh <-chan []string, expect []string) error {
sort.StringSlice(expectSorted).Sort()
expect = expectSorted

changes := awaitChanges(changeCh)
if changes == nil {
return errors.New("Timeout waiting for change notification")
}

sort.StringSlice(changes).Sort()
select {
case changes := <-changeCh:
sort.StringSlice(changes).Sort()

if !reflect.DeepEqual(changes, expect) {
return fmt.Errorf("expected changes in %v, got %v", expect, changes)
if !reflect.DeepEqual(changes, expect) {
return fmt.Errorf("expected changes in %v, got %v", expect, changes)
}
case <-time.After(time.Second):
return errors.New("Timeout waiting for change notification")
}

return nil
}

func awaitChanges(changeCh <-chan []string) []string {
select {
case changes := <-changeCh:
return changes
case <-time.After(4 * fileChangeDelay):
return nil
}
}

0 comments on commit 2aead8f

Please sign in to comment.