Skip to content

Commit

Permalink
chore: add mock reporting server in mtu integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Jul 12, 2024
1 parent 784a51c commit 300c142
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 41 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ jobs:
- integration_test/warehouse
- integration_test/tracing
- integration_test/backendconfigunavailability
- integration_test/trackedusersreporting
- processor
- regulation-worker
- router
Expand Down
150 changes: 109 additions & 41 deletions integration_test/trackedusersreporting/tracked_users_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand All @@ -15,6 +16,8 @@ import (
"testing"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/testhelper/transformertest"

"github.com/segmentio/go-hll"
Expand Down Expand Up @@ -46,6 +49,7 @@ type testConfig struct {
webhook *webhookutil.Recorder
configBEServer *nethttptest.Server
transformerUrl string
reportingServer *webhookutil.Recorder
}

type userIdentifier struct {
Expand All @@ -61,7 +65,7 @@ func TestTrackedUsersReporting(t *testing.T) {

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, tc.gwPort, tc.postgresResource, tc.configBEServer.URL, t.TempDir(), tc.transformerUrl)
err := runRudderServer(t, ctx, tc)
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
Expand All @@ -85,55 +89,110 @@ func TestTrackedUsersReporting(t *testing.T) {
return tc.webhook.RequestsCount() == eventsCount
}, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount())

// TODO: once flusher is implemented, add a mock reporting server to check is we are getting correct cardinality of users
cardinalityMap := getCardinalityFromDB(t, tc.postgresResource)
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID])
require.Eventuallyf(t, func() bool {
cardinalityMap := getCardinalityFromReportingServer(t, tc.reportingServer.Requests())
return cardinalityMap[workspaceID][sourceID].userIDCount == 2 &&
cardinalityMap[workspaceID][sourceID].anonIDCount == 2 &&
cardinalityMap[workspaceID][sourceID].identifiedUsersCount == 2
}, 2*time.Minute, 5*time.Second, "data not reported to reporting service, count of reqs: %d", tc.reportingServer.RequestsCount())

cancel()
require.NoError(t, wg.Wait())
}

func getCardinalityFromDB(t *testing.T, postgresResource *postgres.Resource) map[string]map[string]int {
func getCardinalityFromReportingServer(t *testing.T, reqs []*http.Request) map[string]map[string]struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
} {
type trackedUsersEntry struct {
WorkspaceID string
SourceID string
InstanceID string
userIDHll string
annIDHll string
combHll string
ReportedAt time.Time `json:"reportedAt"`
WorkspaceID string `json:"workspaceId"`
SourceID string `json:"sourceId"`
InstanceID string `json:"instanceId"`
UserIDHLLHex string `json:"userIdHLL"`
AnonymousIDHLLHex string `json:"anonymousIdHLL"`
IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"`
}
rows, err := postgresResource.DB.Query("SELECT workspace_id, source_id, instance_id, userid_hll, anonymousid_hll, identified_anonymousid_hll FROM tracked_users_reports")
require.NoError(t, err)
require.NoError(t, rows.Err())
defer func() { _ = rows.Close() }()
var entry trackedUsersEntry
entries := make([]trackedUsersEntry, 0)
for rows.Next() {
err = rows.Scan(&entry.WorkspaceID, &entry.SourceID, &entry.InstanceID, &entry.userIDHll, &entry.annIDHll, &entry.combHll)
for _, req := range reqs {
unmarshalledReqs := make([]trackedUsersEntry, 0)
err := json.NewDecoder(req.Body).Decode(&unmarshalledReqs)
require.NoError(t, err)
entries = append(entries, entry)
entries = append(entries, unmarshalledReqs...)
}
result := make(map[string]map[string]int)
result := make(map[string]map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
})
for _, e := range entries {
if result[e.WorkspaceID] == nil {
result[e.WorkspaceID] = make(map[string]int)
result[e.WorkspaceID] = make(map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
})
}
userHllBytes, err := hex.DecodeString(e.userIDHll)
cardinalityMap := result[e.WorkspaceID][e.SourceID]
userHllBytes, err := hex.DecodeString(e.UserIDHLLHex)
require.NoError(t, err)
userHll, err := hll.FromBytes(userHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] += int(userHll.Cardinality())
annIDHllBytes, err := hex.DecodeString(e.annIDHll)
if cardinalityMap.userIDHll == nil {
cardinalityMap.userIDHll = &userHll
} else {
cardinalityMap.userIDHll.Union(userHll)
}
annIDHllBytes, err := hex.DecodeString(e.AnonymousIDHLLHex)
require.NoError(t, err)
annHll, err := hll.FromBytes(annIDHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] += int(annHll.Cardinality())
combineHllBytes, err := hex.DecodeString(e.combHll)
if cardinalityMap.anonIDHll == nil {
cardinalityMap.anonIDHll = &annHll
} else {
cardinalityMap.anonIDHll.Union(annHll)
}
combineHllBytes, err := hex.DecodeString(e.IdentifiedAnonymousIDHLLHex)
require.NoError(t, err)
combHll, err := hll.FromBytes(combineHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] -= int(combHll.Cardinality())
if cardinalityMap.identifiedUsersHll == nil {
cardinalityMap.identifiedUsersHll = &combHll
} else {
cardinalityMap.identifiedUsersHll.Union(combHll)
}
result[e.WorkspaceID][e.SourceID] = cardinalityMap
}
return result
return lo.MapEntries(result, func(workspaceID string, mp map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}) (string, map[string]struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
}) {
return workspaceID, lo.MapEntries(mp, func(sourceID string, value struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}) (string, struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
}) {
return sourceID, struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
}{
userIDCount: int(value.userIDHll.Cardinality()),
anonIDCount: int(value.anonIDHll.Cardinality()),
identifiedUsersCount: int(value.identifiedUsersHll.Cardinality()),
}
})
})
}

