Skip to content

Commit

Permalink
Add cache state to task summaries on real runs (vercel#4225)
Browse files Browse the repository at this point in the history
This commit threads the itemStatus struct all the way
down to execContext.exec() so we can include it in the run
summaries.
  • Loading branch information
mehulkar authored Mar 28, 2023
1 parent 1b383bb commit 883d6d4
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 59 deletions.
28 changes: 26 additions & 2 deletions cli/integration_tests/basic_monorepo/run_summary/run_summary.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,27 @@ Setup

$ TURBO_RUN_SUMMARY=true ${TURBO} run build -- someargs > /dev/null # first run (should be cache miss)

# HACK: Generated run summaries are named with a ksuid, which is a time-sorted ID. This _generally_ works
# but we're seeing in this test that sometimes a summary file is not sorted (with /bin/ls) in the order we expect
# causing intermittent test failures.
# Add a sleep statement so we can be sure that the second run is a later timestamp,
# so we can reliably get it with `|head -n1` and `|tail -n1` later in this test.
# When we start emitting the path to the run summary file that was generated, or a way to specify
# the output file, we can remove this and look for the file directly.
# If you find this sleep statement, try running this test 10 times in a row. If there are no
# failures, it *should* be safe to remove.
$ sleep 1
$ TURBO_RUN_SUMMARY=true ${TURBO} run build -- someargs > /dev/null # run again (expecting full turbo here)

# no output, just check for 0 status code, which means the directory was created
$ test -d .turbo/runs
# expect 2 run summaries are created
$ ls .turbo/runs/*.json | wc -l
\s*1 (re)
\s*2 (re)

# get jq-parsed output of each run summary
$ FIRST=$(/bin/ls .turbo/runs/*.json | head -n1)
$ SECOND=$(/bin/ls .turbo/runs/*.json | tail -n1)

# some top level run summary validation
$ cat $FIRST | jq '.tasks | length'
Expand All @@ -38,6 +51,7 @@ Setup

# Extract some task-specific summaries from each
$ FIRST_APP_BUILD=$("$TESTDIR/get-build.sh" "$FIRST" "my-app")
$ SECOND_APP_BUILD=$("$TESTDIR/get-build.sh" "$SECOND" "my-app")
$ FIRST_UTIL_BUILD=$("$TESTDIR/get-build.sh" "$FIRST" "util")

$ echo $FIRST_APP_BUILD | jq '.execution'
Expand All @@ -52,13 +66,23 @@ Setup
[
"someargs"
]

$ echo $FIRST_APP_BUILD | jq '.hashOfExternalDependencies'
"ccab0b28617f1f56"
$ echo $FIRST_APP_BUILD | jq '.expandedOutputs'
[
"apps/my-app/.turbo/turbo-build.log"
]
# validate that cache state updates in second run
$ echo $FIRST_APP_BUILD | jq '.cacheState'
{
"local": false,
"remote": false
}
$ echo $SECOND_APP_BUILD | jq '.cacheState'
{
"local": true,
"remote": false
}

# Some validation of util#build
$ echo $FIRST_UTIL_BUILD | jq '.execution'
Expand Down
2 changes: 1 addition & 1 deletion cli/internal/cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, durati
return nil
}

func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
return c.realCache.Fetch(anchor, key, files)
}

Expand Down
21 changes: 16 additions & 5 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
type Cache interface {
// Fetch returns true if there is a cache it. It is expected to move files
// into their correct position as a side effect
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error)
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
Exists(hash string) ItemStatus
// Put caches files for a given hash
Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
Expand Down Expand Up @@ -232,17 +232,24 @@ func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
}
}

func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
// Make a shallow copy of the caches, since storeUntil can call removeCache
mplex.mu.RLock()
caches := make([]Cache, len(mplex.caches))
copy(caches, mplex.caches)
mplex.mu.RUnlock()

// We need to return a composite cache status from multiple caches
// Initialize the empty struct so we can assign values to it. This is similar
// to how the Exists() method works.
combinedCacheState := ItemStatus{}

// Retrieve from caches sequentially; if we did them simultaneously we could
// easily write the same file from two goroutines at once.
for i, cache := range caches {
ok, actualFiles, duration, err := cache.Fetch(anchor, key, files)
itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files)
ok := itemStatus.Local || itemStatus.Remote

if err != nil {
cd := &util.CacheDisabledError{}
if errors.As(err, &cd) {
Expand All @@ -261,11 +268,15 @@ func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key st
// we have previously successfully stored in a higher-priority cache, and so the overall
// result is a success at fetching. Storing in lower-priority caches is an optimization.
_ = mplex.storeUntil(anchor, key, duration, actualFiles, i)
return ok, actualFiles, duration, err

// If another cache had already set this to true, we don't need to set it again from this cache
combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local
combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote
return combinedCacheState, actualFiles, duration, err
}
}

return false, nil, 0, nil
return ItemStatus{Local: false, Remote: false}, nil, 0, nil
}

func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
Expand Down
16 changes: 8 additions & 8 deletions cli/internal/cache/cache_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.Absol
}

// Fetch returns true if items are cached. It moves them into position as a side effect.
func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _unusedOutputGlobs []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar")
compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")

Expand All @@ -45,33 +45,33 @@ func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _unuse
} else {
// It's not in the cache, bail now
f.logFetch(false, hash, 0)
return false, nil, 0, nil
return ItemStatus{Local: false}, nil, 0, nil
}

cacheItem, openErr := cacheitem.Open(actualCachePath)
if openErr != nil {
return false, nil, 0, openErr
return ItemStatus{Local: false}, nil, 0, openErr
}

restoredFiles, restoreErr := cacheItem.Restore(anchor)
if restoreErr != nil {
_ = cacheItem.Close()
return false, nil, 0, restoreErr
return ItemStatus{Local: false}, nil, 0, restoreErr
}

meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json"))
if err != nil {
_ = cacheItem.Close()
return false, nil, 0, fmt.Errorf("error reading cache metadata: %w", err)
return ItemStatus{Local: false}, nil, 0, fmt.Errorf("error reading cache metadata: %w", err)
}
f.logFetch(true, hash, meta.Duration)

// Wait to see what happens with close.
closeErr := cacheItem.Close()
if closeErr != nil {
return false, restoredFiles, 0, closeErr
return ItemStatus{Local: false}, restoredFiles, 0, closeErr
}
return true, restoredFiles, meta.Duration, nil
return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil
}

func (f *fsCache) Exists(hash string) ItemStatus {
Expand Down Expand Up @@ -129,7 +129,7 @@ func (f *fsCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration
return cacheItem.Close()
}

func (f *fsCache) Clean(anchor turbopath.AbsoluteSystemPath) {
func (f *fsCache) Clean(_ turbopath.AbsoluteSystemPath) {
fmt.Println("Not implemented yet")
}

Expand Down
3 changes: 2 additions & 1 deletion cli/internal/cache/cache_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ func TestFetch(t *testing.T) {

outputDir := turbopath.AbsoluteSystemPath(t.TempDir())
dstOutputPath := "some-package"
hit, files, _, err := cache.Fetch(outputDir, "the-hash", []string{})
cacheStatus, files, _, err := cache.Fetch(outputDir, "the-hash", []string{})
assert.NilError(t, err, "Fetch")
hit := cacheStatus.Local || cacheStatus.Remote
if !hit {
t.Error("Fetch got false, want true")
}
Expand Down
10 changes: 5 additions & 5 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
// nobody is the usual uid / gid of the 'nobody' user.
const nobody = 65534

func (cache *httpCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
// if cache.writable {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
Expand Down Expand Up @@ -143,16 +143,16 @@ func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.Anc
return err
}

func (cache *httpCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, _unusedOutputGlobs []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
hit, files, duration, err := cache.retrieve(key)
if err != nil {
// TODO: analytics event?
return false, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
}
cache.logFetch(hit, key, duration)
return hit, files, duration, err
return ItemStatus{Remote: hit}, files, duration, err
}

func (cache *httpCache) Exists(key string) ItemStatus {
Expand Down Expand Up @@ -349,7 +349,7 @@ func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNon
return nil
}

func (cache *httpCache) Clean(anchor turbopath.AbsoluteSystemPath) {
func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) {
// Not possible; this implementation can only clean for a hash.
}

Expand Down
14 changes: 7 additions & 7 deletions cli/internal/cache/cache_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ func newNoopCache() *noopCache {
return &noopCache{}
}

func (c *noopCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []turbopath.AnchoredSystemPath) error {
return nil
}
func (c *noopCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
return false, nil, 0, nil
func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
return ItemStatus{Local: false, Remote: false}, nil, 0, nil
}
func (c *noopCache) Exists(key string) ItemStatus {
func (c *noopCache) Exists(_ string) ItemStatus {
return ItemStatus{}
}

func (c *noopCache) Clean(anchor turbopath.AbsoluteSystemPath) {}
func (c *noopCache) CleanAll() {}
func (c *noopCache) Shutdown() {}
func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {}
func (c *noopCache) CleanAll() {}
func (c *noopCache) Shutdown() {}
22 changes: 12 additions & 10 deletions cli/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ type testCache struct {
entries map[string][]turbopath.AnchoredSystemPath
}

func (tc *testCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
func (tc *testCache) Fetch(_ turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
if tc.disabledErr != nil {
return false, nil, 0, tc.disabledErr
return ItemStatus{}, nil, 0, tc.disabledErr
}
foundFiles, ok := tc.entries[hash]
if ok {
duration := 5
return true, foundFiles, duration, nil
return ItemStatus{Local: true}, foundFiles, duration, nil
}
return false, nil, 0, nil
return ItemStatus{}, nil, 0, nil
}

func (tc *testCache) Exists(hash string) ItemStatus {
Expand All @@ -40,17 +40,17 @@ func (tc *testCache) Exists(hash string) ItemStatus {
return ItemStatus{}
}

func (tc *testCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
func (tc *testCache) Put(_ turbopath.AbsoluteSystemPath, hash string, _ int, files []turbopath.AnchoredSystemPath) error {
if tc.disabledErr != nil {
return tc.disabledErr
}
tc.entries[hash] = files
return nil
}

func (tc *testCache) Clean(anchor turbopath.AbsoluteSystemPath) {}
func (tc *testCache) CleanAll() {}
func (tc *testCache) Shutdown() {}
func (tc *testCache) Clean(_ turbopath.AbsoluteSystemPath) {}
func (tc *testCache) CleanAll() {}
func (tc *testCache) Shutdown() {}

func newEnabledCache() *testCache {
return &testCache{
Expand Down Expand Up @@ -106,10 +106,11 @@ func TestPutCachingDisabled(t *testing.T) {
mplex.mu.RUnlock()

// subsequent Fetch should still work
hit, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
if err != nil {
t.Errorf("got error fetching files: %v", err)
}
hit := cacheStatus.Local || cacheStatus.Remote
if !hit {
t.Error("failed to find previously stored files")
}
Expand Down Expand Up @@ -185,11 +186,12 @@ func TestFetchCachingDisabled(t *testing.T) {
},
}

hit, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
if err != nil {
// don't leak the cache removal
t.Errorf("Fetch got error %v, want <nil>", err)
}
hit := cacheStatus.Local || cacheStatus.Remote
if hit {
t.Error("hit on empty cache, expected miss")
}
Expand Down
6 changes: 5 additions & 1 deletion cli/internal/run/real_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func RealRun(
if taskExecutionSummary != nil {
taskSummary.ExpandedOutputs = taskHashTracker.GetExpandedOutputs(taskSummary.TaskID)
taskSummary.Execution = taskExecutionSummary
taskSummary.CacheState = taskHashTracker.GetCacheStatus(taskSummary.TaskID)

// lock since multiple things to be appending to this array at the same time
mu.Lock()
Expand Down Expand Up @@ -243,7 +244,10 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas
ErrorPrefix: prettyPrefix,
WarnPrefix: prettyPrefix,
}
hit, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
cacheStatus, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
ec.taskHashTracker.SetCacheStatus(packageTask.TaskID, cacheStatus)

hit := cacheStatus.Local || cacheStatus.Remote
if err != nil {
prefixedUI.Error(fmt.Sprintf("error fetching from cache: %s", err))
} else if hit {
Expand Down
Loading

0 comments on commit 883d6d4

Please sign in to comment.