Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TimingWheel中currentTime和overflowWheel变量存在竞争问题 #33

Open
juine opened this issue Jan 18, 2021 · 3 comments
Open

TimingWheel中currentTime和overflowWheel变量存在竞争问题 #33

juine opened this issue Jan 18, 2021 · 3 comments

Comments

@juine
Copy link

juine commented Jan 18, 2021

TimingWheel结构体中currentTime变量同时为多个goroutine访问,其中 add方法是只读,addvanceClock则是先去读currentTime变量,再判断,然后写入。使用了atomic只保证了读写currentTime变量那一个操作原子化,但是advanceClock则是先读再判断最后再写入, 会与add函数发生竞争,同理overflow变量也是一样好。粗略看了一下,kafka源码里面采用了是读写锁临时保护了这些变量

@yunlong0928
Copy link

确实是这样,kafka源码中的注释也提到了advanceClock和add不能同时调用,具体实现的方式就是读写锁

@RussellLuo
Copy link
Owner

使用了atomic只保证了读写currentTime变量那一个操作原子化,但是advanceClock则是先读再判断最后再写入, 会与add函数发生竞争,同理overflow变量也是一样好。

@juine @yunlong0928 两位好!请问具体是什么竞争,以及会导致什么问题,方便具体描述下吗?

@M6ZeroG
Copy link

M6ZeroG commented Mar 14, 2022

TimingWheel结构体中currentTime,当add方法和addvanceClock方法并发时,存在如下这种竞争情况:add方法执行完atomic.LoadInt64(&tw.currentTime)后addvanceClock方法使用atomic.StoreInt64(&tw.currentTime, currentTime)改变了currentTime,致使add方法关于t.expirationcurrentTime+tw.tick大小的判断出现问题。

if t.expiration < currentTime+tw.tick {
// Already expired
return false
} else if t.expiration < currentTime+tw.interval {
// Put it into its own bucket
virtualID := t.expiration / tw.tick
b := tw.buckets[virtualID%tw.wheelSize]
b.Add(t)
// Set the bucket expiration time
if b.SetExpiration(virtualID * tw.tick) {
// The bucket needs to be enqueued since it was an expired bucket.
// We only need to enqueue the bucket when its expiration time has changed,
// i.e. the wheel has advanced and this bucket get reused with a new expiration.
// Any further calls to set the expiration within the same wheel cycle will
// pass in the same value and hence return false, thus the bucket with the
// same expiration will not be enqueued multiple times.
tw.queue.Offer(b, b.Expiration())
}
return true

除非确保advanceClock方法和add方法不同时调用
详见yunlong0928:master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants