Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nexus SignalWorkflowOperation #1770

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
// Exposed as: [go.temporal.io/sdk/interceptor.Interceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.Interceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.Interceptor]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... how did this happen? @Quinn-With-Two-Ns looks like this is the third time this got repeated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally aliased temporalnexus.SignalWorkflowInput to this type and added it to the comment but then undid that and aliased it to the internal type. The automated tools complained about the checks until I ran with -fix which I guess just added another line again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just went ahead and manually removed all the duplicates

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuandrew FYI might be a bug in your alias tool

type Interceptor interface {
ClientInterceptor
WorkerInterceptor
Expand All @@ -52,6 +54,8 @@ type Interceptor interface {
// Exposed as: [go.temporal.io/sdk/interceptor.WorkerInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkerInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkerInterceptor]
type WorkerInterceptor interface {
// InterceptActivity is called before each activity interception needed with
// the next interceptor in the chain.
Expand All @@ -71,6 +75,8 @@ type WorkerInterceptor interface {
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityInboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityInboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityInboundInterceptor]
type ActivityInboundInterceptor interface {
// Init is the first call of this interceptor. Implementations can change/wrap
// the outbound interceptor before calling Init on the next interceptor.
Expand All @@ -88,6 +94,8 @@ type ActivityInboundInterceptor interface {
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteActivityInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteActivityInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteActivityInput]
type ExecuteActivityInput struct {
Args []interface{}
}
Expand All @@ -99,6 +107,8 @@ type ExecuteActivityInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ActivityOutboundInterceptor]
type ActivityOutboundInterceptor interface {
// GetInfo intercepts activity.GetInfo.
GetInfo(ctx context.Context) ActivityInfo
Expand Down Expand Up @@ -131,6 +141,8 @@ type ActivityOutboundInterceptor interface {
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowInboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowInboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowInboundInterceptor]
type WorkflowInboundInterceptor interface {
// Init is the first call of this interceptor. Implementations can change/wrap
// the outbound interceptor before calling Init on the next interceptor.
Expand Down Expand Up @@ -170,6 +182,8 @@ type WorkflowInboundInterceptor interface {
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteWorkflowInput]
type ExecuteWorkflowInput struct {
Args []interface{}
}
Expand All @@ -179,6 +193,8 @@ type ExecuteWorkflowInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.HandleSignalInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleSignalInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleSignalInput]
type HandleSignalInput struct {
SignalName string
// Arg is the signal argument. It is presented as a primitive payload since
Expand All @@ -191,6 +207,8 @@ type HandleSignalInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.UpdateInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.UpdateInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.UpdateInput]
type UpdateInput struct {
Name string
Args []interface{}
Expand All @@ -201,6 +219,8 @@ type UpdateInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.HandleQueryInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleQueryInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.HandleQueryInput]
type HandleQueryInput struct {
QueryType string
Args []interface{}
Expand All @@ -213,6 +233,8 @@ type HandleQueryInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteNexusOperationInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteNexusOperationInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ExecuteNexusOperationInput]
type ExecuteNexusOperationInput struct {
// Client to start the operation with.
Client NexusClient
Expand All @@ -233,6 +255,8 @@ type ExecuteNexusOperationInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.RequestCancelNexusOperationInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.RequestCancelNexusOperationInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.RequestCancelNexusOperationInput]
type RequestCancelNexusOperationInput struct {
// Client that was used to start the operation.
Client NexusClient
Expand All @@ -251,6 +275,8 @@ type RequestCancelNexusOperationInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.WorkflowOutboundInterceptor]
type WorkflowOutboundInterceptor interface {
// Go intercepts workflow.Go.
Go(ctx Context, name string, f func(ctx Context)) Context
Expand Down Expand Up @@ -398,6 +424,8 @@ type WorkflowOutboundInterceptor interface {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientInterceptor]
type ClientInterceptor interface {
// This is called on client creation if set via client options
InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor
Expand All @@ -412,6 +440,8 @@ type ClientInterceptor interface {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientOutboundInterceptor]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientOutboundInterceptor]
type ClientOutboundInterceptor interface {
// ExecuteWorkflow intercepts client.Client.ExecuteWorkflow.
// interceptor.Header will return a non-nil map for this context.
Expand Down Expand Up @@ -459,6 +489,8 @@ type ClientOutboundInterceptor interface {
// ClientOutboundInterceptor.UpdateWorkflow
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientUpdateWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientUpdateWorkflowInput]
type ClientUpdateWorkflowInput struct {
UpdateID string
WorkflowID string
Expand All @@ -469,6 +501,8 @@ type ClientUpdateWorkflowInput struct {
WaitForStage WorkflowUpdateStage
}

// Exposed as: [go.temporal.io/sdk/interceptor.ClientUpdateWithStartWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientUpdateWithStartWorkflowInput]
type ClientUpdateWithStartWorkflowInput struct {
UpdateOptions *UpdateWorkflowOptions
Expand Down Expand Up @@ -496,6 +530,8 @@ type ClientPollWorkflowUpdateOutput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ScheduleClientCreateInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ScheduleClientCreateInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ScheduleClientCreateInput]
type ScheduleClientCreateInput struct {
Options *ScheduleOptions
}
Expand All @@ -506,6 +542,8 @@ type ScheduleClientCreateInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteWorkflowInput]
type ClientExecuteWorkflowInput struct {
Options *StartWorkflowOptions
WorkflowType string
Expand All @@ -518,6 +556,8 @@ type ClientExecuteWorkflowInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWorkflowInput][go.temporal.io/sdk/temporalnexus.SignalWorkflowInput]
type ClientSignalWorkflowInput struct {
WorkflowID string
RunID string
Expand All @@ -531,6 +571,8 @@ type ClientSignalWorkflowInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWithStartWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWithStartWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientSignalWithStartWorkflowInput]
type ClientSignalWithStartWorkflowInput struct {
SignalName string
SignalArg interface{}
Expand All @@ -545,6 +587,8 @@ type ClientSignalWithStartWorkflowInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelWorkflowInput]
type ClientCancelWorkflowInput struct {
WorkflowID string
RunID string
Expand All @@ -556,6 +600,8 @@ type ClientCancelWorkflowInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateWorkflowInput]
type ClientTerminateWorkflowInput struct {
WorkflowID string
RunID string
Expand All @@ -569,6 +615,8 @@ type ClientTerminateWorkflowInput struct {
// Exposed as: [go.temporal.io/sdk/interceptor.ClientQueryWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientQueryWorkflowInput]
//
// Exposed as: [go.temporal.io/sdk/interceptor.ClientQueryWorkflowInput]
type ClientQueryWorkflowInput struct {
WorkflowID string
RunID string
Expand Down
10 changes: 9 additions & 1 deletion internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1972,9 +1972,10 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie
return err
}

links, _ := ctx.Value(NexusOperationRequestIDKey).([]*commonpb.Link)
pdoerner marked this conversation as resolved.
Show resolved Hide resolved

request := &workflowservice.SignalWorkflowExecutionRequest{
Namespace: w.client.namespace,
RequestId: uuid.New(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: in.WorkflowID,
RunId: in.RunID,
Expand All @@ -1983,6 +1984,13 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie
Input: input,
Identity: w.client.identity,
Header: header,
Links: links,
}

if requestID, ok := ctx.Value(NexusOperationRequestIDKey).(string); ok && requestID != "" {
request.RequestId = requestID
} else {
request.RequestId = uuid.New()
}

grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
Expand Down
20 changes: 14 additions & 6 deletions internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ import (

// NexusOperationContext is an internal only struct that holds fields used by the temporalnexus functions.
type NexusOperationContext struct {
Client Client
Namespace string
TaskQueue string
MetricsHandler metrics.Handler
Log log.Logger
registry *registry
Client Client
Namespace string
TaskQueue string
MetricsHandler metrics.Handler
Log log.Logger
registry *registry
}

func (nc *NexusOperationContext) ResolveWorkflowName(wf any) (string, error) {
Expand All @@ -66,6 +66,14 @@ type isWorkflowRunOpContextKeyType struct{}
// panic as we don't want to expose a partial client to sync operations.
var IsWorkflowRunOpContextKey = isWorkflowRunOpContextKeyType{}

type nexusOperationRequestIDKeyType struct{}

var NexusOperationRequestIDKey = nexusOperationRequestIDKeyType{}

type nexusOperationLinksKeyType struct{}

var NexusOperationLinksKey = nexusOperationLinksKeyType{}

// NexusOperationContextFromGoContext gets the [NexusOperationContext] associated with the given [context.Context].
func NexusOperationContextFromGoContext(ctx context.Context) (nctx *NexusOperationContext, ok bool) {
nctx, ok = ctx.Value(nexusOperationContextKey).(*NexusOperationContext)
Expand Down
57 changes: 43 additions & 14 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/internal/common/metrics"
Expand Down Expand Up @@ -94,6 +93,28 @@ func NewSyncOperation[I any, O any](
}
}

// SignalWorkflowInput is the input to a NewSignalWorkflowOperation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the docstring. It's not the input to the operation.

type SignalWorkflowInput = internal.ClientSignalWorkflowInput
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't reuse the type from the interceptors, you want a type that contains the workflow ID, optional run ID and signal name.


// NewSignalWorkflowOperation is a helper for creating a synchronous nexus.Operation to deliver a signal.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also call out that this ensures idempotency via the request ID mechanism (to some degree - violated if the workflow spans multiple runs) and the bidi linking is also important to call out (for now it's only uni-directional but we'll fix that).

// NOTE: Experimental
func NewSignalWorkflowOperation(name string) nexus.Operation[SignalWorkflowInput, nexus.NoValue] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewSignalWorkflowOperation(name string) nexus.Operation[SignalWorkflowInput, nexus.NoValue] {
func NewWorkflowSignalOperation[T any](name string, func(ctx context.Context, input T, options nexus.StartOpertionOptions) SignalWorkflowInput) nexus.Operation[T, nexus.NoValue] {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why require users to pass in a function to extract the required parameters from T instead of just giving them an input type which contains all the required values (wf ID, run ID, signal name, and signal arg)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller doesn't care that this is a signal, that's the handler's call.

return NewSyncOperation(name, func(ctx context.Context, c client.Client, in SignalWorkflowInput, options nexus.StartOperationOptions) (nexus.NoValue, error) {
if options.RequestID != "" {
ctx = context.WithValue(ctx, internal.NexusOperationRequestIDKey, options.RequestID)
}

links, err := convertNexusLinks(options.Links, GetLogger(ctx))
if err != nil {
return nil, err
}
ctx = context.WithValue(ctx, internal.NexusOperationLinksKey, links)

return nil, c.SignalWorkflow(ctx, in.WorkflowID, in.RunID, in.SignalName, in.Arg)
})
}

func (o *syncOperation[I, O]) Name() string {
return o.name
}
Expand Down Expand Up @@ -360,8 +381,26 @@ func ExecuteUntypedWorkflow[R any](
})
}

links, err := convertNexusLinks(nexusOptions.Links, nctx.Log)
if err != nil {
return nil, err
}
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)

run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
if err != nil {
return nil, err
}
return workflowHandle[R]{
namespace: nctx.Namespace,
id: run.GetID(),
runID: run.GetRunID(),
}, nil
}

func convertNexusLinks(nexusLinks []nexus.Link, log log.Logger) ([]*common.Link, error) {
var links []*common.Link
for _, nexusLink := range nexusOptions.Links {
for _, nexusLink := range nexusLinks {
switch nexusLink.Type {
case string((&common.Link_WorkflowEvent{}).ProtoReflect().Descriptor().FullName()):
link, err := ConvertNexusLinkToLinkWorkflowEvent(nexusLink)
Expand All @@ -374,18 +413,8 @@ func ExecuteUntypedWorkflow[R any](
},
})
default:
nctx.Log.Warn("ignoring unsupported link data type: %q", nexusLink.Type)
log.Warn("ignoring unsupported link data type: %q", nexusLink.Type)
}
}
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)

run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
if err != nil {
return nil, err
}
return workflowHandle[R]{
namespace: nctx.Namespace,
id: run.GetID(),
runID: run.GetRunID(),
}, nil
return links, nil
}
Loading
Loading