From fe7ad82e9db9b217a9a1a517a46b643ec8484603 Mon Sep 17 00:00:00 2001 From: mihir Date: Fri, 12 Jul 2024 16:45:52 +0530 Subject: [PATCH 1/4] chore: add mock reporting server in mtu integration tests --- .github/workflows/tests.yaml | 1 + .../tracked_users_reporting_test.go | 152 +++++++++++++----- 2 files changed, 112 insertions(+), 41 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index aa24854b1a..5922c3f326 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -135,6 +135,7 @@ jobs: - integration_test/warehouse - integration_test/tracing - integration_test/backendconfigunavailability + - integration_test/trackedusersreporting - processor - regulation-worker - router diff --git a/integration_test/trackedusersreporting/tracked_users_reporting_test.go b/integration_test/trackedusersreporting/tracked_users_reporting_test.go index b890885b59..394a9137d9 100644 --- a/integration_test/trackedusersreporting/tracked_users_reporting_test.go +++ b/integration_test/trackedusersreporting/tracked_users_reporting_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "encoding/json" "fmt" "io" "math/rand" @@ -15,6 +16,8 @@ import ( "testing" "time" + "github.com/samber/lo" + "github.com/rudderlabs/rudder-server/testhelper/transformertest" "github.com/segmentio/go-hll" @@ -46,6 +49,7 @@ type testConfig struct { webhook *webhookutil.Recorder configBEServer *nethttptest.Server transformerUrl string + reportingServer *webhookutil.Recorder } type userIdentifier struct { @@ -61,7 +65,7 @@ func TestTrackedUsersReporting(t *testing.T) { wg, ctx := errgroup.WithContext(ctx) wg.Go(func() error { - err := runRudderServer(t, ctx, tc.gwPort, tc.postgresResource, tc.configBEServer.URL, t.TempDir(), tc.transformerUrl) + err := runRudderServer(t, ctx, tc) if err != nil { t.Logf("rudder-server exited with error: %v", err) } @@ -85,55 +89,112 @@ func TestTrackedUsersReporting(t *testing.T) { return tc.webhook.RequestsCount() == eventsCount }, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount()) - // TODO: once flusher is implemented, add a mock reporting server to check is we are getting correct cardinality of users - cardinalityMap := getCardinalityFromDB(t, tc.postgresResource) - require.Equal(t, 2, cardinalityMap[workspaceID][sourceID]) + require.Eventuallyf(t, func() bool { + cardinalityMap := getCardinalityFromReportingServer(t, tc.reportingServer.Requests()) + return cardinalityMap[workspaceID][sourceID].userIDCount == 2 && + cardinalityMap[workspaceID][sourceID].anonIDCount == 2 && + cardinalityMap[workspaceID][sourceID].identifiedUsersCount == 2 + }, 2*time.Minute, 5*time.Second, "data not reported to reporting service, count of reqs: %d", tc.reportingServer.RequestsCount()) + cancel() require.NoError(t, wg.Wait()) } -func getCardinalityFromDB(t *testing.T, postgresResource *postgres.Resource) map[string]map[string]int { +func getCardinalityFromReportingServer(t *testing.T, reqs []*http.Request) map[string]map[string]struct { + userIDCount int + anonIDCount int + identifiedUsersCount int +} { type trackedUsersEntry struct { - WorkspaceID string - SourceID string - InstanceID string - userIDHll string - annIDHll string - combHll string + ReportedAt time.Time `json:"reportedAt"` + WorkspaceID string `json:"workspaceId"` + SourceID string `json:"sourceId"` + InstanceID string `json:"instanceId"` + UserIDHLLHex string `json:"userIdHLL"` + AnonymousIDHLLHex string `json:"anonymousIdHLL"` + IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"` } - rows, err := postgresResource.DB.Query("SELECT workspace_id, source_id, instance_id, userid_hll, anonymousid_hll, identified_anonymousid_hll FROM tracked_users_reports") - require.NoError(t, err) - require.NoError(t, rows.Err()) - defer func() { _ = rows.Close() }() - var entry trackedUsersEntry entries := make([]trackedUsersEntry, 0) - for rows.Next() { - err = rows.Scan(&entry.WorkspaceID, &entry.SourceID, &entry.InstanceID, &entry.userIDHll, &entry.annIDHll, &entry.combHll) + for _, req := range reqs { + unmarshalledReqs := make([]trackedUsersEntry, 0) + err := json.NewDecoder(req.Body).Decode(&unmarshalledReqs) require.NoError(t, err) - entries = append(entries, entry) + entries = append(entries, unmarshalledReqs...) } - result := make(map[string]map[string]int) + result := make(map[string]map[string]struct { + userIDHll *hll.Hll + anonIDHll *hll.Hll + identifiedUsersHll *hll.Hll + }) for _, e := range entries { if result[e.WorkspaceID] == nil { - result[e.WorkspaceID] = make(map[string]int) + result[e.WorkspaceID] = make(map[string]struct { + userIDHll *hll.Hll + anonIDHll *hll.Hll + identifiedUsersHll *hll.Hll + }) } - userHllBytes, err := hex.DecodeString(e.userIDHll) + cardinalityMap := result[e.WorkspaceID][e.SourceID] + userHllBytes, err := hex.DecodeString(e.UserIDHLLHex) require.NoError(t, err) userHll, err := hll.FromBytes(userHllBytes) require.NoError(t, err) - result[e.WorkspaceID][e.SourceID] += int(userHll.Cardinality()) - annIDHllBytes, err := hex.DecodeString(e.annIDHll) + if cardinalityMap.userIDHll == nil { + cardinalityMap.userIDHll = &userHll + } else { + cardinalityMap.userIDHll.Union(userHll) + } + annIDHllBytes, err := hex.DecodeString(e.AnonymousIDHLLHex) require.NoError(t, err) annHll, err := hll.FromBytes(annIDHllBytes) require.NoError(t, err) - result[e.WorkspaceID][e.SourceID] += int(annHll.Cardinality()) - combineHllBytes, err := hex.DecodeString(e.combHll) + if cardinalityMap.anonIDHll == nil { + cardinalityMap.anonIDHll = &annHll + } else { + cardinalityMap.anonIDHll.Union(annHll) + } + combineHllBytes, err := hex.DecodeString(e.IdentifiedAnonymousIDHLLHex) require.NoError(t, err) combHll, err := hll.FromBytes(combineHllBytes) require.NoError(t, err) - result[e.WorkspaceID][e.SourceID] -= int(combHll.Cardinality()) + if cardinalityMap.identifiedUsersHll == nil { + cardinalityMap.identifiedUsersHll = &combHll + } else { + cardinalityMap.identifiedUsersHll.Union(combHll) + } + result[e.WorkspaceID][e.SourceID] = cardinalityMap } - return result + return lo.MapEntries(result, func(workspaceID string, mp map[string]struct { + userIDHll *hll.Hll + anonIDHll *hll.Hll + identifiedUsersHll *hll.Hll + }) (string, map[string]struct { + userIDCount int + anonIDCount int + identifiedUsersCount int + }, + ) { + return workspaceID, lo.MapEntries(mp, func(sourceID string, value struct { + userIDHll *hll.Hll + anonIDHll *hll.Hll + identifiedUsersHll *hll.Hll + }) (string, struct { + userIDCount int + anonIDCount int + identifiedUsersCount int + }, + ) { + return sourceID, struct { + userIDCount int + anonIDCount int + identifiedUsersCount int + }{ + userIDCount: int(value.userIDHll.Cardinality()), + anonIDCount: int(value.anonIDHll.Cardinality()), + identifiedUsersCount: int(value.identifiedUsersHll.Cardinality()), + } + }) + }) } func setup(t testing.TB) testConfig { @@ -155,6 +216,9 @@ func setup(t testing.TB) testConfig { t.Cleanup(webhook.Close) webhookURL := webhook.Server.URL + reportingServer := webhookutil.NewRecorder() + t.Cleanup(webhook.Close) + trServer := transformertest.NewBuilder(). WithDestTransformHandler( "WEBHOOK", @@ -189,23 +253,22 @@ func setup(t testing.TB) testConfig { webhook: webhook, configBEServer: bcServer, transformerUrl: trServer.URL, + reportingServer: reportingServer, } } func runRudderServer( t testing.TB, ctx context.Context, - port int, - postgresContainer *postgres.Resource, - cbURL, tmpDir, transformerURL string, + tc testConfig, ) (err error) { - t.Setenv("CONFIG_BACKEND_URL", cbURL) + t.Setenv("CONFIG_BACKEND_URL", tc.configBEServer.URL) t.Setenv("WORKSPACE_TOKEN", "token") - t.Setenv("DEST_TRANSFORM_URL", transformerURL) - t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), postgresContainer.Port) - t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), postgresContainer.User) - t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), postgresContainer.Database) - t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), postgresContainer.Password) + t.Setenv("DEST_TRANSFORM_URL", tc.transformerUrl) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), tc.postgresResource.Port) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), tc.postgresResource.User) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), tc.postgresResource.Database) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), tc.postgresResource.Password) t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.mode"), "off") t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DestinationDebugger.disableEventDeliveryStatusUploads"), "true") @@ -217,15 +280,19 @@ func runRudderServer( t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.syncer.enabled"), "false") t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.mainLoopFreq"), "1s") t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.uploadFreq"), "1s") - t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(port)) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(tc.gwPort)) t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "RUDDER_TMPDIR"), os.TempDir()) - t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(tmpDir, "/recovery_data.json")) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(t.TempDir(), "/recovery_data.json")) t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false") t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false") t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false") // enable tracked users feature t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "TrackedUsers.enabled"), "true") - + // setup reporting server + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "REPORTING_URL"), tc.reportingServer.Server.URL) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.flushWindow"), "1s") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.recentExclusionWindowInSeconds"), "1s") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.sleepInterval"), "2s") t.Setenv("Processor.maxRetry", strconv.Itoa(1)) defer func() { @@ -249,7 +316,8 @@ func sendEvents( ) (int, error) { count := 0 for _, identifier := range identifiers { - num := rand.Intn(10) + // generate 1 or more events + num := 1 + rand.Intn(9) for i := 0; i < num; i++ { count++ payload := []byte(fmt.Sprintf(` @@ -292,6 +360,8 @@ func sendEvents( return 0, fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b)) } kithttputil.CloseResponse(resp) + // sleeping so that multiple requests can be sent to reporting + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } } return count, nil From f9b81913ccdc83cec4fafdc9abf7cbf8653a3ecf Mon Sep 17 00:00:00 2001 From: mihir Date: Mon, 15 Jul 2024 18:22:40 +0530 Subject: [PATCH 2/4] addressed comments --- .../tracked_users_reporting_test.go | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/integration_test/trackedusersreporting/tracked_users_reporting_test.go b/integration_test/trackedusersreporting/tracked_users_reporting_test.go index 394a9137d9..7d68952640 100644 --- a/integration_test/trackedusersreporting/tracked_users_reporting_test.go +++ b/integration_test/trackedusersreporting/tracked_users_reporting_test.go @@ -49,7 +49,7 @@ type testConfig struct { webhook *webhookutil.Recorder configBEServer *nethttptest.Server transformerUrl string - reportingServer *webhookutil.Recorder + reportingServer *mockReportingServer } type userIdentifier struct { @@ -57,6 +57,10 @@ type userIdentifier struct { anonymousID string } +type mockReportingServer struct { + *webhookutil.Recorder +} + func TestTrackedUsersReporting(t *testing.T) { tc := setup(t) @@ -90,17 +94,17 @@ func TestTrackedUsersReporting(t *testing.T) { }, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount()) require.Eventuallyf(t, func() bool { - cardinalityMap := getCardinalityFromReportingServer(t, tc.reportingServer.Requests()) + cardinalityMap := tc.reportingServer.getCardinalityFromReportingServer(t) return cardinalityMap[workspaceID][sourceID].userIDCount == 2 && cardinalityMap[workspaceID][sourceID].anonIDCount == 2 && cardinalityMap[workspaceID][sourceID].identifiedUsersCount == 2 - }, 2*time.Minute, 5*time.Second, "data not reported to reporting service, count of reqs: %d", tc.reportingServer.RequestsCount()) + }, 1*time.Minute, 5*time.Second, "data not reported to reporting service, count of reqs: %d", tc.reportingServer.RequestsCount()) cancel() require.NoError(t, wg.Wait()) } -func getCardinalityFromReportingServer(t *testing.T, reqs []*http.Request) map[string]map[string]struct { +func (m *mockReportingServer) getCardinalityFromReportingServer(t *testing.T) map[string]map[string]struct { userIDCount int anonIDCount int identifiedUsersCount int @@ -115,7 +119,7 @@ func getCardinalityFromReportingServer(t *testing.T, reqs []*http.Request) map[s IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"` } entries := make([]trackedUsersEntry, 0) - for _, req := range reqs { + for _, req := range m.Requests() { unmarshalledReqs := make([]trackedUsersEntry, 0) err := json.NewDecoder(req.Body).Decode(&unmarshalledReqs) require.NoError(t, err) @@ -216,8 +220,10 @@ func setup(t testing.TB) testConfig { t.Cleanup(webhook.Close) webhookURL := webhook.Server.URL - reportingServer := webhookutil.NewRecorder() - t.Cleanup(webhook.Close) + reportingServer := &mockReportingServer{ + Recorder: webhookutil.NewRecorder(), + } + t.Cleanup(reportingServer.Close) trServer := transformertest.NewBuilder(). WithDestTransformHandler( @@ -293,6 +299,8 @@ func runRudderServer( t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.flushWindow"), "1s") t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.recentExclusionWindowInSeconds"), "1s") t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.sleepInterval"), "2s") + // so that multiple processor batches are processed + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Processor.maxLoopProcessEvents"), "10") t.Setenv("Processor.maxRetry", strconv.Itoa(1)) defer func() { @@ -317,7 +325,7 @@ func sendEvents( count := 0 for _, identifier := range identifiers { // generate 1 or more events - num := 1 + rand.Intn(9) + num := 1 + rand.Intn(100) for i := 0; i < num; i++ { count++ payload := []byte(fmt.Sprintf(` @@ -360,8 +368,6 @@ func sendEvents( return 0, fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b)) } kithttputil.CloseResponse(resp) - // sleeping so that multiple requests can be sent to reporting - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } } return count, nil From 5cb52d187af9216b41f5c91b3dda1b93a355261a Mon Sep 17 00:00:00 2001 From: mihir Date: Mon, 15 Jul 2024 18:56:56 +0530 Subject: [PATCH 3/4] addressed comments --- .../tracked_users_reporting_test.go | 251 +++++++++++------- 1 file changed, 149 insertions(+), 102 deletions(-) diff --git a/integration_test/trackedusersreporting/tracked_users_reporting_test.go b/integration_test/trackedusersreporting/tracked_users_reporting_test.go index 7d68952640..a473a6236e 100644 --- a/integration_test/trackedusersreporting/tracked_users_reporting_test.go +++ b/integration_test/trackedusersreporting/tracked_users_reporting_test.go @@ -13,9 +13,12 @@ import ( "os" "path" "strconv" + "sync" "testing" "time" + "github.com/rudderlabs/rudder-go-kit/testhelper/httptest" + "github.com/samber/lo" "github.com/rudderlabs/rudder-server/testhelper/transformertest" @@ -58,117 +61,120 @@ type userIdentifier struct { } type mockReportingServer struct { - *webhookutil.Recorder + Server *httptest.Server + hllMap map[string]map[string]struct { + userIDHll *hll.Hll + anonIDHll *hll.Hll + identifiedUsersHll *hll.Hll + } + hllMapMutex sync.RWMutex } -func TestTrackedUsersReporting(t *testing.T) { - tc := setup(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - wg, ctx := errgroup.WithContext(ctx) - wg.Go(func() error { - err := runRudderServer(t, ctx, tc) +func newMockReportingServer() *mockReportingServer { + whr := mockReportingServer{ + hllMap: make(map[string]map[string]struct { + userIDHll *hll.Hll + anonIDHll *hll.Hll + identifiedUsersHll *hll.Hll + }), + } + whr.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + defer func() { + if err != nil { + http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) + return + } else { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + return + } + }() + type trackedUsersEntry struct { + ReportedAt time.Time `json:"reportedAt"` + WorkspaceID string `json:"workspaceId"` + SourceID string `json:"sourceId"` + InstanceID string `json:"instanceId"` + UserIDHLLHex string `json:"userIdHLL"` + AnonymousIDHLLHex string `json:"anonymousIdHLL"` + IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"` + } + unmarshalledReq := make([]*trackedUsersEntry, 0) + err = json.NewDecoder(r.Body).Decode(&unmarshalledReq) if err != nil { - t.Logf("rudder-server exited with error: %v", err) + return } - return err - }) - - url := fmt.Sprintf("http://localhost:%d", tc.gwPort) - health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name()) - - eventsCount, err := sendEvents([]userIdentifier{ - {userID: "user-1", anonymousID: "anon-1"}, - {userID: "user-2", anonymousID: "anon-2"}, - {userID: "user-1"}, - {anonymousID: "anon-1"}, - {userID: "user-2"}, - {anonymousID: "anon-2"}, - }, "identify", writeKey, url) - require.NoError(t, err) - - require.Eventuallyf(t, func() bool { - return tc.webhook.RequestsCount() == eventsCount - }, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount()) + whr.hllMapMutex.Lock() + for _, e := range unmarshalledReq { + if whr.hllMap[e.WorkspaceID] == nil { + whr.hllMap[e.WorkspaceID] = make(map[string]struct { + userIDHll *hll.Hll + anonIDHll *hll.Hll + identifiedUsersHll *hll.Hll + }) + } + cardinalityMap := whr.hllMap[e.WorkspaceID][e.SourceID] + var userHllBytes, annIDHllBytes, combineHllBytes []byte + var userHll, annHll, combHll hll.Hll + userHllBytes, err = hex.DecodeString(e.UserIDHLLHex) + if err != nil { + return + } + userHll, err = hll.FromBytes(userHllBytes) + if err != nil { + return + } + if cardinalityMap.userIDHll == nil { + cardinalityMap.userIDHll = &userHll + } else { + cardinalityMap.userIDHll.Union(userHll) + } + annIDHllBytes, err = hex.DecodeString(e.AnonymousIDHLLHex) + if err != nil { + return + } + annHll, err := hll.FromBytes(annIDHllBytes) + if err != nil { + return + } + if cardinalityMap.anonIDHll == nil { + cardinalityMap.anonIDHll = &annHll + } else { + cardinalityMap.anonIDHll.Union(annHll) + } + combineHllBytes, err = hex.DecodeString(e.IdentifiedAnonymousIDHLLHex) + if err != nil { + return + } + combHll, err = hll.FromBytes(combineHllBytes) + if err != nil { + return + } + if cardinalityMap.identifiedUsersHll == nil { + cardinalityMap.identifiedUsersHll = &combHll + } else { + cardinalityMap.identifiedUsersHll.Union(combHll) + } + whr.hllMap[e.WorkspaceID][e.SourceID] = cardinalityMap + } + whr.hllMapMutex.Unlock() + })) - require.Eventuallyf(t, func() bool { - cardinalityMap := tc.reportingServer.getCardinalityFromReportingServer(t) - return cardinalityMap[workspaceID][sourceID].userIDCount == 2 && - cardinalityMap[workspaceID][sourceID].anonIDCount == 2 && - cardinalityMap[workspaceID][sourceID].identifiedUsersCount == 2 - }, 1*time.Minute, 5*time.Second, "data not reported to reporting service, count of reqs: %d", tc.reportingServer.RequestsCount()) + return &whr +} - cancel() - require.NoError(t, wg.Wait()) +func (m *mockReportingServer) Close() { + m.Server.Close() } -func (m *mockReportingServer) getCardinalityFromReportingServer(t *testing.T) map[string]map[string]struct { +func (m *mockReportingServer) getCardinalityFromReportingServer() map[string]map[string]struct { userIDCount int anonIDCount int identifiedUsersCount int } { - type trackedUsersEntry struct { - ReportedAt time.Time `json:"reportedAt"` - WorkspaceID string `json:"workspaceId"` - SourceID string `json:"sourceId"` - InstanceID string `json:"instanceId"` - UserIDHLLHex string `json:"userIdHLL"` - AnonymousIDHLLHex string `json:"anonymousIdHLL"` - IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"` - } - entries := make([]trackedUsersEntry, 0) - for _, req := range m.Requests() { - unmarshalledReqs := make([]trackedUsersEntry, 0) - err := json.NewDecoder(req.Body).Decode(&unmarshalledReqs) - require.NoError(t, err) - entries = append(entries, unmarshalledReqs...) - } - result := make(map[string]map[string]struct { - userIDHll *hll.Hll - anonIDHll *hll.Hll - identifiedUsersHll *hll.Hll - }) - for _, e := range entries { - if result[e.WorkspaceID] == nil { - result[e.WorkspaceID] = make(map[string]struct { - userIDHll *hll.Hll - anonIDHll *hll.Hll - identifiedUsersHll *hll.Hll - }) - } - cardinalityMap := result[e.WorkspaceID][e.SourceID] - userHllBytes, err := hex.DecodeString(e.UserIDHLLHex) - require.NoError(t, err) - userHll, err := hll.FromBytes(userHllBytes) - require.NoError(t, err) - if cardinalityMap.userIDHll == nil { - cardinalityMap.userIDHll = &userHll - } else { - cardinalityMap.userIDHll.Union(userHll) - } - annIDHllBytes, err := hex.DecodeString(e.AnonymousIDHLLHex) - require.NoError(t, err) - annHll, err := hll.FromBytes(annIDHllBytes) - require.NoError(t, err) - if cardinalityMap.anonIDHll == nil { - cardinalityMap.anonIDHll = &annHll - } else { - cardinalityMap.anonIDHll.Union(annHll) - } - combineHllBytes, err := hex.DecodeString(e.IdentifiedAnonymousIDHLLHex) - require.NoError(t, err) - combHll, err := hll.FromBytes(combineHllBytes) - require.NoError(t, err) - if cardinalityMap.identifiedUsersHll == nil { - cardinalityMap.identifiedUsersHll = &combHll - } else { - cardinalityMap.identifiedUsersHll.Union(combHll) - } - result[e.WorkspaceID][e.SourceID] = cardinalityMap - } - return lo.MapEntries(result, func(workspaceID string, mp map[string]struct { + m.hllMapMutex.RLock() + defer m.hllMapMutex.RUnlock() + return lo.MapEntries(m.hllMap, func(workspaceID string, mp map[string]struct { userIDHll *hll.Hll anonIDHll *hll.Hll identifiedUsersHll *hll.Hll @@ -201,6 +207,49 @@ func (m *mockReportingServer) getCardinalityFromReportingServer(t *testing.T) ma }) } +func TestTrackedUsersReporting(t *testing.T) { + tc := setup(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + err := runRudderServer(t, ctx, tc) + if err != nil { + t.Logf("rudder-server exited with error: %v", err) + } + return err + }) + + url := fmt.Sprintf("http://localhost:%d", tc.gwPort) + health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name()) + + eventsCount, err := sendEvents([]userIdentifier{ + {userID: "user-1", anonymousID: "anon-1"}, + {userID: "user-2", anonymousID: "anon-2"}, + {userID: "user-1"}, + {anonymousID: "anon-1"}, + {userID: "user-2"}, + {anonymousID: "anon-2"}, + }, "identify", writeKey, url) + require.NoError(t, err) + + require.Eventuallyf(t, func() bool { + return tc.webhook.RequestsCount() == eventsCount + }, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount()) + + require.Eventuallyf(t, func() bool { + cardinalityMap := tc.reportingServer.getCardinalityFromReportingServer() + return cardinalityMap[workspaceID][sourceID].userIDCount == 2 && + cardinalityMap[workspaceID][sourceID].anonIDCount == 2 && + cardinalityMap[workspaceID][sourceID].identifiedUsersCount == 2 + }, 1*time.Minute, 5*time.Second, "data not reported to reporting service, hllMap: %v", tc.reportingServer.hllMap) + + cancel() + require.NoError(t, wg.Wait()) +} + func setup(t testing.TB) testConfig { t.Helper() @@ -220,9 +269,7 @@ func setup(t testing.TB) testConfig { t.Cleanup(webhook.Close) webhookURL := webhook.Server.URL - reportingServer := &mockReportingServer{ - Recorder: webhookutil.NewRecorder(), - } + reportingServer := newMockReportingServer() t.Cleanup(reportingServer.Close) trServer := transformertest.NewBuilder(). From bbb6ebe0c2754db9dcb848bd1e1e6ee03dab5990 Mon Sep 17 00:00:00 2001 From: mihir Date: Fri, 19 Jul 2024 11:52:36 +0530 Subject: [PATCH 4/4] addressed comments --- .../tracked_users_reporting_test.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/integration_test/trackedusersreporting/tracked_users_reporting_test.go b/integration_test/trackedusersreporting/tracked_users_reporting_test.go index a473a6236e..1d0a453a40 100644 --- a/integration_test/trackedusersreporting/tracked_users_reporting_test.go +++ b/integration_test/trackedusersreporting/tracked_users_reporting_test.go @@ -240,16 +240,24 @@ func TestTrackedUsersReporting(t *testing.T) { }, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount()) require.Eventuallyf(t, func() bool { - cardinalityMap := tc.reportingServer.getCardinalityFromReportingServer() - return cardinalityMap[workspaceID][sourceID].userIDCount == 2 && - cardinalityMap[workspaceID][sourceID].anonIDCount == 2 && - cardinalityMap[workspaceID][sourceID].identifiedUsersCount == 2 - }, 1*time.Minute, 5*time.Second, "data not reported to reporting service, hllMap: %v", tc.reportingServer.hllMap) + return getCardinalityOfReportingTable(t, tc.postgresResource) == 0 + }, 1*time.Minute, 5*time.Second, "data not reported to reporting service") + cardinalityMap := tc.reportingServer.getCardinalityFromReportingServer() + require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].userIDCount) + require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].anonIDCount) + require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].identifiedUsersCount) cancel() require.NoError(t, wg.Wait()) } +func getCardinalityOfReportingTable(t *testing.T, db *postgres.Resource) int { + var count int + err := db.DB.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", "tracked_users_reports")).Scan(&count) + require.NoError(t, err) + return count +} + func setup(t testing.TB) testConfig { t.Helper()