diff --git a/internal/controller/mcpserver_controller.go b/internal/controller/mcpserver_controller.go index 41e394d0..55270cd2 100644 --- a/internal/controller/mcpserver_controller.go +++ b/internal/controller/mcpserver_controller.go @@ -108,6 +108,10 @@ const ( eventActionConfigurationAccepted = "ConfigurationAccepted" // eventActionServerReady is the reporting action when Ready becomes True with reason Available. eventActionServerReady = "ServerReady" + // eventActionMCPHandshakeFailed is the reporting action when the MCP handshake fails. + eventActionMCPHandshakeFailed = "MCPHandshakeFailed" + // eventActionMCPHandshakeRetriesExhausted is the reporting action when handshake retries are exhausted. + eventActionMCPHandshakeRetriesExhausted = "MCPHandshakeRetriesExhausted" // requeueDelayMCPHandshake is the initial delay before requeuing when an MCP handshake fails. requeueDelayMCPHandshake = 10 * time.Second @@ -332,14 +336,7 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.emitServerReady(mcpServer) } - var handshakeRetryCount int32 - if readyCondition.Reason == ReasonMCPEndpointUnavailable { - if mcpServer.Status.ObservedGeneration == mcpServer.Generation { - handshakeRetryCount = mcpServer.Status.HandshakeRetryCount + 1 - } else { - handshakeRetryCount = 1 - } - } + handshakeRetryCount := r.reconcileHandshakeEventsAndRetryCount(mcpServer, readyCondition) status := acv1alpha1.MCPServerStatus(). WithObservedGeneration(mcpServer.Generation). 
@@ -478,14 +475,16 @@ func (r *MCPServerReconciler) emitConfigurationInvalid(mcpServer *mcpv1alpha1.MC if r.Recorder == nil { return } - r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeWarning, validationErr.Reason, eventActionConfigurationValidation, "%s", validationErr.Message) + r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeWarning, validationErr.Reason, eventActionConfigurationValidation, + "MCPServer %s: %s", mcpServer.Name, validationErr.Message) } func (r *MCPServerReconciler) emitConfigurationAccepted(mcpServer *mcpv1alpha1.MCPServer) { if r.Recorder == nil { return } - r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeNormal, ReasonValid, eventActionConfigurationAccepted, "%s", "MCPServer configuration is valid; Accepted=True") + r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeNormal, ReasonValid, eventActionConfigurationAccepted, + "MCPServer %s configuration is valid; Accepted=True", mcpServer.Name) } func (r *MCPServerReconciler) emitServerReady(mcpServer *mcpv1alpha1.MCPServer) { @@ -495,6 +494,23 @@ func (r *MCPServerReconciler) emitServerReady(mcpServer *mcpv1alpha1.MCPServer) r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeNormal, ReasonAvailable, eventActionServerReady, "MCPServer %s is ready; Ready=True", mcpServer.Name) } +func (r *MCPServerReconciler) emitMCPHandshakeFailed(mcpServer *mcpv1alpha1.MCPServer, message string) { + if r.Recorder == nil { + return + } + r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeWarning, ReasonMCPEndpointUnavailable, eventActionMCPHandshakeFailed, + "MCP handshake failed for MCPServer %s: %s", mcpServer.Name, message) +} + +func (r *MCPServerReconciler) emitMCPHandshakeRetriesExhausted(mcpServer *mcpv1alpha1.MCPServer, retryCount int32) { + if r.Recorder == nil { + return + } + r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeWarning, ReasonMCPEndpointUnavailable, eventActionMCPHandshakeRetriesExhausted, + "MCP handshake retries exhausted for MCPServer %s after %d attempts; fix the MCP endpoint or 
update spec to retry", + mcpServer.Name, retryCount) +} + func (r *MCPServerReconciler) applyStatus( ctx context.Context, mcpServer *mcpv1alpha1.MCPServer, diff --git a/internal/controller/mcpserver_controller_conditions.go b/internal/controller/mcpserver_controller_conditions.go index fcb1f1e1..b67005e5 100644 --- a/internal/controller/mcpserver_controller_conditions.go +++ b/internal/controller/mcpserver_controller_conditions.go @@ -284,3 +284,9 @@ func readyConditionIsAvailable(conditions []metav1.Condition) bool { c := meta.FindStatusCondition(conditions, ConditionTypeReady) return c != nil && c.Status == metav1.ConditionTrue && c.Reason == ReasonAvailable } + +func duplicateHandshakeUnavailable(conditions []metav1.Condition, message string) bool { + prevReady := meta.FindStatusCondition(conditions, ConditionTypeReady) + return prevReady != nil && prevReady.Status == metav1.ConditionFalse && + prevReady.Reason == ReasonMCPEndpointUnavailable && prevReady.Message == message +} diff --git a/internal/controller/mcpserver_controller_conditions_test.go b/internal/controller/mcpserver_controller_conditions_test.go index ef9f2b15..c015d050 100644 --- a/internal/controller/mcpserver_controller_conditions_test.go +++ b/internal/controller/mcpserver_controller_conditions_test.go @@ -673,4 +673,18 @@ var _ = Describe("status condition helpers", func() { {Type: ConditionTypeReady, Status: metav1.ConditionTrue, Reason: ReasonAvailable}, })).To(BeTrue()) }) + + It("duplicateHandshakeUnavailable returns true only for matching Ready=False MCPEndpointUnavailable message", func() { + msg := "MCP endpoint is not serving a valid MCP protocol: connection refused" + Expect(duplicateHandshakeUnavailable(nil, msg)).To(BeFalse()) + Expect(duplicateHandshakeUnavailable([]metav1.Condition{ + {Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonDeploymentUnavailable, Message: msg}, + }, msg)).To(BeFalse()) + Expect(duplicateHandshakeUnavailable([]metav1.Condition{ + 
{Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonMCPEndpointUnavailable, Message: "other"}, + }, msg)).To(BeFalse()) + Expect(duplicateHandshakeUnavailable([]metav1.Condition{ + {Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonMCPEndpointUnavailable, Message: msg}, + }, msg)).To(BeTrue()) + }) }) diff --git a/internal/controller/mcpserver_controller_handshake.go b/internal/controller/mcpserver_controller_handshake.go index 67eb1b02..cddf133c 100644 --- a/internal/controller/mcpserver_controller_handshake.go +++ b/internal/controller/mcpserver_controller_handshake.go @@ -170,3 +170,34 @@ func isHTTPAuthError(err error) bool { return strings.HasSuffix(msg, ": "+http.StatusText(http.StatusUnauthorized)) || strings.HasSuffix(msg, ": "+http.StatusText(http.StatusForbidden)) } + +// reconcileHandshakeEventsAndRetryCount emits handshake-related events and returns the updated retry count. +func (r *MCPServerReconciler) reconcileHandshakeEventsAndRetryCount( + mcpServer *mcpv1alpha1.MCPServer, + readyCondition metav1.Condition, +) int32 { + if readyCondition.Reason != ReasonMCPEndpointUnavailable { + return 0 + } + + prevHandshakeRetryCount := mcpServer.Status.HandshakeRetryCount + if mcpServer.Status.ObservedGeneration != mcpServer.Generation { + prevHandshakeRetryCount = 0 + } + + var handshakeRetryCount int32 + if mcpServer.Status.ObservedGeneration == mcpServer.Generation { + handshakeRetryCount = mcpServer.Status.HandshakeRetryCount + 1 + } else { + handshakeRetryCount = 1 + } + + if !duplicateHandshakeUnavailable(mcpServer.Status.Conditions, readyCondition.Message) { + r.emitMCPHandshakeFailed(mcpServer, readyCondition.Message) + } + if int(handshakeRetryCount) >= maxMCPHandshakeRetries && int(prevHandshakeRetryCount) < maxMCPHandshakeRetries { + r.emitMCPHandshakeRetriesExhausted(mcpServer, handshakeRetryCount) + } + + return handshakeRetryCount +} diff --git a/internal/controller/mcpserver_controller_handshake_test.go 
b/internal/controller/mcpserver_controller_handshake_test.go index 32cb1061..614d6388 100644 --- a/internal/controller/mcpserver_controller_handshake_test.go +++ b/internal/controller/mcpserver_controller_handshake_test.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "strings" "time" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -449,6 +450,142 @@ var _ = Describe("MCPServer Controller - MCP Handshake Validation", func() { Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive()) }) + It("should emit a Warning MCPHandshakeFailed event only when handshake error message changes", func() { + failMsg := "intentional failure" + reconciler, fr := newReconcilerForTestWithFakeEvents(k8sClient, k8sClient.Scheme()) + reconciler.MCPDialer = func(ctx context.Context, url string) (*mcpv1alpha1.MCPServerInfo, error) { + return nil, fmt.Errorf("%s", failMsg) + } + + By("Initial reconciliation creates deployment") + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + drainFakeRecorderEvents(fr) + + By("Simulating deployment becoming available") + deployment := &appsv1.Deployment{} + Expect(k8sClient.Get(ctx, client.ObjectKey{ + Name: resourceName, Namespace: "default", + }, deployment)).To(Succeed()) + deployment.Status.Replicas = 1 + deployment.Status.ReadyReplicas = 1 + deployment.Status.Conditions = []appsv1.DeploymentCondition{ + {Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}, + {Type: appsv1.DeploymentProgressing, Status: corev1.ConditionTrue}, + } + Expect(k8sClient.Status().Update(ctx, deployment)).To(Succeed()) + + By("First handshake failure — Warning event emitted once") + _, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + var handshakeFailedEvent string + Eventually(fr.Events).Should(Receive(&handshakeFailedEvent)) + 
Expect(handshakeFailedEvent).To(ContainSubstring(corev1.EventTypeWarning)) + Expect(handshakeFailedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable)) + Expect(handshakeFailedEvent).To(ContainSubstring(resourceName)) + Expect(handshakeFailedEvent).To(ContainSubstring(failMsg)) + + By("Second reconcile with same error — no duplicate handshake failed event") + _, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive()) + + By("Change error message — second Warning event emitted") + failMsg = "different failure message" + _, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + var secondHandshakeFailedEvent string + Eventually(fr.Events).Should(Receive(&secondHandshakeFailedEvent)) + Expect(secondHandshakeFailedEvent).To(ContainSubstring(corev1.EventTypeWarning)) + Expect(secondHandshakeFailedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable)) + Expect(secondHandshakeFailedEvent).To(ContainSubstring(resourceName)) + Expect(secondHandshakeFailedEvent).To(ContainSubstring(failMsg)) + Expect(secondHandshakeFailedEvent).NotTo(Equal(handshakeFailedEvent)) + }) + + It("should emit MCPHandshakeRetriesExhausted once when max handshake retries is reached", func() { + reconciler, fr := newReconcilerForTestWithFakeEvents(k8sClient, k8sClient.Scheme()) + reconciler.MCPDialer = func(ctx context.Context, url string) (*mcpv1alpha1.MCPServerInfo, error) { + return nil, fmt.Errorf("intentional failure") + } + + By("Initial reconciliation creates deployment") + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + drainFakeRecorderEvents(fr) + + By("Simulating deployment becoming available") + deployment := &appsv1.Deployment{} + 
Expect(k8sClient.Get(ctx, client.ObjectKey{ + Name: resourceName, Namespace: "default", + }, deployment)).To(Succeed()) + deployment.Status.Replicas = 1 + deployment.Status.ReadyReplicas = 1 + deployment.Status.Conditions = []appsv1.DeploymentCondition{ + {Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}, + {Type: appsv1.DeploymentProgressing, Status: corev1.ConditionTrue}, + } + Expect(k8sClient.Status().Update(ctx, deployment)).To(Succeed()) + + By("Reconciling until handshake retries are exhausted") + for i := range maxMCPHandshakeRetries { + result, recErr := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(recErr).NotTo(HaveOccurred()) + if i < maxMCPHandshakeRetries-1 { + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) + } + } + + var collected []string + Eventually(func(g Gomega) { + collected = drainEvents(fr.Events) + exhausted := 0 + for _, ev := range collected { + if strings.Contains(ev, "retries exhausted") { + exhausted++ + } + } + g.Expect(exhausted).To(Equal(1)) + }).Should(Succeed()) + var exhaustedEvent string + for _, ev := range collected { + if strings.Contains(ev, "retries exhausted") { + exhaustedEvent = ev + break + } + } + Expect(exhaustedEvent).To(ContainSubstring(corev1.EventTypeWarning)) + Expect(exhaustedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable)) + Expect(exhaustedEvent).To(ContainSubstring(resourceName)) + + mcpServer := &mcpv1alpha1.MCPServer{} + Expect(k8sClient.Get(ctx, typeNamespacedName, mcpServer)).To(Succeed()) + Expect(mcpServer.Status.HandshakeRetryCount).To(BeNumerically(">=", maxMCPHandshakeRetries)) + + By("Further reconcile — no duplicate exhausted event") + drainFakeRecorderEvents(fr) + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(BeZero()) + Consistently(fr.Events, 300*time.Millisecond, 
// drainEvents returns all currently buffered events from ch without blocking.
//
// It returns nil when nothing is buffered. If ch has been closed, the events
// still buffered are returned and draining stops; a bare receive on a closed
// channel always succeeds with the zero value, which would otherwise make this
// loop spin forever appending empty strings.
func drainEvents(ch <-chan string) []string {
	var drained []string
	for {
		select {
		case ev, ok := <-ch:
			if !ok {
				// Channel closed and fully drained.
				return drained
			}
			drained = append(drained, ev)
		default:
			// Nothing buffered right now.
			return drained
		}
	}
}