Skip to content

Commit

Permalink
Merge pull request #602 from twmb/sticky-move
Browse files Browse the repository at this point in the history
sticky balancer: try for better topic distribution among members
  • Loading branch information
twmb authored Oct 22, 2023
2 parents 3d115f1 + 1429d47 commit e94230f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/kgo/internal/sticky/sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,24 @@ func (b *balancer) assignUnassignedAndInitGraph() {
}

b.tryRestickyStales(topicPotentials, partitionConsumers)

// For each member, we now sort their current partitions by partition,
// then topic. Sorting the lowest numbers first means that once we
// steal from the end (when adding a member), we steal equally across
// all topics. This benefits the standard case the most, where all
// members consume equally.
for memberNum := range b.plan {
partNums := b.plan[memberNum]
sort.Slice(partNums, func(i, j int) bool {
lpNum, rpNum := partNums[i], partNums[j]
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
lt, rt := li.topic, ri.topic
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
return lp < rp || (lp == rp && lt < rt)
})
}

for _, potentials := range topicPotentials {
(&membersByPartitions{potentials, b.plan}).init()
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/kgo/internal/sticky/sticky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,30 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi
testPlanUsage(t, plan3, topics, nil)
}

func Test_stickyAddEqualMove(t *testing.T) {
t.Parallel()
topics := map[string]int32{"foo": 16, "bar": 16}
members := []GroupMember{
{ID: "1", Topics: []string{"foo", "bar"}},
}
plan1 := Balance(members, topics)

// PLAN 2
members[0].UserData = udEncode(1, 1, plan1["1"])
members = append(members, GroupMember{
ID: "2", Topics: []string{"foo", "bar"},
})

plan2 := Balance(members, topics)
testEqualDivvy(t, plan2, 16, members)
testPlanUsage(t, plan2, topics, nil)

if len(plan2["1"]["foo"]) != 8 || len(plan2["1"]["bar"]) != 8 ||
len(plan2["2"]["foo"]) != 8 || len(plan2["2"]["bar"]) != 8 {
t.Errorf("bad distribution: %v", plan2)
}
}

func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit e94230f

Please sign in to comment.