Skip to content

Commit

Permalink
davids comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hsaraogi committed Jan 11, 2018
1 parent 0a3acf7 commit f0d2e8d
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
public abstract class QosClientLimitsConfig implements Serializable {
// These numbers come from the metrics released and are scaled by a factor of 2,
// we should probably scale up more or set to no limit?
public static final long BYTES_READ_PER_SECOND_PER_CLIENT = 500_000L;
public static final long BYTES_WRITTEN_PER_SECOND_PER_CLIENT = 1_000_000L;
public static final long BYTES_READ_PER_SECOND_PER_CLIENT = 50_000_000L; // 50 MB/s
public static final long BYTES_WRITTEN_PER_SECOND_PER_CLIENT = 10_000_000L; // 10 MB/s

@Value.Default
public QosLimitsConfig limits() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.palantir.atlasdb.qos.config;

import java.util.List;
import java.util.Collection;
import java.util.function.Supplier;

import com.palantir.atlasdb.qos.ratelimit.guava.RateLimiter;
Expand All @@ -27,15 +27,17 @@
public class SimpleThrottlingStrategy implements ThrottlingStrategy {
private static final double ONCE_EVERY_HUNDRED_SECONDS = 0.01;
private final RateLimiter rateLimiter;
@GuardedBy("this") private double multiplier;
@GuardedBy("this")
private double multiplier;

public SimpleThrottlingStrategy() {
this.rateLimiter = RateLimiter.create(ONCE_EVERY_HUNDRED_SECONDS);
this.multiplier = 1.0;
}

@Override
public double getClientLimitMultiplier(Supplier<List<CassandraHealthMetricMeasurement>> metricMeasurements) {
public double getClientLimitMultiplier(
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> metricMeasurements) {
if (shouldAdjust()) {
if (cassandraIsUnhealthy(metricMeasurements)) {
halveTheRateMultiplier();
Expand All @@ -59,7 +61,8 @@ private synchronized void halveTheRateMultiplier() {
multiplier = Math.max(0.1, multiplier * 0.5);
}

private boolean cassandraIsUnhealthy(Supplier<List<CassandraHealthMetricMeasurement>> metricMeasurements) {
private boolean cassandraIsUnhealthy(
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> metricMeasurements) {
return metricMeasurements.get().stream()
.anyMatch(metricMeasurement -> !metricMeasurement.isMeasurementWithinLimits());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.palantir.atlasdb.qos.config;

import java.util.List;
import java.util.Collection;
import java.util.function.Supplier;

public interface ThrottlingStrategy {
double getClientLimitMultiplier(Supplier<List<CassandraHealthMetricMeasurement>> metricMeasurements);
double getClientLimitMultiplier(
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> metricMeasurements);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.palantir.atlasdb.qos.ratelimit;

import java.util.List;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

Expand All @@ -29,7 +29,7 @@
import com.palantir.remoting3.clients.ClientConfigurations;
import com.palantir.remoting3.jaxrs.JaxRsClient;

public class CassandraMetricsClientLimitMultiplier implements ClientLimitMultiplier {
public final class CassandraMetricsClientLimitMultiplier implements ClientLimitMultiplier {
private final ThrottlingStrategy throttlingStrategy;
private final CassandraMetricMeasurementsLoader cassandraHealthMetrics;

Expand All @@ -50,7 +50,8 @@ public static ClientLimitMultiplier create(Supplier<QosCassandraMetricsRuntimeCo
installConfig.throttlingStrategy());

CassandraMetricMeasurementsLoader cassandraMetricMeasurementsLoader = new CassandraMetricMeasurementsLoader(
() -> runtimeConfig.get().cassandraHealthMetrics(), metricsService, Executors.newSingleThreadScheduledExecutor());
() -> runtimeConfig.get().cassandraHealthMetrics(), metricsService,
Executors.newSingleThreadScheduledExecutor());
return new CassandraMetricsClientLimitMultiplier(throttlingStrategy, cassandraMetricMeasurementsLoader);
}

Expand All @@ -62,7 +63,7 @@ public double getClientLimitMultiplier(QosPriority qosPriority) {
return throttlingStrategy.getClientLimitMultiplier(getCassandraHealthMetricMeasurements());
}

private Supplier<List<CassandraHealthMetricMeasurement>> getCassandraHealthMetricMeasurements() {
private Supplier<Collection<? extends CassandraHealthMetricMeasurement>> getCassandraHealthMetricMeasurements() {
return cassandraHealthMetrics::getCassandraHealthMetricMeasurements;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.qos.ratelimit;

import com.palantir.atlasdb.qos.config.QosPriority;

public class OneReturningClientLimitMultiplier implements ClientLimitMultiplier {
public enum OneReturningClientLimitMultiplier implements ClientLimitMultiplier {
INSTANCE;

@Override
public double getClientLimitMultiplier(QosPriority qosPriority) {
return 1.0;
}

public static ClientLimitMultiplier create() {
return new OneReturningClientLimitMultiplier();
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class QosServiceTest {
@Test
public void defaultsToFixedLimit() {
QosClientConfigLoader qosClientConfigLoader = QosClientConfigLoader.create(ImmutableMap::of);
OneReturningClientLimitMultiplier oneReturningClientLimitMultiplier = new OneReturningClientLimitMultiplier();
OneReturningClientLimitMultiplier oneReturningClientLimitMultiplier =
OneReturningClientLimitMultiplier.INSTANCE;
resource = new QosResource(qosClientConfigLoader, oneReturningClientLimitMultiplier);
assertThat(QosClientLimitsConfig.BYTES_READ_PER_SECOND_PER_CLIENT).isEqualTo(resource.readLimit("foo"));
assertThat(QosClientLimitsConfig.BYTES_WRITTEN_PER_SECOND_PER_CLIENT).isEqualTo(resource.writeLimit("foo"));
Expand Down Expand Up @@ -84,7 +85,8 @@ public void canScaleDownLimits() {
.build()));
CassandraMetricsClientLimitMultiplier clientLimitMultiplier = mock(CassandraMetricsClientLimitMultiplier.class);
resource = new QosResource(qosClientConfigLoader, clientLimitMultiplier);
when(clientLimitMultiplier.getClientLimitMultiplier(QosPriority.MEDIUM)).thenReturn(1.0, 1.0, 0.5, 0.5, 0.25, 0.25);
when(clientLimitMultiplier.getClientLimitMultiplier(QosPriority.MEDIUM)).thenReturn(1.0, 1.0, 0.5, 0.5, 0.25,
0.25);
assertEquals(100L, resource.readLimit("foo"));
assertEquals(20L, resource.writeLimit("foo"));
assertEquals(50L, resource.readLimit("foo"));
Expand All @@ -105,7 +107,8 @@ public void canScaleDownAndScaleUpLimits() {
.build()));
CassandraMetricsClientLimitMultiplier clientLimitMultiplier = mock(CassandraMetricsClientLimitMultiplier.class);
resource = new QosResource(qosClientConfigLoader, clientLimitMultiplier);
when(clientLimitMultiplier.getClientLimitMultiplier(QosPriority.MEDIUM)).thenReturn(1.0, 1.0, 0.5, 0.5, 0.55, 0.55);
when(clientLimitMultiplier.getClientLimitMultiplier(QosPriority.MEDIUM)).thenReturn(1.0, 1.0, 0.5, 0.5, 0.55,
0.55);
assertEquals(100L, resource.readLimit("foo"));
assertEquals(20L, resource.writeLimit("foo"));
assertEquals(50L, resource.readLimit("foo"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand All @@ -38,7 +38,7 @@ public void clientLimitMultiplierIsOneForNoMetrics() throws Exception {
public void clientLimitMultiplierIsIncreasingForCassandraHealthyIndicatingMetrics() throws Exception {
SimpleThrottlingStrategy simpleThrottlingStrategy = new SimpleThrottlingStrategy();

Supplier<List<CassandraHealthMetricMeasurement>> metricMeasurements = () -> ImmutableList.of(
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> metricMeasurements = () -> ImmutableList.of(
getMetricMeasurement(5, 10, 8), getMetricMeasurement(4, 8, 6));

assertThat(simpleThrottlingStrategy.getClientLimitMultiplier(metricMeasurements)).isEqualTo(1.0);
Expand All @@ -48,7 +48,7 @@ public void clientLimitMultiplierIsIncreasingForCassandraHealthyIndicatingMetric
public void clientLimitMultiplierIsDecreasingForOneCassandraUnhealthyIndicatingMetrics() throws Exception {
SimpleThrottlingStrategy simpleThrottlingStrategy = new SimpleThrottlingStrategy();

Supplier<List<CassandraHealthMetricMeasurement>> metricMeasurements = () -> ImmutableList.of(
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> metricMeasurements = () -> ImmutableList.of(
getMetricMeasurement(5, 10, 15), getMetricMeasurement(4, 8, 6));

assertThat(simpleThrottlingStrategy.getClientLimitMultiplier(metricMeasurements)).isEqualTo(0.5);
Expand All @@ -58,7 +58,7 @@ public void clientLimitMultiplierIsDecreasingForOneCassandraUnhealthyIndicatingM
public void clientLimitMultiplierIsDecreasingForAllCassandraUnhealthyIndicatingMetrics() throws Exception {
SimpleThrottlingStrategy simpleThrottlingStrategy = new SimpleThrottlingStrategy();

Supplier<List<CassandraHealthMetricMeasurement>> metricMeasurements = () -> ImmutableList.of(
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> metricMeasurements = () -> ImmutableList.of(
getMetricMeasurement(5, 10, 15), getMetricMeasurement(4, 8, 2));

assertThat(simpleThrottlingStrategy.getClientLimitMultiplier(metricMeasurements)).isEqualTo(0.5);
Expand All @@ -69,11 +69,11 @@ public void clientLimitMultiplierIsDecreasingAndThenIncreasingForCassandraUnheal
throws Exception {
SimpleThrottlingStrategy simpleThrottlingStrategy = new SimpleThrottlingStrategy();

Supplier<List<CassandraHealthMetricMeasurement>> unhealthyMetricMeasurements = () -> ImmutableList.of(
getMetricMeasurement(5, 10, 15), getMetricMeasurement(4, 8, 2));
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> unhealthyMetricMeasurements = () ->
ImmutableList.of(getMetricMeasurement(5, 10, 15), getMetricMeasurement(4, 8, 2));

Supplier<List<CassandraHealthMetricMeasurement>> healthyMetricMeasurements = () -> ImmutableList.of(
getMetricMeasurement(5, 10, 8), getMetricMeasurement(4, 8, 6));
Supplier<Collection<? extends CassandraHealthMetricMeasurement>> healthyMetricMeasurements = () ->
ImmutableList.of(getMetricMeasurement(5, 10, 8), getMetricMeasurement(4, 8, 6));

assertThat(simpleThrottlingStrategy.getClientLimitMultiplier(unhealthyMetricMeasurements)).isEqualTo(0.5);
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,4 @@ private CassandraHealthMetricMeasurement getCassandraMetricMeasurement(Double cu
.upperLimit(UPPER_LIMIT)
.build();
}

}
}

0 comments on commit f0d2e8d

Please sign in to comment.