Skip to content

Commit

Permalink
Add path types for cache. (vercel#2059)
Browse files Browse the repository at this point in the history
As prework for merging cache-to-tar, this ensures that we know what types of paths we're dealing with at (almost) every step in the cache process.

It also makes the first argument to `Put` (previously unused) and `Fetch` the anchor of the enumerated list of files being saved or restored.
  • Loading branch information
nathanhammond authored Sep 26, 2022
1 parent 7e560b2 commit 45ff205
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 203 deletions.
20 changes: 11 additions & 9 deletions cli/internal/cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package cache

import (
"sync"

"github.com/vercel/turborepo/cli/internal/turbopath"
)

// An asyncCache is a wrapper around a Cache interface that handles incoming
Expand All @@ -20,10 +22,10 @@ type asyncCache struct {

// A cacheRequest models an incoming cache request on our queue.
type cacheRequest struct {
target string
anchor turbopath.AbsoluteSystemPath
key string
duration int
files []string
files []turbopath.AnchoredSystemPath
}

func newAsyncCache(realCache Cache, opts Opts) Cache {
Expand All @@ -38,26 +40,26 @@ func newAsyncCache(realCache Cache, opts Opts) Cache {
return c
}

func (c *asyncCache) Put(target string, key string, duration int, files []string) error {
func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
c.requests <- cacheRequest{
target: target,
anchor: anchor,
key: key,
files: files,
duration: duration,
}
return nil
}

func (c *asyncCache) Fetch(target string, key string, files []string) (bool, []string, int, error) {
return c.realCache.Fetch(target, key, files)
func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
return c.realCache.Fetch(anchor, key, files)
}

func (c *asyncCache) Exists(key string) (ItemStatus, error) {
return c.realCache.Exists(key)
}

func (c *asyncCache) Clean(target string) {
c.realCache.Clean(target)
func (c *asyncCache) Clean(anchor turbopath.AbsoluteSystemPath) {
c.realCache.Clean(anchor)
}

func (c *asyncCache) CleanAll() {
Expand All @@ -74,7 +76,7 @@ func (c *asyncCache) Shutdown() {
// run implements the actual async logic.
func (c *asyncCache) run() {
for r := range c.requests {
c.realCache.Put(r.target, r.key, r.duration, r.files)
_ = c.realCache.Put(r.anchor, r.key, r.duration, r.files)
}
c.wg.Done()
}
29 changes: 15 additions & 14 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
type Cache interface {
// Fetch returns true if there is a cache it. It is expected to move files
// into their correct position as a side effect
Fetch(target string, hash string, files []string) (bool, []string, int, error)
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error)
Exists(hash string) (ItemStatus, error)
// Put caches files for a given hash
Put(target string, hash string, duration int, files []string) error
Clean(target string)
Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
Clean(anchor turbopath.AbsoluteSystemPath)
CleanAll()
Shutdown()
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client clien
}

if useHTTPCache {
implementation := newHTTPCache(opts, client, recorder, repoRoot)
implementation := newHTTPCache(opts, client, recorder)
cacheImplementations = append(cacheImplementations, implementation)
}

Expand Down Expand Up @@ -172,8 +172,8 @@ type cacheMultiplexer struct {
onCacheRemoved OnCacheRemoved
}

func (mplex *cacheMultiplexer) Put(target string, key string, duration int, files []string) error {
return mplex.storeUntil(target, key, duration, files, len(mplex.caches))
func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
return mplex.storeUntil(anchor, key, duration, files, len(mplex.caches))
}

type cacheRemoval struct {
Expand All @@ -184,7 +184,7 @@ type cacheRemoval struct {
// storeUntil stores artifacts into higher priority caches than the given one.
// Used after artifact retrieval to ensure we have them in eg. the directory cache after
// downloading from the RPC cache.
func (mplex *cacheMultiplexer) storeUntil(target string, key string, duration int, files []string, stopAt int) error {
func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error {
// Attempt to store on all caches simultaneously.
toRemove := make([]*cacheRemoval, stopAt)
g := &errgroup.Group{}
Expand All @@ -196,7 +196,7 @@ func (mplex *cacheMultiplexer) storeUntil(target string, key string, duration in
c := cache
i := i
g.Go(func() error {
err := c.Put(target, key, duration, files)
err := c.Put(anchor, key, duration, files)
if err != nil {
cd := &util.CacheDisabledError{}
if errors.As(err, &cd) {
Expand Down Expand Up @@ -241,7 +241,7 @@ func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
}
}

func (mplex *cacheMultiplexer) Fetch(target string, key string, files []string) (bool, []string, int, error) {
func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
// Make a shallow copy of the caches, since storeUntil can call removeCache
mplex.mu.RLock()
caches := make([]Cache, len(mplex.caches))
Expand All @@ -251,7 +251,7 @@ func (mplex *cacheMultiplexer) Fetch(target string, key string, files []string)
// Retrieve from caches sequentially; if we did them simultaneously we could
// easily write the same file from two goroutines at once.
for i, cache := range caches {
ok, actualFiles, duration, err := cache.Fetch(target, key, files)
ok, actualFiles, duration, err := cache.Fetch(anchor, key, files)
if err != nil {
cd := &util.CacheDisabledError{}
if errors.As(err, &cd) {
Expand All @@ -269,11 +269,12 @@ func (mplex *cacheMultiplexer) Fetch(target string, key string, files []string)
// Store this into other caches. We can ignore errors here because we know
// we have previously successfully stored in a higher-priority cache, and so the overall
// result is a success at fetching. Storing in lower-priority caches is an optimization.
_ = mplex.storeUntil(target, key, duration, actualFiles, i)
_ = mplex.storeUntil(anchor, key, duration, actualFiles, i)
return ok, actualFiles, duration, err
}
}
return false, files, 0, nil

return false, nil, 0, nil
}

func (mplex *cacheMultiplexer) Exists(target string) (ItemStatus, error) {
Expand All @@ -290,9 +291,9 @@ func (mplex *cacheMultiplexer) Exists(target string) (ItemStatus, error) {
return syncCacheState, nil
}

func (mplex *cacheMultiplexer) Clean(target string) {
func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) {
for _, cache := range mplex.caches {
cache.Clean(target)
cache.Clean(anchor)
}
}

Expand Down
49 changes: 25 additions & 24 deletions cli/internal/cache/cache_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package cache
import (
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"runtime"

"github.com/vercel/turborepo/cli/internal/analytics"
Expand All @@ -18,7 +16,7 @@ import (

// fsCache is a local filesystem cache
type fsCache struct {
cacheDirectory string
cacheDirectory turbopath.AbsoluteSystemPath
recorder analytics.Recorder
repoRoot turbopath.AbsoluteSystemPath
}
Expand All @@ -30,30 +28,29 @@ func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.Absol
return nil, err
}
return &fsCache{
cacheDirectory: cacheDir.ToStringDuringMigration(),
cacheDirectory: cacheDir,
recorder: recorder,
repoRoot: repoRoot,
}, nil
}

// Fetch returns true if items are cached. It moves them into position as a side effect.
func (f *fsCache) Fetch(target, hash string, _unusedOutputGlobs []string) (bool, []string, int, error) {
cachedFolder := filepath.Join(f.cacheDirectory, hash)
func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _unusedOutputGlobs []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
cachedFolder := f.cacheDirectory.UntypedJoin(hash)

// If it's not in the cache bail now
if !fs.PathExists(cachedFolder) {
if !cachedFolder.DirExists() {
f.logFetch(false, hash, 0)
return false, nil, 0, nil
}

// Otherwise, copy it into position
err := fs.RecursiveCopy(cachedFolder, target)
err := fs.RecursiveCopy(cachedFolder.ToStringDuringMigration(), anchor.ToStringDuringMigration())
if err != nil {
// TODO: what event to log here?
return false, nil, 0, fmt.Errorf("error moving artifact from cache into %v: %w", target, err)
return false, nil, 0, fmt.Errorf("error moving artifact from cache into %v: %w", anchor, err)
}

meta, err := ReadCacheMetaFile(filepath.Join(f.cacheDirectory, hash+"-meta.json"))
meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json"))
if err != nil {
return false, nil, 0, fmt.Errorf("error reading cache metadata: %w", err)
}
Expand All @@ -62,9 +59,9 @@ func (f *fsCache) Fetch(target, hash string, _unusedOutputGlobs []string) (bool,
}

func (f *fsCache) Exists(hash string) (ItemStatus, error) {
cachedFolder := filepath.Join(f.cacheDirectory, hash)
cachedFolder := f.cacheDirectory.UntypedJoin(hash)

if !fs.PathExists(cachedFolder) {
if !cachedFolder.DirExists() {
return ItemStatus{Local: false}, nil
}

Expand All @@ -87,26 +84,26 @@ func (f *fsCache) logFetch(hit bool, hash string, duration int) {
f.recorder.LogEvent(payload)
}

func (f *fsCache) Put(target, hash string, duration int, files []string) error {
func (f *fsCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
g := new(errgroup.Group)

numDigesters := runtime.NumCPU()
fileQueue := make(chan string, numDigesters)
fileQueue := make(chan turbopath.AnchoredSystemPath, numDigesters)

for i := 0; i < numDigesters; i++ {
g.Go(func() error {
for file := range fileQueue {
statedFile := fs.LstatCachedFile{Path: f.repoRoot.UntypedJoin(file)}
statedFile := fs.LstatCachedFile{Path: file.RestoreAnchor(anchor)}
fromType, err := statedFile.GetType()
if err != nil {
return fmt.Errorf("error stat'ing cache source %v: %v", file, err)
}
if !fromType.IsDir() {
if err := fs.EnsureDir(filepath.Join(f.cacheDirectory, hash, file)); err != nil {
if err := f.cacheDirectory.UntypedJoin(hash, file.ToStringDuringMigration()).EnsureDir(); err != nil {
return fmt.Errorf("error ensuring directory file from cache: %w", err)
}

if err := fs.CopyFile(&statedFile, filepath.Join(f.cacheDirectory, hash, file)); err != nil {
if err := fs.CopyFile(&statedFile, f.cacheDirectory.UntypedJoin(hash, file.ToStringDuringMigration()).ToStringDuringMigration()); err != nil {
return fmt.Errorf("error copying file from cache: %w", err)
}
}
Expand All @@ -124,15 +121,19 @@ func (f *fsCache) Put(target, hash string, duration int, files []string) error {
return err
}

WriteCacheMetaFile(filepath.Join(f.cacheDirectory, hash+"-meta.json"), &CacheMetadata{
writeErr := WriteCacheMetaFile(f.cacheDirectory.UntypedJoin(hash+"-meta.json"), &CacheMetadata{
Duration: duration,
Hash: hash,
})

if writeErr != nil {
return writeErr
}

return nil
}

func (f *fsCache) Clean(target string) {
func (f *fsCache) Clean(anchor turbopath.AbsoluteSystemPath) {
fmt.Println("Not implemented yet")
}

Expand All @@ -150,21 +151,21 @@ type CacheMetadata struct {
}

// WriteCacheMetaFile writes cache metadata file at a path
func WriteCacheMetaFile(path string, config *CacheMetadata) error {
func WriteCacheMetaFile(path turbopath.AbsoluteSystemPath, config *CacheMetadata) error {
jsonBytes, marshalErr := json.Marshal(config)
if marshalErr != nil {
return marshalErr
}
writeFilErr := ioutil.WriteFile(path, jsonBytes, 0644)
writeFilErr := path.WriteFile(jsonBytes, 0644)
if writeFilErr != nil {
return writeFilErr
}
return nil
}

// ReadCacheMetaFile reads cache metadata file at a path
func ReadCacheMetaFile(path string) (*CacheMetadata, error) {
jsonBytes, readFileErr := ioutil.ReadFile(path)
func ReadCacheMetaFile(path turbopath.AbsoluteSystemPath) (*CacheMetadata, error) {
jsonBytes, readFileErr := path.ReadFile()
if readFileErr != nil {
return nil, readFileErr
}
Expand Down
Loading

0 comments on commit 45ff205

Please sign in to comment.