From 40f3a9f00819cd9dac2698b03be61f2f8412e211 Mon Sep 17 00:00:00 2001 From: Daniel DeGroff Date: Sun, 9 Oct 2022 19:59:47 -0600 Subject: [PATCH] Working --- java-http.iml | 3 +- .../fusionauth/http/client/Configurable.java | 32 ++ .../{HTTPData.java => HTTP1Processor.java} | 51 ++- .../io/fusionauth/http/client/HTTPClient.java | 89 +++++ ...elPool.java => HTTPClientChannelPool.java} | 6 +- .../http/client/HTTPClientConfiguration.java | 63 +++ .../http/client/HTTPClientResponse.java | 180 +++++++++ ...ava => HTTPClientResponseParserState.java} | 39 +- .../http/client/HTTPClientThread.java | 371 ++++++++++++++++++ .../http/client/NIOClientThread.java | 155 -------- .../http/client/SimpleNIOClient.java | 56 --- .../http/server/HTTPServerConfiguration.java | 1 - .../java/io/fusionauth/http/BaseTest.java | 46 ++- .../io/fusionauth/http/client/CoreTest.java | 92 +++++ 14 files changed, 920 insertions(+), 264 deletions(-) create mode 100644 src/main/java/io/fusionauth/http/client/Configurable.java rename src/main/java/io/fusionauth/http/client/{HTTPData.java => HTTP1Processor.java} (78%) create mode 100644 src/main/java/io/fusionauth/http/client/HTTPClient.java rename src/main/java/io/fusionauth/http/client/{ChannelPool.java => HTTPClientChannelPool.java} (91%) create mode 100644 src/main/java/io/fusionauth/http/client/HTTPClientConfiguration.java create mode 100644 src/main/java/io/fusionauth/http/client/HTTPClientResponse.java rename src/main/java/io/fusionauth/http/client/{ResponseParserState.java => HTTPClientResponseParserState.java} (75%) create mode 100644 src/main/java/io/fusionauth/http/client/HTTPClientThread.java delete mode 100644 src/main/java/io/fusionauth/http/client/NIOClientThread.java delete mode 100644 src/main/java/io/fusionauth/http/client/SimpleNIOClient.java create mode 100644 src/test/java/io/fusionauth/http/client/CoreTest.java diff --git a/java-http.iml b/java-http.iml index 40677e9..c69001a 100644 --- a/java-http.iml +++ b/java-http.iml @@ -243,4 +243,5 @@ - \ No newline at end of file + + diff --git a/src/main/java/io/fusionauth/http/client/Configurable.java b/src/main/java/io/fusionauth/http/client/Configurable.java new file mode 100644 index 0000000..c248ca9 --- /dev/null +++ b/src/main/java/io/fusionauth/http/client/Configurable.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022, FusionAuth, All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the License. + */ +package io.fusionauth.http.client; + +import io.fusionauth.http.log.LoggerFactory; + +public interface Configurable> { + T addHeader(String name, String value); + + /** + * @return The configuration object. + */ + HTTPClientConfiguration configuration(); + + default T withLoggerFactory(LoggerFactory loggerFactory) { + configuration().withLoggerFactory(loggerFactory); + return (T) this; + } +} diff --git a/src/main/java/io/fusionauth/http/client/HTTPData.java b/src/main/java/io/fusionauth/http/client/HTTP1Processor.java similarity index 78% rename from src/main/java/io/fusionauth/http/client/HTTPData.java rename to src/main/java/io/fusionauth/http/client/HTTP1Processor.java index ee52bd4..1d2ea6e 100644 --- a/src/main/java/io/fusionauth/http/client/HTTPData.java +++ b/src/main/java/io/fusionauth/http/client/HTTP1Processor.java @@ -15,6 +15,7 @@ */ package io.fusionauth.http.client; +import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -22,7 +23,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -public class HTTPData { +public class HTTP1Processor { public static final int BufferSize = 1024; public final List buffers = new ArrayList<>(); @@ -39,7 +40,7 @@ public class HTTPData { public int code; - public CompletableFuture future; + public CompletableFuture future; public boolean hasBody; @@ -51,13 +52,21 @@ public class HTTPData { public String message; + public String method; + public int offset; - public String protocl; + public int port; + + public String protocol; + + public long read; public ByteBuffer request; - public ResponseParserState state = ResponseParserState.ResponseProtocol; + public HTTPClientResponseParserState state = HTTPClientResponseParserState.ResponseProtocol; + + public URI url; public ByteBuffer currentBuffer() { ByteBuffer last = buffers.isEmpty() ? null : buffers.get(buffers.size() - 1); @@ -71,7 +80,13 @@ public ByteBuffer currentBuffer() { public boolean isResponseComplete() { int index = offset / BufferSize; - ByteBuffer buffer = buffers.get(index); + ByteBuffer buffer; + try { + buffer = buffers.get(index); + } catch (IndexOutOfBoundsException e) { + throw e; + } + byte[] array = buffer.array(); for (int i = 0; i < buffer.position(); i++) { if (hasBody) { @@ -85,12 +100,12 @@ public boolean isResponseComplete() { } // If there is a state transition, store the value properly and reset the builder (if needed) - ResponseParserState nextState = state.next(array[i], headers); + HTTPClientResponseParserState nextState = state.next(array[i], headers); if (nextState != state) { switch (state) { case ResponseStatusCode -> code = Integer.parseInt(builder.toString()); case ResponseStatusMessage -> message = builder.toString(); - case ResponseProtocol -> protocl = builder.toString(); + case ResponseProtocol -> protocol = builder.toString(); case HeaderName -> headerName = builder.toString(); case HeaderValue -> headers.computeIfAbsent(headerName.toLowerCase(), key -> new ArrayList<>()).add(builder.toString()); } @@ -106,15 +121,16 @@ public boolean isResponseComplete() { } state = nextState; - if (state == ResponseParserState.ResponseComplete) { + if (state == HTTPClientResponseParserState.ResponseComplete) { hasBody = headers.containsKey("content-length"); bodyOffset = offset; bodyLength = Integer.parseInt(headers.get("content-length").get(0)); // If there is a body, we continue in this loop, and we'll keep parsing - if (!hasBody) { - return true; - } +// if (!hasBody) { +// return true; +// } + return true; } // Increment the offset across all the buffers @@ -128,7 +144,7 @@ public void markUsed() { lastUsed = System.currentTimeMillis(); } - public void reset() { + public void reset() { bodyBytes = 0; bodyLength = 0; bodyOffset = 0; @@ -141,7 +157,14 @@ public void reset() { code = 0; offset = 0; message = null; - protocl = null; - state = ResponseParserState.ResponseStatusCode; + protocol = null; + state = HTTPClientResponseParserState.ResponseStatusCode; + } + + /** + * @return The instant that this processor was last used. + */ + long lastUsed() { + return lastUsed; } } diff --git a/src/main/java/io/fusionauth/http/client/HTTPClient.java b/src/main/java/io/fusionauth/http/client/HTTPClient.java new file mode 100644 index 0000000..d0cb066 --- /dev/null +++ b/src/main/java/io/fusionauth/http/client/HTTPClient.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2022, FusionAuth, All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the License. + */ +package io.fusionauth.http.client; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CompletableFuture; + +import io.fusionauth.http.HTTPMethod; + +/** + * @author Brian Pontarelli + * @author Daniel DeGroff + */ +public class HTTPClient { + private static final HTTPClientThread thread; + + private HTTPClientConfiguration configuration = new HTTPClientConfiguration(); + + private String method; + + private URI url; + + static { + try { + thread = new HTTPClientThread(); + thread.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public HTTPClient configuration(HTTPClientConfiguration configuration) { + this.configuration = configuration; + return this; + } + + public HTTPClient get() { + method = HTTPMethod.GET.name(); + return this; + } + + public HTTPClient header(String name, String value) { + this.configuration.addHeader(name, value); + return this; + } + + public HTTPClient optionalHeader(boolean test, String name, String value) { + if (test) { + this.configuration.addHeader(name, value); + } + + return this; + } + + public HTTPClientResponse send() { + try { + return sendAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public CompletableFuture sendAsync() { + try { + return thread.add(url, method, configuration); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public HTTPClient url(URI url) { + this.url = url; + return this; + } +} diff --git a/src/main/java/io/fusionauth/http/client/ChannelPool.java b/src/main/java/io/fusionauth/http/client/HTTPClientChannelPool.java similarity index 91% rename from src/main/java/io/fusionauth/http/client/ChannelPool.java rename to src/main/java/io/fusionauth/http/client/HTTPClientChannelPool.java index fbe6bda..3445b7e 100644 --- a/src/main/java/io/fusionauth/http/client/ChannelPool.java +++ b/src/main/java/io/fusionauth/http/client/HTTPClientChannelPool.java @@ -22,12 +22,12 @@ import java.util.Queue; /** - * Models a pool of available Channels that are already connected to a remote server and are in a Keep-Alive state. At any point, a HTTP client - * request might check out a Channel from the pool and return it when finished. + * Models a pool of available Channels that are already connected to a remote server and are in a Keep-Alive state. At any point, a HTTP + * client request might check out a Channel from the pool and return it when finished. * * @author Brian Pontarelli */ -public class ChannelPool { +public class HTTPClientChannelPool { private final Map> pool = new HashMap<>(); public synchronized void checkin(String host, SocketChannel channel) { diff --git a/src/main/java/io/fusionauth/http/client/HTTPClientConfiguration.java b/src/main/java/io/fusionauth/http/client/HTTPClientConfiguration.java new file mode 100644 index 0000000..0480c28 --- /dev/null +++ b/src/main/java/io/fusionauth/http/client/HTTPClientConfiguration.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022, FusionAuth, All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the License. + */ +package io.fusionauth.http.client; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import io.fusionauth.http.log.LoggerFactory; +import io.fusionauth.http.log.SystemOutLoggerFactory; + +/** + * @author Daniel DeGroff + */ +public class HTTPClientConfiguration implements Configurable { + public final Map> headers = new HashMap<>(); + + private LoggerFactory loggerFactory = SystemOutLoggerFactory.FACTORY; + + private Duration socketTimeoutDuration = Duration.ofSeconds(20); + + @Override + public HTTPClientConfiguration addHeader(String name, String value) { + headers.computeIfAbsent(name.toLowerCase(), key -> new ArrayList<>()).add(value); + return this; + } + + @Override + public HTTPClientConfiguration configuration() { + return this; + } + + public LoggerFactory getLoggerFactory() { + return loggerFactory; + } + + public Duration getSocketTimeoutDuration() { + return socketTimeoutDuration; + } + + @Override + public HTTPClientConfiguration withLoggerFactory(LoggerFactory loggerFactory) { + Objects.requireNonNull(loggerFactory, "You cannot set LoggerFactory to null"); + this.loggerFactory = loggerFactory; + return this; + } +} diff --git a/src/main/java/io/fusionauth/http/client/HTTPClientResponse.java b/src/main/java/io/fusionauth/http/client/HTTPClientResponse.java new file mode 100644 index 0000000..7cf2337 --- /dev/null +++ b/src/main/java/io/fusionauth/http/client/HTTPClientResponse.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2022, FusionAuth, All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the License. + */ +package io.fusionauth.http.client; + +import java.io.Writer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import io.fusionauth.http.Cookie; +import io.fusionauth.http.HTTPValues.Connections; +import io.fusionauth.http.HTTPValues.ContentTypes; +import io.fusionauth.http.HTTPValues.Headers; +import io.fusionauth.http.util.HTTPTools; +import io.fusionauth.http.util.HTTPTools.HeaderValue; + +/** + * An HTTP client response. + * + * @author Daniel DeGroff + */ +public class HTTPClientResponse { + private final Map> cookies = new HashMap<>(); // > + + private final Map> headers = new HashMap<>(); + + private byte[] body; + + private volatile boolean committed; + + private Throwable exception; + + private int status = 200; + + private String statusMessage; + + private Writer writer; + + public void clearHeaders() { + headers.clear(); + } + + public boolean containsHeader(String name) { + String key = name.toLowerCase(); + return headers.containsKey(key) && headers.get(key).size() > 0; + } + + public boolean failure() { + return status < 200 || status > 299; + } + + public byte[] getBody() { + return body; + } + + public void setBody(byte[] body) { + this.body = body; + } + + /** + * Determines the character set by parsing the {@code Content-Type} header (if it exists) to pull out the {@code charset} parameter. + * + * @return The Charset or UTF-8 if it wasn't specified in the {@code Content-Type} header. + */ + public Charset getCharset() { + Charset charset = StandardCharsets.UTF_8; + String contentType = getContentType(); + if (contentType != null) { + HeaderValue headerValue = HTTPTools.parseHeaderValue(contentType); + String charsetName = headerValue.parameters().get(ContentTypes.CharsetParameter); + if (charsetName != null) { + charset = Charset.forName(charsetName); + } + } + + return charset; + } + + public Long getContentLength() { + if (containsHeader(Headers.ContentLength)) { + return Long.parseLong(getHeader(Headers.ContentLength)); + } + + return null; + } + + public String getContentType() { + return getHeader(Headers.ContentType); + } + + public List getCookies() { + return cookies.values() + .stream() + .flatMap(map -> map.values().stream()) + .collect(Collectors.toList()); + } + + public Throwable getException() { + return exception; + } + + public void setException(Throwable exception) { + this.exception = exception; + } + + public String getHeader(String name) { + String key = name.toLowerCase(); + return headers.containsKey(key) && headers.get(key).size() > 0 ? headers.get(key).get(0) : null; + } + + public List getHeaders(String key) { + return headers.get(key.toLowerCase()); + } + + public Map> getHeadersMap() { + return headers; + } + + public String getRedirect() { + return getHeader(Headers.Location); + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getStatusMessage() { + return statusMessage; + } + + public void setStatusMessage(String statusMessage) { + this.statusMessage = statusMessage; + } + + /** + * @return True if the response has been committed, meaning at least one byte was written back to the client. False otherwise. + */ + public boolean isCommitted() { + return committed; + } + + /** + * Sets the committed status of the response. + * + * @param committed The status. + */ + public void setCommitted(boolean committed) { + this.committed = committed; + } + + /** + * @return If the connection should be kept open (keep-alive) or not. The default is to return the Connection: keep-alive header, which + * this method does. + */ + public boolean isKeepAlive() { + String value = getHeader(Headers.Connection); + return value == null || value.equalsIgnoreCase(Connections.KeepAlive); + } +} + diff --git a/src/main/java/io/fusionauth/http/client/ResponseParserState.java b/src/main/java/io/fusionauth/http/client/HTTPClientResponseParserState.java similarity index 75% rename from src/main/java/io/fusionauth/http/client/ResponseParserState.java rename to src/main/java/io/fusionauth/http/client/HTTPClientResponseParserState.java index ff7f20c..94850e8 100644 --- a/src/main/java/io/fusionauth/http/client/ResponseParserState.java +++ b/src/main/java/io/fusionauth/http/client/HTTPClientResponseParserState.java @@ -21,10 +21,13 @@ import io.fusionauth.http.ParseException; import io.fusionauth.http.util.HTTPTools; -public enum ResponseParserState { +/** + * @author Brian Pontarelli + */ +public enum HTTPClientResponseParserState { ResponseProtocol { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == 'H' || ch == 'T' || ch == 'P' || ch == '/' || ch == '1' || ch == '.') { return ResponseProtocol; } else if (ch == ' ') { @@ -42,7 +45,7 @@ public boolean store() { ResponseProtocolSP { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == ' ') { return ResponseProtocolSP; } else if (HTTPTools.isDigitCharacter(ch)) { @@ -59,7 +62,7 @@ public boolean store() { }, ResponseStatusCode { - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == ' ') { return ResponseStatusCodeSP; } else if (HTTPTools.isDigitCharacter(ch)) { @@ -76,11 +79,15 @@ public boolean store() { ResponseStatusCodeSP { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == ' ') { return ResponseStatusCodeSP; } else if (HTTPTools.isValueCharacter(ch)) { return ResponseStatusMessage; + } else if (ch == '\n') { + return ResponseStatusMessageLF; + } else if (ch == '\r') { + return ResponseStatusMessageCR; } else { throw new ParseException(); } @@ -94,7 +101,7 @@ public boolean store() { ResponseStatusMessage { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == '\r') { return ResponseStatusMessageCR; } else if (HTTPTools.isValueCharacter(ch)) { @@ -112,7 +119,7 @@ public boolean store() { ResponseStatusMessageCR { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == '\n') { return ResponseStatusMessageLF; } else { @@ -128,7 +135,7 @@ public boolean store() { ResponseStatusMessageLF { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == '\r') { return ResponseMessageCR; } else if (HTTPTools.isTokenCharacter(ch)) { @@ -146,7 +153,7 @@ public boolean store() { ResponseMessageCR { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == '\n') { return ResponseComplete; } else { @@ -162,7 +169,7 @@ public boolean store() { ResponseComplete { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { return ResponseComplete; } @@ -174,7 +181,7 @@ public boolean store() { HeaderName { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (HTTPTools.isTokenCharacter(ch)) { return HeaderName; } else if (ch == ':') { @@ -192,7 +199,7 @@ public boolean store() { HeaderColon { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == ' ') { return HeaderColon; // Re-using this state because HeaderSP would be the same } else if (HTTPTools.isTokenCharacter(ch)) { @@ -210,7 +217,7 @@ public boolean store() { HeaderValue { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == '\r') { return HeaderCR; } else if (HTTPTools.isValueCharacter(ch)) { @@ -228,7 +235,7 @@ public boolean store() { HeaderCR { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == '\n') { return HeaderLF; } else { @@ -244,7 +251,7 @@ public boolean store() { HeaderLF { @Override - public ResponseParserState next(byte ch, Map> headers) { + public HTTPClientResponseParserState next(byte ch, Map> headers) { if (ch == '\r') { return ResponseMessageCR; } else if (HTTPTools.isTokenCharacter(ch)) { @@ -260,7 +267,7 @@ public boolean store() { } }; - public abstract ResponseParserState next(byte ch, Map> headers); + public abstract HTTPClientResponseParserState next(byte ch, Map> headers); public abstract boolean store(); } diff --git a/src/main/java/io/fusionauth/http/client/HTTPClientThread.java b/src/main/java/io/fusionauth/http/client/HTTPClientThread.java new file mode 100644 index 0000000..21f1fbc --- /dev/null +++ b/src/main/java/io/fusionauth/http/client/HTTPClientThread.java @@ -0,0 +1,371 @@ +/* + * Copyright (c) 2022, FusionAuth, All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the License. + */ +package io.fusionauth.http.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import io.fusionauth.http.HTTPValues.ControlBytes; +import io.fusionauth.http.HTTPValues.Protocols; +import io.fusionauth.http.io.ByteBufferOutputStream; + +@SuppressWarnings("resource") +public class HTTPClientThread extends Thread implements Closeable { +// final Queue clientKeys = new ConcurrentLinkedQueue<>(); + + public static AtomicInteger counter = new AtomicInteger(0); + // TODO : clean up stale connections to servers +// private final ClientReaperThread clientReaper; + + private final HTTPClientChannelPool pool = new HTTPClientChannelPool(); + + private final Selector selector; + + public HTTPClientThread() throws IOException { + selector = Selector.open(); + System.out.println("Client started"); + } + // private final List workerThreads = new ArrayList<>(); + + public CompletableFuture add(URI url, String method, HTTPClientConfiguration configuration) throws IOException { + // Just do enough to set up the data to attach and open the socket. + HTTP1Processor data = new HTTP1Processor(); +// data.logger = configuration.getLoggerFactory(); + data.future = new CompletableFuture<>(); + data.host = url.getHost(); + data.method = method; + data.protocol = url.getScheme(); + data.port = url.getPort() == -1 + ? (data.protocol.equals("http") ? 80 : 443) + : url.getPort(); + data.url = url; + + for (String key : configuration.headers.keySet()) { + data.headers.put(key, new ArrayList<>(configuration.headers.get(key))); + } + + // No channel, open a new socket w/out blocking, connect and then register for the OP_CONNECT state. + SocketChannel channel = pool.checkout(url.getHost()); + if (channel == null) { + System.out.println("[" + counter.incrementAndGet() + "] open socket."); + channel = SocketChannel.open(); + channel.configureBlocking(false); + channel.connect(new InetSocketAddress(data.host, data.port)); + channel.register(selector, SelectionKey.OP_CONNECT, data); + } else { + System.out.println("[" + counter.incrementAndGet() + "] re-use socket."); + // Use the existing socket, and attach our data and register for OP_WRITE +// channel.register(selector, SelectionKey.OP_WRITE, data); +// SelectionKey key =channel.register(selector, SelectionKey.OP_WRITE, data); + // TODO : Is this safe? + SelectionKey key = channel.keyFor(selector); + key.attach(data); + key.interestOps(SelectionKey.OP_WRITE); + } + + // Time to get up! + selector.wakeup(); + return data.future; + } + + @Override + public void close() { +// clientReaper.shutdown(); + + try { + selector.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + public void run() { + while (true) { + try { + // If the selector has been closed, shut the thread down + if (!selector.isOpen()) { + return; + } + + // TODO : Note This wil block, we should add a timeout for cleanup. + // TODO : Configuration + int numberOfKeys = selector.select(1_000L); + if (numberOfKeys <= 0) { + continue; + } + + var keys = selector.selectedKeys(); + Iterator iterator = keys.iterator(); + while (iterator.hasNext()) { + var key = iterator.next(); + if (key.isConnectable()) { +// System.out.println("Connecting"); + connect(key); + } else if (key.isReadable()) { + if (key.attachment() == null) { + System.out.println("Should we be here if we don't have an attachment? Seems like we should be in a write state?\n\n"); + } + try { + read(key); + } catch (SocketException e) { + // Try to recover from a connection reset + if (e.getMessage().equals("Connection reset")) { + System.out.println("[" + counter + "] Handle connection reset."); + HTTP1Processor data = (HTTP1Processor) key.attachment(); + data.reset(); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + channel.connect(new InetSocketAddress(data.host, data.port)); + channel.register(selector, SelectionKey.OP_CONNECT, data); + } else { + throw e; + } + + } + + } else if (key.isWritable()) { + if (key.attachment() == null) { + System.out.println("Should we be here if we don't have an attachment? Seems like we should be in a different state?\n\n"); + } + write(key); + } + + iterator.remove(); + } + } catch (IOException e) { + System.out.println("Whoa!!!\n\n"); + e.printStackTrace(); + } finally { +// cleanup(); + } + } + } + + private void cancelAndCloseKey(SelectionKey key) { + if (key != null) { + HTTP1Processor data = (HTTP1Processor) key.attachment(); + + try (var client = key.channel()) { + key.cancel(); + } catch (Throwable t) { + // TODO : Logger + } + + // TODO : Tracer + } + } + + private void cleanup() { + long now = System.currentTimeMillis(); + // TODO : Logger + selector.keys() + .stream() + .filter(key -> key.attachment() != null) + // TODO : Configuration + .filter(key -> ((HTTP1Processor) key.attachment()).lastUsed() < now - 20_000) + // TODO : Logger + .forEach(this::cancelAndCloseKey); + } + + private void connect(SelectionKey key) throws IOException { + SocketChannel channel = (SocketChannel) key.channel(); + if (channel.finishConnect()) { + key.interestOps(SelectionKey.OP_WRITE); + } + } + + private void parsePreamble(HTTP1Processor data) { + StringBuilder builder = new StringBuilder(512); + + String url = "/"; + if (data.url.getPath() != null) { + url = data.url.getPath(); + } + + if (data.url.getQuery() != null) { + url += data.url.getQuery(); + } + + + if ("".equals(url)) { + url = "/"; + } + + ByteBufferOutputStream bbos = new ByteBufferOutputStream(1024, 8 * 1024); + + // TODO : Note : Things like Host, etc we could just add headers and then use the common path of code. + + // Method Path Protocol + bbos.write(data.method.getBytes(StandardCharsets.UTF_8)); + bbos.write(" ".getBytes(StandardCharsets.UTF_8)); + bbos.write(url.getBytes(StandardCharsets.UTF_8)); + bbos.write(" ".getBytes(StandardCharsets.UTF_8)); + bbos.write(Protocols.HTTTP1_1.getBytes(StandardCharsets.UTF_8)); + bbos.write(ControlBytes.CRLF); + + // Host + bbos.write("Host: ".getBytes(StandardCharsets.UTF_8)); + bbos.write(data.url.getHost().getBytes(StandardCharsets.UTF_8)); + bbos.write(ControlBytes.CRLF); + + // User-Agent + bbos.write("User-Agent: java-http-client/1.0.0".getBytes(StandardCharsets.UTF_8)); + bbos.write(ControlBytes.CRLF); + +// bbos.write("Accept: */*".getBytes(StandardCharsets.UTF_8)); +// bbos.write(ControlBytes.CRLF); + +// builder.append(data.method) +// .append(" ") +// .append(url) +// .append(" ") +// .append(Protocols.HTTTP1_1) +// .append((byte[]) ControlBytes.CRLF) +// .append("Host:") +// .append(data.url.getHost()) +// .append((byte[]) ControlBytes.CRLF) +// .append("User-Agent: java-http-client/1.0.0") +// .append((byte[]) ControlBytes.CRLF) +// .append("Accept: */*") +// .append((byte[]) ControlBytes.CRLF); + + + // Headers + if (!data.headers.isEmpty()) { + + + for (String key : data.headers.keySet()) { + StringBuilder valueString = new StringBuilder(); + bbos.write((key + ": ").getBytes(StandardCharsets.UTF_8)); + for (String value : data.headers.get(key)) { + // TODO : Daniel : Need escaping? + valueString.append(value); + valueString.append("; "); + } + + int index = valueString.indexOf("; "); + if (index == valueString.length() - 2) { + bbos.write(valueString.substring(0, valueString.length() - 2).getBytes(StandardCharsets.UTF_8)); + } else { + bbos.write(valueString.toString().getBytes(StandardCharsets.UTF_8)); + } + + bbos.write(ControlBytes.CRLF); + } + } + + // Close the preamble + bbos.write(ControlBytes.CRLF); + +// data.request = ByteBuffer.wrap(builder.toString().getBytes(StandardCharsets.UTF_8)); + ByteBuffer buff = bbos.toByteBuffer(); +// System.out.println("\n---Preamble Start"); +// System.out.println(new String(buff.array(), 0, buff.limit(), StandardCharsets.UTF_8)); +// System.out.println("---Preamble End\n\n"); + data.request = buff; + } + + @SuppressWarnings("resource") + private void read(SelectionKey key) throws IOException { + SocketChannel client = (SocketChannel) key.channel(); + HTTP1Processor data = (HTTP1Processor) key.attachment(); + // TODO : Daniel : is this safe? Why do we have a read op if we are complete and we + // set the data attachment to null. + long read = data != null ? client.read(data.currentBuffer()) : 0; + if (read <= 0) { + return; + } + + if (data.isResponseComplete()) { + HTTPClientResponse httpResponse = new HTTPClientResponse(); + httpResponse.setStatus(data.code); + int expected = 0; + for (ByteBuffer buffer : data.buffers) { + expected += buffer.position(); + } + + ByteBuffer total = ByteBuffer.allocate(expected); + for (ByteBuffer buffer : data.buffers) { + buffer.flip(); + byte[] actual = buffer.array(); + byte[] used = Arrays.copyOfRange(actual, 0, buffer.limit()); + total.put(used); + } + byte[] bytes = total.array(); +// System.out.println(bytes.length); +// System.out.println(data.read); + httpResponse.setBody(bytes); + + + key.attach(null); + + // TODO : Hacking - If using keep-alive, check back in... if not, don't + // Once we parse the response we should know if this is keep-alive from a boolean. + String stringBody = new String(bytes); + // TODO : is closing the channel enough, or should I cancel the keys as well? + if (stringBody.contains("connection: close")) { +// System.out.println("close the connection"); + key.channel().close(); +// key.interestOps(0); +// key.cancel(); + } else { +// key.interestOps(0); +// key.cancel(); + key.interestOps(0); + pool.checkin(data.host, (SocketChannel) key.channel()); + +// System.out.println(stringBody); + } + + + data.future.complete(httpResponse); + +// key.attach(null); + + + } + + } + + @SuppressWarnings("resource") + private void write(SelectionKey key) throws IOException { + // TODO : Daniel : We can write the preamble here + // The data object could manage the state. n + SocketChannel client = (SocketChannel) key.channel(); + HTTP1Processor data = (HTTP1Processor) key.attachment(); + + // TODO : Daniel : we'll need to keep track of the state to know what we are writing. + parsePreamble(data); + client.write(data.request); + + if (data.request.position() == data.request.limit()) { + key.interestOps(SelectionKey.OP_READ); + } + } +} diff --git a/src/main/java/io/fusionauth/http/client/NIOClientThread.java b/src/main/java/io/fusionauth/http/client/NIOClientThread.java deleted file mode 100644 index 8fe86e9..0000000 --- a/src/main/java/io/fusionauth/http/client/NIOClientThread.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2022, FusionAuth, All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the License. - */ -package io.fusionauth.http.client; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; - -@SuppressWarnings("resource") -public class NIOClientThread extends Thread implements Closeable { -// final Queue clientKeys = new ConcurrentLinkedQueue<>(); - -// private final List workerThreads = new ArrayList<>(); - - // TODO : clean up stale connections to servers -// private final ClientReaperThread clientReaper; - - private final ChannelPool pool = new ChannelPool(); - - private final Selector selector; - - public NIOClientThread() throws IOException { - selector = Selector.open(); - System.out.println("Client started"); - } - - public Future add(URI uri, String method) throws IOException { - HTTPData data = new HTTPData(); - data.request = ByteBuffer.wrap( - """ - GET /api/system/version HTTP/1.1\r - \r - """.getBytes()); - data.future = new CompletableFuture<>(); - data.host = uri.getHost(); - - SocketChannel channel = pool.checkout(uri.getHost()); - if (channel == null) { - channel = SocketChannel.open(); - channel.configureBlocking(false); - channel.connect(new InetSocketAddress(uri.getHost(), uri.getPort())); - - SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); - key.attach(data); - } else { - SelectionKey key = channel.keyFor(selector); - key.attach(data); - key.interestOps(SelectionKey.OP_WRITE); - } - - // Wakeup! Time to put on a little makeup! - selector.wakeup(); - return data.future; - } - - @Override - public void close() { -// clientReaper.shutdown(); - - try { - selector.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - public void run() { - while (true) { - try { - // If the selector has been closed, shut the thread down - if (!selector.isOpen()) { - return; - } - - int numberOfKeys = selector.select(); - if (numberOfKeys <= 0) { - continue; - } - - var keys = selector.selectedKeys(); - Iterator iterator = keys.iterator(); - while (iterator.hasNext()) { - var key = iterator.next(); - if (key.isConnectable()) { - System.out.println("Connecting"); - connect(key); - } else if (key.isReadable()) { - read(key); - } else if (key.isWritable()) { - write(key); - } - - iterator.remove(); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - private void connect(SelectionKey key) throws IOException { - SocketChannel channel = (SocketChannel) key.channel(); - if (channel.finishConnect()) { - key.interestOps(SelectionKey.OP_WRITE); - } - } - - @SuppressWarnings("resource") - private void read(SelectionKey key) throws IOException { - SocketChannel client = (SocketChannel) key.channel(); - HTTPData data = (HTTPData) key.attachment(); - long read = client.read(data.currentBuffer()); - if (read <= 0) { - return; - } - - if (data.isResponseComplete()) { - data.future.complete(data.code); - key.attach(null); - pool.checkin(data.host, (SocketChannel) key.channel()); - } - } - - @SuppressWarnings("resource") - private void write(SelectionKey key) throws IOException { - SocketChannel client = (SocketChannel) key.channel(); - HTTPData data = (HTTPData) key.attachment(); - client.write(data.request); - - if (data.request.position() == data.request.limit()) { - key.interestOps(SelectionKey.OP_READ); - } - } -} diff --git a/src/main/java/io/fusionauth/http/client/SimpleNIOClient.java b/src/main/java/io/fusionauth/http/client/SimpleNIOClient.java deleted file mode 100644 index fbce0dc..0000000 --- a/src/main/java/io/fusionauth/http/client/SimpleNIOClient.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2022, FusionAuth, All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the License. - */ -package io.fusionauth.http.client; - -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.Future; - -public class SimpleNIOClient { - private static final NIOClientThread thread; - - public String method; - - public String url; - - public SimpleNIOClient get() { - this.method = "GET"; - return this; - } - - public int go() { - try { - Future future = thread.add(URI.create(url), method); - return future.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public SimpleNIOClient url(String url) { - this.url = url; - return this; - } - - static { - try { - thread = new NIOClientThread(); - thread.start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} diff --git a/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java b/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java index 64337e5..457550b 100644 --- a/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java +++ b/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java @@ -142,7 +142,6 @@ public HTTPServerConfiguration withClientTimeout(Duration duration) { throw new IllegalArgumentException("You cannot set the client timeout less than 0"); } - this.clientTimeoutDuration = duration; return this; } diff --git a/src/test/java/io/fusionauth/http/BaseTest.java b/src/test/java/io/fusionauth/http/BaseTest.java index 08c7db0..70de1ff 100644 --- a/src/test/java/io/fusionauth/http/BaseTest.java +++ b/src/test/java/io/fusionauth/http/BaseTest.java @@ -39,7 +39,7 @@ * * @author Brian Pontarelli */ -public class BaseTest { +public abstract class BaseTest { public static String certificate; public static String privateKey; @@ -51,12 +51,14 @@ public static void loadFiles() throws IOException { privateKey = Files.readString(Paths.get(homeDir + "/dev/certificates/fusionauth.key")); } - public HTTPServer makeServer(String scheme, HTTPHandler handler) { - return makeServer(scheme, handler, null); - } - - public HTTPServer makeServer(String scheme, HTTPHandler handler, Instrumenter instrumenter) { - return makeServer(scheme, handler, instrumenter, null); + @DataProvider + public Object[][] connection() { + // TODO: One at a time works, but doing all of them fails with a connection reset issue. + return new Object[][]{ + {""}, + {"close"}, + {"keep-alive"} + }; } @SuppressWarnings("resource") @@ -77,6 +79,14 @@ public HTTPServer makeServer(String scheme, HTTPHandler handler, Instrumenter in .withListener(listenerConfiguration); } + public HTTPServer makeServer(String scheme, HTTPHandler handler) { + return makeServer(scheme, handler, null); + } + + public HTTPServer makeServer(String scheme, HTTPHandler handler, Instrumenter instrumenter) { + return makeServer(scheme, handler, instrumenter, null); + } + public URI makeURI(String scheme, String params) { if (scheme.equals("https")) { return URI.create("https://local.fusionauth.io:4242/api/system/version" + params); @@ -85,6 +95,17 @@ public URI makeURI(String scheme, String params) { return URI.create("http://localhost:4242/api/system/version" + params); } + /** + * @return The possible schemes - {@code http} and {@code https}. + */ + @DataProvider + public Object[][] schemes() { + return new Object[][]{ + {"http"}, + {"https"} + }; + } + public void sendBadRequest(String message) { try (Socket socket = new Socket("127.0.0.1", 4242); OutputStream os = socket.getOutputStream(); InputStream is = socket.getInputStream()) { os.write(message.getBytes()); @@ -98,15 +119,4 @@ public void sendBadRequest(String message) { fail(e.getMessage()); } } - - /** - * @return The possible schemes - {@code http} and {@code https}. - */ - @DataProvider - public Object[][] schemes() { - return new Object[][]{ - {"http"}, - {"https"} - }; - } } diff --git a/src/test/java/io/fusionauth/http/client/CoreTest.java b/src/test/java/io/fusionauth/http/client/CoreTest.java new file mode 100644 index 0000000..152bb98 --- /dev/null +++ b/src/test/java/io/fusionauth/http/client/CoreTest.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022, FusionAuth, All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the License. + */ +package io.fusionauth.http.client; + +import java.net.URI; + +import io.fusionauth.http.BaseTest; +import io.fusionauth.http.server.HTTPHandler; +import io.fusionauth.http.server.HTTPServer; +import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; + +/** + * @author Daniel DeGroff + */ +public class CoreTest extends BaseTest { + + // TODO : invocationCount = 1 works, but greater than 1 fails on the second iteration with a "Connection reset" SocketException + @Test(dataProvider = "connection", invocationCount = 1) + public void get_connection(String connection) { + HTTPClientThread.counter.set(0); + HTTPHandler handler = (req, res) -> res.setStatus(200); + + try (HTTPServer ignore = makeServer("http", handler).start()) { + // Reset the connection pool since we know the server is going to tear down all sockets. + // TODO : Note, should we handle this by just rebuilding the connection? I would guess we have + // to tolerate the server terminating our connection. + + // TODO : Note, if I bump this to 100_000, I'll eventually get + // socket errors, so I'm doing something wrong. + // Using roughly a ration of 1:10 so keep the timings roughly the same. You probably + // don't want to wait for 50,000 requests to complete w/out keep-alive. + long iterations = !connection.equals("close") +// ? 50_000 +// : 5_000; + ? 500 + : 50; + long start = System.currentTimeMillis(); + + HTTPClient client = new HTTPClient() + .url(URI.create("http://localhost:4242")) + .optionalHeader(!connection.equals(""), "Connection", connection) + .get(); + + for (int i = 0; i <= iterations; i++) { + // TODO : Note if I build the client outside of the loop, + // and only call send() in the loop the connection response + // header is not consistent. Seems like a bug. + HTTPClientResponse response; + try { + response = client + .send(); + } catch (Exception e) { + System.out.println(i + 1); + throw e; + } + + assertEquals(response.getStatus(), 200); + assertEquals(new String(response.getBody()), """ + HTTP/1.1 200 \r + content-length: 0\r + connection: {connection}\r + \r + """ + .replace("{connection}", connection.equals("") ? "keep-alive" : connection) + , "iteration [" + (i + 1) + "]"); + + if (i > 0 && i % 5_000 == 0) { + System.out.println(i); + } + } + + long end = System.currentTimeMillis(); + double average = (end - start) / (double) iterations; + System.out.println("\nAverage linear request time is [" + average + "]ms"); + System.out.println("Duration: " + (end - start) + " ms\n"); + } + } +}