-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathwindow.go
138 lines (111 loc) · 3.16 KB
/
window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package slidingwindow
import (
"time"
)
// LocalWindow represents a window that ignores sync behavior entirely
// and only stores counters in memory.
//
// Its Sync method does nothing, and none of its methods take a lock, so a
// window shared between goroutines must be protected by the caller.
type LocalWindow struct {
// The start boundary of the window, as a timestamp in nanoseconds since
// the Unix epoch (it is converted with time.Unix(0, start) and set from
// time.Time.UnixNano). The window covers the half-open interval
// [start, start + size); size is not stored here.
start int64
// The total count of events that happened in the window.
count int64
}
// NewLocalWindow creates a zero-valued LocalWindow (start boundary 0,
// count 0).
//
// The returned StopFunc does nothing; it is provided so that the function
// has the same shape as the other window constructors.
func NewLocalWindow() (*LocalWindow, StopFunc) {
	noop := func() {}
	return new(LocalWindow), noop
}
// Start returns the start boundary of the window as a time.Time.
//
// The boundary is stored as nanoseconds since the Unix epoch, so it is
// converted using zero whole seconds plus that nanosecond offset.
func (w *LocalWindow) Start() time.Time {
	const sec = 0
	nsec := w.start
	return time.Unix(sec, nsec)
}
// Count returns the total count currently recorded in the window.
func (w *LocalWindow) Count() int64 {
	total := w.count
	return total
}
// AddCount adds n to the window's count. No sign check is made, so a
// negative n lowers the count.
func (w *LocalWindow) AddCount(n int64) {
	w.count = w.count + n
}
// Reset moves the window so that it starts at s and overwrites its count
// with c. The start boundary is stored as s in nanoseconds since the Unix
// epoch.
func (w *LocalWindow) Reset(s time.Time, c int64) {
	w.start, w.count = s.UnixNano(), c
}
// Sync does nothing: a LocalWindow keeps its counters in memory only and
// has nothing to synchronize with.
func (w *LocalWindow) Sync(now time.Time) {
	// Intentionally empty.
}
// Types exchanged between a SyncWindow and its Synchronizer.
type (
// SyncRequest carries a window's local state to the synchronizer.
SyncRequest struct {
// Key is the key of the window being synchronized.
Key string
// Start is the window's start boundary (timestamp in nanoseconds).
Start int64
// Count is the window's current total count.
Count int64
// Changes is the amount accumulated locally that has not been synced yet.
Changes int64
}
// SyncResponse carries the outcome of a synchronization back to the window.
SyncResponse struct {
// Whether the synchronization succeeded.
OK bool
// Start is compared against the window's current start boundary; the
// response is ignored when they differ (i.e. the window has been reset
// in the meantime).
Start int64
// The changes accumulated by the local limiter. SyncWindow subtracts
// this amount from its pending changes.
Changes int64
// The total changes accumulated by all the other limiters. SyncWindow
// adds this amount to its count.
OtherChanges int64
}
// MakeFunc builds the SyncRequest to send.
MakeFunc func() SyncRequest
// HandleFunc consumes a SyncResponse.
HandleFunc func(SyncResponse)
)
// Synchronizer is the component a SyncWindow uses to exchange its counters
// with the central datastore.
type Synchronizer interface {
// Start starts the synchronization goroutine, if any.
Start()
// Stop stops the synchronization goroutine, if any, and waits for it to exit.
Stop()
// Sync sends a synchronization request.
//
// SyncWindow passes the current time, a MakeFunc that builds the request
// from the window's state, and a HandleFunc that applies the response to
// the window. When, and on which goroutine, an implementation invokes the
// two callbacks is not visible here — confirm against the implementation.
Sync(time.Time, MakeFunc, HandleFunc)
}
// SyncWindow represents a window that will sync counter data to the
// central datastore asynchronously.
//
// Note that for the best coordination between the window and the synchronizer,
// the synchronization is not automatic but is driven by the call to Sync.
type SyncWindow struct {
// LocalWindow holds the in-memory start boundary and count.
LocalWindow
// changes is the amount added through AddCount that has not yet been
// acknowledged by a sync; it is zeroed by Reset.
changes int64
// key is sent as SyncRequest.Key with every sync request.
key string
// syncer performs the synchronization; it is started by NewSyncWindow.
syncer Synchronizer
}
// NewSyncWindow builds a SyncWindow identified by key that synchronizes
// through syncer.
//
// The synchronizer is started before the window is returned, and the
// returned StopFunc is the synchronizer's Stop method, so calling it stops
// the synchronizer that was started here.
func NewSyncWindow(key string, syncer Synchronizer) (*SyncWindow, StopFunc) {
	win := new(SyncWindow)
	win.key = key
	win.syncer = syncer

	win.syncer.Start()
	return win, win.syncer.Stop
}
// AddCount adds n to the window's count and records the same amount as a
// pending change to be reported by a later sync.
func (w *SyncWindow) AddCount(n int64) {
	w.LocalWindow.AddCount(n)
	w.changes = w.changes + n
}
// Reset moves the window so that it starts at s with count c and discards
// the pending changes of the previous window.
func (w *SyncWindow) Reset(s time.Time, c int64) {
	w.LocalWindow.Reset(s, c)

	// Drop whatever was accumulated within the OLD window.
	//
	// For simplicity the leftover changes are not pushed to the central
	// datastore before resetting; the periodic synchronization alone is
	// responsible for the accuracy of the window's count.
	w.changes = 0
}
// makeSyncRequest snapshots the window's key, start boundary, count and
// pending changes into a SyncRequest.
func (w *SyncWindow) makeSyncRequest() SyncRequest {
	local := &w.LocalWindow

	var req SyncRequest
	req.Key = w.key
	req.Start = local.start
	req.Count = local.count
	req.Changes = w.changes
	return req
}
// handleSyncResponse applies the result of a synchronization to the window.
//
// The response is ignored when the synchronization failed or when its start
// boundary differs from the window's current one, i.e. the window has been
// reset while the sync was in flight.
func (w *SyncWindow) handleSyncResponse(resp SyncResponse) {
	if !resp.OK || resp.Start != w.LocalWindow.start {
		return
	}

	// Fold in what the other limiters accumulated.
	w.LocalWindow.count += resp.OtherChanges
	// The synced amount is no longer pending.
	w.changes -= resp.Changes
}
// Sync asks the synchronizer to perform a synchronization at time now,
// handing it the window's request builder and response handler.
func (w *SyncWindow) Sync(now time.Time) {
	build, apply := w.makeSyncRequest, w.handleSyncResponse
	w.syncer.Sync(now, build, apply)
}