Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo: add AssumeConsumersRequireStable() #836

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,11 @@ type cfg struct {
balancers []GroupBalancer // balancers we can use
protocol string // "consumer" by default, expected to never be overridden

sessionTimeout time.Duration
rebalanceTimeout time.Duration
heartbeatInterval time.Duration
requireStable bool
sessionTimeout time.Duration
rebalanceTimeout time.Duration
heartbeatInterval time.Duration
requireStable bool
assumeConsumersRequireStable bool

onAssigned func(context.Context, *Client, map[string][]int32)
onRevoked func(context.Context, *Client, map[string][]int32)
Expand Down Expand Up @@ -1503,10 +1504,21 @@ func HeartbeatInterval(interval time.Duration) GroupOpt {
// transactional timeouts to a small value (10s) rather than the default 60s.
// Lowering the transactional timeout will reduce the chance that consumers are
// entirely blocked.
//
// If all consumers in your group also require stable fetch offsets, you may
// want to additionally use [AssumeConsumersRequireStable].
func RequireStableFetchOffsets() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.requireStable = true }}
}

// AssumeConsumersRequireStable opts the [GroupTransactSession] into NOT
// aborting whenever a rebalance occurs (i.e., opts this client into assuming
// every other client in the group also requires stable offsets). This
// option should be used in tandem with [RequireStableFetchOffsets].
//
// With this option set, the session's on-revoked hook does not immediately
// mark the session as revoked; the mark is deferred until the hook returns,
// so a commit issued from within the user's revoked callback is allowed.
func AssumeConsumersRequireStable() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.assumeConsumersRequireStable = true }}
}

// BlockRebalanceOnPoll switches the client to block rebalances whenever you
// poll until you explicitly call AllowRebalance. This option also ensures that
// any OnPartitions{Assigned,Revoked,Lost} callbacks are only called when you
Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestGroupETL(t *testing.T) {
errs,
false,
tc.balancer,
false,
)
})
}
Expand Down
27 changes: 14 additions & 13 deletions pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ func (c *testConsumer) wait() {
c.wg.Wait()
}

