diff --git a/pkg/lockservice/lock_table_allocator.go b/pkg/lockservice/lock_table_allocator.go
index d45da3aaca61d..6b20026d8a8f3 100644
--- a/pkg/lockservice/lock_table_allocator.go
+++ b/pkg/lockservice/lock_table_allocator.go
@@ -206,6 +206,8 @@ func (l *lockTableAllocator) setRestartService(serviceID string) {
 			zap.String("serviceID", serviceID))
 		return
 	}
+	getLogger().Info("set restart lock service",
+		zap.String("serviceID", serviceID))
 	b.setStatus(pb.Status_ServiceLockWaiting)
 }
 
@@ -306,6 +308,9 @@ func (l *lockTableAllocator) disableGroupTables(groupTables []pb.LockTable, b *s
 			delete(b.groupTables[t.Group], t.Table)
 		}
 	}
+	if len(groupTables) > 0 {
+		logBindsMove(groupTables)
+	}
 }
 
 func (l *lockTableAllocator) getServiceBinds(serviceID string) *serviceBinds {
diff --git a/pkg/lockservice/lock_table_keeper.go b/pkg/lockservice/lock_table_keeper.go
index 913ea66e72ca2..c3fe8d3596768 100644
--- a/pkg/lockservice/lock_table_keeper.go
+++ b/pkg/lockservice/lock_table_keeper.go
@@ -193,7 +193,7 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
 	req.KeepLockTableBind.ServiceID = k.serviceID
 	req.KeepLockTableBind.Status = k.service.getStatus()
 	if !k.service.isStatus(pb.Status_ServiceLockEnable) {
-		req.KeepLockTableBind.LockTables = k.service.popGroupTables()
+		req.KeepLockTableBind.LockTables = k.service.topGroupTables()
 		req.KeepLockTableBind.TxnIDs = k.service.activeTxnHolder.getAllTxnID()
 	}
 
@@ -208,6 +208,8 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
 
 	if resp.KeepLockTableBind.OK {
 		switch resp.KeepLockTableBind.Status {
+		case pb.Status_ServiceLockEnable:
+			// keep the local status unchanged when the allocator reports enable
 		case pb.Status_ServiceLockWaiting:
 			// maybe pb.Status_ServiceUnLockSucc
 			if k.service.isStatus(pb.Status_ServiceLockEnable) {
@@ -216,6 +218,10 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
 		default:
			k.service.setStatus(resp.KeepLockTableBind.Status)
 		}
+		if len(req.KeepLockTableBind.LockTables) > 0 {
+			logBindsMove(k.service.popGroupTables())
+			logStatus(k.service.getStatus())
+		}
 		return
 	}
 
diff --git a/pkg/lockservice/log.go b/pkg/lockservice/log.go
index 8ce1ea50f6768..35737c4958b04 100644
--- a/pkg/lockservice/log.go
+++ b/pkg/lockservice/log.go
@@ -369,6 +369,13 @@ func logPingFailed(
 	}
 }
 
+func logCanLockOnService() {
+	logger := getWithSkipLogger()
+	if logger.Enabled(zap.InfoLevel) {
+		logger.Info("check if can lock on service")
+	}
+}
+
 func logLocalBindsInvalid() {
 	logger := getWithSkipLogger()
 	logger.Error("all local lock table invalid")
@@ -475,6 +482,35 @@ func logWaitersAdded(
 	}
 }
 
+func logBindsMove(
+	binds []pb.LockTable) {
+	logger := getWithSkipLogger()
+	if logger.Enabled(zap.InfoLevel) {
+		logger.Info("binds move",
+			bindsArrayField("binds", binds))
+	}
+}
+
+func logStatus(
+	status pb.Status) {
+	logger := getWithSkipLogger()
+	if logger.Enabled(zap.InfoLevel) {
+		logger.Info("service status",
+			zap.String("status", status.String()))
+	}
+}
+
+func logStatusChange(
+	from pb.Status,
+	to pb.Status) {
+	logger := getWithSkipLogger()
+	if logger.Enabled(zap.InfoLevel) {
+		logger.Info("service status change",
+			zap.String("from", from.String()),
+			zap.String("to", to.String()))
+	}
+}
+
 func logWaiterGetNotify(
 	w *waiter,
 	v notifyValue) {
@@ -590,6 +626,20 @@ func waitTxnArrayField(name string, values []pb.WaitTxn) zap.Field {
 	return zap.String(name, buffer.String())
 }
 
+func bindsArrayField(name string, values []pb.LockTable) zap.Field {
+	var buffer bytes.Buffer
+	buffer.WriteString("[")
+	for idx, w := range values {
+		buffer.WriteString(w.DebugString())
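+		// write a comma separator after every entry except the last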
+		if idx != len(values)-1 {
+			buffer.WriteString(",")
+		}
+	}
+	buffer.WriteString("]")
+	return zap.String(name, buffer.String())
+}
+
 func waiterArrayField(name string, values ...*waiter) zap.Field {
 	var buffer bytes.Buffer
 	buffer.WriteString("[")
diff --git a/pkg/lockservice/service.go b/pkg/lockservice/service.go
index aff1e013a90ca..d19002fe4dfc0 100644
--- a/pkg/lockservice/service.go
+++ b/pkg/lockservice/service.go
@@ -298,6 +298,7 @@ func (s *service) checkCanMoveGroupTables() {
 	}
 	s.mu.restartTime, _ = s.clock.Now()
+	logStatusChange(s.mu.status, pb.Status_ServiceLockWaiting)
 	s.mu.status = pb.Status_ServiceLockWaiting
 }
 
 func (s *service) incRef(group uint32, table uint64) {
@@ -317,18 +318,19 @@ func (s *service) canLockOnServiceStatus(
 	if s.isStatus(pb.Status_ServiceLockEnable) {
 		return true
 	}
+	defer logCanLockOnService()
 	if opts.Sharding == pb.Sharding_ByRow {
 		tableID = shardingByRow(rows[0])
 	}
 	if !s.validGroupTable(opts.Group, tableID) {
 		return false
 	}
-	if s.activeTxnHolder.empty() {
-		return false
-	}
 	if s.activeTxnHolder.hasActiveTxn(txnID) {
 		return true
 	}
+	if s.activeTxnHolder.empty() {
+		return false
+	}
 	if opts.SnapShotTs.LessEq(s.getRestartTime()) {
 		return true
 	}
@@ -385,6 +387,7 @@ func (s *service) setStatus(status pb.Status) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	logStatusChange(s.mu.status, status)
 	s.mu.status = status
 }
 
 func (s *service) getStatus() pb.Status {
@@ -393,6 +396,17 @@ func (s *service) getStatus() pb.Status {
 	return s.mu.status
 }
 
+// topGroupTables returns the first batch of group tables without removing it.
+func (s *service) topGroupTables() []pb.LockTable {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if len(s.mu.groupTables) == 0 {
+		return nil
+	}
+	g := s.mu.groupTables[0]
+	return g
+}
+
 func (s *service) popGroupTables() []pb.LockTable {
 	s.mu.Lock()
 	defer s.mu.Unlock()
diff --git a/pkg/lockservice/service_test.go b/pkg/lockservice/service_test.go
index 48cdd33beee56..d12db7738d052 100644
--- a/pkg/lockservice/service_test.go
+++ b/pkg/lockservice/service_test.go
@@ -1486,6 +1486,71 @@ func TestIssue3288(t *testing.T) {
 	)
 }
 
+func TestIssue3538(t *testing.T) {
+	runLockServiceTestsWithLevel(
+		t,
+		zapcore.DebugLevel,
+		[]string{"s1", "s2"},
+		time.Second*1,
+		func(alloc *lockTableAllocator, s []*service) {
+			l1 := s[0]
+			l2 := s[1]
+
+			ctx, cancel := context.WithTimeout(
+				context.Background(),
+				time.Second*10)
+			defer cancel()
+			option := pb.LockOptions{
+				Granularity: pb.Granularity_Row,
+				Mode:        pb.LockMode_Exclusive,
+				Policy:      pb.WaitPolicy_Wait,
+			}
+
+			_, err := l1.Lock(
+				ctx,
+				0,
+				[][]byte{{1}},
+				[]byte("txn1"),
+				option)
+			require.NoError(t, err)
+
+			alloc.setRestartService("s1")
+			// wait until s1 switches to lock waiting status
+			for {
+				if l1.isStatus(pb.Status_ServiceLockWaiting) {
+					break
+				}
+				select {
+				case <-ctx.Done():
+					panic("timeout bug")
+				default:
+				}
+			}
+
+			require.NoError(t, l1.Unlock(ctx, []byte("txn1"), timestamp.Timestamp{}))
+
+			// txn2 can lock the same row once s1's bind has been moved
+			for {
+				_, err = l2.Lock(
+					ctx,
+					0,
+					[][]byte{{1}},
+					[]byte("txn2"),
+					option)
+				if err == nil {
+					break
+				}
+				select {
+				case <-ctx.Done():
+					panic("timeout bug")
+				default:
+				}
+			}
+		},
+		nil,
+	)
+}
+
 func TestIssue16121(t *testing.T) {
 	runLockServiceTestsWithLevel(
 		t,