Skip to content

Commit

Permalink
chore: refactor dedup package (#4913)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Jul 25, 2024
1 parent b96b681 commit dc2b4c7
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 114 deletions.
7 changes: 4 additions & 3 deletions mocks/services/dedup/mock_dedup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/dedup"
dedupTypes "github.com/rudderlabs/rudder-server/services/dedup/types"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/rmetrics"
"github.com/rudderlabs/rudder-server/services/rsources"
Expand Down Expand Up @@ -613,7 +614,7 @@ func (proc *Handle) Setup(
})
}
if proc.config.enableDedup {
proc.dedup = dedup.New(dedup.DefaultPath())
proc.dedup = dedup.New()
}
proc.sourceObservers = []sourceObserver{delayed.NewEventStats(proc.statsFactory, proc.conf)}
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1707,7 +1708,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
p := payloadFunc()
messageSize := int64(len(p))
dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId)
if ok, previousSize := proc.dedup.Set(dedup.KeyValue{Key: dedupKey, Value: messageSize}); !ok {
ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize})
if err != nil {
panic(err)
}
if !ok {
proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey)
sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1
continue
Expand Down
2 changes: 1 addition & 1 deletion processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2533,7 +2533,7 @@ var _ = Describe("Processor", Ordered, func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)

callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1)
c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0)).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1)

// We expect one transform call to destination A, after callUnprocessed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ var (
}
)

