Skip to content

Commit

Permalink
set all config objects on the async client in client factory- fixes s…
Browse files Browse the repository at this point in the history
  • Loading branch information
kramer committed Jul 1, 2015
1 parent 5470a41 commit 563b658
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 14 deletions.
4 changes: 3 additions & 1 deletion jest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,12 @@ SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new Trus
HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE;

SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext, hostnameVerifier);
SchemeIOSessionStrategy httpsIOSessionStrategy = SSLIOSessionStrategy(sslContext, hostnameVerifier);

JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig.Builder("https://localhost:9200")
.sslSocketFactory(sslSocketFactory)
.sslSocketFactory(sslSocketFactory) // this only affects sync calls
.httpsIOSessionStrategy(httpsIOSessionStrategy) // this only affects async calls
.build()
);
```
Expand Down
76 changes: 70 additions & 6 deletions jest/src/main/java/io/searchbox/client/JestClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.reactor.IOReactorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,7 +51,9 @@ public JestClient getObject() {
client.setRequestCompressionEnabled(httpClientConfig.isRequestCompressionEnabled());
client.setServers(httpClientConfig.getServerList());
final HttpClientConnectionManager connectionManager = getConnectionManager();
final NHttpClientConnectionManager asyncConnectionManager = getAsyncConnectionManager();
client.setHttpClient(createHttpClient(connectionManager));
client.setAsyncClient(createAsyncHttpClient(asyncConnectionManager));

// set custom gson instance
Gson gson = httpClientConfig.getGson();
Expand All @@ -69,15 +79,14 @@ public JestClient getObject() {
if (httpClientConfig.getMaxConnectionIdleTime() > 0) {
log.info("Idle connection reaping enabled...");

IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager));
IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager, asyncConnectionManager));
client.setIdleConnectionReaper(reaper);
reaper.startAsync();
reaper.awaitRunning();
} else {
log.info("Idle connection reaping disabled...");
}

client.setAsyncClient(HttpAsyncClients.custom().setRoutePlanner(getRoutePlanner()).build());
return client;
}

Expand All @@ -96,10 +105,22 @@ private CloseableHttpClient createHttpClient(HttpClientConnectionManager connect
).build();
}

private CloseableHttpAsyncClient createAsyncHttpClient(NHttpClientConnectionManager connectionManager) {
return configureHttpClient(
HttpAsyncClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(getRequestConfig())
.setProxyAuthenticationStrategy(httpClientConfig.getProxyAuthenticationStrategy())
.setRoutePlanner(getRoutePlanner())
.setDefaultCredentialsProvider(httpClientConfig.getCredentialsProvider())
).build();
}

/**
* Extension point
* <p/>
* <p>
* Example:
* </p>
* <pre>
* final JestClientFactory factory = new JestClientFactory() {
* {@literal @Override}
Expand All @@ -108,14 +129,18 @@ private CloseableHttpClient createHttpClient(HttpClientConnectionManager connect
* }
* }
* </pre>
*
* @param builder
* @return
*/
protected HttpClientBuilder configureHttpClient(final HttpClientBuilder builder) {
return builder;
}

/**
* Extension point for async client
*/
protected HttpAsyncClientBuilder configureHttpClient(final HttpAsyncClientBuilder builder) {
return builder;
}

// Extension point
protected HttpRoutePlanner getRoutePlanner() {
return httpClientConfig.getHttpRoutePlanner();
Expand All @@ -129,6 +154,45 @@ protected RequestConfig getRequestConfig() {
.build();
}

// Extension point
protected NHttpClientConnectionManager getAsyncConnectionManager() {
PoolingNHttpClientConnectionManager retval;

IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(httpClientConfig.getConnTimeout())
.setSoTimeout(httpClientConfig.getReadTimeout())
.build();

Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", httpClientConfig.getHttpIOSessionStrategy())
.register("https", httpClientConfig.getHttpsIOSessionStrategy())
.build();

try {
retval = new PoolingNHttpClientConnectionManager(
new DefaultConnectingIOReactor(ioReactorConfig),
sessionStrategyRegistry
);
} catch (IOReactorException e) {
throw new IllegalStateException(e);
}

final Integer maxTotal = httpClientConfig.getMaxTotalConnection();
if (maxTotal != null) {
retval.setMaxTotal(maxTotal);
}
final Integer defaultMaxPerRoute = httpClientConfig.getDefaultMaxTotalConnectionPerRoute();
if (defaultMaxPerRoute != null) {
retval.setDefaultMaxPerRoute(defaultMaxPerRoute);
}
final Map<HttpRoute, Integer> maxPerRoute = httpClientConfig.getMaxTotalConnectionPerRoute();
for (Map.Entry<HttpRoute, Integer> entry : maxPerRoute.entrySet()) {
retval.setMaxPerRoute(entry.getKey(), entry.getValue());
}

return retval;
}

// Extension point
protected HttpClientConnectionManager getConnectionManager() {
HttpClientConnectionManager retval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.conn.DefaultProxyRoutePlanner;
import org.apache.http.impl.conn.SystemDefaultRoutePlanner;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;

import java.net.ProxySelector;
import java.util.Collection;
Expand All @@ -34,6 +37,8 @@ public class HttpClientConfig extends ClientConfig {
private final ConnectionSocketFactory plainSocketFactory;
private final HttpRoutePlanner httpRoutePlanner;
private final AuthenticationStrategy proxyAuthenticationStrategy;
private final SchemeIOSessionStrategy httpIOSessionStrategy;
private final SchemeIOSessionStrategy httpsIOSessionStrategy;

public HttpClientConfig(Builder builder) {
super(builder);
Expand All @@ -45,6 +50,8 @@ public HttpClientConfig(Builder builder) {
this.plainSocketFactory = builder.plainSocketFactory;
this.httpRoutePlanner = builder.httpRoutePlanner;
this.proxyAuthenticationStrategy = builder.proxyAuthenticationStrategy;
this.httpIOSessionStrategy = builder.httpIOSessionStrategy;
this.httpsIOSessionStrategy = builder.httpsIOSessionStrategy;
}

public Map<HttpRoute, Integer> getMaxTotalConnectionPerRoute() {
Expand Down Expand Up @@ -79,6 +86,14 @@ public AuthenticationStrategy getProxyAuthenticationStrategy() {
return proxyAuthenticationStrategy;
}

public SchemeIOSessionStrategy getHttpIOSessionStrategy() {
return httpIOSessionStrategy;
}

public SchemeIOSessionStrategy getHttpsIOSessionStrategy() {
return httpsIOSessionStrategy;
}

public static class Builder extends ClientConfig.AbstractBuilder<HttpClientConfig, Builder> {

private Integer maxTotalConnection;
Expand All @@ -89,6 +104,8 @@ public static class Builder extends ClientConfig.AbstractBuilder<HttpClientConfi
private ConnectionSocketFactory plainSocketFactory = PlainConnectionSocketFactory.getSocketFactory();
private HttpRoutePlanner httpRoutePlanner = new SystemDefaultRoutePlanner(ProxySelector.getDefault());
private AuthenticationStrategy proxyAuthenticationStrategy;
private SchemeIOSessionStrategy httpIOSessionStrategy = NoopIOSessionStrategy.INSTANCE;
private SchemeIOSessionStrategy httpsIOSessionStrategy = SSLIOSessionStrategy.getSystemDefaultStrategy();

public Builder(HttpClientConfig httpClientConfig) {
super(httpClientConfig);
Expand Down Expand Up @@ -144,21 +161,76 @@ public Builder defaultCredentials(String username, String password) {
}

/**
* @param socketFactory The socket factory instance that will be registered for <code>https</code> scheme.
* Sets the socket factory that will be used by <b>sync</b> client for HTTP scheme.
* <p>
* <code>SSLConnectionSocketFactory.getSocketFactory()</code> is used by default.
* </p><p>
* A bad example of trust-all socket factory creation can be done as below:
* </p>
* <pre>
* // trust ALL certificates
* SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
* public boolean isTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
* return true;
* }
* }).build();
*
* // skip hostname checks
* HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE;
*
* SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext, hostnameVerifier);
* </pre>
*
* @param socketFactory socket factory instance that will be registered for <code>https</code> scheme.
* @see SSLConnectionSocketFactory
*/
public Builder sslSocketFactory(LayeredConnectionSocketFactory socketFactory) {
this.sslSocketFactory = socketFactory;
return this;
}

/**
* @param socketFactory The socket factory instance that will be registered for <code>http</code> scheme.
* Sets the socket factory that will be used by <b>sync</b> client for HTTPS scheme.
* <p>
* <code>PlainConnectionSocketFactory.getSocketFactory()</code> is used by default.
* </p>
*
* @param socketFactory socket factory instance that will be registered for <code>http</code> scheme.
* @see PlainConnectionSocketFactory
*/
public Builder plainSocketFactory(ConnectionSocketFactory socketFactory) {
this.plainSocketFactory = socketFactory;
return this;
}

/**
* Sets the socket factory that will be used by <b>async</b> client for HTTP scheme.
* <p>
* <code>NoopIOSessionStrategy.INSTANCE</code> is used by default.
* </p>
*
* @param httpIOSessionStrategy SchemeIOSessionStrategy instance that will be registered for <code>http</code> scheme.
* @see NoopIOSessionStrategy
*/
public Builder httpIOSessionStrategy(SchemeIOSessionStrategy httpIOSessionStrategy) {
this.httpIOSessionStrategy = httpIOSessionStrategy;
return this;
}

/**
* Sets the socket factory that will be used by <b>async</b> client for HTTPS scheme.
* <p>
* <code>SSLIOSessionStrategy.getSystemDefaultStrategy()</code> is used by default.
* </p>
*
* @param httpsIOSessionStrategy SchemeIOSessionStrategy instance that will be registered for <code>https</code> scheme.
* @see SSLIOSessionStrategy
*/
public Builder httpsIOSessionStrategy(SchemeIOSessionStrategy httpsIOSessionStrategy) {
this.httpsIOSessionStrategy = httpsIOSessionStrategy;
return this;
}

public Builder proxy(HttpHost proxy) {
return proxy(proxy, null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package io.searchbox.client.config.idle;

import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.nio.conn.NHttpClientConnectionManager;

import java.util.concurrent.TimeUnit;

public class HttpReapableConnectionManager implements ReapableConnectionManager {
private final HttpClientConnectionManager connectionManager;
private final NHttpClientConnectionManager nConnectionManager;

public HttpReapableConnectionManager(HttpClientConnectionManager connectionManager, NHttpClientConnectionManager nConnectionManager) {
if(connectionManager == null || nConnectionManager == null) throw new IllegalArgumentException();

public HttpReapableConnectionManager(HttpClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
this.nConnectionManager = nConnectionManager;
}

@Override
public void closeIdleConnections(long idleTimeout, TimeUnit unit) {
connectionManager.closeIdleConnections(idleTimeout, unit);
nConnectionManager.closeIdleConnections(idleTimeout, unit);
}
}
15 changes: 11 additions & 4 deletions jest/src/test/java/io/searchbox/client/JestClientFactoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.junit.Test;

import static org.junit.Assert.*;
Expand Down Expand Up @@ -70,16 +72,21 @@ public void multiThreadedClientCreation() {
JestHttpClient jestClient = (JestHttpClient) factory.getObject();

assertTrue(jestClient != null);
assertNotNull(jestClient.getAsyncClient());
assertEquals(jestClient.getServerPoolSize(), 1);
assertEquals("server list should contain localhost:9200", "http://localhost:9200", jestClient.getNextServer());

final HttpClientConnectionManager connectionManager = factory.getConnectionManager();
assertTrue(connectionManager instanceof PoolingHttpClientConnectionManager);
assertEquals(10, ((PoolingHttpClientConnectionManager) connectionManager).getDefaultMaxPerRoute());
assertEquals(20, ((PoolingHttpClientConnectionManager) connectionManager).getMaxTotal());
assertEquals(5, ((PoolingHttpClientConnectionManager) connectionManager).getMaxPerRoute(routeOne));
assertEquals(6, ((PoolingHttpClientConnectionManager) connectionManager).getMaxPerRoute(routeTwo));

assertEquals(jestClient.getServerPoolSize(), 1);
assertEquals("server list should contain localhost:9200",
"http://localhost:9200", jestClient.getNextServer());
final NHttpClientConnectionManager nConnectionManager = factory.getAsyncConnectionManager();
assertTrue(nConnectionManager instanceof PoolingNHttpClientConnectionManager);
assertEquals(10, ((PoolingNHttpClientConnectionManager) nConnectionManager).getDefaultMaxPerRoute());
assertEquals(20, ((PoolingNHttpClientConnectionManager) nConnectionManager).getMaxTotal());
assertEquals(5, ((PoolingNHttpClientConnectionManager) nConnectionManager).getMaxPerRoute(routeOne));
assertEquals(6, ((PoolingNHttpClientConnectionManager) nConnectionManager).getMaxPerRoute(routeTwo));
}
}

0 comments on commit 563b658

Please sign in to comment.