Skip to content
36 changes: 26 additions & 10 deletions internal/controller/mcpserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ const (
eventActionConfigurationAccepted = "ConfigurationAccepted"
// eventActionServerReady is the reporting action when Ready becomes True with reason Available.
eventActionServerReady = "ServerReady"
// eventActionMCPHandshakeFailed is the reporting action when the MCP handshake fails.
eventActionMCPHandshakeFailed = "MCPHandshakeFailed"
// eventActionMCPHandshakeRetriesExhausted is the reporting action when handshake retries are exhausted.
eventActionMCPHandshakeRetriesExhausted = "MCPHandshakeRetriesExhausted"

// requeueDelayMCPHandshake is the initial delay before requeuing when an MCP handshake fails.
requeueDelayMCPHandshake = 10 * time.Second
Expand Down Expand Up @@ -332,14 +336,7 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
r.emitServerReady(mcpServer)
}

var handshakeRetryCount int32
if readyCondition.Reason == ReasonMCPEndpointUnavailable {
if mcpServer.Status.ObservedGeneration == mcpServer.Generation {
handshakeRetryCount = mcpServer.Status.HandshakeRetryCount + 1
} else {
handshakeRetryCount = 1
}
}
handshakeRetryCount := r.reconcileHandshakeEventsAndRetryCount(mcpServer, readyCondition)

status := acv1alpha1.MCPServerStatus().
WithObservedGeneration(mcpServer.Generation).
Expand Down Expand Up @@ -478,14 +475,16 @@ func (r *MCPServerReconciler) emitConfigurationInvalid(mcpServer *mcpv1alpha1.MC
if r.Recorder == nil {
return
}
r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeWarning, validationErr.Reason, eventActionConfigurationValidation, "%s", validationErr.Message)
r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeWarning, validationErr.Reason, eventActionConfigurationValidation,
"MCPServer %s: %s", mcpServer.Name, validationErr.Message)
}

func (r *MCPServerReconciler) emitConfigurationAccepted(mcpServer *mcpv1alpha1.MCPServer) {
if r.Recorder == nil {
return
}
r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeNormal, ReasonValid, eventActionConfigurationAccepted, "%s", "MCPServer configuration is valid; Accepted=True")
r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeNormal, ReasonValid, eventActionConfigurationAccepted,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change keeps the message format consistent across all the events we emit.

"MCPServer %s configuration is valid; Accepted=True", mcpServer.Name)
}

func (r *MCPServerReconciler) emitServerReady(mcpServer *mcpv1alpha1.MCPServer) {
Expand All @@ -495,6 +494,23 @@ func (r *MCPServerReconciler) emitServerReady(mcpServer *mcpv1alpha1.MCPServer)
r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeNormal, ReasonAvailable, eventActionServerReady, "MCPServer %s is ready; Ready=True", mcpServer.Name)
}

// emitMCPHandshakeFailed records a Warning event on mcpServer with reason
// ReasonMCPEndpointUnavailable and action eventActionMCPHandshakeFailed. The
// event note names the MCPServer and carries the supplied failure message.
// It is a no-op when the reconciler has no Recorder configured.
func (r *MCPServerReconciler) emitMCPHandshakeFailed(mcpServer *mcpv1alpha1.MCPServer, message string) {
	if recorder := r.Recorder; recorder != nil {
		recorder.Eventf(
			mcpServer, nil,
			corev1.EventTypeWarning,
			ReasonMCPEndpointUnavailable,
			eventActionMCPHandshakeFailed,
			"MCP handshake failed for MCPServer %s: %s", mcpServer.Name, message,
		)
	}
}

// emitMCPHandshakeRetriesExhausted records a Warning event on mcpServer with
// reason ReasonMCPEndpointUnavailable and action
// eventActionMCPHandshakeRetriesExhausted. The event note names the MCPServer
// and reports retryCount as the number of attempts made.
// It is a no-op when the reconciler has no Recorder configured.
func (r *MCPServerReconciler) emitMCPHandshakeRetriesExhausted(mcpServer *mcpv1alpha1.MCPServer, retryCount int32) {
	recorder := r.Recorder
	if recorder == nil {
		return
	}

	const note = "MCP handshake retries exhausted for MCPServer %s after %d attempts; fix the MCP endpoint or update spec to retry"
	recorder.Eventf(
		mcpServer, nil,
		corev1.EventTypeWarning,
		ReasonMCPEndpointUnavailable,
		eventActionMCPHandshakeRetriesExhausted,
		note, mcpServer.Name, retryCount,
	)
}

