diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java index df90651c947a..e042e4881320 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -25,11 +25,14 @@ import org.springframework.boot.kafka.autoconfigure.KafkaProperties.Listener; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.ContainerCustomizer; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.MessageListenerContainer; @@ -85,6 +88,10 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private @Nullable KafkaListenerObservationConvention observationConvention; + private @Nullable KafkaAdmin kafkaAdmin; + + private @Nullable ContainerCustomizer> containerCustomizer; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -197,6 +204,15 @@ void setObservationConvention(@Nullable KafkaListenerObservationConvention obser this.observationConvention = observationConvention; } + void setKafkaAdmin(@Nullable KafkaAdmin kafkaAdmin) { + this.kafkaAdmin = kafkaAdmin; + } + + void setContainerCustomizer( + @Nullable ContainerCustomizer> containerCustomizer) { + this.containerCustomizer = containerCustomizer; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -230,6 +246,20 @@ private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory> getContainerCustomizer() { + KafkaAdmin kafkaAdmin = this.kafkaAdmin; + if (kafkaAdmin == null) { + return this.containerCustomizer; + } + return (container) -> { + container.setKafkaAdmin(kafkaAdmin); + if (this.containerCustomizer != null) { + this.containerCustomizer.configure(container); + } + }; } private void configureContainer(ContainerProperties container) { diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java index 19d52507e225..dcbb77cc81dc 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java @@ -34,6 +34,7 @@ import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchInterceptor; @@ -89,11 +90,14 @@ class KafkaAnnotationDrivenConfiguration { private final @Nullable KafkaListenerObservationConvention observationConvention; + private final KafkaConnectionDetails connectionDetails; + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider recordMessageConverter, ObjectProvider> recordFilterStrategy, ObjectProvider batchMessageConverter, ObjectProvider> kafkaTemplate, + ObjectProvider connectionDetails, ObjectProvider> kafkaTransactionManager, ObjectProvider rebalanceListener, ObjectProvider commonErrorHandler, @@ -108,6 +112,7 @@ class KafkaAnnotationDrivenConfiguration { this.batchMessageConverter = batchMessageConverter .getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter)); this.kafkaTemplate = kafkaTemplate.getIfUnique(); + this.connectionDetails = connectionDetails.getObject(); this.transactionManager = kafkaTransactionManager.getIfUnique(); this.rebalanceListener = rebalanceListener.getIfUnique(); this.commonErrorHandler = commonErrorHandler.getIfUnique(); @@ -151,6 +156,12 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() { configurer.setBatchInterceptor(this.batchInterceptor); configurer.setThreadNameSupplier(this.threadNameSupplier); configurer.setObservationConvention(this.observationConvention); + KafkaProperties.Admin admin = this.properties.getListener().getAdmin(); + if (admin != null) { + KafkaAdmin kafkaAdmin = KafkaAutoConfiguration.createKafkaAdmin(this.properties.buildAdminProperties(admin), + admin, this.connectionDetails); + configurer.setKafkaAdmin(kafkaAdmin); + } return configurer; } @@ -161,9 +172,9 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() { ObjectProvider> kafkaConsumerFactory, ObjectProvider>> kafkaContainerCustomizer) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + kafkaContainerCustomizer.ifAvailable(configurer::setContainerCustomizer); configurer.configure(factory, kafkaConsumerFactory .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()))); - kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer); return factory; } diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java index 130e975028c7..1ba32e7e49f8 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java @@ -106,13 +106,19 @@ PropertiesKafkaConnectionDetails kafkaConnectionDetails(ObjectProvider kafkaTemplate(ProducerFactory kafkaProducerFactory, ProducerListener kafkaProducerListener, ObjectProvider messageConverter, - ObjectProvider observationConvention) { + ObjectProvider observationConvention, + KafkaConnectionDetails connectionDetails) { PropertyMapper map = PropertyMapper.get(); KafkaTemplate kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); observationConvention.ifUnique(kafkaTemplate::setObservationConvention); map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener); Template templateProperties = this.properties.getTemplate(); + KafkaProperties.Admin admin = templateProperties.getAdmin(); + if (admin != null) { + kafkaTemplate + .setKafkaAdmin(createKafkaAdmin(this.properties.buildAdminProperties(admin), admin, connectionDetails)); + } map.from(templateProperties.getDefaultTopic()).to(kafkaTemplate::setDefaultTopic); map.from(templateProperties.getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix); map.from(templateProperties.getCloseTimeout()).to(kafkaTemplate::setCloseTimeout); @@ -180,9 +186,14 @@ KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { @ConditionalOnMissingBean KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) { Map properties = this.properties.buildAdminProperties(); + KafkaProperties.Admin admin = this.properties.getAdmin(); + return createKafkaAdmin(properties, admin, connectionDetails); + } + + static KafkaAdmin createKafkaAdmin(Map properties, KafkaProperties.Admin admin, + KafkaConnectionDetails connectionDetails) { applyKafkaConnectionDetailsForAdmin(properties, connectionDetails); KafkaAdmin kafkaAdmin = new KafkaAdmin(properties); - KafkaProperties.Admin admin = this.properties.getAdmin(); if (admin.getCloseTimeout() != null) { kafkaAdmin.setCloseTimeout((int) admin.getCloseTimeout().getSeconds()); } @@ -225,7 +236,7 @@ private void applyKafkaConnectionDetailsForProducer(Map properti applySslBundle(properties, producer.getSslBundle()); } - private void applyKafkaConnectionDetailsForAdmin(Map properties, + static void applyKafkaConnectionDetailsForAdmin(Map properties, KafkaConnectionDetails connectionDetails) { Configuration admin = connectionDetails.getAdmin(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, admin.getBootstrapServers()); diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java index 172ffd707808..ff136a6e0d1c 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java @@ -214,8 +214,12 @@ public Map buildProducerProperties() { * instance */ public Map buildAdminProperties() { + return buildAdminProperties(this.admin); + } + + Map buildAdminProperties(Admin admin) { Map properties = buildCommonProperties(); - properties.putAll(this.admin.buildProperties()); + properties.putAll(admin.buildProperties()); return properties; } @@ -907,6 +911,11 @@ public Map buildProperties() { public static class Template { + /** + * Admin properties used to look up the Kafka cluster id for observation. + */ + private @Nullable Admin admin; + /** * Default topic to which messages are sent. */ @@ -933,6 +942,14 @@ public static class Template { */ private boolean observationEnabled; + public @Nullable Admin getAdmin() { + return this.admin; + } + + public void setAdmin(@Nullable Admin admin) { + this.admin = admin; + } + public @Nullable String getDefaultTopic() { return this.defaultTopic; } @@ -977,6 +994,11 @@ public void setObservationEnabled(boolean observationEnabled) { public static class Listener { + /** + * Admin properties used to look up the Kafka cluster id for observation. + */ + private @Nullable Admin admin; + public enum Type { /** @@ -1100,6 +1122,14 @@ public enum Type { */ private @Nullable Duration authExceptionRetryInterval; + public @Nullable Admin getAdmin() { + return this.admin; + } + + public void setAdmin(@Nullable Admin admin) { + this.admin = admin; + } + public Type getType() { return this.type; } diff --git a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java index 932ee15a73d5..77dbd8282ee8 100644 --- a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java +++ b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java @@ -17,6 +17,7 @@ package org.springframework.boot.kafka.autoconfigure; import java.time.Duration; +import java.util.Collections; import java.util.function.Function; import org.junit.jupiter.api.BeforeEach; @@ -24,7 +25,10 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.ContainerCustomizer; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; @@ -98,4 +102,27 @@ void shouldApplyObservationConvention() { assertThat(this.factory.getContainerProperties().getObservationConvention()).isSameAs(convention); } + @Test + void shouldApplyKafkaAdmin() { + KafkaAdmin kafkaAdmin = new KafkaAdmin(Collections.emptyMap()); + this.configurer.setKafkaAdmin(kafkaAdmin); + this.configurer.configure(this.factory, this.consumerFactory); + ConcurrentMessageListenerContainer container = this.factory.createContainer("test"); + assertThat(container.getKafkaAdmin()).isSameAs(kafkaAdmin); + } + + @Test + @SuppressWarnings("unchecked") + void shouldApplyKafkaAdminAndContainerCustomizer() { + KafkaAdmin kafkaAdmin = new KafkaAdmin(Collections.emptyMap()); + ContainerCustomizer> customizer = mock( + ContainerCustomizer.class); + this.configurer.setKafkaAdmin(kafkaAdmin); + this.configurer.setContainerCustomizer(customizer); + this.configurer.configure(this.factory, this.consumerFactory); + ConcurrentMessageListenerContainer container = this.factory.createContainer("test"); + assertThat(container.getKafkaAdmin()).isSameAs(kafkaAdmin); + then(customizer).should().configure(container); + } + } diff --git a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java index e65e217272d1..acc60d80d64d 100644 --- a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java +++ b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java @@ -773,6 +773,23 @@ void templateProperties() { }); } + @Test + void templateAdminProperties() { + this.contextRunner + .withPropertyValues("spring.kafka.template.admin.client-id=templateAdmin", + "spring.kafka.template.admin.operation-timeout=60s", + "spring.kafka.template.admin.properties.fiz.buz=fix.fox") + .run((context) -> { + KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class); + KafkaAdmin admin = kafkaTemplate.getKafkaAdmin(); + assertThat(admin).isNotNull(); + assertThat(admin.getConfigurationProperties()) + .containsEntry(AdminClientConfig.CLIENT_ID_CONFIG, "templateAdmin") + .containsEntry("fiz.buz", "fix.fox"); + assertThat(admin).hasFieldOrPropertyWithValue("operationTimeout", 60); + }); + } + @SuppressWarnings("unchecked") @Test void listenerProperties() { @@ -828,6 +845,25 @@ void listenerProperties() { }); } + @Test + void listenerAdminProperties() { + this.contextRunner + .withPropertyValues("spring.kafka.listener.admin.client-id=listenerAdmin", + "spring.kafka.listener.admin.operation-timeout=60s", + "spring.kafka.listener.admin.properties.fiz.buz=fix.fox") + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + ConcurrentMessageListenerContainer container = factory.createContainer("someTopic"); + KafkaAdmin admin = container.getKafkaAdmin(); + assertThat(admin).isNotNull(); + assertThat(admin.getConfigurationProperties()) + .containsEntry(AdminClientConfig.CLIENT_ID_CONFIG, "listenerAdmin") + .containsEntry("fiz.buz", "fix.fox"); + assertThat(admin).hasFieldOrPropertyWithValue("operationTimeout", 60); + }); + } + @Test void testKafkaTemplateRecordMessageConverters() { this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) @@ -1022,6 +1058,19 @@ void testConcurrentKafkaListenerContainerFactoryWithCustomContainerCustomizer() }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomContainerCustomizerAndListenerAdminProperties() { + this.contextRunner.withUserConfiguration(ObservationEnabledContainerCustomizerConfiguration.class) + .withPropertyValues("spring.kafka.listener.admin.client-id=listenerAdmin") + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + ConcurrentMessageListenerContainer container = factory.createContainer("someTopic"); + assertThat(container.getContainerProperties().isObservationEnabled()).isEqualTo(true); + assertThat(container.getKafkaAdmin()).isNotNull(); + }); + } + @Test void specificSecurityProtocolOverridesCommonSecurityProtocol() { this.contextRunner