lib/model: Add global request limiter (fixes syncthing#6302) (syncthing#6303)

This adds a new config with the simple and concise name
maxConcurrentIncomingRequestKiB. This limits how many bytes we have "in
the air" in the form of response data being read and processed.

After some testing I think that not having this limiter is seldom a
great idea and thus I propose a default value of 256 MiB for this new
setting.

I also refactored the folder IO limiter to be a model/folder attribute
instead of a package global.
calmh authored Feb 1, 2020
1 parent 9cef283 commit 55937b6
Showing 8 changed files with 139 additions and 57 deletions.
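
For intuition, a minimal sketch of the byte-counting semaphore that both new limiters build on (the real implementation is in lib/model/bytesemaphore.go below; the locking details here are illustrative, not the exact code). Note how a negative capacity is clamped to zero, and zero means "no limit":

    package limiter

    import "sync"

    type byteSemaphore struct {
        max       int
        available int
        mut       sync.Mutex
        cond      *sync.Cond
    }

    func newByteSemaphore(max int) *byteSemaphore {
        if max < 0 {
            max = 0 // negative is disabled, which in limiter land is spelled zero
        }
        s := &byteSemaphore{max: max, available: max}
        s.cond = sync.NewCond(&s.mut)
        return s
    }

    // take blocks until bytes can be claimed. Requests larger than the whole
    // capacity are clamped to it, so with max == 0 everything passes freely.
    func (s *byteSemaphore) take(bytes int) {
        s.mut.Lock()
        defer s.mut.Unlock()
        if bytes > s.max {
            bytes = s.max
        }
        for bytes > s.available {
            s.cond.Wait()
        }
        s.available -= bytes
    }

    // give returns bytes claimed by an earlier take.
    func (s *byteSemaphore) give(bytes int) {
        s.mut.Lock()
        defer s.mut.Unlock()
        if bytes > s.max {
            bytes = s.max
        }
        s.available += bytes
        s.cond.Broadcast()
    }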
25 changes: 25 additions & 0 deletions lib/config/optionsconfiguration.go
@@ -10,6 +10,7 @@ import (
"fmt"
"runtime"

"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/util"
)
@@ -60,6 +61,7 @@ type OptionsConfiguration struct {
StunKeepaliveMinS int `xml:"stunKeepaliveMinS" json:"stunKeepaliveMinS" default:"20"` // 0 for off
RawStunServers []string `xml:"stunServer" json:"stunServers" default:"default"`
DatabaseTuning Tuning `xml:"databaseTuning" json:"databaseTuning" restart:"true"`
RawMaxCIRequestKiB int `xml:"maxConcurrentIncomingRequestKiB" json:"maxConcurrentIncomingRequestKiB"`

DeprecatedUPnPEnabled bool `xml:"upnpEnabled,omitempty" json:"-"`
DeprecatedUPnPLeaseM int `xml:"upnpLeaseMinutes,omitempty" json:"-"`
@@ -175,3 +177,26 @@ func (opts OptionsConfiguration) MaxFolderConcurrency() int {
// of writing is two, 95-percentile at 12 folders.)
return 4 // https://xkcd.com/221/
}

func (opts OptionsConfiguration) MaxConcurrentIncomingRequestKiB() int {
// Negative is disabled, which in limiter land is spelled zero
if opts.RawMaxCIRequestKiB < 0 {
return 0
}

if opts.RawMaxCIRequestKiB == 0 {
// The default is 256 MiB
return 256 * 1024 // KiB
}

// We can't really do less than a couple of concurrent blocks or we'll
// pretty much stall completely. Check that an explicit value is large
// enough.
const minAllowed = 2 * protocol.MaxBlockSize / 1024
if opts.RawMaxCIRequestKiB < minAllowed {
return minAllowed
}

// Roll with it.
return opts.RawMaxCIRequestKiB
}
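
For scale (assuming protocol.MaxBlockSize is 16 MiB, its value at the time of this commit), the bounds above work out as follows; NewModel below also multiplies the configured KiB by 1024 to size the semaphore in bytes:

    const maxBlockSize = 16 << 20              // assumed value of protocol.MaxBlockSize, in bytes
    const minAllowed = 2 * maxBlockSize / 1024 // = 32768 KiB, i.e. a 32 MiB floor
    const defaultKiB = 256 * 1024              // = 262144 KiB, i.e. the 256 MiB default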
6 changes: 6 additions & 0 deletions lib/model/bytesemaphore.go
@@ -18,6 +18,9 @@ type byteSemaphore struct {
}

func newByteSemaphore(max int) *byteSemaphore {
if max < 0 {
max = 0
}
s := byteSemaphore{
max: max,
available: max,
@@ -56,6 +59,9 @@ func (s *byteSemaphore) give(bytes int) {
}

func (s *byteSemaphore) setCapacity(cap int) {
if cap < 0 {
cap = 0
}
s.mut.Lock()
diff := cap - s.max
s.max = cap
16 changes: 7 additions & 9 deletions lib/model/folder.go
@@ -33,15 +33,12 @@ import (
"github.com/thejerf/suture"
)

// folderIOLimiter limits the number of concurrent I/O heavy operations,
// such as scans and pulls. A limit of zero means no limit.
var folderIOLimiter = newByteSemaphore(0)

type folder struct {
suture.Service
stateTracker
config.FolderConfiguration
*stats.FolderStatisticsReference
ioLimiter *byteSemaphore

localFlags uint32

@@ -79,11 +76,12 @@ type puller interface {
pull() bool // true when successful and should not be retried
}

func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger) folder {
func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *byteSemaphore) folder {
return folder{
stateTracker: newStateTracker(cfg.ID, evLogger),
FolderConfiguration: cfg,
FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID),
ioLimiter: ioLimiter,

model: model,
shortID: model.shortID,
@@ -303,8 +301,8 @@ func (f *folder) pull() bool {
f.setState(FolderSyncWaiting)
defer f.setState(FolderIdle)

folderIOLimiter.take(1)
defer folderIOLimiter.give(1)
f.ioLimiter.take(1)
defer f.ioLimiter.give(1)

return f.puller.pull()
}
@@ -342,8 +340,8 @@ func (f *folder) scanSubdirs(subDirs []string) error {
f.setError(nil)
f.setState(FolderScanWaiting)

folderIOLimiter.take(1)
defer folderIOLimiter.give(1)
f.ioLimiter.take(1)
defer f.ioLimiter.give(1)

for i := range subDirs {
sub := osutil.NativeFilename(subDirs[i])
4 changes: 2 additions & 2 deletions lib/model/folder_recvonly.go
@@ -57,8 +57,8 @@ type receiveOnlyFolder struct {
*sendReceiveFolder
}

func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service {
sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs, evLogger).(*sendReceiveFolder)
func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs, evLogger, ioLimiter).(*sendReceiveFolder)
sr.localFlags = protocol.FlagLocalReceiveOnly // gets propagated to the scanner, and set on locally changed files
return &receiveOnlyFolder{sr}
}
4 changes: 2 additions & 2 deletions lib/model/folder_sendonly.go
@@ -25,9 +25,9 @@ type sendOnlyFolder struct {
folder
}

func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger) service {
func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
f := &sendOnlyFolder{
folder: newFolder(model, fset, ignores, cfg, evLogger),
folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
}
f.folder.puller = f
f.folder.Service = util.AsService(f.serve, f.String())
4 changes: 2 additions & 2 deletions lib/model/folder_sendrecv.go
@@ -109,9 +109,9 @@ type sendReceiveFolder struct {
pullErrorsMut sync.Mutex
}

func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service {
func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
f := &sendReceiveFolder{
folder: newFolder(model, fset, ignores, cfg, evLogger),
folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
fs: fs,
versioner: ver,
queue: newJobQueue(),
109 changes: 67 additions & 42 deletions lib/model/model.go
@@ -121,6 +121,12 @@ type model struct {
protectedFiles []string
evLogger events.Logger

// globalRequestLimiter limits the amount of data in concurrent incoming requests
globalRequestLimiter *byteSemaphore
// folderIOLimiter limits the number of concurrent I/O heavy operations,
// such as scans and pulls. A limit of zero means no limit.
folderIOLimiter *byteSemaphore

clientName string
clientVersion string

@@ -145,7 +151,7 @@ type model struct {
foldersRunning int32 // for testing only
}

type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger) service
type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger, *byteSemaphore) service

var (
folderFactories = make(map[config.FolderType]folderFactory)
@@ -177,38 +183,39 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
},
PassThroughPanics: true,
}),
cfg: cfg,
db: ldb,
finder: db.NewBlockFinder(ldb),
progressEmitter: NewProgressEmitter(cfg, evLogger),
id: id,
shortID: id.Short(),
cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles,
protectedFiles: protectedFiles,
evLogger: evLogger,
clientName: clientName,
clientVersion: clientVersion,
folderCfgs: make(map[string]config.FolderConfiguration),
folderFiles: make(map[string]*db.FileSet),
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
folderIgnores: make(map[string]*ignore.Matcher),
folderRunners: make(map[string]service),
folderRunnerTokens: make(map[string][]suture.ServiceToken),
folderVersioners: make(map[string]versioner.Versioner),
conn: make(map[protocol.DeviceID]connections.Connection),
connRequestLimiters: make(map[protocol.DeviceID]*byteSemaphore),
closed: make(map[protocol.DeviceID]chan struct{}),
helloMessages: make(map[protocol.DeviceID]protocol.HelloResult),
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
remotePausedFolders: make(map[protocol.DeviceID][]string),
fmut: sync.NewRWMutex(),
pmut: sync.NewRWMutex(),
cfg: cfg,
db: ldb,
finder: db.NewBlockFinder(ldb),
progressEmitter: NewProgressEmitter(cfg, evLogger),
id: id,
shortID: id.Short(),
cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles,
protectedFiles: protectedFiles,
evLogger: evLogger,
globalRequestLimiter: newByteSemaphore(1024 * cfg.Options().MaxConcurrentIncomingRequestKiB()),
folderIOLimiter: newByteSemaphore(cfg.Options().MaxFolderConcurrency()),
clientName: clientName,
clientVersion: clientVersion,
folderCfgs: make(map[string]config.FolderConfiguration),
folderFiles: make(map[string]*db.FileSet),
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
folderIgnores: make(map[string]*ignore.Matcher),
folderRunners: make(map[string]service),
folderRunnerTokens: make(map[string][]suture.ServiceToken),
folderVersioners: make(map[string]versioner.Versioner),
conn: make(map[protocol.DeviceID]connections.Connection),
connRequestLimiters: make(map[protocol.DeviceID]*byteSemaphore),
closed: make(map[protocol.DeviceID]chan struct{}),
helloMessages: make(map[protocol.DeviceID]protocol.HelloResult),
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
remotePausedFolders: make(map[protocol.DeviceID][]string),
fmut: sync.NewRWMutex(),
pmut: sync.NewRWMutex(),
}
for devID := range cfg.Devices() {
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String())
}
m.Add(m.progressEmitter)
folderIOLimiter.setCapacity(cfg.Options().MaxFolderConcurrency())

return m
}
@@ -340,7 +347,7 @@ func (m *model) startFolderLocked(cfg config.FolderConfiguration) {

ignores := m.folderIgnores[folder]

p := folderFactory(m, fset, ignores, cfg, ver, ffs, m.evLogger)
p := folderFactory(m, fset, ignores, cfg, ver, ffs, m.evLogger, m.folderIOLimiter)

m.folderRunners[folder] = p

@@ -1500,26 +1507,17 @@ func (m *model) Request(deviceID protocol.DeviceID, folder, name string, size in
limiter := m.connRequestLimiters[deviceID]
m.pmut.RUnlock()

if limiter != nil {
limiter.take(int(size))
}
// The requestResponse releases the bytes to the buffer pool and the
// limiters when its Close method is called.
res := newLimitedRequestResponse(int(size), limiter, m.globalRequestLimiter)

// The requestResponse releases the bytes to the limiter when its Close method is called.
res := newRequestResponse(int(size))
defer func() {
// Close it ourselves if it isn't returned due to an error
if err != nil {
res.Close()
}
}()

if limiter != nil {
go func() {
res.Wait()
limiter.give(int(size))
}()
}

// Only check temp files if the flag is set, and if we are set to advertise
// the temp indexes.
if fromTemporary && !folderCfg.DisableTempIndexes {
@@ -1563,6 +1561,32 @@
return res, nil
}

// newLimitedRequestResponse takes size bytes from the limiters in order,
// skipping nil limiters, then returns a requestResponse of the given size.
// When the requestResponse is closed the limiters are given back the bytes,
// in reverse order.
func newLimitedRequestResponse(size int, limiters ...*byteSemaphore) *requestResponse {
for _, limiter := range limiters {
if limiter != nil {
limiter.take(size)
}
}

res := newRequestResponse(size)

go func() {
res.Wait()
for i := range limiters {
limiter := limiters[len(limiters)-1-i]
if limiter != nil {
limiter.give(size)
}
}
}()

return res
}
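
The contract at the call site, sketched on the assumption that deviceLimiter stands in for the per-device semaphore from connRequestLimiters (the variable name is a placeholder):

    // Take from the per-device limiter first, then the global one; a request
    // blocked on its own device's limiter holds no global capacity.
    res := newLimitedRequestResponse(int(size), deviceLimiter, m.globalRequestLimiter)
    // ... use res; whoever closes it returns the bytes to the limiters,
    // global first, then per-device.

Nil limiters are skipped, so a device without an individual limit simply passes through to the global one.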

func (m *model) recheckFile(deviceID protocol.DeviceID, folderFs fs.Filesystem, folder, name string, size int32, offset int64, hash []byte) {
cf, ok := m.CurrentFolderFile(folder, name)
if !ok {
@@ -2483,7 +2507,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}
m.fmut.Unlock()

folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency())
m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB())
m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency())

// Some options don't require restart as those components handle it fine
// by themselves. Compare the options structs containing only the
28 changes: 28 additions & 0 deletions lib/model/model_test.go
@@ -3451,3 +3451,31 @@ func TestDeviceWasSeen(t *testing.T) {
t.Error("device should have been seen now")
}
}

func TestNewLimitedRequestResponse(t *testing.T) {
l0 := newByteSemaphore(0)
l1 := newByteSemaphore(1024)
l2 := (*byteSemaphore)(nil)

// Should take 500 bytes from any non-unlimited non-nil limiters.
res := newLimitedRequestResponse(500, l0, l1, l2)

if l1.available != 1024-500 {
t.Error("should have taken bytes from limited limiter")
}

// Closing the result should return the bytes.
res.Close()

// Try to take 1024 bytes to make sure the bytes were returned.
done := make(chan struct{})
go func() {
l1.take(1024)
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Error("Bytes weren't returned in a timely fashion")
}
}
