Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: collect stats for reporting event sampler #5357

Merged
merged 8 commits into from
Jan 6, 2025
2 changes: 1 addition & 1 deletion enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@

if eventSamplingEnabled.Load() {
var err error
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log)
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.ErrorsReporting, conf, log, stats)

Check warning on line 127 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L127

Added line #L127 was not covered by tests
if err != nil {
panic(err)
}
Expand Down
24 changes: 22 additions & 2 deletions enterprise/reporting/event_sampler/badger_event_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/misc"
)
Expand All @@ -23,6 +24,11 @@ type BadgerEventSampler struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
sc *StatsCollector
}

func GetPathName(module string) string {
return "/" + module + "-badger"
}

func DefaultPath(pathName string) (string, error) {
Expand All @@ -33,8 +39,15 @@ func DefaultPath(pathName string) (string, error) {
return fmt.Sprintf(`%v%v`, tmpDirPath, pathName), nil
}

func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.ValueLoader[time.Duration], conf *config.Config, log logger.Logger) (*BadgerEventSampler, error) {
dbPath, err := DefaultPath(pathName)
func NewBadgerEventSampler(
ctx context.Context,
module string,
ttl config.ValueLoader[time.Duration],
conf *config.Config,
log logger.Logger,
stats stats.Stats,
) (*BadgerEventSampler, error) {
dbPath, err := DefaultPath(GetPathName(module))
if err != nil || dbPath == "" {
return nil, err
}
Expand Down Expand Up @@ -63,6 +76,7 @@ func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.Valu
ctx: ctx,
cancel: cancel,
wg: sync.WaitGroup{},
sc: NewStatsCollector(BadgerTypeEventSampler, module, stats),
}

if err != nil {
Expand All @@ -81,6 +95,9 @@ func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.Valu
func (es *BadgerEventSampler) Get(key string) (bool, error) {
es.mu.Lock()
defer es.mu.Unlock()
start := time.Now()
defer es.sc.RecordGetDuration(start)
es.sc.RecordGet()

var found bool

Expand All @@ -106,6 +123,9 @@ func (es *BadgerEventSampler) Get(key string) (bool, error) {
func (es *BadgerEventSampler) Put(key string) error {
es.mu.Lock()
defer es.mu.Unlock()
start := time.Now()
defer es.sc.RecordPutDuration(start)
es.sc.RecordPut()

return es.db.Update(func(txn *badger.Txn) error {
entry := badger.NewEntry([]byte(key), []byte{1}).WithTTL(es.ttl.Load())
Expand Down
18 changes: 10 additions & 8 deletions enterprise/reporting/event_sampler/event_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
BadgerTypeEventSampler = "badger"
InMemoryCacheTypeEventSampler = "in_memory_cache"
BadgerEventSamplerMetricsPathName = "/metrics-reporting-badger"
BadgerEventSamplerErrorsPathName = "/errors-reporting-badger"
BadgerTypeEventSampler = "badger"
InMemoryCacheTypeEventSampler = "in_memory_cache"
MetricsReporting = "metrics-reporting"
ErrorsReporting = "errors-reporting"
)

//go:generate mockgen -destination=../../../mocks/enterprise/reporting/event_sampler/mock_event_sampler.go -package=mocks github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler EventSampler
Expand All @@ -27,18 +28,19 @@
ttl config.ValueLoader[time.Duration],
eventSamplerType config.ValueLoader[string],
eventSamplingCardinality config.ValueLoader[int],
badgerDBPath string,
module string,
conf *config.Config,
log logger.Logger,
stats stats.Stats,
) (es EventSampler, err error) {
switch eventSamplerType.Load() {
case BadgerTypeEventSampler:
es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log)
es, err = NewBadgerEventSampler(ctx, module, ttl, conf, log, stats)
case InMemoryCacheTypeEventSampler:
es, err = NewInMemoryCacheEventSampler(ctx, ttl, eventSamplingCardinality)
es, err = NewInMemoryCacheEventSampler(ctx, module, ttl, eventSamplingCardinality, stats)
default:
log.Warnf("invalid event sampler type: %s. Using default badger event sampler", eventSamplerType.Load())
es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log)
es, err = NewBadgerEventSampler(ctx, module, ttl, conf, log, stats)

Check warning on line 43 in enterprise/reporting/event_sampler/event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/event_sampler.go#L43

Added line #L43 was not covered by tests
}

if err != nil {
Expand Down
89 changes: 83 additions & 6 deletions enterprise/reporting/event_sampler/event_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestBadger(t *testing.T) {
Expand All @@ -23,15 +25,40 @@ func TestBadger(t *testing.T) {

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
statsStore, err := memstats.New()
require.NoError(t, err)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).LastValue(), float64(3))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).Durations()), 3)

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).LastValue(), float64(4))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).Durations()), 4)

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
Expand All @@ -43,7 +70,7 @@ func TestBadger(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())

