Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo: reintroduce random broker iteration #606

Merged
merged 1 commit into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -948,7 +949,9 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
if latencyMillis > minPessimismMillis {
minPessimismMillis = latencyMillis
}
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)
var random float64
cxn.b.cl.rng(func(r *rand.Rand) { random = r.Float64() })
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*random) // 95 to 98% of lifetime (pessimism 2% to 5%)

// Our minimum lifetime is always 1s (or latency, if larger).
// When our max pessimism becomes more than min pessimism,
Expand Down
55 changes: 34 additions & 21 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type Client struct {
ctx context.Context
ctxCancel func()

rng func() float64
rng func(func(*rand.Rand))

brokersMu sync.RWMutex
brokers []*broker // ordered by broker ID
seeds atomic.Value // []*broker, seed brokers, also ordered by ID
anyBrokerIdx int32
anyBrokerOrd []int32 // shuffled brokers, for random ordering
anySeedIdx int32
stopBrokers bool // set to true on close to stop updateBrokers

Expand Down Expand Up @@ -462,13 +462,13 @@ func NewClient(opts ...Opt) (*Client, error) {
ctx: ctx,
ctxCancel: cancel,

rng: func() func() float64 {
rng: func() func(func(*rand.Rand)) {
var mu sync.Mutex
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return func() float64 {
return func(fn func(*rand.Rand)) {
mu.Lock()
defer mu.Unlock()
return rng.Float64()
fn(rng)
}
}(),

Expand Down Expand Up @@ -733,33 +733,45 @@ func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
}
}

func (cl *Client) reinitAnyBrokerOrd() {
cl.anyBrokerOrd = append(cl.anyBrokerOrd[:0], make([]int32, len(cl.brokers))...)
for i := range cl.anyBrokerOrd {
cl.anyBrokerOrd[i] = int32(i)
}
cl.rng(func(r *rand.Rand) {
r.Shuffle(len(cl.anyBrokerOrd), func(i, j int) {
cl.anyBrokerOrd[i], cl.anyBrokerOrd[j] = cl.anyBrokerOrd[j], cl.anyBrokerOrd[i]
})
})
}

// broker returns a random broker from all brokers ever known.
func (cl *Client) broker() *broker {
cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below
cl.brokersMu.Lock()
defer cl.brokersMu.Unlock()

// Every time we loop through all discovered brokers, we issue one
// request to the next seed. This ensures that if all discovered
// brokers are down, we will *eventually* loop through seeds and
// hopefully have a reachable seed.
var b *broker
if len(cl.brokers) > 0 && int(cl.anyBrokerIdx) < len(cl.brokers) {
cl.anyBrokerIdx %= int32(len(cl.brokers))
b = cl.brokers[cl.anyBrokerIdx]
cl.anyBrokerIdx++
} else {
seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++

// If we have brokers, we ranged past discovered brokers.
// We now reset the anyBrokerIdx to begin ranging through
// discovered brokers again.
if len(cl.brokers) > 0 {
cl.anyBrokerIdx = 0
}
if len(cl.anyBrokerOrd) > 0 {
b = cl.brokers[cl.anyBrokerOrd[0]]
cl.anyBrokerOrd = cl.anyBrokerOrd[1:]
return b
}

seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++

// If we have brokers, we ranged past discovered brokers.
// We now reset the anyBrokerOrd to begin ranging through
// discovered brokers again. If there are still no brokers,
// this reinit will do nothing and we will keep looping seeds.
cl.reinitAnyBrokerOrd()
return b
}

Expand Down Expand Up @@ -946,6 +958,7 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
}

cl.brokers = newBrokers
cl.reinitAnyBrokerOrd()
}

// CloseAllowingRebalance allows rebalances, leaves any group, and closes all
Expand Down