-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathallowance_store.go
122 lines (98 loc) · 3.06 KB
/
allowance_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package rate
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"github.com/TykTechnologies/exp/pkg/limiters"
"github.com/TykTechnologies/tyk/internal/redis"
)
// AllowanceStore implements AllowanceRepository.
//
// Allowances are written to and read from Redis, and mirrored in an
// in-process cache of JSON-encoded values guarded by cacheMu.
type AllowanceStore struct {
	// stats holds operation counters that are only accessed via sync/atomic.
	// It is deliberately the first field: on 32-bit platforms 64-bit atomic
	// operations require 64-bit alignment, which is only guaranteed for the
	// first word of an allocated struct.
	stats struct {
		set       int64 // calls to Set
		setErrors int64 // Set calls whose Redis HSet failed
		get       int64 // calls to Get
		getCached int64 // Get calls answered from the in-process cache
		getErrors int64 // Get calls whose Redis HGetAll failed
		locker    int64 // calls to Locker
	}

	// redis is the client used for reads, writes and distributed locks.
	redis redis.UniversalClient

	// cacheMu guards cache.
	cacheMu sync.RWMutex
	// cache maps a key to the JSON encoding of its last known Allowance.
	cache map[string][]byte
}
// NewAllowanceStore builds an *AllowanceStore that talks to the given redis
// client and starts out with an empty in-process cache.
func NewAllowanceStore(redis redis.UniversalClient) *AllowanceStore {
	store := new(AllowanceStore)
	store.redis = redis
	store.cache = map[string][]byte{}
	return store
}
// String renders the store's operation counters as a single line of
// space-separated name=value pairs. Each counter is read atomically, but the
// line as a whole is not a consistent snapshot.
func (s *AllowanceStore) String() string {
	counters := &s.stats
	return fmt.Sprintf(
		"locker=%d set=%d setErrors=%d get=%d getCached=%d getErrors=%d",
		atomic.LoadInt64(&counters.locker),
		atomic.LoadInt64(&counters.set),
		atomic.LoadInt64(&counters.setErrors),
		atomic.LoadInt64(&counters.get),
		atomic.LoadInt64(&counters.getCached),
		atomic.LoadInt64(&counters.getErrors),
	)
}
// get returns a freshly decoded copy of the allowance cached under key, or
// nil when the key has no cache entry.
func (s *AllowanceStore) get(key string) *Allowance {
	// A lookup never modifies the map, so a shared read lock is enough and
	// lets concurrent readers proceed in parallel.
	s.cacheMu.RLock()
	cached, ok := s.cache[key]
	s.cacheMu.RUnlock()
	if !ok {
		return nil
	}

	// Decoding happens outside the lock. This is safe because set replaces
	// cache entries with newly marshalled slices and never mutates a stored
	// slice in place.
	allowance := &Allowance{}
	// We have control over the type, unmarshalling must not fail.
	_ = json.Unmarshal(cached, allowance)
	return allowance
}
// set stores the JSON encoding of allowance in the in-process cache under
// key, replacing any previous entry.
func (s *AllowanceStore) set(key string, allowance *Allowance) {
	// Encode before taking the lock to keep the critical section short.
	// The type is under our control, so marshalling is not expected to fail
	// and the error is intentionally discarded.
	encoded, _ := json.Marshal(allowance)

	s.cacheMu.Lock()
	defer s.cacheMu.Unlock()

	s.cache[key] = encoded
}
// Locker hands out a Redis-backed distributed locker for key, to be used
// much like a mutex. Every call bumps the locker counter.
func (s *AllowanceStore) Locker(key string) limiters.DistLocker {
	atomic.AddInt64(&s.stats.locker, 1)

	// The lock lives under its own "lock"-prefixed key and serialises writes
	// across processes.
	pool := redis.NewPool(s.redis)
	lockKey := Prefix(key, "lock")
	return limiters.NewLockRedis(pool, lockKey)
}
// Get returns the Allowance stored for key.
//
// The in-process cache is consulted first. On a miss the value is read from
// the "allowance"-prefixed Redis hash, decoded, cached and returned. A Redis
// error is returned unchanged together with a nil Allowance.
func (s *AllowanceStore) Get(ctx context.Context, key string) (*Allowance, error) {
	atomic.AddInt64(&s.stats.get, 1)

	// Fast path: answer from the local cache.
	if cached := s.get(key); cached != nil {
		atomic.AddInt64(&s.stats.getCached, 1)
		return cached, nil
	}

	// Slow path: load the hash from Redis.
	fields, err := s.redis.HGetAll(ctx, Prefix(key, "allowance")).Result()
	if err != nil {
		atomic.AddInt64(&s.stats.getErrors, 1)
		return nil, err
	}

	loaded := NewAllowanceFromMap(fields)
	s.set(key, loaded)
	return loaded, nil
}
// Set writes allowance to the "allowance"-prefixed Redis hash for key and
// returns the error of that write, if any.
//
// Regardless of whether the write succeeded, the hash expiry is refreshed to
// twice the allowance delay and the in-process cache is updated. The result
// of the expiry call is not inspected.
func (s *AllowanceStore) Set(ctx context.Context, key string, allowance *Allowance) error {
	redisKey := Prefix(key, "allowance")

	atomic.AddInt64(&s.stats.set, 1)
	writeErr := s.redis.HSet(ctx, redisKey, allowance.Map()).Err()
	if writeErr != nil {
		atomic.AddInt64(&s.stats.setErrors, 1)
	}

	ttl := 2 * allowance.GetDelay()
	s.redis.Expire(ctx, redisKey, ttl)

	s.set(key, allowance)

	return writeErr
}
// Static assertion: *AllowanceStore must satisfy AllowanceRepository.
var _ AllowanceRepository = (*AllowanceStore)(nil)