diff --git a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc index bcff65e28e12..cc430356bc23 100644 --- a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc +++ b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc @@ -74,6 +74,7 @@ include-code::MyKafkaStreamsConfiguration[] By default, the streams managed by the javadoc:org.apache.kafka.streams.StreamsBuilder[] object are started automatically. You can customize this behavior using the configprop:spring.kafka.streams.auto-startup[] property. +TIP: For advanced configuration, the arbitrary javadoc:org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer[] beans can be registered to configure the javadoc:org.springframework.kafka.config.StreamsBuilderFactoryBean[] bean before the javadoc:org.apache.kafka.streams.StreamsBuilder[] bean is initialized. [[messaging.kafka.additional-properties]] diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java index 6dfdb836685a..9012132edc7b 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java @@ -22,19 +22,18 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.CleanupConfig; /** @@ -76,11 +75,8 @@ KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment, } @Bean - KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( - @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean, - ObjectProvider customizers) { - customizers.orderedStream().forEach((customizer) -> customizer.customize(factoryBean)); - return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean); + StreamsBuilderFactoryBeanConfigurer kafkaPropertiesStreamsBuilderFactoryBeanConfigurer() { + return new KafkaPropertiesStreamsBuilderFactoryBeanConfigurer(this.properties); } private void applyKafkaConnectionDetailsForStreams(Map properties, @@ -91,24 +87,26 @@ private void applyKafkaConnectionDetailsForStreams(Map propertie KafkaAutoConfiguration.applySslBundle(properties, streams.getSslBundle()); } - // Separate class required to avoid BeanCurrentlyInCreationException - static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { + private static final class KafkaPropertiesStreamsBuilderFactoryBeanConfigurer + implements StreamsBuilderFactoryBeanConfigurer { private final KafkaProperties properties; - private final StreamsBuilderFactoryBean factoryBean; - - KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties, StreamsBuilderFactoryBean factoryBean) { + private KafkaPropertiesStreamsBuilderFactoryBeanConfigurer(KafkaProperties properties) { this.properties = properties; - this.factoryBean = factoryBean; } @Override - public void afterPropertiesSet() { - this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); + public void configure(StreamsBuilderFactoryBean factoryBean) { + factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown()); - this.factoryBean.setCleanupConfig(cleanupConfig); + factoryBean.setCleanupConfig(cleanupConfig); + } + + @Override + public int getOrder() { + return Ordered.HIGHEST_PRECEDENCE; } } diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/StreamsBuilderFactoryBeanCustomizer.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/StreamsBuilderFactoryBeanCustomizer.java deleted file mode 100644 index f65872934cc3..000000000000 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/StreamsBuilderFactoryBeanCustomizer.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.kafka.autoconfigure; - -import org.springframework.kafka.config.StreamsBuilderFactoryBean; - -/** - * Callback interface for customizing {@code StreamsBuilderFactoryBean} beans. - * - * @author EddĂș MelĂ©ndez - * @since 4.0.0 - */ -@FunctionalInterface -public interface StreamsBuilderFactoryBeanCustomizer { - - /** - * Customize the {@link StreamsBuilderFactoryBean}. - * @param factoryBean the factory bean to customize - */ - void customize(StreamsBuilderFactoryBean factoryBean); - -} diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java index 07d0658c8154..99145815b8c4 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java @@ -26,10 +26,10 @@ import org.springframework.boot.kafka.autoconfigure.DefaultKafkaConsumerFactoryCustomizer; import org.springframework.boot.kafka.autoconfigure.DefaultKafkaProducerFactoryCustomizer; import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration; -import org.springframework.boot.kafka.autoconfigure.StreamsBuilderFactoryBeanCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.MicrometerConsumerListener; @@ -75,7 +75,7 @@ private void addListener(DefaultKafkaProducerFactory factory, Meter static class KafkaStreamsMetricsConfiguration { @Bean - StreamsBuilderFactoryBeanCustomizer kafkaStreamsMetrics(MeterRegistry meterRegistry) { + StreamsBuilderFactoryBeanConfigurer kafkaStreamsMetrics(MeterRegistry meterRegistry) { return (factoryBean) -> factoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry)); } diff --git a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java index 0cd90450e12c..4545d39162a3 100644 --- a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java +++ b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java @@ -69,6 +69,8 @@ import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.config.StreamsBuilderFactoryBean.Listener; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -458,6 +460,29 @@ void streamsProperties() { }); } + @Test + void streamsBuilderFactoryBeanConfigurerIsApplied() { + Listener listener = mock(Listener.class); + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + // user's StreamsBuilderFactoryBeanConfigurer must be invoked after the + // default one + .withBean(StreamsBuilderFactoryBeanConfigurer.class, () -> (factoryBean) -> { + assertThat(factoryBean.isAutoStartup()).isFalse(); + assertThat(factoryBean).extracting("cleanupConfig.onStart").isEqualTo(true); + assertThat(factoryBean).extracting("cleanupConfig.onStop").isEqualTo(true); + factoryBean.addListener(listener); + }) + .withPropertyValues("spring.kafka.client-id=cid", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.application.name=appName", + "spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-shutdown=true", + "spring.kafka.streams.cleanup.on-startup=true") + .run((context) -> { + assertThat(context).hasSingleBean(StreamsBuilderFactoryBean.class); + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean(StreamsBuilderFactoryBean.class); + assertThat(streamsBuilderFactoryBean.getListeners()).hasSize(1); + }); + } + @Test void connectionDetailsAreAppliedToStreams() { this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)