func (r *MCPServerReconciler) applyStatus(
ctx context.Context,
mcpServer *mcpv1alpha1.MCPServer,
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/mcpserver_controller_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,9 @@ func readyConditionIsAvailable(conditions []metav1.Condition) bool {
c := meta.FindStatusCondition(conditions, ConditionTypeReady)
return c != nil && c.Status == metav1.ConditionTrue && c.Reason == ReasonAvailable
}

// duplicateHandshakeUnavailable reports whether conditions already holds a
// Ready condition that is False with reason ReasonMCPEndpointUnavailable and
// whose message equals message exactly. It returns false when no Ready
// condition is present.
func duplicateHandshakeUnavailable(conditions []metav1.Condition, message string) bool {
	ready := meta.FindStatusCondition(conditions, ConditionTypeReady)
	if ready == nil {
		return false
	}
	if ready.Status != metav1.ConditionFalse || ready.Reason != ReasonMCPEndpointUnavailable {
		return false
	}
	return ready.Message == message
}
14 changes: 14 additions & 0 deletions internal/controller/mcpserver_controller_conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,4 +673,18 @@ var _ = Describe("status condition helpers", func() {
{Type: ConditionTypeReady, Status: metav1.ConditionTrue, Reason: ReasonAvailable},
})).To(BeTrue())
})

// Walks duplicateHandshakeUnavailable through four inputs: no conditions, a
// Ready=False condition with a different reason, the matching reason with a
// different message, and an exact match. Only the exact match is a duplicate.
It("duplicateHandshakeUnavailable returns true only for matching Ready=False MCPEndpointUnavailable message", func() {
msg := "MCP endpoint is not serving a valid MCP protocol: connection refused"
// No previous conditions recorded at all.
Expect(duplicateHandshakeUnavailable(nil, msg)).To(BeFalse())
// Same message, but Ready is False for a different reason.
Expect(duplicateHandshakeUnavailable([]metav1.Condition{
{Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonDeploymentUnavailable, Message: msg},
}, msg)).To(BeFalse())
// Matching reason, but the stored message differs.
Expect(duplicateHandshakeUnavailable([]metav1.Condition{
{Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonMCPEndpointUnavailable, Message: "other"},
}, msg)).To(BeFalse())
// Status, reason and message all match: this is the duplicate case.
Expect(duplicateHandshakeUnavailable([]metav1.Condition{
{Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonMCPEndpointUnavailable, Message: msg},
}, msg)).To(BeTrue())
})
})
31 changes: 31 additions & 0 deletions internal/controller/mcpserver_controller_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,34 @@ func isHTTPAuthError(err error) bool {
return strings.HasSuffix(msg, ": "+http.StatusText(http.StatusUnauthorized)) ||
strings.HasSuffix(msg, ": "+http.StatusText(http.StatusForbidden))
}

// reconcileHandshakeEventsAndRetryCount emits handshake-related events for the
// Ready condition computed in this reconcile and returns the retry count to
// store in status.
//
// When readyCondition's reason is not ReasonMCPEndpointUnavailable, nothing is
// emitted and 0 is returned. Otherwise the returned count is the stored
// HandshakeRetryCount plus one, where the stored count is treated as zero if
// the status was observed for a different generation.
//
// An MCPHandshakeFailed event is emitted unless the stored Ready condition
// already reports the same failure message. An MCPHandshakeRetriesExhausted
// event is emitted only on the reconcile where the count first reaches
// maxMCPHandshakeRetries.
func (r *MCPServerReconciler) reconcileHandshakeEventsAndRetryCount(
	mcpServer *mcpv1alpha1.MCPServer,
	readyCondition metav1.Condition,
) int32 {
	if readyCondition.Reason != ReasonMCPEndpointUnavailable {
		return 0
	}

	// Retries counted against an older generation do not carry over.
	var previous int32
	if mcpServer.Status.ObservedGeneration == mcpServer.Generation {
		previous = mcpServer.Status.HandshakeRetryCount
	}
	current := previous + 1

	// NOTE(review): the duplicate check reads the stored conditions regardless
	// of generation, so a failure with an unchanged message after a spec change
	// does not appear to re-emit MCPHandshakeFailed — confirm this is intended.
	alreadyReported := duplicateHandshakeUnavailable(mcpServer.Status.Conditions, readyCondition.Message)
	if !alreadyReported {
		r.emitMCPHandshakeFailed(mcpServer, readyCondition.Message)
	}

	// Emit only when this reconcile crosses the limit, not on every later one.
	crossedLimit := int(previous) < maxMCPHandshakeRetries && int(current) >= maxMCPHandshakeRetries
	if crossedLimit {
		r.emitMCPHandshakeRetriesExhausted(mcpServer, current)
	}

	return current
}
137 changes: 137 additions & 0 deletions internal/controller/mcpserver_controller_handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"strings"
"time"

