Skip to content

Commit

Permalink
http.send: fix interquery cache bug (open-policy-agent#5361)
Browse files Browse the repository at this point in the history
This fixes the issue where concurrent requests with identical cache keys
cause the interquery cache size usage counter to become invalid.

Fixes open-policy-agent#5359.

Signed-off-by: Aleksander <[email protected]>
  • Loading branch information
asleire authored Nov 24, 2022
1 parent cef5f75 commit 955464e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
3 changes: 3 additions & 0 deletions topdown/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int)
}
}

// By deleting the old value, if it exists, we ensure the usage variable stays correct
c.unsafeDelete(k)

c.items[k.String()] = v
c.l.PushBack(k)
c.usage += size
Expand Down
66 changes: 66 additions & 0 deletions topdown/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cache

import (
"reflect"
"sync"
"testing"

"github.com/open-policy-agent/opa/ast"
Expand Down Expand Up @@ -139,6 +140,71 @@ func TestInsert(t *testing.T) {
if !found {
t.Fatal("Expected key \"foo5\" in cache")
}

// replacing an existing key should not affect cache size
cache = NewInterQueryCache(config)

cacheValue6 := newInterQueryCacheValue(ast.String("bar6"), 10)
cache.Insert(ast.String("foo6"), cacheValue6)
cache.Insert(ast.String("foo6"), cacheValue6)

cacheValue7 := newInterQueryCacheValue(ast.String("bar7"), 10)
dropped = cache.Insert(ast.StringTerm("foo7").Value, cacheValue7)

if dropped != 0 {
t.Fatal("Expected dropped to be zero")
}
}

func TestConcurrentInsert(t *testing.T) {
in := `{"inter_query_builtin_cache": {"max_size_bytes": 20},}` // 20 byte limit for test purposes

config, err := ParseCachingConfig([]byte(in))
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCache(config)

cacheValue := newInterQueryCacheValue(ast.String("bar"), 10)
cache.Insert(ast.String("foo"), cacheValue)

wg := sync.WaitGroup{}

for i := 0; i < 5; i++ {
wg.Add(1)

go func() {
defer wg.Done()

cacheValue2 := newInterQueryCacheValue(ast.String("bar2"), 5)
cache.Insert(ast.String("foo2"), cacheValue2)

}()
}
wg.Wait()

cacheValue3 := newInterQueryCacheValue(ast.String("bar3"), 5)
dropped := cache.Insert(ast.String("foo3"), cacheValue3)

if dropped != 0 {
t.Fatal("Expected dropped to be zero")
}

_, found := cache.Get(ast.String("foo"))
if !found {
t.Fatal("Expected key \"foo\" in cache")
}

_, found = cache.Get(ast.String("foo2"))
if !found {
t.Fatal("Expected key \"foo2\" in cache")
}

_, found = cache.Get(ast.String("foo3"))
if !found {
t.Fatal("Expected key \"foo3\" in cache")
}
}

func TestDelete(t *testing.T) {
Expand Down

0 comments on commit 955464e

Please sign in to comment.