From 9504578609673603baa86ec2ebd57de60bc98e33 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Tue, 18 Jun 2024 12:32:44 +0800 Subject: [PATCH] [enhancement] logtail: make table subscription async (#16879) A table subscription takes long time if the table has much data which need flushed, and it will block the logtail update of other tables because they work in the same goroutine. So we put the table subscription into a independent goroutine, thus the logtails of other tables could be pushed to cn servers without being blocked. And when the first phase of pulling table data finishes, put it into the same goroutine with pushing-job, and pull the data again for the second phase. At last, send the whole data that are merged together to cn servers. After the PR, the table subscription will not block pushing-jobs. There are 50 parallel pulling jobs at the same time at most by defalt to avoid OOM and other risks. Approved by: @zhangxu19830126, @triump2020, @aptend, @XuPeng-SH --- pkg/tests/service/dnservice.go | 1 + pkg/tests/service/options.go | 11 + pkg/tnservice/cfg.go | 8 + pkg/tnservice/factory.go | 1 + pkg/tnservice/service_ports_test.go | 2 + .../v2/dashboard/grafana_dashboard_logtail.go | 37 +++- pkg/util/metric/v2/logtail.go | 34 ++- pkg/util/metric/v2/metrics.go | 4 +- pkg/vm/engine/disttae/logtail_consumer.go | 3 + pkg/vm/engine/tae/logtail/service/merger.go | 96 +++++++++ .../engine/tae/logtail/service/merger_test.go | 68 ++++++ pkg/vm/engine/tae/logtail/service/response.go | 10 + pkg/vm/engine/tae/logtail/service/server.go | 201 ++++++++++++------ pkg/vm/engine/tae/logtail/service/session.go | 2 +- pkg/vm/engine/tae/logtail/txn_handle.go | 2 +- pkg/vm/engine/tae/options/cfg.go | 10 + pkg/vm/engine/tae/options/cfg_test.go | 1 + 17 files changed, 417 insertions(+), 74 deletions(-) create mode 100644 pkg/vm/engine/tae/logtail/service/merger.go create mode 100644 pkg/vm/engine/tae/logtail/service/merger_test.go diff --git a/pkg/tests/service/dnservice.go b/pkg/tests/service/dnservice.go index 18ec2bbddda9c..7b8ea86d9b3e4 100644 --- a/pkg/tests/service/dnservice.go +++ b/pkg/tests/service/dnservice.go @@ -186,6 +186,7 @@ func buildTNConfig( cfg.LogtailServer.LogtailCollectInterval.Duration = opt.logtailPushServer.logtailCollectInterval cfg.LogtailServer.LogtailRPCStreamPoisonTime.Duration = opt.logtailPushServer.logtailRPCStreamPoisonTIme cfg.LogtailServer.LogtailResponseSendTimeout.Duration = opt.logtailPushServer.logtailResponseSendTimeout + cfg.LogtailServer.PullWorkerPoolSize = toml.ByteSize(opt.logtailPushServer.pullWorkerPoolSize) // We need the filled version of configuration. // It's necessary when building tnservice.Option. diff --git a/pkg/tests/service/options.go b/pkg/tests/service/options.go index 56cf550375693..d144714d8982c 100644 --- a/pkg/tests/service/options.go +++ b/pkg/tests/service/options.go @@ -68,6 +68,7 @@ const ( defaultRPCStreamPoisonTime = 5 * time.Second defaultLogtailCollectInterval = 50 * time.Millisecond defaultLogtailResponseSendTimeout = 10 * time.Second + defaultPullWorkerPoolSize = 50 ) // Options are params for creating test cluster. @@ -111,6 +112,7 @@ type Options struct { logtailRPCStreamPoisonTIme time.Duration logtailCollectInterval time.Duration logtailResponseSendTimeout time.Duration + pullWorkerPoolSize int64 } cn struct { @@ -204,6 +206,9 @@ func (opt *Options) validate() { if opt.logtailPushServer.logtailResponseSendTimeout <= 0 { opt.logtailPushServer.logtailResponseSendTimeout = defaultLogtailResponseSendTimeout } + if opt.logtailPushServer.pullWorkerPoolSize <= 0 { + opt.logtailPushServer.pullWorkerPoolSize = defaultPullWorkerPoolSize + } } // BuildHAKeeperConfig returns hakeeper.Config @@ -387,6 +392,12 @@ func (opt Options) WithLogtailResponseSendTimeout(timeout time.Duration) Options return opt } +// WithLogtailResponseSendTimeout sets response send timeout for logtail push server. +func (opt Options) WithPullWorkerPoolSize(s int64) Options { + opt.logtailPushServer.pullWorkerPoolSize = s + return opt +} + // WithCNOptionFunc set build cn options func func (opt Options) WithCNOptionFunc(fn func(i int) []cnservice.Option) Options { opt.cn.optionFunc = fn diff --git a/pkg/tnservice/cfg.go b/pkg/tnservice/cfg.go index 004c817efcef3..67a09b7c286a9 100644 --- a/pkg/tnservice/cfg.go +++ b/pkg/tnservice/cfg.go @@ -59,6 +59,7 @@ var ( defaultRPCStreamPoisonTime = 5 * time.Second defaultLogtailCollectInterval = 2 * time.Millisecond defaultLogtailResponseSendTimeout = 10 * time.Second + defaultPullWorkerPoolSize = 50 storageDir = "storage" defaultDataDir = "./mo-data" @@ -145,6 +146,7 @@ type Config struct { LogtailRPCStreamPoisonTime toml.Duration `toml:"logtail-rpc-stream-poison-time"` LogtailCollectInterval toml.Duration `toml:"logtail-collect-interval"` LogtailResponseSendTimeout toml.Duration `toml:"logtail-response-send-timeout"` + PullWorkerPoolSize toml.ByteSize `toml:"pull-worker-pool-size"` } // Txn transactions configuration @@ -275,6 +277,9 @@ func (c *Config) Validate() error { if c.LogtailServer.LogtailResponseSendTimeout.Duration <= 0 { c.LogtailServer.LogtailResponseSendTimeout.Duration = defaultLogtailResponseSendTimeout } + if c.LogtailServer.PullWorkerPoolSize <= 0 { + c.LogtailServer.PullWorkerPoolSize = toml.ByteSize(defaultPullWorkerPoolSize) + } if c.Cluster.RefreshInterval.Duration == 0 { c.Cluster.RefreshInterval.Duration = time.Second * 10 } @@ -384,6 +389,9 @@ func (c *Config) SetDefaultValue() { if c.LogtailServer.LogtailResponseSendTimeout.Duration <= 0 { c.LogtailServer.LogtailResponseSendTimeout.Duration = defaultLogtailResponseSendTimeout } + if c.LogtailServer.PullWorkerPoolSize <= 0 { + c.LogtailServer.PullWorkerPoolSize = toml.ByteSize(defaultPullWorkerPoolSize) + } if c.Cluster.RefreshInterval.Duration == 0 { c.Cluster.RefreshInterval.Duration = time.Second * 10 } diff --git a/pkg/tnservice/factory.go b/pkg/tnservice/factory.go index aee526a6e5ad5..05caa0b0f5931 100644 --- a/pkg/tnservice/factory.go +++ b/pkg/tnservice/factory.go @@ -167,6 +167,7 @@ func (s *store) newTAEStorage(ctx context.Context, shard metadata.TNShard, facto RPCStreamPoisonTime: s.cfg.LogtailServer.LogtailRPCStreamPoisonTime.Duration, LogtailCollectInterval: s.cfg.LogtailServer.LogtailCollectInterval.Duration, ResponseSendTimeout: s.cfg.LogtailServer.LogtailResponseSendTimeout.Duration, + PullWorkerPoolSize: int64(s.cfg.LogtailServer.PullWorkerPoolSize), } // the previous values diff --git a/pkg/tnservice/service_ports_test.go b/pkg/tnservice/service_ports_test.go index 73cdeb6f20665..78e5a358d8d22 100644 --- a/pkg/tnservice/service_ports_test.go +++ b/pkg/tnservice/service_ports_test.go @@ -49,6 +49,7 @@ func TestService_RegisterServices(t *testing.T) { LogtailRPCStreamPoisonTime toml.Duration `toml:"logtail-rpc-stream-poison-time"` LogtailCollectInterval toml.Duration `toml:"logtail-collect-interval"` LogtailResponseSendTimeout toml.Duration `toml:"logtail-response-send-timeout"` + PullWorkerPoolSize toml.ByteSize `toml:"pull-worker-pool-size"` }(struct { ListenAddress string ServiceAddress string @@ -57,6 +58,7 @@ func TestService_RegisterServices(t *testing.T) { LogtailRPCStreamPoisonTime toml.Duration LogtailCollectInterval toml.Duration LogtailResponseSendTimeout toml.Duration + PullWorkerPoolSize toml.ByteSize }{ ListenAddress: fmt.Sprintf("%s:%d", listenHost, port1+1), ServiceAddress: fmt.Sprintf("%s:%d", serviceHost, port1+1), diff --git a/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go b/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go index 4f5bbf5902be6..b85a97c47eec8 100644 --- a/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go +++ b/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go @@ -35,6 +35,7 @@ func (c *DashboardCreator) initLogTailDashboard() error { c.initLogtailBytesRow(), c.initLogtailLoadCheckpointRow(), c.initLogtailCollectRow(), + c.initLogtailTransmitRow(), c.initLogtailSubscriptionRow(), c.initLogtailUpdatePartitionRow(), )...) @@ -49,10 +50,24 @@ func (c *DashboardCreator) initLogtailCollectRow() dashboard.Option { return dashboard.Row( "Logtail collect duration", c.getHistogram( - "collect duration", - c.getMetricWithFilter("mo_logtail_collect_duration_seconds_bucket", ``), + "pull type phase1 collection duration", + c.getMetricWithFilter("mo_logtail_pull_collection_phase1_duration_seconds_bucket", ``), []float64{0.50, 0.8, 0.90, 0.99}, - 12, + 4, + axis.Unit("s"), + axis.Min(0)), + c.getHistogram( + "pull type phase2 collection duration", + c.getMetricWithFilter("mo_logtail_pull_collection_phase2_duration_seconds_bucket", ``), + []float64{0.50, 0.8, 0.90, 0.99}, + 4, + axis.Unit("s"), + axis.Min(0)), + c.getHistogram( + "push type collection duration", + c.getMetricWithFilter("mo_logtail_push_collection_duration_seconds_bucket", ``), + []float64{0.50, 0.8, 0.90, 0.99}, + 4, axis.Unit("s"), axis.Min(0)), ) @@ -213,3 +228,19 @@ func (c *DashboardCreator) initLogtailLoadCheckpointRow() dashboard.Option { axis.Min(0)), ) } + +func (c *DashboardCreator) initLogtailTransmitRow() dashboard.Option { + return dashboard.Row( + "Logtail Transmit counter", + c.withGraph( + "Server Send", + 6, + `sum(rate(`+c.getMetricWithFilter("mo_logtail_transmit_total", `type="server-send"`)+`[$interval]))`, + ""), + c.withGraph( + "Client Receive", + 6, + `sum(rate(`+c.getMetricWithFilter("mo_logtail_transmit_total", `type="client-receive"`)+`[$interval]))`, + ""), + ) +} diff --git a/pkg/util/metric/v2/logtail.go b/pkg/util/metric/v2/logtail.go index a516327e4e949..9d95eb23de323 100644 --- a/pkg/util/metric/v2/logtail.go +++ b/pkg/util/metric/v2/logtail.go @@ -128,14 +128,42 @@ var ( Buckets: getDurationBuckets(), }) - LogTailCollectDurationHistogram = prometheus.NewHistogram( + LogTailPullCollectionPhase1DurationHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "mo", Subsystem: "logtail", - Name: "collect_duration_seconds", - Help: "Bucketed histogram of logtail collecting duration.", + Name: "pull_collection_phase1_duration_seconds", + Help: "Bucketed histogram of logtail pull type collection duration of phase1.", Buckets: getDurationBuckets(), }) + + LogTailPullCollectionPhase2DurationHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "mo", + Subsystem: "logtail", + Name: "pull_collection_phase2_duration_seconds", + Help: "Bucketed histogram of logtail pull type collection duration of phase2.", + Buckets: getDurationBuckets(), + }) + + LogTailPushCollectionDurationHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "mo", + Subsystem: "logtail", + Name: "push_collection_duration_seconds", + Help: "Bucketed histogram of logtail push type collection duration.", + Buckets: getDurationBuckets(), + }) + + logTailTransmitCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "mo", + Subsystem: "logtail", + Name: "transmit_total", + Help: "Total number of transmit count.", + }, []string{"type"}) + LogTailServerSendCounter = logTailTransmitCounter.WithLabelValues("server-send") + LogTailClientReceiveCounter = logTailTransmitCounter.WithLabelValues("client-receive") ) var ( diff --git a/pkg/util/metric/v2/metrics.go b/pkg/util/metric/v2/metrics.go index 43c5ad3125c93..22316e65799ac 100644 --- a/pkg/util/metric/v2/metrics.go +++ b/pkg/util/metric/v2/metrics.go @@ -106,7 +106,9 @@ func initLogtailMetrics() { registry.MustRegister(logTailSendDurationHistogram) registry.MustRegister(LogTailLoadCheckpointDurationHistogram) - registry.MustRegister(LogTailCollectDurationHistogram) + registry.MustRegister(LogTailPushCollectionDurationHistogram) + registry.MustRegister(LogTailPullCollectionPhase1DurationHistogram) + registry.MustRegister(LogTailPullCollectionPhase2DurationHistogram) registry.MustRegister(LogTailSubscriptionCounter) registry.MustRegister(txnTNSideDurationHistogram) } diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go index 2ac74ff9b966b..c16195c35aa7c 100644 --- a/pkg/vm/engine/disttae/logtail_consumer.go +++ b/pkg/vm/engine/disttae/logtail_consumer.go @@ -369,6 +369,9 @@ func (c *PushClient) receiveOneLogtail(ctx context.Context, e *Engine) error { ctx, cancel := context.WithTimeout(ctx, maxTimeToWaitServerResponse) defer cancel() + // Client receives one logtail counter. + defer v2.LogTailClientReceiveCounter.Add(1) + resp := c.subscriber.receiveResponse(ctx) if resp.err != nil { // POSSIBLE ERROR: context deadline exceeded, rpc closed, decode error. diff --git a/pkg/vm/engine/tae/logtail/service/merger.go b/pkg/vm/engine/tae/logtail/service/merger.go new file mode 100644 index 0000000000000..2e038ce2f5537 --- /dev/null +++ b/pkg/vm/engine/tae/logtail/service/merger.go @@ -0,0 +1,96 @@ +// Copyright 2021 - 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "fmt" + + "github.com/matrixorigin/matrixone/pkg/pb/api" + "github.com/matrixorigin/matrixone/pkg/pb/logtail" + "github.com/matrixorigin/matrixone/pkg/pb/timestamp" +) + +const ( + ckpLocationDivider = ';' +) + +// logtailMerger is the merger tool to merge multiple +// LogtailPhase instances. +type logtailMerger struct { + logtails []*LogtailPhase +} + +func newLogtailMerger(l ...*LogtailPhase) *logtailMerger { + var lm logtailMerger + lm.logtails = append(lm.logtails, l...) + return &lm +} + +// Merge merges all instances and return one logtail.TableLogtail +// and a callback function. +func (m *logtailMerger) Merge() (logtail.TableLogtail, func()) { + var tableID api.TableID + var entryCount int + var ts timestamp.Timestamp + for _, t := range m.logtails { + entryCount += len(t.tail.Commands) + + // check the table ID. + if tableID.TbId == 0 { + tableID = *t.tail.Table + } else if tableID.TbId != t.tail.Table.TbId { + panic(fmt.Sprintf("cannot merge logtails with different table: %d, %d", + tableID.TbId, t.tail.Table.TbId)) + } + + // get the max timestamp. + if ts.Less(*t.tail.Ts) { + ts = *t.tail.Ts + } + } + + // create the callbacks. + callbacks := make([]func(), 0, entryCount) + // create a new table logtail with the entry number. + tail := logtail.TableLogtail{ + Ts: &ts, + Table: &tableID, + Commands: make([]api.Entry, 0, entryCount), + } + for _, t := range m.logtails { + ckpLocLen := len(tail.CkpLocation) + if ckpLocLen > 0 && + tail.CkpLocation[ckpLocLen-1] != ckpLocationDivider { + tail.CkpLocation += string(ckpLocationDivider) + } + tail.CkpLocation += t.tail.CkpLocation + tail.Commands = append(tail.Commands, t.tail.Commands...) + callbacks = append(callbacks, t.closeCB) + } + + // remove the last ';' + ckpLocLen := len(tail.CkpLocation) + if ckpLocLen > 0 && + tail.CkpLocation[ckpLocLen-1] == ckpLocationDivider { + tail.CkpLocation = tail.CkpLocation[:ckpLocLen-1] + } + return tail, func() { + for _, cb := range callbacks { + if cb != nil { + cb() + } + } + } +} diff --git a/pkg/vm/engine/tae/logtail/service/merger_test.go b/pkg/vm/engine/tae/logtail/service/merger_test.go new file mode 100644 index 0000000000000..fbce92668165f --- /dev/null +++ b/pkg/vm/engine/tae/logtail/service/merger_test.go @@ -0,0 +1,68 @@ +// Copyright 2021 - 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "testing" + + "github.com/matrixorigin/matrixone/pkg/pb/api" + "github.com/matrixorigin/matrixone/pkg/pb/logtail" + "github.com/matrixorigin/matrixone/pkg/pb/timestamp" + "github.com/stretchr/testify/assert" +) + +func TestLogtailMerge(t *testing.T) { + var cbValue int + tail1 := &LogtailPhase{ + tail: logtail.TableLogtail{ + CkpLocation: "aaa;bbb;", + Ts: ×tamp.Timestamp{ + PhysicalTime: 100, + }, + Table: &api.TableID{ + DbId: 1, + TbId: 2, + }, + Commands: []api.Entry{{TableId: 2}}, + }, + closeCB: func() { + cbValue++ + }, + } + tail2 := &LogtailPhase{ + tail: logtail.TableLogtail{ + CkpLocation: "ccc;ddd", + Ts: ×tamp.Timestamp{ + PhysicalTime: 200, + }, + Table: &api.TableID{ + DbId: 1, + TbId: 2, + }, + Commands: []api.Entry{{TableId: 2}}, + }, + closeCB: func() { + cbValue++ + }, + } + lm := newLogtailMerger(tail1, tail2) + assert.NotNil(t, lm) + tail, cb := lm.Merge() + cb() + assert.Equal(t, 2, cbValue) + assert.Equal(t, 2, len(tail.Commands)) + assert.Equal(t, int64(200), tail.Ts.PhysicalTime) + assert.Equal(t, "aaa;bbb;ccc;ddd", tail.CkpLocation) +} diff --git a/pkg/vm/engine/tae/logtail/service/response.go b/pkg/vm/engine/tae/logtail/service/response.go index 9e768ea5ce395..9c295c42dddbd 100644 --- a/pkg/vm/engine/tae/logtail/service/response.go +++ b/pkg/vm/engine/tae/logtail/service/response.go @@ -23,6 +23,16 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/logtail" ) +// LogtailPhase is the logtail information of one phase of +// subscription request. Phase 1 is executed asynchronously +// to collect most of logtails of the subscription request. +// And phase 2 is executed sync. +type LogtailPhase struct { + tail logtail.TableLogtail + closeCB func() + sub subscription +} + // LogtailResponse wraps logtail.LogtailResponse. type LogtailResponse struct { logtail.LogtailResponse diff --git a/pkg/vm/engine/tae/logtail/service/server.go b/pkg/vm/engine/tae/logtail/service/server.go index b2aaf7995a7b7..d32059586a12c 100644 --- a/pkg/vm/engine/tae/logtail/service/server.go +++ b/pkg/vm/engine/tae/logtail/service/server.go @@ -38,12 +38,6 @@ import ( const ( LogtailServiceRPCName = "logtail-server" - - // updateEventMaxInterval is the max interval between update events. - // If 3s has passed since last update event, we should try to send an - // update event, rather than a suscription, to avoid there are no - // update events and cause logtail consumer waits for too long. - updateEventMaxInterval = time.Second * 3 ) type ServerOption func(*LogtailServer) @@ -115,10 +109,21 @@ type LogtailServer struct { waterline *Waterliner errChan chan sessionError // errChan has no buffer in order to improve sensitivity. - subChan chan subscription - event *Notifier - logtail taelogtail.Logtailer + // subReqChan is the channel that contains the subscription request. + subReqChan chan subscription + + // subTailChan is the channel that contains the pull logtail of first phase. + // There is another goroutine, that receives these kinds of logtails and begin + // the second phase of collecting logtails. + subTailChan chan *LogtailPhase + + // pullWorkerPool is used to control the parallel of the pull workers. + pullWorkerPool chan struct{} + + event *Notifier + + logtailer taelogtail.Logtailer rpc morpc.RPCServer @@ -129,17 +134,19 @@ type LogtailServer struct { // NewLogtailServer initializes a server for logtail push model. func NewLogtailServer( - address string, cfg *options.LogtailServerCfg, logtail taelogtail.Logtailer, rt runtime.Runtime, opts ...ServerOption, + address string, cfg *options.LogtailServerCfg, logtailer taelogtail.Logtailer, rt runtime.Runtime, opts ...ServerOption, ) (*LogtailServer, error) { s := &LogtailServer{ - rt: rt, - logger: rt.Logger(), - cfg: cfg, - ssmgr: NewSessionManager(), - waterline: NewWaterliner(), - errChan: make(chan sessionError, 1), - subChan: make(chan subscription, 100), - logtail: logtail, + rt: rt, + logger: rt.Logger(), + cfg: cfg, + ssmgr: NewSessionManager(), + waterline: NewWaterliner(), + errChan: make(chan sessionError, 1), + subReqChan: make(chan subscription, 100), + subTailChan: make(chan *LogtailPhase, 300), + pullWorkerPool: make(chan struct{}, cfg.PullWorkerPoolSize), + logtailer: logtailer, } for _, opt := range opts { @@ -198,7 +205,7 @@ func NewLogtailServer( // receive logtail on event s.event = NewNotifier(s.rootCtx, eventBufferSize) - logtail.RegisterCallback(s.event.NotifyLogtail) + logtailer.RegisterCallback(s.event.NotifyLogtail) return s, nil } @@ -287,10 +294,10 @@ func (s *LogtailServer) onSubscription( return sendCtx.Err() case <-time.After(time.Second): logger.Error("cannot send subscription request, retry", - zap.Int("chan cap", cap(s.subChan)), - zap.Int("chan len", len(s.subChan)), + zap.Int("chan cap", cap(s.subReqChan)), + zap.Int("chan len", len(s.subReqChan)), ) - case s.subChan <- sub: + case s.subReqChan <- sub: return nil } } @@ -353,66 +360,123 @@ func (s *LogtailServer) sessionErrorHandler(ctx context.Context) { } } +// logtailPullWorker is an independent goroutine, which pull the table asynchronously. +// It generates a response which would be sent to logtail client as the part 1 of the +// whole response. +func (s *LogtailServer) logtailPullWorker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + s.logger.Error("stop logtail pull worker", zap.Error(ctx.Err())) + return + + case sub, ok := <-s.subReqChan: + if !ok { + s.logger.Info("subscription channel closed") + return + } + // Start a new goroutine to handle the subscription request. + // There are xxx goroutines working at the same time at most. + go s.pullLogtailsPhase1(ctx, sub) + } + } +} + +func (s *LogtailServer) pullLogtailsPhase1(ctx context.Context, sub subscription) { + // Limit the pull workers. + s.pullWorkerPool <- struct{}{} + defer func() { <-s.pullWorkerPool }() + + v2.LogTailSubscriptionCounter.Inc() + s.logger.Info("handle subscription asynchronously", zap.Any("table", sub.req.Table)) + + start := time.Now() + // Get logtail of phase1. The 'from' time is zero and the 'to' time is the + // newest time of waterline. The Ts field in the first return value is the + // 'from' time parameter of next phase call. + tail, err := s.getSubLogtailPhase( + ctx, + sub, + timestamp.Timestamp{}, + s.waterline.Waterline(), + ) + if err != nil { + s.logger.Error("failed to get logtail of phase1 of subscription", + zap.String("table", string(sub.tableID)), + ) + sub.session.Unregister(sub.tableID) + return + } + v2.LogTailPullCollectionPhase1DurationHistogram.Observe(time.Since(start).Seconds()) + + for { + select { + case <-ctx.Done(): + s.logger.Error("context done in pull table logtails", zap.Error(ctx.Err())) + return + + case s.subTailChan <- tail: + return + + default: + s.logger.Warn("the queue of logtails of phase1 is full") + time.Sleep(time.Second) + } + } +} + // logtailSender sends total or incremental logtail. func (s *LogtailServer) logtailSender(ctx context.Context) { e, ok := <-s.event.C if !ok { - s.logger.Info("publishemtn channel closed") + s.logger.Info("publishment channel closed") return } s.waterline.Advance(e.to) s.logger.Info("init waterline", zap.String("to", e.to.String())) - // lastUpdate is used to record the time of update event. - lastUpdate := time.Now() - for { select { case <-ctx.Done(): s.logger.Error("stop subscription handler", zap.Error(ctx.Err())) return - case sub, ok := <-s.subChan: + case tailPhase1, ok := <-s.subTailChan: if !ok { s.logger.Info("subscription channel closed") return } - v2.LogTailSubscriptionCounter.Inc() - interval := time.Since(lastUpdate) - if interval > updateEventMaxInterval { - s.logger.Info("long time passed since last update event", zap.Duration("interval", interval)) - - select { - case e, ok := <-s.event.C: - if !ok { - s.logger.Info("publishment channel closed") - return - } - s.logger.Info("send an update event first as long time passed since last one.") - s.publishEvent(ctx, e) - lastUpdate = time.Now() - - default: - s.logger.Info("there is no update event, although we want to send it first") - } + start := time.Now() + // Phase 2 of pulling logtails. The 'from' timestamp comes from + // the value of phase 1. + tailPhase2, err := s.getSubLogtailPhase( + ctx, + tailPhase1.sub, + *tailPhase1.tail.Ts, + s.waterline.Waterline(), + ) + if err != nil { + tailPhase1.sub.session.Unregister(tailPhase1.sub.tableID) + s.logger.Error("fail to send subscription response", zap.Error(err)) + } else { + v2.LogTailPullCollectionPhase2DurationHistogram.Observe(time.Since(start).Seconds()) + s.sendSubscription(ctx, tailPhase1, tailPhase2) } - s.logger.Info("handle subscription asynchronously", zap.Any("table", sub.req.Table)) - s.sendSubscription(ctx, sub) - case e, ok := <-s.event.C: if !ok { s.logger.Info("publishment channel closed") return } s.publishEvent(ctx, e) - lastUpdate = time.Now() } } } -func (s *LogtailServer) sendSubscription(ctx context.Context, sub subscription) { +func (s *LogtailServer) getSubLogtailPhase( + ctx context.Context, sub subscription, from, to timestamp.Timestamp, +) (*LogtailPhase, error) { sendCtx, cancel := context.WithTimeout(ctx, sub.timeout) defer cancel() @@ -424,17 +488,14 @@ func (s *LogtailServer) sendSubscription(ctx context.Context, sub subscription) }() table := *sub.req.Table - from := timestamp.Timestamp{} - to := s.waterline.Waterline() - // fetch total logtail for table var tail logtail.TableLogtail var closeCB func() moprobe.WithRegion(ctx, moprobe.SubscriptionPullLogTail, func() { - tail, closeCB, subErr = s.logtail.TableLogtail(sendCtx, table, from, to) + tail, closeCB, subErr = s.logtailer.TableLogtail(sendCtx, table, from, to) }) - if subErr != nil { + // if error occurs, just send the error immediately. if closeCB != nil { closeCB() } @@ -444,22 +505,27 @@ func (s *LogtailServer) sendSubscription(ctx context.Context, sub subscription) ); err != nil { s.logger.Error("fail to send error response", zap.Error(err)) } - return + return nil, subErr } - cb := func() { - if closeCB != nil { - closeCB() - } - } + return &LogtailPhase{ + tail: tail, + closeCB: closeCB, + sub: sub, + }, nil +} +func (s *LogtailServer) sendSubscription(ctx context.Context, p1, p2 *LogtailPhase) { + sub := p1.sub + sendCtx, cancel := context.WithTimeout(ctx, sub.timeout) + defer cancel() + tail, cb := newLogtailMerger(p1, p2).Merge() // send subscription response - subErr = sub.session.SendSubscriptionResponse(sendCtx, tail, cb) - if subErr != nil { - s.logger.Error("fail to send subscription response", zap.Error(subErr)) + if err := sub.session.SendSubscriptionResponse(sendCtx, tail, cb); err != nil { + s.logger.Error("fail to send subscription response", zap.Error(err)) + sub.session.Unregister(sub.tableID) return } - // mark table as subscribed sub.session.AdvanceState(sub.tableID) } @@ -557,6 +623,11 @@ func (s *LogtailServer) Start() error { return err } + if err := s.stopper.RunNamedTask("logtail pull worker", s.logtailPullWorker); err != nil { + s.logger.Error("fail to start logtail pull worker", zap.Error(err)) + return err + } + if err := s.stopper.RunNamedTask("logtail sender", s.logtailSender); err != nil { s.logger.Error("fail to start logtail sender", zap.Error(err)) return err diff --git a/pkg/vm/engine/tae/logtail/service/session.go b/pkg/vm/engine/tae/logtail/service/session.go index 767f8d1d626dd..3477e07c00df3 100644 --- a/pkg/vm/engine/tae/logtail/service/session.go +++ b/pkg/vm/engine/tae/logtail/service/session.go @@ -215,7 +215,7 @@ func (s *morpcStream) write( return err } } - + v2.LogTailServerSendCounter.Add(1) return nil } diff --git a/pkg/vm/engine/tae/logtail/txn_handle.go b/pkg/vm/engine/tae/logtail/txn_handle.go index 42618799ff984..6b6a35eaab08f 100644 --- a/pkg/vm/engine/tae/logtail/txn_handle.go +++ b/pkg/vm/engine/tae/logtail/txn_handle.go @@ -90,7 +90,7 @@ func (b *TxnLogtailRespBuilder) Close() { func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func()) { now := time.Now() defer func() { - v2.LogTailCollectDurationHistogram.Observe(time.Since(now).Seconds()) + v2.LogTailPushCollectionDurationHistogram.Observe(time.Since(now).Seconds()) }() b.txn = txn diff --git a/pkg/vm/engine/tae/options/cfg.go b/pkg/vm/engine/tae/options/cfg.go index 04c0961102eb5..669dfca32e056 100644 --- a/pkg/vm/engine/tae/options/cfg.go +++ b/pkg/vm/engine/tae/options/cfg.go @@ -26,6 +26,8 @@ const ( defaultLogtailCollectInterval = 50 * time.Millisecond defaultResponseSendTimeout = 30 * time.Second defaultRpcStreamPoisonTime = 5 * time.Second + // The default value of PullWorkerPoolSize. + defaultPullWorkerPoolSize = 50 ) type StorageCfg struct { @@ -87,6 +89,10 @@ type LogtailServerCfg struct { RPCStreamPoisonTime time.Duration LogtailCollectInterval time.Duration ResponseSendTimeout time.Duration + // PullWorkerPoolSize is the size of the pull worker pool. + // Means there are x pull workers working at most. + // Default value is defaultPullWorkerPoolSize=50. + PullWorkerPoolSize int64 } func NewDefaultLogtailServerCfg() *LogtailServerCfg { @@ -96,6 +102,7 @@ func NewDefaultLogtailServerCfg() *LogtailServerCfg { RPCStreamPoisonTime: defaultRpcStreamPoisonTime, LogtailCollectInterval: defaultLogtailCollectInterval, ResponseSendTimeout: defaultResponseSendTimeout, + PullWorkerPoolSize: defaultPullWorkerPoolSize, } } @@ -112,4 +119,7 @@ func (l *LogtailServerCfg) Validate() { if l.ResponseSendTimeout <= 0 { l.ResponseSendTimeout = defaultResponseSendTimeout } + if l.PullWorkerPoolSize <= 0 { + l.PullWorkerPoolSize = defaultPullWorkerPoolSize + } } diff --git a/pkg/vm/engine/tae/options/cfg_test.go b/pkg/vm/engine/tae/options/cfg_test.go index 8b1cd20fe4fc7..02517e417b111 100644 --- a/pkg/vm/engine/tae/options/cfg_test.go +++ b/pkg/vm/engine/tae/options/cfg_test.go @@ -28,4 +28,5 @@ func TestLogtailServerCfg(t *testing.T) { require.Equal(t, defaults.LogtailCollectInterval, validated.LogtailCollectInterval) require.Equal(t, defaults.ResponseSendTimeout, validated.ResponseSendTimeout) require.Equal(t, defaults.RPCStreamPoisonTime, validated.RPCStreamPoisonTime) + require.Equal(t, defaults.PullWorkerPoolSize, validated.PullWorkerPoolSize) }