From 563b658c9d2ca01f7d5f2048704e387db4adfdff Mon Sep 17 00:00:00 2001 From: kramer Date: Wed, 1 Jul 2015 23:31:24 +0200 Subject: [PATCH] set all config objects on the async client in client factory- fixes #219 --- jest/README.md | 4 +- .../searchbox/client/JestClientFactory.java | 76 +++++++++++++++++-- .../client/config/HttpClientConfig.java | 76 ++++++++++++++++++- .../idle/HttpReapableConnectionManager.java | 8 +- .../client/JestClientFactoryTest.java | 15 +++- 5 files changed, 165 insertions(+), 14 deletions(-) diff --git a/jest/README.md b/jest/README.md index fff018e8c..27280ef6c 100644 --- a/jest/README.md +++ b/jest/README.md @@ -452,10 +452,12 @@ SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new Trus HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE; SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext, hostnameVerifier); +SchemeIOSessionStrategy httpsIOSessionStrategy = SSLIOSessionStrategy(sslContext, hostnameVerifier); JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder("https://localhost:9200") - .sslSocketFactory(sslSocketFactory) + .sslSocketFactory(sslSocketFactory) // this only affects sync calls + .httpsIOSessionStrategy(httpsIOSessionStrategy) // this only affects async calls .build() ); ``` diff --git a/jest/src/main/java/io/searchbox/client/JestClientFactory.java b/jest/src/main/java/io/searchbox/client/JestClientFactory.java index 23a792b37..f4d16dab2 100755 --- a/jest/src/main/java/io/searchbox/client/JestClientFactory.java +++ b/jest/src/main/java/io/searchbox/client/JestClientFactory.java @@ -18,7 +18,15 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; +import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.nio.conn.NHttpClientConnectionManager; +import org.apache.http.nio.conn.SchemeIOSessionStrategy; +import org.apache.http.nio.reactor.IOReactorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +51,9 @@ public JestClient getObject() { client.setRequestCompressionEnabled(httpClientConfig.isRequestCompressionEnabled()); client.setServers(httpClientConfig.getServerList()); final HttpClientConnectionManager connectionManager = getConnectionManager(); + final NHttpClientConnectionManager asyncConnectionManager = getAsyncConnectionManager(); client.setHttpClient(createHttpClient(connectionManager)); + client.setAsyncClient(createAsyncHttpClient(asyncConnectionManager)); // set custom gson instance Gson gson = httpClientConfig.getGson(); @@ -69,7 +79,7 @@ public JestClient getObject() { if (httpClientConfig.getMaxConnectionIdleTime() > 0) { log.info("Idle connection reaping enabled..."); - IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager)); + IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager, asyncConnectionManager)); client.setIdleConnectionReaper(reaper); reaper.startAsync(); reaper.awaitRunning(); @@ -77,7 +87,6 @@ public JestClient getObject() { log.info("Idle connection reaping disabled..."); } - client.setAsyncClient(HttpAsyncClients.custom().setRoutePlanner(getRoutePlanner()).build()); return client; } @@ -96,10 +105,22 @@ private CloseableHttpClient createHttpClient(HttpClientConnectionManager connect ).build(); } + private CloseableHttpAsyncClient createAsyncHttpClient(NHttpClientConnectionManager connectionManager) { + return configureHttpClient( + HttpAsyncClients.custom() + .setConnectionManager(connectionManager) + .setDefaultRequestConfig(getRequestConfig()) + .setProxyAuthenticationStrategy(httpClientConfig.getProxyAuthenticationStrategy()) + .setRoutePlanner(getRoutePlanner()) + .setDefaultCredentialsProvider(httpClientConfig.getCredentialsProvider()) + ).build(); + } + /** * Extension point - *

+ *

* Example: + *

*
      * final JestClientFactory factory = new JestClientFactory() {
      *    {@literal @Override}
@@ -108,14 +129,18 @@ private CloseableHttpClient createHttpClient(HttpClientConnectionManager connect
      *    }
      * }
      * 
