From e712bb7c69c18f2acd572650a3364a0869196337 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Tue, 21 Jun 2016 15:18:31 -0700 Subject: [PATCH] Removed delay parameter from SSE metrics servlets. This will allow streams to get shared and do less work --- .../stream/HystrixConfigSseServlet.java | 22 ++--------- .../HystrixConfigurationJsonStream.java | 15 +++---- .../stream/HystrixSampleSseServlet.java | 39 +++++++++---------- .../stream/HystrixUtilizationJsonStream.java | 15 +++---- .../stream/HystrixUtilizationSseServlet.java | 23 ++--------- .../stream/HystrixConfigSseServletTest.java | 23 +++-------- 6 files changed, 48 insertions(+), 89 deletions(-) diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java index 236a67b1f..df146e987 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java @@ -18,8 +18,8 @@ import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; import rx.Observable; -import rx.functions.Func1; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -49,25 +49,16 @@ public class HystrixConfigSseServlet extends HystrixSampleSseServlet> createStream) { - this.jsonStream = new HystrixConfigurationJsonStream(createStream); + super(HystrixConfigurationStream.getInstance().observe()); } - @Override - int getDefaultDelayInMilliseconds() { - return DEFAULT_ONNEXT_DELAY_IN_MS; + /* package-private */ HystrixConfigSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + super(sampleStream, pausePollerThreadDelayInMs); } @Override @@ -90,11 +81,6 @@ protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - @Override - protected Observable getStream(int delay) { - return jsonStream.observe(delay); - } - @Override protected String convertToString(HystrixConfiguration config) throws IOException { return HystrixConfigurationJsonStream.convertToString(config); diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java index 7d0e818ff..12f5f3d42 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java @@ -46,15 +46,17 @@ public class HystrixConfigurationJsonStream { private static final JsonFactory jsonFactory = new JsonFactory(); private final Func1> streamGenerator; + @Deprecated //since 1.5.4 public HystrixConfigurationJsonStream() { this.streamGenerator = new Func1>() { @Override public Observable call(Integer delay) { - return new HystrixConfigurationStream(delay).observe(); + return HystrixConfigurationStream.getInstance().observe(); } }; } + @Deprecated //since 1.5.4 public HystrixConfigurationJsonStream(Func1> streamGenerator) { this.streamGenerator = streamGenerator; } @@ -172,7 +174,8 @@ public static String convertToString(HystrixConfiguration config) throws IOExcep } /** - * @deprecated Not for public use. This prevents stream-sharing. Please use {@link HystrixConfigurationStream#observe()} + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixConfigurationStream#observe()} * @param delay interval between data emissions * @return sampled utilization as Java object, taken on a timer */ @@ -182,7 +185,9 @@ public Observable observe(int delay) { } /** - * @deprecated Not for public use. This prevents stream-sharing. Please use {@link #observeJson()} + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixConfigurationStream#observe()} + * and you can map to JSON string via {@link HystrixConfigurationJsonStream#convertToString(HystrixConfiguration)} * @param delay interval between data emissions * @return sampled utilization as JSON string, taken on a timer */ @@ -190,8 +195,4 @@ public Observable observe(int delay) { public Observable observeJson(int delay) { return streamGenerator.call(delay).map(convertToJson); } - - public Observable observeJson() { - return HystrixConfigurationStream.getInstance().observe().map(convertToJson); - } } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java index 1353244bb..2882c0881 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java @@ -33,15 +33,27 @@ /** */ public abstract class HystrixSampleSseServlet extends HttpServlet { + protected final Observable sampleStream; private static final Logger logger = LoggerFactory.getLogger(HystrixSampleSseServlet.class); + //wake up occasionally and check that poller is still alive. this value controls how often + private static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500; + + private final int pausePollerThreadDelayInMs; + /* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */ private static volatile boolean isDestroyed = false; - private static final String DELAY_REQ_PARAM_NAME = "delay"; + protected HystrixSampleSseServlet(Observable sampleStream) { + this.sampleStream = sampleStream; + this.pausePollerThreadDelayInMs = DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS; + } - abstract int getDefaultDelayInMilliseconds(); + protected HystrixSampleSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + this.sampleStream = sampleStream; + this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs; + } abstract int getMaxNumberConcurrentConnectionsAllowed(); @@ -51,7 +63,7 @@ public abstract class HystrixSampleSseServlet extends HttpServlet { protected abstract void decrementCurrentConcurrentConnections(); - protected abstract Observable getStream(int delay); + //protected abstract Observable getStream(); protected abstract String convertToString(SampleData sampleData) throws IOException; @@ -67,19 +79,6 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t } } - /* package-private */ - int getDelayFromHttpRequest(HttpServletRequest req) { - try { - String delay = req.getParameter(DELAY_REQ_PARAM_NAME); - if (delay != null) { - return Math.max(Integer.parseInt(delay), 1); - } - } catch (Throwable ex) { - //silently fail - } - return getDefaultDelayInMilliseconds(); - } - /** * WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing * a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at @@ -125,8 +124,6 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse if (numberConnections > maxNumberConnectionsAllowed) { response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed); } else { - int delay = getDelayFromHttpRequest(request); - /* initialize response */ response.setHeader("Content-Type", "text/event-stream;charset=UTF-8"); response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); @@ -134,11 +131,11 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse final PrintWriter writer = response.getWriter(); - Observable sampledStream = getStream(delay); + //Observable sampledStream = getStream(); //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext - sampleSubscription = sampledStream + sampleSubscription = sampleStream .observeOn(Schedulers.io()) .subscribe(new Subscriber() { @Override @@ -180,7 +177,7 @@ public void onNext(SampleData sampleData) { while (moreDataWillBeSent.get() && !isDestroyed) { try { - Thread.sleep(delay); + Thread.sleep(pausePollerThreadDelayInMs); } catch (InterruptedException e) { moreDataWillBeSent.set(false); } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java index de77d4824..c7c8dfafe 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java @@ -55,15 +55,17 @@ public String call(HystrixUtilization utilization) { } }; + @Deprecated //since 1.5.4 public HystrixUtilizationJsonStream() { this.streamGenerator = new Func1>() { @Override public Observable call(Integer delay) { - return new HystrixUtilizationStream(delay).observe(); + return HystrixUtilizationStream.getInstance().observe(); } }; } + @Deprecated //since 1.5.4 public HystrixUtilizationJsonStream(Func1> streamGenerator) { this.streamGenerator = streamGenerator; } @@ -112,7 +114,8 @@ protected static String convertToJson(HystrixUtilization utilization) throws IOE } /** - * @deprecated Not for public use. This prevents stream-sharing. Please use {@link HystrixUtilizationStream#observe()} + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixUtilizationStream#observe()} * @param delay interval between data emissions * @return sampled utilization as Java object, taken on a timer */ @@ -122,15 +125,13 @@ public Observable observe(int delay) { } /** - * @deprecated Not for public use. This prevents stream-sharing. Please use {@link #observeJson()} + * @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared. + * Please use {@link HystrixUtilizationStream#observe()} + * and the {@link #convertToJson(HystrixUtilization)} method * @param delay interval between data emissions * @return sampled utilization as JSON string, taken on a timer */ public Observable observeJson(int delay) { return streamGenerator.call(delay).map(convertToJsonFunc); } - - public Observable observeJson() { - return HystrixUtilizationStream.getInstance().observe().map(convertToJsonFunc); - } } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java index 22dee84d8..19c6a85df 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java @@ -18,8 +18,8 @@ import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.metric.sample.HystrixUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; import rx.Observable; -import rx.functions.Func1; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -49,27 +49,17 @@ public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet> createStream) { - this.jsonStream = new HystrixUtilizationJsonStream(createStream); - } - - @Override - int getDefaultDelayInMilliseconds() { - return DEFAULT_ONNEXT_DELAY_IN_MS; + /* package-private */ HystrixUtilizationSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + super(sampleStream, pausePollerThreadDelayInMs); } @Override @@ -92,11 +82,6 @@ protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - @Override - protected Observable getStream(int delay) { - return jsonStream.observe(delay); - } - @Override protected String convertToString(HystrixUtilization utilization) throws IOException { return HystrixUtilizationJsonStream.convertToJson(utilization); diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java index 25effdc77..dfc868b4c 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java @@ -16,7 +16,6 @@ package com.netflix.hystrix.contrib.sample.stream; import com.netflix.hystrix.config.HystrixConfiguration; -import com.netflix.hystrix.config.HystrixConfigurationStream; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -58,15 +57,6 @@ public HystrixConfiguration call(Long timestamp) { } }); - private Func1> generateStream(final Observable o) { - return new Func1>() { - @Override - public Observable call(Integer integer) { - return o; - } - }; - } - private final Observable streamOfOnNextThenOnError = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { @@ -109,7 +99,7 @@ public void tearDown() { @Test public void shutdownServletShouldRejectRequests() throws ServletException, IOException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10); try { servlet.init(); } catch (ServletException ex) { @@ -126,7 +116,7 @@ public void shutdownServletShouldRejectRequests() throws ServletException, IOExc @Test public void testConfigDataWithInfiniteOnNextStream() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10); try { servlet.init(); } catch (ServletException ex) { @@ -182,13 +172,13 @@ public void run() { Thread.sleep(100); System.out.println("WRITES : " + writes.get()); - assertEquals(9, writes.get()); + assertTrue(writes.get() >= 9); assertEquals(0, servlet.getNumberCurrentConnections()); } @Test public void testConfigDataWithStreamOnError() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnError)); + servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnError, 10); try { servlet.init(); } catch (ServletException ex) { @@ -241,7 +231,7 @@ public void run() { @Test public void testConfigDataWithStreamOnCompleted() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnCompleted)); + servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnCompleted, 10); try { servlet.init(); } catch (ServletException ex) { @@ -294,7 +284,7 @@ public void run() { @Test public void testConfigDataWithIoExceptionOnWrite() throws IOException, InterruptedException { - servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10); try { servlet.init(); } catch (ServletException ex) { @@ -303,7 +293,6 @@ public void testConfigDataWithIoExceptionOnWrite() throws IOException, Interrupt final AtomicInteger writes = new AtomicInteger(0); - when(mockReq.getParameter("delay")).thenReturn("100"); when(mockResp.getWriter()).thenReturn(mockPrintWriter); Mockito.doAnswer(new Answer() { @Override