fix can restart cn when tn crash #17115

Merged: 4 commits, Jun 25, 2024
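Summary (inferred from the diff below, not stated elsewhere on this page): the change hardens the lock-service restart handshake so a CN can still restart when the allocator and the lock service disagree about the service status. The allocator gains a lock-protected serviceBinds.getStatus, attaches the current status to its restart-related log messages, and handleKeepLockTableBind no longer short-circuits a keep-bind unless the reported status is also ServiceLockEnable; the keeper returns early on a ServiceLockEnable reply, logging any local mismatch; and an assignment-ordering bug in service.setStatus that made status-change logs record no transition is fixed. The status flow the new tests exercise, as far as this diff shows:

    ServiceLockEnable --(setRestartService)--> ServiceLockWaiting --(last remaining txn unlocked)--> ServiceCanRestart

A minimal sketch of the allocator-side handshake, using only identifiers and signatures visible in this diff:

    alloc.setRestartService("s1")       // ServiceLockEnable -> ServiceLockWaiting
    n := alloc.remainTxnInService("s1") // remaining txn count; -1 if it cannot be determined
    if alloc.canRestartService("s1") {  // true once the binding reaches ServiceCanRestart
        // safe to restart the CN lock service
    }
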
31 changes: 25 additions & 6 deletions pkg/lockservice/lock_table_allocator.go
@@ -206,8 +206,9 @@ func (l *lockTableAllocator) setRestartService(serviceID string) {
             zap.String("serviceID", serviceID))
         return
     }
-    getLogger().Info("set restart lock service",
-        zap.String("serviceID", serviceID))
+    logServiceStatus("set restart lock service",
+        serviceID,
+        b.getStatus())
     b.setStatus(pb.Status_ServiceLockWaiting)
 }
 
@@ -221,7 +222,8 @@ func (l *lockTableAllocator) remainTxnInService(serviceID string) int32 {
     txnIDs := b.getTxnIds()
     getLogger().Error("remain txn in restart service",
         bytesArrayField("txnIDs", txnIDs),
-        zap.String("serviceID", serviceID))
+        zap.String("serviceID", serviceID),
+        zap.String("status", b.getStatus().String()))
 
     c := len(txnIDs)
     if c == 0 {
@@ -231,7 +233,8 @@ func (l *lockTableAllocator) remainTxnInService(serviceID string) int32 {
         // -1 means can not get right remain txn in restart lock service
         c = -1
         getLogger().Error("can not get right remain txn in restart lock service",
-            zap.String("serviceID", serviceID))
+            zap.String("serviceID", serviceID),
+            zap.String("status", b.getStatus().String()))
         }
 
     }
@@ -255,6 +258,9 @@ func (l *lockTableAllocator) canRestartService(serviceID string) bool {
             zap.String("serviceID", serviceID))
         return true
     }
+    logServiceStatus("can restart lock service",
+        serviceID,
+        b.getStatus())
     return b.isStatus(pb.Status_ServiceCanRestart)
 }
 
@@ -612,9 +618,16 @@ func (b *serviceBinds) isStatus(status pb.Status) bool {
     return b.status == status
 }
 
+func (b *serviceBinds) getStatus() pb.Status {
+    b.RLock()
+    defer b.RUnlock()
+    return b.status
+}
+
 func (b *serviceBinds) setStatus(status pb.Status) {
     b.Lock()
     defer b.Unlock()
+    logStatusChange(b.status, status)
     b.status = status
 }
 
@@ -754,8 +767,14 @@ func (l *lockTableAllocator) handleKeepLockTableBind(
     }
     b := l.getServiceBinds(req.KeepLockTableBind.ServiceID)
     if b.isStatus(pb.Status_ServiceLockEnable) {
-        writeResponse(ctx, cancel, resp, nil, cs)
-        return
+        if req.KeepLockTableBind.Status != pb.Status_ServiceLockEnable {
+            getLogger().Error("tn has abnormal lock service status",
+                zap.String("serviceID", b.serviceID),
+                zap.String("status", req.KeepLockTableBind.Status.String()))
+        } else {
+            writeResponse(ctx, cancel, resp, nil, cs)
+            return
+        }
     }
     b.setTxnIds(req.KeepLockTableBind.TxnIDs)
     switch req.KeepLockTableBind.Status {
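
Note: the handleKeepLockTableBind hunk is the core allocator fix. Previously, once the binding was ServiceLockEnable, every keep-bind was acknowledged and dropped, so an abnormal status reported by the service (and its remaining txn IDs) never reached b.setTxnIds and the status switch that follows. Now the early return happens only when both sides agree on ServiceLockEnable; otherwise the mismatch is logged and the request falls through to normal processing.
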
7 changes: 7 additions & 0 deletions pkg/lockservice/lock_table_keeper.go
@@ -16,6 +16,7 @@ package lockservice
 
 import (
     "context"
+    "go.uber.org/zap"
     "time"
 
     "github.com/matrixorigin/matrixone/pkg/common/morpc"
@@ -209,6 +210,12 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
     if resp.KeepLockTableBind.OK {
         switch resp.KeepLockTableBind.Status {
         case pb.Status_ServiceLockEnable:
+            if !k.service.isStatus(pb.Status_ServiceLockEnable) {
+                getLogger().Error("tn has abnormal lock service status",
+                    zap.String("serviceID", k.serviceID),
+                    zap.String("status", k.service.getStatus().String()))
+            }
+            return
         case pb.Status_ServiceLockWaiting:
             // maybe pb.Status_ServiceUnLockSucc
             if k.service.isStatus(pb.Status_ServiceLockEnable) {
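
Note: a ServiceLockEnable reply now ends doKeepLockTableBind immediately, logging first if the local service is not itself in ServiceLockEnable. Judging from this diff alone, the previously empty case simply exited the switch and continued with whatever code follows it; the new return makes the enabled state an explicit no-op apart from the mismatch log.
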
19 changes: 17 additions & 2 deletions pkg/lockservice/log.go
@@ -369,10 +369,12 @@ func logPingFailed(
     }
 }
 
-func logCanLockOnService() {
+func logCanLockOnService(
+    serviceID string) {
     logger := getWithSkipLogger()
     if logger.Enabled(zap.InfoLevel) {
-        logger.Error("if lock on service")
+        logger.Error("if lock on service",
+            zap.String("serviceID", serviceID))
     }
 }
 
@@ -500,6 +502,19 @@ func logStatus(
     }
 }
 
+func logServiceStatus(
+    info string,
+    serviceID string,
+    status pb.Status) {
+    logger := getWithSkipLogger()
+    if logger.Enabled(zap.InfoLevel) {
+        logger.Info("service status",
+            zap.String("info", info),
+            zap.String("serviceID", serviceID),
+            zap.String("status", status.String()))
+    }
+}
+
 func logStatusChange(
     from pb.Status,
     to pb.Status) {
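
Note: a pre-existing quirk that this hunk extends rather than fixes: logCanLockOnService gates on logger.Enabled(zap.InfoLevel) but emits with logger.Error. The new logServiceStatus helper is internally consistent (Info gate, Info emit), so the older helper's level mismatch may deserve a follow-up.
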
4 changes: 2 additions & 2 deletions pkg/lockservice/service.go
@@ -318,11 +318,11 @@ func (s *service) canLockOnServiceStatus(
     if s.isStatus(pb.Status_ServiceLockEnable) {
         return true
     }
-    defer logCanLockOnService()
     if opts.Sharding == pb.Sharding_ByRow {
         tableID = shardingByRow(rows[0])
     }
     if !s.validGroupTable(opts.Group, tableID) {
+        logCanLockOnService(s.serviceID)
         return false
     }
     if s.activeTxnHolder.hasActiveTxn(txnID) {
@@ -386,8 +386,8 @@ func (s *service) Close() error {
 func (s *service) setStatus(status pb.Status) {
     s.mu.Lock()
     defer s.mu.Unlock()
-    s.mu.status = status
     logStatusChange(s.mu.status, status)
+    s.mu.status = status
 }
 
 func (s *service) getStatus() pb.Status {
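
Note: two small but meaningful fixes here. The unconditional defer logCanLockOnService() fired on every lock attempt made while the service was not enabled, whichever return path was taken; it now fires only on the validGroupTable failure path and carries the service ID. And setStatus previously assigned s.mu.status before calling logStatusChange, so the log always recorded from == to; reading the old value first makes the transition log meaningful.
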
187 changes: 187 additions & 0 deletions pkg/lockservice/service_test.go
@@ -1430,6 +1430,193 @@ func TestReLockSuccWithLockTableBindChanged(t *testing.T) {
     )
 }
 
+func TestIssue3537(t *testing.T) {
+    runLockServiceTestsWithLevel(
+        t,
+        zapcore.DebugLevel,
+        []string{"s1"},
+        time.Second*1,
+        func(alloc *lockTableAllocator, s []*service) {
+            l1 := s[0]
+
+            ctx, cancel := context.WithTimeout(
+                context.Background(),
+                time.Second*10)
+            defer cancel()
+            option := pb.LockOptions{
+                Granularity: pb.Granularity_Row,
+                Mode:        pb.LockMode_Exclusive,
+                Policy:      pb.WaitPolicy_Wait,
+            }
+
+            _, err := l1.Lock(
+                ctx,
+                0,
+                [][]byte{{1}},
+                []byte("txn1"),
+                option)
+            require.NoError(t, err)
+
+            alloc.setRestartService("s1")
+            for {
+                if l1.isStatus(pb.Status_ServiceLockWaiting) {
+                    break
+                }
+                select {
+                case <-ctx.Done():
+                    panic("timeout bug")
+                default:
+                }
+            }
+
+            b := alloc.getServiceBindsWithoutPrefix("s1")
+            b.setStatus(pb.Status_ServiceLockEnable)
+
+            for {
+                if b.isStatus(pb.Status_ServiceLockWaiting) {
+                    break
+                }
+                select {
+                case <-ctx.Done():
+                    panic("timeout bug")
+                default:
+                }
+            }
+
+            require.NoError(t, l1.Unlock(ctx, []byte("txn1"), timestamp.Timestamp{}))
+            for {
+                if b.isStatus(pb.Status_ServiceCanRestart) {
+                    break
+                }
+                select {
+                case <-ctx.Done():
+                    panic("timeout bug")
+                default:
+                }
+            }
+        },
+        nil,
+    )
+}
+
+func TestIssue3537_2(t *testing.T) {
+    runLockServiceTestsWithLevel(
+        t,
+        zapcore.DebugLevel,
+        []string{"s1"},
+        time.Second*1,
+        func(alloc *lockTableAllocator, s []*service) {
+            l1 := s[0]
+
+            ctx, cancel := context.WithTimeout(
+                context.Background(),
+                time.Second*10)
+            defer cancel()
+            option := pb.LockOptions{
+                Granularity: pb.Granularity_Row,
+                Mode:        pb.LockMode_Exclusive,
+                Policy:      pb.WaitPolicy_Wait,
+            }
+
+            _, err := l1.Lock(
+                ctx,
+                0,
+                [][]byte{{1}},
+                []byte("txn1"),
+                option)
+            require.NoError(t, err)
+
+            alloc.setRestartService("s1")
+            for {
+                if l1.isStatus(pb.Status_ServiceLockWaiting) {
+                    break
+                }
+                select {
+                case <-ctx.Done():
+                    panic("timeout bug")
+                default:
+                }
+            }
+
+            b := alloc.getServiceBindsWithoutPrefix("s1")
+            b.setStatus(pb.Status_ServiceLockEnable)
+
+            require.NoError(t, l1.Unlock(ctx, []byte("txn1"), timestamp.Timestamp{}))
+
+            for {
+                if b.isStatus(pb.Status_ServiceCanRestart) {
+                    break
+                }
+                select {
+                case <-ctx.Done():
+                    panic("timeout bug")
+                default:
+                }
+            }
+        },
+        nil,
+    )
+}
+
+func TestIssue3537_3(t *testing.T) {
+    runLockServiceTestsWithLevel(
+        t,
+        zapcore.DebugLevel,
+        []string{"s1"},
+        time.Second*1,
+        func(alloc *lockTableAllocator, s []*service) {
+            l1 := s[0]
+
+            ctx, cancel := context.WithTimeout(
+                context.Background(),
+                time.Second*10)
+            defer cancel()
+            option := pb.LockOptions{
+                Granularity: pb.Granularity_Row,
+                Mode:        pb.LockMode_Exclusive,
+                Policy:      pb.WaitPolicy_Wait,
+            }
+
+            _, err := l1.Lock(
+                ctx,
+                0,
+                [][]byte{{1}},
+                []byte("txn1"),
+                option)
+            require.NoError(t, err)
+
+            alloc.setRestartService("s1")
+            for {
+                if l1.isStatus(pb.Status_ServiceLockWaiting) {
+                    break
+                }
+                select {
+                case <-ctx.Done():
+                    panic("timeout bug")
+                default:
+                }
+            }
+
+            require.NoError(t, l1.Unlock(ctx, []byte("txn1"), timestamp.Timestamp{}))
+
+            b := alloc.getServiceBindsWithoutPrefix("s1")
+            b.setStatus(pb.Status_ServiceLockEnable)
+
+            for {
+                if b.isStatus(pb.Status_ServiceCanRestart) {
+                    break
+                }
+                select {
+                case <-ctx.Done():
+                    panic("timeout bug")
+                default:
+                }
+            }
+        },
+        nil,
+    )
+}
+
 func TestIssue3288(t *testing.T) {
     runLockServiceTestsWithLevel(
         t,
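
Note: the three regression tests run the same scenario under different interleavings. The binding is flipped back to ServiceLockEnable via getServiceBindsWithoutPrefix before the unlock with an extra wait for ServiceLockWaiting (TestIssue3537), before the unlock with no wait (TestIssue3537_2), or after the unlock (TestIssue3537_3). In every ordering the binding must still converge to ServiceCanRestart, the property the allocator changes above are meant to guarantee; each wait loop spins on the status check and panics if the 10-second context expires first.
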