Skip to content

Commit

Permalink
Support for setupPayload in RSocketRequester
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Jul 29, 2019
1 parent 55946bf commit 2c878e9
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,29 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeType;

/**
Expand All @@ -45,11 +53,26 @@
*/
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {

private static final Map<String, Object> HINTS = Collections.emptyMap();


@Nullable
private MimeType dataMimeType;

private MimeType metadataMimeType = MetadataExtractor.COMPOSITE_METADATA;

@Nullable
private Object setupData;

@Nullable
private String setupRoute;

@Nullable
private Object[] setupRouteVars;

@Nullable
private Map<Object, MimeType> setupMetadata;

@Nullable
private RSocketStrategies strategies;

Expand All @@ -71,6 +94,26 @@ public RSocketRequester.Builder metadataMimeType(MimeType mimeType) {
return this;
}

@Override
public RSocketRequester.Builder setupData(Object data) {
this.setupData = data;
return this;
}

@Override
public RSocketRequester.Builder setupRoute(String route, Object... routeVars) {
this.setupRoute = route;
this.setupRouteVars = routeVars;
return this;
}

@Override
public RSocketRequester.Builder setupMetadata(Object metadata, @Nullable MimeType mimeType) {
this.setupMetadata = (this.setupMetadata == null ? new LinkedHashMap<>(4) : this.setupMetadata);
this.setupMetadata.put(metadata, mimeType);
return this;
}

@Override
public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies) {
this.strategies = strategies;
Expand Down Expand Up @@ -120,12 +163,52 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
factory.dataMimeType(dataMimeType.toString());
factory.metadataMimeType(this.metadataMimeType.toString());

Payload setupPayload = getSetupPayload(dataMimeType, rsocketStrategies);
if (setupPayload != null) {
factory.setupPayload(setupPayload);
}

return factory.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies));
}

@Nullable
private Payload getSetupPayload(MimeType dataMimeType, RSocketStrategies strategies) {
DataBuffer metadata = null;
if (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata)) {
metadata = new MetadataEncoder(this.metadataMimeType, strategies)
.metadataAndOrRoute(this.setupMetadata, this.setupRoute, this.setupRouteVars)
.encode();
}
DataBuffer data = null;
if (this.setupData != null) {
try {
ResolvableType type = ResolvableType.forClass(this.setupData.getClass());
Encoder<Object> encoder = strategies.encoder(type, dataMimeType);
Assert.notNull(encoder, () -> "No encoder for " + dataMimeType + ", " + type);
data = encoder.encodeValue(this.setupData, strategies.dataBufferFactory(), type, dataMimeType, HINTS);
}
catch (Throwable ex) {
if (metadata != null) {
DataBufferUtils.release(metadata);
}
throw ex;
}
}
if (metadata == null && data == null) {
return null;
}
metadata = metadata != null ? metadata : emptyBuffer(strategies);
data = data != null ? data : emptyBuffer(strategies);
return PayloadUtils.createPayload(metadata, data);
}

private DataBuffer emptyBuffer(RSocketStrategies strategies) {
return strategies.dataBufferFactory().wrap(new byte[0]);
}

private RSocketStrategies getRSocketStrategies() {
if (!this.strategiesConfigurers.isEmpty()) {
RSocketStrategies.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Consumer;

import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
Expand Down Expand Up @@ -139,6 +140,28 @@ interface Builder {
*/
RSocketRequester.Builder metadataMimeType(MimeType mimeType);

/**
* Set the data for the setup payload. The data will be encoded
* according to the configured {@link #dataMimeType(MimeType)}.
* <p>By default this is not set.
*/
RSocketRequester.Builder setupData(Object data);

/**
* Set the route for the setup payload. The rules for formatting and
* encoding the route are the same as those for a request route as
* described in {@link #route(String, Object...)}.
* <p>By default this is not set.
*/
RSocketRequester.Builder setupRoute(String route, Object... routeVars);

/**
* Add metadata entry to the setup payload. Composite metadata must be
* in use if this is called more than once or in addition to
* {@link #setupRoute(String, Object...)}.
*/
RSocketRequester.Builder setupMetadata(Object value, @Nullable MimeType mimeType);

/**
* Provide {@link RSocketStrategies} to use.
* <p>By default this is based on default settings of
Expand All @@ -157,12 +180,20 @@ interface Builder {

/**
* Callback to configure the {@code ClientRSocketFactory} directly.
* <p>Note that the data and metadata mime types cannot be set directly
* on the {@code ClientRSocketFactory}. Use shortcuts on this builder
* {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)}
* instead.
* <p>To configure client side responding, see
* <ul>
* <li>The data and metadata mime types cannot be set directly
* on the {@code ClientRSocketFactory} and will be overridden. Use the
* shortcuts {@link #dataMimeType(MimeType)} and
* {@link #metadataMimeType(MimeType)} on this builder instead.
* <li>The frame decoder also cannot be set directly and instead is set
* to match the configured {@code DataBufferFactory}.
* <li>For the
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#setupPayload(Payload)
* setupPayload}, consider using methods on this builder to specify the
* route, other metadata, and data as Object values to be encoded.
* <li>To configure client side responding, see
* {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
* </ul>
*/
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ interface Builder {
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata. This option is applicable to client or server
* responders.
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* <p>By default this is {@link DefaultMetadataExtractor} created with
* the {@link #decoder(Decoder[]) configured} decoders and extracting a
* route from {@code "message/x.rsocket.routing.v0"} metadata.
*/
Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,24 @@ public void mimeTypesCannotBeChangedAtRSocketFactoryLevel() {
assertThat(requester.metadataMimeType()).isEqualTo(metaMimeType);
}

@Test
public void setupRoute() {
RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
.setupRoute("toA")
.setupData("My data")
.connect(this.transport)
.block();

ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.block();

assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA");
assertThat(setupPayload.getDataUtf8()).isEqualTo("My data");
}

@Test
public void frameDecoderMatchesDataBufferFactory() throws Exception {
testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -37,12 +36,10 @@
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.stereotype.Controller;
import org.springframework.util.MimeTypeUtils;

/**
* Client-side handling of requests initiated from the server side.
Expand Down Expand Up @@ -112,10 +109,9 @@ private static void connectAndRunTest(String connectionRoute) {
RSocketRequester requester = null;
try {
requester = RSocketRequester.builder()
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
.setupRoute(connectionRoute)
.rsocketStrategies(strategies)
.rsocketFactory(clientResponderConfigurer)
.rsocketFactory(factory -> factory.setupPayload(ByteBufPayload.create("", connectionRoute)))
.connectTcp("localhost", server.address().getPort())
.block();

Expand Down Expand Up @@ -266,9 +262,7 @@ public RSocketMessageHandler serverMessageHandler() {

@Bean
public RSocketStrategies rsocketStrategies() {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes());
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
return RSocketStrategies.builder().metadataExtractor(extractor).build();
return RSocketStrategies.create();
}
}

Expand Down

0 comments on commit 2c878e9

Please sign in to comment.