diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml index aa9f9401..64c94a5f 100644 --- a/debezium-platform-conductor/pom.xml +++ b/debezium-platform-conductor/pom.xml @@ -44,6 +44,7 @@ 1.6.18 2.0.3 5.3.2 + 5.21.3 1.13.0 3.6.0 2.29.0 @@ -203,6 +204,12 @@ ${version.mockwebserver} + + com.azure + azure-messaging-eventhubs + ${version.azure-messaging-eventhubs} + + @@ -376,6 +383,10 @@ io.quarkus quarkus-kafka-client + + com.azure + azure-messaging-eventhubs + io.quarkus @@ -436,6 +447,11 @@ testcontainers-mssqlserver test + + org.testcontainers + testcontainers-azure + test + ch.qos.logback diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java index e736035f..1223ecd6 100644 --- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/ConnectionValidatorFactory.java @@ -28,6 +28,7 @@ public ConnectionValidator getValidator(String connectionType) { private String mapToValidatorName(String connectionType) { return switch (connectionType) { case "ORACLE", "MYSQL", "MARIADB", "SQLSERVER", "POSTGRESQL" -> "DATABASE"; + case "AZURE_EVENTS_HUBS" -> "EVENTHUBS"; default -> connectionType; }; } diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AzureEventHubsConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AzureEventHubsConnectionValidator.java new file mode 100644 index 00000000..b66f76f6 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AzureEventHubsConnectionValidator.java @@ -0,0 +1,163 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection.destination; + +import java.time.Duration; +import java.util.Map; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Named; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.exception.AzureException; +import com.azure.core.exception.ClientAuthenticationException; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerClient; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.ConnectionValidator; +import io.debezium.util.Strings; + +/** + * Implementation of {@link ConnectionValidator} for Azure Event Hubs connections. + *

+ * This validator performs validation of Azure Event Hubs connection configurations + * by attempting to connect and retrieve Event Hub metadata. + *

+ * + *

+ * The validation process includes: + *

+ *

+ */ +@Named("EVENTHUBS") +@ApplicationScoped +public class AzureEventHubsConnectionValidator implements ConnectionValidator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsConnectionValidator.class); + + public static final String CONNECTION_STRING_KEY = "connectionstring"; + public static final String HUB_NAME_KEY = "hubname"; + + private final int defaultConnectionTimeout; + + public AzureEventHubsConnectionValidator(@ConfigProperty(name = "destinations.eventhubs.connection.timeout") int defaultConnectionTimeout) { + this.defaultConnectionTimeout = defaultConnectionTimeout; + } + + @Override + public ConnectionValidationResult validate(Connection connectionConfig) { + if (connectionConfig == null) { + return ConnectionValidationResult.failed("Connection configuration cannot be null"); + } + + try { + LOGGER.debug("Starting Azure Event Hubs connection validation for connection: {}", connectionConfig.getName()); + + Map config = connectionConfig.getConfig(); + + if (Strings.isNullOrBlank(getConfigString(config, CONNECTION_STRING_KEY))) { + return ConnectionValidationResult.failed("Event Hubs connection string must be specified"); + } + + if (Strings.isNullOrBlank(getConfigString(config, HUB_NAME_KEY))) { + return ConnectionValidationResult.failed("Event Hub name must be specified"); + } + + String connectionString = getConfigString(config, CONNECTION_STRING_KEY); + String hubName = getConfigString(config, HUB_NAME_KEY); + + return performConnectionValidation(connectionString, hubName); + } + catch (Exception e) { + LOGGER.error("Unexpected error during Azure Event Hubs connection validation", e); + return ConnectionValidationResult.failed("Validation failed due to unexpected error: " + e.getMessage()); + } + } + + private ConnectionValidationResult performConnectionValidation(String connectionString, String hubName) { + EventHubProducerClient producer = null; + + try { + LOGGER.debug("Creating EventHubProducerClient for validation"); + producer = createProducerClient(connectionString, hubName); + + LOGGER.debug("Attempting to retrieve Event Hub properties"); + var properties = producer.getEventHubProperties(); + + LOGGER.debug("Successfully connected to Event Hub '{}' with {} partition(s)", + properties.getName(), properties.getPartitionIds().stream().count()); + + return ConnectionValidationResult.successful(); + } + catch (ClientAuthenticationException e) { + LOGGER.warn("Authentication failed for Azure Event Hubs", e); + return ConnectionValidationResult.failed( + "Authentication failed - please check connection string credentials"); + } + catch (AmqpException e) { + LOGGER.warn("AMQP error during Azure Event Hubs connection validation", e); + if (e.isTransient()) { + return ConnectionValidationResult.failed( + "Transient connection error - please retry or check network connectivity"); + } + return ConnectionValidationResult.failed( + "Failed to connect to Azure Event Hubs: " + e.getMessage()); + } + catch (AzureException e) { + LOGGER.warn("Azure error during Event Hubs connection validation", e); + return ConnectionValidationResult.failed( + "Azure Event Hubs connection error: " + e.getMessage()); + } + catch (IllegalArgumentException e) { + LOGGER.warn("Invalid connection string format", e); + return ConnectionValidationResult.failed( + "Invalid connection string format - please verify the connection string"); + } + catch (Exception e) { + LOGGER.error("Unexpected error during Azure Event Hubs connection validation", e); + return ConnectionValidationResult.failed( + "Generic error while connecting to Azure Event Hubs"); + } + finally { + if (producer != null) { + try { + LOGGER.debug("Closing EventHubProducerClient"); + producer.close(); + } + catch (Exception e) { + LOGGER.warn("Error closing EventHubProducerClient", e); + } + } + } + } + + EventHubProducerClient createProducerClient(String connectionString, String hubName) { + AmqpRetryOptions retryOptions = new AmqpRetryOptions() + .setMaxRetries(0) + .setTryTimeout(Duration.ofSeconds(defaultConnectionTimeout)); + + return new EventHubClientBuilder() + .connectionString(connectionString, hubName) + .retryOptions(retryOptions) + .buildProducerClient(); + } + + private String getConfigString(Map config, String key) { + Object value = config.get(key); + return value == null ? null : value.toString(); + } +} diff --git a/debezium-platform-conductor/src/main/resources/application.yml b/debezium-platform-conductor/src/main/resources/application.yml index 33c2746f..51b22e13 100644 --- a/debezium-platform-conductor/src/main/resources/application.yml +++ b/debezium-platform-conductor/src/main/resources/application.yml @@ -44,6 +44,9 @@ destinations: kafka: connection: timeout: 60 + eventhubs: + connection: + timeout: 60 quarkus: hibernate-orm: diff --git a/debezium-platform-conductor/src/main/resources/connection-schemas.json b/debezium-platform-conductor/src/main/resources/connection-schemas.json index 936e1801..72a083a6 100644 --- a/debezium-platform-conductor/src/main/resources/connection-schemas.json +++ b/debezium-platform-conductor/src/main/resources/connection-schemas.json @@ -218,5 +218,30 @@ } } } + }, + { + "type": "EVENTHUBS", + "schema": { + "title": "Azure Event Hubs connection properties", + "description": "Azure Event Hubs connection properties", + "type": "object", + "required": [ + "connectionstring", + "hubname" + ], + "additionalProperties": { + "type": "string" + }, + "properties": { + "connectionstring": { + "type": "string", + "title": "Connection string for the Azure Event Hubs namespace" + }, + "hubname": { + "type": "string", + "title": "Name of the Event Hub" + } + } + } } ] \ No newline at end of file diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsConnectionValidatorIT.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsConnectionValidatorIT.java new file mode 100644 index 00000000..6f78d5bb --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsConnectionValidatorIT.java @@ -0,0 +1,208 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.data.model.ConnectionEntity; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.destination.AzureEventHubsConnectionValidator; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(value = AzureEventHubsTestResource.class, restrictToAnnotatedClass = true) +class AzureEventHubsConnectionValidatorIT { + + private static final int DEFAULT_60_SECONDS_TIMEOUT = 60; + + /** + * The Event Hub name configured in {@code azure_eventhubs_emulator_config.json} and created + * by the emulator on startup. + */ + private static final String EMULATOR_HUB_NAME = "eh1"; + + private AzureEventHubsConnectionValidator validator; + + @BeforeEach + void setUp() { + validator = new AzureEventHubsConnectionValidator(DEFAULT_60_SECONDS_TIMEOUT); + } + + /** + * Returns the emulator's auto-generated connection string, which contains + * valid credentials ({@code SharedAccessKeyName} and {@code SharedAccessKey}) + * and the correct endpoint for the running emulator container. + */ + private String getValidConnectionString() { + return AzureEventHubsTestResource.getConnectionString(); + } + + /** + * Keeps the same endpoint and key name from a valid connection string, and + * mutates only the shared access key to force an authentication failure. + */ + private String withInvalidSharedAccessKey(String connectionString) { + return connectionString + .replaceFirst("SharedAccessKey=[^;]*", "SharedAccessKey=InvalidValue") + .replace("UseDevelopmentEmulator=true", "UseDevelopmentEmulator=false"); + } + + @Test + @DisplayName("Should successfully validate connection with valid Event Hubs configuration") + void shouldValidateSuccessfulConnection() { + + Map config = new HashMap<>(); + config.put("connectionstring", getValidConnectionString()); + config.put("hubname", EMULATOR_HUB_NAME); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertTrue(result.valid(), "Connection validation should succeed"); + } + + @Test + @DisplayName("Should fail validation when connection string is not provided") + void shouldFailValidationWithoutConnectionString() { + + Map config = new HashMap<>(); + config.put("hubname", EMULATOR_HUB_NAME); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Event Hubs connection string must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when connection string is empty") + void shouldFailValidationWithEmptyConnectionString() { + + Map config = new HashMap<>(); + config.put("connectionstring", ""); + config.put("hubname", EMULATOR_HUB_NAME); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Event Hubs connection string must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when connection string is null") + void shouldFailValidationWithNullConnectionString() { + + Map config = new HashMap<>(); + config.put("connectionstring", null); + config.put("hubname", EMULATOR_HUB_NAME); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Event Hubs connection string must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when hub name is not provided") + void shouldFailValidationWithoutHubName() { + + Map config = new HashMap<>(); + config.put("connectionstring", getValidConnectionString()); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Event Hub name must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when hub name is empty") + void shouldFailValidationWithEmptyHubName() { + + Map config = new HashMap<>(); + config.put("connectionstring", getValidConnectionString()); + config.put("hubname", ""); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Event Hub name must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when connection config is null") + void shouldFailValidationWithNullConnection() { + + ConnectionValidationResult result = validator.validate(null); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Connection configuration cannot be null", result.message()); + } + + @Test + @DisplayName("Should fail validation with invalid connection string format") + void shouldFailValidationWithInvalidConnectionString() { + + Map config = new HashMap<>(); + config.put("connectionstring", "not-a-valid-connection-string"); + config.put("hubname", EMULATOR_HUB_NAME); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertThat(result.message()).containsIgnoringCase("connection string"); + } + + @Test + @DisplayName("Should fail validation with wrong credentials in connection string") + void shouldFailValidationWithWrongCredentials() { + + String validConnectionString = getValidConnectionString(); + String invalidConnectionString = withInvalidSharedAccessKey(validConnectionString); + + Map config = new HashMap<>(); + config.put("connectionstring", invalidConnectionString); + config.put("hubname", EMULATOR_HUB_NAME); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail with wrong credentials"); + assertThat(result.message()).containsIgnoringCase("event hubs"); + } + + @Test + @DisplayName("Should fail validation with non-existent hub name") + void shouldFailValidationWithNonExistentHub() { + + Map config = new HashMap<>(); + config.put("connectionstring", getValidConnectionString()); + config.put("hubname", "nonexistent-hub"); + Connection connection = new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + } +} diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsTestResource.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsTestResource.java new file mode 100644 index 00000000..17471c0d --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AzureEventHubsTestResource.java @@ -0,0 +1,61 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection; + +import java.util.Map; + +import org.testcontainers.azure.AzuriteContainer; +import org.testcontainers.azure.EventHubsEmulatorContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.MountableFile; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class AzureEventHubsTestResource implements QuarkusTestResourceLifecycleManager { + + private static final String AZURITE_IMAGE = "mcr.microsoft.com/azure-storage/azurite:3.33.0"; + private static final String EVENTHUBS_EMULATOR_IMAGE = "mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1"; + + private static Network network; + private static AzuriteContainer azurite; + private static EventHubsEmulatorContainer emulator; + + @Override + public Map start() { + network = Network.newNetwork(); + + azurite = new AzuriteContainer(AZURITE_IMAGE) + .withNetwork(network); + azurite.start(); + + emulator = new EventHubsEmulatorContainer(EVENTHUBS_EMULATOR_IMAGE) + .acceptLicense() + .withNetwork(network) + .withConfig(MountableFile.forClasspathResource("/azure_eventhubs_emulator_config.json")) + .withAzuriteContainer(azurite); + emulator.start(); + + return Map.of( + "destinations.eventhubs.connection.timeout", "60"); + } + + public static String getConnectionString() { + return emulator.getConnectionString(); + } + + @Override + public void stop() { + if (emulator != null) { + emulator.stop(); + } + if (azurite != null) { + azurite.stop(); + } + if (network != null) { + network.close(); + } + } +} diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java index 55630bad..a318f193 100644 --- a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/ConnectionValidatorFactoryTest.java @@ -36,4 +36,10 @@ void shouldReturnKafkaValidator() { ConnectionValidator validator = factory.getValidator("KAFKA"); assertFalse(validator.validate(new TestConnectionView(ConnectionEntity.Type.KAFKA, Map.of())).valid()); } + + @Test + void shouldReturnEventHubsValidator() { + ConnectionValidator validator = factory.getValidator(ConnectionEntity.Type.AZURE_EVENTS_HUBS.name()); + assertFalse(validator.validate(new TestConnectionView(ConnectionEntity.Type.AZURE_EVENTS_HUBS, Map.of())).valid()); + } } diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java index d7b86752..b4d2708d 100644 --- a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/CustomTestProfile.java @@ -20,7 +20,7 @@ public Map getConfigOverrides() { @Override public Set> getEnabledAlternatives() { - return Set.of(TestDatabaseConnectionValidator.class, TestKafkaConnectionValidator.class); + return Set.of(TestDatabaseConnectionValidator.class, TestKafkaConnectionValidator.class, TestAzureEventHubsConnectionValidator.class); } @Override diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/TestAzureEventHubsConnectionValidator.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/TestAzureEventHubsConnectionValidator.java new file mode 100644 index 00000000..e07ccf97 --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/TestAzureEventHubsConnectionValidator.java @@ -0,0 +1,24 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Alternative; +import jakarta.inject.Named; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.domain.views.Connection; + +@Named("EVENTHUBS") +@ApplicationScoped +@Alternative +public class TestAzureEventHubsConnectionValidator implements ConnectionValidator { + + @Override + public ConnectionValidationResult validate(Connection connectionConfig) { + return new ConnectionValidationResult(false); + } +} diff --git a/debezium-platform-conductor/src/test/resources/azure_eventhubs_emulator_config.json b/debezium-platform-conductor/src/test/resources/azure_eventhubs_emulator_config.json new file mode 100644 index 00000000..6d26cfbf --- /dev/null +++ b/debezium-platform-conductor/src/test/resources/azure_eventhubs_emulator_config.json @@ -0,0 +1,24 @@ +{ + "UserConfig": { + "NamespaceConfig": [ + { + "Type": "EventHub", + "Name": "emulatorNs1", + "Entities": [ + { + "Name": "eh1", + "PartitionCount": "2", + "ConsumerGroups": [ + { + "Name": "cg1" + } + ] + } + ] + } + ], + "LoggingConfig": { + "Type": "File" + } + } +}