chore: add mock reporting server in mtu integration tests #4894

Merged · 4 commits · Jul 19, 2024
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
@@ -135,6 +135,7 @@ jobs:
- integration_test/warehouse
- integration_test/tracing
- integration_test/backendconfigunavailability
- integration_test/trackedusersreporting
- processor
- regulation-worker
- router
152 changes: 111 additions & 41 deletions integration_test/trackedusersreporting/tracked_users_reporting_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
@@ -15,6 +16,8 @@ import (
"testing"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/testhelper/transformertest"

"github.com/segmentio/go-hll"
@@ -46,6 +49,7 @@ type testConfig struct {
webhook *webhookutil.Recorder
configBEServer *nethttptest.Server
transformerUrl string
reportingServer *webhookutil.Recorder
}

type userIdentifier struct {
@@ -61,7 +65,7 @@ func TestTrackedUsersReporting(t *testing.T) {

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runRudderServer(t, ctx, tc.gwPort, tc.postgresResource, tc.configBEServer.URL, t.TempDir(), tc.transformerUrl)
err := runRudderServer(t, ctx, tc)
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
@@ -85,55 +89,112 @@
return tc.webhook.RequestsCount() == eventsCount
}, 1*time.Minute, 5*time.Second, "unexpected number of events received, count of events: %d", tc.webhook.RequestsCount())

// TODO: once flusher is implemented, add a mock reporting server to check is we are getting correct cardinality of users
cardinalityMap := getCardinalityFromDB(t, tc.postgresResource)
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID])
require.Eventuallyf(t, func() bool {
cardinalityMap := getCardinalityFromReportingServer(t, tc.reportingServer.Requests())
return cardinalityMap[workspaceID][sourceID].userIDCount == 2 &&
cardinalityMap[workspaceID][sourceID].anonIDCount == 2 &&
cardinalityMap[workspaceID][sourceID].identifiedUsersCount == 2
}, 2*time.Minute, 5*time.Second, "data not reported to reporting service, count of reqs: %d", tc.reportingServer.RequestsCount())

cancel()
require.NoError(t, wg.Wait())
}

func getCardinalityFromDB(t *testing.T, postgresResource *postgres.Resource) map[string]map[string]int {
func getCardinalityFromReportingServer(t *testing.T, reqs []*http.Request) map[string]map[string]struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
} {
type trackedUsersEntry struct {
WorkspaceID string
SourceID string
InstanceID string
userIDHll string
annIDHll string
combHll string
ReportedAt time.Time `json:"reportedAt"`
WorkspaceID string `json:"workspaceId"`
SourceID string `json:"sourceId"`
InstanceID string `json:"instanceId"`
UserIDHLLHex string `json:"userIdHLL"`
AnonymousIDHLLHex string `json:"anonymousIdHLL"`
IdentifiedAnonymousIDHLLHex string `json:"identifiedAnonymousIdHLL"`
}
rows, err := postgresResource.DB.Query("SELECT workspace_id, source_id, instance_id, userid_hll, anonymousid_hll, identified_anonymousid_hll FROM tracked_users_reports")
require.NoError(t, err)
require.NoError(t, rows.Err())
defer func() { _ = rows.Close() }()
var entry trackedUsersEntry
entries := make([]trackedUsersEntry, 0)
for rows.Next() {
err = rows.Scan(&entry.WorkspaceID, &entry.SourceID, &entry.InstanceID, &entry.userIDHll, &entry.annIDHll, &entry.combHll)
for _, req := range reqs {
unmarshalledReqs := make([]trackedUsersEntry, 0)
err := json.NewDecoder(req.Body).Decode(&unmarshalledReqs)
require.NoError(t, err)
entries = append(entries, entry)
entries = append(entries, unmarshalledReqs...)
}
result := make(map[string]map[string]int)
result := make(map[string]map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
})
for _, e := range entries {
if result[e.WorkspaceID] == nil {
result[e.WorkspaceID] = make(map[string]int)
result[e.WorkspaceID] = make(map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
})
}
userHllBytes, err := hex.DecodeString(e.userIDHll)
cardinalityMap := result[e.WorkspaceID][e.SourceID]
userHllBytes, err := hex.DecodeString(e.UserIDHLLHex)
require.NoError(t, err)
userHll, err := hll.FromBytes(userHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] += int(userHll.Cardinality())
annIDHllBytes, err := hex.DecodeString(e.annIDHll)
if cardinalityMap.userIDHll == nil {
cardinalityMap.userIDHll = &userHll
} else {
cardinalityMap.userIDHll.Union(userHll)
}
annIDHllBytes, err := hex.DecodeString(e.AnonymousIDHLLHex)
require.NoError(t, err)
annHll, err := hll.FromBytes(annIDHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] += int(annHll.Cardinality())
combineHllBytes, err := hex.DecodeString(e.combHll)
if cardinalityMap.anonIDHll == nil {
cardinalityMap.anonIDHll = &annHll
} else {
cardinalityMap.anonIDHll.Union(annHll)
}
combineHllBytes, err := hex.DecodeString(e.IdentifiedAnonymousIDHLLHex)
require.NoError(t, err)
combHll, err := hll.FromBytes(combineHllBytes)
require.NoError(t, err)
result[e.WorkspaceID][e.SourceID] -= int(combHll.Cardinality())
if cardinalityMap.identifiedUsersHll == nil {
cardinalityMap.identifiedUsersHll = &combHll
} else {
cardinalityMap.identifiedUsersHll.Union(combHll)
}
result[e.WorkspaceID][e.SourceID] = cardinalityMap
}
return result
return lo.MapEntries(result, func(workspaceID string, mp map[string]struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}) (string, map[string]struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
},
) {
return workspaceID, lo.MapEntries(mp, func(sourceID string, value struct {
userIDHll *hll.Hll
anonIDHll *hll.Hll
identifiedUsersHll *hll.Hll
}) (string, struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
},
) {
return sourceID, struct {
userIDCount int
anonIDCount int
identifiedUsersCount int
}{
userIDCount: int(value.userIDHll.Cardinality()),
anonIDCount: int(value.anonIDHll.Cardinality()),
identifiedUsersCount: int(value.identifiedUsersHll.Cardinality()),
}
})
})
}

func setup(t testing.TB) testConfig {
@@ -155,6 +216,9 @@ func setup(t testing.TB) testConfig {
t.Cleanup(webhook.Close)
webhookURL := webhook.Server.URL

reportingServer := webhookutil.NewRecorder()
t.Cleanup(reportingServer.Close)

trServer := transformertest.NewBuilder().
WithDestTransformHandler(
"WEBHOOK",
@@ -189,23 +253,22 @@
webhook: webhook,
configBEServer: bcServer,
transformerUrl: trServer.URL,
reportingServer: reportingServer,
}
}

func runRudderServer(
t testing.TB,
ctx context.Context,
port int,
postgresContainer *postgres.Resource,
cbURL, tmpDir, transformerURL string,
tc testConfig,
) (err error) {
t.Setenv("CONFIG_BACKEND_URL", cbURL)
t.Setenv("CONFIG_BACKEND_URL", tc.configBEServer.URL)
t.Setenv("WORKSPACE_TOKEN", "token")
t.Setenv("DEST_TRANSFORM_URL", transformerURL)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), postgresContainer.Port)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), postgresContainer.User)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), postgresContainer.Database)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), postgresContainer.Password)
t.Setenv("DEST_TRANSFORM_URL", tc.transformerUrl)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), tc.postgresResource.Port)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), tc.postgresResource.User)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), tc.postgresResource.Database)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), tc.postgresResource.Password)

t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.mode"), "off")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DestinationDebugger.disableEventDeliveryStatusUploads"), "true")
@@ -217,15 +280,19 @@ func runRudderServer(
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.syncer.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.mainLoopFreq"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.uploadFreq"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(port))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(tc.gwPort))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "RUDDER_TMPDIR"), os.TempDir())
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(tmpDir, "/recovery_data.json"))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(t.TempDir(), "/recovery_data.json"))
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false")
// enable tracked users feature
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "TrackedUsers.enabled"), "true")

// setup reporting server
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "REPORTING_URL"), tc.reportingServer.Server.URL)
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.flushWindow"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.recentExclusionWindowInSeconds"), "1s")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.flusher.sleepInterval"), "2s")
t.Setenv("Processor.maxRetry", strconv.Itoa(1))

defer func() {
@@ -249,7 +316,8 @@ func sendEvents(
) (int, error) {
count := 0
for _, identifier := range identifiers {
num := rand.Intn(10)
// generate 1 or more events
num := 1 + rand.Intn(9)
for i := 0; i < num; i++ {
count++
payload := []byte(fmt.Sprintf(`
Expand Down Expand Up @@ -292,6 +360,8 @@ func sendEvents(
return 0, fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b))
}
kithttputil.CloseResponse(resp)
// sleeping so that multiple requests can be sent to reporting
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
Member: Can we avoid this sleep in favour of another signal?

Contributor Author: Increased the number of events and decreased the processor batch size so that multiple processor batches will be processed.

}
}
return count, nil
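To illustrate the approach in the author's reply above — driving several processor batches (and therefore several reporting requests) instead of sleeping between events — here is a minimal sketch. It is not part of the PR: the package name, the config import path, and the "Processor.maxLoopProcessEvents" key are assumptions for illustration only; the real key that caps the processor batch size may differ.

// Hedged sketch of the batch-size approach from the reply above; names below are assumed.
package trackedusersreporting_test

import (
	"math/rand"
	"strconv"
	"testing"

	"github.com/rudderlabs/rudder-go-kit/config" // assumed import path of the config package used above
)

// configureMultipleProcessorBatches shrinks the processor batch size and returns
// an event count large enough to span several batches, so the flusher emits more
// than one reporting request without any sleeps in the send loop.
func configureMultipleProcessorBatches(t *testing.T) int {
	const batchSize = 10
	// hypothetical key: cap the number of events the processor picks up per loop
	t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Processor.maxLoopProcessEvents"), strconv.Itoa(batchSize))
	// send three to four batches' worth of events per test run
	return batchSize*3 + rand.Intn(batchSize)
}

sendEvents could then use this count instead of the per-identifier random 1–9 and drop the random sleep, relying on require.Eventuallyf to wait for the additional reporting requests.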