diff --git a/internal/interceptor.go b/internal/interceptor.go index cea0d8acc..9b0cc8c7f 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -39,8 +39,6 @@ import ( // the interceptor package for more details. // // Exposed as: [go.temporal.io/sdk/interceptor.Interceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.Interceptor] type Interceptor interface { ClientInterceptor WorkerInterceptor @@ -50,8 +48,6 @@ type Interceptor interface { // documentation in the interceptor package for more details. // // Exposed as: [go.temporal.io/sdk/interceptor.WorkerInterceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.WorkerInterceptor] type WorkerInterceptor interface { // InterceptActivity is called before each activity interception needed with // the next interceptor in the chain. @@ -69,8 +65,6 @@ type WorkerInterceptor interface { // details. // // Exposed as: [go.temporal.io/sdk/interceptor.ActivityInboundInterceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ActivityInboundInterceptor] type ActivityInboundInterceptor interface { // Init is the first call of this interceptor. Implementations can change/wrap // the outbound interceptor before calling Init on the next interceptor. @@ -86,8 +80,6 @@ type ActivityInboundInterceptor interface { // ExecuteActivityInput is the input to ActivityInboundInterceptor.ExecuteActivity. // // Exposed as: [go.temporal.io/sdk/interceptor.ExecuteActivityInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteActivityInput] type ExecuteActivityInput struct { Args []interface{} } @@ -97,8 +89,6 @@ type ExecuteActivityInput struct { // more details. // // Exposed as: [go.temporal.io/sdk/interceptor.ActivityOutboundInterceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ActivityOutboundInterceptor] type ActivityOutboundInterceptor interface { // GetInfo intercepts activity.GetInfo. GetInfo(ctx context.Context) ActivityInfo @@ -129,8 +119,6 @@ type ActivityOutboundInterceptor interface { // details. // // Exposed as: [go.temporal.io/sdk/interceptor.WorkflowInboundInterceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowInboundInterceptor] type WorkflowInboundInterceptor interface { // Init is the first call of this interceptor. Implementations can change/wrap // the outbound interceptor before calling Init on the next interceptor. @@ -168,8 +156,6 @@ type WorkflowInboundInterceptor interface { // WorkflowInboundInterceptor.ExecuteWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ExecuteWorkflowInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteWorkflowInput] type ExecuteWorkflowInput struct { Args []interface{} } @@ -177,8 +163,6 @@ type ExecuteWorkflowInput struct { // HandleSignalInput is the input to WorkflowInboundInterceptor.HandleSignal. // // Exposed as: [go.temporal.io/sdk/interceptor.HandleSignalInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.HandleSignalInput] type HandleSignalInput struct { SignalName string // Arg is the signal argument. It is presented as a primitive payload since @@ -189,8 +173,6 @@ type HandleSignalInput struct { // UpdateInput carries the name and arguments of a workflow update invocation. // // Exposed as: [go.temporal.io/sdk/interceptor.UpdateInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.UpdateInput] type UpdateInput struct { Name string Args []interface{} @@ -199,8 +181,6 @@ type UpdateInput struct { // HandleQueryInput is the input to WorkflowInboundInterceptor.HandleQuery. // // Exposed as: [go.temporal.io/sdk/interceptor.HandleQueryInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.HandleQueryInput] type HandleQueryInput struct { QueryType string Args []interface{} @@ -211,8 +191,6 @@ type HandleQueryInput struct { // NOTE: Experimental // // Exposed as: [go.temporal.io/sdk/interceptor.ExecuteNexusOperationInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteNexusOperationInput] type ExecuteNexusOperationInput struct { // Client to start the operation with. Client NexusClient @@ -231,8 +209,6 @@ type ExecuteNexusOperationInput struct { // NOTE: Experimental // // Exposed as: [go.temporal.io/sdk/interceptor.RequestCancelNexusOperationInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.RequestCancelNexusOperationInput] type RequestCancelNexusOperationInput struct { // Client that was used to start the operation. Client NexusClient @@ -249,8 +225,6 @@ type RequestCancelNexusOperationInput struct { // more details. // // Exposed as: [go.temporal.io/sdk/interceptor.WorkflowOutboundInterceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowOutboundInterceptor] type WorkflowOutboundInterceptor interface { // Go intercepts workflow.Go. Go(ctx Context, name string, f func(ctx Context)) Context @@ -396,8 +370,6 @@ type WorkflowOutboundInterceptor interface { // interceptor package for more details. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientInterceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientInterceptor] type ClientInterceptor interface { // This is called on client creation if set via client options InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor @@ -410,8 +382,6 @@ type ClientInterceptor interface { // more details. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientOutboundInterceptor] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientOutboundInterceptor] type ClientOutboundInterceptor interface { // ExecuteWorkflow intercepts client.Client.ExecuteWorkflow. // interceptor.Header will return a non-nil map for this context. @@ -494,8 +464,6 @@ type ClientPollWorkflowUpdateOutput struct { // ClientOutboundInterceptor.CreateSchedule. // // Exposed as: [go.temporal.io/sdk/interceptor.ScheduleClientCreateInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ScheduleClientCreateInput] type ScheduleClientCreateInput struct { Options *ScheduleOptions } @@ -504,8 +472,6 @@ type ScheduleClientCreateInput struct { // ClientOutboundInterceptor.ExecuteWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteWorkflowInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteWorkflowInput] type ClientExecuteWorkflowInput struct { Options *StartWorkflowOptions WorkflowType string @@ -516,8 +482,6 @@ type ClientExecuteWorkflowInput struct { // ClientOutboundInterceptor.SignalWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWorkflowInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWorkflowInput] type ClientSignalWorkflowInput struct { WorkflowID string RunID string @@ -529,8 +493,6 @@ type ClientSignalWorkflowInput struct { // ClientOutboundInterceptor.SignalWithStartWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWithStartWorkflowInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWithStartWorkflowInput] type ClientSignalWithStartWorkflowInput struct { SignalName string SignalArg interface{} @@ -543,8 +505,6 @@ type ClientSignalWithStartWorkflowInput struct { // ClientOutboundInterceptor.CancelWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelWorkflowInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelWorkflowInput] type ClientCancelWorkflowInput struct { WorkflowID string RunID string @@ -554,8 +514,6 @@ type ClientCancelWorkflowInput struct { // ClientOutboundInterceptor.TerminateWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateWorkflowInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateWorkflowInput] type ClientTerminateWorkflowInput struct { WorkflowID string RunID string @@ -567,8 +525,6 @@ type ClientTerminateWorkflowInput struct { // ClientOutboundInterceptor.QueryWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientQueryWorkflowInput] -// -// Exposed as: [go.temporal.io/sdk/interceptor.ClientQueryWorkflowInput] type ClientQueryWorkflowInput struct { WorkflowID string RunID string diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index e38233e34..1c9759905 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1972,9 +1972,10 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie return err } + links, _ := ctx.Value(NexusOperationLinksKey).([]*commonpb.Link) + request := &workflowservice.SignalWorkflowExecutionRequest{ Namespace: w.client.namespace, - RequestId: uuid.New(), WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: in.WorkflowID, RunId: in.RunID, @@ -1983,6 +1984,13 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie Input: input, Identity: w.client.identity, Header: header, + Links: links, + } + + if requestID, ok := ctx.Value(NexusOperationRequestIDKey).(string); ok && requestID != "" { + request.RequestId = requestID + } else { + request.RequestId = uuid.New() } grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index a55403c05..b0f559533 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -42,12 +42,12 @@ import ( // NexusOperationContext is an internal only struct that holds fields used by the temporalnexus functions. type NexusOperationContext struct { - Client Client - Namespace string - TaskQueue string - MetricsHandler metrics.Handler - Log log.Logger - registry *registry + Client Client + Namespace string + TaskQueue string + MetricsHandler metrics.Handler + Log log.Logger + registry *registry } func (nc *NexusOperationContext) ResolveWorkflowName(wf any) (string, error) { @@ -66,6 +66,14 @@ type isWorkflowRunOpContextKeyType struct{} // panic as we don't want to expose a partial client to sync operations. var IsWorkflowRunOpContextKey = isWorkflowRunOpContextKeyType{} +type nexusOperationRequestIDKeyType struct{} + +var NexusOperationRequestIDKey = nexusOperationRequestIDKeyType{} + +type nexusOperationLinksKeyType struct{} + +var NexusOperationLinksKey = nexusOperationLinksKeyType{} + // NexusOperationContextFromGoContext gets the [NexusOperationContext] associated with the given [context.Context]. func NexusOperationContextFromGoContext(ctx context.Context) (nctx *NexusOperationContext, ok bool) { nctx, ok = ctx.Value(nexusOperationContextKey).(*NexusOperationContext) diff --git a/temporalnexus/operation.go b/temporalnexus/operation.go index 58b4b8b4e..6649d6ae8 100644 --- a/temporalnexus/operation.go +++ b/temporalnexus/operation.go @@ -45,7 +45,6 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" - "go.temporal.io/sdk/client" "go.temporal.io/sdk/internal" "go.temporal.io/sdk/internal/common/metrics" @@ -94,6 +93,38 @@ func NewSyncOperation[I any, O any]( } } +// SignalWorkflowInput is the input to a NewSignalWorkflowOperation. +type SignalWorkflowInput struct { + WorkflowID string + RunID string + SignalName string + Arg any +} + +// NewSignalWorkflowOperation is a helper for creating a synchronous nexus.Operation to deliver a signal. +// +// NOTE: Experimental +func NewSignalWorkflowOperation[T any]( + name string, + getSignalInput func(context.Context, T, nexus.StartOperationOptions) SignalWorkflowInput, +) nexus.Operation[T, nexus.NoValue] { + return NewSyncOperation(name, func(ctx context.Context, c client.Client, in T, options nexus.StartOperationOptions) (nexus.NoValue, error) { + signalInput := getSignalInput(ctx, in, options) + + if options.RequestID != "" { + ctx = context.WithValue(ctx, internal.NexusOperationRequestIDKey, options.RequestID) + } + + links, err := convertNexusLinks(options.Links, GetLogger(ctx)) + if err != nil { + return nil, err + } + ctx = context.WithValue(ctx, internal.NexusOperationLinksKey, links) + + return nil, c.SignalWorkflow(ctx, signalInput.WorkflowID, signalInput.RunID, signalInput.SignalName, signalInput.Arg) + }) +} + func (o *syncOperation[I, O]) Name() string { return o.name } @@ -360,8 +391,26 @@ func ExecuteUntypedWorkflow[R any]( }) } + links, err := convertNexusLinks(nexusOptions.Links, nctx.Log) + if err != nil { + return nil, err + } + internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links) + + run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...) + if err != nil { + return nil, err + } + return workflowHandle[R]{ + namespace: nctx.Namespace, + id: run.GetID(), + runID: run.GetRunID(), + }, nil +} + +func convertNexusLinks(nexusLinks []nexus.Link, log log.Logger) ([]*common.Link, error) { var links []*common.Link - for _, nexusLink := range nexusOptions.Links { + for _, nexusLink := range nexusLinks { switch nexusLink.Type { case string((&common.Link_WorkflowEvent{}).ProtoReflect().Descriptor().FullName()): link, err := ConvertNexusLinkToLinkWorkflowEvent(nexusLink) @@ -374,18 +423,8 @@ func ExecuteUntypedWorkflow[R any]( }, }) default: - nctx.Log.Warn("ignoring unsupported link data type: %q", nexusLink.Type) + log.Warn("ignoring unsupported link data type: %q", nexusLink.Type) } } - internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links) - - run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...) - if err != nil { - return nil, err - } - return workflowHandle[R]{ - namespace: nctx.Namespace, - id: run.GetID(), - runID: run.GetRunID(), - }, nil + return links, nil } diff --git a/test/nexus_test.go b/test/nexus_test.go index 12dcb0f1e..114dada79 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -205,6 +205,13 @@ func waitForCancelWorkflow(ctx workflow.Context, ownID string) (string, error) { return "", workflow.Await(ctx, func() bool { return false }) } +func waitForSignalWorkflow(ctx workflow.Context, _ string) (string, error) { + ch := workflow.GetSignalChannel(ctx, "nexus-signal") + var val string + ch.Receive(ctx, &val) + return val, ctx.Err() +} + var workflowOp = temporalnexus.NewWorkflowRunOperation( "workflow-op", waitForCancelWorkflow, @@ -550,6 +557,121 @@ func TestSyncOperationFromWorkflow(t *testing.T) { }) } +func TestSignalOperationFromWorkflow(t *testing.T) { + receiverID := "nexus-signal-receiver-" + uuid.NewString() + + op := temporalnexus.NewSignalWorkflowOperation("signal-operation", func(_ context.Context, input string, _ nexus.StartOperationOptions) temporalnexus.SignalWorkflowInput { + return temporalnexus.SignalWorkflowInput{ + WorkflowID: receiverID, + SignalName: "nexus-signal", + Arg: input, + } + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + tc := newTestContext(t, ctx) + + senderWF := func(ctx workflow.Context) error { + c := workflow.NewNexusClient(tc.endpoint, "test") + fut := c.ExecuteOperation(ctx, op, "nexus", workflow.NexusOperationOptions{}) + + var exec workflow.NexusOperationExecution + if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil { + return fmt.Errorf("expected start to succeed: %w", err) + } + if exec.OperationID != "" { + return fmt.Errorf("expected empty operation ID") + } + + return fut.Get(ctx, nil) + } + + w := worker.New(tc.client, tc.taskQueue, worker.Options{}) + service := nexus.NewService("test") + require.NoError(t, service.Register(op)) + w.RegisterNexusService(service) + w.RegisterWorkflow(waitForSignalWorkflow) + w.RegisterWorkflow(senderWF) + require.NoError(t, w.Start()) + t.Cleanup(w.Stop) + + receiver, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: receiverID, + TaskQueue: tc.taskQueue, + // The endpoint registry may take a bit to propagate to the history service, use a shorter workflow task + // timeout to speed up the attempts. + WorkflowTaskTimeout: time.Second, + }, waitForSignalWorkflow, "successful") + require.NoError(t, err) + + sender, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + TaskQueue: tc.taskQueue, + // The endpoint registry may take a bit to propagate to the history service, use a shorter workflow task + // timeout to speed up the attempts. + WorkflowTaskTimeout: time.Second, + }, senderWF) + require.NoError(t, err) + require.NoError(t, sender.Get(ctx, nil)) + + iter := tc.client.GetWorkflowHistory( + ctx, + sender.GetID(), + sender.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + var nexusOperationScheduleEventID int64 + var targetEvent *historypb.HistoryEvent + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED { + nexusOperationScheduleEventID = event.GetEventId() + require.NotEmpty(t, event.GetNexusOperationScheduledEventAttributes().GetRequestId()) + break + } + } + // TODO(pj): sync operations currently do not support attaching links. add assertions here once they do. + + var out string + require.NoError(t, receiver.Get(ctx, &out)) + require.Equal(t, "nexus", out) + + iter = tc.client.GetWorkflowHistory( + ctx, + receiver.GetID(), + receiver.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + targetEvent = event + break + } + } + require.NotNil(t, targetEvent) + require.NotNil(t, targetEvent.GetWorkflowExecutionSignaledEventAttributes()) + require.Len(t, targetEvent.GetLinks(), 1) + require.True(t, proto.Equal( + &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: sender.GetID(), + RunId: sender.GetRunID(), + Reference: &common.Link_WorkflowEvent_EventRef{ + EventRef: &common.Link_WorkflowEvent_EventReference{ + EventId: nexusOperationScheduleEventID, + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }, + }, + }, + targetEvent.GetLinks()[0].GetWorkflowEvent(), + )) +} + func TestAsyncOperationFromWorkflow(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel()