Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.springframework.boot.kafka.autoconfigure.KafkaProperties.Listener;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
Expand Down Expand Up @@ -85,6 +88,10 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {

private @Nullable KafkaListenerObservationConvention observationConvention;

private @Nullable KafkaAdmin kafkaAdmin;

private @Nullable ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>> containerCustomizer;

/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
Expand Down Expand Up @@ -197,6 +204,15 @@ void setObservationConvention(@Nullable KafkaListenerObservationConvention obser
this.observationConvention = observationConvention;
}

void setKafkaAdmin(@Nullable KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;
}

void setContainerCustomizer(
@Nullable ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>> containerCustomizer) {
this.containerCustomizer = containerCustomizer;
}

/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
Expand Down Expand Up @@ -230,6 +246,20 @@ private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Ob
map.from(this.batchInterceptor).to(factory::setBatchInterceptor);
map.from(this.threadNameSupplier).to(factory::setThreadNameSupplier);
map.from(properties::getChangeConsumerThreadName).to(factory::setChangeConsumerThreadName);
map.from(getContainerCustomizer()).to(factory::setContainerCustomizer);
}

private @Nullable ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>> getContainerCustomizer() {
KafkaAdmin kafkaAdmin = this.kafkaAdmin;
if (kafkaAdmin == null) {
return this.containerCustomizer;
}
return (container) -> {
container.setKafkaAdmin(kafkaAdmin);
if (this.containerCustomizer != null) {
this.containerCustomizer.configure(container);
}
};
}