func setup(t testing.TB) testConfig {
Expand All @@ -155,6 +214,9 @@ func setup(t testing.TB) testConfig {
t.Cleanup(webhook.Close)
webhookURL := webhook.Server.URL

reportingServer := webhookutil.NewRecorder()
t.Cleanup(webhook.Close)

trServer := transformertest.NewBuilder().
WithDestTransformHandler(
"WEBHOOK",
Expand Down Expand Up @@ -189,23 +251,22 @@ func setup(t testing.TB) testConfig {
webhook: webhook,
configBEServer: bcServer,
transformerUrl: trServer.URL,
reportingServer: reportingServer,
}
}

func runRudderServer(
t testing.TB,
ctx context.Context,
port int,
postgresContainer *postgres.Resource,
cbURL, tmpDir, transformerURL string,
tc testConfig,
) (err error) {
t.Setenv("CONFIG_BACKEND_URL", cbURL)
t.Setenv("CONFIG_BACKEND_URL", tc.configBEServer.URL)
t.Setenv("WORKSPACE_TOKEN", "token")
t.Setenv("DEST_TRANSFORM_URL", transformerURL)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), postgresContainer.Port)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), postgresContainer.User)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), postgresContainer.Database)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), postgresContainer.Password)
t.Setenv("DEST_TRANSFORM_URL", tc.transformerUrl)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), tc.postgresResource.Port)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), tc.postgresResource.User)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), tc.postgresResource.Database)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), tc.postgresResource.Password)

t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.mode"), "off")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DestinationDebugger.disableEventDeliveryStatusUploads"), "true")
Expand All @@ -217,15 +278,19 @@ func runRudderServer(
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.syncer.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.mainLoopFreq"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.uploadFreq"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(port))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(tc.gwPort))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "RUDDER_TMPDIR"), os.TempDir())
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(tmpDir, "/recovery_data.json"))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(t.TempDir(), "/recovery_data.json"))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false")
// enable tracked users feature
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "TrackedUsers.enabled"), "true")

//setup reporting server
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "REPORTING_URL"), tc.reportingServer.Server.URL)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.flushWindow"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.recentExclusionWindowInSeconds"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.sleepInterval"), "2s")
t.Setenv("Processor.maxRetry", strconv.Itoa(1))

defer func() {
Expand All @@ -249,7 +314,8 @@ func sendEvents(
) (int, error) {
count := 0
for _, identifier := range identifiers {
num := rand.Intn(10)
// generate 1 or more events
num := 1 + rand.Intn(9)
for i := 0; i < num; i++ {
count++
payload := []byte(fmt.Sprintf(`
Expand Down Expand Up @@ -292,6 +358,8 @@ func sendEvents(
return 0, fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b))
}
kithttputil.CloseResponse(resp)
// sleeping so that multiple requests can be sent to reporting
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}
return count, nil
Expand Down

0 comments on commit 300c142

Please sign in to comment.