Skip to content

Commit

Permalink
Merge pull request Netflix#1539 from justinjose28/master
Browse files Browse the repository at this point in the history
Fixed a potential bug in metrics-event-stream-jaxrs
  • Loading branch information
mattrjacobs authored Apr 19, 2017
2 parents a50fde2 + 0d0a391 commit df912b2
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public abstract class AbstractHystrixStreamController {

private final int pausePollerThreadDelayInMs;

private static final AtomicInteger concurrentConnections = new AtomicInteger(0);

protected AbstractHystrixStreamController(Observable<String> sampleStream) {
this(sampleStream, DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
}
Expand All @@ -57,9 +55,7 @@ protected AbstractHystrixStreamController(Observable<String> sampleStream, int p

protected abstract int getMaxNumberConcurrentConnectionsAllowed();

protected final AtomicInteger getCurrentConnections() {
return concurrentConnections;
}
protected abstract AtomicInteger getCurrentConnections();

/**
* Maintain an open connection with the client. On initial connection send latest data of each requested event type and subsequently send all changes for each requested event type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.hystrix.contrib.metrics.controller;

import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
Expand Down Expand Up @@ -47,6 +49,7 @@
@Path("/hystrix/config.stream")
public class HystrixConfigSseController extends AbstractHystrixStreamController {

private static final AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixConfigSseController() {
Expand All @@ -68,4 +71,10 @@ protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

@Override
protected AtomicInteger getCurrentConnections() {
return concurrentConnections;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.hystrix.contrib.metrics.controller;

import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
Expand Down Expand Up @@ -47,6 +49,7 @@
@Path("/hystrix.stream")
public class HystrixMetricsStreamController extends AbstractHystrixStreamController {

private static final AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixMetricsStreamController() {
Expand All @@ -67,5 +70,9 @@ public Response getStream() {
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}
@Override
protected AtomicInteger getCurrentConnections() {
return concurrentConnections;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.hystrix.contrib.metrics.controller;

import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
Expand Down Expand Up @@ -48,6 +50,7 @@
@Path("/hystrix/request.stream")
public class HystrixRequestEventsSseController extends AbstractHystrixStreamController {

private static final AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixRequestEventsSseController() {
Expand All @@ -69,4 +72,8 @@ protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

@Override
protected AtomicInteger getCurrentConnections() {
return concurrentConnections;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.hystrix.contrib.metrics.controller;

import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
Expand Down Expand Up @@ -47,6 +49,7 @@
@Path("/hystrix/utilization.stream")
public class HystrixUtilizationSseController extends AbstractHystrixStreamController {

private static final AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixUtilizationSseController() {
Expand All @@ -67,5 +70,10 @@ public Response getStream() {
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

@Override
protected AtomicInteger getCurrentConnections() {
return concurrentConnections;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ public void call(Subscriber<? super String> subscriber) {
}).subscribeOn(Schedulers.computation());

private AbstractHystrixStreamController sse = new AbstractHystrixStreamController(streamOfOnNexts) {

private final AtomicInteger concurrentConnections = new AtomicInteger(0);
@Override
protected int getMaxNumberConcurrentConnectionsAllowed() {
return 2;
}
@Override
protected AtomicInteger getCurrentConnections() {
return concurrentConnections;
}

};

Expand Down

0 comments on commit df912b2

Please sign in to comment.