-
Notifications
You must be signed in to change notification settings - Fork 125
/
Copy pathtimingwheel.go
226 lines (200 loc) · 6.44 KB
/
timingwheel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package timingwheel
import (
"errors"
"sync/atomic"
"time"
"unsafe"
"github.com/RussellLuo/timingwheel/delayqueue"
)
// TimingWheel is an implementation of Hierarchical Timing Wheels.
//
// A wheel holds wheelSize buckets, each covering one tick. Timers whose
// expiration lies beyond the wheel's interval are handed to an overflow wheel
// that is created on demand and whose tick equals this wheel's interval.
type TimingWheel struct {
tick int64 // width of a single bucket, in milliseconds
wheelSize int64 // number of buckets in this wheel
interval int64 // span covered by the whole wheel (tick * wheelSize), in milliseconds
currentTime int64 // wheel clock, in milliseconds; read and written with sync/atomic
buckets []*bucket // slot for a timer is (expiration / tick) % wheelSize
queue *delayqueue.DelayQueue // delay queue of buckets; the same queue is passed on to overflow wheels
// The higher-level overflow wheel.
//
// NOTE: This field may be updated and read concurrently, through add()
// and advanceClock(), so it is only accessed via sync/atomic.
overflowWheel unsafe.Pointer // type: *TimingWheel
exitC chan struct{} // closed by Stop; watched by the functions registered in Start
waitGroup waitGroupWrapper // used by Start to register its functions and by Stop to wait for them
}
// NewTimingWheel creates an instance of TimingWheel with the given tick and wheelSize.
//
// tick is truncated to whole milliseconds and must be at least 1ms.
// wheelSize is the number of buckets per wheel level and must be at least 2.
// NewTimingWheel panics if either requirement is violated.
//
// The wheel's clock starts at the current UTC time; the wheel does nothing
// until Start is called.
func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
	tickMs := int64(tick / time.Millisecond)
	if tickMs <= 0 {
		panic(errors.New("tick must be greater than or equal to 1ms"))
	}
	if wheelSize < 2 {
		// With fewer than two buckets the wheel's interval (tick * wheelSize)
		// never exceeds its tick, so add can never place a pending timer in a
		// bucket. Every overflow level inherits the same wheelSize, so add
		// would keep creating overflow wheels and recurse without bound.
		// Fail fast with a clear message instead.
		panic(errors.New("wheelSize must be greater than or equal to 2"))
	}

	startMs := timeToMs(time.Now().UTC())

	return newTimingWheel(
		tickMs,
		wheelSize,
		startMs,
		delayqueue.New(int(wheelSize)),
	)
}
// newTimingWheel builds one wheel level. It is shared by NewTimingWheel (for
// the lowest level) and add (for lazily created overflow levels).
//
// tickMs and startMs are in milliseconds; the wheel clock is initialised to
// truncate(startMs, tickMs). queue is stored as-is, so every level created
// from the same root uses the same delay queue.
func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
	// Allocate and populate every slot up front so add never sees a nil bucket.
	slots := make([]*bucket, wheelSize)
	for idx := int64(0); idx < wheelSize; idx++ {
		slots[idx] = newBucket()
	}

	tw := new(TimingWheel)
	tw.tick = tickMs
	tw.wheelSize = wheelSize
	tw.currentTime = truncate(startMs, tickMs)
	tw.interval = tickMs * wheelSize
	tw.buckets = slots
	tw.queue = queue
	tw.exitC = make(chan struct{})
	return tw
}
// add tries to place the timer t into this wheel or one of its overflow
// wheels. It reports false when t has already expired relative to this
// wheel's clock (the caller is then responsible for running it), and true
// once t has been stored in a bucket.
func (tw *TimingWheel) add(t *Timer) bool {
	now := atomic.LoadInt64(&tw.currentTime)

	switch {
	case t.expiration < now+tw.tick:
		// Falls inside the current tick: already expired.
		return false

	case t.expiration < now+tw.interval:
		// Falls inside this wheel: store it in the matching bucket.
		slot := t.expiration / tw.tick
		bkt := tw.buckets[slot%tw.wheelSize]
		bkt.Add(t)

		// SetExpiration reports whether the bucket's expiration changed, i.e.
		// the wheel has advanced and the bucket is being reused for a new
		// cycle. Only then does the bucket need to be enqueued; later calls
		// within the same cycle pass the same value, get false back, and so
		// never enqueue the same bucket twice.
		if bkt.SetExpiration(slot * tw.tick) {
			tw.queue.Offer(bkt, bkt.Expiration())
		}
		return true
	}

	// Beyond this wheel's interval: delegate to the overflow wheel, creating
	// it on first use. The CAS result is deliberately ignored; whichever
	// goroutine wins, the pointer is re-read so everyone uses the same wheel.
	next := atomic.LoadPointer(&tw.overflowWheel)
	if next == nil {
		candidate := newTimingWheel(tw.interval, tw.wheelSize, now, tw.queue)
		atomic.CompareAndSwapPointer(&tw.overflowWheel, nil, unsafe.Pointer(candidate))
		next = atomic.LoadPointer(&tw.overflowWheel)
	}
	return (*TimingWheel)(next).add(t)
}
// addOrRun places the timer t into the timing wheel; if t has already
// expired, its task is run right away instead.
func (tw *TimingWheel) addOrRun(t *Timer) {
	if queued := tw.add(t); queued {
		return
	}

	// Already expired. Mirroring the standard time.AfterFunc
	// (https://golang.org/pkg/time/#AfterFunc), the task always runs in its
	// own goroutine.
	go t.task()
}
// advanceClock moves this wheel's clock forward to expiration (truncated to
// the wheel's tick) when expiration is at least one tick ahead of the current
// clock, and then propagates the new time to the overflow wheel if one exists.
// Otherwise it does nothing.
func (tw *TimingWheel) advanceClock(expiration int64) {
	if expiration < atomic.LoadInt64(&tw.currentTime)+tw.tick {
		// Still within the current tick: nothing to advance.
		return
	}

	now := truncate(expiration, tw.tick)
	atomic.StoreInt64(&tw.currentTime, now)

	// Keep the higher-level wheel, if present, in step with this one.
	if next := atomic.LoadPointer(&tw.overflowWheel); next != nil {
		(*TimingWheel)(next).advanceClock(now)
	}
}
// Start starts the current timing wheel.
//
// It registers two functions with the wheel's wait group: one polls the delay
// queue using the current UTC time in milliseconds, the other receives expired
// buckets from the queue, advances the clock and flushes each bucket's timers
// back through addOrRun. Both stop once exitC is closed (see Stop).
func (tw *TimingWheel) Start() {
	nowMs := func() int64 {
		return timeToMs(time.Now().UTC())
	}

	poll := func() {
		tw.queue.Poll(tw.exitC, nowMs)
	}

	dispatch := func() {
		for {
			select {
			case <-tw.exitC:
				return
			case elem := <-tw.queue.C:
				expired := elem.(*bucket)
				tw.advanceClock(expired.Expiration())
				// Timers in the bucket are either re-inserted (into a
				// lower-level position) or run if they are now due.
				expired.Flush(tw.addOrRun)
			}
		}
	}

	tw.waitGroup.Wrap(poll)
	tw.waitGroup.Wrap(dispatch)
}
// Stop stops the current timing wheel.
//
// If any timer's task is running in its own goroutine, Stop does not wait
// for the task to complete before returning. If the caller needs to know
// whether the task has completed, it must coordinate with the task explicitly.
//
// Stop must be called at most once: it closes exitC unconditionally, and
// closing an already-closed channel panics.
func (tw *TimingWheel) Stop() {
// Closing exitC is the shutdown signal observed by the functions that Start
// registered (the queue poller and the bucket-dispatch loop).
close(tw.exitC)
// Block until those functions have returned.
tw.waitGroup.Wait()
}
// AfterFunc arranges for f to be called in its own goroutine once the
// duration d has elapsed. The returned Timer can be used to cancel the call
// through its Stop method.
func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
	deadline := time.Now().UTC().Add(d)

	timer := &Timer{expiration: timeToMs(deadline), task: f}
	tw.addOrRun(timer)

	return timer
}
// Scheduler determines the execution plan of a task.
//
// It is consumed by ScheduleFunc, which calls Next once up front and then
// again each time the task is about to run.
type Scheduler interface {
// Next returns the next execution time after the given (previous) time.
// It returns the zero time if no further execution is scheduled.
//
// All times must be UTC.
Next(time.Time) time.Time
}
// ScheduleFunc calls f (in its own goroutine) according to the execution
// plan provided by s. It returns a Timer that can be used to cancel the
// call through its Stop method, or nil if s schedules no execution at all.
//
// If the caller wants to terminate the execution plan halfway, it must
// stop the timer and make sure the timer has actually been stopped, because
// in the current implementation there is a gap between the timer expiring
// and it being restarted. The wait needed to make sure is short, since the
// gap is very small.
//
// Internally, ScheduleFunc first asks s.Next() for the initial execution
// time and creates a timer only if that time is non-zero. After that, each
// time f is about to be executed it asks for the following execution time
// and, if that time is non-zero, re-arms the timer for it before calling f.
func (tw *TimingWheel) ScheduleFunc(s Scheduler, f func()) (t *Timer) {
	first := s.Next(time.Now().UTC())
	if first.IsZero() {
		// Nothing is scheduled.
		return nil
	}

	t = new(Timer)
	t.expiration = timeToMs(first)
	t.task = func() {
		// Re-arm for the following execution, if there is one, before
		// running the task itself.
		if next := s.Next(msToTime(t.expiration)); !next.IsZero() {
			t.expiration = timeToMs(next)
			tw.addOrRun(t)
		}

		// Actually execute the task.
		f()
	}

	tw.addOrRun(t)
	return t
}