-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache.go
382 lines (340 loc) · 9.05 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
package localcache
import (
"sync"
"time"
"github.com/MoeYang/go-localcache/common"
"github.com/MoeYang/go-localcache/datastruct/dict"
)
// Default settings used by a cache built without overriding options.
const (
	defaultShardCnt = 256  // number of dict shards; must be a power of 2
	defaultCap      = 1024 // capacity handed to the elimination policy
	defaultTTL      = 60   // default key lifetime, in seconds
)

// Settings of the background expiry sweep.
const (
	defaultTTLTick         = 100 // interval between sweeps, in milliseconds
	defaultTTLCheckCount   = 100 // number of random keys requested per pass
	defaultTTLCheckPercent = 25  // run another pass when more than this many keys expired
	defaultTTLCheckRunTime = 50  // upper bound on the passes of one tick, in milliseconds
)

// Buffer sizes of the channels feeding the cacheProcess goroutine.
const (
	hitChanLen = 32768 // 1 << 15
	addChanLen = 32768 // 1 << 15
)

// Operation kinds carried by an opMsg.
const (
	opTypeDel uint8 = 1
	opTypeAdd uint8 = 2
)
// Cache is the interface of the local cache returned by NewLocalCache.
type Cache interface {
// Get returns the value stored under key and whether the key exists.
Get(key string) (interface{}, bool)
// GetOrLoad gets a key; when it does not exist, f is called to load the data.
GetOrLoad(key string, f LoadFunc) (interface{}, error)
// Set stores a key-value pair with the default number of seconds to live.
Set(key string, value interface{})
// SetWithExpire stores a key-value pair that lives for ttl seconds.
SetWithExpire(key string, value interface{}, ttl int64)
// Del deletes key.
Del(key string)
// Len returns the count of keys in the cache.
Len() int
// Flush clears all keys in the cache; it should only be called while no Set or Del is running.
Flush()
// Stop stops the cacheProcess by closing stopChan.
Stop()
// Statistic returns the cache statistics, e.g. {"hit":1, "miss":1, "hitRate":50.0}.
Statistic() map[string]interface{}
}
// LoadFunc is the callback passed to GetOrLoad; it is called to load the
// value from the user's own storage and returns that value or an error.
type LoadFunc func() (interface{}, error)
// localCache is the Cache implementation returned by NewLocalCache.
type localCache struct {
// elimination policy of keys
policy policy
policyType string
// data dict: key -> object packed by the policy
dict dict.Dict
shardCnt int // shard count
cap int // capacity
// ttl dict: key -> expire time in unix seconds
ttlDict dict.Dict
ttl int64 // global key expiry, in seconds
hitChan chan interface{} // receives the stored object each time a key is read or updated
opChan chan opMsg // add and del messages share one chan so they are applied in the order they were sent
stopChan chan struct{} // closed to signal the background goroutines to stop
// cache statistics
statist statist
// single-flight group used by load
group common.Group
}
// NewLocalCache builds a cache with the package defaults, applies the given
// options in order, creates the data dict, the ttl dict and the elimination
// policy, and starts the background goroutines before returning.
func NewLocalCache(options ...Option) Cache {
	cache := new(localCache)
	cache.shardCnt = defaultShardCnt
	cache.cap = defaultCap
	cache.ttl = defaultTTL
	cache.hitChan = make(chan interface{}, hitChanLen)
	cache.opChan = make(chan opMsg, addChanLen)
	cache.statist = newstatisCaculator(false)

	// Options run before the dicts and the policy are created, so the shard
	// count, capacity and policy type they select are the ones used below.
	for i := range options {
		options[i](cache)
	}

	cache.dict = dict.NewDict(cache.shardCnt)
	cache.ttlDict = dict.NewDict(cache.shardCnt)
	cache.policy = newPolicy(cache.policyType, cache.cap, cache)

	// Launch the worker goroutines.
	cache.start()
	return cache
}
// Option configures a localCache; NewLocalCache applies each Option it is given.
type Option func(*localCache)
// WithGlobalTTL sets the default lifetime, in seconds, of keys stored without
// an explicit ttl. A value that is zero or negative selects defaultTTL.
func WithGlobalTTL(expireSecond int64) Option {
	seconds := expireSecond
	if seconds < 1 {
		seconds = defaultTTL
	}
	return func(c *localCache) { c.ttl = seconds }
}
// WithCapacity sets the maximum capacity of the cache (the value handed to
// the elimination policy). A value that is zero or negative selects
// defaultCap.
func WithCapacity(capacity int) Option {
	// The parameter is not called "cap" so that it does not shadow the builtin.
	if capacity <= 0 {
		capacity = defaultCap
	}
	return func(c *localCache) {
		c.cap = capacity
	}
}
// WithShardCount sets the shard count used for the data and ttl dicts. The
// count must be a power of 2; a value that is zero or negative selects
// defaultShardCnt.
func WithShardCount(shardCnt int) Option {
	n := defaultShardCnt
	if shardCnt > 0 {
		n = shardCnt
	}
	return func(c *localCache) { c.shardCnt = n }
}
// WithPolicy selects the key elimination policy by its type name.
func WithPolicy(policyType string) Option {
	apply := func(c *localCache) {
		c.policyType = policyType
	}
	return apply
}
// WithStatist chooses whether the cache calculates hit/miss statistics; the
// default is false. Per the original author's note, leaving them off may make
// the cache very slightly faster.
func WithStatist(needStatistic bool) Option {
	apply := func(c *localCache) {
		// The calculator is created when the option is applied, not when
		// WithStatist is called.
		c.statist = newstatisCaculator(needStatistic)
	}
	return apply
}
// Get returns the value stored under key and true when the key exists and has
// not expired; otherwise it returns nil and false. A hit is reported to the
// policy through hitChan, an expired key is queued for deletion, and the
// hit/miss statistics are updated accordingly.
func (l *localCache) Get(key string) (interface{}, bool) {
	obj, found := l.dict.Get(key)
	if !found {
		l.statist.missIncr()
		return nil, false
	}

	// Read the value and the expiry state under the element's read lock.
	ele := l.policy.unpack(obj)
	ele.lock.RLock()
	val, expired := ele.value, ele.isExpire()
	ele.lock.RUnlock()

	if expired {
		// An expired key counts as a miss and is removed through Del.
		l.Del(key)
		l.statist.missIncr()
		return nil, false
	}

	// Report the hit; when the channel is full the signal is simply dropped.
	select {
	case l.hitChan <- obj:
	default:
	}
	l.statist.hitIncr()
	return val, true
}
// GetOrLoad returns the cached value for key. When Get reports a miss it
// hands over to load, which calls f and caches the result if f returns no
// error.
func (l *localCache) GetOrLoad(key string, f LoadFunc) (interface{}, error) {
	if cached, ok := l.Get(key); ok {
		return cached, nil
	}
	// Not cached (or expired): load the value and store it.
	return l.load(key, f)
}
// Set stores value under key with the cache-wide default ttl.
func (l *localCache) Set(key string, value interface{}) {
	defaultSeconds := l.ttl
	l.SetWithExpire(key, value, defaultSeconds)
}
// SetWithExpire stores value under key so that it expires ttl seconds from
// now. An existing key is updated in place and reported as a hit; a new key
// is packed by the policy and queued on opChan to be added by cacheProcess.
func (l *localCache) SetWithExpire(key string, value interface{}, ttl int64) {
	obj, exists := l.dict.Get(key)
	deadline := time.Now().Add(time.Duration(ttl) * time.Second).Unix()

	if !exists {
		// New key: the insert itself happens asynchronously.
		fresh := &element{key: key, value: value, expireTime: deadline}
		l.opChan <- opMsg{opType: opTypeAdd, obj: l.policy.pack(fresh)}
		return
	}

	// Existing key: overwrite the element under its write lock. The ttl dict
	// is updated inside the same critical section.
	ele := l.policy.unpack(obj)
	ele.lock.Lock()
	ele.value = value
	ele.expireTime = deadline
	l.ttlDict.Set(key, deadline)
	ele.lock.Unlock()

	// Report the update as a hit; when the channel is full the signal is
	// simply dropped.
	select {
	case l.hitChan <- obj:
	default:
	}
}
// Del deletes key. The deletion is asynchronous: a delete message is sent on
// opChan and carried out by cacheProcess.
func (l *localCache) Del(key string) {
	msg := opMsg{opType: opTypeDel, obj: key}
	l.opChan <- msg
}
// Len returns the number of keys currently held in the data dict.
func (l *localCache) Len() int {
	count := l.dict.Len()
	return count
}
// Flush clears all keys in the cache and restarts the background goroutines
// with fresh channels; hit and op messages still queued on the old channels
// are discarded.
//
// Flush should only be called while no Set or Del is running. Stop only
// signals the workers, it does not wait for them, so Flush is not
// synchronized with an operation a worker is executing at that moment.
func (l *localCache) Flush() {
	// Signal the workers first so that queued add messages are less likely to
	// repopulate the dict while it is being cleared.
	l.Stop()

	l.dict.Flush()
	// The ttl dict holds one entry per key; without this flush the expire
	// times of the flushed keys would be left behind.
	l.ttlDict.Flush()
	l.policy.flush()

	l.hitChan = make(chan interface{}, hitChanLen)
	l.opChan = make(chan opMsg, addChanLen)
	l.start()
}
// Stop signals the background goroutines to exit by closing stopChan.
//
// Calling Stop again after the channel has been closed is a no-op instead of
// a "close of closed channel" panic. The check is not atomic, so Stop must
// not be called from several goroutines at the same time.
func (l *localCache) Stop() {
	select {
	case <-l.stopChan:
		// Already closed by an earlier Stop; nothing to do.
	default:
		close(l.stopChan)
	}
}
// Statistic returns the cache statistics as a map with the keys "hit",
// "miss" and "hitRate", e.g. {"hit":1, "miss":1, "hitRate":50.0}.
func (l *localCache) Statistic() map[string]interface{} {
	stats := make(map[string]interface{}, 3)
	stats["hit"] = l.statist.GetHitCount()
	stats["miss"] = l.statist.GetMissCount()
	stats["hitRate"] = l.statist.GetHitRate()
	return stats
}
// start creates a fresh stopChan and launches the two background goroutines.
func (l *localCache) start() {
	l.stopChan = make(chan struct{})
	workers := [...]func(){
		l.cacheProcess, // deals with the chan signals
		l.ttlProcess,   // deletes the keys which are expired
	}
	for _, run := range workers {
		go run()
	}
}
// cacheProcess loops over the cache's channels until stopChan is closed.
// Hits, adds and deletes are all applied from this single goroutine so that
// operations on the policy are safe.
func (l *localCache) cacheProcess() {
	for {
		select {
		case <-l.stopChan:
			return
		case hit := <-l.hitChan:
			l.policy.hit(hit)
		case msg := <-l.opChan:
			switch msg.opType {
			case opTypeAdd:
				l.set(msg.obj)
			case opTypeDel:
				// For a delete message obj carries the key string.
				l.del(msg.obj.(string))
			}
		}
	}
}
// load calls f through l.group (singleFlight, per the original author's
// note) and stores the result under key when f returns no error. The value
// and error produced by the group call are returned unchanged.
func (l *localCache) load(key string, f LoadFunc) (interface{}, error) {
	return l.group.Do(key, func() (interface{}, error) {
		val, err := f()
		if err != nil {
			// A failed load is not cached.
			return val, err
		}
		l.Set(key, val)
		return val, nil
	})
}
// set stores a packed object in the data dict, the ttl dict and the policy.
// It is only called from the single cacheProcess goroutine, which keeps the
// calls sequential.
func (l *localCache) set(obj interface{}) {
	ele := l.policy.unpack(obj)
	k := ele.key

	// When the key already exists, drop the old object from the policy first.
	if prev, ok := l.dict.Get(k); ok {
		l.policy.del(prev)
	}

	l.dict.Set(k, obj)
	l.ttlDict.Set(k, ele.expireTime)
	l.policy.add(obj)
}
// del removes key from the data dict, the ttl dict and the policy; a key
// that is not present is ignored. It is only called from the single
// cacheProcess goroutine, which keeps the calls sequential.
func (l *localCache) del(key string) {
	if obj, ok := l.dict.Get(key); ok {
		l.dict.Del(key)
		l.ttlDict.Del(key)
		l.policy.del(obj)
	}
}
// ttlProcess runs until stopChan is closed and, on every tick of
// defaultTTLTick milliseconds, looks for expired keys to delete (modeled on
// redis, per the original author's note).
//
// Each pass asks the data dict for defaultTTLCheckCount random keys and calls
// Del for every distinct key whose entry in the ttl dict lies in the past.
// Further passes run within the same tick while the previous pass expired
// more than defaultTTLCheckPercent keys and less than defaultTTLCheckRunTime
// milliseconds have elapsed since the tick.
func (l *localCache) ttlProcess() {
	ticker := time.NewTicker(defaultTTLTick * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-l.stopChan:
			return
		case <-ticker.C:
			started := time.Now()
			// Start above the threshold so that the first pass always runs.
			delCount := defaultTTLCheckCount
			for delCount > defaultTTLCheckPercent &&
				time.Since(started) < defaultTTLCheckRunTime*time.Millisecond {
				delCount = 0
				now := time.Now().Unix()
				keys := l.dict.RandKeys(defaultTTLCheckCount)
				// RandKeys may repeat a key; handle each key once per pass.
				seen := make(map[string]struct{}, defaultTTLCheckCount)
				for _, key := range keys {
					if _, dup := seen[key]; dup {
						continue
					}
					seen[key] = struct{}{}
					v, has := l.ttlDict.Get(key)
					if !has {
						continue
					}
					// The ttl dict stores int64 unix seconds; the checked
					// assertion keeps an unexpected value from panicking
					// this goroutine.
					if expireTime, ok := v.(int64); ok && now > expireTime {
						l.Del(key)
						delCount++
					}
				}
			}
		}
	}
}
// element is what is actually saved in the data dict for a key.
type element struct {
	lock       sync.RWMutex // makes the element safe for concurrent use
	key        string       // needed to delete the entry in the policy when the list is full
	value      interface{}
	expireTime int64 // unix seconds
}

// isExpire reports whether the element's expire time is already in the past.
func (e *element) isExpire() bool {
	now := time.Now().Unix()
	return e.expireTime < now
}
// opMsg is a message sent on opChan when a key is added or deleted.
type opMsg struct {
opType uint8 // opTypeAdd or opTypeDel
obj interface{} // the policy's packed object for an add, the key string for a del
}