Skip to content
67 changes: 58 additions & 9 deletions internal/controller/mcpserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ const (
eventActionConfigurationAccepted = "ConfigurationAccepted"
// eventActionServerReady is the reporting action when Ready becomes True with reason Available.
eventActionServerReady = "ServerReady"
// eventActionMCPHandshakeFailed is the reporting action when the MCP handshake fails.
eventActionMCPHandshakeFailed = "MCPHandshakeFailed"
// eventActionMCPHandshakeRetriesExhausted is the reporting action when handshake retries are exhausted.
eventActionMCPHandshakeRetriesExhausted = "MCPHandshakeRetriesExhausted"

// requeueDelayMCPHandshake is the initial delay before requeuing when an MCP handshake fails.
requeueDelayMCPHandshake = 10 * time.Second
Expand Down Expand Up @@ -348,21 +352,13 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
var serverInfo *mcpv1alpha1.MCPServerInfo
readyCondition, serverInfo = r.reconcileHandshake(ctx, mcpServer, mcpURL, readyCondition)

// Normal Event once per Ready transition to Available after a successful handshake.
if pendingServerReadyEvent &&
readyCondition.Status == metav1.ConditionTrue &&
readyCondition.Reason == ReasonAvailable {
r.emitServerReady(mcpServer)
}

var handshakeRetryCount int32
if readyCondition.Reason == ReasonMCPEndpointUnavailable {
if mcpServer.Status.ObservedGeneration == mcpServer.Generation {
handshakeRetryCount = mcpServer.Status.HandshakeRetryCount + 1
} else {
handshakeRetryCount = 1
}
}
handshakeRetryCount := r.reconcileHandshakeEventsAndRetryCount(mcpServer, readyCondition)

status := acv1alpha1.MCPServerStatus().
WithObservedGeneration(mcpServer.Generation).
Expand Down Expand Up @@ -435,6 +431,37 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, nil
}

// reconcileHandshakeEventsAndRetryCount emits handshake-related Warning events and
// returns the handshake retry count for this reconcile.
//
// It returns 0 and emits nothing unless readyCondition.Reason is
// ReasonMCPEndpointUnavailable. Otherwise the count continues from
// Status.HandshakeRetryCount when Status.ObservedGeneration matches the current
// Generation, and restarts at 1 when it does not.
func (r *MCPServerReconciler) reconcileHandshakeEventsAndRetryCount(
	mcpServer *mcpv1alpha1.MCPServer,
	readyCondition metav1.Condition,
) int32 {
	if readyCondition.Reason != ReasonMCPEndpointUnavailable {
		return 0
	}

	// A count stored for a different generation is ignored; the zero value
	// stands in as the previous count in that case.
	var prevHandshakeRetryCount int32
	if mcpServer.Status.ObservedGeneration == mcpServer.Generation {
		prevHandshakeRetryCount = mcpServer.Status.HandshakeRetryCount
	}
	// Single source of truth: the new count is always the previous count plus one.
	handshakeRetryCount := prevHandshakeRetryCount + 1

	// Skip the failure event when the stored Ready condition already carries
	// the same unavailable reason and message.
	if !duplicateHandshakeUnavailable(mcpServer.Status.Conditions, readyCondition.Message) {
		r.emitMCPHandshakeFailed(mcpServer, readyCondition.Message)
	}

	// Emit the exhausted event only on the reconcile that crosses the threshold:
	// the previous count was below the maximum and the new count reaches it.
	if int(prevHandshakeRetryCount) < maxMCPHandshakeRetries && int(handshakeRetryCount) >= maxMCPHandshakeRetries {
		r.emitMCPHandshakeRetriesExhausted(mcpServer, handshakeRetryCount)
	}

	return handshakeRetryCount
}

// reconcileHandshake performs the MCP handshake when the deployment is available,
// skipping it when the endpoint was already verified for the current generation.
func (r *MCPServerReconciler) reconcileHandshake(
Expand Down Expand Up @@ -1256,6 +1283,12 @@ func readyConditionIsAvailable(conditions []metav1.Condition) bool {
return c != nil && c.Status == metav1.ConditionTrue && c.Reason == ReasonAvailable
}

// duplicateHandshakeUnavailable reports whether conditions already contains a
// Ready condition that is False with reason ReasonMCPEndpointUnavailable and
// whose message equals message exactly.
func duplicateHandshakeUnavailable(conditions []metav1.Condition, message string) bool {
	ready := meta.FindStatusCondition(conditions, ConditionTypeReady)
	if ready == nil {
		// No Ready condition recorded, so nothing can be a duplicate.
		return false
	}
	if ready.Status != metav1.ConditionFalse || ready.Reason != ReasonMCPEndpointUnavailable {
		return false
	}
	return ready.Message == message
}

func (r *MCPServerReconciler) reconcilePermanentValidationError(
ctx context.Context,
mcpServer *mcpv1alpha1.MCPServer,
Expand Down Expand Up @@ -1339,6 +1372,22 @@ func (r *MCPServerReconciler) emitServerReady(mcpServer *mcpv1alpha1.MCPServer)
r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeNormal, ReasonAvailable, eventActionServerReady, "MCPServer %s is ready; Ready=True", mcpServer.Name)
}

