Skip to content

Commit

Permalink
metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
gouhongshen committed Jun 17, 2024
1 parent 6cfdcaf commit b0215ed
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 24 deletions.
24 changes: 15 additions & 9 deletions pkg/util/metric/v2/dashboard/grafana_dashboard_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *DashboardCreator) initTxnDashboard() error {
c.initTxnMpoolRow(),
c.initTxnOnPrepareWALRow(),
c.initTxnBeforeCommitRow(),
c.initTxnDequeuePreparedRow(),
c.initTxnTNDeduplicateDurationRow(),
c.initTxnTableRangesRow(),
c.initTxnRangesSelectivityRow(),
c.initTxnRangesCountRow(),
Expand Down Expand Up @@ -292,16 +292,22 @@ func (c *DashboardCreator) initTxnOnPrepareWALRow() dashboard.Option {
)
}

func (c *DashboardCreator) initTxnDequeuePreparedRow() dashboard.Option {
func (c *DashboardCreator) initTxnTNDeduplicateDurationRow() dashboard.Option {
return dashboard.Row(
"txn dequeue prepared duration",
c.getHistogram(
"txn dequeue prepared duration",
c.getMetricWithFilter("mo_txn_tn_side_duration_seconds_bucket", `step="dequeue_prepared"`),
[]float64{0.50, 0.8, 0.90, 0.99},
12,
"Txn TN Deduplication Duration",
c.getMultiHistogram(
[]string{
c.getMetricWithFilter("mo_txn_tn_deduplicate_duration_seconds_bucket", `type="append_deduplicate"`),
c.getMetricWithFilter("mo_txn_tn_deduplicate_duration_seconds_bucket", `type="prePrepare_deduplicate"`),
},
[]string{
"append_deduplicate",
"prePrepare_deduplicate",
},
[]float64{0.80, 0.90, 0.95, 0.99},
[]float32{3, 3, 3, 3},
axis.Unit("s"),
axis.Min(0)),
axis.Min(0))...,
)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/util/metric/v2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func initTxnMetrics() {
registry.MustRegister(txnCNCommittedLocationQuantityGauge)

registry.MustRegister(txnRangesSelectivityHistogram)
registry.MustRegister(txnTNDeduplicateDurationHistogram)
}

func initRPCMetrics() {
Expand Down
15 changes: 13 additions & 2 deletions pkg/util/metric/v2/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,19 @@ var (
TxnPreparedWaitDurationHistogram = txnTNSideDurationHistogram.WithLabelValues("5-PreparedWait")
TxnPreparedDurationHistogram = txnTNSideDurationHistogram.WithLabelValues("6-Prepared")

TxnDequeuePreparedDurationHistogram = txnTNSideDurationHistogram.WithLabelValues("dequeue_prepared")
TxnBeforeCommitDurationHistogram = txnTNSideDurationHistogram.WithLabelValues("before_txn_commit")
TxnBeforeCommitDurationHistogram = txnTNSideDurationHistogram.WithLabelValues("before_txn_commit")

txnTNDeduplicateDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "mo",
Subsystem: "txn",
Name: "tn_deduplicate_duration_seconds",
Help: "Bucketed histogram of txn duration on tn side",
Buckets: getDurationBuckets(),
}, []string{"type"})

TxnTNAppendDeduplicateDurationHistogram = txnTNDeduplicateDurationHistogram.WithLabelValues("append_deduplicate")
TxnTNPrePrepareDeduplicateDurationHistogram = txnTNDeduplicateDurationHistogram.WithLabelValues("prePrepare_deduplicate")

txnMpoolDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down
4 changes: 0 additions & 4 deletions pkg/vm/engine/tae/txn/txnbase/txnmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync/atomic"
"time"

v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"

Expand Down Expand Up @@ -559,7 +558,6 @@ func (mgr *TxnManager) dequeuePrepared(items ...any) {
store.TriggerTrace(txnif.TracePrepared)
mgr.workers.Submit(func() {
//Notice that WaitPrepared do nothing when op is OpRollback
t0 := time.Now()
if err := op.Txn.WaitPrepared(op.ctx); err != nil {
// v0.6 TODO: Error handling
panic(err)
Expand All @@ -570,8 +568,6 @@ func (mgr *TxnManager) dequeuePrepared(items ...any) {
} else {
mgr.on1PCPrepared(op)
}
dequeuePreparedDuration := time.Since(t0)
v2.TxnDequeuePreparedDurationHistogram.Observe(dequeuePreparedDuration.Seconds())
})
}
common.DoIfDebugEnabled(func() {
Expand Down
24 changes: 16 additions & 8 deletions pkg/vm/engine/tae/txn/txnimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,17 @@ import (
"runtime/trace"
"time"

"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/util"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/perfcounter"

"github.com/RoaringBitmap/roaring"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/moprobe"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
apipb "github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/util"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
Expand All @@ -45,6 +42,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/updates"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -589,7 +587,9 @@ func (tbl *txnTable) AddDeleteNode(id *common.ID, node txnif.DeleteNode) error {
}

func (tbl *txnTable) Append(ctx context.Context, data *containers.Batch) (err error) {
var dedupDur float64
if tbl.schema.HasPK() && !tbl.schema.IsSecondaryIndexTable() {
now := time.Now()
dedupType := tbl.store.txn.GetDedupType()
if dedupType == txnif.FullDedup {
//do PK deduplication check against txn's work space.
Expand All @@ -616,11 +616,19 @@ func (tbl *txnTable) Append(ctx context.Context, data *containers.Batch) (err er
return
}
}
dedupDur += time.Since(now).Seconds()
}

if tbl.tableSpace == nil {
tbl.tableSpace = newTableSpace(tbl)
}
return tbl.tableSpace.Append(data)

dur, err := tbl.tableSpace.Append(data)
dedupDur += dur

v2.TxnTNAppendDeduplicateDurationHistogram.Observe(dedupDur)

return err
}
func (tbl *txnTable) AddObjsWithMetaLoc(ctx context.Context, stats containers.Vector) (err error) {
return stats.Foreach(func(v any, isNull bool, row int) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/vm/engine/tae/txn/txnimpl/table_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package txnimpl
import (
"context"
"fmt"
"time"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/objectio"
Expand Down Expand Up @@ -280,7 +281,7 @@ func (space *tableSpace) CloseAppends() {
}

// Append appends batch of data into anode.
func (space *tableSpace) Append(data *containers.Batch) (err error) {
func (space *tableSpace) Append(data *containers.Batch) (dur float64, err error) {
if space.appendable == nil {
space.registerANode()
}
Expand All @@ -296,6 +297,7 @@ func (space *tableSpace) Append(data *containers.Batch) (err error) {
}
dedupType := space.table.store.txn.GetDedupType()
if schema.HasPK() && dedupType == txnif.FullDedup {
now := time.Now()
if err = space.index.BatchInsert(
data.Attrs[schema.GetSingleSortKeyIdx()],
data.Vecs[schema.GetSingleSortKeyIdx()],
Expand All @@ -305,6 +307,7 @@ func (space *tableSpace) Append(data *containers.Batch) (err error) {
false); err != nil {
break
}
dur += time.Since(now).Seconds()
}
offset += appended
space.rows += appended
Expand Down
5 changes: 5 additions & 0 deletions pkg/vm/engine/tae/txn/txnimpl/txndb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package txnimpl

import (
"context"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"sync"
"time"

Expand Down Expand Up @@ -464,11 +465,15 @@ func (db *txnDB) PrePrepare(ctx context.Context) (err error) {
return
}
}

now := time.Now()
for _, table := range db.tables {
if err = table.PrePrepareDedup(ctx); err != nil {
return
}
}
v2.TxnTNPrePrepareDeduplicateDurationHistogram.Observe(time.Since(now).Seconds())

for _, table := range db.tables {
if err = table.PrePrepare(); err != nil {
return
Expand Down

0 comments on commit b0215ed

Please sign in to comment.