Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions debezium-platform-conductor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
<version.blaze-persistence>1.6.18</version.blaze-persistence>
<version.test-containers>2.0.3</version.test-containers>
<version.mockwebserver>5.3.2</version.mockwebserver>
<version.azure-messaging-eventhubs>5.21.3</version.azure-messaging-eventhubs>
<version.impsort>1.13.0</version.impsort>
<version.checkstyle.plugin>3.6.0</version.checkstyle.plugin>
<version.code.formatter>2.29.0</version.code.formatter>
Expand Down Expand Up @@ -203,6 +204,12 @@
<version>${version.mockwebserver}</version>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>${version.azure-messaging-eventhubs}</version>
</dependency>

</dependencies>
</dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -376,6 +383,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down Expand Up @@ -436,6 +447,11 @@
<artifactId>testcontainers-mssqlserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-azure</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public ConnectionValidator getValidator(String connectionType) {
private String mapToValidatorName(String connectionType) {
return switch (connectionType) {
case "ORACLE", "MYSQL", "MARIADB", "SQLSERVER", "POSTGRESQL" -> "DATABASE";
case "AZURE_EVENTS_HUBS" -> "EVENTHUBS";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not necessary.

default -> connectionType;
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.connection.destination;

import java.time.Duration;
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Named;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.exception.AzureException;
import com.azure.core.exception.ClientAuthenticationException;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;

import io.debezium.platform.data.dto.ConnectionValidationResult;
import io.debezium.platform.domain.views.Connection;
import io.debezium.platform.environment.connection.ConnectionValidator;
import io.debezium.util.Strings;

/**
* Implementation of {@link ConnectionValidator} for Azure Event Hubs connections.
* <p>
* This validator performs validation of Azure Event Hubs connection configurations
* by attempting to connect and retrieve Event Hub metadata.
* </p>
*
* <p>
* The validation process includes:
* <ul>
* <li>Connection string and hub name presence checks</li>
* <li>Network connectivity verification via metadata retrieval</li>
* <li>Authentication validation against the Event Hubs namespace</li>
* </ul>
* </p>
*/
@Named("EVENTHUBS")
@ApplicationScoped
public class AzureEventHubsConnectionValidator implements ConnectionValidator {

private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsConnectionValidator.class);

public static final String CONNECTION_STRING_KEY = "connectionstring";
public static final String HUB_NAME_KEY = "hubname";

private final int defaultConnectionTimeout;

public AzureEventHubsConnectionValidator(@ConfigProperty(name = "destinations.eventhubs.connection.timeout") int defaultConnectionTimeout) {
this.defaultConnectionTimeout = defaultConnectionTimeout;
}

@Override
public ConnectionValidationResult validate(Connection connectionConfig) {
if (connectionConfig == null) {
return ConnectionValidationResult.failed("Connection configuration cannot be null");
}

try {
LOGGER.debug("Starting Azure Event Hubs connection validation for connection: {}", connectionConfig.getName());

Map<String, Object> config = connectionConfig.getConfig();

if (Strings.isNullOrBlank(getConfigString(config, CONNECTION_STRING_KEY))) {
return ConnectionValidationResult.failed("Event Hubs connection string must be specified");
}

if (Strings.isNullOrBlank(getConfigString(config, HUB_NAME_KEY))) {
return ConnectionValidationResult.failed("Event Hub name must be specified");
}

String connectionString = getConfigString(config, CONNECTION_STRING_KEY);
String hubName = getConfigString(config, HUB_NAME_KEY);

return performConnectionValidation(connectionString, hubName);
}
catch (Exception e) {
LOGGER.error("Unexpected error during Azure Event Hubs connection validation", e);
return ConnectionValidationResult.failed("Validation failed due to unexpected error: " + e.getMessage());
}
}

private ConnectionValidationResult performConnectionValidation(String connectionString, String hubName) {
EventHubProducerClient producer = null;

try {
LOGGER.debug("Creating EventHubProducerClient for validation");
producer = createProducerClient(connectionString, hubName);

LOGGER.debug("Attempting to retrieve Event Hub properties");
var properties = producer.getEventHubProperties();

LOGGER.debug("Successfully connected to Event Hub '{}' with {} partition(s)",
properties.getName(), properties.getPartitionIds().stream().count());

return ConnectionValidationResult.successful();
}
catch (ClientAuthenticationException e) {
LOGGER.warn("Authentication failed for Azure Event Hubs", e);
return ConnectionValidationResult.failed(
"Authentication failed - please check connection string credentials");
}
catch (AmqpException e) {
LOGGER.warn("AMQP error during Azure Event Hubs connection validation", e);
if (e.isTransient()) {
return ConnectionValidationResult.failed(
"Transient connection error - please retry or check network connectivity");
}
return ConnectionValidationResult.failed(
"Failed to connect to Azure Event Hubs: " + e.getMessage());
}
catch (AzureException e) {
LOGGER.warn("Azure error during Event Hubs connection validation", e);
return ConnectionValidationResult.failed(
"Azure Event Hubs connection error: " + e.getMessage());
}
catch (IllegalArgumentException e) {
LOGGER.warn("Invalid connection string format", e);
return ConnectionValidationResult.failed(
"Invalid connection string format - please verify the connection string");
}
catch (Exception e) {
LOGGER.error("Unexpected error during Azure Event Hubs connection validation", e);
return ConnectionValidationResult.failed(
"Generic error while connecting to Azure Event Hubs");
}
finally {
if (producer != null) {
try {
LOGGER.debug("Closing EventHubProducerClient");
producer.close();
}
catch (Exception e) {
LOGGER.warn("Error closing EventHubProducerClient", e);
}
}
}
}

EventHubProducerClient createProducerClient(String connectionString, String hubName) {
AmqpRetryOptions retryOptions = new AmqpRetryOptions()
.setMaxRetries(0)
.setTryTimeout(Duration.ofSeconds(defaultConnectionTimeout));

return new EventHubClientBuilder()
.connectionString(connectionString, hubName)
.retryOptions(retryOptions)
.buildProducerClient();
}

private String getConfigString(Map<String, Object> config, String key) {
Object value = config.get(key);
return value == null ? null : value.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ destinations:
kafka:
connection:
timeout: 60
eventhubs:
connection:
timeout: 60

quarkus:
hibernate-orm:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,5 +218,30 @@
}
}
}
},
{
"type": "EVENTHUBS",
"schema": {
"title": "Azure Event Hubs connection properties",
"description": "Azure Event Hubs connection properties",
"type": "object",
"required": [
"connectionstring",
"hubname"
],
"additionalProperties": {
"type": "string"
},
"properties": {
"connectionstring": {
"type": "string",
"title": "Connection string for the Azure Event Hubs namespace"
},
"hubname": {
"type": "string",
"title": "Name of the Event Hub"
}
}
}
}
]
Loading