From e12ca4c71ec896c12c143863dcdde9196b227b51 Mon Sep 17 00:00:00 2001
From: Pieter Loubser
Date: Fri, 17 Jan 2025 11:40:54 +0000
Subject: [PATCH] Make balancer cluster aware

---
 balancer/balancer.go | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/balancer/balancer.go b/balancer/balancer.go
index fb7994d..39fbb0c 100644
--- a/balancer/balancer.go
+++ b/balancer/balancer.go
@@ -31,9 +31,9 @@ import (
 // step down a number of streams/consumers until the even distribution is met.
 // Which streams/consumers are stepped down is determined randomly. We use
 // preferred placement to move the leadership to a server with less than
-// the even distribution. If stepping down fails, we will move on the next
-// randomly selected server. If we get a second, similar failure the Balancer
-// will return an error.
+// the even distribution on the same cluster. If stepping down fails, we
+// will move on to the next randomly selected server. If we get a second,
+// similar failure, the Balancer will return an error.
 type Balancer struct {
 	nc  *nats.Conn
 	log api.Logger
@@ -49,6 +49,7 @@ type peer struct {
 	name     string
 	entities []balanceEntity
 	offset   int
+	clusters map[string]bool
 }
 
 // New returns a new instance of the Balancer
@@ -102,10 +103,12 @@ func (b *Balancer) mapEntityToServers(entity balanceEntity, serverMap map[string
 		tmp := peer{
 			name:     leaderName,
 			entities: []balanceEntity{},
+			clusters: map[string]bool{},
 		}
 		serverMap[leaderName] = &tmp
 	}
 	serverMap[leaderName].entities = append(serverMap[leaderName].entities, entity)
+	serverMap[leaderName].clusters[info.Name] = true
 
 	for _, replica := range info.Replicas {
 		_, ok = serverMap[replica.Name]
@@ -113,6 +116,7 @@ func (b *Balancer) mapEntityToServers(entity balanceEntity, serverMap map[string
 			tmp := peer{
 				name:     replica.Name,
 				entities: []balanceEntity{},
+				clusters: map[string]bool{info.Name: true},
 			}
 			serverMap[replica.Name] = &tmp
 		}
@@ -135,22 +139,25 @@ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeH
 		if s.offset > 0 {
 			b.log.Infof("Found server '%s' with offset of %d. Rebalancing", s.name, s.offset)
 			retries := 0
-			for i := 0; i <= s.offset; i++ {
+			for s.offset > 0 {
 				// find a random stream (or consumer) to move to another server
 				randomIndex := rand.Intn(len(s.entities))
 				entity := s.entities[randomIndex]
+				clusterinfo, err := entity.ClusterInfo()
+				if err != nil {
+					return 0, fmt.Errorf("unable to get clusterinfo for %s '%s'. %s", typeHint, entity.Name(), err)
+				}
 				for _, ns := range servers {
-					if ns.offset < 0 {
+					if ns.offset < 0 && ns.clusters[clusterinfo.Name] {
 						b.log.Infof("Requesting leader '%s' step down for %s '%s'. New preferred leader is %s.", s.name, typeHint, entity.Name(), ns.name)
-						placement := api.Placement{Preferred: ns.name}
+						placement := api.Placement{Preferred: ns.name, Cluster: clusterinfo.Name}
 						err := entity.LeaderStepDown(&placement)
 						if err != nil {
 							b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
 							// If we failed to step down the stream, decrement the iterator so that we don't kick one too few
 							// Limit this to one retry, if we can't step down multiple leaders something is wrong
 							if retries == 0 {
-								i--
 								retries++
 								s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
 								break
@@ -162,6 +169,7 @@ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeH
 						steppedDown += 1
 						ns.offset += 1
 						s.offset -= 1
+						// Remove the entity we just moved from the server so it can't be randomly selected again
 						s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
 						break
 					}
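
Note (illustrative only, not part of the patch): the sketch below shows the selection rule this change introduces in balance(). The peer fields mirror the ones added above; pickTarget is a hypothetical helper written only for this example and does not exist in the balancer package. A step-down target must be under the even distribution (offset < 0) and must share a cluster with the entity being moved, so the preferred placement never crosses clusters.

package main

import "fmt"

// peer mirrors the fields the patch adds: offset is the signed distance from
// the even distribution, clusters records which clusters this server hosts
// replicas for.
type peer struct {
	name     string
	offset   int
	clusters map[string]bool
}

// pickTarget is a hypothetical helper illustrating the patched loop's rule:
// a step-down target must be below the even distribution (offset < 0) and
// must sit in the same cluster as the entity being moved.
func pickTarget(servers map[string]*peer, cluster string) *peer {
	for _, ns := range servers {
		if ns.offset < 0 && ns.clusters[cluster] {
			return ns
		}
	}
	return nil
}

func main() {
	servers := map[string]*peer{
		"n1": {name: "n1", offset: 2, clusters: map[string]bool{"c1": true}},
		"n2": {name: "n2", offset: -1, clusters: map[string]bool{"c1": true}},
		"n3": {name: "n3", offset: -1, clusters: map[string]bool{"c2": true}},
	}

	// An entity whose replicas live in cluster "c1" may only move towards n2;
	// n3 is equally under-loaded but sits in cluster "c2", so it is skipped.
	if target := pickTarget(servers, "c1"); target != nil {
		fmt.Println("preferred new leader:", target.name)
	}
}

Running this prints "preferred new leader: n2", matching the behaviour the patch aims for: leadership is rebalanced without ever being placed outside the entity's own cluster.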