forked from jitsucom/jitsu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache.go
163 lines (136 loc) · 3.01 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package events
import (
"github.com/jitsucom/eventnative/safego"
"sync"
)
//CachedFact is channel holder for key and value
type CachedFact struct {
key string
fact Fact
}
//CachedBucket is slice of events(facts) under the hood
//swapPointer is used for overwrite values without copying underlying slice
type CachedBucket struct {
sync.RWMutex
facts []Fact
swapPointer int
capacity int
}
//Put value in underlying slice with lock
//e.g. capacity = 3
// swapPointer = 0, [1, 2, 3] + 4 = [3, 2, 4]
// swapPointer = 1, [3, 2, 4] + 5 = [3, 4, 5]
// swapPointer = 0, [3, 4, 5] + 6 = [5, 4, 6]
func (ce *CachedBucket) Put(value Fact) {
ce.Lock()
if len(ce.facts) == ce.capacity {
lastIndex := len(ce.facts) - 1
last := ce.facts[lastIndex]
ce.facts[ce.swapPointer] = last
ce.facts[lastIndex] = value
ce.swapPointer += 1
if ce.swapPointer == ce.capacity-1 {
ce.swapPointer = 0
}
} else {
ce.facts = append(ce.facts, value)
}
ce.Unlock()
}
//Get return <= n elements from bucket with read lock
func (ce *CachedBucket) GetN(n int) []Fact {
ce.RLock()
defer ce.RUnlock()
if len(ce.facts) <= n {
return ce.facts
}
return ce.facts[:n]
}
//Cache keep capacityPerKey last elements
//1. per key (perApiKey map)
//2. without key filter (all)
type Cache struct {
sync.RWMutex
putCh chan *CachedFact
perApiKey map[string]*CachedBucket
all *CachedBucket
capacityPerKey int
closed bool
}
//return Cache and start goroutine for async puts
func NewCache(capacityPerKey int) *Cache {
c := &Cache{
putCh: make(chan *CachedFact, 1000000),
perApiKey: map[string]*CachedBucket{},
capacityPerKey: capacityPerKey,
all: &CachedBucket{
facts: make([]Fact, 0, capacityPerKey),
swapPointer: 0,
capacity: capacityPerKey,
},
}
c.start()
return c
}
//start goroutine for reading from putCh and put to cache (async put)
func (c *Cache) start() {
safego.RunWithRestart(func() {
for {
if c.closed {
break
}
cf := <-c.putCh
if cf != nil {
c.Put(cf.key, cf.fact)
}
}
})
}
//PutAsync put value into channel
func (c *Cache) PutAsync(key string, value Fact) {
select {
case c.putCh <- &CachedFact{key: key, fact: value}:
default:
}
}
//Put fact to map per key and to all with lock
func (c *Cache) Put(key string, value Fact) {
//all
c.all.Put(value)
//per key
c.RLock()
element, ok := c.perApiKey[key]
c.RUnlock()
if !ok {
c.Lock()
element, ok = c.perApiKey[key]
if !ok {
element = &CachedBucket{
facts: make([]Fact, 0, c.capacityPerKey),
swapPointer: 0,
capacity: c.capacityPerKey,
}
c.perApiKey[key] = element
}
c.Unlock()
}
element.Put(value)
}
//GetN return at most n facts by key
func (c *Cache) GetN(key string, n int) []Fact {
c.RLock()
element, ok := c.perApiKey[key]
c.RUnlock()
if ok {
return element.GetN(n)
}
return []Fact{}
}
//GetAll return at most n facts
func (c *Cache) GetAll(n int) []Fact {
return c.all.GetN(n)
}
func (c *Cache) Close() error {
c.closed = true
return nil
}