Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nexus SignalWorkflowOperation #1770

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import (
// the interceptor package for more details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.Interceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.Interceptor]
type Interceptor interface {
ClientInterceptor
WorkerInterceptor
Expand All @@ -50,8 +48,6 @@ type Interceptor interface {
// documentation in the interceptor package for more details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkerInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkerInterceptor]
type WorkerInterceptor interface {
// InterceptActivity is called before each activity interception needed with
// the next interceptor in the chain.
Expand All @@ -69,8 +65,6 @@ type WorkerInterceptor interface {
// details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityInboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityInboundInterceptor]
type ActivityInboundInterceptor interface {
// Init is the first call of this interceptor. Implementations can change/wrap
// the outbound interceptor before calling Init on the next interceptor.
Expand All @@ -86,8 +80,6 @@ type ActivityInboundInterceptor interface {
// ExecuteActivityInput is the input to ActivityInboundInterceptor.ExecuteActivity.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteActivityInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteActivityInput]
type ExecuteActivityInput struct {
Args []interface{}
}
Expand All @@ -97,8 +89,6 @@ type ExecuteActivityInput struct {
// more details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityOutboundInterceptor]
type ActivityOutboundInterceptor interface {
// GetInfo intercepts activity.GetInfo.
GetInfo(ctx context.Context) ActivityInfo
Expand Down Expand Up @@ -129,8 +119,6 @@ type ActivityOutboundInterceptor interface {
// details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowInboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowInboundInterceptor]
type WorkflowInboundInterceptor interface {
// Init is the first call of this interceptor. Implementations can change/wrap
// the outbound interceptor before calling Init on the next interceptor.
Expand Down Expand Up @@ -168,17 +156,13 @@ type WorkflowInboundInterceptor interface {
// WorkflowInboundInterceptor.ExecuteWorkflow.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteWorkflowInput]
type ExecuteWorkflowInput struct {
Args []interface{}
}

// HandleSignalInput is the input to WorkflowInboundInterceptor.HandleSignal.
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleSignalInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleSignalInput]
type HandleSignalInput struct {
SignalName string
// Arg is the signal argument. It is presented as a primitive payload since
Expand All @@ -189,8 +173,6 @@ type HandleSignalInput struct {
// UpdateInput carries the name and arguments of a workflow update invocation.
//
// Exposed as: [go.temporal.io/sdk/interceptor.UpdateInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.UpdateInput]
type UpdateInput struct {
Name string
Args []interface{}
Expand All @@ -199,8 +181,6 @@ type UpdateInput struct {
// HandleQueryInput is the input to WorkflowInboundInterceptor.HandleQuery.
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleQueryInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleQueryInput]
type HandleQueryInput struct {
QueryType string
Args []interface{}
Expand All @@ -211,8 +191,6 @@ type HandleQueryInput struct {
// NOTE: Experimental
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteNexusOperationInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteNexusOperationInput]
type ExecuteNexusOperationInput struct {
// Client to start the operation with.
Client NexusClient
Expand All @@ -231,8 +209,6 @@ type ExecuteNexusOperationInput struct {
// NOTE: Experimental
//
// Exposed as: [go.temporal.io/sdk/interceptor.RequestCancelNexusOperationInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.RequestCancelNexusOperationInput]
type RequestCancelNexusOperationInput struct {
// Client that was used to start the operation.
Client NexusClient
Expand All @@ -249,8 +225,6 @@ type RequestCancelNexusOperationInput struct {
// more details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowOutboundInterceptor]
type WorkflowOutboundInterceptor interface {
// Go intercepts workflow.Go.
Go(ctx Context, name string, f func(ctx Context)) Context
Expand Down Expand Up @@ -396,8 +370,6 @@ type WorkflowOutboundInterceptor interface {
// interceptor package for more details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientInterceptor]
type ClientInterceptor interface {
// This is called on client creation if set via client options
InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor
Expand All @@ -410,8 +382,6 @@ type ClientInterceptor interface {
// more details.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientOutboundInterceptor]
type ClientOutboundInterceptor interface {
// ExecuteWorkflow intercepts client.Client.ExecuteWorkflow.
// interceptor.Header will return a non-nil map for this context.
Expand Down Expand Up @@ -494,8 +464,6 @@ type ClientPollWorkflowUpdateOutput struct {
// ClientOutboundInterceptor.CreateSchedule.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ScheduleClientCreateInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ScheduleClientCreateInput]
type ScheduleClientCreateInput struct {
Options *ScheduleOptions
}
Expand All @@ -504,8 +472,6 @@ type ScheduleClientCreateInput struct {
// ClientOutboundInterceptor.ExecuteWorkflow.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteWorkflowInput]
type ClientExecuteWorkflowInput struct {
Options *StartWorkflowOptions
WorkflowType string
Expand All @@ -516,8 +482,6 @@ type ClientExecuteWorkflowInput struct {
// ClientOutboundInterceptor.SignalWorkflow.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWorkflowInput]
type ClientSignalWorkflowInput struct {
WorkflowID string
RunID string
Expand All @@ -529,8 +493,6 @@ type ClientSignalWorkflowInput struct {
// ClientOutboundInterceptor.SignalWithStartWorkflow.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWithStartWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWithStartWorkflowInput]
type ClientSignalWithStartWorkflowInput struct {
SignalName string
SignalArg interface{}
Expand All @@ -543,8 +505,6 @@ type ClientSignalWithStartWorkflowInput struct {
// ClientOutboundInterceptor.CancelWorkflow.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelWorkflowInput]
type ClientCancelWorkflowInput struct {
WorkflowID string
RunID string
Expand All @@ -554,8 +514,6 @@ type ClientCancelWorkflowInput struct {
// ClientOutboundInterceptor.TerminateWorkflow.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateWorkflowInput]
type ClientTerminateWorkflowInput struct {
WorkflowID string
RunID string
Expand All @@ -567,8 +525,6 @@ type ClientTerminateWorkflowInput struct {
// ClientOutboundInterceptor.QueryWorkflow.
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientQueryWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientQueryWorkflowInput]
type ClientQueryWorkflowInput struct {
WorkflowID string
RunID string
Expand Down
10 changes: 9 additions & 1 deletion internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1972,9 +1972,10 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie
return err
}

links, _ := ctx.Value(NexusOperationLinksKey).([]*commonpb.Link)

request := &workflowservice.SignalWorkflowExecutionRequest{
Namespace: w.client.namespace,
RequestId: uuid.New(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: in.WorkflowID,
RunId: in.RunID,
Expand All @@ -1983,6 +1984,13 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie
Input: input,
Identity: w.client.identity,
Header: header,
Links: links,
}

if requestID, ok := ctx.Value(NexusOperationRequestIDKey).(string); ok && requestID != "" {
request.RequestId = requestID
} else {
request.RequestId = uuid.New()
}

grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
Expand Down
20 changes: 14 additions & 6 deletions internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ import (

// NexusOperationContext is an internal only struct that holds fields used by the temporalnexus functions.
type NexusOperationContext struct {
Client Client
Namespace string
TaskQueue string
MetricsHandler metrics.Handler
Log log.Logger
registry *registry
Client Client
Namespace string
TaskQueue string
MetricsHandler metrics.Handler
Log log.Logger
registry *registry
}

func (nc *NexusOperationContext) ResolveWorkflowName(wf any) (string, error) {
Expand All @@ -66,6 +66,14 @@ type isWorkflowRunOpContextKeyType struct{}
// panic as we don't want to expose a partial client to sync operations.
var IsWorkflowRunOpContextKey = isWorkflowRunOpContextKeyType{}

type nexusOperationRequestIDKeyType struct{}

var NexusOperationRequestIDKey = nexusOperationRequestIDKeyType{}

type nexusOperationLinksKeyType struct{}

var NexusOperationLinksKey = nexusOperationLinksKeyType{}

// NexusOperationContextFromGoContext gets the [NexusOperationContext] associated with the given [context.Context].
func NexusOperationContextFromGoContext(ctx context.Context) (nctx *NexusOperationContext, ok bool) {
nctx, ok = ctx.Value(nexusOperationContextKey).(*NexusOperationContext)
Expand Down
67 changes: 53 additions & 14 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/internal/common/metrics"
Expand Down Expand Up @@ -94,6 +93,38 @@ func NewSyncOperation[I any, O any](
}
}

// SignalWorkflowInput is the input to a NewSignalWorkflowOperation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the docstring. It's not the input to the operation.

type SignalWorkflowInput struct {
WorkflowID string
RunID string
SignalName string
Arg any
Comment on lines +98 to +101
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document all of the public API fields.

}

// NewSignalWorkflowOperation is a helper for creating a synchronous nexus.Operation to deliver a signal.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also call out that this ensures idempotency via the request ID mechanism (to some degree - violated if the workflow spans multiple runs) and the bidi linking is also important to call out (for now it's only uni-directional but we'll fix that).

// NOTE: Experimental
func NewSignalWorkflowOperation[T any](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewSignalWorkflowOperation[T any](
func NewWorkflowSignalOperation[T any](

name string,
getSignalInput func(context.Context, T, nexus.StartOperationOptions) SignalWorkflowInput,
) nexus.Operation[T, nexus.NoValue] {
return NewSyncOperation(name, func(ctx context.Context, c client.Client, in T, options nexus.StartOperationOptions) (nexus.NoValue, error) {
signalInput := getSignalInput(ctx, in, options)

if options.RequestID != "" {
ctx = context.WithValue(ctx, internal.NexusOperationRequestIDKey, options.RequestID)
}

links, err := convertNexusLinks(options.Links, GetLogger(ctx))
if err != nil {
return nil, err
}
ctx = context.WithValue(ctx, internal.NexusOperationLinksKey, links)

return nil, c.SignalWorkflow(ctx, signalInput.WorkflowID, signalInput.RunID, signalInput.SignalName, signalInput.Arg)
})
}

func (o *syncOperation[I, O]) Name() string {
return o.name
}
Expand Down Expand Up @@ -360,8 +391,26 @@ func ExecuteUntypedWorkflow[R any](
})
}

links, err := convertNexusLinks(nexusOptions.Links, nctx.Log)
if err != nil {
return nil, err
}
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)

run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
if err != nil {
return nil, err
}
return workflowHandle[R]{
namespace: nctx.Namespace,
id: run.GetID(),
runID: run.GetRunID(),
}, nil
}

func convertNexusLinks(nexusLinks []nexus.Link, log log.Logger) ([]*common.Link, error) {
var links []*common.Link
for _, nexusLink := range nexusOptions.Links {
for _, nexusLink := range nexusLinks {
switch nexusLink.Type {
case string((&common.Link_WorkflowEvent{}).ProtoReflect().Descriptor().FullName()):
link, err := ConvertNexusLinkToLinkWorkflowEvent(nexusLink)
Expand All @@ -374,18 +423,8 @@ func ExecuteUntypedWorkflow[R any](
},
})
default:
nctx.Log.Warn("ignoring unsupported link data type: %q", nexusLink.Type)
log.Warn("ignoring unsupported link data type: %q", nexusLink.Type)
}
}
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)

run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
if err != nil {
return nil, err
}
return workflowHandle[R]{
namespace: nctx.Namespace,
id: run.GetID(),
runID: run.GetRunID(),
}, nil
return links, nil
}
Loading
Loading