accounts/keystore: scan key directory without locks held (ethereum#15171)

The accountCache contains a file cache, and remembers from
scan to scan what files were present earlier. Thus, whenever
there's a change, the scan phase only bothers processing new
and removed files.
holiman authored and fjl committed Oct 9, 2017
1 parent 7a045af commit 88b1db7
Showing 4 changed files with 297 additions and 102 deletions.
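Before the diff itself, here is a minimal, self-contained Go sketch (illustrative only, not part of this commit) of the idea described in the commit message: remember which files were seen on the previous scan and classify the current listing into new, missing and modified entries, so only those need to be re-parsed. It uses a plain map keyed by path instead of the gopkg.in/fatih/set.v0 sets the patch uses, and a per-file mtime instead of the patch's single latest-mtime watermark; fileSnapshot and diffScan are made-up names.

// Illustrative sketch only -- not part of this commit. Simplified version of
// the "remember what was seen last scan" idea from the commit message.
package main

import (
	"fmt"
	"io/ioutil"
	"path/filepath"
	"time"
)

// fileSnapshot records the modification time seen for each key file path.
type fileSnapshot map[string]time.Time

// diffScan lists keyDir and classifies its contents against the previous
// snapshot, so callers only re-parse new and modified files and drop missing ones.
func diffScan(keyDir string, prev fileSnapshot) (newFiles, missing, modified []string, now fileSnapshot, err error) {
	entries, err := ioutil.ReadDir(keyDir)
	if err != nil {
		return nil, nil, nil, nil, err
	}
	now = make(fileSnapshot, len(entries))
	for _, fi := range entries {
		path := filepath.Join(keyDir, fi.Name())
		now[path] = fi.ModTime()
		prevMtime, seen := prev[path]
		switch {
		case !seen:
			newFiles = append(newFiles, path)
		case fi.ModTime().After(prevMtime):
			modified = append(modified, path)
		}
	}
	for path := range prev {
		if _, still := now[path]; !still {
			missing = append(missing, path)
		}
	}
	return newFiles, missing, modified, now, nil
}

func main() {
	snap := fileSnapshot{}
	// First scan: every file in the (hypothetical) directory is reported as new.
	newFiles, missing, modified, snap, err := diffScan("/tmp/keystore", snap)
	fmt.Println(newFiles, missing, modified, len(snap), err)
}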
161 changes: 122 additions & 39 deletions accounts/keystore/account_cache.go
@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"gopkg.in/fatih/set.v0"
)

// Minimum amount of time between cache reloads. This limit applies if the platform does
@@ -71,13 +72,22 @@ type accountCache struct {
byAddr map[common.Address][]accounts.Account
throttle *time.Timer
notify chan struct{}
fileC fileCache
}

// fileCache is a cache of files seen during scan of keystore
type fileCache struct {
all *set.SetNonTS // list of all files
mtime time.Time // latest mtime seen
mu sync.RWMutex
}

func newAccountCache(keydir string) (*accountCache, chan struct{}) {
ac := &accountCache{
keydir: keydir,
byAddr: make(map[common.Address][]accounts.Account),
notify: make(chan struct{}, 1),
fileC: fileCache{all: set.NewNonTS()},
}
ac.watcher = newWatcher(ac)
return ac, ac.notify
@@ -127,6 +137,23 @@ func (ac *accountCache) delete(removed accounts.Account) {
}
}

// deleteByFile removes an account referenced by the given path.
func (ac *accountCache) deleteByFile(path string) {
ac.mu.Lock()
defer ac.mu.Unlock()
i := sort.Search(len(ac.all), func(i int) bool { return ac.all[i].URL.Path >= path })

if i < len(ac.all) && ac.all[i].URL.Path == path {
removed := ac.all[i]
ac.all = append(ac.all[:i], ac.all[i+1:]...)
if ba := removeAccount(ac.byAddr[removed.Address], removed); len(ba) == 0 {
delete(ac.byAddr, removed.Address)
} else {
ac.byAddr[removed.Address] = ba
}
}
}

func removeAccount(slice []accounts.Account, elem accounts.Account) []accounts.Account {
for i := range slice {
if slice[i] == elem {
@@ -167,15 +194,16 @@ func (ac *accountCache) find(a accounts.Account) (accounts.Account, error) {
default:
err := &AmbiguousAddrError{Addr: a.Address, Matches: make([]accounts.Account, len(matches))}
copy(err.Matches, matches)
sort.Sort(accountsByURL(err.Matches))
return accounts.Account{}, err
}
}

func (ac *accountCache) maybeReload() {
ac.mu.Lock()
defer ac.mu.Unlock()

if ac.watcher.running {
ac.mu.Unlock()
return // A watcher is running and will keep the cache up-to-date.
}
if ac.throttle == nil {
@@ -184,12 +212,15 @@ func (ac *accountCache) maybeReload() {
select {
case <-ac.throttle.C:
default:
ac.mu.Unlock()
return // The cache was reloaded recently.
}
}
// No watcher running, start it.
ac.watcher.start()
ac.reload()
ac.throttle.Reset(minReloadInterval)
ac.mu.Unlock()
ac.scanAccounts()
}

func (ac *accountCache) close() {
@@ -205,70 +236,122 @@ func (ac *accountCache) close() {
ac.mu.Unlock()
}

// reload caches addresses of existing accounts.
// Callers must hold ac.mu.
func (ac *accountCache) reload() {
accounts, err := ac.scan()
// scanFiles performs a new scan on the given directory, compares against the already
// cached filenames, and returns file sets: new, missing, modified
func (fc *fileCache) scanFiles(keyDir string) (set.Interface, set.Interface, set.Interface, error) {
t0 := time.Now()
files, err := ioutil.ReadDir(keyDir)
t1 := time.Now()
if err != nil {
log.Debug("Failed to reload keystore contents", "err", err)
return nil, nil, nil, err
}
ac.all = accounts
sort.Sort(ac.all)
for k := range ac.byAddr {
delete(ac.byAddr, k)
}
for _, a := range accounts {
ac.byAddr[a.Address] = append(ac.byAddr[a.Address], a)
}
select {
case ac.notify <- struct{}{}:
default:
fc.mu.RLock()
prevMtime := fc.mtime
fc.mu.RUnlock()

filesNow := set.NewNonTS()
moddedFiles := set.NewNonTS()
var newMtime time.Time
for _, fi := range files {
modTime := fi.ModTime()
path := filepath.Join(keyDir, fi.Name())
if skipKeyFile(fi) {
log.Trace("Ignoring file on account scan", "path", path)
continue
}
filesNow.Add(path)
if modTime.After(prevMtime) {
moddedFiles.Add(path)
}
if modTime.After(newMtime) {
newMtime = modTime
}
}
log.Debug("Reloaded keystore contents", "accounts", len(ac.all))
t2 := time.Now()

fc.mu.Lock()
// Missing = previous - current
missing := set.Difference(fc.all, filesNow)
// New = current - previous
newFiles := set.Difference(filesNow, fc.all)
// Modified = modified - new
modified := set.Difference(moddedFiles, newFiles)
fc.all = filesNow
fc.mtime = newMtime
fc.mu.Unlock()
t3 := time.Now()
log.Debug("FS scan times", "list", t1.Sub(t0), "set", t2.Sub(t1), "diff", t3.Sub(t2))
return newFiles, missing, modified, nil
}

func (ac *accountCache) scan() ([]accounts.Account, error) {
files, err := ioutil.ReadDir(ac.keydir)
// scanAccounts checks if any changes have occurred on the filesystem, and
// updates the account cache accordingly
func (ac *accountCache) scanAccounts() error {
newFiles, missingFiles, modified, err := ac.fileC.scanFiles(ac.keydir)
t1 := time.Now()
if err != nil {
return nil, err
log.Debug("Failed to reload keystore contents", "err", err)
return err
}

var (
buf = new(bufio.Reader)
addrs []accounts.Account
keyJSON struct {
Address string `json:"address"`
}
)
for _, fi := range files {
path := filepath.Join(ac.keydir, fi.Name())
if skipKeyFile(fi) {
log.Trace("Ignoring file on account scan", "path", path)
continue
}
logger := log.New("path", path)

readAccount := func(path string) *accounts.Account {
fd, err := os.Open(path)
if err != nil {
logger.Trace("Failed to open keystore file", "err", err)
continue
log.Trace("Failed to open keystore file", "path", path, "err", err)
return nil
}
defer fd.Close()
buf.Reset(fd)
// Parse the address.
keyJSON.Address = ""
err = json.NewDecoder(buf).Decode(&keyJSON)
addr := common.HexToAddress(keyJSON.Address)
switch {
case err != nil:
logger.Debug("Failed to decode keystore key", "err", err)
log.Debug("Failed to decode keystore key", "path", path, "err", err)
case (addr == common.Address{}):
logger.Debug("Failed to decode keystore key", "err", "missing or zero address")
log.Debug("Failed to decode keystore key", "path", path, "err", "missing or zero address")
default:
addrs = append(addrs, accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme, Path: path}})
return &accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme, Path: path}}
}
fd.Close()
return nil
}
return addrs, err

for _, p := range newFiles.List() {
path, _ := p.(string)
a := readAccount(path)
if a != nil {
ac.add(*a)
}
}
for _, p := range missingFiles.List() {
path, _ := p.(string)
ac.deleteByFile(path)
}

for _, p := range modified.List() {
path, _ := p.(string)
a := readAccount(path)
ac.deleteByFile(path)
if a != nil {
ac.add(*a)
}
}

t2 := time.Now()

select {
case ac.notify <- struct{}{}:
default:
}
log.Trace("Handled keystore changes", "time", t2.Sub(t1))

return nil
}

func skipKeyFile(fi os.FileInfo) bool {
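Before the test file changes below, a hypothetical, stripped-down illustration (assumed names and types, not the go-ethereum structs) of the locking discipline the maybeReload and scanAccounts hunks above adopt, which is what the commit title refers to: hold the mutex only for cheap bookkeeping, release it before the slow directory scan, and re-acquire it briefly to publish the results.

// Illustrative sketch only -- hypothetical types, not the go-ethereum structs.
// It shows the pattern the patch adopts: no lock is held while scanning the
// filesystem, so concurrent cache lookups are not blocked behind disk I/O.
package main

import (
	"fmt"
	"sync"
	"time"
)

type cache struct {
	mu    sync.Mutex
	items map[string]string // stand-in for the address-to-account index
}

// slowScan stands in for listing the key directory and JSON-decoding each file.
func slowScan() map[string]string {
	time.Sleep(50 * time.Millisecond)
	return map[string]string{"/keystore/aaa": "0x0123"}
}

func (c *cache) reload() {
	c.mu.Lock()
	// Cheap checks (watcher running? reload throttled?) happen under the lock...
	c.mu.Unlock()

	// ...but the expensive scan runs with no locks held.
	found := slowScan()

	// Re-acquire only long enough to publish the result.
	c.mu.Lock()
	c.items = found
	c.mu.Unlock()
}

func main() {
	c := &cache{items: map[string]string{}}
	c.reload()
	fmt.Println(len(c.items), "account(s) cached")
}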
99 changes: 99 additions & 0 deletions accounts/keystore/account_cache_test.go
@@ -18,6 +18,7 @@ package keystore

import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
@@ -295,3 +296,101 @@ func TestCacheFind(t *testing.T) {
}
}
}

func waitForAccounts(wantAccounts []accounts.Account, ks *KeyStore) error {
var list []accounts.Account
for d := 200 * time.Millisecond; d < 8*time.Second; d *= 2 {
list = ks.Accounts()
if reflect.DeepEqual(list, wantAccounts) {
// ks should have also received change notifications
select {
case <-ks.changes:
default:
return fmt.Errorf("wasn't notified of new accounts")
}
return nil
}
time.Sleep(d)
}
return fmt.Errorf("\ngot %v\nwant %v", list, wantAccounts)
}

// TestUpdatedKeyfileContents tests that updating the contents of a keystore file
// is noticed by the watcher, and the account cache is updated accordingly
func TestUpdatedKeyfileContents(t *testing.T) {
t.Parallel()

// Create a temporary keystore to test with
rand.Seed(time.Now().UnixNano())
dir := filepath.Join(os.TempDir(), fmt.Sprintf("eth-keystore-watch-test-%d-%d", os.Getpid(), rand.Int()))
ks := NewKeyStore(dir, LightScryptN, LightScryptP)

list := ks.Accounts()
if len(list) > 0 {
t.Error("initial account list not empty:", list)
}
time.Sleep(100 * time.Millisecond)

// Create the directory and copy a key file into it.
os.MkdirAll(dir, 0700)
defer os.RemoveAll(dir)
file := filepath.Join(dir, "aaa")

// Place one of our testfiles in there
if err := cp.CopyFile(file, cachetestAccounts[0].URL.Path); err != nil {
t.Fatal(err)
}

// ks should see the account.
wantAccounts := []accounts.Account{cachetestAccounts[0]}
wantAccounts[0].URL = accounts.URL{Scheme: KeyStoreScheme, Path: file}
if err := waitForAccounts(wantAccounts, ks); err != nil {
t.Error(err)
return
}

// Now replace file contents
if err := forceCopyFile(file, cachetestAccounts[1].URL.Path); err != nil {
t.Fatal(err)
return
}
wantAccounts = []accounts.Account{cachetestAccounts[1]}
wantAccounts[0].URL = accounts.URL{Scheme: KeyStoreScheme, Path: file}
if err := waitForAccounts(wantAccounts, ks); err != nil {
t.Errorf("First replacement failed")
t.Error(err)
return
}

// Now replace file contents again
if err := forceCopyFile(file, cachetestAccounts[2].URL.Path); err != nil {
t.Fatal(err)
return
}
wantAccounts = []accounts.Account{cachetestAccounts[2]}
wantAccounts[0].URL = accounts.URL{Scheme: KeyStoreScheme, Path: file}
if err := waitForAccounts(wantAccounts, ks); err != nil {
t.Errorf("Second replacement failed")
t.Error(err)
return
}
// Now replace file contents with crap
if err := ioutil.WriteFile(file, []byte("foo"), 0644); err != nil {
t.Fatal(err)
return
}
if err := waitForAccounts([]accounts.Account{}, ks); err != nil {
t.Errorf("Emptying account file failed")
t.Error(err)
return
}
}

// forceCopyFile is like cp.CopyFile, but doesn't complain if the destination exists.
func forceCopyFile(dst, src string) error {
data, err := ioutil.ReadFile(src)
if err != nil {
return err
}
return ioutil.WriteFile(dst, data, 0644)
}