// emitMCPHandshakeFailed records a Warning event with reason ReasonMCPEndpointUnavailable
// and action eventActionMCPHandshakeFailed, using message as the event note.
// It is a no-op when r.Recorder is nil.
func (r *MCPServerReconciler) emitMCPHandshakeFailed(mcpServer *mcpv1alpha1.MCPServer, message string) {
if r.Recorder == nil {
return
}
// The fixed "%s" format passes message through as data rather than as a format string.
r.Recorder.Eventf(mcpServer, nil, corev1.EventTypeWarning, ReasonMCPEndpointUnavailable, eventActionMCPHandshakeFailed, "%s", message)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the emitMCPHandshakeRetriesExhausted the name of the mcp server is used - perhaps we can do here too for more consistent approach?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @matzew for pointing it out.
Done in 5fca5f7 — emitMCPHandshakeFailed now includes the MCPServer name. Updated configuration accepted/invalid messages in the same PR for consistency across all emitted events.

}

// emitMCPHandshakeRetriesExhausted records a Warning event with reason
// ReasonMCPEndpointUnavailable and action eventActionMCPHandshakeRetriesExhausted.
// The note names the MCPServer and the number of attempts (retryCount).
// It is a no-op when r.Recorder is nil.
func (r *MCPServerReconciler) emitMCPHandshakeRetriesExhausted(mcpServer *mcpv1alpha1.MCPServer, retryCount int32) {
	if r.Recorder != nil {
		r.Recorder.Eventf(
			mcpServer,
			nil,
			corev1.EventTypeWarning,
			ReasonMCPEndpointUnavailable,
			eventActionMCPHandshakeRetriesExhausted,
			"MCP handshake retries exhausted for MCPServer %s after %d attempts; fix the MCP endpoint or update spec to retry",
			mcpServer.Name,
			retryCount,
		)
	}
}

func (r *MCPServerReconciler) applyStatus(
ctx context.Context,
mcpServer *mcpv1alpha1.MCPServer,
Expand Down
14 changes: 14 additions & 0 deletions internal/controller/mcpserver_controller_conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,4 +673,18 @@ var _ = Describe("status condition helpers", func() {
{Type: ConditionTypeReady, Status: metav1.ConditionTrue, Reason: ReasonAvailable},
})).To(BeTrue())
})

It("duplicateHandshakeUnavailable returns true only for matching Ready=False MCPEndpointUnavailable message", func() {
msg := "MCP endpoint is not serving a valid MCP protocol: connection refused"
Expect(duplicateHandshakeUnavailable(nil, msg)).To(BeFalse())
Expect(duplicateHandshakeUnavailable([]metav1.Condition{
{Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonDeploymentUnavailable, Message: msg},
}, msg)).To(BeFalse())
Expect(duplicateHandshakeUnavailable([]metav1.Condition{
{Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonMCPEndpointUnavailable, Message: "other"},
}, msg)).To(BeFalse())
Expect(duplicateHandshakeUnavailable([]metav1.Condition{
{Type: ConditionTypeReady, Status: metav1.ConditionFalse, Reason: ReasonMCPEndpointUnavailable, Message: msg},
}, msg)).To(BeTrue())
})
})
129 changes: 129 additions & 0 deletions internal/controller/mcpserver_controller_handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"strings"
"time"

"github.com/modelcontextprotocol/go-sdk/mcp"
Expand Down Expand Up @@ -449,6 +450,134 @@ var _ = Describe("MCPServer Controller - MCP Handshake Validation", func() {
Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive())
})

// Checks that a failing handshake produces a Warning event carrying the failure
// message, and that reconciling again with the same error emits no further event.
It("should emit a Warning MCPHandshakeFailed event only when handshake error message changes", func() {
Comment thread
ibm-adarsh marked this conversation as resolved.
failMsg := "intentional failure"
reconciler, fr := newReconcilerForTestWithFakeEvents(k8sClient, k8sClient.Scheme())
// Stub dialer: every handshake attempt fails with failMsg.
reconciler.MCPDialer = func(ctx context.Context, url string) (*mcpv1alpha1.MCPServerInfo, error) {
return nil, fmt.Errorf("%s", failMsg)
}

By("Initial reconciliation creates deployment")
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// NOTE(review): presumably discards events from the first reconcile so only handshake events remain — confirm against the helper.
drainFakeRecorderEvents(fr)

By("Simulating deployment becoming available")
deployment := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: resourceName, Namespace: "default",
}, deployment)).To(Succeed())
// Write the Deployment status by hand so it reports Available and Progressing.
deployment.Status.Replicas = 1
deployment.Status.ReadyReplicas = 1
deployment.Status.Conditions = []appsv1.DeploymentCondition{
{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue},
{Type: appsv1.DeploymentProgressing, Status: corev1.ConditionTrue},
}
Expect(k8sClient.Status().Update(ctx, deployment)).To(Succeed())

