Skip to content

Commit

Permalink
Define vertx dependency as vertx.version
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Dec 14, 2014
1 parent 3913d5c commit 8f717c1
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 15 deletions.
19 changes: 10 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<name>Vert.x Groovy Language Support</name>

<properties>
<vertx.version>3.0.0-SNAPSHOT</vertx.version>
<asciidoc.dir>${project.basedir}/src/main/asciidoc</asciidoc.dir>
</properties>

Expand All @@ -25,7 +26,7 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
Expand All @@ -38,31 +39,31 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<scope>provided</scope>
<version>1.0-SNAPSHOT</version>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-docgen</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>${vertx.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codetrans</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>${vertx.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<version>1.0-SNAPSHOT</version>
<version>${vertx.version}</version>
<scope>provided</scope>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>${vertx.version}</version>
<scope>provided</scope>
<classifier>sources</classifier>
</dependency>
Expand Down Expand Up @@ -90,7 +91,7 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<type>test-jar</type>
<version>3.0.0-SNAPSHOT</version>
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -151,7 +152,7 @@
<artifactItem>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>${vertx.version}</version>
<classifier>sources</classifier>
</artifactItem>
</artifactItems>
Expand All @@ -170,7 +171,7 @@
<artifactItem>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<version>1.0-SNAPSHOT</version>
<version>${vertx.version}</version>
<classifier>sources</classifier>
</artifactItem>
</artifactItems>
Expand Down
9 changes: 8 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
= Vert.x reference manual
:toc: right

= Vert.x API manual

== The Event Bus
Expand All @@ -6,4 +9,8 @@ include::io.vertx.core.eventbus.adoc[]

== Buffers

include::io.vertx.core.buffer.adoc[]
include::io.vertx.core.buffer.adoc[]

== Flow Control - Streams and Pumps

include::io.vertx.core.streams.adoc[]
178 changes: 178 additions & 0 deletions src/main/asciidoc/io.vertx.core.streams.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
There are several objects in Vert.x that allow items to be read from and written.

In previous versions the
io.vertx.core.streams.adoc package was manipulating link:groovydoc/io/vertx/groovy/core/buffer/Buffer.html[`Buffer`] objects exclusively.
From now, streams are not anymore coupled to buffers and work with any kind of objects.

In Vert.x, calls to write item return immediately and writes are internally queued.

It's not hard to see that if you write to an object faster than it can actually write the data to
its underlying resource then the write queue could grow without bound - eventually resulting in
exhausting available memory.

To solve this problem a simple flow control capability is provided by some objects in the Vert.x API.

Any flow control aware object that can be written-to implements link:groovydoc/io/vertx/groovy/core/streams/ReadStream.html[`ReadStream`],
and any flow control object that can be read-from is said to implement link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html[`WriteStream`].

Let's take an example where we want to read from a `ReadStream` and write the data to a `WriteStream`.

A very simple example would be reading from a link:groovydoc/io/vertx/groovy/core/net/NetSocket.html[`NetSocket`] on a server and writing back to the
same `NetSocket` - since `NetSocket` implements both `ReadStream` and `WriteStream`, but you can
do this between any `ReadStream` and any `WriteStream`, including HTTP requests and response,
async files, WebSockets, etc.

A naive way to do this would be to directly take the data that's been read and immediately write it
to the `NetSocket`, for example:

[source,java]
----
def server = vertx.createNetServer([port:1234, host:"localhost"]);
server.connectHandler({ sock ->
sock.handler({ buffer ->
sock.write(buffer);
});
}).listen();
----

There's a problem with the above example: If data is read from the socket faster than it can be
written back to the socket, it will build up in the write queue of the `NetSocket`, eventually
running out of RAM. This might happen, for example if the client at the other end of the socket
wasn't reading very fast, effectively putting back-pressure on the connection.

Since `NetSocket` implements `WriteStream`, we can check if the `WriteStream` is full before
writing to it:

[source,java]
----
def server = vertx.createNetServer([port:1234, host:"localhost"]);
server.connectHandler({ sock ->
sock.handler({ buffer ->
if (!sock.writeQueueFull()) {
sock.write(buffer);
};
});
}).listen();
----

This example won't run out of RAM but we'll end up losing data if the write queue gets full. What we
really want to do is pause the `NetSocket` when the write queue is full. Let's do that:

[source,java]
----
def server = vertx.createNetServer([port:1234, host:"localhost"]);
server.connectHandler({ sock ->
sock.handler({ buffer ->
sock.write(buffer);
if (sock.writeQueueFull()) {
sock.pause();
};
});
}).listen();
----

We're almost there, but not quite. The `NetSocket` now gets paused when the file is full, but we also need to unpause
it when the write queue has processed its backlog:

[source,java]
----
def server = vertx.createNetServer([port:1234, host:"localhost"]);
server.connectHandler({ sock ->
sock.handler({ buffer ->
sock.write(buffer);
if (sock.writeQueueFull()) {
sock.pause();
sock.drainHandler({ done ->
sock.resume();
});
};
});
}).listen();
----

And there we have it. The link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#drainHandler(io.vertx.core.Handler)[`drainHandler`] event handler will
get called when the write queue is ready to accept more data, this resumes the `NetSocket` which
allows it to read more data.

It's very common to want to do this when writing Vert.x applications, so we provide a helper class
called link:groovydoc/io/vertx/groovy/core/streams/Pump.html[`Pump`] which does all this hard work for you. You just feed it the `ReadStream` and
the `WriteStream` and it tell it to start:

[source,java]
----
import io.vertx.groovy.core.streams.Pump
def server = vertx.createNetServer([port:1234, host:"localhost"]);
server.connectHandler({ sock ->
Pump.pump(sock, sock).start();
}).listen();
----

Which does exactly the same thing as the more verbose example.

Let's look at the methods on `ReadStream` and `WriteStream` in more detail:

=== ReadStream

`ReadStream` is implemented by link:groovydoc/io/vertx/groovy/core/http/HttpClientResponse.html[`HttpClientResponse`], link:groovydoc/io/vertx/groovy/core/datagram/DatagramSocket.html[`DatagramSocket`],
link:groovydoc/io/vertx/groovy/core/http/HttpClientRequest.html[`HttpClientRequest`], link:groovydoc/io/vertx/groovy/core/http/HttpServerFileUpload.html[`HttpServerFileUpload`],
link:groovydoc/io/vertx/groovy/core/http/HttpServerRequest.html[`HttpServerRequest`], link:groovydoc/io/vertx/groovy/core/http/HttpServerRequestStream.html[`HttpServerRequestStream`],
link:groovydoc/io/vertx/groovy/core/eventbus/MessageConsumer.html[`MessageConsumer`], link:groovydoc/io/vertx/groovy/core/net/NetSocket.html[`NetSocket`], link:groovydoc/io/vertx/groovy/core/net/NetSocketStream.html[`NetSocketStream`],
link:groovydoc/io/vertx/groovy/core/http/WebSocket.html[`WebSocket`], link:groovydoc/io/vertx/groovy/core/http/WebSocketStream.html[`WebSocketStream`], link:groovydoc/io/vertx/groovy/core/TimeoutStream.html[`TimeoutStream`],
link:groovydoc/io/vertx/groovy/core/file/AsyncFile.html[`AsyncFile`].

Functions:

- link:groovydoc/io/vertx/groovy/core/streams/ReadStream.html#handler(io.vertx.core.Handler)[`handler`]:
set a handler which will receive items from the ReadStream.
- link:groovydoc/io/vertx/groovy/core/streams/ReadStream.html#pause()[`pause`]:
pause the handler. When paused no items will be received in the handler.
- link:groovydoc/io/vertx/groovy/core/streams/ReadStream.html#resume()[`resume`]:
resume the handler. The handler will be called if any item arrives.
- link:groovydoc/io/vertx/groovy/core/streams/ReadStream.html#exceptionHandler(io.vertx.core.Handler)[`exceptionHandler`]:
Will be called if an exception occurs on the ReadStream.
- link:groovydoc/io/vertx/groovy/core/streams/ReadStream.html#endHandler(io.vertx.core.Handler)[`endHandler`]:
Will be called when end of stream is reached. This might be when EOF is reached if the ReadStream represents a file,
or when end of request is reached if it's an HTTP request, or when the connection is closed if it's a TCP socket.

=== WriteStream

`WriteStream` is implemented by link:groovydoc/io/vertx/groovy/core/http/HttpClientRequest.html[`HttpClientRequest`], link:groovydoc/io/vertx/groovy/core/http/HttpServerResponse.html[`HttpServerResponse`]
link:groovydoc/io/vertx/groovy/core/http/WebSocket.html[`WebSocket`], link:groovydoc/io/vertx/groovy/core/net/NetSocket.html[`NetSocket`], link:groovydoc/io/vertx/groovy/core/file/AsyncFile.html[`AsyncFile`],
link:groovydoc/io/vertx/groovy/core/datagram/PacketWritestream.html[`PacketWritestream`] and link:groovydoc/io/vertx/groovy/core/eventbus/MessageProducer.html[`MessageProducer`]

Functions:

- link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#write(java.lang.Object)[`write`]:
write an object to the WriteStream. This method will never block. Writes are queued internally and asynchronously
written to the underlying resource.
- link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#setWriteQueueMaxSize(int)[`setWriteQueueMaxSize`]:
set the number of object at which the write queue is considered _full_, and the method link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#writeQueueFull()[`writeQueueFull`]
returns `true`. Note that, when the write queue is considered full, if write is called the data will still be accepted
and queued. The actual number depends on the stream implementation, for link:groovydoc/io/vertx/groovy/core/buffer/Buffer.html[`Buffer`] the size
represents the actual number of bytes written and not the number of buffers.
- link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#writeQueueFull()[`writeQueueFull`]:
returns `true` if the write queue is considered full.
- link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#exceptionHandler(io.vertx.core.Handler)[`exceptionHandler`]:
Will be called if an exception occurs on the `WriteStream`.
- link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#drainHandler(io.vertx.core.Handler)[`drainHandler`]:
The handler will be called if the `WriteStream` is considered no longer full.

=== Pump

Instances of Pump have the following methods:

- link:groovydoc/io/vertx/groovy/core/streams/Pump.html#start()[`start`]:
Start the pump.
- link:groovydoc/io/vertx/groovy/core/streams/Pump.html#stop()[`stop`]:
Stops the pump. When the pump starts it is in stopped mode.
- link:groovydoc/io/vertx/groovy/core/streams/Pump.html#setWriteQueueMaxSize(int)[`setWriteQueueMaxSize`]:
This has the same meaning as link:groovydoc/io/vertx/groovy/core/streams/WriteStream.html#setWriteQueueMaxSize(int)[`setWriteQueueMaxSize`] on the `WriteStream`.

A pump can be started and stopped multiple times.

When a pump is first created it is _not_ started. You need to call the `start()` method to start it.
38 changes: 34 additions & 4 deletions src/main/groovy/io/vertx/groovy/core/http/HttpClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,24 @@ public class HttpClient implements Measured {
def ret = ((io.vertx.core.metrics.Measured) this.delegate).metrics()?.collectEntries({k, v -> [k, v.getMap()]});
return ret;
}
public HttpClientRequest request(HttpMethod method, String absoluteURI) {
def ret= HttpClientRequest.FACTORY.apply(this.delegate.request(method, absoluteURI));
return ret;
}
/**
* Set an exception handler
* Create a new http client request.
*
* @return A reference to this, so multiple invocations can be chained together.
* The returned request does not have yet a response handler and one should be set before sending
* any data to the remote server.
*
* @param method the http method
* @param port the remote server port
* @param host the remote server host
* @param requestURI the request uri
* @return the http client request
*/
public HttpClient exceptionHandler(Handler<Throwable> handler) {
def ret= HttpClient.FACTORY.apply(this.delegate.exceptionHandler(handler));
public HttpClientRequest request(HttpMethod method, int port, String host, String requestURI) {
def ret= HttpClientRequest.FACTORY.apply(this.delegate.request(method, port, host, requestURI));
return ret;
}
public HttpClientRequest request(HttpMethod method, String absoluteURI, Handler<HttpClientResponse> responseHandler) {
Expand All @@ -90,6 +101,25 @@ public class HttpClient implements Measured {
}));
return ret;
}
public WebSocketStream websocket(int port, String host, String requestURI) {
def ret= WebSocketStream.FACTORY.apply(this.delegate.websocket(port, host, requestURI));
return ret;
}
public WebSocketStream websocket(int port, String host, String requestURI, MultiMap headers) {
def ret= WebSocketStream.FACTORY.apply(this.delegate.websocket(port, host, requestURI, (io.vertx.core.MultiMap)headers.getDelegate()));
return ret;
}
public WebSocketStream websocket(int port, String host, String requestURI, MultiMap headers, WebsocketVersion version) {
def ret= WebSocketStream.FACTORY.apply(this.delegate.websocket(port, host, requestURI, (io.vertx.core.MultiMap)headers.getDelegate(), version));
return ret;
}
/**
* @return a {@link io.vertx.core.http.WebSocketStream} configured with the specified arguments
*/
public WebSocketStream websocket(int port, String host, String requestURI, MultiMap headers, WebsocketVersion version, String subProtocols) {
def ret= WebSocketStream.FACTORY.apply(this.delegate.websocket(port, host, requestURI, (io.vertx.core.MultiMap)headers.getDelegate(), version, subProtocols));
return ret;
}
public HttpClient connectWebsocket(int port, String host, String requestURI, Handler<WebSocket> wsConnect) {
def ret= HttpClient.FACTORY.apply(this.delegate.connectWebsocket(port, host, requestURI, new Handler<io.vertx.core.http.WebSocket>() {
public void handle(io.vertx.core.http.WebSocket event) {
Expand Down
Loading

0 comments on commit 8f717c1

Please sign in to comment.