Skip to content

Commit

Permalink
lib/scanner: Use fs.Filesystem for all operations
Browse files Browse the repository at this point in the history
One more step on the path of the great refactoring. Touches rwfolder a
little bit since it uses the Lstat from fs as well, but mostly this is
just on the scanner as rwfolder is scheduled for a later refactor.

There are a couple of usages of fs.DefaultFilesystem that will in the
end become a filesystem injected from the top, but that comes later.

GitHub-Pull-Request: syncthing#4070
LGTM: AudriusButkevicius, imsodin
  • Loading branch information
calmh authored and imsodin committed Apr 1, 2017
1 parent bdb56d9 commit 4253f22
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 103 deletions.
2 changes: 1 addition & 1 deletion lib/db/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (s *FileSet) SetIndexID(device protocol.DeviceID, id protocol.IndexID) {
func (s *FileSet) MtimeFS() *fs.MtimeFS {
prefix := s.db.mtimesKey([]byte(s.folder))
kv := NewNamespacedKV(s.db, string(prefix))
return fs.NewMtimeFS(kv)
return fs.NewMtimeFS(fs.DefaultFilesystem, kv)
}

func (s *FileSet) ListDevices() []protocol.DeviceID {
Expand Down
27 changes: 24 additions & 3 deletions lib/fs/basicfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *BasicFilesystem) Mkdir(name string, perm FileMode) error {
}

func (f *BasicFilesystem) Lstat(name string) (FileInfo, error) {
fi, err := os.Lstat(name)
fi, err := underlyingLstat(name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -71,11 +71,32 @@ func (f *BasicFilesystem) DirNames(name string) ([]string, error) {
}

func (f *BasicFilesystem) Open(name string) (File, error) {
return os.Open(name)
fd, err := os.Open(name)
if err != nil {
return nil, err
}
return fsFile{fd}, err
}

func (f *BasicFilesystem) Create(name string) (File, error) {
return os.Create(name)
fd, err := os.Create(name)
if err != nil {
return nil, err
}
return fsFile{fd}, err
}

// fsFile implements the fs.File interface on top of an os.File
type fsFile struct {
*os.File
}

func (f fsFile) Stat() (FileInfo, error) {
info, err := f.File.Stat()
if err != nil {
return nil, err
}
return fsFileInfo{info}, nil
}

// fsFileInfo implements the fs.FileInfo interface on top of an os.FileInfo.
Expand Down
18 changes: 14 additions & 4 deletions lib/fs/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
package fs

import (
"errors"
"io"
"os"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -37,6 +38,7 @@ type File interface {
io.WriterAt
io.Closer
Truncate(size int64) error
Stat() (FileInfo, error)
}

// The FileInfo interface is almost the same as os.FileInfo, but with the
Expand All @@ -57,12 +59,20 @@ type FileInfo interface {
// FileMode is similar to os.FileMode
type FileMode uint32

// ModePerm is the equivalent of os.ModePerm
const ModePerm = FileMode(os.ModePerm)

// DefaultFilesystem is the fallback to use when nothing explicitly has
// been passed.
var DefaultFilesystem Filesystem = new(BasicFilesystem)
var DefaultFilesystem Filesystem = NewBasicFilesystem()

// SkipDir is used as a return value from WalkFuncs to indicate that
// the directory named in the call is to be skipped. It is not returned
// as an error by any function.
var errSkipDir = errors.New("skip this directory")
var SkipDir = errSkipDir // silences the lint warning...
var SkipDir = filepath.SkipDir

// IsExist is the equivalent of os.IsExist
var IsExist = os.IsExist

// IsNotExist is the equivalent of os.IsNotExist
var IsNotExist = os.IsNotExist
29 changes: 29 additions & 0 deletions lib/fs/lstat_broken.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

// +build linux android

package fs

import (
"os"
"syscall"
"time"
)

// Lstat is like os.Lstat, except lobotomized for Android. See
// https://forum.syncthing.net/t/2395
func underlyingLstat(name string) (fi os.FileInfo, err error) {
for i := 0; i < 10; i++ { // We have to draw the line somewhere
fi, err = os.Lstat(name)
if err, ok := err.(*os.PathError); ok && err.Err == syscall.EINTR {
time.Sleep(time.Duration(i+1) * time.Millisecond)
continue
}
return
}
return
}
15 changes: 15 additions & 0 deletions lib/fs/lstat_regular.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

// +build !linux,!android

package fs

import "os"

func underlyingLstat(name string) (fi os.FileInfo, err error) {
return os.Lstat(name)
}
16 changes: 7 additions & 9 deletions lib/fs/mtimefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ var osChtimes = os.Chtimes
// of what shenanigans the underlying filesystem gets up to. A nil MtimeFS
// just does the underlying operations with no additions.
type MtimeFS struct {
Filesystem
db database
}

func NewMtimeFS(db database) *MtimeFS {
func NewMtimeFS(underlying Filesystem, db database) *MtimeFS {
return &MtimeFS{
db: db,
Filesystem: underlying,
db: db,
}
}

Expand All @@ -56,12 +58,8 @@ func (f *MtimeFS) Chtimes(name string, atime, mtime time.Time) error {
return nil
}

func (f *MtimeFS) Lstat(name string) (os.FileInfo, error) {
if f == nil {
return osutil.Lstat(name)
}

info, err := osutil.Lstat(name)
func (f *MtimeFS) Lstat(name string) (FileInfo, error) {
info, err := f.Filesystem.Lstat(name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -113,7 +111,7 @@ func (f *MtimeFS) load(name string) (real, virtual time.Time) {
// The mtimeFileInfo is an os.FileInfo that lies about the ModTime().

type mtimeFileInfo struct {
os.FileInfo
FileInfo
mtime time.Time
}

Expand Down
2 changes: 1 addition & 1 deletion lib/fs/mtimefs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestMtimeFS(t *testing.T) {
// a random time with nanosecond precision
testTime := time.Unix(1234567890, 123456789)

mtimefs := NewMtimeFS(make(mapStore))
mtimefs := NewMtimeFS(DefaultFilesystem, make(mapStore))

// Do one Chtimes call that will go through to the normal filesystem
osChtimes = os.Chtimes
Expand Down
2 changes: 1 addition & 1 deletion lib/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error
BlockSize: protocol.BlockSize,
TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{m, folder},
Lstater: mtimefs,
Filesystem: mtimefs,
IgnorePerms: folderCfg.IgnorePerms,
AutoNormalize: folderCfg.AutoNormalize,
Hashers: m.numHashers(folder),
Expand Down
10 changes: 5 additions & 5 deletions lib/model/rwfolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo) {
// There is already something under that name, but it's a file/link.
// Most likely a file/link is getting replaced with a directory.
// Remove the file/link and fall through to directory creation.
case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
case err == nil && (!info.IsDir() || info.IsSymlink()):
err = osutil.InWritableDir(os.Remove, realName)
if err != nil {
l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
Expand Down Expand Up @@ -655,7 +655,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo) {

// Mask for the bits we want to preserve and add them in to the
// directories permissions.
return os.Chmod(path, mode|(info.Mode()&retainBits))
return os.Chmod(path, mode|(os.FileMode(info.Mode())&retainBits))
}

if err = osutil.InWritableDir(mkdir, realName); err == nil {
Expand All @@ -678,7 +678,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo) {
// It's OK to change mode bits on stuff within non-writable directories.
if f.ignorePermissions(file) {
f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
} else if err := os.Chmod(realName, mode|(info.Mode()&retainBits)); err == nil {
} else if err := os.Chmod(realName, mode|(os.FileMode(info.Mode())&retainBits)); err == nil {
f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
} else {
l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c

// Check for an old temporary file which might have some blocks we could
// reuse.
tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize, nil, false)
tempBlocks, err := scanner.HashFile(fs.DefaultFilesystem, tempName, protocol.BlockSize, nil, false)
if err == nil {
// Check for any reusable blocks in the temp file
tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
Expand Down Expand Up @@ -1431,7 +1431,7 @@ func (f *sendReceiveFolder) performFinish(state *sharedPullerState) error {
// handle that.

switch {
case stat.IsDir() || stat.Mode()&os.ModeSymlink != 0:
case stat.IsDir() || stat.IsSymlink():
// It's a directory or a symlink. These are not versioned or
// archived for conflicts, only removed (which of course fails for
// non-empty directories).
Expand Down
4 changes: 2 additions & 2 deletions lib/model/rwfolder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func setUpSendReceiveFolder(model *Model) sendReceiveFolder {
model: model,
},

mtimeFS: fs.NewMtimeFS(db.NewNamespacedKV(model.db, "mtime")),
mtimeFS: fs.NewMtimeFS(fs.DefaultFilesystem, db.NewNamespacedKV(model.db, "mtime")),
dir: "testdata",
queue: newJobQueue(),
errors: make(map[string]string),
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestCopierFinder(t *testing.T) {
}

// Verify that the fetched blocks have actually been written to the temp file
blks, err := scanner.HashFile(tempFile, protocol.BlockSize, nil, false)
blks, err := scanner.HashFile(fs.DefaultFilesystem, tempFile, protocol.BlockSize, nil, false)
if err != nil {
t.Log(err)
}
Expand Down
94 changes: 60 additions & 34 deletions lib/scanner/blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,16 @@ package scanner

import (
"errors"
"os"
"path/filepath"

"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
)

// The parallel hasher reads FileInfo structures from the inbox, hashes the
// file to populate the Blocks element and sends it to the outbox. A number of
// workers are used in parallel. The outbox will become closed when the inbox
// is closed and all items handled.

func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter Counter, done, cancel chan struct{}, useWeakHashes bool) {
wg := sync.NewWaitGroup()
wg.Add(workers)

for i := 0; i < workers; i++ {
go func() {
hashFiles(dir, blockSize, outbox, inbox, counter, cancel, useWeakHashes)
wg.Done()
}()
}

go func() {
wg.Wait()
if done != nil {
close(done)
}
close(outbox)
}()
}

// HashFile hashes the files and returns a list of blocks representing the file.
func HashFile(path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
fd, err := os.Open(path)
func HashFile(fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
fd, err := fs.Open(path)
if err != nil {
l.Debugln("open:", err)
return nil, err
Expand Down Expand Up @@ -82,10 +57,53 @@ func HashFile(path string, blockSize int, counter Counter, useWeakHashes bool) (
return blocks, nil
}

func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter Counter, cancel chan struct{}, useWeakHashes bool) {
// The parallel hasher reads FileInfo structures from the inbox, hashes the
// file to populate the Blocks element and sends it to the outbox. A number of
// workers are used in parallel. The outbox will become closed when the inbox
// is closed and all items handled.
type parallelHasher struct {
fs fs.Filesystem
dir string
blockSize int
workers int
outbox chan<- protocol.FileInfo
inbox <-chan protocol.FileInfo
counter Counter
done chan<- struct{}
cancel <-chan struct{}
useWeakHashes bool
wg sync.WaitGroup
}

func newParallelHasher(fs fs.Filesystem, dir string, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, cancel <-chan struct{}, useWeakHashes bool) {
ph := &parallelHasher{
fs: fs,
dir: dir,
blockSize: blockSize,
workers: workers,
outbox: outbox,
inbox: inbox,
counter: counter,
done: done,
cancel: cancel,
useWeakHashes: useWeakHashes,
wg: sync.NewWaitGroup(),
}

for i := 0; i < workers; i++ {
ph.wg.Add(1)
go ph.hashFiles()
}

go ph.closeWhenDone()
}

func (ph *parallelHasher) hashFiles() {
defer ph.wg.Done()

for {
select {
case f, ok := <-inbox:
case f, ok := <-ph.inbox:
if !ok {
return
}
Expand All @@ -94,7 +112,7 @@ func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo,
panic("Bug. Asked to hash a directory or a deleted file.")
}

blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, counter, useWeakHashes)
blocks, err := HashFile(ph.fs, filepath.Join(ph.dir, f.Name), ph.blockSize, ph.counter, ph.useWeakHashes)
if err != nil {
l.Debugln("hash error:", f.Name, err)
continue
Expand All @@ -112,13 +130,21 @@ func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo,
}

select {
case outbox <- f:
case <-cancel:
case ph.outbox <- f:
case <-ph.cancel:
return
}

case <-cancel:
case <-ph.cancel:
return
}
}
}

func (ph *parallelHasher) closeWhenDone() {
ph.wg.Wait()
if ph.done != nil {
close(ph.done)
}
close(ph.outbox)
}
Loading

0 comments on commit 4253f22

Please sign in to comment.