es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, stats.NOP)
defer es.Close()

_ = es.Put("key1")
Expand All @@ -65,15 +92,40 @@ func TestInMemoryCache(t *testing.T) {

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
statsStore, err := memstats.New()
require.NoError(t, err)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).LastValue(), float64(3))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).Durations()), 3)

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).LastValue(), float64(4))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).Durations()), 4)

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
Expand All @@ -83,7 +135,7 @@ func TestInMemoryCache(t *testing.T) {
t.Run("should not get evicted keys", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, stats.NOP)
_ = es.Put("key1")

require.Eventually(t, func() bool {
Expand All @@ -95,19 +147,43 @@ func TestInMemoryCache(t *testing.T) {
t.Run("should not add keys if length exceeds", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 3000)
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
statsStore, err := memstats.New()
require.NoError(t, err)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
_ = es.Put("key4")
_ = es.Put("key5")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).LastValue(), float64(3))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).Durations()), 3)

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")
val5, _ := es.Get("key5")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).LastValue(), float64(5))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).Durations()), 5)

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
Expand Down Expand Up @@ -147,9 +223,10 @@ func BenchmarkEventSampler(b *testing.B) {
ttl,
eventSamplerType,
eventSamplingCardinality,
BadgerEventSamplerMetricsPathName,
MetricsReporting,
conf,
log,
stats.NOP,
)
require.NoError(b, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/cachettl"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type InMemoryCacheEventSampler struct {
Expand All @@ -15,9 +16,16 @@ type InMemoryCacheEventSampler struct {
ttl config.ValueLoader[time.Duration]
limit config.ValueLoader[int]
length int
sc *StatsCollector
}

func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[time.Duration], limit config.ValueLoader[int]) (*InMemoryCacheEventSampler, error) {
func NewInMemoryCacheEventSampler(
ctx context.Context,
module string,
ttl config.ValueLoader[time.Duration],
limit config.ValueLoader[int],
stats stats.Stats,
) (*InMemoryCacheEventSampler, error) {
c := cachettl.New[string, bool](cachettl.WithNoRefreshTTL)
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -28,6 +36,7 @@ func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[ti
ttl: ttl,
limit: limit,
length: 0,
sc: NewStatsCollector(InMemoryCacheTypeEventSampler, module, stats),
}

es.cache.OnEvicted(func(key string, value bool) {
Expand All @@ -38,6 +47,10 @@ func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[ti
}

func (es *InMemoryCacheEventSampler) Get(key string) (bool, error) {
start := time.Now()
defer es.sc.RecordGetDuration(start)
es.sc.RecordGet()

value := es.cache.Get(key)
return value, nil
}
Expand All @@ -47,6 +60,10 @@ func (es *InMemoryCacheEventSampler) Put(key string) error {
return nil
}

start := time.Now()
defer es.sc.RecordPutDuration(start)
es.sc.RecordPut()

es.cache.Put(key, true, es.ttl.Load())
es.length++
return nil
Expand Down
53 changes: 53 additions & 0 deletions enterprise/reporting/event_sampler/stats_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package event_sampler

import (
"time"

"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
StatReportingEventSamplerRequestsTotal = "reporting_event_sampler_requests_total"
StatReportingEventSamplerRequestDuration = "reporting_event_sampler_request_duration_seconds"
)

type StatsCollector struct {
stats stats.Stats
getCounter stats.Measurement
putCounter stats.Measurement
getDuration stats.Measurement
putDuration stats.Measurement
}

func NewStatsCollector(eventSamplerType, module string, statsFactory stats.Stats) *StatsCollector {
getRequestTags := getTags(eventSamplerType, module, "get")
putRequestTags := getTags(eventSamplerType, module, "put")

return &StatsCollector{
stats: statsFactory,
getCounter: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestsTotal, stats.CountType, getRequestTags),
putCounter: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestsTotal, stats.CountType, putRequestTags),
getDuration: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestDuration, stats.TimerType, getRequestTags),
putDuration: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestDuration, stats.TimerType, putRequestTags),
}
}

func (sc *StatsCollector) RecordGet() {
sc.getCounter.Increment()
}

func (sc *StatsCollector) RecordPut() {
sc.putCounter.Increment()
}

func (sc *StatsCollector) RecordGetDuration(start time.Time) {
sc.getDuration.SendTiming(time.Since(start))
}

func (sc *StatsCollector) RecordPutDuration(start time.Time) {
sc.putDuration.SendTiming(time.Since(start))
}

func getTags(eventSamplerType, module, operation string) stats.Tags {
return stats.Tags{"type": eventSamplerType, "module": module, "operation": operation}
}
Loading
Loading