Skip to content

Commit

Permalink
dropbox: fix async batch missing the last few entries
Browse files Browse the repository at this point in the history
  • Loading branch information
ncw committed May 14, 2021
1 parent 5ee646f commit 75c417a
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions backend/dropbox/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package dropbox

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -34,7 +35,7 @@ type batcher struct {
timeout time.Duration // idle timeout for batch
async bool // whether we are using async batching
in chan batcherRequest // incoming items to batch
quit chan struct{} // close to quit the loop
closed chan struct{} // close to indicate batcher shut down
atexit atexit.FnHandle // atexit handle
shutOnce sync.Once // make sure we shutdown once only
wg sync.WaitGroup // wait for shutdown
Expand All @@ -46,6 +47,14 @@ type batcherRequest struct {
result chan<- batcherResponse
}

// Return true if batcherRequest is the quit request
func (br *batcherRequest) isQuit() bool {
return br.commitInfo == nil
}

// Send this to get the engine to quit
var quitRequest = batcherRequest{}

// batcherResponse holds a response to be delivered to clients waiting
// for a batch to complete.
type batcherResponse struct {
Expand Down Expand Up @@ -92,7 +101,7 @@ func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time.
timeout: timeout,
async: async,
in: make(chan batcherRequest, size),
quit: make(chan struct{}),
closed: make(chan struct{}),
}
if b.Batching() {
b.atexit = atexit.Register(b.Shutdown)
Expand Down Expand Up @@ -178,7 +187,8 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF
}
}
}()
fs.Debugf(b.f, "Committing %s batch length %d", b.mode, len(items))
desc := fmt.Sprintf("%s batch length %d starting with: %s", b.mode, len(items), items[0].Commit.Path)
fs.Debugf(b.f, "Committing %s", desc)

// finalise the batch getting either a result or a job id to poll
batchStatus, err := b.finishBatch(ctx, items)
Expand Down Expand Up @@ -246,6 +256,7 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF
return errors.Errorf("batch had %d errors: last error: %s", errorCount, errorTag)
}

fs.Debugf(b.f, "Committed %s", desc)
return nil
}

Expand All @@ -270,10 +281,8 @@ func (b *batcher) commitLoop(ctx context.Context) {
outer:
for {
select {
case <-b.quit:
break outer
case req, ok := <-b.in:
if !ok {
case req := <-b.in:
if req.isQuit() {
break outer
}
items = append(items, req.commitInfo)
Expand Down Expand Up @@ -304,9 +313,15 @@ outer:
func (b *batcher) Shutdown() {
b.shutOnce.Do(func() {
atexit.Unregister(b.atexit)
// quit the commitLoop. Note that we don't close b.in
// because that will cause write to closed channel
close(b.quit)
fs.Infof(b.f, "Commiting uploads - please wait...")
// show that batcher is shutting down
close(b.closed)
// quit the commitLoop by sending a quitRequest message
//
// Note that we don't close b.in because that will
// cause write to closed channel in Commit when we are
// exiting due to a signal.
b.in <- quitRequest
b.wg.Wait()
})
}
Expand All @@ -315,6 +330,11 @@ func (b *batcher) Shutdown() {
// batch and then waiting for the batch to complete in a synchronous
// way if async is not set.
func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) {
select {
case <-b.closed:
return nil, fserrors.FatalError(errors.New("batcher is shutting down"))
default:
}
fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path)
resp := make(chan batcherResponse, 1)
b.in <- batcherRequest{
Expand Down

0 comments on commit 75c417a

Please sign in to comment.