Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions debezium-platform-conductor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,33 @@
<scope>import</scope>
</dependency>

<!-- Quarkus LangChain4j BOM to manage quarkus-langchain4j extensions -->
<dependency>
<groupId>io.quarkiverse.langchain4j</groupId>
<artifactId>quarkus-langchain4j-bom</artifactId>
<version>1.4.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Override langchain4j-bom from debezium-parent to match quarkus-langchain4j:1.4.2 -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-bom</artifactId>
<version>1.8.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Quarkus MCP Server BOM to manage MCP server extensions -->
<dependency>
<groupId>io.quarkiverse.mcp</groupId>
<artifactId>quarkus-mcp-server-bom</artifactId>
<version>1.8.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
Expand All @@ -79,6 +106,13 @@
<scope>import</scope>
</dependency>

<!-- langchain4j-core needs 1.9.1 for vertex-ai-anthropic compatibility -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-core</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>com.blazebit</groupId>
<artifactId>blaze-persistence-integration-jackson-jakarta</artifactId>
Expand Down Expand Up @@ -475,6 +509,38 @@
<artifactId>quarkus-test-kafka-companion</artifactId>
<scope>test</scope>
</dependency>

<!-- AI/MCP Dependencies -->
<!-- Quarkus LangChain4j Extensions - versions managed by quarkus-langchain4j-bom -->
<dependency>
<groupId>io.quarkiverse.langchain4j</groupId>
<artifactId>quarkus-langchain4j-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.langchain4j</groupId>
<artifactId>quarkus-langchain4j-anthropic</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.langchain4j</groupId>
<artifactId>quarkus-langchain4j-mcp</artifactId>
</dependency>

<!-- MCP Server Extensions - versions managed by quarkus-mcp-server-bom -->
<dependency>
<groupId>io.quarkiverse.mcp</groupId>
<artifactId>quarkus-mcp-server-http</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.mcp</groupId>
<artifactId>quarkus-mcp-server-hibernate-validator</artifactId>
</dependency>

<!-- Vanilla LangChain4j for custom VertexAI Anthropic model (dev profile only) -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-vertex-ai-anthropic</artifactId>
<version>1.9.1-beta17</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.ai.chat;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import jakarta.enterprise.context.SessionScoped;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.common.annotation.Incubating;

import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.rag.content.Content;
import dev.langchain4j.service.TokenStream;
import dev.langchain4j.service.tool.ToolExecution;

/**
* ChatService using fully declarative Quarkus LangChain4j approach.
* - Assistant is auto-generated via @RegisterAiService
* - MCP tools configured declaratively in application.properties
* - StreamingChatModel injected from ChatModelProducer (profile-based)
*
* @author Mario Fiore Vitale
*/
@Incubating
@SessionScoped
public class ChatService implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(ChatService.class);

private final Zium zium;

public ChatService(Zium zium) {
this.zium = zium;
}

/**
* Chat method for WebSocket - logs intermediate responses, returns only final answer
*/
public CompletableFuture<String> chatWithLogging(String message) {
CompletableFuture<String> futureResponse = new CompletableFuture<>();

LOG.info("Processing chat message: {}", message);

TokenStream tokenStream = zium.chat(message);

tokenStream
.onPartialResponse((String partialResponse) -> {
LOG.debug("Partial response: {}", partialResponse);
})
.onRetrieved((List<Content> contents) -> {
LOG.info("Retrieved contents: {}", contents);
})
.onIntermediateResponse((ChatResponse intermediateResponse) -> {
LOG.info("Intermediate response: {}", intermediateResponse);
})
.onToolExecuted((ToolExecution toolExecution) -> {
LOG.info("Tool execution result: {}", toolExecution);
})
.onCompleteResponse((ChatResponse response) -> {
LOG.info("Chat completed successfully");
String finalAnswer = response.aiMessage().text();
futureResponse.complete(finalAnswer);
})
.onError((Throwable error) -> {
LOG.error("Error during chat processing", error);
futureResponse.completeExceptionally(error);
})
.start();

return futureResponse;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.ai.chat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.common.annotation.Incubating;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;

/**
* WebSocket endpoint for chat using quarkus-websockets-next.
* This API properly supports @SessionScoped beans for chat memory.
*
* @author Mario Fiore Vitale
*/
@Incubating
@WebSocket(path = "/api/chat")
public class ChatWebSocket {

private static final Logger LOG = LoggerFactory.getLogger(ChatWebSocket.class);

private final ChatService chatService;

public ChatWebSocket(ChatService chatService) {
this.chatService = chatService;
}

@OnOpen
public void onOpen(WebSocketConnection connection) {
LOG.info("WebSocket opened: {}", connection.id());
connection.sendTextAndAwait("{\"type\":\"connected\",\"message\":\"Connected to chat service\"}");
}

@OnClose
public void onClose(WebSocketConnection connection) {
LOG.info("WebSocket closed: {}", connection.id());
}

@OnError
public void onError(WebSocketConnection connection, Throwable throwable) {
LOG.error("WebSocket error on connection: {}", connection.id(), throwable);
connection.sendTextAndAwait("{\"type\":\"error\",\"message\":\"" + escapeJson(throwable.getMessage()) + "\"}");
}

@OnTextMessage
public void onMessage(String message, WebSocketConnection connection) {
LOG.info("Received message from {}: {}", connection.id(), message);

try {
// Send processing indicator
connection.sendTextAndAwait("{\"type\":\"processing\",\"message\":\"Processing your request...\"}");

chatService.chatWithLogging(message).whenComplete((response, error) -> {
if (error != null) {
LOG.error("Error processing chat message", error);
connection.sendTextAndAwait("{\"type\":\"error\",\"message\":\"" + escapeJson(error.getMessage()) + "\"}");
}
else {
// Send the complete final answer
String jsonResponse = "{\"type\":\"answer\",\"content\":\"" + escapeJson(response) + "\"}";
connection.sendTextAndAwait(jsonResponse);
}
});
}
catch (Exception e) {
LOG.error("Error handling message", e);
connection.sendTextAndAwait("{\"type\":\"error\",\"message\":\"" + escapeJson(e.getMessage()) + "\"}");
}
}

private String escapeJson(String input) {
if (input == null) {
return "";
}
return input.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.ai.chat;

import jakarta.enterprise.context.SessionScoped;

import io.debezium.common.annotation.Incubating;
import io.quarkiverse.langchain4j.RegisterAiService;
import io.quarkiverse.langchain4j.mcp.runtime.McpToolBox;

import dev.langchain4j.service.TokenStream;

/**
* Debezium Platform AI Assistant using declarative Quarkus LangChain4j.
* Uses MCP protocol to access Platform tools.
*/
@Incubating
@RegisterAiService
@SessionScoped
public interface Zium {

@McpToolBox({ "debezium-platform", "context7" })
TokenStream chat(String userMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.ai.dto;

import java.util.List;
import java.util.Map;

import jakarta.validation.constraints.NotEmpty;

/**
* AI-specific destination DTO for MCP tools.
* Simplified version optimized for LLM interaction with validation for schema generation.
*/
public class AiDestination {
private String description;

@NotEmpty
private String type;

@NotEmpty
private String schema;

private List<AiVaultReference> vaults;
private Map<String, Object> config;

@NotEmpty
private String name;

private Long id;

public AiDestination() {
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.schema = schema;
}

public List<AiVaultReference> getVaults() {
return vaults;
}

public void setVaults(List<AiVaultReference> vaults) {
this.vaults = vaults;
}

public Map<String, Object> getConfig() {
return config;
}

public void setConfig(Map<String, Object> config) {
this.config = config;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}
}
Loading
Loading