- * - * @param builder - * @return */ protected HttpClientBuilder configureHttpClient(final HttpClientBuilder builder) { return builder; } + /** + * Extension point for async client + */ + protected HttpAsyncClientBuilder configureHttpClient(final HttpAsyncClientBuilder builder) { + return builder; + } + // Extension point protected HttpRoutePlanner getRoutePlanner() { return httpClientConfig.getHttpRoutePlanner(); @@ -129,6 +154,45 @@ protected RequestConfig getRequestConfig() { .build(); } + // Extension point + protected NHttpClientConnectionManager getAsyncConnectionManager() { + PoolingNHttpClientConnectionManager retval; + + IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setConnectTimeout(httpClientConfig.getConnTimeout()) + .setSoTimeout(httpClientConfig.getReadTimeout()) + .build(); + + Registry sessionStrategyRegistry = RegistryBuilder.create() + .register("http", httpClientConfig.getHttpIOSessionStrategy()) + .register("https", httpClientConfig.getHttpsIOSessionStrategy()) + .build(); + + try { + retval = new PoolingNHttpClientConnectionManager( + new DefaultConnectingIOReactor(ioReactorConfig), + sessionStrategyRegistry + ); + } catch (IOReactorException e) { + throw new IllegalStateException(e); + } + + final Integer maxTotal = httpClientConfig.getMaxTotalConnection(); + if (maxTotal != null) { + retval.setMaxTotal(maxTotal); + } + final Integer defaultMaxPerRoute = httpClientConfig.getDefaultMaxTotalConnectionPerRoute(); + if (defaultMaxPerRoute != null) { + retval.setDefaultMaxPerRoute(defaultMaxPerRoute); + } + final Map maxPerRoute = httpClientConfig.getMaxTotalConnectionPerRoute(); + for (Map.Entry entry : maxPerRoute.entrySet()) { + retval.setMaxPerRoute(entry.getKey(), entry.getValue()); + } + + return retval; + } + // Extension point protected HttpClientConnectionManager getConnectionManager() { HttpClientConnectionManager retval; diff --git a/jest/src/main/java/io/searchbox/client/config/HttpClientConfig.java b/jest/src/main/java/io/searchbox/client/config/HttpClientConfig.java index 7caee170a..68602e6a4 100644 --- a/jest/src/main/java/io/searchbox/client/config/HttpClientConfig.java +++ b/jest/src/main/java/io/searchbox/client/config/HttpClientConfig.java @@ -14,6 +14,9 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.conn.DefaultProxyRoutePlanner; import org.apache.http.impl.conn.SystemDefaultRoutePlanner; +import org.apache.http.nio.conn.NoopIOSessionStrategy; +import org.apache.http.nio.conn.SchemeIOSessionStrategy; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import java.net.ProxySelector; import java.util.Collection; @@ -34,6 +37,8 @@ public class HttpClientConfig extends ClientConfig { private final ConnectionSocketFactory plainSocketFactory; private final HttpRoutePlanner httpRoutePlanner; private final AuthenticationStrategy proxyAuthenticationStrategy; + private final SchemeIOSessionStrategy httpIOSessionStrategy; + private final SchemeIOSessionStrategy httpsIOSessionStrategy; public HttpClientConfig(Builder builder) { super(builder); @@ -45,6 +50,8 @@ public HttpClientConfig(Builder builder) { this.plainSocketFactory = builder.plainSocketFactory; this.httpRoutePlanner = builder.httpRoutePlanner; this.proxyAuthenticationStrategy = builder.proxyAuthenticationStrategy; + this.httpIOSessionStrategy = builder.httpIOSessionStrategy; + this.httpsIOSessionStrategy = builder.httpsIOSessionStrategy; } public Map getMaxTotalConnectionPerRoute() { @@ -79,6 +86,14 @@ public AuthenticationStrategy getProxyAuthenticationStrategy() { return proxyAuthenticationStrategy; } + public SchemeIOSessionStrategy getHttpIOSessionStrategy() { + return httpIOSessionStrategy; + } + + public SchemeIOSessionStrategy getHttpsIOSessionStrategy() { + return httpsIOSessionStrategy; + } + public static class Builder extends ClientConfig.AbstractBuilder { private Integer maxTotalConnection; @@ -89,6 +104,8 @@ public static class Builder extends ClientConfig.AbstractBuilderhttps scheme. + * Sets the socket factory that will be used by sync client for HTTP scheme. + *

+ * SSLConnectionSocketFactory.getSocketFactory() is used by default. + *

+ * A bad example of trust-all socket factory creation can be done as below: + *

