Skip to content

Commit

Permalink
fix: Add timeout to contexts in client calls (#6125)
Browse files Browse the repository at this point in the history
* add timeout context from infer call for modelgateway

* add timeout context to pipeline gateway

* set timeout context on process request

* add a test for grpc call timeout

* add agent k8s api call timeout

* add context timeout for shutting down services

* add timeout for controller k8s api calls

* add timeout for control plane context

* add timeout context to reconcile logic

* pr comments
  • Loading branch information
sakoush authored Dec 11, 2024
1 parent 74032a4 commit 1c40f62
Show file tree
Hide file tree
Showing 26 changed files with 413 additions and 301 deletions.
4 changes: 3 additions & 1 deletion operator/controllers/mlops/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr.
// Add our finalizer
if !utils.ContainsStr(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) {
experiment.ObjectMeta.Finalizers = append(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName)
if err := r.Update(context.Background(), experiment); err != nil {
if err := r.Update(ctx, experiment); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -84,6 +84,8 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr.
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

experiment := &mlopsv1alpha1.Experiment{}
if err := r.Get(ctx, req.NamespacedName, experiment); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion operator/controllers/mlops/model_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logge
// Add our finalizer
if !utils.ContainsStr(model.ObjectMeta.Finalizers, constants.ModelFinalizerName) {
model.ObjectMeta.Finalizers = append(model.ObjectMeta.Finalizers, constants.ModelFinalizerName)
if err := r.Update(context.Background(), model); err != nil {
if err := r.Update(ctx, model); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -78,6 +78,8 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logge

func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

model := &mlopsv1alpha1.Model{}
if err := r.Get(ctx, req.NamespacedName, model); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion operator/controllers/mlops/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *PipelineReconciler) handleFinalizer(
// Add our finalizer
if !utils.ContainsStr(pipeline.ObjectMeta.Finalizers, constants.PipelineFinalizerName) {
pipeline.ObjectMeta.Finalizers = append(pipeline.ObjectMeta.Finalizers, constants.PipelineFinalizerName)
if err := r.Update(context.Background(), pipeline); err != nil {
if err := r.Update(ctx, pipeline); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -94,6 +94,8 @@ func (r *PipelineReconciler) handleFinalizer(
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

pipeline := &mlopsv1alpha1.Pipeline{}
if err := r.Get(ctx, req.NamespacedName, pipeline); err != nil {
Expand Down
10 changes: 7 additions & 3 deletions operator/controllers/mlops/seldonruntime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo
// Add our finalizer
if !utils.ContainsStr(runtime.ObjectMeta.Finalizers, constants.RuntimeFinalizerName) {
runtime.ObjectMeta.Finalizers = append(runtime.ObjectMeta.Finalizers, constants.RuntimeFinalizerName)
if err := r.Update(context.Background(), runtime); err != nil {
if err := r.Update(ctx, runtime); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -120,6 +120,8 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

seldonRuntime := &mlopsv1alpha1.SeldonRuntime{}
if err := r.Get(ctx, req.NamespacedName, seldonRuntime); err != nil {
Expand Down Expand Up @@ -214,9 +216,11 @@ func (r *SeldonRuntimeReconciler) updateStatus(seldonRuntime *mlopsv1alpha1.Seld
// Find SeldonRuntimes that reference the changes SeldonConfig
// TODO: pass an actual context from the caller to be used here
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(_ context.Context, obj client.Object) []reconcile.Request {
logger := log.FromContext(context.Background()).WithName("mapSeldonRuntimesFromSeldonConfig")
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
defer cancel()
logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromSeldonConfig")
var seldonRuntimes mlopsv1alpha1.SeldonRuntimeList
if err := r.Client.List(context.Background(), &seldonRuntimes); err != nil {
if err := r.Client.List(ctx, &seldonRuntimes); err != nil {
logger.Error(err, "error listing seldonRuntimes")
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions operator/controllers/mlops/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
"github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/common"
serverreconcile "github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/server"
"github.com/seldonio/seldon-core/operator/v2/pkg/constants"
scheduler "github.com/seldonio/seldon-core/operator/v2/scheduler"
)

Expand Down Expand Up @@ -65,6 +66,8 @@ type ServerReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

logger.Info("Received reconcile for Server", "name", req.Name, "namespace", req.NamespacedName.Namespace)

Expand Down Expand Up @@ -186,9 +189,11 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error {
// Find Servers that need reconcilliation from a change to a given ServerConfig
// TODO: pass an actual context from the caller to be used here
func (r *ServerReconciler) mapServerFromServerConfig(_ context.Context, obj client.Object) []reconcile.Request {
logger := log.FromContext(context.Background()).WithName("mapServerFromServerConfig")
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
defer cancel()
logger := log.FromContext(ctx).WithName("mapServerFromServerConfig")
var servers mlopsv1alpha1.ServerList
if err := r.Client.List(context.Background(), &servers); err != nil {
if err := r.Client.List(ctx, &servers); err != nil {
logger.Error(err, "error listing servers")
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package main
import (
"flag"
"os"
"time"

//+kubebuilder:scaffold:imports
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -43,6 +44,10 @@ func init() {
//+kubebuilder:scaffold:scheme
}

const (
defaultReconcileTimeout = 2 * time.Minute
)

func main() {
var metricsAddr string
var enableLeaderElection bool
Expand Down
15 changes: 14 additions & 1 deletion operator/pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ the Change License after the Change Date as each is defined in accordance with t

package constants

import "os"
import (
"os"
"time"
)

const (
// note: we do not have a finalizer for servers as we rely on the draining logic to reschedule models
Expand Down Expand Up @@ -50,3 +53,13 @@ const (
ReconcileUpdateNeeded
ReconcileCreateNeeded
)

// k8s api call timeout
const (
// this is a constant that can be used to set the timeout for k8s api calls
// currently it can be used for a series of calls in a single logical operation
// which is expected to be completed in this amount of time (as opposed to a single call)
K8sAPICallsTxTimeout = 2 * time.Minute
ControlPlaneExecTimeOut = 5 * time.Minute
ReconcileTimeout = 5 * time.Minute
)
18 changes: 11 additions & 7 deletions operator/scheduler/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import (
"google.golang.org/grpc/status"

"github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
)

const (
execTimeOut = 5 * time.Minute
"github.com/seldonio/seldon-core/operator/v2/pkg/constants"
)

func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error {
Expand All @@ -49,10 +47,12 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC
}
logger.Info("Received event to handle state", "event", event)

fn := func() error {
fn := func(ctx context.Context) error {
return s.handleStateOnReconnect(ctx, grpcClient, namespace, event.GetEvent())
}
_, err = execWithTimeout(fn, execTimeOut)
// in general we could have also handled timeout via a context with timeout
// but we want to handle the timeout in a more controlled way and not depending on the other side
_, err = execWithTimeout(ctx, fn, constants.ControlPlaneExecTimeOut)
if err != nil {
logger.Error(err, "Failed to handle state on reconnect")
return err
Expand All @@ -64,10 +64,14 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC
return nil
}

func execWithTimeout(f func() error, d time.Duration) (bool, error) {
func execWithTimeout(baseContext context.Context, f func(ctx context.Context) error, d time.Duration) (bool, error) {
// cancel the context after the timeout
ctxWithCancel, cancel := context.WithCancel(baseContext)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- f()
errChan <- f(ctxWithCancel)
close(errChan)
}()
t := time.NewTimer(d)
Expand Down
2 changes: 1 addition & 1 deletion operator/scheduler/control_plane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestSendWithTimeout(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
hasExpired, err := execWithTimeout(func() error {
hasExpired, err := execWithTimeout(context.Background(), func(_ context.Context) error {
return fn(test.err)
}, test.sleepTime)
g.Expect(hasExpired).To(Equal(test.isExpired))
Expand Down
116 changes: 58 additions & 58 deletions operator/scheduler/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,79 +102,79 @@ func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcCli
logger.Info("Received experiment event with no k8s metadata so ignoring", "Experiment", event.ExperimentName)
continue
}
experiment := &v1alpha1.Experiment{}
err = s.Get(ctx, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, experiment)
if err != nil {
logger.Error(err, "Failed to get experiment", "name", event.ExperimentName, "namespace", event.KubernetesMeta.Namespace)
continue
}

if !experiment.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Experiment is pending deletion", "experiment", experiment.Name)
if !event.Active {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
latestExperiment := &v1alpha1.Experiment{}
err = s.Get(ctx, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, latestExperiment)
if err != nil {
// An experiment is not active if it is being deleted or some models are not ready
if !event.Active {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout)
defer cancel()

latestExperiment := &v1alpha1.Experiment{}
err = s.Get(ctxWithTimeout, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, latestExperiment)
if err != nil {
return err
}
if !latestExperiment.ObjectMeta.DeletionTimestamp.IsZero() { // Experiment is being deleted
// remove finalizer now we have completed successfully
latestExperiment.ObjectMeta.Finalizers = utils.RemoveStr(latestExperiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName)
if err := s.Update(ctxWithTimeout, latestExperiment); err != nil {
logger.Error(err, "Failed to remove finalizer", "experiment", latestExperiment.GetName())
return err
}
if !latestExperiment.ObjectMeta.DeletionTimestamp.IsZero() { // Experiment is being deleted
// remove finalizer now we have completed successfully
latestExperiment.ObjectMeta.Finalizers = utils.RemoveStr(latestExperiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName)
if err := s.Update(ctx, latestExperiment); err != nil {
logger.Error(err, "Failed to remove finalizer", "experiment", latestExperiment.GetName())
return err
}
}
return nil
})
if retryErr != nil {
logger.Error(err, "Failed to remove finalizer after retries")
}
return nil
})
if retryErr != nil {
logger.Error(err, "Failed to remove finalizer after retries")
}
}

// Try to update status
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
experiment := &v1alpha1.Experiment{}
err = s.Get(ctx, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, experiment)
if err != nil {
return err
}
if event.KubernetesMeta.Generation != experiment.Generation {
logger.Info("Ignoring event for old generation", "currentGeneration", experiment.Generation, "eventGeneration", event.KubernetesMeta.Generation, "server", event.ExperimentName)
return nil
}
// Handle status update
if event.Active {
logger.Info("Setting experiment to ready", "experiment", event.ExperimentName)
experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, true, event.StatusDescription)
} else {
logger.Info("Setting experiment to not ready", "experiment", event.ExperimentName)
experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, false, event.StatusDescription)
}
if event.CandidatesReady {
experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, true, "Candidates ready")
} else {
experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, false, "Candidates not ready")
}
if event.MirrorReady {
experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, true, "Mirror ready")
} else {
experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, false, "Mirror not ready")
{
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout)
defer cancel()

experiment := &v1alpha1.Experiment{}
err = s.Get(ctxWithTimeout, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, experiment)
if err != nil {
return err
}
if event.KubernetesMeta.Generation != experiment.Generation {
logger.Info("Ignoring event for old generation", "currentGeneration", experiment.Generation, "eventGeneration", event.KubernetesMeta.Generation, "server", event.ExperimentName)
return nil
}
// Handle status update
if event.Active {
logger.Info("Setting experiment to ready", "experiment", event.ExperimentName)
experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, true, event.StatusDescription)
} else {
logger.Info("Setting experiment to not ready", "experiment", event.ExperimentName)
experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, false, event.StatusDescription)
}
if event.CandidatesReady {
experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, true, "Candidates ready")
} else {
experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, false, "Candidates not ready")
}
if event.MirrorReady {
experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, true, "Mirror ready")
} else {
experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, false, "Mirror not ready")
}
return s.updateExperimentStatus(ctxWithTimeout, experiment)
})
if retryErr != nil {
logger.Error(err, "Failed to update status", "experiment", event.ExperimentName)
}
return s.updateExperimentStatus(experiment)
})
if retryErr != nil {
logger.Error(err, "Failed to update status", "experiment", event.ExperimentName)
}

}
return nil
}

func (s *SchedulerClient) updateExperimentStatus(experiment *v1alpha1.Experiment) error {
if err := s.Status().Update(context.TODO(), experiment); err != nil {
func (s *SchedulerClient) updateExperimentStatus(ctx context.Context, experiment *v1alpha1.Experiment) error {
if err := s.Status().Update(ctx, experiment); err != nil {
s.recorder.Eventf(experiment, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for experiment %q: %v", experiment.Name, err)
return err
Expand Down
Loading

0 comments on commit 1c40f62

Please sign in to comment.