diff --git a/java-http.iml b/java-http.iml
index 40677e9..c69001a 100644
--- a/java-http.iml
+++ b/java-http.iml
@@ -243,4 +243,5 @@
-
\ No newline at end of file
+
+
diff --git a/src/main/java/io/fusionauth/http/client/Configurable.java b/src/main/java/io/fusionauth/http/client/Configurable.java
new file mode 100644
index 0000000..c248ca9
--- /dev/null
+++ b/src/main/java/io/fusionauth/http/client/Configurable.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022, FusionAuth, All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package io.fusionauth.http.client;
+
+import io.fusionauth.http.log.LoggerFactory;
+
+public interface Configurable> {
+ T addHeader(String name, String value);
+
+ /**
+ * @return The configuration object.
+ */
+ HTTPClientConfiguration configuration();
+
+ default T withLoggerFactory(LoggerFactory loggerFactory) {
+ configuration().withLoggerFactory(loggerFactory);
+ return (T) this;
+ }
+}
diff --git a/src/main/java/io/fusionauth/http/client/HTTPData.java b/src/main/java/io/fusionauth/http/client/HTTP1Processor.java
similarity index 78%
rename from src/main/java/io/fusionauth/http/client/HTTPData.java
rename to src/main/java/io/fusionauth/http/client/HTTP1Processor.java
index ee52bd4..1d2ea6e 100644
--- a/src/main/java/io/fusionauth/http/client/HTTPData.java
+++ b/src/main/java/io/fusionauth/http/client/HTTP1Processor.java
@@ -15,6 +15,7 @@
*/
package io.fusionauth.http.client;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -22,7 +23,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-public class HTTPData {
+public class HTTP1Processor {
public static final int BufferSize = 1024;
public final List buffers = new ArrayList<>();
@@ -39,7 +40,7 @@ public class HTTPData {
public int code;
- public CompletableFuture future;
+ public CompletableFuture future;
public boolean hasBody;
@@ -51,13 +52,21 @@ public class HTTPData {
public String message;
+ public String method;
+
public int offset;
- public String protocl;
+ public int port;
+
+ public String protocol;
+
+ public long read;
public ByteBuffer request;
- public ResponseParserState state = ResponseParserState.ResponseProtocol;
+ public HTTPClientResponseParserState state = HTTPClientResponseParserState.ResponseProtocol;
+
+ public URI url;
public ByteBuffer currentBuffer() {
ByteBuffer last = buffers.isEmpty() ? null : buffers.get(buffers.size() - 1);
@@ -71,7 +80,13 @@ public ByteBuffer currentBuffer() {
public boolean isResponseComplete() {
int index = offset / BufferSize;
- ByteBuffer buffer = buffers.get(index);
+ ByteBuffer buffer;
+ try {
+ buffer = buffers.get(index);
+ } catch (IndexOutOfBoundsException e) {
+ throw e;
+ }
+
byte[] array = buffer.array();
for (int i = 0; i < buffer.position(); i++) {
if (hasBody) {
@@ -85,12 +100,12 @@ public boolean isResponseComplete() {
}
// If there is a state transition, store the value properly and reset the builder (if needed)
- ResponseParserState nextState = state.next(array[i], headers);
+ HTTPClientResponseParserState nextState = state.next(array[i], headers);
if (nextState != state) {
switch (state) {
case ResponseStatusCode -> code = Integer.parseInt(builder.toString());
case ResponseStatusMessage -> message = builder.toString();
- case ResponseProtocol -> protocl = builder.toString();
+ case ResponseProtocol -> protocol = builder.toString();
case HeaderName -> headerName = builder.toString();
case HeaderValue -> headers.computeIfAbsent(headerName.toLowerCase(), key -> new ArrayList<>()).add(builder.toString());
}
@@ -106,15 +121,16 @@ public boolean isResponseComplete() {
}
state = nextState;
- if (state == ResponseParserState.ResponseComplete) {
+ if (state == HTTPClientResponseParserState.ResponseComplete) {
hasBody = headers.containsKey("content-length");
bodyOffset = offset;
bodyLength = Integer.parseInt(headers.get("content-length").get(0));
// If there is a body, we continue in this loop, and we'll keep parsing
- if (!hasBody) {
- return true;
- }
+// if (!hasBody) {
+// return true;
+// }
+ return true;
}
// Increment the offset across all the buffers
@@ -128,7 +144,7 @@ public void markUsed() {
lastUsed = System.currentTimeMillis();
}
- public void reset() {
+ public void reset() {
bodyBytes = 0;
bodyLength = 0;
bodyOffset = 0;
@@ -141,7 +157,14 @@ public void reset() {
code = 0;
offset = 0;
message = null;
- protocl = null;
- state = ResponseParserState.ResponseStatusCode;
+ protocol = null;
+ state = HTTPClientResponseParserState.ResponseStatusCode;
+ }
+
+ /**
+ * @return The instant that this processor was last used.
+ */
+ long lastUsed() {
+ return lastUsed;
}
}
diff --git a/src/main/java/io/fusionauth/http/client/HTTPClient.java b/src/main/java/io/fusionauth/http/client/HTTPClient.java
new file mode 100644
index 0000000..d0cb066
--- /dev/null
+++ b/src/main/java/io/fusionauth/http/client/HTTPClient.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2022, FusionAuth, All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package io.fusionauth.http.client;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+
+import io.fusionauth.http.HTTPMethod;
+
+/**
+ * @author Brian Pontarelli
+ * @author Daniel DeGroff
+ */
+public class HTTPClient {
+ private static final HTTPClientThread thread;
+
+ private HTTPClientConfiguration configuration = new HTTPClientConfiguration();
+
+ private String method;
+
+ private URI url;
+
+ static {
+ try {
+ thread = new HTTPClientThread();
+ thread.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public HTTPClient configuration(HTTPClientConfiguration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public HTTPClient get() {
+ method = HTTPMethod.GET.name();
+ return this;
+ }
+
+ public HTTPClient header(String name, String value) {
+ this.configuration.addHeader(name, value);
+ return this;
+ }
+
+ public HTTPClient optionalHeader(boolean test, String name, String value) {
+ if (test) {
+ this.configuration.addHeader(name, value);
+ }
+
+ return this;
+ }
+
+ public HTTPClientResponse send() {
+ try {
+ return sendAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public CompletableFuture sendAsync() {
+ try {
+ return thread.add(url, method, configuration);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public HTTPClient url(URI url) {
+ this.url = url;
+ return this;
+ }
+}
diff --git a/src/main/java/io/fusionauth/http/client/ChannelPool.java b/src/main/java/io/fusionauth/http/client/HTTPClientChannelPool.java
similarity index 91%
rename from src/main/java/io/fusionauth/http/client/ChannelPool.java
rename to src/main/java/io/fusionauth/http/client/HTTPClientChannelPool.java
index fbe6bda..3445b7e 100644
--- a/src/main/java/io/fusionauth/http/client/ChannelPool.java
+++ b/src/main/java/io/fusionauth/http/client/HTTPClientChannelPool.java
@@ -22,12 +22,12 @@
import java.util.Queue;
/**
- * Models a pool of available Channels that are already connected to a remote server and are in a Keep-Alive state. At any point, a HTTP client
- * request might check out a Channel from the pool and return it when finished.
+ * Models a pool of available Channels that are already connected to a remote server and are in a Keep-Alive state. At any point, a HTTP
+ * client request might check out a Channel from the pool and return it when finished.
*
* @author Brian Pontarelli
*/
-public class ChannelPool {
+public class HTTPClientChannelPool {
private final Map> pool = new HashMap<>();
public synchronized void checkin(String host, SocketChannel channel) {
diff --git a/src/main/java/io/fusionauth/http/client/HTTPClientConfiguration.java b/src/main/java/io/fusionauth/http/client/HTTPClientConfiguration.java
new file mode 100644
index 0000000..0480c28
--- /dev/null
+++ b/src/main/java/io/fusionauth/http/client/HTTPClientConfiguration.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2022, FusionAuth, All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package io.fusionauth.http.client;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import io.fusionauth.http.log.LoggerFactory;
+import io.fusionauth.http.log.SystemOutLoggerFactory;
+
+/**
+ * @author Daniel DeGroff
+ */
+public class HTTPClientConfiguration implements Configurable {
+ public final Map> headers = new HashMap<>();
+
+ private LoggerFactory loggerFactory = SystemOutLoggerFactory.FACTORY;
+
+ private Duration socketTimeoutDuration = Duration.ofSeconds(20);
+
+ @Override
+ public HTTPClientConfiguration addHeader(String name, String value) {
+ headers.computeIfAbsent(name.toLowerCase(), key -> new ArrayList<>()).add(value);
+ return this;
+ }
+
+ @Override
+ public HTTPClientConfiguration configuration() {
+ return this;
+ }
+
+ public LoggerFactory getLoggerFactory() {
+ return loggerFactory;
+ }
+
+ public Duration getSocketTimeoutDuration() {
+ return socketTimeoutDuration;
+ }
+
+ @Override
+ public HTTPClientConfiguration withLoggerFactory(LoggerFactory loggerFactory) {
+ Objects.requireNonNull(loggerFactory, "You cannot set LoggerFactory to null");
+ this.loggerFactory = loggerFactory;
+ return this;
+ }
+}
diff --git a/src/main/java/io/fusionauth/http/client/HTTPClientResponse.java b/src/main/java/io/fusionauth/http/client/HTTPClientResponse.java
new file mode 100644
index 0000000..7cf2337
--- /dev/null
+++ b/src/main/java/io/fusionauth/http/client/HTTPClientResponse.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright (c) 2022, FusionAuth, All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package io.fusionauth.http.client;
+
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.fusionauth.http.Cookie;
+import io.fusionauth.http.HTTPValues.Connections;
+import io.fusionauth.http.HTTPValues.ContentTypes;
+import io.fusionauth.http.HTTPValues.Headers;
+import io.fusionauth.http.util.HTTPTools;
+import io.fusionauth.http.util.HTTPTools.HeaderValue;
+
+/**
+ * An HTTP client response.
+ *
+ * @author Daniel DeGroff
+ */
+public class HTTPClientResponse {
+ private final Map> cookies = new HashMap<>(); // >
+
+ private final Map> headers = new HashMap<>();
+
+ private byte[] body;
+
+ private volatile boolean committed;
+
+ private Throwable exception;
+
+ private int status = 200;
+
+ private String statusMessage;
+
+ private Writer writer;
+
+ public void clearHeaders() {
+ headers.clear();
+ }
+
+ public boolean containsHeader(String name) {
+ String key = name.toLowerCase();
+ return headers.containsKey(key) && headers.get(key).size() > 0;
+ }
+
+ public boolean failure() {
+ return status < 200 || status > 299;
+ }
+
+ public byte[] getBody() {
+ return body;
+ }
+
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+
+ /**
+ * Determines the character set by parsing the {@code Content-Type} header (if it exists) to pull out the {@code charset} parameter.
+ *
+ * @return The Charset or UTF-8 if it wasn't specified in the {@code Content-Type} header.
+ */
+ public Charset getCharset() {
+ Charset charset = StandardCharsets.UTF_8;
+ String contentType = getContentType();
+ if (contentType != null) {
+ HeaderValue headerValue = HTTPTools.parseHeaderValue(contentType);
+ String charsetName = headerValue.parameters().get(ContentTypes.CharsetParameter);
+ if (charsetName != null) {
+ charset = Charset.forName(charsetName);
+ }
+ }
+
+ return charset;
+ }
+
+ public Long getContentLength() {
+ if (containsHeader(Headers.ContentLength)) {
+ return Long.parseLong(getHeader(Headers.ContentLength));
+ }
+
+ return null;
+ }
+
+ public String getContentType() {
+ return getHeader(Headers.ContentType);
+ }
+
+ public List getCookies() {
+ return cookies.values()
+ .stream()
+ .flatMap(map -> map.values().stream())
+ .collect(Collectors.toList());
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+
+ public void setException(Throwable exception) {
+ this.exception = exception;
+ }
+
+ public String getHeader(String name) {
+ String key = name.toLowerCase();
+ return headers.containsKey(key) && headers.get(key).size() > 0 ? headers.get(key).get(0) : null;
+ }
+
+ public List getHeaders(String key) {
+ return headers.get(key.toLowerCase());
+ }
+
+ public Map> getHeadersMap() {
+ return headers;
+ }
+
+ public String getRedirect() {
+ return getHeader(Headers.Location);
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public String getStatusMessage() {
+ return statusMessage;
+ }
+
+ public void setStatusMessage(String statusMessage) {
+ this.statusMessage = statusMessage;
+ }
+
+ /**
+ * @return True if the response has been committed, meaning at least one byte was written back to the client. False otherwise.
+ */
+ public boolean isCommitted() {
+ return committed;
+ }
+
+ /**
+ * Sets the committed status of the response.
+ *
+ * @param committed The status.
+ */
+ public void setCommitted(boolean committed) {
+ this.committed = committed;
+ }
+
+ /**
+ * @return If the connection should be kept open (keep-alive) or not. The default is to return the Connection: keep-alive header, which
+ * this method does.
+ */
+ public boolean isKeepAlive() {
+ String value = getHeader(Headers.Connection);
+ return value == null || value.equalsIgnoreCase(Connections.KeepAlive);
+ }
+}
+
diff --git a/src/main/java/io/fusionauth/http/client/ResponseParserState.java b/src/main/java/io/fusionauth/http/client/HTTPClientResponseParserState.java
similarity index 75%
rename from src/main/java/io/fusionauth/http/client/ResponseParserState.java
rename to src/main/java/io/fusionauth/http/client/HTTPClientResponseParserState.java
index ff7f20c..94850e8 100644
--- a/src/main/java/io/fusionauth/http/client/ResponseParserState.java
+++ b/src/main/java/io/fusionauth/http/client/HTTPClientResponseParserState.java
@@ -21,10 +21,13 @@
import io.fusionauth.http.ParseException;
import io.fusionauth.http.util.HTTPTools;
-public enum ResponseParserState {
+/**
+ * @author Brian Pontarelli
+ */
+public enum HTTPClientResponseParserState {
ResponseProtocol {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == 'H' || ch == 'T' || ch == 'P' || ch == '/' || ch == '1' || ch == '.') {
return ResponseProtocol;
} else if (ch == ' ') {
@@ -42,7 +45,7 @@ public boolean store() {
ResponseProtocolSP {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == ' ') {
return ResponseProtocolSP;
} else if (HTTPTools.isDigitCharacter(ch)) {
@@ -59,7 +62,7 @@ public boolean store() {
},
ResponseStatusCode {
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == ' ') {
return ResponseStatusCodeSP;
} else if (HTTPTools.isDigitCharacter(ch)) {
@@ -76,11 +79,15 @@ public boolean store() {
ResponseStatusCodeSP {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == ' ') {
return ResponseStatusCodeSP;
} else if (HTTPTools.isValueCharacter(ch)) {
return ResponseStatusMessage;
+ } else if (ch == '\n') {
+ return ResponseStatusMessageLF;
+ } else if (ch == '\r') {
+ return ResponseStatusMessageCR;
} else {
throw new ParseException();
}
@@ -94,7 +101,7 @@ public boolean store() {
ResponseStatusMessage {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == '\r') {
return ResponseStatusMessageCR;
} else if (HTTPTools.isValueCharacter(ch)) {
@@ -112,7 +119,7 @@ public boolean store() {
ResponseStatusMessageCR {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == '\n') {
return ResponseStatusMessageLF;
} else {
@@ -128,7 +135,7 @@ public boolean store() {
ResponseStatusMessageLF {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == '\r') {
return ResponseMessageCR;
} else if (HTTPTools.isTokenCharacter(ch)) {
@@ -146,7 +153,7 @@ public boolean store() {
ResponseMessageCR {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == '\n') {
return ResponseComplete;
} else {
@@ -162,7 +169,7 @@ public boolean store() {
ResponseComplete {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
return ResponseComplete;
}
@@ -174,7 +181,7 @@ public boolean store() {
HeaderName {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (HTTPTools.isTokenCharacter(ch)) {
return HeaderName;
} else if (ch == ':') {
@@ -192,7 +199,7 @@ public boolean store() {
HeaderColon {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == ' ') {
return HeaderColon; // Re-using this state because HeaderSP would be the same
} else if (HTTPTools.isTokenCharacter(ch)) {
@@ -210,7 +217,7 @@ public boolean store() {
HeaderValue {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == '\r') {
return HeaderCR;
} else if (HTTPTools.isValueCharacter(ch)) {
@@ -228,7 +235,7 @@ public boolean store() {
HeaderCR {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == '\n') {
return HeaderLF;
} else {
@@ -244,7 +251,7 @@ public boolean store() {
HeaderLF {
@Override
- public ResponseParserState next(byte ch, Map> headers) {
+ public HTTPClientResponseParserState next(byte ch, Map> headers) {
if (ch == '\r') {
return ResponseMessageCR;
} else if (HTTPTools.isTokenCharacter(ch)) {
@@ -260,7 +267,7 @@ public boolean store() {
}
};
- public abstract ResponseParserState next(byte ch, Map> headers);
+ public abstract HTTPClientResponseParserState next(byte ch, Map> headers);
public abstract boolean store();
}
diff --git a/src/main/java/io/fusionauth/http/client/HTTPClientThread.java b/src/main/java/io/fusionauth/http/client/HTTPClientThread.java
new file mode 100644
index 0000000..21f1fbc
--- /dev/null
+++ b/src/main/java/io/fusionauth/http/client/HTTPClientThread.java
@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2022, FusionAuth, All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package io.fusionauth.http.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.fusionauth.http.HTTPValues.ControlBytes;
+import io.fusionauth.http.HTTPValues.Protocols;
+import io.fusionauth.http.io.ByteBufferOutputStream;
+
+@SuppressWarnings("resource")
+public class HTTPClientThread extends Thread implements Closeable {
+// final Queue clientKeys = new ConcurrentLinkedQueue<>();
+
+ public static AtomicInteger counter = new AtomicInteger(0);
+ // TODO : clean up stale connections to servers
+// private final ClientReaperThread clientReaper;
+
+ private final HTTPClientChannelPool pool = new HTTPClientChannelPool();
+
+ private final Selector selector;
+
+ public HTTPClientThread() throws IOException {
+ selector = Selector.open();
+ System.out.println("Client started");
+ }
+ // private final List workerThreads = new ArrayList<>();
+
+ public CompletableFuture add(URI url, String method, HTTPClientConfiguration configuration) throws IOException {
+ // Just do enough to set up the data to attach and open the socket.
+ HTTP1Processor data = new HTTP1Processor();
+// data.logger = configuration.getLoggerFactory();
+ data.future = new CompletableFuture<>();
+ data.host = url.getHost();
+ data.method = method;
+ data.protocol = url.getScheme();
+ data.port = url.getPort() == -1
+ ? (data.protocol.equals("http") ? 80 : 443)
+ : url.getPort();
+ data.url = url;
+
+ for (String key : configuration.headers.keySet()) {
+ data.headers.put(key, new ArrayList<>(configuration.headers.get(key)));
+ }
+
+ // No channel, open a new socket w/out blocking, connect and then register for the OP_CONNECT state.
+ SocketChannel channel = pool.checkout(url.getHost());
+ if (channel == null) {
+ System.out.println("[" + counter.incrementAndGet() + "] open socket.");
+ channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(new InetSocketAddress(data.host, data.port));
+ channel.register(selector, SelectionKey.OP_CONNECT, data);
+ } else {
+ System.out.println("[" + counter.incrementAndGet() + "] re-use socket.");
+ // Use the existing socket, and attach our data and register for OP_WRITE
+// channel.register(selector, SelectionKey.OP_WRITE, data);
+// SelectionKey key =channel.register(selector, SelectionKey.OP_WRITE, data);
+ // TODO : Is this safe?
+ SelectionKey key = channel.keyFor(selector);
+ key.attach(data);
+ key.interestOps(SelectionKey.OP_WRITE);
+ }
+
+ // Time to get up!
+ selector.wakeup();
+ return data.future;
+ }
+
+ @Override
+ public void close() {
+// clientReaper.shutdown();
+
+ try {
+ selector.close();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ public void run() {
+ while (true) {
+ try {
+ // If the selector has been closed, shut the thread down
+ if (!selector.isOpen()) {
+ return;
+ }
+
+ // TODO : Note This wil block, we should add a timeout for cleanup.
+ // TODO : Configuration
+ int numberOfKeys = selector.select(1_000L);
+ if (numberOfKeys <= 0) {
+ continue;
+ }
+
+ var keys = selector.selectedKeys();
+ Iterator iterator = keys.iterator();
+ while (iterator.hasNext()) {
+ var key = iterator.next();
+ if (key.isConnectable()) {
+// System.out.println("Connecting");
+ connect(key);
+ } else if (key.isReadable()) {
+ if (key.attachment() == null) {
+ System.out.println("Should we be here if we don't have an attachment? Seems like we should be in a write state?\n\n");
+ }
+ try {
+ read(key);
+ } catch (SocketException e) {
+ // Try to recover from a connection reset
+ if (e.getMessage().equals("Connection reset")) {
+ System.out.println("[" + counter + "] Handle connection reset.");
+ HTTP1Processor data = (HTTP1Processor) key.attachment();
+ data.reset();
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(new InetSocketAddress(data.host, data.port));
+ channel.register(selector, SelectionKey.OP_CONNECT, data);
+ } else {
+ throw e;
+ }
+
+ }
+
+ } else if (key.isWritable()) {
+ if (key.attachment() == null) {
+ System.out.println("Should we be here if we don't have an attachment? Seems like we should be in a different state?\n\n");
+ }
+ write(key);
+ }
+
+ iterator.remove();
+ }
+ } catch (IOException e) {
+ System.out.println("Whoa!!!\n\n");
+ e.printStackTrace();
+ } finally {
+// cleanup();
+ }
+ }
+ }
+
+ private void cancelAndCloseKey(SelectionKey key) {
+ if (key != null) {
+ HTTP1Processor data = (HTTP1Processor) key.attachment();
+
+ try (var client = key.channel()) {
+ key.cancel();
+ } catch (Throwable t) {
+ // TODO : Logger
+ }
+
+ // TODO : Tracer
+ }
+ }
+
+ private void cleanup() {
+ long now = System.currentTimeMillis();
+ // TODO : Logger
+ selector.keys()
+ .stream()
+ .filter(key -> key.attachment() != null)
+ // TODO : Configuration
+ .filter(key -> ((HTTP1Processor) key.attachment()).lastUsed() < now - 20_000)
+ // TODO : Logger
+ .forEach(this::cancelAndCloseKey);
+ }
+
+ private void connect(SelectionKey key) throws IOException {
+ SocketChannel channel = (SocketChannel) key.channel();
+ if (channel.finishConnect()) {
+ key.interestOps(SelectionKey.OP_WRITE);
+ }
+ }
+
+ private void parsePreamble(HTTP1Processor data) {
+ StringBuilder builder = new StringBuilder(512);
+
+ String url = "/";
+ if (data.url.getPath() != null) {
+ url = data.url.getPath();
+ }
+
+ if (data.url.getQuery() != null) {
+ url += data.url.getQuery();
+ }
+
+
+ if ("".equals(url)) {
+ url = "/";
+ }
+
+ ByteBufferOutputStream bbos = new ByteBufferOutputStream(1024, 8 * 1024);
+
+ // TODO : Note : Things like Host, etc we could just add headers and then use the common path of code.
+
+ // Method Path Protocol
+ bbos.write(data.method.getBytes(StandardCharsets.UTF_8));
+ bbos.write(" ".getBytes(StandardCharsets.UTF_8));
+ bbos.write(url.getBytes(StandardCharsets.UTF_8));
+ bbos.write(" ".getBytes(StandardCharsets.UTF_8));
+ bbos.write(Protocols.HTTTP1_1.getBytes(StandardCharsets.UTF_8));
+ bbos.write(ControlBytes.CRLF);
+
+ // Host
+ bbos.write("Host: ".getBytes(StandardCharsets.UTF_8));
+ bbos.write(data.url.getHost().getBytes(StandardCharsets.UTF_8));
+ bbos.write(ControlBytes.CRLF);
+
+ // User-Agent
+ bbos.write("User-Agent: java-http-client/1.0.0".getBytes(StandardCharsets.UTF_8));
+ bbos.write(ControlBytes.CRLF);
+
+// bbos.write("Accept: */*".getBytes(StandardCharsets.UTF_8));
+// bbos.write(ControlBytes.CRLF);
+
+// builder.append(data.method)
+// .append(" ")
+// .append(url)
+// .append(" ")
+// .append(Protocols.HTTTP1_1)
+// .append((byte[]) ControlBytes.CRLF)
+// .append("Host:")
+// .append(data.url.getHost())
+// .append((byte[]) ControlBytes.CRLF)
+// .append("User-Agent: java-http-client/1.0.0")
+// .append((byte[]) ControlBytes.CRLF)
+// .append("Accept: */*")
+// .append((byte[]) ControlBytes.CRLF);
+
+
+ // Headers
+ if (!data.headers.isEmpty()) {
+
+
+ for (String key : data.headers.keySet()) {
+ StringBuilder valueString = new StringBuilder();
+ bbos.write((key + ": ").getBytes(StandardCharsets.UTF_8));
+ for (String value : data.headers.get(key)) {
+ // TODO : Daniel : Need escaping?
+ valueString.append(value);
+ valueString.append("; ");
+ }
+
+ int index = valueString.indexOf("; ");
+ if (index == valueString.length() - 2) {
+ bbos.write(valueString.substring(0, valueString.length() - 2).getBytes(StandardCharsets.UTF_8));
+ } else {
+ bbos.write(valueString.toString().getBytes(StandardCharsets.UTF_8));
+ }
+
+ bbos.write(ControlBytes.CRLF);
+ }
+ }
+
+ // Close the preamble
+ bbos.write(ControlBytes.CRLF);
+
+// data.request = ByteBuffer.wrap(builder.toString().getBytes(StandardCharsets.UTF_8));
+ ByteBuffer buff = bbos.toByteBuffer();
+// System.out.println("\n---Preamble Start");
+// System.out.println(new String(buff.array(), 0, buff.limit(), StandardCharsets.UTF_8));
+// System.out.println("---Preamble End\n\n");
+ data.request = buff;
+ }
+
+ @SuppressWarnings("resource")
+ private void read(SelectionKey key) throws IOException {
+ SocketChannel client = (SocketChannel) key.channel();
+ HTTP1Processor data = (HTTP1Processor) key.attachment();
+ // TODO : Daniel : is this safe? Why do we have a read op if we are complete and we
+ // set the data attachment to null.
+ long read = data != null ? client.read(data.currentBuffer()) : 0;
+ if (read <= 0) {
+ return;
+ }
+
+ if (data.isResponseComplete()) {
+ HTTPClientResponse httpResponse = new HTTPClientResponse();
+ httpResponse.setStatus(data.code);
+ int expected = 0;
+ for (ByteBuffer buffer : data.buffers) {
+ expected += buffer.position();
+ }
+
+ ByteBuffer total = ByteBuffer.allocate(expected);
+ for (ByteBuffer buffer : data.buffers) {
+ buffer.flip();
+ byte[] actual = buffer.array();
+ byte[] used = Arrays.copyOfRange(actual, 0, buffer.limit());
+ total.put(used);
+ }
+ byte[] bytes = total.array();
+// System.out.println(bytes.length);
+// System.out.println(data.read);
+ httpResponse.setBody(bytes);
+
+
+ key.attach(null);
+
+ // TODO : Hacking - If using keep-alive, check back in... if not, don't
+ // Once we parse the response we should know if this is keep-alive from a boolean.
+ String stringBody = new String(bytes);
+ // TODO : is closing the channel enough, or should I cancel the keys as well?
+ if (stringBody.contains("connection: close")) {
+// System.out.println("close the connection");
+ key.channel().close();
+// key.interestOps(0);
+// key.cancel();
+ } else {
+// key.interestOps(0);
+// key.cancel();
+ key.interestOps(0);
+ pool.checkin(data.host, (SocketChannel) key.channel());
+
+// System.out.println(stringBody);
+ }
+
+
+ data.future.complete(httpResponse);
+
+// key.attach(null);
+
+
+ }
+
+ }
+
+ @SuppressWarnings("resource")
+ private void write(SelectionKey key) throws IOException {
+ // TODO : Daniel : We can write the preamble here
+ // The data object could manage the state. n
+ SocketChannel client = (SocketChannel) key.channel();
+ HTTP1Processor data = (HTTP1Processor) key.attachment();
+
+ // TODO : Daniel : we'll need to keep track of the state to know what we are writing.
+ parsePreamble(data);
+ client.write(data.request);
+
+ if (data.request.position() == data.request.limit()) {
+ key.interestOps(SelectionKey.OP_READ);
+ }
+ }
+}
diff --git a/src/main/java/io/fusionauth/http/client/NIOClientThread.java b/src/main/java/io/fusionauth/http/client/NIOClientThread.java
deleted file mode 100644
index 8fe86e9..0000000
--- a/src/main/java/io/fusionauth/http/client/NIOClientThread.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (c) 2022, FusionAuth, All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the License.
- */
-package io.fusionauth.http.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-
-@SuppressWarnings("resource")
-public class NIOClientThread extends Thread implements Closeable {
-// final Queue clientKeys = new ConcurrentLinkedQueue<>();
-
-// private final List workerThreads = new ArrayList<>();
-
- // TODO : clean up stale connections to servers
-// private final ClientReaperThread clientReaper;
-
- private final ChannelPool pool = new ChannelPool();
-
- private final Selector selector;
-
- public NIOClientThread() throws IOException {
- selector = Selector.open();
- System.out.println("Client started");
- }
-
- public Future add(URI uri, String method) throws IOException {
- HTTPData data = new HTTPData();
- data.request = ByteBuffer.wrap(
- """
- GET /api/system/version HTTP/1.1\r
- \r
- """.getBytes());
- data.future = new CompletableFuture<>();
- data.host = uri.getHost();
-
- SocketChannel channel = pool.checkout(uri.getHost());
- if (channel == null) {
- channel = SocketChannel.open();
- channel.configureBlocking(false);
- channel.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));
-
- SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
- key.attach(data);
- } else {
- SelectionKey key = channel.keyFor(selector);
- key.attach(data);
- key.interestOps(SelectionKey.OP_WRITE);
- }
-
- // Wakeup! Time to put on a little makeup!
- selector.wakeup();
- return data.future;
- }
-
- @Override
- public void close() {
-// clientReaper.shutdown();
-
- try {
- selector.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
-
- public void run() {
- while (true) {
- try {
- // If the selector has been closed, shut the thread down
- if (!selector.isOpen()) {
- return;
- }
-
- int numberOfKeys = selector.select();
- if (numberOfKeys <= 0) {
- continue;
- }
-
- var keys = selector.selectedKeys();
- Iterator iterator = keys.iterator();
- while (iterator.hasNext()) {
- var key = iterator.next();
- if (key.isConnectable()) {
- System.out.println("Connecting");
- connect(key);
- } else if (key.isReadable()) {
- read(key);
- } else if (key.isWritable()) {
- write(key);
- }
-
- iterator.remove();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- private void connect(SelectionKey key) throws IOException {
- SocketChannel channel = (SocketChannel) key.channel();
- if (channel.finishConnect()) {
- key.interestOps(SelectionKey.OP_WRITE);
- }
- }
-
- @SuppressWarnings("resource")
- private void read(SelectionKey key) throws IOException {
- SocketChannel client = (SocketChannel) key.channel();
- HTTPData data = (HTTPData) key.attachment();
- long read = client.read(data.currentBuffer());
- if (read <= 0) {
- return;
- }
-
- if (data.isResponseComplete()) {
- data.future.complete(data.code);
- key.attach(null);
- pool.checkin(data.host, (SocketChannel) key.channel());
- }
- }
-
- @SuppressWarnings("resource")
- private void write(SelectionKey key) throws IOException {
- SocketChannel client = (SocketChannel) key.channel();
- HTTPData data = (HTTPData) key.attachment();
- client.write(data.request);
-
- if (data.request.position() == data.request.limit()) {
- key.interestOps(SelectionKey.OP_READ);
- }
- }
-}
diff --git a/src/main/java/io/fusionauth/http/client/SimpleNIOClient.java b/src/main/java/io/fusionauth/http/client/SimpleNIOClient.java
deleted file mode 100644
index fbce0dc..0000000
--- a/src/main/java/io/fusionauth/http/client/SimpleNIOClient.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2022, FusionAuth, All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the License.
- */
-package io.fusionauth.http.client;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Future;
-
-public class SimpleNIOClient {
- private static final NIOClientThread thread;
-
- public String method;
-
- public String url;
-
- public SimpleNIOClient get() {
- this.method = "GET";
- return this;
- }
-
- public int go() {
- try {
- Future future = thread.add(URI.create(url), method);
- return future.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public SimpleNIOClient url(String url) {
- this.url = url;
- return this;
- }
-
- static {
- try {
- thread = new NIOClientThread();
- thread.start();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java b/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java
index 64337e5..457550b 100644
--- a/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java
+++ b/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java
@@ -142,7 +142,6 @@ public HTTPServerConfiguration withClientTimeout(Duration duration) {
throw new IllegalArgumentException("You cannot set the client timeout less than 0");
}
-
this.clientTimeoutDuration = duration;
return this;
}
diff --git a/src/test/java/io/fusionauth/http/BaseTest.java b/src/test/java/io/fusionauth/http/BaseTest.java
index 08c7db0..70de1ff 100644
--- a/src/test/java/io/fusionauth/http/BaseTest.java
+++ b/src/test/java/io/fusionauth/http/BaseTest.java
@@ -39,7 +39,7 @@
*
* @author Brian Pontarelli
*/
-public class BaseTest {
+public abstract class BaseTest {
public static String certificate;
public static String privateKey;
@@ -51,12 +51,14 @@ public static void loadFiles() throws IOException {
privateKey = Files.readString(Paths.get(homeDir + "/dev/certificates/fusionauth.key"));
}
- public HTTPServer makeServer(String scheme, HTTPHandler handler) {
- return makeServer(scheme, handler, null);
- }
-
- public HTTPServer makeServer(String scheme, HTTPHandler handler, Instrumenter instrumenter) {
- return makeServer(scheme, handler, instrumenter, null);
+ @DataProvider
+ public Object[][] connection() {
+ // TODO: One at a time works, but doing all of them fails with a connection reset issue.
+ return new Object[][]{
+ {""},
+ {"close"},
+ {"keep-alive"}
+ };
}
@SuppressWarnings("resource")
@@ -77,6 +79,14 @@ public HTTPServer makeServer(String scheme, HTTPHandler handler, Instrumenter in
.withListener(listenerConfiguration);
}
+ public HTTPServer makeServer(String scheme, HTTPHandler handler) {
+ return makeServer(scheme, handler, null);
+ }
+
+ public HTTPServer makeServer(String scheme, HTTPHandler handler, Instrumenter instrumenter) {
+ return makeServer(scheme, handler, instrumenter, null);
+ }
+
public URI makeURI(String scheme, String params) {
if (scheme.equals("https")) {
return URI.create("https://local.fusionauth.io:4242/api/system/version" + params);
@@ -85,6 +95,17 @@ public URI makeURI(String scheme, String params) {
return URI.create("http://localhost:4242/api/system/version" + params);
}
+ /**
+ * @return The possible schemes - {@code http} and {@code https}.
+ */
+ @DataProvider
+ public Object[][] schemes() {
+ return new Object[][]{
+ {"http"},
+ {"https"}
+ };
+ }
+
public void sendBadRequest(String message) {
try (Socket socket = new Socket("127.0.0.1", 4242); OutputStream os = socket.getOutputStream(); InputStream is = socket.getInputStream()) {
os.write(message.getBytes());
@@ -98,15 +119,4 @@ public void sendBadRequest(String message) {
fail(e.getMessage());
}
}
-
- /**
- * @return The possible schemes - {@code http} and {@code https}.
- */
- @DataProvider
- public Object[][] schemes() {
- return new Object[][]{
- {"http"},
- {"https"}
- };
- }
}
diff --git a/src/test/java/io/fusionauth/http/client/CoreTest.java b/src/test/java/io/fusionauth/http/client/CoreTest.java
new file mode 100644
index 0000000..152bb98
--- /dev/null
+++ b/src/test/java/io/fusionauth/http/client/CoreTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2022, FusionAuth, All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package io.fusionauth.http.client;
+
+import java.net.URI;
+
+import io.fusionauth.http.BaseTest;
+import io.fusionauth.http.server.HTTPHandler;
+import io.fusionauth.http.server.HTTPServer;
+import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Daniel DeGroff
+ */
+public class CoreTest extends BaseTest {
+
+ // TODO : invocationCount = 1 works, but greater than 1 fails on the second iteration with a "Connection reset" SocketException
+ @Test(dataProvider = "connection", invocationCount = 1)
+ public void get_connection(String connection) {
+ HTTPClientThread.counter.set(0);
+ HTTPHandler handler = (req, res) -> res.setStatus(200);
+
+ try (HTTPServer ignore = makeServer("http", handler).start()) {
+ // Reset the connection pool since we know the server is going to tear down all sockets.
+ // TODO : Note, should we handle this by just rebuilding the connection? I would guess we have
+ // to tolerate the server terminating our connection.
+
+ // TODO : Note, if I bump this to 100_000, I'll eventually get
+ // socket errors, so I'm doing something wrong.
+ // Using roughly a ration of 1:10 so keep the timings roughly the same. You probably
+ // don't want to wait for 50,000 requests to complete w/out keep-alive.
+ long iterations = !connection.equals("close")
+// ? 50_000
+// : 5_000;
+ ? 500
+ : 50;
+ long start = System.currentTimeMillis();
+
+ HTTPClient client = new HTTPClient()
+ .url(URI.create("http://localhost:4242"))
+ .optionalHeader(!connection.equals(""), "Connection", connection)
+ .get();
+
+ for (int i = 0; i <= iterations; i++) {
+ // TODO : Note if I build the client outside of the loop,
+ // and only call send() in the loop the connection response
+ // header is not consistent. Seems like a bug.
+ HTTPClientResponse response;
+ try {
+ response = client
+ .send();
+ } catch (Exception e) {
+ System.out.println(i + 1);
+ throw e;
+ }
+
+ assertEquals(response.getStatus(), 200);
+ assertEquals(new String(response.getBody()), """
+ HTTP/1.1 200 \r
+ content-length: 0\r
+ connection: {connection}\r
+ \r
+ """
+ .replace("{connection}", connection.equals("") ? "keep-alive" : connection)
+ , "iteration [" + (i + 1) + "]");
+
+ if (i > 0 && i % 5_000 == 0) {
+ System.out.println(i);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ double average = (end - start) / (double) iterations;
+ System.out.println("\nAverage linear request time is [" + average + "]ms");
+ System.out.println("Duration: " + (end - start) + " ms\n");
+ }
+ }
+}