func (c *testConsumer) goRun(transactional bool, etlsBeforeQuit int) {
func (c *testConsumer) goRun(transactional bool, etlsBeforeQuit int, assumeStable bool) {
if transactional {
c.goTransact(etlsBeforeQuit)
c.goTransact(etlsBeforeQuit, assumeStable)
} else {
c.goGroupETL(etlsBeforeQuit)
}
Expand All @@ -419,6 +419,7 @@ func testChainETL(
errs chan error,
transactional bool,
balancer GroupBalancer,
assumeStable bool,
) {
var (
/////////////
Expand Down Expand Up @@ -484,24 +485,24 @@ func testChainETL(
////////////////////

for i := 0; i < 3; i++ { // three consumers start with standard poll&commit behavior
consumers1.goRun(transactional, -1)
consumers2.goRun(transactional, -1)
consumers3.goRun(transactional, -1)
consumers1.goRun(transactional, -1, assumeStable)
consumers2.goRun(transactional, -1, assumeStable)
consumers3.goRun(transactional, -1, assumeStable)
}

consumers1.goRun(transactional, 0) // bail immediately
consumers1.goRun(transactional, 2) // bail after two txns
consumers2.goRun(transactional, 2) // same
consumers1.goRun(transactional, 0, assumeStable) // bail immediately
consumers1.goRun(transactional, 2, assumeStable) // bail after two txns
consumers2.goRun(transactional, 2, assumeStable) // same

time.Sleep(5 * time.Second)
for i := 0; i < 3; i++ { // trigger rebalance after 5s with more consumers
consumers1.goRun(transactional, -1)
consumers2.goRun(transactional, -1)
consumers3.goRun(transactional, -1)
consumers1.goRun(transactional, -1, assumeStable)
consumers2.goRun(transactional, -1, assumeStable)
consumers3.goRun(transactional, -1, assumeStable)
}

consumers2.goRun(transactional, 0) // bail immediately
consumers1.goRun(transactional, 1) // bail after one txn
consumers2.goRun(transactional, 0, assumeStable) // bail immediately
consumers1.goRun(transactional, 1, assumeStable) // bail after one txn

doneConsume := make(chan struct{})
go func() {
Expand Down
20 changes: 16 additions & 4 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ const (
// (EOS).
//
// If you are running Kafka 2.5+, it is strongly recommended that you also use
// RequireStableFetchOffsets. See that config option's documentation for more
// details.
// [RequireStableFetchOffsets]. See that config option's documentation for more
// details. By default, if the client detects any rebalance, any active transaction
// is aborted for safety. You can use [AssumeConsumersRequireStable] to opt into
// NOT aborting automatically on rebalance. See issue 754 for more detail.
type GroupTransactSession struct {
cl *Client

Expand Down Expand Up @@ -94,18 +96,27 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
userRevoked := cfg.onRevoked
cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) {
s.failMu.Lock()
defer s.failMu.Unlock()
if s.revoked {
s.failMu.Unlock()
return
}

if cl.consumer.g.cooperative.Load() && len(rev) == 0 && !s.revoked {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit")
} else if cl.cfg.assumeConsumersRequireStable {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke, but we are assuming all consumers require stable; allowing commit while in user revoked")
defer func() {
s.failMu.Lock()
s.revoked = true
close(s.revokedCh)
s.failMu.Unlock()
}()
} else {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")
s.revoked = true
close(s.revokedCh)
}
s.failMu.Unlock()

if userRevoked != nil {
userRevoked(ctx, cl, rev)
Expand All @@ -115,14 +126,15 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
userLost := cfg.onLost
cfg.onLost = func(ctx context.Context, cl *Client, lost map[string][]int32) {
s.failMu.Lock()
defer s.failMu.Unlock()
if s.lost {
s.failMu.Unlock()
return
}

cl.cfg.logger.Log(LogLevelInfo, "transact session in on_lost; aborting next commit if we are currently in a transaction")
s.lost = true
close(s.lostCh)
s.failMu.Unlock()

if userLost != nil {
userLost(ctx, cl, lost)
Expand Down
36 changes: 24 additions & 12 deletions pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@
////////////////////////////

for _, tc := range []struct {
name string
balancer GroupBalancer
name string
balancer GroupBalancer
assumeStable bool
}{
{"roundrobin", RoundRobinBalancer()},
{"range", RangeBalancer()},
{"sticky", StickyBalancer()},
{"cooperative-sticky", CooperativeStickyBalancer()},
{"roundrobin", RoundRobinBalancer(), false},
{"range", RangeBalancer(), true},
{"sticky", StickyBalancer(), false},
{"cooperative-sticky", CooperativeStickyBalancer(), true},
{"cooperative-sticky", CooperativeStickyBalancer(), false},
} {
t.Run(tc.name, func(t *testing.T) {
testChainETL(
Expand All @@ -124,17 +126,18 @@
errs,
true,
tc.balancer,
tc.assumeStable,
)
})
}
}

func (c *testConsumer) goTransact(txnsBeforeQuit int) {
func (c *testConsumer) goTransact(txnsBeforeQuit int, assumeStable bool) {
c.wg.Add(1)
go c.transact(txnsBeforeQuit)
go c.transact(txnsBeforeQuit, assumeStable)
}

func (c *testConsumer) transact(txnsBeforeQuit int) {
func (c *testConsumer) transact(txnsBeforeQuit int, assumeStable bool) {
defer c.wg.Done()

opts := []Opt{
Expand All @@ -155,12 +158,21 @@
Balancers(c.balancer),
MaxBufferedRecords(10000),
}
if requireStableFetch {
opts = append(opts, RequireStableFetchOffsets())
var txnSess *GroupTransactSession
if assumeStable {
opts = append(opts, OnPartitionsRevoked(func(ctx context.Context, cl *Client, _ map[string][]int32) {

Check failure on line 163 in pkg/kgo/txn_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

unused-parameter: parameter 'cl' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 163 in pkg/kgo/txn_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

unused-parameter: parameter 'cl' seems to be unused, consider removing or renaming it as _ (revive)
txnSess.End(ctx, TryCommit)
}))
}
if requireStableFetch || assumeStable {
opts = append(opts,
RequireStableFetchOffsets(),
AssumeConsumersRequireStable(),
)
}
opts = append(opts, testClientOpts()...)

txnSess, _ := NewGroupTransactSession(opts...)
txnSess, _ = NewGroupTransactSession(opts...)
defer txnSess.Close()

ntxns := 0 // for if txnsBeforeQuit is non-negative
Expand Down
Loading