Skip to content

Commit

Permalink
Merge pull request Netflix#1254 from mattrjacobs/stream-sharing
Browse files Browse the repository at this point in the history
Allow multiple consumers of sample data to only trigger work once
  • Loading branch information
mattrjacobs authored Jun 22, 2016
2 parents 7c9e48b + e712bb7 commit 2747e91
Show file tree
Hide file tree
Showing 12 changed files with 868 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import rx.Observable;
import rx.functions.Func1;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -49,25 +49,16 @@ public class HystrixConfigSseServlet extends HystrixSampleSseServlet<HystrixConf

private static final long serialVersionUID = -3599771169762858235L;

private static final int DEFAULT_ONNEXT_DELAY_IN_MS = 10000;

private final HystrixConfigurationJsonStream jsonStream;

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixConfigSseServlet() {
this.jsonStream = new HystrixConfigurationJsonStream();
}

/* package-private */ HystrixConfigSseServlet(Func1<Integer, Observable<HystrixConfiguration>> createStream) {
this.jsonStream = new HystrixConfigurationJsonStream(createStream);
super(HystrixConfigurationStream.getInstance().observe());
}

@Override
int getDefaultDelayInMilliseconds() {
return DEFAULT_ONNEXT_DELAY_IN_MS;
/* package-private */ HystrixConfigSseServlet(Observable<HystrixConfiguration> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
}

@Override
Expand All @@ -90,11 +81,6 @@ protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

@Override
protected Observable<HystrixConfiguration> getStream(int delay) {
return jsonStream.observe(delay);
}

@Override
protected String convertToString(HystrixConfiguration config) throws IOException {
return HystrixConfigurationJsonStream.convertToString(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@ public class HystrixConfigurationJsonStream {
private static final JsonFactory jsonFactory = new JsonFactory();
private final Func1<Integer, Observable<HystrixConfiguration>> streamGenerator;

@Deprecated //since 1.5.4
public HystrixConfigurationJsonStream() {
this.streamGenerator = new Func1<Integer, Observable<HystrixConfiguration>>() {
@Override
public Observable<HystrixConfiguration> call(Integer delay) {
return new HystrixConfigurationStream(delay).observe();
return HystrixConfigurationStream.getInstance().observe();
}
};
}

@Deprecated //since 1.5.4
public HystrixConfigurationJsonStream(Func1<Integer, Observable<HystrixConfiguration>> streamGenerator) {
this.streamGenerator = streamGenerator;
}
Expand Down Expand Up @@ -171,10 +173,25 @@ public static String convertToString(HystrixConfiguration config) throws IOExcep
return jsonString.getBuffer().toString();
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixConfigurationStream#observe()}
* @param delay interval between data emissions
* @return sampled utilization as Java object, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<HystrixConfiguration> observe(int delay) {
return streamGenerator.call(delay);
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixConfigurationStream#observe()}
* and you can map to JSON string via {@link HystrixConfigurationJsonStream#convertToString(HystrixConfiguration)}
* @param delay interval between data emissions
* @return sampled utilization as JSON string, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<String> observeJson(int delay) {
return streamGenerator.call(delay).map(convertToJson);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,27 @@
/**
*/
public abstract class HystrixSampleSseServlet<SampleData> extends HttpServlet {
protected final Observable<SampleData> sampleStream;

private static final Logger logger = LoggerFactory.getLogger(HystrixSampleSseServlet.class);

//wake up occasionally and check that poller is still alive. this value controls how often
private static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;

private final int pausePollerThreadDelayInMs;

/* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */
private static volatile boolean isDestroyed = false;

private static final String DELAY_REQ_PARAM_NAME = "delay";
protected HystrixSampleSseServlet(Observable<SampleData> sampleStream) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS;
}

abstract int getDefaultDelayInMilliseconds();
protected HystrixSampleSseServlet(Observable<SampleData> sampleStream, int pausePollerThreadDelayInMs) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
}

abstract int getMaxNumberConcurrentConnectionsAllowed();

Expand All @@ -51,7 +63,7 @@ public abstract class HystrixSampleSseServlet<SampleData> extends HttpServlet {

protected abstract void decrementCurrentConcurrentConnections();

protected abstract Observable<SampleData> getStream(int delay);
//protected abstract Observable<SampleData> getStream();

protected abstract String convertToString(SampleData sampleData) throws IOException;

Expand All @@ -67,19 +79,6 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
}
}

/* package-private */
int getDelayFromHttpRequest(HttpServletRequest req) {
try {
String delay = req.getParameter(DELAY_REQ_PARAM_NAME);
if (delay != null) {
return Math.max(Integer.parseInt(delay), 1);
}
} catch (Throwable ex) {
//silently fail
}
return getDefaultDelayInMilliseconds();
}

/**
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
Expand Down Expand Up @@ -125,20 +124,18 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse
if (numberConnections > maxNumberConnectionsAllowed) {
response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);
} else {
int delay = getDelayFromHttpRequest(request);

/* initialize response */
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

final PrintWriter writer = response.getWriter();

Observable<SampleData> sampledStream = getStream(delay);
//Observable<SampleData> sampledStream = getStream();

//since the sample stream is based on Observable.interval, events will get published on an RxComputation thread
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
sampleSubscription = sampledStream
sampleSubscription = sampleStream
.observeOn(Schedulers.io())
.subscribe(new Subscriber<SampleData>() {
@Override
Expand Down Expand Up @@ -180,7 +177,7 @@ public void onNext(SampleData sampleData) {

while (moreDataWillBeSent.get() && !isDestroyed) {
try {
Thread.sleep(delay);
Thread.sleep(pausePollerThreadDelayInMs);
} catch (InterruptedException e) {
moreDataWillBeSent.set(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ public String call(HystrixUtilization utilization) {
}
};

@Deprecated //since 1.5.4
public HystrixUtilizationJsonStream() {
this.streamGenerator = new Func1<Integer, Observable<HystrixUtilization>>() {
@Override
public Observable<HystrixUtilization> call(Integer delay) {
return new HystrixUtilizationStream(delay).observe();
return HystrixUtilizationStream.getInstance().observe();
}
};
}

@Deprecated //since 1.5.4
public HystrixUtilizationJsonStream(Func1<Integer, Observable<HystrixUtilization>> streamGenerator) {
this.streamGenerator = streamGenerator;
}
Expand Down Expand Up @@ -111,10 +113,24 @@ protected static String convertToJson(HystrixUtilization utilization) throws IOE
return jsonString.getBuffer().toString();
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixUtilizationStream#observe()}
* @param delay interval between data emissions
* @return sampled utilization as Java object, taken on a timer
*/
@Deprecated //deprecated as of 1.5.4
public Observable<HystrixUtilization> observe(int delay) {
return streamGenerator.call(delay);
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixUtilizationStream#observe()}
* and the {@link #convertToJson(HystrixUtilization)} method
* @param delay interval between data emissions
* @return sampled utilization as JSON string, taken on a timer
*/
public Observable<String> observeJson(int delay) {
return streamGenerator.call(delay).map(convertToJsonFunc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.metric.sample.HystrixUtilization;
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
import rx.Observable;
import rx.functions.Func1;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -49,27 +49,17 @@ public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet<Hystri

private static final long serialVersionUID = -7812908330777694972L;

private static final int DEFAULT_ONNEXT_DELAY_IN_MS = 100;

private final HystrixUtilizationJsonStream jsonStream;

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections =
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixUtilizationSseServlet() {
this.jsonStream = new HystrixUtilizationJsonStream();

super(HystrixUtilizationStream.getInstance().observe());
}

/* package-private */ HystrixUtilizationSseServlet(Func1<Integer, Observable<HystrixUtilization>> createStream) {
this.jsonStream = new HystrixUtilizationJsonStream(createStream);
}

@Override
int getDefaultDelayInMilliseconds() {
return DEFAULT_ONNEXT_DELAY_IN_MS;
/* package-private */ HystrixUtilizationSseServlet(Observable<HystrixUtilization> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
}

@Override
Expand All @@ -92,11 +82,6 @@ protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

@Override
protected Observable<HystrixUtilization> getStream(int delay) {
return jsonStream.observe(delay);
}

@Override
protected String convertToString(HystrixUtilization utilization) throws IOException {
return HystrixUtilizationJsonStream.convertToJson(utilization);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.netflix.hystrix.contrib.sample.stream;

import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -58,15 +57,6 @@ public HystrixConfiguration call(Long timestamp) {
}
});

private Func1<Integer, Observable<HystrixConfiguration>> generateStream(final Observable<HystrixConfiguration> o) {
return new Func1<Integer, Observable<HystrixConfiguration>>() {
@Override
public Observable<HystrixConfiguration> call(Integer integer) {
return o;
}
};
}

private final Observable<HystrixConfiguration> streamOfOnNextThenOnError = Observable.create(new Observable.OnSubscribe<HystrixConfiguration>() {
@Override
public void call(Subscriber<? super HystrixConfiguration> subscriber) {
Expand Down Expand Up @@ -109,7 +99,7 @@ public void tearDown() {

@Test
public void shutdownServletShouldRejectRequests() throws ServletException, IOException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts));
servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand All @@ -126,7 +116,7 @@ public void shutdownServletShouldRejectRequests() throws ServletException, IOExc

@Test
public void testConfigDataWithInfiniteOnNextStream() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts));
servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand Down Expand Up @@ -182,13 +172,13 @@ public void run() {
Thread.sleep(100);

System.out.println("WRITES : " + writes.get());
assertEquals(9, writes.get());
assertTrue(writes.get() >= 9);
assertEquals(0, servlet.getNumberCurrentConnections());
}

@Test
public void testConfigDataWithStreamOnError() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnError));
servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnError, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand Down Expand Up @@ -241,7 +231,7 @@ public void run() {

@Test
public void testConfigDataWithStreamOnCompleted() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnCompleted));
servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnCompleted, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand Down Expand Up @@ -294,7 +284,7 @@ public void run() {

@Test
public void testConfigDataWithIoExceptionOnWrite() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts));
servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand All @@ -303,7 +293,6 @@ public void testConfigDataWithIoExceptionOnWrite() throws IOException, Interrupt

final AtomicInteger writes = new AtomicInteger(0);

when(mockReq.getParameter("delay")).thenReturn("100");
when(mockResp.getWriter()).thenReturn(mockPrintWriter);
Mockito.doAnswer(new Answer<Void>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@
import java.util.function.Supplier;

class EventStream implements Supplier<Observable<Payload>> {

private final static int CONFIGURATION_DATA_INTERVAL_IN_MS = 500;
private final static int UTILIZATION_DATA_INTERVAL_IN_MS = 500;

private final Observable<Payload> source;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

Expand All @@ -57,7 +53,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) {

switch (eventStreamEnum) {
case CONFIG_STREAM:
source = new HystrixConfigurationStream(CONFIGURATION_DATA_INTERVAL_IN_MS)
source = HystrixConfigurationStream.getInstance()
.observe()
.map(SerialHystrixConfiguration::toBytes)
.map(SerialHystrixMetric::toPayload);
Expand All @@ -69,7 +65,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) {
.map(SerialHystrixMetric::toPayload);
break;
case UTILIZATION_STREAM:
source = new HystrixUtilizationStream(UTILIZATION_DATA_INTERVAL_IN_MS)
source = HystrixUtilizationStream.getInstance()
.observe()
.map(SerialHystrixUtilization::toBytes)
.map(SerialHystrixMetric::toPayload);
Expand Down
Loading

0 comments on commit 2747e91

Please sign in to comment.