+ *
+         * // trust ALL certificates
+         * SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
+         *     public boolean isTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+         *         return true;
+         *     }
+         * }).build();
+         *
+         * // skip hostname checks
+         * HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE;
+         *
+         * SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext, hostnameVerifier);
+         * 
+ * + * @param socketFactory socket factory instance that will be registered for https scheme. + * @see SSLConnectionSocketFactory */ public Builder sslSocketFactory(LayeredConnectionSocketFactory socketFactory) { this.sslSocketFactory = socketFactory; @@ -152,13 +190,47 @@ public Builder sslSocketFactory(LayeredConnectionSocketFactory socketFactory) { } /** - * @param socketFactory The socket factory instance that will be registered for http scheme. + * Sets the socket factory that will be used by sync client for HTTPS scheme. + *

+ * PlainConnectionSocketFactory.getSocketFactory() is used by default. + *

+ * + * @param socketFactory socket factory instance that will be registered for http scheme. + * @see PlainConnectionSocketFactory */ public Builder plainSocketFactory(ConnectionSocketFactory socketFactory) { this.plainSocketFactory = socketFactory; return this; } + /** + * Sets the socket factory that will be used by async client for HTTP scheme. + *

+ * NoopIOSessionStrategy.INSTANCE is used by default. + *

+ * + * @param httpIOSessionStrategy SchemeIOSessionStrategy instance that will be registered for http scheme. + * @see NoopIOSessionStrategy + */ + public Builder httpIOSessionStrategy(SchemeIOSessionStrategy httpIOSessionStrategy) { + this.httpIOSessionStrategy = httpIOSessionStrategy; + return this; + } + + /** + * Sets the socket factory that will be used by async client for HTTPS scheme. + *

+ * SSLIOSessionStrategy.getSystemDefaultStrategy() is used by default. + *

+ * + * @param httpsIOSessionStrategy SchemeIOSessionStrategy instance that will be registered for https scheme. + * @see SSLIOSessionStrategy + */ + public Builder httpsIOSessionStrategy(SchemeIOSessionStrategy httpsIOSessionStrategy) { + this.httpsIOSessionStrategy = httpsIOSessionStrategy; + return this; + } + public Builder proxy(HttpHost proxy) { return proxy(proxy, null); } diff --git a/jest/src/main/java/io/searchbox/client/config/idle/HttpReapableConnectionManager.java b/jest/src/main/java/io/searchbox/client/config/idle/HttpReapableConnectionManager.java index db7ca0045..847119b3f 100644 --- a/jest/src/main/java/io/searchbox/client/config/idle/HttpReapableConnectionManager.java +++ b/jest/src/main/java/io/searchbox/client/config/idle/HttpReapableConnectionManager.java @@ -1,18 +1,24 @@ package io.searchbox.client.config.idle; import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.nio.conn.NHttpClientConnectionManager; import java.util.concurrent.TimeUnit; public class HttpReapableConnectionManager implements ReapableConnectionManager { private final HttpClientConnectionManager connectionManager; + private final NHttpClientConnectionManager nConnectionManager; + + public HttpReapableConnectionManager(HttpClientConnectionManager connectionManager, NHttpClientConnectionManager nConnectionManager) { + if(connectionManager == null || nConnectionManager == null) throw new IllegalArgumentException(); - public HttpReapableConnectionManager(HttpClientConnectionManager connectionManager) { this.connectionManager = connectionManager; + this.nConnectionManager = nConnectionManager; } @Override public void closeIdleConnections(long idleTimeout, TimeUnit unit) { connectionManager.closeIdleConnections(idleTimeout, unit); + nConnectionManager.closeIdleConnections(idleTimeout, unit); } } diff --git a/jest/src/test/java/io/searchbox/client/JestClientFactoryTest.java b/jest/src/test/java/io/searchbox/client/JestClientFactoryTest.java index bada1cc89..79ca2c1bc 100755 --- a/jest/src/test/java/io/searchbox/client/JestClientFactoryTest.java +++ b/jest/src/test/java/io/searchbox/client/JestClientFactoryTest.java @@ -8,6 +8,8 @@ import org.apache.http.conn.routing.HttpRoute; import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; +import org.apache.http.nio.conn.NHttpClientConnectionManager; import org.junit.Test; import static org.junit.Assert.*; @@ -70,7 +72,9 @@ public void multiThreadedClientCreation() { JestHttpClient jestClient = (JestHttpClient) factory.getObject(); assertTrue(jestClient != null); - assertNotNull(jestClient.getAsyncClient()); + assertEquals(jestClient.getServerPoolSize(), 1); + assertEquals("server list should contain localhost:9200", "http://localhost:9200", jestClient.getNextServer()); + final HttpClientConnectionManager connectionManager = factory.getConnectionManager(); assertTrue(connectionManager instanceof PoolingHttpClientConnectionManager); assertEquals(10, ((PoolingHttpClientConnectionManager) connectionManager).getDefaultMaxPerRoute()); @@ -78,8 +82,11 @@ public void multiThreadedClientCreation() { assertEquals(5, ((PoolingHttpClientConnectionManager) connectionManager).getMaxPerRoute(routeOne)); assertEquals(6, ((PoolingHttpClientConnectionManager) connectionManager).getMaxPerRoute(routeTwo)); - assertEquals(jestClient.getServerPoolSize(), 1); - assertEquals("server list should contain localhost:9200", - "http://localhost:9200", jestClient.getNextServer()); + final NHttpClientConnectionManager nConnectionManager = factory.getAsyncConnectionManager(); + assertTrue(nConnectionManager instanceof PoolingNHttpClientConnectionManager); + assertEquals(10, ((PoolingNHttpClientConnectionManager) nConnectionManager).getDefaultMaxPerRoute()); + assertEquals(20, ((PoolingNHttpClientConnectionManager) nConnectionManager).getMaxTotal()); + assertEquals(5, ((PoolingNHttpClientConnectionManager) nConnectionManager).getMaxPerRoute(routeOne)); + assertEquals(6, ((PoolingNHttpClientConnectionManager) nConnectionManager).getMaxPerRoute(routeTwo)); } }