Skip to content

Commit

Permalink
Merge branch 'master' into feat.fetchRemoteSchemaAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Jan 9, 2025
2 parents f27fb84 + ddf04ff commit 9d990f4
Show file tree
Hide file tree
Showing 33 changed files with 2,066 additions and 1,185 deletions.
47 changes: 47 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
# Changelog

## [1.40.1](https://github.com/rudderlabs/rudder-server/compare/v1.40.0...v1.40.1) (2025-01-08)


### Bug Fixes

* warehouse router tracker ([#5407](https://github.com/rudderlabs/rudder-server/issues/5407)) ([8e314b6](https://github.com/rudderlabs/rudder-server/commit/8e314b6160c37283fbbe889f1dbad86f994943b1))

## [1.40.0](https://github.com/rudderlabs/rudder-server/compare/v1.39.0...v1.40.0) (2025-01-06)


### Features

* add support for hash to bingads offline conversions ([#5390](https://github.com/rudderlabs/rudder-server/issues/5390)) ([8a186e5](https://github.com/rudderlabs/rudder-server/commit/8a186e59e74772ebf10aca2ec352c6a51537c57a))
* client side load balancing for user transformations ([#5375](https://github.com/rudderlabs/rudder-server/issues/5375)) ([de3e87c](https://github.com/rudderlabs/rudder-server/commit/de3e87c83f4cb0adf6fd4dff0437fbcd7bcd2855))


### Bug Fixes

* allow only enabled dest in backendSubscriber, fix klaviyo bulk ([#5309](https://github.com/rudderlabs/rudder-server/issues/5309)) ([98827e5](https://github.com/rudderlabs/rudder-server/commit/98827e57eae6949ea06bd7a0ca58e678360e66e6))
* bq partitioning for additional columns ([#5293](https://github.com/rudderlabs/rudder-server/issues/5293)) ([98827e5](https://github.com/rudderlabs/rudder-server/commit/98827e57eae6949ea06bd7a0ca58e678360e66e6))
* disable vacuum at startup for reporting ([#5325](https://github.com/rudderlabs/rudder-server/issues/5325)) ([98827e5](https://github.com/rudderlabs/rudder-server/commit/98827e57eae6949ea06bd7a0ca58e678360e66e6))
* klaviyo bulk upload and BingAds OC ([#5305](https://github.com/rudderlabs/rudder-server/issues/5305)) ([98827e5](https://github.com/rudderlabs/rudder-server/commit/98827e57eae6949ea06bd7a0ca58e678360e66e6))
* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([b417005](https://github.com/rudderlabs/rudder-server/commit/b4170057d7035303dac762588bba21e2bd6413c1))
* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([a82aa47](https://github.com/rudderlabs/rudder-server/commit/a82aa4719922f0ff82a938df69b6a83b8979e911))
* processing pickup race condition ([#5374](https://github.com/rudderlabs/rudder-server/issues/5374)) ([dd33fee](https://github.com/rudderlabs/rudder-server/commit/dd33feefa233457fb7667666ede870118d3fc125))
* replay tracking plan bug ([#5389](https://github.com/rudderlabs/rudder-server/issues/5389)) ([b417005](https://github.com/rudderlabs/rudder-server/commit/b4170057d7035303dac762588bba21e2bd6413c1))


### Miscellaneous

* [Snyk] Security upgrade alpine from 3.17 to 3.21.0 ([#5366](https://github.com/rudderlabs/rudder-server/issues/5366)) ([f22e8f4](https://github.com/rudderlabs/rudder-server/commit/f22e8f4bb61893af132f3f4386155c83c695fd51))
* add error msg in the logs when gw req fails ([#5369](https://github.com/rudderlabs/rudder-server/issues/5369)) ([b417005](https://github.com/rudderlabs/rudder-server/commit/b4170057d7035303dac762588bba21e2bd6413c1))
* add error msg in the logs when gw req fails ([#5369](https://github.com/rudderlabs/rudder-server/issues/5369)) ([a82aa47](https://github.com/rudderlabs/rudder-server/commit/a82aa4719922f0ff82a938df69b6a83b8979e911))
* cleanup archiver to use uploadID for filtering ([#5346](https://github.com/rudderlabs/rudder-server/issues/5346)) ([8fa4436](https://github.com/rudderlabs/rudder-server/commit/8fa4436346530817ee512aa0e116f2b1070eb25c))
* collect stats for reporting event sampler ([#5357](https://github.com/rudderlabs/rudder-server/issues/5357)) ([e504d75](https://github.com/rudderlabs/rudder-server/commit/e504d75e4bf1d086596c6360788abef28671d551))
* **deps:** bump google.golang.org/api from 0.211.0 to 0.212.0 in the frequent group ([#5378](https://github.com/rudderlabs/rudder-server/issues/5378)) ([7b46cd2](https://github.com/rudderlabs/rudder-server/commit/7b46cd2eec2f995aff9db6f73bc197b101d145d8))
* **deps:** bump google.golang.org/protobuf from 1.35.2 to 1.36.0 in the go-deps group ([#5379](https://github.com/rudderlabs/rudder-server/issues/5379)) ([7425659](https://github.com/rudderlabs/rudder-server/commit/74256599bbbe460c0166470196704b9de9477c14))
* **deps:** bump the go-deps group across 1 directory with 2 updates ([#5397](https://github.com/rudderlabs/rudder-server/issues/5397)) ([46d9b6a](https://github.com/rudderlabs/rudder-server/commit/46d9b6aa376a6f6c6611d9d7e3bfcbbbbc7fb478))
* **deps:** bump the go-deps group across 1 directory with 2 updates ([#5403](https://github.com/rudderlabs/rudder-server/issues/5403)) ([78fb917](https://github.com/rudderlabs/rudder-server/commit/78fb9178c1bed663c095a7d6506d891279370084))
* **deps:** bump the go-deps group across 1 directory with 4 updates ([#5368](https://github.com/rudderlabs/rudder-server/issues/5368)) ([33a1e30](https://github.com/rudderlabs/rudder-server/commit/33a1e3078dccdd071e555175410c1f461d67b1e6))
* **deps:** bump the go-deps group across 1 directory with 5 updates ([#5386](https://github.com/rudderlabs/rudder-server/issues/5386)) ([471d492](https://github.com/rudderlabs/rudder-server/commit/471d492910cd2fb9722d5d1bf4f8640e6f4a2f0b))
* **deps:** bump the go-deps group with 3 updates ([#5373](https://github.com/rudderlabs/rudder-server/issues/5373)) ([d0ce669](https://github.com/rudderlabs/rudder-server/commit/d0ce6697971851848ad2b0010a8e1aa0e541c513))
* oauth v2 stats refactor ([#5262](https://github.com/rudderlabs/rudder-server/issues/5262)) ([18f4bdf](https://github.com/rudderlabs/rudder-server/commit/18f4bdf0e918f7b8d6792220a9abe6d227377585))
* reduce the error report sample events ([#5371](https://github.com/rudderlabs/rudder-server/issues/5371)) ([989310c](https://github.com/rudderlabs/rudder-server/commit/989310cb109e6bfd675def135ca0974ecf8b52d6))
* remove full vacuum at flusher startup ([#5332](https://github.com/rudderlabs/rudder-server/issues/5332)) ([98827e5](https://github.com/rudderlabs/rudder-server/commit/98827e57eae6949ea06bd7a0ca58e678360e66e6))
* sync release v1.39.0 to main branch ([#5367](https://github.com/rudderlabs/rudder-server/issues/5367)) ([9f79eee](https://github.com/rudderlabs/rudder-server/commit/9f79eee2262b46ecf1fe6e4abce0dcac9cf25eec))

## [1.39.3](https://github.com/rudderlabs/rudder-server/compare/v1.39.2...v1.39.3) (2024-12-23)


Expand Down
2 changes: 1 addition & 1 deletion enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func NewErrorDetailReporter(

if eventSamplingEnabled.Load() {
var err error
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log)
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.ErrorsReporting, conf, log, stats)
if err != nil {
panic(err)
}
Expand Down
24 changes: 22 additions & 2 deletions enterprise/reporting/event_sampler/badger_event_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/misc"
)
Expand All @@ -23,6 +24,11 @@ type BadgerEventSampler struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
sc *StatsCollector
}

func GetPathName(module string) string {
return "/" + module + "-badger"
}

func DefaultPath(pathName string) (string, error) {
Expand All @@ -33,8 +39,15 @@ func DefaultPath(pathName string) (string, error) {
return fmt.Sprintf(`%v%v`, tmpDirPath, pathName), nil
}

func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.ValueLoader[time.Duration], conf *config.Config, log logger.Logger) (*BadgerEventSampler, error) {
dbPath, err := DefaultPath(pathName)
func NewBadgerEventSampler(
ctx context.Context,
module string,
ttl config.ValueLoader[time.Duration],
conf *config.Config,
log logger.Logger,
stats stats.Stats,
) (*BadgerEventSampler, error) {
dbPath, err := DefaultPath(GetPathName(module))
if err != nil || dbPath == "" {
return nil, err
}
Expand Down Expand Up @@ -63,6 +76,7 @@ func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.Valu
ctx: ctx,
cancel: cancel,
wg: sync.WaitGroup{},
sc: NewStatsCollector(BadgerTypeEventSampler, module, stats),
}

if err != nil {
Expand All @@ -81,6 +95,9 @@ func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.Valu
func (es *BadgerEventSampler) Get(key string) (bool, error) {
es.mu.Lock()
defer es.mu.Unlock()
start := time.Now()
defer es.sc.RecordGetDuration(start)
es.sc.RecordGet()

var found bool

Expand All @@ -106,6 +123,9 @@ func (es *BadgerEventSampler) Get(key string) (bool, error) {
func (es *BadgerEventSampler) Put(key string) error {
es.mu.Lock()
defer es.mu.Unlock()
start := time.Now()
defer es.sc.RecordPutDuration(start)
es.sc.RecordPut()

return es.db.Update(func(txn *badger.Txn) error {
entry := badger.NewEntry([]byte(key), []byte{1}).WithTTL(es.ttl.Load())
Expand Down
18 changes: 10 additions & 8 deletions enterprise/reporting/event_sampler/event_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
BadgerTypeEventSampler = "badger"
InMemoryCacheTypeEventSampler = "in_memory_cache"
BadgerEventSamplerMetricsPathName = "/metrics-reporting-badger"
BadgerEventSamplerErrorsPathName = "/errors-reporting-badger"
BadgerTypeEventSampler = "badger"
InMemoryCacheTypeEventSampler = "in_memory_cache"
MetricsReporting = "metrics-reporting"
ErrorsReporting = "errors-reporting"
)

//go:generate mockgen -destination=../../../mocks/enterprise/reporting/event_sampler/mock_event_sampler.go -package=mocks github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler EventSampler
Expand All @@ -27,18 +28,19 @@ func NewEventSampler(
ttl config.ValueLoader[time.Duration],
eventSamplerType config.ValueLoader[string],
eventSamplingCardinality config.ValueLoader[int],
badgerDBPath string,
module string,
conf *config.Config,
log logger.Logger,
stats stats.Stats,
) (es EventSampler, err error) {
switch eventSamplerType.Load() {
case BadgerTypeEventSampler:
es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log)
es, err = NewBadgerEventSampler(ctx, module, ttl, conf, log, stats)
case InMemoryCacheTypeEventSampler:
es, err = NewInMemoryCacheEventSampler(ctx, ttl, eventSamplingCardinality)
es, err = NewInMemoryCacheEventSampler(ctx, module, ttl, eventSamplingCardinality, stats)
default:
log.Warnf("invalid event sampler type: %s. Using default badger event sampler", eventSamplerType.Load())
es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log)
es, err = NewBadgerEventSampler(ctx, module, ttl, conf, log, stats)
}

if err != nil {
Expand Down
89 changes: 83 additions & 6 deletions enterprise/reporting/event_sampler/event_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestBadger(t *testing.T) {
Expand All @@ -23,15 +25,40 @@ func TestBadger(t *testing.T) {

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
statsStore, err := memstats.New()
require.NoError(t, err)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).LastValue(), float64(3))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).Durations()), 3)

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).LastValue(), float64(4))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": BadgerTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).Durations()), 4)

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
Expand All @@ -43,7 +70,7 @@ func TestBadger(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())

es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, stats.NOP)
defer es.Close()

_ = es.Put("key1")
Expand All @@ -65,15 +92,40 @@ func TestInMemoryCache(t *testing.T) {

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
statsStore, err := memstats.New()
require.NoError(t, err)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).LastValue(), float64(3))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).Durations()), 3)

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).LastValue(), float64(4))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).Durations()), 4)

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
Expand All @@ -83,7 +135,7 @@ func TestInMemoryCache(t *testing.T) {
t.Run("should not get evicted keys", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, stats.NOP)
_ = es.Put("key1")

require.Eventually(t, func() bool {
Expand All @@ -95,19 +147,43 @@ func TestInMemoryCache(t *testing.T) {
t.Run("should not add keys if length exceeds", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 3000)
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
statsStore, err := memstats.New()
require.NoError(t, err)
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
_ = es.Put("key4")
_ = es.Put("key5")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).LastValue(), float64(3))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "put",
}).Durations()), 3)

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")
val5, _ := es.Get("key5")

require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).LastValue(), float64(5))
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
"type": InMemoryCacheTypeEventSampler,
"module": MetricsReporting,
"operation": "get",
}).Durations()), 5)

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
Expand Down Expand Up @@ -147,9 +223,10 @@ func BenchmarkEventSampler(b *testing.B) {
ttl,
eventSamplerType,
eventSamplingCardinality,
BadgerEventSamplerMetricsPathName,
MetricsReporting,
conf,
log,
stats.NOP,
)
require.NoError(b, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/cachettl"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type InMemoryCacheEventSampler struct {
Expand All @@ -15,9 +16,16 @@ type InMemoryCacheEventSampler struct {
ttl config.ValueLoader[time.Duration]
limit config.ValueLoader[int]
length int
sc *StatsCollector
}

func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[time.Duration], limit config.ValueLoader[int]) (*InMemoryCacheEventSampler, error) {
func NewInMemoryCacheEventSampler(
ctx context.Context,
module string,
ttl config.ValueLoader[time.Duration],
limit config.ValueLoader[int],
stats stats.Stats,
) (*InMemoryCacheEventSampler, error) {
c := cachettl.New[string, bool](cachettl.WithNoRefreshTTL)
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -28,6 +36,7 @@ func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[ti
ttl: ttl,
limit: limit,
length: 0,
sc: NewStatsCollector(InMemoryCacheTypeEventSampler, module, stats),
}

es.cache.OnEvicted(func(key string, value bool) {
Expand All @@ -38,6 +47,10 @@ func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[ti
}

func (es *InMemoryCacheEventSampler) Get(key string) (bool, error) {
start := time.Now()
defer es.sc.RecordGetDuration(start)
es.sc.RecordGet()

value := es.cache.Get(key)
return value, nil
}
Expand All @@ -47,6 +60,10 @@ func (es *InMemoryCacheEventSampler) Put(key string) error {
return nil
}

start := time.Now()
defer es.sc.RecordPutDuration(start)
es.sc.RecordPut()

es.cache.Put(key, true, es.ttl.Load())
es.length++
return nil
Expand Down
Loading

0 comments on commit 9d990f4

Please sign in to comment.