Skip to content

Commit

Permalink
Fixes ratpack#459
Browse files Browse the repository at this point in the history
  • Loading branch information
rhart committed Oct 3, 2014
1 parent d60f5e9 commit 3f654de
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public interface ResponseTransmitter {

void transmit(HttpResponseStatus responseStatus, BasicFileAttributes basicFileAttributes, Path file);

Subscriber<Object> transmitter(HttpResponseStatus status);
Subscriber<ByteBuf> transmitter(HttpResponseStatus status);

}
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ public void transmit(final HttpResponseStatus responseStatus, final BasicFileAtt
}

@Override
public Subscriber<Object> transmitter(final HttpResponseStatus responseStatus) {
return new Subscriber<Object>() {
public Subscriber<ByteBuf> transmitter(final HttpResponseStatus responseStatus) {
return new Subscriber<ByteBuf>() {
private Subscription subscription;
private final AtomicBoolean done = new AtomicBoolean();

Expand Down Expand Up @@ -203,9 +203,9 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(Object o) {
public void onNext(ByteBuf o) {
if (channel.isOpen()) {
channel.writeAndFlush(o).addListener(cancelOnFailure);
channel.writeAndFlush(new DefaultHttpContent(o)).addListener(cancelOnFailure);
if (channel.isWritable()) {
subscription.request(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package ratpack.sse

import ratpack.http.client.RequestSpec
import ratpack.test.internal.RatpackGroovyDslSpec

import java.util.concurrent.CountDownLatch
import java.util.zip.GZIPInputStream

import static io.netty.handler.codec.http.HttpResponseStatus.OK
import static ratpack.sse.ServerSentEvent.serverSentEvent
Expand All @@ -27,6 +29,11 @@ import static ratpack.stream.Streams.*

class ServerSentEventsSpec extends RatpackGroovyDslSpec {

@Override
void configureRequest(RequestSpec requestSpecification) {
requestSpecification.headers.add("Accept-Encoding", "gzip")
}

def "can send server sent event"() {
given:
handlers {
Expand All @@ -43,7 +50,7 @@ class ServerSentEventsSpec extends RatpackGroovyDslSpec {
response.headers["Content-Type"] == "text/event-stream;charset=UTF-8"
response.headers["Cache-Control"] == "no-cache, no-store, max-age=0, must-revalidate"
response.headers["Pragma"] == "no-cache"
response.body.text == "event: add\ndata: Event 1\nid: 1\n\nevent: add\ndata: Event 2\nid: 2\n\nevent: add\ndata: Event 3\nid: 3\n\n"
response.headers["Content-Encoding"] == null
}

def "can cancel a stream when a client drops connection"() {
Expand Down Expand Up @@ -78,4 +85,29 @@ class ServerSentEventsSpec extends RatpackGroovyDslSpec {
// when the connection is closed cancel should be called
cancelLatch.await()
}

def "can send compressed server sent event"() {
given:
launchConfig {
compressResponses(true)
}
handlers {
handler {
render serverSentEvents(map(publish(1..3)) { i ->
serverSentEvent { it.id(i.toString()).event("add").data("Event $i".toString()) }
})
}
}

expect:
def response = get()
response.statusCode == OK.code()
response.headers["Content-Type"] == "text/event-stream;charset=UTF-8"
response.headers["Cache-Control"] == "no-cache, no-store, max-age=0, must-revalidate"
response.headers["Pragma"] == "no-cache"
response.headers["Content-Encoding"] == "gzip"

new GZIPInputStream(response.body.inputStream).bytes ==
"event: add\ndata: Event 1\nid: 1\n\nevent: add\ndata: Event 2\nid: 2\n\nevent: add\ndata: Event 3\nid: 3\n\n".bytes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ratpack.stream.tck

import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.handler.codec.http.FullHttpRequest
Expand All @@ -25,15 +26,16 @@ import org.reactivestreams.Subscriber
import org.reactivestreams.tck.SubscriberBlackboxVerification
import org.reactivestreams.tck.TestEnvironment
import ratpack.event.internal.DefaultEventController
import ratpack.func.Function
import ratpack.handling.RequestOutcome
import ratpack.server.internal.DefaultResponseTransmitter
import ratpack.util.internal.IoUtils

import java.util.concurrent.atomic.AtomicBoolean

import static org.mockito.Matchers.any
import static org.mockito.Mockito.*
import static ratpack.stream.Streams.constant
import static ratpack.stream.Streams.publish
import static ratpack.stream.Streams.*

class DefaultResponseTransmitterBlackboxVerification extends SubscriberBlackboxVerification<Integer> {

Expand Down Expand Up @@ -66,13 +68,21 @@ class DefaultResponseTransmitterBlackboxVerification extends SubscriberBlackboxV
}

@Override
Publisher<Integer> createHelperPublisher(long elements) {
Publisher<ByteBuf> createHelperPublisher(long elements) {
if (elements == Long.MAX_VALUE) {
constant(1)
map(constant(1), integerToByteBuf)
} else if (elements > 0) {
publish(0..<elements)
map(publish(0..<elements), integerToByteBuf)
} else {
publish([])
}
}

def integerToByteBuf = new Function<Integer, ByteBuf>() {
@Override
ByteBuf apply(Integer i) throws Exception {
IoUtils.byteBuf(i.byteValue())
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void transmit(HttpResponseStatus responseStatus, BasicFileAttributes basi
}

@Override
public Subscriber<Object> transmitter(HttpResponseStatus status) {
public Subscriber<ByteBuf> transmitter(HttpResponseStatus status) {
throw new UnsupportedOperationException("streaming not supported while unit testing");
}
};
Expand Down

0 comments on commit 3f654de

Please sign in to comment.