Skip to content

Commit

Permalink
Mutate RSocketStrategies in RSocketMessageHandler
Browse files Browse the repository at this point in the history
Use rsocketStrategies field with mutate() to ensure consistency
with internal state.

Remove transparent initialization of decoders in MetadataExtractor
and expect them to be set to avoid unintended side effects.
  • Loading branch information
rstoyanchev committed Jul 26, 2019
1 parent fab0a5d commit 8574f97
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@
import org.springframework.context.EmbeddedValueResolverAware;
import org.springframework.core.KotlinDetector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -103,10 +99,6 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C

public MessageMappingMessageHandler() {
setHandlerPredicate(type -> AnnotatedElementUtils.hasAnnotation(type, Controller.class));
this.decoders.add(StringDecoder.allMimeTypes());
this.decoders.add(new ByteBufferDecoder());
this.decoders.add(new ByteArrayDecoder());
this.decoders.add(new DataBufferDecoder());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
/**
* Configure the decoders to use for de-serializing metadata entries.
* <p>By default this is not set.
* <p>When this extractor is passed into {@link RSocketStrategies.Builder} or
* {@link org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler
* RSocketMessageHandler}, the decoders may be left not set, and they will
* be initialized from the decoders already configured there.
*/
public void setDecoders(List<? extends Decoder<?>> decoders) {
this.decoders.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;

Expand Down Expand Up @@ -209,9 +208,9 @@ public RSocketStrategies build() {
return new DefaultRSocketStrategies(
this.encoders, this.decoders,
this.routeMatcher != null ? this.routeMatcher : initRouteMatcher(),
getOrInitMetadataExtractor(),
this.metadataExtractor != null ? this.metadataExtractor : initMetadataExtractor(),
this.bufferFactory != null ? this.bufferFactory : initBufferFactory(),
this.adapterRegistry != null ? this.adapterRegistry : initReactiveAdapterRegistry());
this.adapterRegistry != null ? this.adapterRegistry : ReactiveAdapterRegistry.getSharedInstance());
}

private RouteMatcher initRouteMatcher() {
Expand All @@ -220,31 +219,15 @@ private RouteMatcher initRouteMatcher() {
return new SimpleRouteMatcher(pathMatcher);
}

private MetadataExtractor getOrInitMetadataExtractor() {
if (this.metadataExtractor != null) {
if (this.metadataExtractor instanceof DefaultMetadataExtractor) {
DefaultMetadataExtractor extractor = (DefaultMetadataExtractor) this.metadataExtractor;
if (extractor.getDecoders().isEmpty()) {
extractor.setDecoders(this.decoders);
}
}
return this.metadataExtractor;
}
else {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.setDecoders(this.decoders);
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
return extractor;
}
private MetadataExtractor initMetadataExtractor() {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.setDecoders(this.decoders);
return extractor;
}

private DataBufferFactory initBufferFactory() {
return new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
}

private ReactiveAdapterRegistry initReactiveAdapterRegistry() {
return ReactiveAdapterRegistry.getSharedInstance();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ default Builder mutate() {

/**
* Create an {@code RSocketStrategies} instance with default settings.
* Equivalent to {@code RSocketStrategies.builder().build()}.
* Equivalent to {@code RSocketStrategies.builder().build()}. See individual
* builder methods for details on default settings.
*/
static RSocketStrategies create() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder().build();
Expand Down Expand Up @@ -190,9 +191,6 @@ interface Builder {
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* route from {@code "message/x.rsocket.routing.v0"} or
* {@code "text/plain"} metadata entries.
* <p>If the extractor is a {@code DefaultMetadataExtractor}, its
* {@code decoders} property will be set, if not already set, to the
* {@link #decoder(Decoder[]) decoders} configured here.
*/
Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@
import org.springframework.beans.BeanUtils;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.ByteArrayEncoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.lang.Nullable;
Expand All @@ -47,7 +43,6 @@
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
import org.springframework.messaging.rsocket.DefaultMetadataExtractor;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
Expand All @@ -74,50 +69,60 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {

private final List<Encoder<?>> encoders = new ArrayList<>();

@Nullable
private RSocketStrategies rsocketStrategies;

@Nullable
private MetadataExtractor metadataExtractor;

private RSocketStrategies strategies;

@Nullable
private MimeType defaultDataMimeType;

private MimeType defaultMetadataMimeType = MetadataExtractor.COMPOSITE_METADATA;


public RSocketMessageHandler() {
this.encoders.add(CharSequenceEncoder.allMimeTypes());
this.encoders.add(new ByteBufferEncoder());
this.encoders.add(new ByteArrayEncoder());
this.encoders.add(new DataBufferEncoder());
setRSocketStrategies(RSocketStrategies.create());
}


/**
* {@inheritDoc}
* <p>If {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is also set, this property is re-initialized with the decoders in it.
* Or vice versa, if {@link #setRSocketStrategies(RSocketStrategies)
* rsocketStrategies} is not set, it will be initialized from this and
* other properties.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the decoders in it, and
* vice versa, setting this property mutates the {@code RSocketStrategies}
* to change its decoders.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#decoder(Decoder[]) defaults} from
* {@code RSocketStrategies}.
*/
@Override
public void setDecoders(List<? extends Decoder<?>> decoders) {
super.setDecoders(decoders);
this.strategies = this.strategies.mutate()
.decoders(list -> {
list.clear();
list.addAll(decoders);
})
.build();
}

/**
* Configure the encoders to use for encoding handler method return values.
* <p>If {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is also set, this property is re-initialized with the encoders in it.
* Or vice versa, if {@link #setRSocketStrategies(RSocketStrategies)
* rsocketStrategies} is not set, it will be initialized from this and
* other properties.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the encoders in it, and
* vice versa, setting this property mutates the {@code RSocketStrategies}
* to change its encoders.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#encoder(Encoder[]) defaults} from
* {@code RSocketStrategies}.
*/
public void setEncoders(List<? extends Encoder<?>> encoders) {
this.encoders.clear();
this.encoders.addAll(encoders);
this.strategies = this.strategies.mutate()
.encoders(list -> {
list.clear();
list.addAll(encoders);
})
.build();
}

/**
Expand All @@ -128,59 +133,53 @@ public List<? extends Encoder<?>> getEncoders() {
}

/**
* Provide configuration in the form of {@link RSocketStrategies} instance
* which can also be re-used to initialize a client-side
* {@link RSocketRequester}.
* <p>When this is set, in turn it sets the following:
* <ul>
* <li>{@link #setDecoders(List)}
* <li>{@link #setEncoders(List)}
* <li>{@link #setRouteMatcher(RouteMatcher)}
* <li>{@link #setMetadataExtractor(MetadataExtractor)}
* <li>{@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry)}
* </ul>
* <p>By default if this is not set, it is initialized from the above.
* {@inheritDoc}
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the RouteMatcher in it, and
* vice versa, setting this property mutates the {@code RSocketStrategies}
* to change its route matcher.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#routeMatcher(RouteMatcher) defaults}
* from {@code RSocketStrategies}.
*/
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
setDecoders(rsocketStrategies.decoders());
setEncoders(rsocketStrategies.encoders());
setRouteMatcher(rsocketStrategies.routeMatcher());
setMetadataExtractor(rsocketStrategies.metadataExtractor());
setReactiveAdapterRegistry(rsocketStrategies.reactiveAdapterRegistry());
@Override
public void setRouteMatcher(RouteMatcher routeMatcher) {
super.setRouteMatcher(routeMatcher);
this.strategies = this.strategies.mutate().routeMatcher(routeMatcher).build();
}

/**
* Return an {@link RSocketStrategies} instance initialized from the
* corresponding properties listed under {@link #setRSocketStrategies}.
* Configure the registry for adapting various reactive types.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the
* {@code ReactiveAdapterRegistry} in it, and vice versa, setting this
* property mutates the {@code RSocketStrategies} to change its adapter
* registry.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) defaults}
* from {@code RSocketStrategies}.
*/
public RSocketStrategies getRSocketStrategies() {
return this.rsocketStrategies != null ? this.rsocketStrategies : initRSocketStrategies();
}

private RSocketStrategies initRSocketStrategies() {
return RSocketStrategies.builder()
.decoders(List::clear)
.encoders(List::clear)
.decoders(decoders -> decoders.addAll(getDecoders()))
.encoders(encoders -> encoders.addAll(getEncoders()))
.routeMatcher(getRouteMatcher())
.metadataExtractor(getMetadataExtractor())
.reactiveAdapterStrategy(getReactiveAdapterRegistry())
.build();
@Override
public void setReactiveAdapterRegistry(ReactiveAdapterRegistry registry) {
super.setReactiveAdapterRegistry(registry);
this.strategies = this.strategies.mutate().reactiveAdapterStrategy(registry).build();
}

/**
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata.
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* route from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}.
* <p>If the extractor is a {@code DefaultMetadataExtractor}, its
* {@code decoders} property will be set, if not already set, to the
* {@link #setDecoders(List)} configured here.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the {@code MetadataExtractor}
* in it, and vice versa, setting this property mutates the
* {@code RSocketStrategies} to change its {@code MetadataExtractor}.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#metadataExtractor(MetadataExtractor)} defaults}
* from {@code RSocketStrategies}.
* @param extractor the extractor to use
*/
public void setMetadataExtractor(MetadataExtractor extractor) {
this.metadataExtractor = extractor;
this.strategies = this.strategies.mutate().metadataExtractor(this.metadataExtractor).build();
}

/**
Expand All @@ -192,6 +191,40 @@ public MetadataExtractor getMetadataExtractor() {
return this.metadataExtractor;
}

/**
* Configure this handler through an {@link RSocketStrategies} instance which
* can be re-used to initialize a client-side {@link RSocketRequester}.
* <p>When this property is set, in turn it sets the following:
* <ul>
* <li>{@link #setDecoders(List)}
* <li>{@link #setEncoders(List)}
* <li>{@link #setRouteMatcher(RouteMatcher)}
* <li>{@link #setMetadataExtractor(MetadataExtractor)}
* <li>{@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry)}
* </ul>
* <p>By default this is set to {@link RSocketStrategies#create()} which in
* turn sets default settings for all related properties.
*/
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
this.strategies = rsocketStrategies;
updateStateFromRSocketStrategies();
}

private void updateStateFromRSocketStrategies() {
setDecoders(this.strategies.decoders());
setEncoders(this.strategies.encoders());
setRouteMatcher(this.strategies.routeMatcher());
setMetadataExtractor(this.strategies.metadataExtractor());
setReactiveAdapterRegistry(this.strategies.reactiveAdapterRegistry());
}

/**
* Return the {@link #setRSocketStrategies configured} {@code RSocketStrategies}.
*/
public RSocketStrategies getRSocketStrategies() {
return this.strategies;
}

/**
* Configure the default content type to use for data payloads if the
* {@code SETUP} frame did not specify one.
Expand Down Expand Up @@ -238,15 +271,6 @@ public void afterPropertiesSet() {
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());

super.afterPropertiesSet();

if (getMetadataExtractor() == null) {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.setDecoders(getDecoders());
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
setMetadataExtractor(extractor);
}

this.rsocketStrategies = initRSocketStrategies();
}

@Override
Expand Down Expand Up @@ -351,8 +375,7 @@ private MessagingRSocket createResponder(ConnectionSetupPayload setupPayload, RS
MimeType metaMimeType = StringUtils.hasText(s) ? MimeTypeUtils.parseMimeType(s) : this.defaultMetadataMimeType;
Assert.notNull(metaMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");

RSocketStrategies strategies = this.rsocketStrategies;
Assert.notNull(strategies, "No RSocketStrategies. Was afterPropertiesSet not called?");
RSocketStrategies strategies = getRSocketStrategies();
RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, metaMimeType, strategies);

Assert.state(this.metadataExtractor != null,
Expand Down
Loading

0 comments on commit 8574f97

Please sign in to comment.