, etc.) in your responses.
+ Tool calls happen automatically in the background - only show the results to the user.
+ Present information naturally as if you performed the action yourself.
+
+ IMPORTANT SAFETY RULE: For any WRITE operations (createSource, createDestination, createPipeline),
+ you MUST ALWAYS call the tool first with userConfirmed=false to get the confirmation details,
+ then present those details to the user and ask for their explicit confirmation,
+ and ONLY after the user confirms should you call the tool again with userConfirmed=true.
+ NEVER execute write operations without user confirmation.
+ """;
+ }
+}
diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/ai/model/CustomVertexAiAnthropicStreamingChatModel.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/ai/model/CustomVertexAiAnthropicStreamingChatModel.java
new file mode 100644
index 00000000..399b64be
--- /dev/null
+++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/ai/model/CustomVertexAiAnthropicStreamingChatModel.java
@@ -0,0 +1,774 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.ai.model;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.cert.X509Certificate;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import dev.langchain4j.agent.tool.ToolSpecification;
+import dev.langchain4j.data.message.AiMessage;
+import dev.langchain4j.data.message.ChatMessage;
+import dev.langchain4j.data.message.SystemMessage;
+import dev.langchain4j.data.message.ToolExecutionResultMessage;
+import dev.langchain4j.data.message.UserMessage;
+import dev.langchain4j.model.ModelProvider;
+import dev.langchain4j.model.chat.StreamingChatModel;
+import dev.langchain4j.model.chat.listener.ChatModelListener;
+import dev.langchain4j.model.chat.request.ChatRequest;
+import dev.langchain4j.model.chat.request.ChatRequestParameters;
+import dev.langchain4j.model.chat.response.ChatResponse;
+import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
+import dev.langchain4j.model.output.FinishReason;
+import dev.langchain4j.model.output.TokenUsage;
+
+/**
+ * Custom streaming chat model for internal Vertex AI-compatible Anthropic API.
+ * This implementation works with APIs that use Vertex AI request/response format
+ * but have custom endpoints and bearer token authentication.
+ *
+ *
+ * The following code is mainly generated by Claude since this is just a custom adapter to internal model serving service
+ *
+ * @author Mario Fiore Vitale
+ */
+public class CustomVertexAiAnthropicStreamingChatModel implements StreamingChatModel {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CustomVertexAiAnthropicStreamingChatModel.class);
+ private static final String ANTHROPIC_VERSION = "vertex-2023-10-16";
+
+ // Static initializer to configure SSL to trust all certificates (for development only)
+ static {
+ try {
+ // Create a trust manager that trusts all certificates
+ TrustManager[] trustAllCerts = new TrustManager[]{
+ new X509TrustManager() {
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ public void checkClientTrusted(X509Certificate[] certs, String authType) {
+ }
+
+ public void checkServerTrusted(X509Certificate[] certs, String authType) {
+ }
+ }
+ };
+
+ // Install the all-trusting trust manager
+ SSLContext sc = SSLContext.getInstance("SSL");
+ sc.init(null, trustAllCerts, new java.security.SecureRandom());
+ HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+
+ // Install the all-trusting host verifier
+ HttpsURLConnection.setDefaultHostnameVerifier((hostname, session) -> true);
+
+ LOG.info("SSL trust-all configured for CustomVertexAiAnthropicStreamingChatModel");
+ }
+ catch (Exception e) {
+ LOG.error("Failed to configure SSL trust-all", e);
+ }
+ }
+
+ private final String baseUrl;
+ private final String apiKey;
+ private final String modelName;
+ private final String modelType; // "sonnet" or "haiku"
+ private final Integer maxTokens;
+ private final Double temperature;
+ private final Boolean logRequests;
+ private final Boolean logResponses;
+ private final String systemPrompt;
+ private final ObjectMapper objectMapper;
+
+ private CustomVertexAiAnthropicStreamingChatModel(Builder builder) {
+ this.baseUrl = builder.baseUrl;
+ this.apiKey = builder.apiKey;
+ this.modelName = builder.modelName;
+ this.modelType = determineModelType(builder.modelName);
+ this.maxTokens = builder.maxTokens != null ? builder.maxTokens : 4096;
+ this.temperature = builder.temperature;
+ this.logRequests = builder.logRequests != null ? builder.logRequests : false;
+ this.logResponses = builder.logResponses != null ? builder.logResponses : false;
+ this.systemPrompt = builder.systemPrompt;
+
+ // Configure ObjectMapper to handle unknown types gracefully
+ this.objectMapper = new ObjectMapper();
+ this.objectMapper.configure(com.fasterxml.jackson.databind.SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ }
+
+ private String determineModelType(String modelName) {
+ if (modelName.toLowerCase().contains("haiku")) {
+ return "haiku";
+ }
+ else if (modelName.toLowerCase().contains("sonnet")) {
+ return "sonnet";
+ }
+ else {
+ // Default to sonnet
+ return "sonnet";
+ }
+ }
+
+ @Override
+ public void doChat(ChatRequest chatRequest, StreamingChatResponseHandler handler) {
+ try {
+ // Build the endpoint URL
+ String endpoint = String.format("%s/%s/models/%s:streamRawPredict",
+ baseUrl, modelType, modelName);
+
+ if (logRequests) {
+ LOG.info("Calling endpoint: {}", endpoint);
+ }
+
+ // Convert ChatRequest to Vertex AI format
+ Map requestBody = buildRequestBody(chatRequest);
+
+ if (logRequests) {
+ try {
+ LOG.info("Request body: {}", objectMapper.writeValueAsString(requestBody));
+ }
+ catch (Exception e) {
+ LOG.warn("Could not serialize request body for logging: {}", e.getMessage());
+ LOG.info("Request endpoint: {}", endpoint);
+ }
+ }
+
+ // Make HTTP request
+ makeStreamingRequest(endpoint, requestBody, handler);
+
+ }
+ catch (Exception e) {
+ LOG.error("Error during chat", e);
+ try {
+ handler.onError(e);
+ }
+ catch (Exception userException) {
+ LOG.warn("User's onError handler threw an exception, ignoring", userException);
+ }
+ }
+ }
+
+ private Map buildRequestBody(ChatRequest chatRequest) {
+ Map body = new HashMap<>();
+ body.put("anthropic_version", ANTHROPIC_VERSION);
+
+ // Add system message if configured
+ if (systemPrompt != null && !systemPrompt.isEmpty()) {
+ body.put("system", systemPrompt);
+ }
+
+ // Convert messages
+ List