By("First handshake failure — Warning event emitted once")
_, err = reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

// The received event string must mention the Warning type, the reason, and the dialer error text.
var handshakeFailedEvent string
Eventually(fr.Events).Should(Receive(&handshakeFailedEvent))
Expect(handshakeFailedEvent).To(ContainSubstring(corev1.EventTypeWarning))
Expect(handshakeFailedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable))
Expect(handshakeFailedEvent).To(ContainSubstring(failMsg))

By("Second reconcile with same error — no duplicate handshake failed event")
_, err = reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// Nothing may arrive on the events channel for 300ms (polled every 20ms).
Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive())
})

// Checks that reconciling maxMCPHandshakeRetries times against a failing dialer
// yields exactly one "retries exhausted" Warning event, and that one more
// reconcile neither requeues nor emits another event.
It("should emit MCPHandshakeRetriesExhausted once when max handshake retries is reached", func() {
reconciler, fr := newReconcilerForTestWithFakeEvents(k8sClient, k8sClient.Scheme())
// Stub dialer: every handshake attempt fails.
reconciler.MCPDialer = func(ctx context.Context, url string) (*mcpv1alpha1.MCPServerInfo, error) {
return nil, fmt.Errorf("intentional failure")
}

By("Initial reconciliation creates deployment")
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// NOTE(review): presumably discards events from the first reconcile — confirm against the helper.
drainFakeRecorderEvents(fr)

By("Simulating deployment becoming available")
deployment := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: resourceName, Namespace: "default",
}, deployment)).To(Succeed())
// Write the Deployment status by hand so it reports Available and Progressing.
deployment.Status.Replicas = 1
deployment.Status.ReadyReplicas = 1
deployment.Status.Conditions = []appsv1.DeploymentCondition{
{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue},
{Type: appsv1.DeploymentProgressing, Status: corev1.ConditionTrue},
}
Expect(k8sClient.Status().Update(ctx, deployment)).To(Succeed())

By("Reconciling until handshake retries are exhausted")
// Every reconcile except the last must request a requeue; the last one is not checked here.
for i := range maxMCPHandshakeRetries {
result, recErr := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(recErr).NotTo(HaveOccurred())
if i < maxMCPHandshakeRetries-1 {
Expect(result.RequeueAfter).To(BeNumerically(">", 0))
}
}

// collected lives outside the polled function, so events gathered on one
// Eventually attempt are still counted on the next.
var collected []string
Eventually(func(g Gomega) {
// Non-blocking drain: take whatever is on the channel now, then jump to the check.
for {
select {
case ev := <-fr.Events:
collected = append(collected, ev)
default:
goto check
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To get rid of this goto, we can use something like this:

func drainEvents(ch <-chan string) []string {
      var events []string
      for {
          select {
          case ev := <-ch:
              events = append(events, ev)
          default:
              return events
          }
      }
  }

  Then the test becomes:

  Eventually(func(g Gomega) {
      collected = drainEvents(fr.Events)
      exhausted := 0
      for _, ev := range collected {
          if strings.Contains(ev, "retries exhausted") {
              exhausted++
          }
      }
      g.Expect(exhausted).To(Equal(1))
  }).Should(Succeed())

Alternative is using a labeled break, which is also not preferred.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DONE!

}
}
// Exactly one collected event may contain the "retries exhausted" text.
check:
exhausted := 0
for _, ev := range collected {
if strings.Contains(ev, "retries exhausted") {
exhausted++
}
}
g.Expect(exhausted).To(Equal(1))
}).Should(Succeed())
// Pick out the first matching event to inspect its type, reason and server name.
var exhaustedEvent string
for _, ev := range collected {
if strings.Contains(ev, "retries exhausted") {
exhaustedEvent = ev
break
}
}
Expect(exhaustedEvent).To(ContainSubstring(corev1.EventTypeWarning))
Expect(exhaustedEvent).To(ContainSubstring(ReasonMCPEndpointUnavailable))
Expect(exhaustedEvent).To(ContainSubstring(resourceName))

// The persisted status must show at least maxMCPHandshakeRetries attempts.
mcpServer := &mcpv1alpha1.MCPServer{}
Expect(k8sClient.Get(ctx, typeNamespacedName, mcpServer)).To(Succeed())
Expect(mcpServer.Status.HandshakeRetryCount).To(BeNumerically(">=", maxMCPHandshakeRetries))

By("Further reconcile — no duplicate exhausted event")
drainFakeRecorderEvents(fr)
result, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())
// Once exhausted, the reconcile must not ask for a requeue and must stay silent for 300ms.
Expect(result.RequeueAfter).To(BeZero())
Consistently(fr.Events, 300*time.Millisecond, 20*time.Millisecond).ShouldNot(Receive())
})

It("should pass a context with timeout to the dialer", func() {
var receivedCtx context.Context
reconciler := &MCPServerReconciler{
Expand Down