Skip to content

Commit

Permalink
Renaming thread pool max concurrency stream, for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jan 30, 2016
1 parent dd09d8d commit 16ea656
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.metric.consumer.CumulativeThreadPoolEventCounterStream;
import com.netflix.hystrix.metric.consumer.RollingThreadPoolConcurrencyStream;
import com.netflix.hystrix.metric.consumer.RollingThreadPoolMaxConcurrencyStream;
import com.netflix.hystrix.metric.consumer.RollingThreadPoolEventCounterStream;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool;
import com.netflix.servo.DefaultMonitorRegistry;
Expand Down Expand Up @@ -104,7 +104,7 @@ public void initialize() {
DefaultMonitorRegistry.getInstance().register(commandMetricsMonitor);
RollingThreadPoolEventCounterStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
CumulativeThreadPoolEventCounterStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
RollingThreadPoolConcurrencyStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
RollingThreadPoolMaxConcurrencyStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.netflix.hystrix.metric.HystrixCommandCompletion;
import com.netflix.hystrix.metric.consumer.CumulativeThreadPoolEventCounterStream;
import com.netflix.hystrix.metric.consumer.RollingThreadPoolConcurrencyStream;
import com.netflix.hystrix.metric.consumer.RollingThreadPoolMaxConcurrencyStream;
import com.netflix.hystrix.metric.consumer.RollingThreadPoolEventCounterStream;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import org.slf4j.Logger;
Expand Down Expand Up @@ -141,7 +141,7 @@ public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {

private final RollingThreadPoolEventCounterStream rollingCounterStream;
private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
private final RollingThreadPoolConcurrencyStream rollingThreadPoolConcurrencyStream;
private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;

private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
super(null);
Expand All @@ -151,7 +151,7 @@ private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolE

rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
rollingThreadPoolConcurrencyStream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, properties);
rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
}

