From 982b789604653e4a6eadaf6844d691b67b848ce9 Mon Sep 17 00:00:00 2001 From: Su Ko Date: Wed, 24 Sep 2025 16:58:05 +0900 Subject: [PATCH 1/5] [GH-3208] Feat: Allow overriding SmartLifecycle phase in RedisMessageListenerContainer. Signed-off-by: Su Ko --- .../RedisMessageListenerContainer.java | 19 +++++++++++++++++++ ...edisMessageListenerContainerUnitTests.java | 11 +++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index 3487a61f1d..e6d8dd8433 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -103,6 +103,7 @@ * @author Mark Paluch * @author John Blum * @author Seongjun Lee + * @author Su Ko * @see MessageListener * @see SubscriptionListener */ @@ -168,6 +169,8 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab private @Nullable Subscriber subscriber; + private int phase = Integer.MAX_VALUE; + /** * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default, * there will be no ErrorHandler so that error-level logging is the only result. @@ -618,6 +621,22 @@ public void removeMessageListener(MessageListener listener) { removeMessageListener(listener, Collections.emptySet()); } + @Override + public int getPhase() { + return this.phase; + } + + /** + * Specify the lifecycle phase for this container. + * Lower values start earlier and stop later. + * The default is {@code Integer.MAX_VALUE}. + * + * @see SmartLifecycle#getPhase() + */ + public void setPhase(int phase) { + this.phase = phase; + } + private void initMapping(Map> listeners) { // stop the listener if currently running diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java index ea7dfeb557..345681c82e 100644 --- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java +++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java @@ -239,4 +239,15 @@ void shouldRemoveAllListenersWhenListenerIsNull() { assertThatNoException().isThrownBy(() -> container.removeMessageListener(null, Collections.singletonList(topic))); } + + @Test // GH-3208 + void defaultPhaseShouldBeMaxValue() { + assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE); + } + + @Test // GH-3208 + void shouldApplyConfiguredPhase() { + container.setPhase(3208); + assertThat(container.getPhase()).isEqualTo(3208); + } } From 5ae169bbaec59c3b3750a64629b1e55695f8fee2 Mon Sep 17 00:00:00 2001 From: Su Ko Date: Tue, 30 Sep 2025 11:28:58 +0900 Subject: [PATCH 2/5] [GH-3208] Feat: Allow configuring 'autoStartup' in RedisMessageListenerContainer Signed-off-by: Su Ko --- .../listener/RedisMessageListenerContainer.java | 17 +++++++++++++++++ .../RedisMessageListenerContainerUnitTests.java | 12 ++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index e6d8dd8433..c9b6508558 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -170,6 +170,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab private @Nullable Subscriber subscriber; private int phase = Integer.MAX_VALUE; + private boolean autoStartup = true; /** * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default, @@ -637,6 +638,22 @@ public void setPhase(int phase) { this.phase = phase; } + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + /** + * Configure if this Lifecycle connection factory should get started automatically by the container at the time that + * the containing ApplicationContext gets refreshed. + * The default is {@code true}. + * + * @see SmartLifecycle#isAutoStartup() + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + private void initMapping(Map> listeners) { // stop the listener if currently running diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java index 345681c82e..6c2dbbbe74 100644 --- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java +++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java @@ -250,4 +250,16 @@ void shouldApplyConfiguredPhase() { container.setPhase(3208); assertThat(container.getPhase()).isEqualTo(3208); } + + @Test // GH-3208 + void defaultAutoStartupShouldBeMaxValue() { + assertThat(container.isAutoStartup()).isEqualTo(true); + } + + @Test // GH-3208 + void shouldApplyConfiguredAutoStartup() { + container.setAutoStartup(false); + assertThat(container.isAutoStartup()).isEqualTo(false); + } + } From 75666ddb1e8aef43089b178b3703a31f0e21a323 Mon Sep 17 00:00:00 2001 From: Su Ko Date: Wed, 1 Oct 2025 08:28:14 +0900 Subject: [PATCH 3/5] [GH-3208] Style : add @since 4.0 Signed-off-by: Su Ko --- .../data/redis/listener/RedisMessageListenerContainer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index c9b6508558..5773cf4389 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -633,6 +633,7 @@ public int getPhase() { * The default is {@code Integer.MAX_VALUE}. * * @see SmartLifecycle#getPhase() + * @since 4.0 */ public void setPhase(int phase) { this.phase = phase; @@ -649,6 +650,7 @@ public boolean isAutoStartup() { * The default is {@code true}. * * @see SmartLifecycle#isAutoStartup() + * @since 4.0 */ public void setAutoStartup(boolean autoStartup) { this.autoStartup = autoStartup; From f7382ff32385fa839a875644de84ae6ca4386e00 Mon Sep 17 00:00:00 2001 From: Su Ko Date: Wed, 1 Oct 2025 08:40:13 +0900 Subject: [PATCH 4/5] [GH-3208] Feat: Allow configuring 'autoStartup' and 'phase' in `DefaultStreamMessageListenerContainer` Signed-off-by: Su Ko --- ...DefaultStreamMessageListenerContainer.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java index 12635762b9..9260645736 100644 --- a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java @@ -50,6 +50,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Su Ko * @since 2.2 */ class DefaultStreamMessageListenerContainer> implements StreamMessageListenerContainer { @@ -67,6 +68,9 @@ class DefaultStreamMessageListenerContainer> implement private boolean running = false; + private int phase = Integer.MAX_VALUE; + private boolean autoStartup = false; + /** * Create a new {@link DefaultStreamMessageListenerContainer}. * @@ -123,9 +127,21 @@ private RedisTemplate createRedisTemplate(RedisConnectionFactory connectio @Override public boolean isAutoStartup() { - return false; + return this.autoStartup; } + /** + * Configure if this Lifecycle connection factory should get started automatically by the container at the time that + * the containing ApplicationContext gets refreshed. + * The default is {@code false}. + * + * @see org.springframework.context.SmartLifecycle#isAutoStartup() + * @since 4.0 + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + @Override public void stop(Runnable callback) { @@ -177,9 +193,21 @@ public boolean isRunning() { @Override public int getPhase() { - return Integer.MAX_VALUE; + return this.phase; } + /** + * Specify the lifecycle phase for this container. + * Lower values start earlier and stop later. + * The default is {@code Integer.MAX_VALUE}. + * + * @see org.springframework.context.SmartLifecycle#getPhase() + * @since 4.0 + */ + public void setPhase(int phase) { + this.phase = phase; + } + @Override public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { From 5dde5b29d4601aae55a17c1f09ead05118c0279f Mon Sep 17 00:00:00 2001 From: Su Ko Date: Wed, 1 Oct 2025 09:40:48 +0900 Subject: [PATCH 5/5] [GH-3208] Feat: Allow configuring 'autoStartup' and 'phase' in `StreamMessageListenerContainerOptions` Signed-off-by: Su Ko --- ...DefaultStreamMessageListenerContainer.java | 8 +++ .../StreamMessageListenerContainer.java | 49 ++++++++++++++++++- ...edisMessageListenerContainerUnitTests.java | 2 +- ...sageListenerContainerIntegrationTests.java | 38 ++++++++++++++ 4 files changed, 94 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java index 9260645736..9be3ae51f7 100644 --- a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java @@ -94,6 +94,14 @@ class DefaultStreamMessageListenerContainer> implement } else { this.streamOperations = this.template.opsForStream(); } + + if(containerOptions.isAutoStartup().isPresent()){ + this.autoStartup = containerOptions.isAutoStartup().get(); + } + + if(containerOptions.getPhase().isPresent()){ + this.phase = containerOptions.getPhase().getAsInt(); + } } private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainerOptions options) { diff --git a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java index 009d6c3f93..f30e81d07a 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.stream; import java.time.Duration; +import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.Executor; import java.util.function.Predicate; @@ -107,6 +108,7 @@ * @author Christoph Strobl * @author Christian Rest * @author DongCheol Kim + * @author Su Ko * @param Stream key and Stream field type. * @param Stream value type. * @since 2.2 @@ -503,12 +505,14 @@ class StreamMessageListenerContainerOptions> { private final @Nullable HashMapper hashMapper; private final ErrorHandler errorHandler; private final Executor executor; + private final @Nullable Integer phase; + private final @Nullable Boolean autoStartup; @SuppressWarnings({ "unchecked", "rawtypes" }) private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable Integer batchSize, RedisSerializer keySerializer, RedisSerializer hashKeySerializer, RedisSerializer hashValueSerializer, @Nullable Class targetType, - @Nullable HashMapper hashMapper, ErrorHandler errorHandler, Executor executor) { + @Nullable HashMapper hashMapper, ErrorHandler errorHandler, Executor executor,@Nullable Integer phase, @Nullable Boolean autoStartup) { this.pollTimeout = pollTimeout; this.batchSize = batchSize; this.keySerializer = keySerializer; @@ -518,6 +522,8 @@ private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable In this.hashMapper = (HashMapper) hashMapper; this.errorHandler = errorHandler; this.executor = executor; + this.phase = phase; + this.autoStartup = autoStartup; } /** @@ -598,6 +604,21 @@ public Executor getExecutor() { return executor; } + /** + * @return the phase. + * @since 4.0 + */ + public OptionalInt getPhase() { + return phase != null ? OptionalInt.of(phase) : OptionalInt.empty(); + } + + /** + * @return the autoStartup. + * @since 4.0 + */ + public Optional isAutoStartup() { + return autoStartup != null ? Optional.of(autoStartup) : Optional.empty(); + } } /** @@ -618,6 +639,8 @@ class StreamMessageListenerContainerOptionsBuilder> { private @Nullable Class targetType; private ErrorHandler errorHandler = LoggingErrorHandler.INSTANCE; private Executor executor = new SimpleAsyncTaskExecutor(); + private @Nullable Integer phase; + private @Nullable Boolean autoStartup; @SuppressWarnings("NullAway") private StreamMessageListenerContainerOptionsBuilder() {} @@ -679,6 +702,28 @@ public StreamMessageListenerContainerOptionsBuilder errorHandler(ErrorHand return this; } + /** + * Configure a phase for the {@link SmartLifecycle} + * + * @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}. + * @since 4.0 + */ + public StreamMessageListenerContainerOptionsBuilder phase(int phase) { + this.phase = phase; + return this; + } + + /** + * Configure a autoStartup for the {@link SmartLifecycle} + * + * @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}. + * @since 4.0 + */ + public StreamMessageListenerContainerOptionsBuilder autoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + return this; + } + /** * Configure a key, hash key and hash value serializer. * @@ -796,7 +841,7 @@ public StreamMessageListenerContainerOptions build() { Assert.notNull(hashValueSerializer, "Hash Value Serializer must not be null"); return new StreamMessageListenerContainerOptions<>(pollTimeout, batchSize, keySerializer, hashKeySerializer, - hashValueSerializer, targetType, hashMapper, errorHandler, executor); + hashValueSerializer, targetType, hashMapper, errorHandler, executor,phase,autoStartup); } } diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java index 6c2dbbbe74..2e1c3056e9 100644 --- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java +++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java @@ -252,7 +252,7 @@ void shouldApplyConfiguredPhase() { } @Test // GH-3208 - void defaultAutoStartupShouldBeMaxValue() { + void defaultAutoStartupShouldBeTrue() { assertThat(container.isAutoStartup()).isEqualTo(true); } diff --git a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java index 26318b1448..302b0a1ca8 100644 --- a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java @@ -384,6 +384,44 @@ void containerRestartShouldRestartSubscription() throws InterruptedException { cancelAwait(subscription); } + @Test // GH-3208 + void defaultPhaseShouldBeMaxValue() { + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + + assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE); + } + + @Test // GH-3208 + void shouldApplyConfiguredPhase() { + StreamMessageListenerContainerOptions> options = StreamMessageListenerContainerOptions.builder() + .phase(3208) + .build(); + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, options); + + assertThat(container.getPhase()).isEqualTo(3208); + } + + @Test // GH-3208 + void defaultAutoStartupShouldBeFalse() { + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + + assertThat(container.isAutoStartup()).isEqualTo(false); + } + + @Test // GH-3208 + void shouldApplyConfiguredAutoStartup() { + StreamMessageListenerContainerOptions> options = StreamMessageListenerContainerOptions.builder() + .autoStartup(true) + .build(); + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, options); + + assertThat(container.isAutoStartup()).isEqualTo(true); + } + private static void cancelAwait(Subscription subscription) { subscription.cancel();