diff --git a/pkg/tests/service/dnservice.go b/pkg/tests/service/dnservice.go
index 18ec2bbddda9c..7b8ea86d9b3e4 100644
--- a/pkg/tests/service/dnservice.go
+++ b/pkg/tests/service/dnservice.go
@@ -186,6 +186,7 @@ func buildTNConfig(
 	cfg.LogtailServer.LogtailCollectInterval.Duration = opt.logtailPushServer.logtailCollectInterval
 	cfg.LogtailServer.LogtailRPCStreamPoisonTime.Duration = opt.logtailPushServer.logtailRPCStreamPoisonTIme
 	cfg.LogtailServer.LogtailResponseSendTimeout.Duration = opt.logtailPushServer.logtailResponseSendTimeout
+	cfg.LogtailServer.PullWorkerPoolSize = toml.ByteSize(opt.logtailPushServer.pullWorkerPoolSize)
 
 	// We need the filled version of configuration.
 	// It's necessary when building tnservice.Option.
diff --git a/pkg/tests/service/options.go b/pkg/tests/service/options.go
index 56cf550375693..d144714d8982c 100644
--- a/pkg/tests/service/options.go
+++ b/pkg/tests/service/options.go
@@ -68,6 +68,7 @@ const (
 	defaultRPCStreamPoisonTime        = 5 * time.Second
 	defaultLogtailCollectInterval     = 50 * time.Millisecond
 	defaultLogtailResponseSendTimeout = 10 * time.Second
+	defaultPullWorkerPoolSize         = 50
 )
 
 // Options are params for creating test cluster.
@@ -111,6 +112,7 @@ type Options struct {
 		logtailRPCStreamPoisonTIme time.Duration
 		logtailCollectInterval     time.Duration
 		logtailResponseSendTimeout time.Duration
+		pullWorkerPoolSize         int64
 	}
 
 	cn struct {
@@ -204,6 +206,9 @@ func (opt *Options) validate() {
 	if opt.logtailPushServer.logtailResponseSendTimeout <= 0 {
 		opt.logtailPushServer.logtailResponseSendTimeout = defaultLogtailResponseSendTimeout
 	}
+	if opt.logtailPushServer.pullWorkerPoolSize <= 0 {
+		opt.logtailPushServer.pullWorkerPoolSize = defaultPullWorkerPoolSize
+	}
 }
 
 // BuildHAKeeperConfig returns hakeeper.Config
@@ -387,6 +392,12 @@ func (opt Options) WithLogtailResponseSendTimeout(timeout time.Duration) Options
 	return opt
 }
 
+// WithPullWorkerPoolSize sets the pull worker pool size for logtail push server.
+func (opt Options) WithPullWorkerPoolSize(s int64) Options {
+	opt.logtailPushServer.pullWorkerPoolSize = s
+	return opt
+}
+
 // WithCNOptionFunc set build cn options func
 func (opt Options) WithCNOptionFunc(fn func(i int) []cnservice.Option) Options {
 	opt.cn.optionFunc = fn
diff --git a/pkg/tnservice/cfg.go b/pkg/tnservice/cfg.go
index 004c817efcef3..67a09b7c286a9 100644
--- a/pkg/tnservice/cfg.go
+++ b/pkg/tnservice/cfg.go
@@ -59,6 +59,7 @@ var (
 	defaultRPCStreamPoisonTime        = 5 * time.Second
 	defaultLogtailCollectInterval     = 2 * time.Millisecond
 	defaultLogtailResponseSendTimeout = 10 * time.Second
+	defaultPullWorkerPoolSize         = 50
 
 	storageDir     = "storage"
 	defaultDataDir = "./mo-data"
@@ -145,6 +146,7 @@ type Config struct {
 		LogtailRPCStreamPoisonTime toml.Duration `toml:"logtail-rpc-stream-poison-time"`
 		LogtailCollectInterval     toml.Duration `toml:"logtail-collect-interval"`
 		LogtailResponseSendTimeout toml.Duration `toml:"logtail-response-send-timeout"`
+		PullWorkerPoolSize         toml.ByteSize `toml:"pull-worker-pool-size"`
 	}
 
 	// Txn transactions configuration
@@ -275,6 +277,9 @@ func (c *Config) Validate() error {
 	if c.LogtailServer.LogtailResponseSendTimeout.Duration <= 0 {
 		c.LogtailServer.LogtailResponseSendTimeout.Duration = defaultLogtailResponseSendTimeout
 	}
+	if c.LogtailServer.PullWorkerPoolSize <= 0 {
+		c.LogtailServer.PullWorkerPoolSize = toml.ByteSize(defaultPullWorkerPoolSize)
+	}
 	if c.Cluster.RefreshInterval.Duration == 0 {
 		c.Cluster.RefreshInterval.Duration = time.Second * 10
 	}
@@ -384,6 +389,9 @@ func (c *Config) SetDefaultValue() {
 	if c.LogtailServer.LogtailResponseSendTimeout.Duration <= 0 {
 		c.LogtailServer.LogtailResponseSendTimeout.Duration = defaultLogtailResponseSendTimeout
 	}
+	if c.LogtailServer.PullWorkerPoolSize <= 0 {
+		c.LogtailServer.PullWorkerPoolSize = toml.ByteSize(defaultPullWorkerPoolSize)
+	}
 	if c.Cluster.RefreshInterval.Duration == 0 {
 		c.Cluster.RefreshInterval.Duration = time.Second * 10
 	}
diff --git a/pkg/tnservice/factory.go b/pkg/tnservice/factory.go
index aee526a6e5ad5..05caa0b0f5931 100644
--- a/pkg/tnservice/factory.go
+++ b/pkg/tnservice/factory.go
@@ -167,6 +167,7 @@ func (s *store) newTAEStorage(ctx context.Context, shard metadata.TNShard, facto
 		RPCStreamPoisonTime:    s.cfg.LogtailServer.LogtailRPCStreamPoisonTime.Duration,
 		LogtailCollectInterval: s.cfg.LogtailServer.LogtailCollectInterval.Duration,
 		ResponseSendTimeout:    s.cfg.LogtailServer.LogtailResponseSendTimeout.Duration,
+		PullWorkerPoolSize:     int64(s.cfg.LogtailServer.PullWorkerPoolSize),
 	}
 
 	// the previous values
diff --git a/pkg/tnservice/service_ports_test.go b/pkg/tnservice/service_ports_test.go
index 73cdeb6f20665..78e5a358d8d22 100644
--- a/pkg/tnservice/service_ports_test.go
+++ b/pkg/tnservice/service_ports_test.go
@@ -49,6 +49,7 @@ func TestService_RegisterServices(t *testing.T) {
 		LogtailRPCStreamPoisonTime toml.Duration `toml:"logtail-rpc-stream-poison-time"`
 		LogtailCollectInterval     toml.Duration `toml:"logtail-collect-interval"`
 		LogtailResponseSendTimeout toml.Duration `toml:"logtail-response-send-timeout"`
+		PullWorkerPoolSize         toml.ByteSize `toml:"pull-worker-pool-size"`
 	}(struct {
 		ListenAddress              string
 		ServiceAddress             string
@@ -57,6 +58,7 @@ func TestService_RegisterServices(t *testing.T) {
 		LogtailRPCStreamPoisonTime toml.Duration
 		LogtailCollectInterval     toml.Duration
 		LogtailResponseSendTimeout toml.Duration
+		PullWorkerPoolSize         toml.ByteSize
 	}{
 		ListenAddress:  fmt.Sprintf("%s:%d", listenHost, port1+1),
 		ServiceAddress: fmt.Sprintf("%s:%d", serviceHost, port1+1),
diff --git a/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go b/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go
index 4f5bbf5902be6..b85a97c47eec8 100644
--- a/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go
+++ b/pkg/util/metric/v2/dashboard/grafana_dashboard_logtail.go
@@ -35,6 +35,7 @@ func (c *DashboardCreator) initLogTailDashboard() error {
 			c.initLogtailBytesRow(),
 			c.initLogtailLoadCheckpointRow(),
 			c.initLogtailCollectRow(),
+			c.initLogtailTransmitRow(),
 			c.initLogtailSubscriptionRow(),
 			c.initLogtailUpdatePartitionRow(),
 		)...)
@@ -49,10 +50,24 @@ func (c *DashboardCreator) initLogtailCollectRow() dashboard.Option {
 	return dashboard.Row(
 		"Logtail collect duration",
 		c.getHistogram(
-			"collect duration",
-			c.getMetricWithFilter("mo_logtail_collect_duration_seconds_bucket", ``),
+			"pull type phase1 collection duration",
+			c.getMetricWithFilter("mo_logtail_pull_collection_phase1_duration_seconds_bucket", ``),
 			[]float64{0.50, 0.8, 0.90, 0.99},
-			12,
+			4,
+			axis.Unit("s"),
+			axis.Min(0)),
+		c.getHistogram(
+			"pull type phase2 collection duration",
+			c.getMetricWithFilter("mo_logtail_pull_collection_phase2_duration_seconds_bucket", ``),
+			[]float64{0.50, 0.8, 0.90, 0.99},
+			4,
+			axis.Unit("s"),
+			axis.Min(0)),
+		c.getHistogram(
+			"push type collection duration",
+			c.getMetricWithFilter("mo_logtail_push_collection_duration_seconds_bucket", ``),
+			[]float64{0.50, 0.8, 0.90, 0.99},
+			4,
 			axis.Unit("s"),
 			axis.Min(0)),
 	)
@@ -213,3 +228,19 @@ func (c *DashboardCreator) initLogtailLoadCheckpointRow() dashboard.Option {
 			axis.Min(0)),
 	)
 }
+
+func (c *DashboardCreator) initLogtailTransmitRow() dashboard.Option {
+	return dashboard.Row(
+		"Logtail Transmit counter",
+		c.withGraph(
+			"Server Send",
+			6,
+			`sum(rate(`+c.getMetricWithFilter("mo_logtail_transmit_total", `type="server-send"`)+`[$interval]))`,
+			""),
+		c.withGraph(
+			"Client Receive",
+			6,
+			`sum(rate(`+c.getMetricWithFilter("mo_logtail_transmit_total", `type="client-receive"`)+`[$interval]))`,
+			""),
+	)
+}
diff --git a/pkg/util/metric/v2/logtail.go b/pkg/util/metric/v2/logtail.go
index a516327e4e949..9d95eb23de323 100644
--- a/pkg/util/metric/v2/logtail.go
+++ b/pkg/util/metric/v2/logtail.go
@@ -128,14 +128,42 @@ var (
 			Buckets:   getDurationBuckets(),
 		})
 
-	LogTailCollectDurationHistogram = prometheus.NewHistogram(
+	LogTailPullCollectionPhase1DurationHistogram = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace: "mo",
 			Subsystem: "logtail",
-			Name:      "collect_duration_seconds",
-			Help:      "Bucketed histogram of logtail collecting duration.",
+			Name:      "pull_collection_phase1_duration_seconds",
+			Help:      "Bucketed histogram of logtail pull type collection duration of phase1.",
 			Buckets:   getDurationBuckets(),
 		})
+
+	LogTailPullCollectionPhase2DurationHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "mo",
+			Subsystem: "logtail",
+			Name:      "pull_collection_phase2_duration_seconds",
+			Help:      "Bucketed histogram of logtail pull type collection duration of phase2.",
+			Buckets:   getDurationBuckets(),
+		})
+
+	LogTailPushCollectionDurationHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "mo",
+			Subsystem: "logtail",
+			Name:      "push_collection_duration_seconds",
+			Help:      "Bucketed histogram of logtail push type collection duration.",
+			Buckets:   getDurationBuckets(),
+		})
+
+	logTailTransmitCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "mo",
+			Subsystem: "logtail",
+			Name:      "transmit_total",
+			Help:      "Total number of logtail transmissions.",
+		}, []string{"type"})
+
+	LogTailServerSendCounter    = logTailTransmitCounter.WithLabelValues("server-send")
+	LogTailClientReceiveCounter = logTailTransmitCounter.WithLabelValues("client-receive")
 )
 
 var (
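Note: the transmit metric above follows client_golang's labeled-counter idiom: one CounterVec partitioned by a "type" label, with each label combination bound once via WithLabelValues so that the hot paths (server stream write, client receive) only touch a plain Counter. A minimal, self-contained sketch of that idiom; the package and variable names here are illustrative and not part of this PR:

package main

import "github.com/prometheus/client_golang/prometheus"

var (
	// One counter family, partitioned by a "type" label.
	transmitCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "mo",
			Subsystem: "logtail",
			Name:      "transmit_total",
			Help:      "Total number of logtail transmissions.",
		}, []string{"type"})

	// Bind each label value once; the returned Counter is cheap to update.
	serverSend    = transmitCounter.WithLabelValues("server-send")
	clientReceive = transmitCounter.WithLabelValues("client-receive")
)

func main() {
	prometheus.MustRegister(transmitCounter)
	serverSend.Add(1)    // one message written on the server stream
	clientReceive.Add(1) // one response consumed on the client
}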
diff --git a/pkg/util/metric/v2/metrics.go b/pkg/util/metric/v2/metrics.go
index 43c5ad3125c93..22316e65799ac 100644
--- a/pkg/util/metric/v2/metrics.go
+++ b/pkg/util/metric/v2/metrics.go
@@ -106,7 +106,9 @@ func initLogtailMetrics() {
 	registry.MustRegister(logTailSendDurationHistogram)
 	registry.MustRegister(LogTailLoadCheckpointDurationHistogram)
 
-	registry.MustRegister(LogTailCollectDurationHistogram)
+	registry.MustRegister(LogTailPushCollectionDurationHistogram)
+	registry.MustRegister(LogTailPullCollectionPhase1DurationHistogram)
+	registry.MustRegister(LogTailPullCollectionPhase2DurationHistogram)
 	registry.MustRegister(LogTailSubscriptionCounter)
 	registry.MustRegister(txnTNSideDurationHistogram)
 }
diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go
index 2ac74ff9b966b..c16195c35aa7c 100644
--- a/pkg/vm/engine/disttae/logtail_consumer.go
+++ b/pkg/vm/engine/disttae/logtail_consumer.go
@@ -369,6 +369,9 @@ func (c *PushClient) receiveOneLogtail(ctx context.Context, e *Engine) error {
 	ctx, cancel := context.WithTimeout(ctx, maxTimeToWaitServerResponse)
 	defer cancel()
 
+	// Count one received logtail response on the client side.
+	defer v2.LogTailClientReceiveCounter.Add(1)
+
 	resp := c.subscriber.receiveResponse(ctx)
 	if resp.err != nil {
 		// POSSIBLE ERROR: context deadline exceeded, rpc closed, decode error.
diff --git a/pkg/vm/engine/tae/logtail/service/merger.go b/pkg/vm/engine/tae/logtail/service/merger.go
new file mode 100644
index 0000000000000..2e038ce2f5537
--- /dev/null
+++ b/pkg/vm/engine/tae/logtail/service/merger.go
@@ -0,0 +1,96 @@
+// Copyright 2021 - 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package service
+
+import (
+	"fmt"
+
+	"github.com/matrixorigin/matrixone/pkg/pb/api"
+	"github.com/matrixorigin/matrixone/pkg/pb/logtail"
+	"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
+)
+
+const (
+	ckpLocationDivider = ';'
+)
+
+// logtailMerger is the merger tool to merge multiple
+// LogtailPhase instances.
+type logtailMerger struct {
+	logtails []*LogtailPhase
+}
+
+func newLogtailMerger(l ...*LogtailPhase) *logtailMerger {
+	var lm logtailMerger
+	lm.logtails = append(lm.logtails, l...)
+	return &lm
+}
+
+// Merge merges all instances and returns one logtail.TableLogtail
+// and a callback function.
+func (m *logtailMerger) Merge() (logtail.TableLogtail, func()) {
+	var tableID api.TableID
+	var entryCount int
+	var ts timestamp.Timestamp
+	for _, t := range m.logtails {
+		entryCount += len(t.tail.Commands)
+
+		// check the table ID.
+		if tableID.TbId == 0 {
+			tableID = *t.tail.Table
+		} else if tableID.TbId != t.tail.Table.TbId {
+			panic(fmt.Sprintf("cannot merge logtails with different table: %d, %d",
+				tableID.TbId, t.tail.Table.TbId))
+		}
+
+		// get the max timestamp.
+		if ts.Less(*t.tail.Ts) {
+			ts = *t.tail.Ts
+		}
+	}
+
+	// create the callbacks.
+	callbacks := make([]func(), 0, entryCount)
+	// create a new table logtail with the entry number.
+	tail := logtail.TableLogtail{
+		Ts:       &ts,
+		Table:    &tableID,
+		Commands: make([]api.Entry, 0, entryCount),
+	}
+	for _, t := range m.logtails {
+		ckpLocLen := len(tail.CkpLocation)
+		if ckpLocLen > 0 &&
+			tail.CkpLocation[ckpLocLen-1] != ckpLocationDivider {
+			tail.CkpLocation += string(ckpLocationDivider)
+		}
+		tail.CkpLocation += t.tail.CkpLocation
+		tail.Commands = append(tail.Commands, t.tail.Commands...)
+		callbacks = append(callbacks, t.closeCB)
+	}
+
+	// remove the last ';'
+	ckpLocLen := len(tail.CkpLocation)
+	if ckpLocLen > 0 &&
+		tail.CkpLocation[ckpLocLen-1] == ckpLocationDivider {
+		tail.CkpLocation = tail.CkpLocation[:ckpLocLen-1]
+	}
+	return tail, func() {
+		for _, cb := range callbacks {
+			if cb != nil {
+				cb()
+			}
+		}
+	}
+}
diff --git a/pkg/vm/engine/tae/logtail/service/merger_test.go b/pkg/vm/engine/tae/logtail/service/merger_test.go
new file mode 100644
index 0000000000000..fbce92668165f
--- /dev/null
+++ b/pkg/vm/engine/tae/logtail/service/merger_test.go
@@ -0,0 +1,68 @@
+// Copyright 2021 - 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package service
+
+import (
+	"testing"
+
+	"github.com/matrixorigin/matrixone/pkg/pb/api"
+	"github.com/matrixorigin/matrixone/pkg/pb/logtail"
+	"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestLogtailMerge(t *testing.T) {
+	var cbValue int
+	tail1 := &LogtailPhase{
+		tail: logtail.TableLogtail{
+			CkpLocation: "aaa;bbb;",
+			Ts: &timestamp.Timestamp{
+				PhysicalTime: 100,
+			},
+			Table: &api.TableID{
+				DbId: 1,
+				TbId: 2,
+			},
+			Commands: []api.Entry{{TableId: 2}},
+		},
+		closeCB: func() {
+			cbValue++
+		},
+	}
+	tail2 := &LogtailPhase{
+		tail: logtail.TableLogtail{
+			CkpLocation: "ccc;ddd",
+			Ts: &timestamp.Timestamp{
+				PhysicalTime: 200,
+			},
+			Table: &api.TableID{
+				DbId: 1,
+				TbId: 2,
+			},
+			Commands: []api.Entry{{TableId: 2}},
+		},
+		closeCB: func() {
+			cbValue++
+		},
+	}
+	lm := newLogtailMerger(tail1, tail2)
+	assert.NotNil(t, lm)
+	tail, cb := lm.Merge()
+	cb()
+	assert.Equal(t, 2, cbValue)
+	assert.Equal(t, 2, len(tail.Commands))
+	assert.Equal(t, int64(200), tail.Ts.PhysicalTime)
+	assert.Equal(t, "aaa;bbb;ccc;ddd", tail.CkpLocation)
+}
diff --git a/pkg/vm/engine/tae/logtail/service/response.go b/pkg/vm/engine/tae/logtail/service/response.go
index 9e768ea5ce395..9c295c42dddbd 100644
--- a/pkg/vm/engine/tae/logtail/service/response.go
+++ b/pkg/vm/engine/tae/logtail/service/response.go
@@ -23,6 +23,16 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/pb/logtail"
 )
 
+// LogtailPhase is the logtail information of one phase of
+// a subscription request. Phase 1 is executed asynchronously
+// to collect most of the logtails for the subscription request,
+// and phase 2 is executed synchronously.
+type LogtailPhase struct {
+	tail    logtail.TableLogtail
+	closeCB func()
+	sub     subscription
+}
+
 // LogtailResponse wraps logtail.LogtailResponse.
 type LogtailResponse struct {
 	logtail.LogtailResponse
diff --git a/pkg/vm/engine/tae/logtail/service/server.go b/pkg/vm/engine/tae/logtail/service/server.go
index b2aaf7995a7b7..d32059586a12c 100644
--- a/pkg/vm/engine/tae/logtail/service/server.go
+++ b/pkg/vm/engine/tae/logtail/service/server.go
@@ -38,12 +38,6 @@ import (
 
 const (
 	LogtailServiceRPCName = "logtail-server"
-
-	// updateEventMaxInterval is the max interval between update events.
-	// If 3s has passed since last update event, we should try to send an
-	// update event, rather than a suscription, to avoid there are no
-	// update events and cause logtail consumer waits for too long.
-	updateEventMaxInterval = time.Second * 3
 )
 
 type ServerOption func(*LogtailServer)
@@ -115,10 +109,21 @@ type LogtailServer struct {
 	waterline *Waterliner
 
 	errChan chan sessionError // errChan has no buffer in order to improve sensitivity.
-	subChan chan subscription
 
-	event   *Notifier
-	logtail taelogtail.Logtailer
+	// subReqChan is the channel that contains the subscription requests.
+	subReqChan chan subscription
+
+	// subTailChan is the channel that contains the logtails pulled in the first
+	// phase. Another goroutine receives these logtails and begins the second
+	// phase of collecting logtails.
+	subTailChan chan *LogtailPhase
+
+	// pullWorkerPool is used to control the parallelism of the pull workers.
+	pullWorkerPool chan struct{}
+
+	event *Notifier
+
+	logtailer taelogtail.Logtailer
 
 	rpc morpc.RPCServer
 
@@ -129,17 +134,19 @@
 
 // NewLogtailServer initializes a server for logtail push model.
 func NewLogtailServer(
-	address string, cfg *options.LogtailServerCfg, logtail taelogtail.Logtailer, rt runtime.Runtime, opts ...ServerOption,
+	address string, cfg *options.LogtailServerCfg, logtailer taelogtail.Logtailer, rt runtime.Runtime, opts ...ServerOption,
 ) (*LogtailServer, error) {
 	s := &LogtailServer{
-		rt:        rt,
-		logger:    rt.Logger(),
-		cfg:       cfg,
-		ssmgr:     NewSessionManager(),
-		waterline: NewWaterliner(),
-		errChan:   make(chan sessionError, 1),
-		subChan:   make(chan subscription, 100),
-		logtail:   logtail,
+		rt:             rt,
+		logger:         rt.Logger(),
+		cfg:            cfg,
+		ssmgr:          NewSessionManager(),
+		waterline:      NewWaterliner(),
+		errChan:        make(chan sessionError, 1),
+		subReqChan:     make(chan subscription, 100),
+		subTailChan:    make(chan *LogtailPhase, 300),
+		pullWorkerPool: make(chan struct{}, cfg.PullWorkerPoolSize),
+		logtailer:      logtailer,
 	}
 
 	for _, opt := range opts {
@@ -198,7 +205,7 @@ func NewLogtailServer(
 
 	// receive logtail on event
 	s.event = NewNotifier(s.rootCtx, eventBufferSize)
-	logtail.RegisterCallback(s.event.NotifyLogtail)
+	logtailer.RegisterCallback(s.event.NotifyLogtail)
 
 	return s, nil
 }
@@ -287,10 +294,10 @@ func (s *LogtailServer) onSubscription(
 			return sendCtx.Err()
 		case <-time.After(time.Second):
 			logger.Error("cannot send subscription request, retry",
-				zap.Int("chan cap", cap(s.subChan)),
-				zap.Int("chan len", len(s.subChan)),
+				zap.Int("chan cap", cap(s.subReqChan)),
+				zap.Int("chan len", len(s.subReqChan)),
 			)
-		case s.subChan <- sub:
+		case s.subReqChan <- sub:
 			return nil
 		}
 	}
@@ -353,66 +360,123 @@ func (s *LogtailServer) sessionErrorHandler(ctx context.Context) {
 	}
 }
 
+// logtailPullWorker is an independent goroutine that pulls table logtails
+// asynchronously. It generates a response which would be sent to the logtail
+// client as part 1 of the whole response.
+func (s *LogtailServer) logtailPullWorker(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			s.logger.Error("stop logtail pull worker", zap.Error(ctx.Err()))
+			return
+
+		case sub, ok := <-s.subReqChan:
+			if !ok {
+				s.logger.Info("subscription channel closed")
+				return
+			}
+			// Start a new goroutine to handle the subscription request.
+			// At most PullWorkerPoolSize goroutines work at the same time.
+			go s.pullLogtailsPhase1(ctx, sub)
+		}
+	}
+}
+
+func (s *LogtailServer) pullLogtailsPhase1(ctx context.Context, sub subscription) {
+	// Acquire a slot from the pool to limit the number of pull workers.
+	s.pullWorkerPool <- struct{}{}
+	defer func() { <-s.pullWorkerPool }()
+
+	v2.LogTailSubscriptionCounter.Inc()
+	s.logger.Info("handle subscription asynchronously", zap.Any("table", sub.req.Table))
+
+	start := time.Now()
+	// Get logtail of phase 1. The 'from' time is zero and the 'to' time is the
+	// newest time of waterline. The Ts field in the first return value is the
+	// 'from' time parameter of the next phase call.
+	tail, err := s.getSubLogtailPhase(
+		ctx,
+		sub,
+		timestamp.Timestamp{},
+		s.waterline.Waterline(),
+	)
+	if err != nil {
+		s.logger.Error("failed to get logtail of phase1 of subscription",
+			zap.String("table", string(sub.tableID)),
+		)
+		sub.session.Unregister(sub.tableID)
+		return
+	}
+	v2.LogTailPullCollectionPhase1DurationHistogram.Observe(time.Since(start).Seconds())
+
+	for {
+		select {
+		case <-ctx.Done():
+			s.logger.Error("context done in pull table logtails", zap.Error(ctx.Err()))
+			return
+
+		case s.subTailChan <- tail:
+			return
+
+		default:
+			s.logger.Warn("the queue of logtails of phase1 is full")
+			time.Sleep(time.Second)
+		}
+	}
+}
+
 // logtailSender sends total or incremental logtail.
 func (s *LogtailServer) logtailSender(ctx context.Context) {
 	e, ok := <-s.event.C
 	if !ok {
-		s.logger.Info("publishemtn channel closed")
+		s.logger.Info("publishment channel closed")
 		return
 	}
 	s.waterline.Advance(e.to)
 	s.logger.Info("init waterline", zap.String("to", e.to.String()))
 
-	// lastUpdate is used to record the time of update event.
-	lastUpdate := time.Now()
-
 	for {
 		select {
 		case <-ctx.Done():
 			s.logger.Error("stop subscription handler", zap.Error(ctx.Err()))
 			return
 
-		case sub, ok := <-s.subChan:
+		case tailPhase1, ok := <-s.subTailChan:
 			if !ok {
 				s.logger.Info("subscription channel closed")
 				return
 			}
 
-			v2.LogTailSubscriptionCounter.Inc()
-			interval := time.Since(lastUpdate)
-			if interval > updateEventMaxInterval {
-				s.logger.Info("long time passed since last update event", zap.Duration("interval", interval))
-
-				select {
-				case e, ok := <-s.event.C:
-					if !ok {
-						s.logger.Info("publishment channel closed")
-						return
-					}
-					s.logger.Info("send an update event first as long time passed since last one.")
-					s.publishEvent(ctx, e)
-					lastUpdate = time.Now()
-
-				default:
-					s.logger.Info("there is no update event, although we want to send it first")
-				}
+			start := time.Now()
+			// Phase 2 of pulling logtails. The 'from' timestamp comes from
+			// the value of phase 1.
+			tailPhase2, err := s.getSubLogtailPhase(
+				ctx,
+				tailPhase1.sub,
+				*tailPhase1.tail.Ts,
+				s.waterline.Waterline(),
+			)
+			if err != nil {
+				tailPhase1.sub.session.Unregister(tailPhase1.sub.tableID)
+				s.logger.Error("failed to get logtail of phase2 of subscription", zap.Error(err))
+			} else {
+				v2.LogTailPullCollectionPhase2DurationHistogram.Observe(time.Since(start).Seconds())
+				s.sendSubscription(ctx, tailPhase1, tailPhase2)
 			}
 
-			s.logger.Info("handle subscription asynchronously", zap.Any("table", sub.req.Table))
-			s.sendSubscription(ctx, sub)
-
 		case e, ok := <-s.event.C:
 			if !ok {
 				s.logger.Info("publishment channel closed")
 				return
 			}
 			s.publishEvent(ctx, e)
-			lastUpdate = time.Now()
 		}
 	}
 }
 
-func (s *LogtailServer) sendSubscription(ctx context.Context, sub subscription) {
+func (s *LogtailServer) getSubLogtailPhase(
+	ctx context.Context, sub subscription, from, to timestamp.Timestamp,
+) (*LogtailPhase, error) {
 	sendCtx, cancel := context.WithTimeout(ctx, sub.timeout)
 	defer cancel()
 
@@ -424,17 +488,14 @@ func (s *LogtailServer) sendSubscription(ctx context.Context, sub subscription)
 	}()
 
 	table := *sub.req.Table
-	from := timestamp.Timestamp{}
-	to := s.waterline.Waterline()
-
 	// fetch total logtail for table
 	var tail logtail.TableLogtail
 	var closeCB func()
 	moprobe.WithRegion(ctx, moprobe.SubscriptionPullLogTail, func() {
-		tail, closeCB, subErr = s.logtail.TableLogtail(sendCtx, table, from, to)
+		tail, closeCB, subErr = s.logtailer.TableLogtail(sendCtx, table, from, to)
 	})
-
 	if subErr != nil {
+		// if error occurs, just send the error immediately.
 		if closeCB != nil {
 			closeCB()
 		}
@@ -444,22 +505,27 @@ func (s *LogtailServer) sendSubscription(ctx context.Context, sub subscription)
 		); err != nil {
 			s.logger.Error("fail to send error response", zap.Error(err))
 		}
-		return
+		return nil, subErr
 	}
 
-	cb := func() {
-		if closeCB != nil {
-			closeCB()
-		}
-	}
+	return &LogtailPhase{
+		tail:    tail,
+		closeCB: closeCB,
+		sub:     sub,
+	}, nil
+}
 
+func (s *LogtailServer) sendSubscription(ctx context.Context, p1, p2 *LogtailPhase) {
+	sub := p1.sub
+	sendCtx, cancel := context.WithTimeout(ctx, sub.timeout)
+	defer cancel()
+	tail, cb := newLogtailMerger(p1, p2).Merge()
 	// send subscription response
-	subErr = sub.session.SendSubscriptionResponse(sendCtx, tail, cb)
-	if subErr != nil {
-		s.logger.Error("fail to send subscription response", zap.Error(subErr))
+	if err := sub.session.SendSubscriptionResponse(sendCtx, tail, cb); err != nil {
+		s.logger.Error("fail to send subscription response", zap.Error(err))
+		sub.session.Unregister(sub.tableID)
 		return
 	}
-
 	// mark table as subscribed
 	sub.session.AdvanceState(sub.tableID)
 }
@@ -557,6 +623,11 @@ func (s *LogtailServer) Start() error {
 		return err
 	}
 
+	if err := s.stopper.RunNamedTask("logtail pull worker", s.logtailPullWorker); err != nil {
+		s.logger.Error("fail to start logtail pull worker", zap.Error(err))
+		return err
+	}
+
 	if err := s.stopper.RunNamedTask("logtail sender", s.logtailSender); err != nil {
 		s.logger.Error("fail to start logtail sender", zap.Error(err))
 		return err
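Note: pullWorkerPool above is a counting semaphore built from a buffered channel, the usual Go idiom for bounding concurrency: a send acquires a slot (blocking once PullWorkerPoolSize phase-1 pulls are in flight) and a receive releases it. A self-contained sketch of just that idiom; the pool size and task body are assumptions for illustration, not code from this PR:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Assumed pool size for the demo; the PR's default is 50.
	pool := make(chan struct{}, 3)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			pool <- struct{}{}        // acquire: blocks while 3 tasks run
			defer func() { <-pool }() // release on return
			fmt.Println("phase-1 pull for request", id)
		}(i)
	}
	wg.Wait()
}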
diff --git a/pkg/vm/engine/tae/logtail/service/session.go b/pkg/vm/engine/tae/logtail/service/session.go
index 767f8d1d626dd..3477e07c00df3 100644
--- a/pkg/vm/engine/tae/logtail/service/session.go
+++ b/pkg/vm/engine/tae/logtail/service/session.go
@@ -215,7 +215,7 @@ func (s *morpcStream) write(
 			return err
 		}
 	}
-
+	v2.LogTailServerSendCounter.Add(1)
 	return nil
 }
diff --git a/pkg/vm/engine/tae/logtail/txn_handle.go b/pkg/vm/engine/tae/logtail/txn_handle.go
index 42618799ff984..6b6a35eaab08f 100644
--- a/pkg/vm/engine/tae/logtail/txn_handle.go
+++ b/pkg/vm/engine/tae/logtail/txn_handle.go
@@ -90,7 +90,7 @@ func (b *TxnLogtailRespBuilder) Close() {
 func (b *TxnLogtailRespBuilder) CollectLogtail(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func()) {
 	now := time.Now()
 	defer func() {
-		v2.LogTailCollectDurationHistogram.Observe(time.Since(now).Seconds())
+		v2.LogTailPushCollectionDurationHistogram.Observe(time.Since(now).Seconds())
 	}()
 	b.txn = txn
diff --git a/pkg/vm/engine/tae/options/cfg.go b/pkg/vm/engine/tae/options/cfg.go
index 04c0961102eb5..669dfca32e056 100644
--- a/pkg/vm/engine/tae/options/cfg.go
+++ b/pkg/vm/engine/tae/options/cfg.go
@@ -26,6 +26,8 @@ const (
 	defaultLogtailCollectInterval = 50 * time.Millisecond
 	defaultResponseSendTimeout    = 30 * time.Second
 	defaultRpcStreamPoisonTime    = 5 * time.Second
+	// The default value of PullWorkerPoolSize.
+	defaultPullWorkerPoolSize = 50
 )
 
 type StorageCfg struct {
@@ -87,6 +89,10 @@ type LogtailServerCfg struct {
 	RPCStreamPoisonTime    time.Duration
 	LogtailCollectInterval time.Duration
 	ResponseSendTimeout    time.Duration
+	// PullWorkerPoolSize is the size of the pull worker pool, i.e. there are
+	// at most PullWorkerPoolSize pull workers running at the same time. The
+	// default value is defaultPullWorkerPoolSize (50).
+	PullWorkerPoolSize int64
 }
 
 func NewDefaultLogtailServerCfg() *LogtailServerCfg {
@@ -96,6 +102,7 @@ func NewDefaultLogtailServerCfg() *LogtailServerCfg {
 		RPCStreamPoisonTime:    defaultRpcStreamPoisonTime,
 		LogtailCollectInterval: defaultLogtailCollectInterval,
 		ResponseSendTimeout:    defaultResponseSendTimeout,
+		PullWorkerPoolSize:     defaultPullWorkerPoolSize,
 	}
 }
 
@@ -112,4 +119,7 @@ func (l *LogtailServerCfg) Validate() {
 	if l.ResponseSendTimeout <= 0 {
 		l.ResponseSendTimeout = defaultResponseSendTimeout
 	}
+	if l.PullWorkerPoolSize <= 0 {
+		l.PullWorkerPoolSize = defaultPullWorkerPoolSize
+	}
 }
diff --git a/pkg/vm/engine/tae/options/cfg_test.go b/pkg/vm/engine/tae/options/cfg_test.go
index 8b1cd20fe4fc7..02517e417b111 100644
--- a/pkg/vm/engine/tae/options/cfg_test.go
+++ b/pkg/vm/engine/tae/options/cfg_test.go
@@ -28,4 +28,5 @@ func TestLogtailServerCfg(t *testing.T) {
 	require.Equal(t, defaults.LogtailCollectInterval, validated.LogtailCollectInterval)
 	require.Equal(t, defaults.ResponseSendTimeout, validated.ResponseSendTimeout)
 	require.Equal(t, defaults.RPCStreamPoisonTime, validated.RPCStreamPoisonTime)
+	require.Equal(t, defaults.PullWorkerPoolSize, validated.PullWorkerPoolSize)
 }
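Note: a minimal sketch of how the new knob behaves at the TAE options layer, using only APIs visible in this diff (NewDefaultLogtailServerCfg, Validate, PullWorkerPoolSize). The TOML surface is the pull-worker-pool-size key added to the TN logtail-server config struct, and the embedded test cluster exposes it via Options.WithPullWorkerPoolSize:

package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
)

func main() {
	// Defaults added in this PR: PullWorkerPoolSize is 50.
	cfg := options.NewDefaultLogtailServerCfg()
	fmt.Println(cfg.PullWorkerPoolSize) // 50

	// An out-of-range value is reset by Validate, mirroring
	// Config.Validate and SetDefaultValue in pkg/tnservice/cfg.go.
	cfg.PullWorkerPoolSize = -1
	cfg.Validate()
	fmt.Println(cfg.PullWorkerPoolSize) // 50 again
}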