From 1363c75a824c82dfa90e289713d1382489c054ff Mon Sep 17 00:00:00 2001
From: Pasquale Congiusti
Date: Thu, 4 Jan 2024 15:28:30 +0100
Subject: [PATCH] fix(ctrl): monitor Pod ready condition

When the user uses a startup probe, the Integration won't be marked as
running until the Pod ready condition is reached.

Closes #4977
---
 pkg/controller/integration/monitor.go         | 161 +++++-------------
 pkg/controller/integration/monitor_cronjob.go |   2 +-
 .../integration/monitor_deployment.go         |   4 +-
 pkg/controller/integration/monitor_knative.go |   2 +-
 4 files changed, 50 insertions(+), 119 deletions(-)

diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 7c5a6b7c64..9a6208fcb9 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -254,7 +254,7 @@ func getIntegrationSecretsAndConfigmaps(ctx context.Context, client client.Clien
 type controller interface {
     checkReadyCondition(ctx context.Context) (bool, error)
     getPodSpec() corev1.PodSpec
-    updateReadyCondition(readyPods []corev1.Pod) bool
+    updateReadyCondition(readyPods int) bool
 }

 func (action *monitorAction) newController(env *trait.Environment, integration *v1.Integration) (controller, error) {
@@ -318,33 +318,32 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(
     if err != nil {
         return err
     }
-
-    readyPods, unreadyPods := filterPodsByReadyStatus(environment, runningPods, controller.getPodSpec())
-
     if done, err := controller.checkReadyCondition(ctx); done || err != nil {
-        if len(readyPods) > 0 || len(unreadyPods) > 0 {
-            // There may be pods that are not ready but still probable for getting error messages.
-            // Ignore returned error from probing as it's expected when the ctrl obj is not ready.
-            _ = action.probeReadiness(ctx, environment, integration, unreadyPods, readyPods)
-        }
+        // There may be pods that are not ready but can still be probed for error messages.
+        // Ignore the returned error from probing as it's expected when the ctrl obj is not ready.
+        _, _, _ = action.probeReadiness(ctx, environment, integration, runningPods)
         return err
     }
-    if done := checkPodStatuses(integration, pendingPods, runningPods); done {
+    if arePodsFailingStatuses(integration, pendingPods, runningPods) {
         return nil
     }
-    integration.Status.Phase = v1.IntegrationPhaseRunning
-
-    if done := controller.updateReadyCondition(readyPods); done {
+    readyPods, probeOk, err := action.probeReadiness(ctx, environment, integration, runningPods)
+    if err != nil {
+        return err
+    }
+    if !probeOk {
+        integration.Status.Phase = v1.IntegrationPhaseError
         return nil
     }
-    if err := action.probeReadiness(ctx, environment, integration, unreadyPods, readyPods); err != nil {
-        return err
+    if done := controller.updateReadyCondition(readyPods); done {
+        integration.Status.Phase = v1.IntegrationPhaseRunning
+        return nil
     }

     return nil
 }

-func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
+func arePodsFailingStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
     // Check Pods statuses
     for _, pod := range pendingPods {
         // Check the scheduled condition
@@ -396,114 +395,44 @@ func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, run
     return false
 }

-func filterPodsByReadyStatus(environment *trait.Environment, runningPods []corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
-    var readyPods []corev1.Pod
-    var unreadyPods []corev1.Pod
-
-    integrationContainerName := environment.GetIntegrationContainerName()
-    for _, pod := range runningPods {
-        // We compare the Integration PodSpec to that of the Pod in order to make
-        // sure we account for up-to-date version.
-        if !comparePodSpec(integrationContainerName, podSpec, pod.Spec) {
-            continue
-        }
-        ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
-        if ready == nil {
-            continue
-        }
-        switch ready.Status {
-        case corev1.ConditionTrue:
-            // We still account terminating Pods to handle rolling deployments
-            readyPods = append(readyPods, pod)
-        case corev1.ConditionFalse:
-            if pod.DeletionTimestamp != nil {
-                continue
-            }
-            unreadyPods = append(unreadyPods, pod)
-        }
-    }
-
-    return readyPods, unreadyPods
-}
-
-// comparePodSpec compares given pod spec according to integration specific information (e.g. digest, container image).
-func comparePodSpec(integrationContainerName string, runningPodSpec corev1.PodSpec, referencePod corev1.PodSpec) bool {
-    runningPodContainer := findIntegrationContainer(integrationContainerName, runningPodSpec)
-    referencePodContainer := findIntegrationContainer(integrationContainerName, referencePod)
-
-    if runningPodContainer == nil || referencePodContainer == nil {
-        return false
-    }
-
-    // integration digest must be the same
-    if getIntegrationDigest(runningPodContainer.Env) != getIntegrationDigest(referencePodContainer.Env) {
-        return false
-    }
-
-    // integration container image must be the same (same integration kit)
-    if runningPodContainer.Image != referencePodContainer.Image {
-        return false
-    }
-
-    return true
-}
-
-func getIntegrationDigest(envs []corev1.EnvVar) string {
-    for _, env := range envs {
-        if env.Name == digest.IntegrationDigestEnvVar {
-            return env.Value
-        }
-    }
-
-    return ""
-}
-
-// findIntegrationContainer find if present the integration container in the pod spec using the integration specifications.
-func findIntegrationContainer(integrationContainerName string, spec corev1.PodSpec) *corev1.Container {
-    for _, c := range spec.Containers {
-        if c.Name == integrationContainerName {
-            return &c
-        }
-    }
-
-    return nil
-}
-
 // probeReadiness calls the readiness probes of the non-ready Pods directly to retrieve insights from the Camel runtime.
-func (action *monitorAction) probeReadiness(
-    ctx context.Context, environment *trait.Environment, integration *v1.Integration,
-    unreadyPods []corev1.Pod, readyPods []corev1.Pod,
-) error {
+// The func returns the number of ready Pods, whether the probe succeeded, and any error that may have happened during its execution.
+func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pods []corev1.Pod) (int, bool, error) {
+    // As a default we assume the Integration is ready
     readyCondition := v1.IntegrationCondition{
         Type:   v1.IntegrationConditionReady,
-        Status: corev1.ConditionFalse,
-        Pods:   make([]v1.PodCondition, len(unreadyPods)),
+        Status: corev1.ConditionTrue,
+        Pods:   make([]v1.PodCondition, len(pods)),
     }
+    readyPods := 0
+    unreadyPods := 0
+
     runtimeReady := true
     runtimeFailed := false
+    probeReadinessOk := true

-    for i := range unreadyPods {
-        pod := &unreadyPods[i]
-        if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); ready.Reason != "ContainersNotReady" {
-            continue
-        }
-        container := getIntegrationContainer(environment, pod)
-        if container == nil {
-            return fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
-        }
-
+    for i := range pods {
+        pod := &pods[i]
         readyCondition.Pods[i].Name = pod.Name
-
         for p := range pod.Status.Conditions {
             if pod.Status.Conditions[p].Type == corev1.PodReady {
                 readyCondition.Pods[i].Condition = pod.Status.Conditions[p]
                 break
             }
         }
+        // If it's in ready status, then there is no need to probe it.
+        if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); ready.Status == corev1.ConditionTrue {
+            readyPods++
+            continue
+        }
+        unreadyPods++
+        container := getIntegrationContainer(environment, pod)
+        if container == nil {
+            return readyPods, false, fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
+        }
         if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
             body, err := proxyGetHTTPProbe(ctx, action.client, probe, pod, container)
-
             // When invoking the HTTP probe, the kubernetes client exposes a very
             // specific behavior:
             //
@@ -559,7 +488,7 @@ func (action *monitorAction) probeReadiness(
             health, err := NewHealthCheck(body)
             if err != nil {
-                return err
+                return readyPods, false, err
             }
             for _, check := range health.Checks {
                 if check.Status == v1.HealthCheckStatusUp {
@@ -575,19 +504,21 @@ func (action *monitorAction) probeReadiness(
     }

     if runtimeFailed {
-        integration.Status.Phase = v1.IntegrationPhaseError
+        probeReadinessOk = false
         readyCondition.Reason = v1.IntegrationConditionErrorReason
-        readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
+        readyCondition.Status = corev1.ConditionFalse
+        readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
+        integration.Status.SetConditions(readyCondition)
     }
     if !runtimeReady {
-        integration.Status.Phase = v1.IntegrationPhaseError
+        probeReadinessOk = false
         readyCondition.Reason = v1.IntegrationConditionRuntimeNotReadyReason
-        readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", len(unreadyPods), len(unreadyPods)+len(readyPods))
+        readyCondition.Status = corev1.ConditionFalse
+        readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
+        integration.Status.SetConditions(readyCondition)
     }

-    integration.Status.SetConditions(readyCondition)
-
-    return nil
+    return readyPods, probeReadinessOk, nil
 }

 func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, error) {
diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go
index a2f969d34f..1620a66c31 100644
--- a/pkg/controller/integration/monitor_cronjob.go
+++ b/pkg/controller/integration/monitor_cronjob.go
@@ -77,7 +77,7 @@ func (c *cronJobController) getPodSpec() corev1.PodSpec {
     return c.obj.Spec.JobTemplate.Spec.Template.Spec
 }

-func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
+func (c *cronJobController) updateReadyCondition(readyPods int) bool {
     switch {
     case c.obj.Status.LastScheduleTime == nil:
         c.integration.SetReadyCondition(corev1.ConditionTrue,
diff --git a/pkg/controller/integration/monitor_deployment.go b/pkg/controller/integration/monitor_deployment.go
index 55b7797c7e..e2f823c16f 100644
--- a/pkg/controller/integration/monitor_deployment.go
+++ b/pkg/controller/integration/monitor_deployment.go
@@ -59,14 +59,14 @@ func (c *deploymentController) getPodSpec() corev1.PodSpec {
     return c.obj.Spec.Template.Spec
 }

-func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) bool {
+func (c *deploymentController) updateReadyCondition(readyPods int) bool {
     replicas := int32(1)
     if r := c.integration.Spec.Replicas; r != nil {
         replicas = *r
     }
     // The Deployment status reports updated and ready replicas separately,
     // so that the number of ready replicas also accounts for older versions.
-    readyReplicas := int32(len(readyPods))
+    readyReplicas := int32(readyPods)
     switch {
     case readyReplicas >= replicas:
         // The Integration is considered ready when the number of replicas
diff --git a/pkg/controller/integration/monitor_knative.go b/pkg/controller/integration/monitor_knative.go
index 1d62eef2e4..06b7dc82bf 100644
--- a/pkg/controller/integration/monitor_knative.go
+++ b/pkg/controller/integration/monitor_knative.go
@@ -51,7 +51,7 @@ func (c *knativeServiceController) getPodSpec() corev1.PodSpec {
     return c.obj.Spec.Template.Spec.PodSpec
 }

-func (c *knativeServiceController) updateReadyCondition(readyPods []corev1.Pod) bool {
+func (c *knativeServiceController) updateReadyCondition(readyPods int) bool {
     ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady)
     if ready.IsTrue() {
         c.integration.SetReadyCondition(corev1.ConditionTrue,
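The hunks above spread the new control flow across several functions. As a condensed, standalone sketch of the resulting behaviour (decidePhase, its string results, and the boolean parameters are illustrative names, not identifiers from the patch; the ready-Pod count that feeds updateReadyCondition is elided):

package main

import "fmt"

// decidePhase condenses the new decision flow from updateIntegrationPhaseAndReadyCondition:
// a failed runtime probe moves the Integration to error, the running phase is only set once
// the controller reports the ready condition for enough Pods, and otherwise the phase is
// left untouched (for example while a startup probe is still in progress).
func decidePhase(probeOk bool, controllerReady bool) string {
	if !probeOk {
		return "Error"
	}
	if controllerReady {
		return "Running"
	}
	return "(phase unchanged)"
}

func main() {
	fmt.Println(decidePhase(true, false))  // startup probe still running -> phase unchanged
	fmt.Println(decidePhase(true, true))   // probe passed and enough Pods ready -> Running
	fmt.Println(decidePhase(false, false)) // a runtime health check reported DOWN -> Error
}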