"github.com/modelcontextprotocol/go-sdk/mcp"
Expand Down Expand Up @@ -449,6 +450,142 @@ var _ = Describe("MCPServer Controller - MCP Handshake Validation", func() {
Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive())
})

// Verifies de-duplication of the MCPHandshakeFailed Warning event: it is
// expected on the first handshake failure, not on a repeat of the same error,
// and again once the dialer's error message changes.
It("should emit a Warning MCPHandshakeFailed event only when handshake error message changes", func() {
Comment thread
ibm-adarsh marked this conversation as resolved.
failMsg := "intentional failure"
reconciler, fr := newReconcilerForTestWithFakeEvents(k8sClient, k8sClient.Scheme())
// The dialer closure reads failMsg on every call, so reassigning failMsg
// later in the test changes the error the next reconcile sees.
reconciler.MCPDialer = func(ctx context.Context, url string) (*mcpv1alpha1.MCPServerInfo, error) {
return nil, fmt.Errorf("%s", failMsg)
}

By("Initial reconciliation creates deployment")
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// Discard events from the initial reconcile so later receives only see
// events produced by the handshake steps below.
drainFakeRecorderEvents(fr)

By("Simulating deployment becoming available")
deployment := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: resourceName, Namespace: "default",
}, deployment)).To(Succeed())
// Deployment status is written by hand here to mark it Available.
deployment.Status.Replicas = 1
deployment.Status.ReadyReplicas = 1
deployment.Status.Conditions = []appsv1.DeploymentCondition{
{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue},
{Type: appsv1.DeploymentProgressing, Status: corev1.ConditionTrue},
}
Expect(k8sClient.Status().Update(ctx, deployment)).To(Succeed())

By("First handshake failure — Warning event emitted once")
_, err = reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

// The recorded event string must carry the event type, reason, resource
// name and the dialer's error text.
var handshakeFailedEvent string
Eventually(fr.Events).Should(Receive(&handshakeFailedEvent))
Expect(handshakeFailedEvent).To(ContainSubstring(corev1.EventTypeWarning))
Expect(handshakeFailedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable))
Expect(handshakeFailedEvent).To(ContainSubstring(resourceName))
Expect(handshakeFailedEvent).To(ContainSubstring(failMsg))

By("Second reconcile with same error — no duplicate handshake failed event")
_, err = reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// Nothing at all may arrive on the events channel during this window.
Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive())

By("Change error message — second Warning event emitted")
failMsg = "different failure message"
_, err = reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

var secondHandshakeFailedEvent string
Eventually(fr.Events).Should(Receive(&secondHandshakeFailedEvent))
Expect(secondHandshakeFailedEvent).To(ContainSubstring(corev1.EventTypeWarning))
Expect(secondHandshakeFailedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable))
Expect(secondHandshakeFailedEvent).To(ContainSubstring(resourceName))
Expect(secondHandshakeFailedEvent).To(ContainSubstring(failMsg))
// The new event must differ from the first one, i.e. it is not a replay.
Expect(secondHandshakeFailedEvent).NotTo(Equal(handshakeFailedEvent))
})