private void configureContainer(ContainerProperties container) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
Expand Down Expand Up @@ -89,11 +90,14 @@ class KafkaAnnotationDrivenConfiguration {

private final @Nullable KafkaListenerObservationConvention observationConvention;

private final KafkaConnectionDetails connectionDetails;

KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> recordMessageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaConnectionDetails> connectionDetails,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
ObjectProvider<CommonErrorHandler> commonErrorHandler,
Expand All @@ -108,6 +112,7 @@ class KafkaAnnotationDrivenConfiguration {
this.batchMessageConverter = batchMessageConverter
.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.connectionDetails = connectionDetails.getObject();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
this.commonErrorHandler = commonErrorHandler.getIfUnique();
Expand Down Expand Up @@ -151,6 +156,12 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
configurer.setBatchInterceptor(this.batchInterceptor);
configurer.setThreadNameSupplier(this.threadNameSupplier);
configurer.setObservationConvention(this.observationConvention);
KafkaProperties.Admin admin = this.properties.getListener().getAdmin();
if (admin != null) {
KafkaAdmin kafkaAdmin = KafkaAutoConfiguration.createKafkaAdmin(this.properties.buildAdminProperties(admin),
admin, this.connectionDetails);
configurer.setKafkaAdmin(kafkaAdmin);
}
return configurer;
}

Expand All @@ -161,9 +172,9 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
kafkaContainerCustomizer.ifAvailable(configurer::setContainerCustomizer);
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer);
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,19 @@ PropertiesKafkaConnectionDetails kafkaConnectionDetails(ObjectProvider<SslBundle
KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<KafkaTemplateObservationConvention> observationConvention) {
ObjectProvider<KafkaTemplateObservationConvention> observationConvention,
KafkaConnectionDetails connectionDetails) {
PropertyMapper map = PropertyMapper.get();
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
observationConvention.ifUnique(kafkaTemplate::setObservationConvention);
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
Template templateProperties = this.properties.getTemplate();
KafkaProperties.Admin admin = templateProperties.getAdmin();
if (admin != null) {
kafkaTemplate
.setKafkaAdmin(createKafkaAdmin(this.properties.buildAdminProperties(admin), admin, connectionDetails));
}
map.from(templateProperties.getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(templateProperties.getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
map.from(templateProperties.getCloseTimeout()).to(kafkaTemplate::setCloseTimeout);
Expand Down Expand Up @@ -180,9 +186,14 @@ KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
@ConditionalOnMissingBean
KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) {
Map<String, Object> properties = this.properties.buildAdminProperties();
KafkaProperties.Admin admin = this.properties.getAdmin();
return createKafkaAdmin(properties, admin, connectionDetails);
}

static KafkaAdmin createKafkaAdmin(Map<String, Object> properties, KafkaProperties.Admin admin,
KafkaConnectionDetails connectionDetails) {
applyKafkaConnectionDetailsForAdmin(properties, connectionDetails);
KafkaAdmin kafkaAdmin = new KafkaAdmin(properties);
KafkaProperties.Admin admin = this.properties.getAdmin();
if (admin.getCloseTimeout() != null) {
kafkaAdmin.setCloseTimeout((int) admin.getCloseTimeout().getSeconds());
}
Expand Down Expand Up @@ -225,7 +236,7 @@ private void applyKafkaConnectionDetailsForProducer(Map<String, Object> properti
applySslBundle(properties, producer.getSslBundle());
}

private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties,
static void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties,
KafkaConnectionDetails connectionDetails) {
Configuration admin = connectionDetails.getAdmin();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, admin.getBootstrapServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,12 @@ public Map<String, Object> buildProducerProperties() {
* instance
*/
public Map<String, Object> buildAdminProperties() {
return buildAdminProperties(this.admin);
}

Map<String, Object> buildAdminProperties(Admin admin) {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.admin.buildProperties());
properties.putAll(admin.buildProperties());
return properties;
}

Expand Down Expand Up @@ -907,6 +911,11 @@ public Map<String, Object> buildProperties() {

public static class Template {

/**
* Admin properties used to look up the Kafka cluster id for observation.
*/
private @Nullable Admin admin;

/**
* Default topic to which messages are sent.
*/
Expand All @@ -933,6 +942,14 @@ public static class Template {
*/
private boolean observationEnabled;

public @Nullable Admin getAdmin() {
return this.admin;
}

public void setAdmin(@Nullable Admin admin) {
this.admin = admin;
}

public @Nullable String getDefaultTopic() {
return this.defaultTopic;
}
Expand Down Expand Up @@ -977,6 +994,11 @@ public void setObservationEnabled(boolean observationEnabled) {

public static class Listener {

/**
* Admin properties used to look up the Kafka cluster id for observation.
*/
private @Nullable Admin admin;

public enum Type {

/**
Expand Down Expand Up @@ -1100,6 +1122,14 @@ public enum Type {
*/
private @Nullable Duration authExceptionRetryInterval;

public @Nullable Admin getAdmin() {
return this.admin;
}

public void setAdmin(@Nullable Admin admin) {
this.admin = admin;
}

public Type getType() {
return this.type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package org.springframework.boot.kafka.autoconfigure;

import java.time.Duration;
import java.util.Collections;
import java.util.function.Function;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;

Expand Down Expand Up @@ -98,4 +102,27 @@ void shouldApplyObservationConvention() {
assertThat(this.factory.getContainerProperties().getObservationConvention()).isSameAs(convention);
}

@Test
void shouldApplyKafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(Collections.emptyMap());
this.configurer.setKafkaAdmin(kafkaAdmin);
this.configurer.configure(this.factory, this.consumerFactory);
ConcurrentMessageListenerContainer<Object, Object> container = this.factory.createContainer("test");
assertThat(container.getKafkaAdmin()).isSameAs(kafkaAdmin);
}

@Test
@SuppressWarnings("unchecked")
void shouldApplyKafkaAdminAndContainerCustomizer() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(Collections.emptyMap());
ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>> customizer = mock(
ContainerCustomizer.class);
this.configurer.setKafkaAdmin(kafkaAdmin);
this.configurer.setContainerCustomizer(customizer);
this.configurer.configure(this.factory, this.consumerFactory);
ConcurrentMessageListenerContainer<Object, Object> container = this.factory.createContainer("test");
assertThat(container.getKafkaAdmin()).isSameAs(kafkaAdmin);
then(customizer).should().configure(container);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,23 @@ void templateProperties() {
});
}

@Test
void templateAdminProperties() {
this.contextRunner
.withPropertyValues("spring.kafka.template.admin.client-id=templateAdmin",
"spring.kafka.template.admin.operation-timeout=60s",
"spring.kafka.template.admin.properties.fiz.buz=fix.fox")
.run((context) -> {
KafkaTemplate<?, ?> kafkaTemplate = context.getBean(KafkaTemplate.class);
KafkaAdmin admin = kafkaTemplate.getKafkaAdmin();
assertThat(admin).isNotNull();
assertThat(admin.getConfigurationProperties())
.containsEntry(AdminClientConfig.CLIENT_ID_CONFIG, "templateAdmin")
.containsEntry("fiz.buz", "fix.fox");
assertThat(admin).hasFieldOrPropertyWithValue("operationTimeout", 60);
});
}

@SuppressWarnings("unchecked")
@Test
void listenerProperties() {
Expand Down Expand Up @@ -828,6 +845,25 @@ void listenerProperties() {
});
}

@Test
void listenerAdminProperties() {
this.contextRunner
.withPropertyValues("spring.kafka.listener.admin.client-id=listenerAdmin",
"spring.kafka.listener.admin.operation-timeout=60s",
"spring.kafka.listener.admin.properties.fiz.buz=fix.fox")
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
ConcurrentMessageListenerContainer<?, ?> container = factory.createContainer("someTopic");
KafkaAdmin admin = container.getKafkaAdmin();
assertThat(admin).isNotNull();
assertThat(admin.getConfigurationProperties())
.containsEntry(AdminClientConfig.CLIENT_ID_CONFIG, "listenerAdmin")
.containsEntry("fiz.buz", "fix.fox");
assertThat(admin).hasFieldOrPropertyWithValue("operationTimeout", 60);
});
}

@Test
void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
Expand Down Expand Up @@ -1022,6 +1058,19 @@ void testConcurrentKafkaListenerContainerFactoryWithCustomContainerCustomizer()
});
}

@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomContainerCustomizerAndListenerAdminProperties() {
this.contextRunner.withUserConfiguration(ObservationEnabledContainerCustomizerConfiguration.class)
.withPropertyValues("spring.kafka.listener.admin.client-id=listenerAdmin")
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
ConcurrentMessageListenerContainer<?, ?> container = factory.createContainer("someTopic");
assertThat(container.getContainerProperties().isObservationEnabled()).isEqualTo(true);
assertThat(container.getKafkaAdmin()).isNotNull();
});
}

@Test
void specificSecurityProtocolOverridesCommonSecurityProtocol() {
this.contextRunner
Expand Down
Loading