
Commit

Merge branch 'main' into issue17039
mergify[bot] authored Jun 21, 2024
2 parents f3291f9 + 72b187e commit 44181d3
Showing 9 changed files with 359 additions and 63 deletions.
5 changes: 5 additions & 0 deletions pkg/lockservice/lock_table_allocator.go
@@ -206,6 +206,8 @@ func (l *lockTableAllocator) setRestartService(serviceID string) {
zap.String("serviceID", serviceID))
return
}
getLogger().Info("set restart lock service",
zap.String("serviceID", serviceID))
b.setStatus(pb.Status_ServiceLockWaiting)
}

@@ -306,6 +308,9 @@ func (l *lockTableAllocator) disableGroupTables(groupTables []pb.LockTable, b *s
delete(b.groupTables[t.Group], t.Table)
}
}
if len(groupTables) > 0 {
logBindsMove(groupTables)
}
}

func (l *lockTableAllocator) getServiceBinds(serviceID string) *serviceBinds {
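Both additions in this file are observability-only: setRestartService now records which service was asked to restart before flipping its binding to Status_ServiceLockWaiting, and disableGroupTables logs moved bindings only when the slice is non-empty. A minimal, self-contained sketch of that transition follows; the Status values and the serviceBinds shape are simplified stand-ins, not the real lockservice types.

// Sketch of the allocator-side transition logged above, under the
// assumptions stated in the paragraph before this block.
package main

import "log"

type Status int

const (
	StatusServiceLockEnable Status = iota
	StatusServiceLockWaiting
)

type serviceBinds struct {
	serviceID string
	status    Status
}

func (b *serviceBinds) setStatus(s Status) { b.status = s }

func setRestartService(b *serviceBinds) {
	log.Printf("set restart lock service, serviceID=%s", b.serviceID)
	b.setStatus(StatusServiceLockWaiting)
}

func main() {
	b := &serviceBinds{serviceID: "s1", status: StatusServiceLockEnable}
	setRestartService(b)
	log.Printf("status is now %v", b.status)
}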
7 changes: 6 additions & 1 deletion pkg/lockservice/lock_table_keeper.go
@@ -193,7 +193,7 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
req.KeepLockTableBind.ServiceID = k.serviceID
req.KeepLockTableBind.Status = k.service.getStatus()
if !k.service.isStatus(pb.Status_ServiceLockEnable) {
req.KeepLockTableBind.LockTables = k.service.popGroupTables()
req.KeepLockTableBind.LockTables = k.service.topGroupTables()
req.KeepLockTableBind.TxnIDs = k.service.activeTxnHolder.getAllTxnID()
}

@@ -208,6 +208,7 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {

if resp.KeepLockTableBind.OK {
switch resp.KeepLockTableBind.Status {
case pb.Status_ServiceLockEnable:
case pb.Status_ServiceLockWaiting:
// maybe pb.Status_ServiceUnLockSucc
if k.service.isStatus(pb.Status_ServiceLockEnable) {
@@ -216,6 +217,10 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
default:
k.service.setStatus(resp.KeepLockTableBind.Status)
}
if len(req.KeepLockTableBind.LockTables) > 0 {
logBindsMove(k.service.popGroupTables())
logStatus(k.service.getStatus())
}
return
}

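The substantive change in this file is the switch from popGroupTables to topGroupTables when building the keep-bind request: the keeper now only peeks at the pending tables, and pops them (above, once resp.KeepLockTableBind.OK arrives) only after the allocator has acknowledged the move. A self-contained sketch of why peek-then-pop matters follows; every identifier in it is illustrative and none comes from the diff.

// Peek-then-pop handoff: a batch reported to the allocator survives a
// failed RPC and is discarded only after a successful acknowledgement.
package main

import (
	"errors"
	"fmt"
)

type service struct {
	pending [][]uint64 // queue of table-ID batches awaiting handoff
}

// top returns the head batch without removing it.
func (s *service) top() []uint64 {
	if len(s.pending) == 0 {
		return nil
	}
	return s.pending[0]
}

// pop removes and returns the head batch.
func (s *service) pop() []uint64 {
	if len(s.pending) == 0 {
		return nil
	}
	head := s.pending[0]
	s.pending = s.pending[1:]
	return head
}

func keepBind(s *service, send func([]uint64) error) {
	tables := s.top() // peek: the batch survives a failed send
	if err := send(tables); err != nil {
		return // retry with the same batch on the next tick
	}
	s.pop() // acknowledged: now it is safe to discard
}

func main() {
	s := &service{pending: [][]uint64{{1, 2}, {3}}}
	keepBind(s, func([]uint64) error { return errors.New("rpc failed") })
	fmt.Println(len(s.pending)) // 2: nothing lost on failure
	keepBind(s, func([]uint64) error { return nil })
	fmt.Println(len(s.pending)) // 1: popped only after the ack
}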
49 changes: 49 additions & 0 deletions pkg/lockservice/log.go
@@ -369,6 +369,13 @@ func logPingFailed(
}
}

func logCanLockOnService() {
logger := getWithSkipLogger()
if logger.Enabled(zap.InfoLevel) {
logger.Error("if lock on service")
}
}

func logLocalBindsInvalid() {
logger := getWithSkipLogger()
logger.Error("all local lock table invalid")
@@ -475,6 +482,35 @@ func logWaitersAdded(
}
}

func logBindsMove(
binds []pb.LockTable) {
logger := getWithSkipLogger()
if logger.Enabled(zap.InfoLevel) {
logger.Info("binds move",
bindsArrayField("binds", binds))
}
}

func logStatus(
status pb.Status) {
logger := getWithSkipLogger()
if logger.Enabled(zap.InfoLevel) {
logger.Info("service status",
zap.String("status", status.String()))
}
}

func logStatusChange(
from pb.Status,
to pb.Status) {
logger := getWithSkipLogger()
if logger.Enabled(zap.InfoLevel) {
logger.Info("service status change",
zap.String("from", from.String()),
zap.String("to", to.String()))
}
}

func logWaiterGetNotify(
w *waiter,
v notifyValue) {
@@ -590,6 +626,19 @@ func waitTxnArrayField(name string, values []pb.WaitTxn) zap.Field {
return zap.String(name, buffer.String())
}

func bindsArrayField(name string, values []pb.LockTable) zap.Field {
var buffer bytes.Buffer
buffer.WriteString("[")
for idx, w := range values {
buffer.WriteString(w.DebugString())
if idx != len(values)-1 {
buffer.WriteString(",")
}
}
buffer.WriteString("]")
return zap.String(name, buffer.String())
}

func waiterArrayField(name string, values ...*waiter) zap.Field {
var buffer bytes.Buffer
buffer.WriteString("[")
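All of the new helpers in log.go share two idioms: gate on an Enabled check before constructing fields, so slices are never rendered into strings when the entry would be discarded, and flatten a slice into a single zap.Field. A sketch of both against plain go.uber.org/zap follows; the diff's getWithSkipLogger returns MatrixOne's own logger wrapper, whose Enabled method is assumed here to behave like Core().Enabled.

// Level-gated logging of a slice rendered into one zap field.
package main

import (
	"bytes"
	"strconv"

	"go.uber.org/zap"
)

func idsArrayField(name string, values []uint64) zap.Field {
	var buffer bytes.Buffer
	buffer.WriteString("[")
	for idx, v := range values {
		buffer.WriteString(strconv.FormatUint(v, 10))
		if idx != len(values)-1 {
			buffer.WriteString(",")
		}
	}
	buffer.WriteString("]")
	return zap.String(name, buffer.String())
}

func logIDsMove(logger *zap.Logger, ids []uint64) {
	// Skip the buffer work entirely when info entries are disabled.
	if logger.Core().Enabled(zap.InfoLevel) {
		logger.Info("ids move", idsArrayField("ids", ids))
	}
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	logIDsMove(logger, []uint64{7, 8, 9})
}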
19 changes: 16 additions & 3 deletions pkg/lockservice/service.go
@@ -298,6 +298,7 @@ func (s *service) checkCanMoveGroupTables() {
}
s.mu.restartTime, _ = s.clock.Now()
logStatusChange(s.mu.status, pb.Status_ServiceLockWaiting)
s.mu.status = pb.Status_ServiceLockWaiting
}

func (s *service) incRef(group uint32, table uint64) {
@@ -317,18 +318,19 @@ func (s *service) canLockOnServiceStatus(
if s.isStatus(pb.Status_ServiceLockEnable) {
return true
}
defer logCanLockOnService()
if opts.Sharding == pb.Sharding_ByRow {
tableID = shardingByRow(rows[0])
}
if !s.validGroupTable(opts.Group, tableID) {
return false
}
if s.activeTxnHolder.empty() {
return false
}
if s.activeTxnHolder.hasActiveTxn(txnID) {
return true
}
if s.activeTxnHolder.empty() {
return false
}
if opts.SnapShotTs.LessEq(s.getRestartTime()) {
return true
}
@@ -385,6 +387,7 @@ func (s *service) setStatus(status pb.Status) {
s.mu.Lock()
defer s.mu.Unlock()
logStatusChange(s.mu.status, status)
s.mu.status = status
}

func (s *service) getStatus() pb.Status {
@@ -393,6 +396,16 @@ func (s *service) getStatus() pb.Status {
return s.mu.status
}

func (s *service) topGroupTables() []pb.LockTable {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.mu.groupTables) == 0 {
return nil
}
g := s.mu.groupTables[0]
return g
}

func (s *service) popGroupTables() []pb.LockTable {
s.mu.Lock()
defer s.mu.Unlock()
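Two things change in service.go beyond logging: topGroupTables is a non-destructive counterpart to popGroupTables, and in canLockOnServiceStatus the flattened diff suggests the cheap activeTxnHolder.empty() test was moved ahead of the per-transaction hasActiveTxn lookup, so a fully drained service refuses new locks immediately. A simplified sketch of that check ordering follows, with stand-in types and integer timestamps in place of the real SnapShotTs.LessEq comparison.

// Short-circuit ordering: drained holder first, known txn second,
// snapshot-age fallback last.
package main

import "fmt"

type holder struct{ active map[string]bool }

func (h *holder) empty() bool                 { return len(h.active) == 0 }
func (h *holder) hasActiveTxn(id string) bool { return h.active[id] }

func canLock(h *holder, txnID string, snapshotTS, restartTS int64) bool {
	if h.empty() {
		return false // drained: nothing may take new locks
	}
	if h.hasActiveTxn(txnID) {
		return true // a known txn may keep acquiring locks
	}
	return snapshotTS <= restartTS // old-enough snapshots still allowed
}

func main() {
	h := &holder{active: map[string]bool{"txn1": true}}
	fmt.Println(canLock(h, "txn1", 100, 50)) // true: known active txn
	fmt.Println(canLock(h, "txn2", 40, 50))  // true: snapshot predates the restart
	fmt.Println(canLock(h, "txn2", 60, 50))  // false: new txn after restart
}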
63 changes: 63 additions & 0 deletions pkg/lockservice/service_test.go
@@ -1486,6 +1486,69 @@ func TestIssue3288(t *testing.T) {
)
}

func TestIssue3538(t *testing.T) {
runLockServiceTestsWithLevel(
t,
zapcore.DebugLevel,
[]string{"s1", "s2"},
time.Second*1,
func(alloc *lockTableAllocator, s []*service) {
l1 := s[0]
l2 := s[1]

ctx, cancel := context.WithTimeout(
context.Background(),
time.Second*10)
defer cancel()
option := pb.LockOptions{
Granularity: pb.Granularity_Row,
Mode: pb.LockMode_Exclusive,
Policy: pb.WaitPolicy_Wait,
}

_, err := l1.Lock(
ctx,
0,
[][]byte{{1}},
[]byte("txn1"),
option)
require.NoError(t, err)

alloc.setRestartService("s1")
for {
if l1.isStatus(pb.Status_ServiceLockWaiting) {
break
}
select {
case <-ctx.Done():
panic("timeout bug")
default:
}
}

require.NoError(t, l1.Unlock(ctx, []byte("txn1"), timestamp.Timestamp{}))

for {
_, err = l2.Lock(
ctx,
0,
[][]byte{{1}},
[]byte("txn2"),
option)
if err == nil {
break
}
select {
case <-ctx.Done():
panic("timeout bug")
default:
}
}
},
nil,
)
}

func TestIssue16121(t *testing.T) {
runLockServiceTestsWithLevel(
t,
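TestIssue3538 drives the regression end to end: lock a row on s1, mark s1 for restart, wait until it reports Status_ServiceLockWaiting, unlock, then verify s2 can take the same row lock. Both waits are tight polls guarded by ctx.Done(). Below is a sketch of the same wait-until pattern with a short sleep added so the poll does not spin a core; waitUntil and its condition are illustrative, not helpers from this repository.

// Poll a condition with a timeout instead of busy-spinning.
package main

import (
	"context"
	"fmt"
	"time"
)

func waitUntil(ctx context.Context, cond func() bool) error {
	for !cond() {
		select {
		case <-ctx.Done():
			return ctx.Err() // surfaces the timeout instead of panicking
		case <-time.After(10 * time.Millisecond):
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	ready := time.Now().Add(100 * time.Millisecond)
	err := waitUntil(ctx, func() bool { return time.Now().After(ready) })
	fmt.Println("done, err =", err)
}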
2 changes: 1 addition & 1 deletion pkg/taskservice/daemon_task_test.go
@@ -112,9 +112,9 @@ func TestRunDaemonTask(t *testing.T) {

func (r *taskRunner) testRegisterExecutor(t *testing.T, code task.TaskCode, started *atomic.Bool) {
r.RegisterExecutor(code, func(ctx context.Context, task task.Task) error {
started.Store(true)
ar := newMockActiveRoutine()
assert.NoError(t, r.Attach(context.Background(), 1, ar))
started.Store(true)
for {
select {
case <-ar.cancelC:
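The one-line move in this test is an ordering fix: started is now published only after the Attach assertion has run, so any goroutine that observes started == true can also rely on the routine being attached. A sketch of that publish-after-setup ordering follows; all names are illustrative.

// Publish a flag only after setup completes, so observers never see
// half-initialized state.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var started atomic.Bool
	attached := make(chan struct{})

	go func() {
		close(attached)     // stand-in for a successful r.Attach(...)
		started.Store(true) // publish only after setup has completed
	}()

	for !started.Load() {
		time.Sleep(time.Millisecond)
	}
	<-attached // never blocks: the store happened after the close
	fmt.Println("executor attached and started")
}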
7 changes: 4 additions & 3 deletions pkg/util/status/logtail_client.go
@@ -16,9 +16,10 @@ package status

import (
"fmt"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"time"

"github.com/matrixorigin/matrixone/pkg/pb/timestamp"

"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
)

@@ -28,7 +29,7 @@ type SubTableID struct {
}

type SubTableStatus struct {
IsDeleting bool `json:"is_deleting"`
SubState int32 `json:"sub_state"`
LatestTime time.Time `json:"latest_time"`
}

@@ -46,7 +47,7 @@ func (s *LogtailStatus) fill(c *disttae.PushClient) {
for id, status := range st.SubTables {
tid := fmt.Sprintf("%d-%d", id.DatabaseID, id.TableID)
s.SubscribedTables[tid] = SubTableStatus{
IsDeleting: status.IsDeleting,
SubState: int32(status.SubState),
LatestTime: status.LatestTime,
}
}
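The payload change here widens SubTableStatus.IsDeleting, a boolean, into SubState, an int32 state code, so the JSON status endpoint can report more than two subscription states without another schema break. A sketch of the resulting shape follows; the enumeration values are invented for illustration and do not come from the diff.

// Marshaling the widened status payload.
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type SubTableStatus struct {
	SubState   int32     `json:"sub_state"`
	LatestTime time.Time `json:"latest_time"`
}

const (
	SubStateSubscribed int32 = iota // invented values, not from the diff
	SubStateDeleting
	SubStateResubscribing
)

func main() {
	b, _ := json.Marshal(SubTableStatus{
		SubState:   SubStateDeleting,
		LatestTime: time.Now(),
	})
	fmt.Println(string(b))
}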
[Diffs for the remaining 2 changed files did not load.]