// Verifies that exactly one "retries exhausted" Warning event is produced when
// the handshake keeps failing for maxMCPHandshakeRetries reconciles, and that a
// further reconcile neither requeues nor emits any more events.
It("should emit MCPHandshakeRetriesExhausted once when max handshake retries is reached", func() {
reconciler, fr := newReconcilerForTestWithFakeEvents(k8sClient, k8sClient.Scheme())
// Every handshake attempt fails with the same error.
reconciler.MCPDialer = func(ctx context.Context, url string) (*mcpv1alpha1.MCPServerInfo, error) {
return nil, fmt.Errorf("intentional failure")
}

By("Initial reconciliation creates deployment")
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// Discard events from the initial reconcile before the handshake steps.
drainFakeRecorderEvents(fr)

By("Simulating deployment becoming available")
deployment := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: resourceName, Namespace: "default",
}, deployment)).To(Succeed())
// Deployment status is written by hand here to mark it Available.
deployment.Status.Replicas = 1
deployment.Status.ReadyReplicas = 1
deployment.Status.Conditions = []appsv1.DeploymentCondition{
{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue},
{Type: appsv1.DeploymentProgressing, Status: corev1.ConditionTrue},
}
Expect(k8sClient.Status().Update(ctx, deployment)).To(Succeed())

By("Reconciling until handshake retries are exhausted")
for i := range maxMCPHandshakeRetries {
result, recErr := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(recErr).NotTo(HaveOccurred())
// Every attempt except the last is expected to schedule a requeue.
if i < maxMCPHandshakeRetries-1 {
Expect(result.RequeueAfter).To(BeNumerically(">", 0))
}
}

// Count events whose text contains "retries exhausted".
// NOTE(review): drainEvents consumes the channel, so each Eventually poll
// starts from an empty buffer and never sees events drained by an earlier
// poll — presumably all events are already buffered on the first poll; confirm.
var collected []string
Eventually(func(g Gomega) {
collected = drainEvents(fr.Events)
exhausted := 0
for _, ev := range collected {
if strings.Contains(ev, "retries exhausted") {
exhausted++
}
}
g.Expect(exhausted).To(Equal(1))
}).Should(Succeed())
// Pick the exhausted event out of the last drained batch for inspection.
var exhaustedEvent string
for _, ev := range collected {
if strings.Contains(ev, "retries exhausted") {
exhaustedEvent = ev
break
}
}
Expect(exhaustedEvent).To(ContainSubstring(corev1.EventTypeWarning))
Expect(exhaustedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable))
Expect(exhaustedEvent).To(ContainSubstring(resourceName))

// The persisted status must record at least maxMCPHandshakeRetries attempts.
mcpServer := &mcpv1alpha1.MCPServer{}
Expect(k8sClient.Get(ctx, typeNamespacedName, mcpServer)).To(Succeed())
Expect(mcpServer.Status.HandshakeRetryCount).To(BeNumerically(">=", maxMCPHandshakeRetries))

By("Further reconcile — no duplicate exhausted event")
drainFakeRecorderEvents(fr)
result, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// After exhaustion the test expects no requeue and no events of any kind.
Expect(result.RequeueAfter).To(BeZero())
Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive())
})

It("should pass a context with timeout to the dialer", func() {
var receivedCtx context.Context
reconciler := &MCPServerReconciler{
Expand Down
14 changes: 14 additions & 0 deletions internal/controller/mcpserver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ func drainFakeRecorderEvents(fr *events.FakeRecorder) {
}
}

// drainEvents returns, in receive order, every event currently buffered in ch
// without blocking. It stops as soon as no event is immediately available, or
// once ch has been closed and emptied. The result is nil when nothing was
// received (including when ch is nil).
func drainEvents(ch <-chan string) []string {
	var drained []string
	for {
		select {
		case ev, ok := <-ch:
			if !ok {
				// A closed channel is always ready to receive and yields zero
				// values forever, so the default case would never be reached.
				return drained
			}
			drained = append(drained, ev)
		default:
			return drained
		}
	}
}

// newTestMCPServer returns an MCPServer with standard test defaults:
// namespace "default", SourceTypeContainerImage with ref
// "docker.io/library/test-image:latest", and port 8080.
Expand Down Expand Up @@ -152,6 +165,7 @@ var _ = Describe("MCPServer Controller", func() {
Eventually(fr.Events).Should(Receive(&first))
Expect(first).To(ContainSubstring(corev1.EventTypeNormal))
Expect(first).To(ContainSubstring(ReasonValid))
Expect(first).To(ContainSubstring(resourceName))
Expect(first).To(ContainSubstring("Accepted=True"))

_, err = controllerReconciler.Reconcile(ctx, reconcile.Request{
Expand Down