Skip to content

Commit

Permalink
separate HTTP pool for cancellation requests
Browse files Browse the repository at this point in the history
* reduces contention between queries and cancellation requests
* more aggressive timeouts for cancellation requests
  • Loading branch information
xvrl committed Mar 17, 2016
1 parent 60fc5d9 commit 1718a72
Showing 1 changed file with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query";
private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper";

private static final int CANCELLATION_TIMEOUT_MILLIS = 500;
private static final int MAX_QUEUED_CANCELLATIONS = 64;

private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception)
throws IOException
{
Expand All @@ -93,6 +96,8 @@ private static void handleException(HttpServletResponse response, ObjectMapper o
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;

private HttpClient broadcastClient;

public AsyncQueryForwardingServlet(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
Expand All @@ -112,6 +117,23 @@ public AsyncQueryForwardingServlet(
this.requestLogger = requestLogger;
}

@Override
public void init() throws ServletException
{
// separate client with more aggressive connection timeouts
// to prevent cancellations requests from blocking queries
broadcastClient = httpClientProvider.get();
broadcastClient.setConnectTimeout(CANCELLATION_TIMEOUT_MILLIS);
broadcastClient.setMaxRequestsQueuedPerDestination(MAX_QUEUED_CANCELLATIONS);

try {
broadcastClient.start();
} catch(Exception e) {
throw new ServletException(e);
}
super.init();
}

@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
Expand All @@ -132,9 +154,10 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
// to keep the code simple, the proxy servlet will also send a request to one of the default brokers
if (!host.equals(defaultHost)) {
// issue async requests
getHttpClient()
broadcastClient
.newRequest(rewriteURI(request, host))
.method(HttpMethod.DELETE)
.timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
.send(
new Response.CompleteListener()
{
Expand Down

0 comments on commit 1718a72

Please sign in to comment.