var _ = Describe("Yandexmetrica", func() {
Describe(("NewManager function test"), func() {
var _ = Describe("Antisymmetric", func() {
Describe("NewManager function test", func() {
It("should return yandexmetrica manager", func() {
yandexmetrica, err := yandexmetrica.NewManager(destination, backendconfig.DefaultBackendConfig)
Expect(err).To(BeNil())
Expect(yandexmetrica).NotTo(BeNil())
})
})
Describe(("Upload function test"), func() {
Describe("Upload function test", func() {
It("Testing a successful scenario", func() {
cache := oauthv2.NewCache()
ctrl := gomock.NewController(GinkgoT())
Expand Down
101 changes: 83 additions & 18 deletions services/dedup/badger.go → services/dedup/badger/badger.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package dedup
package badger

import (
"fmt"
"strconv"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/options"

"github.com/rudderlabs/rudder-go-kit/config"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/dedup/types"
"github.com/rudderlabs/rudder-server/utils/misc"
)

type badgerDB struct {
type BadgerDB struct {
stats stats.Stats
logger loggerForBadger
badgerDB *badger.DB
Expand All @@ -23,12 +27,58 @@ type badgerDB struct {
gcDone chan struct{}
path string
opts badger.Options
once sync.Once
}

// DefaultPath returns the default path for the deduplication service's badger DB
func DefaultPath() string {
badgerPathName := "/badgerdbv4"
tmpDirPath, err := misc.CreateTMPDIR()
if err != nil {
panic(err)
}
return fmt.Sprintf(`%v%v`, tmpDirPath, badgerPathName)
}

func NewBadgerDB(path string) *BadgerDB {
dedupWindow := config.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS")

log := logger.NewLogger().Child("dedup")
badgerOpts := badger.
DefaultOptions(path).
WithCompression(options.None).
WithIndexCacheSize(16 << 20). // 16mb
WithNumGoroutines(1).
WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 5)).
WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)).
WithBlockCacheSize(0).
WithNumVersionsToKeep(1).
WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)).
WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)).
WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)).
WithDetectConflicts(config.GetBool("BadgerDB.detectConflicts", false))

db := &BadgerDB{
stats: stats.Default,
logger: loggerForBadger{log},
path: path,
gcDone: make(chan struct{}),
close: make(chan struct{}),
window: dedupWindow,
opts: badgerOpts,
}
return db
}

func (d *badgerDB) Get(key string) (int64, bool) {
func (d *BadgerDB) Get(key string) (int64, bool, error) {
var payloadSize int64
var found bool
err := d.badgerDB.View(func(txn *badger.Txn) error {
var err error
err = d.init()
if err != nil {
return 0, false, err
}
err = d.badgerDB.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(key))
if err != nil {
return err
Expand All @@ -40,12 +90,16 @@ func (d *badgerDB) Get(key string) (int64, bool) {
return nil
})
if err != nil && err != badger.ErrKeyNotFound {
panic(err)
return 0, false, err
}
return payloadSize, found
return payloadSize, found, nil
}

func (d *badgerDB) Set(kvs []KeyValue) error {
func (d *BadgerDB) Set(kvs []types.KeyValue) error {
err := d.init()
if err != nil {
return err
}
txn := d.badgerDB.NewTransaction(true)
for _, message := range kvs {
value := strconv.FormatInt(message.Value, 10)
Expand All @@ -66,26 +120,29 @@ func (d *badgerDB) Set(kvs []KeyValue) error {
return txn.Commit()
}

func (d *badgerDB) Close() {
func (d *BadgerDB) Close() {
close(d.close)
<-d.gcDone
_ = d.badgerDB.Close()
}

func (d *badgerDB) start() {
func (d *BadgerDB) init() error {
var err error

d.badgerDB, err = badger.Open(d.opts)
if err != nil {
panic(err)
}
rruntime.Go(func() {
d.gcLoop()
close(d.gcDone)
d.once.Do(func() {
d.badgerDB, err = badger.Open(d.opts)
if err != nil {
return
}
rruntime.Go(func() {
d.gcLoop()
close(d.gcDone)
})
})
return err
}

func (d *badgerDB) gcLoop() {
func (d *BadgerDB) gcLoop() {
for {
select {
case <-d.close:
Expand Down Expand Up @@ -113,3 +170,11 @@ func (d *badgerDB) gcLoop() {

}
}

type loggerForBadger struct {
logger.Logger
}

func (l loggerForBadger) Warningf(fmt string, args ...interface{}) {
l.Warnf(fmt, args...)
}
83 changes: 15 additions & 68 deletions services/dedup/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,14 @@ package dedup
import (
"fmt"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/options"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/services/dedup/badger"
"github.com/rudderlabs/rudder-server/services/dedup/types"
)

type OptFn func(*badgerDB)

// DefaultPath returns the default path for the deduplication service's badger DB
func DefaultPath() string {
badgerPathName := "/badgerdbv4"
tmpDirPath, err := misc.CreateTMPDIR()
if err != nil {
panic(err)
}
return fmt.Sprintf(`%v%v`, tmpDirPath, badgerPathName)
}

// New creates a new deduplication service. The service needs to be closed after use.
func New(path string) Dedup {
dedupWindow := config.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS")

log := logger.NewLogger().Child("dedup")
badgerOpts := badger.
DefaultOptions(path).
WithCompression(options.None).
WithIndexCacheSize(16 << 20). // 16mb
WithNumGoroutines(1).
WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 5)).
WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)).
WithBlockCacheSize(0).
WithNumVersionsToKeep(1).
WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)).
WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)).
WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)).
WithDetectConflicts(config.GetBool("BadgerDB.detectConflicts", false))

db := &badgerDB{
stats: stats.Default,
logger: loggerForBadger{log},
path: path,
gcDone: make(chan struct{}),
close: make(chan struct{}),
window: dedupWindow,
opts: badgerOpts,
}
db.start()
func New() Dedup {
db := badger.NewBadgerDB(badger.DefaultPath())
return &dedup{
badgerDB: db,
cache: make(map[string]int64),
Expand All @@ -66,49 +22,48 @@ func New(path string) Dedup {
// Dedup is the interface for deduplication service
type Dedup interface {
// Set returns [true] if it was the first time the key was encountered, otherwise it returns [false] along with the previous value
Set(kv KeyValue) (bool, int64)
Set(kv types.KeyValue) (bool, int64, error)

// Commit commits a list of previously set keys to the DB
Commit(keys []string) error

// Close closes the deduplication service
Close()
}
type KeyValue struct {
Key string
Value int64
}

type dedup struct {
badgerDB *badgerDB
badgerDB *badger.BadgerDB
cacheMu sync.Mutex
cache map[string]int64
}

func (d *dedup) Set(kv KeyValue) (bool, int64) {
func (d *dedup) Set(kv types.KeyValue) (bool, int64, error) {
d.cacheMu.Lock()
defer d.cacheMu.Unlock()
if previous, found := d.cache[kv.Key]; found {
return false, previous
return false, previous, nil
}
previous, found, err := d.badgerDB.Get(kv.Key)
if err != nil {
return false, 0, err
}
previous, found := d.badgerDB.Get(kv.Key)
if !found {
d.cache[kv.Key] = kv.Value
}
return !found, previous
return !found, previous, nil
}

func (d *dedup) Commit(keys []string) error {
d.cacheMu.Lock()
defer d.cacheMu.Unlock()

kvs := make([]KeyValue, len(keys))
kvs := make([]types.KeyValue, len(keys))
for i, key := range keys {
value, ok := d.cache[key]
if !ok {
return fmt.Errorf("key %v has not been previously set", key)
}
kvs[i] = KeyValue{Key: key, Value: value}
kvs[i] = types.KeyValue{Key: key, Value: value}
}

err := d.badgerDB.Set(kvs)
Expand All @@ -123,11 +78,3 @@ func (d *dedup) Commit(keys []string) error {
func (d *dedup) Close() {
d.badgerDB.Close()
}

type loggerForBadger struct {
logger.Logger
}

func (l loggerForBadger) Warningf(fmt string, args ...interface{}) {
l.Warnf(fmt, args...)
}
Loading

0 comments on commit dc2b4c7

Please sign in to comment.