Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
* @author Mark Paluch
* @author John Blum
* @author Seongjun Lee
* @author Su Ko
* @see MessageListener
* @see SubscriptionListener
*/
Expand Down Expand Up @@ -168,6 +169,9 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab

private @Nullable Subscriber subscriber;

private int phase = Integer.MAX_VALUE;
private boolean autoStartup = true;

/**
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default,
* there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
Expand Down Expand Up @@ -618,6 +622,40 @@ public void removeMessageListener(MessageListener listener) {
removeMessageListener(listener, Collections.emptySet());
}

@Override
public int getPhase() {
return this.phase;
}

/**
* Specify the lifecycle phase for this container.
* Lower values start earlier and stop later.
* The default is {@code Integer.MAX_VALUE}.
*
* @see SmartLifecycle#getPhase()
* @since 4.0
*/
public void setPhase(int phase) {
this.phase = phase;
}

@Override
public boolean isAutoStartup() {
return this.autoStartup;
}

/**
* Configure if this Lifecycle connection factory should get started automatically by the container at the time that
* the containing ApplicationContext gets refreshed.
* The default is {@code true}.
*
* @see SmartLifecycle#isAutoStartup()
* @since 4.0
*/
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {

// stop the listener if currently running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Su Ko
* @since 2.2
*/
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
Expand All @@ -67,6 +68,9 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement

private boolean running = false;

private int phase = Integer.MAX_VALUE;
private boolean autoStartup = false;

/**
* Create a new {@link DefaultStreamMessageListenerContainer}.
*
Expand All @@ -90,6 +94,14 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
} else {
this.streamOperations = this.template.opsForStream();
}

if(containerOptions.isAutoStartup().isPresent()){
this.autoStartup = containerOptions.isAutoStartup().get();
}

if(containerOptions.getPhase().isPresent()){
this.phase = containerOptions.getPhase().getAsInt();
}
}

private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainerOptions<?, ?> options) {
Expand Down Expand Up @@ -123,9 +135,21 @@ private RedisTemplate<K, V> createRedisTemplate(RedisConnectionFactory connectio

@Override
public boolean isAutoStartup() {
return false;
return this.autoStartup;
}

/**
* Configure if this Lifecycle connection factory should get started automatically by the container at the time that
* the containing ApplicationContext gets refreshed.
* The default is {@code false}.
*
* @see org.springframework.context.SmartLifecycle#isAutoStartup()
* @since 4.0
*/
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

@Override
public void stop(Runnable callback) {

Expand Down Expand Up @@ -177,9 +201,21 @@ public boolean isRunning() {

@Override
public int getPhase() {
return Integer.MAX_VALUE;
return this.phase;
}

/**
* Specify the lifecycle phase for this container.
* Lower values start earlier and stop later.
* The default is {@code Integer.MAX_VALUE}.
*
* @see org.springframework.context.SmartLifecycle#getPhase()
* @since 4.0
*/
public void setPhase(int phase) {
this.phase = phase;
}

@Override
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.data.redis.stream;

import java.time.Duration;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
Expand Down Expand Up @@ -107,6 +108,7 @@
* @author Christoph Strobl
* @author Christian Rest
* @author DongCheol Kim
* @author Su Ko
* @param <K> Stream key and Stream field type.
* @param <V> Stream value type.
* @since 2.2
Expand Down Expand Up @@ -503,12 +505,14 @@ class StreamMessageListenerContainerOptions<K, V extends Record<K, ?>> {
private final @Nullable HashMapper<Object, Object, Object> hashMapper;
private final ErrorHandler errorHandler;
private final Executor executor;
private final @Nullable Integer phase;
private final @Nullable Boolean autoStartup;

@SuppressWarnings({ "unchecked", "rawtypes" })
private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable Integer batchSize,
RedisSerializer<K> keySerializer, RedisSerializer<Object> hashKeySerializer,
RedisSerializer<Object> hashValueSerializer, @Nullable Class<?> targetType,
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor) {
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor,@Nullable Integer phase, @Nullable Boolean autoStartup) {
this.pollTimeout = pollTimeout;
this.batchSize = batchSize;
this.keySerializer = keySerializer;
Expand All @@ -518,6 +522,8 @@ private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable In
this.hashMapper = (HashMapper) hashMapper;
this.errorHandler = errorHandler;
this.executor = executor;
this.phase = phase;
this.autoStartup = autoStartup;
}

/**
Expand Down Expand Up @@ -598,6 +604,21 @@ public Executor getExecutor() {
return executor;
}

/**
* @return the phase.
* @since 4.0
*/
public OptionalInt getPhase() {
return phase != null ? OptionalInt.of(phase) : OptionalInt.empty();
}

/**
* @return the autoStartup.
* @since 4.0
*/
public Optional<Boolean> isAutoStartup() {
return autoStartup != null ? Optional.of(autoStartup) : Optional.empty();
}
}

/**
Expand All @@ -618,6 +639,8 @@ class StreamMessageListenerContainerOptionsBuilder<K, V extends Record<K, ?>> {
private @Nullable Class<?> targetType;
private ErrorHandler errorHandler = LoggingErrorHandler.INSTANCE;
private Executor executor = new SimpleAsyncTaskExecutor();
private @Nullable Integer phase;
private @Nullable Boolean autoStartup;

@SuppressWarnings("NullAway")
private StreamMessageListenerContainerOptionsBuilder() {}
Expand Down Expand Up @@ -679,6 +702,28 @@ public StreamMessageListenerContainerOptionsBuilder<K, V> errorHandler(ErrorHand
return this;
}

/**
* Configure a phase for the {@link SmartLifecycle}
*
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
* @since 4.0
*/
public StreamMessageListenerContainerOptionsBuilder<K, V> phase(int phase) {
this.phase = phase;
return this;
}

/**
* Configure a autoStartup for the {@link SmartLifecycle}
*
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
* @since 4.0
*/
public StreamMessageListenerContainerOptionsBuilder<K, V> autoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
return this;
}

/**
* Configure a key, hash key and hash value serializer.
*
Expand Down Expand Up @@ -796,7 +841,7 @@ public StreamMessageListenerContainerOptions<K, V> build() {
Assert.notNull(hashValueSerializer, "Hash Value Serializer must not be null");

return new StreamMessageListenerContainerOptions<>(pollTimeout, batchSize, keySerializer, hashKeySerializer,
hashValueSerializer, targetType, hashMapper, errorHandler, executor);
hashValueSerializer, targetType, hashMapper, errorHandler, executor,phase,autoStartup);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,27 @@ void shouldRemoveAllListenersWhenListenerIsNull() {

assertThatNoException().isThrownBy(() -> container.removeMessageListener(null, Collections.singletonList(topic)));
}

@Test // GH-3208
void defaultPhaseShouldBeMaxValue() {
assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE);
}

@Test // GH-3208
void shouldApplyConfiguredPhase() {
container.setPhase(3208);
assertThat(container.getPhase()).isEqualTo(3208);
}

@Test // GH-3208
void defaultAutoStartupShouldBeTrue() {
assertThat(container.isAutoStartup()).isEqualTo(true);
}

@Test // GH-3208
void shouldApplyConfiguredAutoStartup() {
container.setAutoStartup(false);
assertThat(container.isAutoStartup()).isEqualTo(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,44 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
cancelAwait(subscription);
}

@Test // GH-3208
void defaultPhaseShouldBeMaxValue() {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, containerOptions);

assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE);
}

@Test // GH-3208
void shouldApplyConfiguredPhase() {
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
.phase(3208)
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, options);

assertThat(container.getPhase()).isEqualTo(3208);
}

@Test // GH-3208
void defaultAutoStartupShouldBeFalse() {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, containerOptions);

assertThat(container.isAutoStartup()).isEqualTo(false);
}

@Test // GH-3208
void shouldApplyConfiguredAutoStartup() {
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
.autoStartup(true)
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, options);

assertThat(container.isAutoStartup()).isEqualTo(true);
}

private static void cancelAwait(Subscription subscription) {

subscription.cancel();
Expand Down