From cc38aa52bd3ae38d2ad9648290dd425f4d203352 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 8 Nov 2024 19:47:06 +0000 Subject: [PATCH 1/4] crosscluster/logical: don't set fraction once resolved ts exists Release note: none. Epic: none. --- pkg/ccl/crosscluster/logical/logical_replication_job.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 75369bd04df2..6bcae79dc34c 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -542,17 +542,16 @@ func (rh *rowHandler) handleRow(ctx context.Context, row tree.Datums) error { // TODO (msbutler): add ldr initial and lagging range timeseries metrics. aggRangeStats, fractionCompleted, status := rh.rangeStats.RollupStats() + progress.RunningStatus = status - if rh.replicatedTimeAtStart.Less(replicatedTime) { + if replicatedTime.IsSet() { prog.ReplicatedTime = replicatedTime // The HighWater is for informational purposes // only. progress.Progress = &jobspb.Progress_HighWater{ HighWater: &replicatedTime, } - } - progress.RunningStatus = status - if fractionCompleted > 0 && fractionCompleted < 1 { + } else if fractionCompleted > 0 && fractionCompleted < 1 { // If 0, the coordinator has not gotten a complete range stats update // from all nodes yet. // From 31878f9f62ace2ab0a7efedea3b1e8716266b611 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sat, 9 Nov 2024 15:36:08 +0000 Subject: [PATCH 2/4] crosscluster/producer: switch timer to ticker to continue polling Also while we are here re-read walltime per range in case a large number of ranges takes a long time to iterate to ensure ranges near the end of iteration do not see artificial lag. Release note: none. Epic: none. --- pkg/ccl/crosscluster/producer/range_stats.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/crosscluster/producer/range_stats.go b/pkg/ccl/crosscluster/producer/range_stats.go index cd7247af1c5c..26c5de497b69 100644 --- a/pkg/ccl/crosscluster/producer/range_stats.go +++ b/pkg/ccl/crosscluster/producer/range_stats.go @@ -44,8 +44,8 @@ func startStatsPoller( g: ctxgroup.WithContext(ctx), } poller.g.GoCtx(func(ctx context.Context) error { - timer := time.NewTimer(interval) - defer timer.Stop() + tick := time.NewTicker(interval) + defer tick.Stop() for { stats, err := computeRangeStats(ctx, spans, frontier, ranges) if err != nil { @@ -59,7 +59,7 @@ func startStatsPoller( select { case <-ctx.Done(): return ctx.Err() - case <-timer.C: + case <-tick.C: //continue } } @@ -85,7 +85,6 @@ func computeRangeStats( frontier span.Frontier, ranges rangedesc.IteratorFactory, ) (streampb.StreamEvent_RangeStats, error) { - now := timeutil.Now() var stats streampb.StreamEvent_RangeStats for _, initialSpan := range spans { lazyIterator, err := ranges.NewLazyIterator(ctx, initialSpan, 100) @@ -93,6 +92,7 @@ func computeRangeStats( return streampb.StreamEvent_RangeStats{}, err } for ; lazyIterator.Valid(); lazyIterator.Next() { + now := timeutil.Now() rangeSpan := roachpb.Span{ Key: lazyIterator.CurRangeDescriptor().StartKey.AsRawKey(), EndKey: lazyIterator.CurRangeDescriptor().EndKey.AsRawKey(), @@ -104,7 +104,7 @@ func computeRangeStats( if timestamp.IsEmpty() { stats.ScanningRangeCount += 1 return span.StopMatch - } else if timestamp.GoTime().Before(now.Add(-laggingSpanThreshold)) { + } else if now.Sub(timestamp.GoTime()) > laggingSpanThreshold { stats.LaggingRangeCount += 1 return span.StopMatch } From ac71c8a7442f0f4748a6f5a70af5413909b569c9 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sat, 9 Nov 2024 15:52:02 +0000 Subject: [PATCH 3/4] ui/jobs: show running status for LDR jobs Release note: none. Epic: none. --- pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx index 5a86c72a64dc..5e18a1030b54 100644 --- a/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx @@ -57,7 +57,7 @@ function jobToVisualForReplicationIngestion(job: Job): JobStatusVisual { if (job.fraction_completed > 0 && job.status === JOB_STATUS_RUNNING) { return JobStatusVisual.ProgressBarWithDuration; } - return JobStatusVisual.BadgeOnly; + return JobStatusVisual.BadgeWithMessage; } export const JOB_STATUS_SUCCEEDED = "succeeded"; From 879a89a05550fca5287ac60f96d4d4903ca3786e Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sat, 9 Nov 2024 16:02:36 +0000 Subject: [PATCH 4/4] crosscluster/logical: elide status msg when caught up Release note: none. Epic: none. --- pkg/ccl/crosscluster/logical/range_stats.go | 2 +- pkg/ccl/crosscluster/logical/range_stats_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/crosscluster/logical/range_stats.go b/pkg/ccl/crosscluster/logical/range_stats.go index 473d6c5a6b54..0fd66dd281d8 100644 --- a/pkg/ccl/crosscluster/logical/range_stats.go +++ b/pkg/ccl/crosscluster/logical/range_stats.go @@ -54,7 +54,7 @@ func (r *rangeStatsByProcessorID) RollupStats() (streampb.StreamEvent_RangeStats if total.LaggingRangeCount != 0 { return total, fractionCompleted, fmt.Sprintf("catching up on %d out of %d ranges", total.LaggingRangeCount, total.RangeCount) } - return total, 1, fmt.Sprintf("all %d ranges are caught up", total.RangeCount) + return total, 1, "" } func newRangeStatsCollector(processorCount int) rangeStatsByProcessorID { diff --git a/pkg/ccl/crosscluster/logical/range_stats_test.go b/pkg/ccl/crosscluster/logical/range_stats_test.go index abcc2ff9d5e8..1e39b5f15915 100644 --- a/pkg/ccl/crosscluster/logical/range_stats_test.go +++ b/pkg/ccl/crosscluster/logical/range_stats_test.go @@ -32,7 +32,7 @@ func TestRangeStats(t *testing.T) { RangeCount: 10, }, fraction: 1, - expectedMsg: "all 10 ranges are caught up", + expectedMsg: "", inputStats: map[int32]*streampb.StreamEvent_RangeStats{ 1: {RangeCount: 5}, 2: {RangeCount: 3},