/**
Expand Down Expand Up @@ -333,7 +333,7 @@ public void markThreadCompletion() {
* @return rolling max active threads
*/
public long getRollingMaxActiveThreads() {
return rollingThreadPoolConcurrencyStream.getLatestRollingMax();
return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,16 @@
import java.util.concurrent.ConcurrentMap;

/**
* Maintains a stream of concurrency distributions for a given Command.
* Maintains a stream of the maximum concurrency seen by this command.
*
* There are 2 related streams that may be consumed:
*
* A) A rolling window of the maximum concurrency seen by this command.
* B) A histogram of sampled concurrency seen by this command.
*
* A) gets calculated using a rolling window of t1 milliseconds. This window has b buckets.
* This gets calculated using a rolling window of t1 milliseconds. This window has b buckets.
* Therefore, a new rolling-max is produced every t2 (=t1/b) milliseconds
* t1 = {@link HystrixCommandProperties#metricsRollingStatisticalWindowInMilliseconds()}
* b = {@link HystrixCommandProperties#metricsRollingStatisticalWindowBuckets()}
*
* This value gets cached in this class. It may be queried using {@link #getLatestRollingMax()}
* This value is stable - there's no peeking into a bucket until it is emitted
*
* B) gets calculated by sampling the actual concurrency at some rate higher than the bucket-rolling frequency.
* Each sample gets stored in a histogram. At the moment, there's no bucketing or windowing on this stream.
* To control the emission rate, the histogram is emitted on a bucket-roll.
*
* This value is not cached. You need to consume this stream directly if you want to use it.
*
* Both A) and B) are stable - there's no peeking into a bucket until it is emitted
*
* LARGE CAVEAT:
* This will change after 1.5.0-RC.1. This was an experiment that proved too costly. Rely on this at your own peril
*/
public class RollingCommandMaxConcurrencyStream extends RollingConcurrencyStream {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,28 @@
*
* This is a stable value - there's no peeking into a bucket until it is emitted
*/
public class RollingThreadPoolConcurrencyStream extends RollingConcurrencyStream {
public class RollingThreadPoolMaxConcurrencyStream extends RollingConcurrencyStream {

private static final ConcurrentMap<String, RollingThreadPoolConcurrencyStream> streams = new ConcurrentHashMap<String, RollingThreadPoolConcurrencyStream>();
private static final ConcurrentMap<String, RollingThreadPoolMaxConcurrencyStream> streams = new ConcurrentHashMap<String, RollingThreadPoolMaxConcurrencyStream>();

public static RollingThreadPoolConcurrencyStream getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties properties) {
public static RollingThreadPoolMaxConcurrencyStream getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties properties) {
final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;

return getInstance(threadPoolKey, numCounterBuckets, counterBucketSizeInMs);
}

public static RollingThreadPoolConcurrencyStream getInstance(HystrixThreadPoolKey threadPoolKey, int numBuckets, int bucketSizeInMs) {
RollingThreadPoolConcurrencyStream initialStream = streams.get(threadPoolKey.name());
public static RollingThreadPoolMaxConcurrencyStream getInstance(HystrixThreadPoolKey threadPoolKey, int numBuckets, int bucketSizeInMs) {
RollingThreadPoolMaxConcurrencyStream initialStream = streams.get(threadPoolKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (RollingThreadPoolConcurrencyStream.class) {
RollingThreadPoolConcurrencyStream existingStream = streams.get(threadPoolKey.name());
synchronized (RollingThreadPoolMaxConcurrencyStream.class) {
RollingThreadPoolMaxConcurrencyStream existingStream = streams.get(threadPoolKey.name());
if (existingStream == null) {
RollingThreadPoolConcurrencyStream newStream =
new RollingThreadPoolConcurrencyStream(threadPoolKey, numBuckets, bucketSizeInMs);
RollingThreadPoolMaxConcurrencyStream newStream =
new RollingThreadPoolMaxConcurrencyStream(threadPoolKey, numBuckets, bucketSizeInMs);
streams.putIfAbsent(threadPoolKey.name(), newStream);
return newStream;
} else {
Expand All @@ -69,7 +69,7 @@ public static void reset() {
streams.clear();
}

public RollingThreadPoolConcurrencyStream(final HystrixThreadPoolKey threadPoolKey, final int numBuckets, final int bucketSizeInMs) {
public RollingThreadPoolMaxConcurrencyStream(final HystrixThreadPoolKey threadPoolKey, final int numBuckets, final int bucketSizeInMs) {
super(HystrixThreadPoolStartStream.getInstance(threadPoolKey), numBuckets, bucketSizeInMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

import static org.junit.Assert.*;

public class RollingThreadPoolConcurrencyStreamTest extends CommandStreamTest {
RollingThreadPoolConcurrencyStream stream;
public class RollingThreadPoolMaxConcurrencyStreamTest extends CommandStreamTest {
RollingThreadPoolMaxConcurrencyStream stream;
HystrixRequestContext context;
ExecutorService threadPool;

Expand Down Expand Up @@ -81,7 +81,7 @@ public void testEmptyStreamProducesZeros() {
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-A");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-A");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-A");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -102,7 +102,7 @@ public void testStartsAndEndsInSameBucketProduceValue() throws InterruptedExcept
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-B");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-B");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-B");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -124,7 +124,7 @@ public void testStartsAndEndsInSameBucketSemaphoreIsolated() throws InterruptedE
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-C");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-C");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-C");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -153,7 +153,7 @@ public void testOneCommandCarriesOverToNextBucket() throws InterruptedException
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-D");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-D");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-D");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -188,7 +188,7 @@ public void testMultipleCommandsCarryOverMultipleBuckets() throws InterruptedExc
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-E");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-E");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-E");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -229,7 +229,7 @@ public void testMultipleCommandsCarryOverMultipleBucketsForMultipleThreadPools()
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-X");
HystrixCommandKey keyX = HystrixCommandKey.Factory.asKey("RollingConcurrency-X");
HystrixCommandKey keyY = HystrixCommandKey.Factory.asKey("RollingConcurrency-Y");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -267,7 +267,7 @@ public void testMultipleCommandsCarryOverMultipleBucketsAndThenAgeOut() throws I
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-F");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-F");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-F");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testConcurrencyStreamProperlyFiltersOutResponseFromCache() throws In
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-G");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-G");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-G");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -325,7 +325,7 @@ public void testConcurrencyStreamProperlyFiltersOutShortCircuits() throws Interr
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-H");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-H");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-H");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -367,7 +367,7 @@ public void testConcurrencyStreamProperlyFiltersOutSemaphoreRejections() throws
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-I");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-I");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-I");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -422,7 +422,7 @@ public void testConcurrencyStreamProperlyFiltersOutThreadPoolRejections() throws
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-J");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-J");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-J");
stream = RollingThreadPoolConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();

final CountDownLatch latch = new CountDownLatch(1);
Expand Down

0 comments on commit 16ea656

Please sign in to comment.