diff --git a/backend-config/backend-config.go b/backend-config/backend-config.go index 1df6df8ca7..d9c0fff75e 100644 --- a/backend-config/backend-config.go +++ b/backend-config/backend-config.go @@ -142,6 +142,7 @@ func filterProcessorEnabledDestinations(config ConfigT) ConfigT { var modifiedConfig ConfigT modifiedConfig.Libraries = config.Libraries modifiedConfig.Sources = make([]SourceT, 0) + modifiedConfig.Credentials = config.Credentials for _, source := range config.Sources { var destinations []DestinationT for _, destination := range source.Destinations { // TODO skipcq: CRT-P0006 diff --git a/backend-config/types.go b/backend-config/types.go index 18750b693a..b764b30875 100644 --- a/backend-config/types.go +++ b/backend-config/types.go @@ -74,6 +74,12 @@ type SourceT struct { } } +type Credential struct { + Key string `json:"key"` + Value string `json:"value"` + IsSecret bool `json:"isSecret"` +} + func (s *SourceT) IsReplaySource() bool { return s.OriginalID != "" } @@ -87,6 +93,7 @@ type ConfigT struct { ConnectionFlags ConnectionFlags `json:"flags"` Settings Settings `json:"settings"` UpdatedAt time.Time `json:"updatedAt"` + Credentials map[string]Credential `json:"credentials"` } func (c *ConfigT) SourcesMap() map[string]*SourceT { diff --git a/integration_test/transformer_contract/transformer_contract_test.go b/integration_test/transformer_contract/transformer_contract_test.go new file mode 100644 index 0000000000..7bcacdaa51 --- /dev/null +++ b/integration_test/transformer_contract/transformer_contract_test.go @@ -0,0 +1,313 @@ +package transformer_contract + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "path" + "strconv" + "testing" + "time" + + _ "github.com/marcboeker/go-duckdb" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/rudderlabs/rudder-go-kit/config" + kithttputil "github.com/rudderlabs/rudder-go-kit/httputil" + kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres" + "github.com/rudderlabs/rudder-go-kit/testhelper/rand" + + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/processor/transformer" + "github.com/rudderlabs/rudder-server/runner" + "github.com/rudderlabs/rudder-server/testhelper/health" + "github.com/rudderlabs/rudder-server/testhelper/transformertest" +) + +func TestTransformerContract(t *testing.T) { + t.Run("User Transformer", func(t *testing.T) { + config.Reset() + defer config.Reset() + + workspaceConfig := backendconfig.ConfigT{ + WorkspaceID: "workspace-1", + Sources: []backendconfig.SourceT{ + { + ID: "source-1", + Name: "source-name-1", + SourceDefinition: backendconfig.SourceDefinitionT{ + ID: "source-def-1", + Name: "source-def-name-1", + Category: "source-def-category-1", + Type: "source-def-type-1", + }, + WriteKey: "writekey-1", + WorkspaceID: "workspace-1", + Enabled: true, + Destinations: []backendconfig.DestinationT{ + { + ID: "destination-1", + Name: "destination-name-1", + DestinationDefinition: backendconfig.DestinationDefinitionT{ + ID: "destination-def-1", + Name: "destination-def-name-1", + DisplayName: "destination-def-display-name-1", + }, + Enabled: true, + WorkspaceID: "workspace-1", + Transformations: []backendconfig.TransformationT{ + { + ID: "transformation-1", + VersionID: "version-1", + }, + }, + IsProcessorEnabled: true, + RevisionID: "revision-1", + }, + }, + DgSourceTrackingPlanConfig: backendconfig.DgSourceTrackingPlanConfigT{ + SourceId: "source-1", + SourceConfigVersion: 1, + Deleted: false, + TrackingPlan: backendconfig.TrackingPlanT{ + Id: "tracking-plan-1", + Version: 1, + }, + }, + }, + }, + Credentials: map[string]backendconfig.Credential{ + "credential-1": { + Key: "key-1", + Value: "value-1", + IsSecret: false, + }, + }, + } + + bcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/workspaceConfig": + response, _ := json.Marshal(workspaceConfig) + _, _ = w.Write(response) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + + trServer := transformertest.NewBuilder(). + WithUserTransformHandler( + func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) { + for i := range request { + req := request[i] + + require.Equal(t, req.Metadata.SourceID, "source-1") + require.Equal(t, req.Metadata.SourceName, "source-name-1") + require.Equal(t, req.Metadata.SourceType, "source-def-name-1") + require.Equal(t, req.Metadata.SourceCategory, "source-def-category-1") + require.Equal(t, req.Metadata.SourceDefinitionID, "source-def-1") + require.Equal(t, req.Metadata.WorkspaceID, "workspace-1") + require.Equal(t, req.Metadata.DestinationID, "destination-1") + require.Equal(t, req.Metadata.DestinationType, "destination-def-name-1") + require.Equal(t, req.Metadata.DestinationName, "destination-name-1") + require.Equal(t, req.Metadata.TransformationID, "transformation-1") + require.Equal(t, req.Metadata.TransformationVersionID, "version-1") + require.Equal(t, req.Metadata.EventType, "identify") + require.Equal(t, req.Credentials, []transformer.Credential{ + { + ID: "credential-1", + Key: "key-1", + Value: "value-1", + IsSecret: false, + }, + }) + response = append(response, transformer.TransformerResponse{ + Metadata: req.Metadata, + Output: req.Message, + StatusCode: http.StatusOK, + }) + } + return + }, + ). + Build() + defer trServer.Close() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + postgresContainer, err := postgres.Setup(pool, t) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gwPort, err := kithelper.GetFreePort() + require.NoError(t, err) + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + err := runRudderServer(t, ctx, gwPort, postgresContainer, bcServer.URL, trServer.URL, t.TempDir()) + if err != nil { + t.Logf("rudder-server exited with error: %v", err) + } + return err + }) + + url := fmt.Sprintf("http://localhost:%d", gwPort) + health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name()) + + eventsCount := 12 + + err = sendEvents(eventsCount, "identify", "writekey-1", url) + require.NoError(t, err) + + requireJobsCount(t, ctx, postgresContainer.DB, "gw", jobsdb.Succeeded.State, eventsCount) + requireJobsCount(t, ctx, postgresContainer.DB, "rt", jobsdb.Succeeded.State, eventsCount) + + cancel() + require.NoError(t, wg.Wait()) + }) + // TODO: Add tests for dest transformer and tracking plan validation + t.Run("Dest Transformer", func(t *testing.T) {}) + t.Run("Tracking Plan Validation", func(t *testing.T) {}) +} + +func runRudderServer( + t testing.TB, + ctx context.Context, + port int, + postgresContainer *postgres.Resource, + cbURL, transformerURL, + tmpDir string, +) (err error) { + t.Setenv("CONFIG_BACKEND_URL", cbURL) + t.Setenv("WORKSPACE_TOKEN", "token") + t.Setenv("DEST_TRANSFORM_URL", transformerURL) + + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), postgresContainer.Port) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), postgresContainer.User) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), postgresContainer.Database) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), postgresContainer.Password) + + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.mode"), "off") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DestinationDebugger.disableEventDeliveryStatusUploads"), "true") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "SourceDebugger.disableEventUploads"), "true") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "TransformationDebugger.disableTransformationStatusUploads"), "true") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "JobsDB.backup.enabled"), "false") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "JobsDB.migrateDSLoopSleepDuration"), "60m") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "archival.Enabled"), "false") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.syncer.enabled"), "false") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.mainLoopFreq"), "1s") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.uploadFreq"), "1s") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(port)) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "RUDDER_TMPDIR"), os.TempDir()) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(tmpDir, "/recovery_data.json")) + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false") + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panicked: %v", r) + } + }() + r := runner.New(runner.ReleaseInfo{EnterpriseToken: "DUMMY"}) + c := r.Run(ctx, []string{"transformer-contract"}) + if c != 0 { + err = fmt.Errorf("rudder-server exited with a non-0 exit code: %d", c) + } + return +} + +// nolint: unparam, bodyclose +func sendEvents( + num int, + eventType, writeKey, + url string, +) error { + for i := 0; i < num; i++ { + payload := []byte(fmt.Sprintf(` + { + "batch": [ + { + "userId": %[1]q, + "type": %[2]q, + "context": { + "traits": { + "trait1": "new-val" + }, + "ip": "14.5.67.21", + "library": { + "name": "http" + } + }, + "timestamp": "2020-02-02T00:23:09.544Z" + } + ] + }`, + rand.String(10), + eventType, + )) + req, err := http.NewRequest(http.MethodPost, url+"/v1/batch", bytes.NewReader(payload)) + if err != nil { + return err + } + req.SetBasicAuth(writeKey, "password") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b)) + } + kithttputil.CloseResponse(resp) + } + return nil +} + +// nolint: unparam +func requireJobsCount( + t *testing.T, + ctx context.Context, + db *sql.DB, + queue, state string, + expectedCount int, +) { + t.Helper() + + query := fmt.Sprintf(` + SELECT + count(*) + FROM + unionjobsdbmetadata('%s', 1) + WHERE + job_state = '%s'; + `, + queue, + state, + ) + require.Eventuallyf(t, func() bool { + var jobsCount int + require.NoError(t, db.QueryRowContext(ctx, query).Scan(&jobsCount)) + t.Logf("%s %sJobCount: %d", queue, state, jobsCount) + return jobsCount == expectedCount + }, + 30*time.Second, + 1*time.Second, + "%d %s events should be in %s state", expectedCount, queue, state, + ) +} diff --git a/processor/processor.go b/processor/processor.go index 6ec1b148f7..5e8f6dc36c 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -144,6 +144,7 @@ type Handle struct { eventSchemaV2Enabled bool archivalEnabled config.ValueLoader[bool] eventAuditEnabled map[string]bool + credentialsMap map[string][]transformer.Credential } drainConfig struct { @@ -805,6 +806,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) { sourceIdDestinationMap = make(map[string][]backendconfig.DestinationT) sourceIdSourceMap = map[string]backendconfig.SourceT{} eventAuditEnabled = make(map[string]bool) + credentialsMap = make(map[string][]transformer.Credential) ) for workspaceID, wConfig := range config { for i := range wConfig.Sources { @@ -827,6 +829,14 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) { } workspaceLibrariesMap[workspaceID] = wConfig.Libraries eventAuditEnabled[workspaceID] = wConfig.Settings.EventAuditEnabled + credentialsMap[workspaceID] = lo.MapToSlice(wConfig.Credentials, func(key string, value backendconfig.Credential) transformer.Credential { + return transformer.Credential{ + ID: key, + Key: value.Key, + Value: value.Value, + IsSecret: value.IsSecret, + } + }) } proc.config.configSubscriberLock.Lock() proc.config.oneTrustConsentCategoriesMap = oneTrustConsentCategoriesMap @@ -836,6 +846,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) { proc.config.sourceIdDestinationMap = sourceIdDestinationMap proc.config.sourceIdSourceMap = sourceIdSourceMap proc.config.eventAuditEnabled = eventAuditEnabled + proc.config.credentialsMap = credentialsMap proc.config.configSubscriberLock.Unlock() if !initDone { initDone = true @@ -1106,6 +1117,7 @@ func (proc *Handle) getTransformerEvents( Message: userTransformedEvent.Output, Metadata: *eventMetadata, Destination: *destination, + Credentials: proc.config.credentialsMap[commonMetaData.WorkspaceID], } eventsToTransform = append(eventsToTransform, updatedEvent) } @@ -1962,6 +1974,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf shallowEventCopy.Metadata.TransformationID = destination.Transformations[0].ID shallowEventCopy.Metadata.TransformationVersionID = destination.Transformations[0].VersionID } + shallowEventCopy.Credentials = proc.config.credentialsMap[destination.WorkspaceID] filterConfig(&shallowEventCopy) metadata := shallowEventCopy.Metadata srcAndDestKey := getKeyFromSourceAndDest(metadata.SourceID, metadata.DestinationID) diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index e9843af23e..d3d3e2450d 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -96,6 +96,14 @@ type TransformerEvent struct { Metadata Metadata `json:"metadata"` Destination backendconfig.DestinationT `json:"destination"` Libraries []backendconfig.LibraryT `json:"libraries"` + Credentials []Credential `json:"credentials"` +} + +type Credential struct { + ID string `json:"id"` + Key string `json:"key"` + Value string `json:"value"` + IsSecret bool `json:"isSecret"` } func isJobTerminated(status int) bool { diff --git a/processor/transformer/transformer_test.go b/processor/transformer/transformer_test.go index 0ee8db63d3..77c6a4eba0 100644 --- a/processor/transformer/transformer_test.go +++ b/processor/transformer/transformer_test.go @@ -203,6 +203,14 @@ func TestTransformer(t *testing.T) { "src-key-1": msgID, "forceStatusCode": statusCode, }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, } tResp := TransformerResponse{ @@ -238,6 +246,14 @@ func TestTransformer(t *testing.T) { Message: map[string]interface{}{ "src-key-1": msgID, }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, }) testCases := []struct { @@ -347,6 +363,14 @@ func TestTransformer(t *testing.T) { Message: map[string]interface{}{ "src-key-1": msgID, }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, }) elt := &endlessLoopTransformer{ @@ -404,6 +428,14 @@ func TestTransformer(t *testing.T) { }, }, }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, }) testCases := []struct { @@ -529,6 +561,14 @@ func TestTransformer(t *testing.T) { }, }, }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, }) testCases := []struct { @@ -637,6 +677,14 @@ func TestTransformer(t *testing.T) { }, }, }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, }) t.Run("Destination transformations", func(t *testing.T) { @@ -712,6 +760,14 @@ func TestTransformer(t *testing.T) { }, }, }, + Credentials: []Credential{ + { + ID: "test-credential", + Key: "test-key", + Value: "test-value", + IsSecret: false, + }, + }, }) rsp := tr.Transform(context.TODO(), events, 10) diff --git a/schema-forwarder/internal/testdata/configdata.go b/schema-forwarder/internal/testdata/configdata.go index a461abe05c..b468695705 100644 --- a/schema-forwarder/internal/testdata/configdata.go +++ b/schema-forwarder/internal/testdata/configdata.go @@ -189,4 +189,16 @@ var SampleBackendConfig = backendconfig.ConfigT{ }, EventAuditEnabled: false, }, + Credentials: map[string]backendconfig.Credential{ + "cred1": { + Key: "key1", + Value: "value1", + IsSecret: false, + }, + "cred2": { + Key: "key2", + Value: "value2", + IsSecret: true, + }, + }, }