From d907392be1e1682ad37cd940ae19a162211b1a56 Mon Sep 17 00:00:00 2001
From: JunWang222 <466529050@qq.com>
Date: Wed, 4 Mar 2026 00:36:58 -0500
Subject: [PATCH] debezium/dbz#1088 Add Azure Event Hubs connection validator
Introduce Event Hubs connection validation in Conductor with schema and factory wiring, and add emulator-backed integration plus deterministic auth-path tests so connection checks are reliably covered.
Signed-off-by: JunWang222 <466529050@qq.com>
---
debezium-platform-conductor/pom.xml | 16 ++
.../ConnectionValidatorFactory.java | 1 +
.../AzureEventHubsConnectionValidator.java | 163 ++++++++++++++
.../src/main/resources/application.yml | 3 +
.../main/resources/connection-schemas.json | 25 +++
.../AzureEventHubsConnectionValidatorIT.java | 208 ++++++++++++++++++
.../AzureEventHubsTestResource.java | 61 +++++
.../ConnectionValidatorFactoryTest.java | 6 +
.../connection/CustomTestProfile.java | 2 +-
...TestAzureEventHubsConnectionValidator.java | 24 ++
.../azure_eventhubs_emulator_config.json | 24 ++
11 files changed, 532 insertions(+), 1 deletion(-)
create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AzureEventHubsConnectionValidator.java
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsConnectionValidatorIT.java
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsTestResource.java
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/TestAzureEventHubsConnectionValidator.java
create mode 100644 debezium-platform-conductor/src/test/resources/azure_eventhubs_emulator_config.json
diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml
index aa9f9401..64c94a5f 100644
--- a/debezium-platform-conductor/pom.xml
+++ b/debezium-platform-conductor/pom.xml
@@ -44,6 +44,7 @@
1.6.18
2.0.3
5.3.2
+ 5.21.3
1.13.0
3.6.0
2.29.0
@@ -203,6 +204,12 @@
${version.mockwebserver}
+
+ com.azure
+ azure-messaging-eventhubs
+ ${version.azure-messaging-eventhubs}
+
+
@@ -376,6 +383,10 @@
io.quarkus
quarkus-kafka-client
+
+ com.azure
+ azure-messaging-eventhubs
+
io.quarkus
@@ -436,6 +447,11 @@
testcontainers-mssqlserver
test
+
+ org.testcontainers
+ testcontainers-azure
+ test
+
ch.qos.logback
diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java
index e736035f..1223ecd6 100644
--- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java
+++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java
@@ -28,6 +28,7 @@ public ConnectionValidator getValidator(String connectionType) {
private String mapToValidatorName(String connectionType) {
return switch (connectionType) {
case "ORACLE", "MYSQL", "MARIADB", "SQLSERVER", "POSTGRESQL" -> "DATABASE";
+ case "AZURE_EVENTS_HUBS" -> "EVENTHUBS";
default -> connectionType;
};
}
diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AzureEventHubsConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AzureEventHubsConnectionValidator.java
new file mode 100644
index 00000000..b66f76f6
--- /dev/null
+++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AzureEventHubsConnectionValidator.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection.destination;
+
+import java.time.Duration;
+import java.util.Map;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Named;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.exception.AzureException;
+import com.azure.core.exception.ClientAuthenticationException;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+
+import io.debezium.platform.data.dto.ConnectionValidationResult;
+import io.debezium.platform.domain.views.Connection;
+import io.debezium.platform.environment.connection.ConnectionValidator;
+import io.debezium.util.Strings;
+
+/**
+ * Implementation of {@link ConnectionValidator} for Azure Event Hubs connections.
+ *
+ * This validator performs validation of Azure Event Hubs connection configurations
+ * by attempting to connect and retrieve Event Hub metadata.
+ *
+ *
+ *
+ * The validation process includes:
+ *
+ * - Connection string and hub name presence checks
+ * - Network connectivity verification via metadata retrieval
+ * - Authentication validation against the Event Hubs namespace
+ *
+ *
+ */
+@Named("EVENTHUBS")
+@ApplicationScoped
+public class AzureEventHubsConnectionValidator implements ConnectionValidator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsConnectionValidator.class);
+
+ public static final String CONNECTION_STRING_KEY = "connectionstring";
+ public static final String HUB_NAME_KEY = "hubname";
+
+ private final int defaultConnectionTimeout;
+
+ public AzureEventHubsConnectionValidator(@ConfigProperty(name = "destinations.eventhubs.connection.timeout") int defaultConnectionTimeout) {
+ this.defaultConnectionTimeout = defaultConnectionTimeout;
+ }
+
+ @Override
+ public ConnectionValidationResult validate(Connection connectionConfig) {
+ if (connectionConfig == null) {
+ return ConnectionValidationResult.failed("Connection configuration cannot be null");
+ }
+
+ try {
+ LOGGER.debug("Starting Azure Event Hubs connection validation for connection: {}", connectionConfig.getName());
+
+ Map config = connectionConfig.getConfig();
+
+ if (Strings.isNullOrBlank(getConfigString(config, CONNECTION_STRING_KEY))) {
+ return ConnectionValidationResult.failed("Event Hubs connection string must be specified");
+ }
+
+ if (Strings.isNullOrBlank(getConfigString(config, HUB_NAME_KEY))) {
+ return ConnectionValidationResult.failed("Event Hub name must be specified");
+ }
+
+ String connectionString = getConfigString(config, CONNECTION_STRING_KEY);
+ String hubName = getConfigString(config, HUB_NAME_KEY);
+
+ return performConnectionValidation(connectionString, hubName);
+ }
+ catch (Exception e) {
+ LOGGER.error("Unexpected error during Azure Event Hubs connection validation", e);
+ return ConnectionValidationResult.failed("Validation failed due to unexpected error: " + e.getMessage());
+ }
+ }
+
+ private ConnectionValidationResult performConnectionValidation(String connectionString, String hubName) {
+ EventHubProducerClient producer = null;
+
+ try {
+ LOGGER.debug("Creating EventHubProducerClient for validation");
+ producer = createProducerClient(connectionString, hubName);
+
+ LOGGER.debug("Attempting to retrieve Event Hub properties");
+ var properties = producer.getEventHubProperties();
+
+ LOGGER.debug("Successfully connected to Event Hub '{}' with {} partition(s)",
+ properties.getName(), properties.getPartitionIds().stream().count());
+
+ return ConnectionValidationResult.successful();
+ }
+ catch (ClientAuthenticationException e) {
+ LOGGER.warn("Authentication failed for Azure Event Hubs", e);
+ return ConnectionValidationResult.failed(
+ "Authentication failed - please check connection string credentials");
+ }
+ catch (AmqpException e) {
+ LOGGER.warn("AMQP error during Azure Event Hubs connection validation", e);
+ if (e.isTransient()) {
+ return ConnectionValidationResult.failed(
+ "Transient connection error - please retry or check network connectivity");
+ }
+ return ConnectionValidationResult.failed(
+ "Failed to connect to Azure Event Hubs: " + e.getMessage());
+ }
+ catch (AzureException e) {
+ LOGGER.warn("Azure error during Event Hubs connection validation", e);
+ return ConnectionValidationResult.failed(
+ "Azure Event Hubs connection error: " + e.getMessage());
+ }
+ catch (IllegalArgumentException e) {
+ LOGGER.warn("Invalid connection string format", e);
+ return ConnectionValidationResult.failed(
+ "Invalid connection string format - please verify the connection string");
+ }
+ catch (Exception e) {
+ LOGGER.error("Unexpected error during Azure Event Hubs connection validation", e);
+ return ConnectionValidationResult.failed(
+ "Generic error while connecting to Azure Event Hubs");
+ }
+ finally {
+ if (producer != null) {
+ try {
+ LOGGER.debug("Closing EventHubProducerClient");
+ producer.close();
+ }
+ catch (Exception e) {
+ LOGGER.warn("Error closing EventHubProducerClient", e);
+ }
+ }
+ }
+ }
+
+ EventHubProducerClient createProducerClient(String connectionString, String hubName) {
+ AmqpRetryOptions retryOptions = new AmqpRetryOptions()
+ .setMaxRetries(0)
+ .setTryTimeout(Duration.ofSeconds(defaultConnectionTimeout));
+
+ return new EventHubClientBuilder()
+ .connectionString(connectionString, hubName)
+ .retryOptions(retryOptions)
+ .buildProducerClient();
+ }
+
+ private String getConfigString(Map config, String key) {
+ Object value = config.get(key);
+ return value == null ? null : value.toString();
+ }
+}
diff --git a/debezium-platform-conductor/src/main/resources/application.yml b/debezium-platform-conductor/src/main/resources/application.yml
index 33c2746f..51b22e13 100644
--- a/debezium-platform-conductor/src/main/resources/application.yml
+++ b/debezium-platform-conductor/src/main/resources/application.yml
@@ -44,6 +44,9 @@ destinations:
kafka:
connection:
timeout: 60
+ eventhubs:
+ connection:
+ timeout: 60
quarkus:
hibernate-orm:
diff --git a/debezium-platform-conductor/src/main/resources/connection-schemas.json b/debezium-platform-conductor/src/main/resources/connection-schemas.json
index 936e1801..72a083a6 100644
--- a/debezium-platform-conductor/src/main/resources/connection-schemas.json
+++ b/debezium-platform-conductor/src/main/resources/connection-schemas.json
@@ -218,5 +218,30 @@
}
}
}
+ },
+ {
+ "type": "EVENTHUBS",
+ "schema": {
+ "title": "Azure Event Hubs connection properties",
+ "description": "Azure Event Hubs connection properties",
+ "type": "object",
+ "required": [
+ "connectionstring",
+ "hubname"
+ ],
+ "additionalProperties": {
+ "type": "string"
+ },
+ "properties": {
+ "connectionstring": {
+ "type": "string",
+ "title": "Connection string for the Azure Event Hubs namespace"
+ },
+ "hubname": {
+ "type": "string",
+ "title": "Name of the Event Hub"
+ }
+ }
+ }
}
]
\ No newline at end of file
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsConnectionValidatorIT.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsConnectionValidatorIT.java
new file mode 100644
index 00000000..6f78d5bb
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsConnectionValidatorIT.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.platform.data.dto.ConnectionValidationResult;
+import io.debezium.platform.data.model.ConnectionEntity;
+import io.debezium.platform.domain.views.Connection;
+import io.debezium.platform.environment.connection.destination.AzureEventHubsConnectionValidator;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(value = AzureEventHubsTestResource.class, restrictToAnnotatedClass = true)
+class AzureEventHubsConnectionValidatorIT {
+
+ private static final int DEFAULT_60_SECONDS_TIMEOUT = 60;
+
+ /**
+ * The Event Hub name configured in {@code azure_eventhubs_emulator_config.json} and created
+ * by the emulator on startup.
+ */
+ private static final String EMULATOR_HUB_NAME = "eh1";
+
+ private AzureEventHubsConnectionValidator validator;
+
+ @BeforeEach
+ void setUp() {
+ validator = new AzureEventHubsConnectionValidator(DEFAULT_60_SECONDS_TIMEOUT);
+ }
+
+ /**
+ * Returns the emulator's auto-generated connection string, which contains
+ * valid credentials ({@code SharedAccessKeyName} and {@code SharedAccessKey})
+ * and the correct endpoint for the running emulator container.
+ */
+ private String getValidConnectionString() {
+ return AzureEventHubsTestResource.getConnectionString();
+ }
+
+ /**
+ * Keeps the same endpoint and key name from a valid connection string, and
+ * mutates only the shared access key to force an authentication failure.
+ */
+ private String withInvalidSharedAccessKey(String connectionString) {
+ return connectionString
+ .replaceFirst("SharedAccessKey=[^;]*", "SharedAccessKey=InvalidValue")
+ .replace("UseDevelopmentEmulator=true", "UseDevelopmentEmulator=false");
+ }
+
+ @Test
+ @DisplayName("Should successfully validate connection with valid Event Hubs configuration")
+ void shouldValidateSuccessfulConnection() {
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", getValidConnectionString());
+ config.put("hubname", EMULATOR_HUB_NAME);
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertTrue(result.valid(), "Connection validation should succeed");
+ }
+
+ @Test
+ @DisplayName("Should fail validation when connection string is not provided")
+ void shouldFailValidationWithoutConnectionString() {
+
+ Map config = new HashMap<>();
+ config.put("hubname", EMULATOR_HUB_NAME);
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertEquals("Event Hubs connection string must be specified", result.message());
+ }
+
+ @Test
+ @DisplayName("Should fail validation when connection string is empty")
+ void shouldFailValidationWithEmptyConnectionString() {
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", "");
+ config.put("hubname", EMULATOR_HUB_NAME);
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertEquals("Event Hubs connection string must be specified", result.message());
+ }
+
+ @Test
+ @DisplayName("Should fail validation when connection string is null")
+ void shouldFailValidationWithNullConnectionString() {
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", null);
+ config.put("hubname", EMULATOR_HUB_NAME);
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertEquals("Event Hubs connection string must be specified", result.message());
+ }
+
+ @Test
+ @DisplayName("Should fail validation when hub name is not provided")
+ void shouldFailValidationWithoutHubName() {
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", getValidConnectionString());
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertEquals("Event Hub name must be specified", result.message());
+ }
+
+ @Test
+ @DisplayName("Should fail validation when hub name is empty")
+ void shouldFailValidationWithEmptyHubName() {
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", getValidConnectionString());
+ config.put("hubname", "");
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertEquals("Event Hub name must be specified", result.message());
+ }
+
+ @Test
+ @DisplayName("Should fail validation when connection config is null")
+ void shouldFailValidationWithNullConnection() {
+
+ ConnectionValidationResult result = validator.validate(null);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertEquals("Connection configuration cannot be null", result.message());
+ }
+
+ @Test
+ @DisplayName("Should fail validation with invalid connection string format")
+ void shouldFailValidationWithInvalidConnectionString() {
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", "not-a-valid-connection-string");
+ config.put("hubname", EMULATOR_HUB_NAME);
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertThat(result.message()).containsIgnoringCase("connection string");
+ }
+
+ @Test
+ @DisplayName("Should fail validation with wrong credentials in connection string")
+ void shouldFailValidationWithWrongCredentials() {
+
+ String validConnectionString = getValidConnectionString();
+ String invalidConnectionString = withInvalidSharedAccessKey(validConnectionString);
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", invalidConnectionString);
+ config.put("hubname", EMULATOR_HUB_NAME);
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail with wrong credentials");
+ assertThat(result.message()).containsIgnoringCase("event hubs");
+ }
+
+ @Test
+ @DisplayName("Should fail validation with non-existent hub name")
+ void shouldFailValidationWithNonExistentHub() {
+
+ Map config = new HashMap<>();
+ config.put("connectionstring", getValidConnectionString());
+ config.put("hubname", "nonexistent-hub");
+ Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config);
+
+ ConnectionValidationResult result = validator.validate(connection);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ }
+}
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsTestResource.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsTestResource.java
new file mode 100644
index 00000000..17471c0d
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsTestResource.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection;
+
+import java.util.Map;
+
+import org.testcontainers.azure.AzuriteContainer;
+import org.testcontainers.azure.EventHubsEmulatorContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.MountableFile;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+public class AzureEventHubsTestResource implements QuarkusTestResourceLifecycleManager {
+
+ private static final String AZURITE_IMAGE = "mcr.microsoft.com/azure-storage/azurite:3.33.0";
+ private static final String EVENTHUBS_EMULATOR_IMAGE = "mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1";
+
+ private static Network network;
+ private static AzuriteContainer azurite;
+ private static EventHubsEmulatorContainer emulator;
+
+ @Override
+ public Map start() {
+ network = Network.newNetwork();
+
+ azurite = new AzuriteContainer(AZURITE_IMAGE)
+ .withNetwork(network);
+ azurite.start();
+
+ emulator = new EventHubsEmulatorContainer(EVENTHUBS_EMULATOR_IMAGE)
+ .acceptLicense()
+ .withNetwork(network)
+ .withConfig(MountableFile.forClasspathResource("/azure_eventhubs_emulator_config.json"))
+ .withAzuriteContainer(azurite);
+ emulator.start();
+
+ return Map.of(
+ "destinations.eventhubs.connection.timeout", "60");
+ }
+
+ public static String getConnectionString() {
+ return emulator.getConnectionString();
+ }
+
+ @Override
+ public void stop() {
+ if (emulator != null) {
+ emulator.stop();
+ }
+ if (azurite != null) {
+ azurite.stop();
+ }
+ if (network != null) {
+ network.close();
+ }
+ }
+}
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java
index 55630bad..a318f193 100644
--- a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java
@@ -36,4 +36,10 @@ void shouldReturnKafkaValidator() {
ConnectionValidator validator = factory.getValidator("KAFKA");
assertFalse(validator.validate(new TestConnectionView(ConnectionEntity.Type.KAFKA, Map.of())).valid());
}
+
+ @Test
+ void shouldReturnEventHubsValidator() {
+ ConnectionValidator validator = factory.getValidator(ConnectionEntity.Type.AZURE_EVENTS_HUBS.name());
+ assertFalse(validator.validate(new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, Map.of())).valid());
+ }
}
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java
index d7b86752..b4d2708d 100644
--- a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java
@@ -20,7 +20,7 @@ public Map getConfigOverrides() {
@Override
public Set> getEnabledAlternatives() {
- return Set.of(TestDatabaseConnectionValidator.class, TestKafkaConnectionValidator.class);
+ return Set.of(TestDatabaseConnectionValidator.class, TestKafkaConnectionValidator.class, TestAzureEventHubsConnectionValidator.class);
}
@Override
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/TestAzureEventHubsConnectionValidator.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/TestAzureEventHubsConnectionValidator.java
new file mode 100644
index 00000000..e07ccf97
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/TestAzureEventHubsConnectionValidator.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Named;
+
+import io.debezium.platform.data.dto.ConnectionValidationResult;
+import io.debezium.platform.domain.views.Connection;
+
+@Named("EVENTHUBS")
+@ApplicationScoped
+@Alternative
+public class TestAzureEventHubsConnectionValidator implements ConnectionValidator {
+
+ @Override
+ public ConnectionValidationResult validate(Connection connectionConfig) {
+ return new ConnectionValidationResult(false);
+ }
+}
diff --git a/debezium-platform-conductor/src/test/resources/azure_eventhubs_emulator_config.json b/debezium-platform-conductor/src/test/resources/azure_eventhubs_emulator_config.json
new file mode 100644
index 00000000..6d26cfbf
--- /dev/null
+++ b/debezium-platform-conductor/src/test/resources/azure_eventhubs_emulator_config.json
@@ -0,0 +1,24 @@
+{
+ "UserConfig": {
+ "NamespaceConfig": [
+ {
+ "Type": "EventHub",
+ "Name": "emulatorNs1",
+ "Entities": [
+ {
+ "Name": "eh1",
+ "PartitionCount": "2",
+ "ConsumerGroups": [
+ {
+ "Name": "cg1"
+ }
+ ]
+ }
+ ]
+ }
+ ],
+ "LoggingConfig": {
+ "Type": "File"
+ }
+ }
+}