Skip to content

Commit

Permalink
Merge branch 'cassandra-3.0' into cassandra-3.11
Browse files Browse the repository at this point in the history
  • Loading branch information
adelapena committed Oct 1, 2021
2 parents 9bf14a6 + 4f8afe8 commit c6e897d
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
* Avoid sending CDC column if not enabled (CASSANDRA-16770)
Merged from 3.0:
* Immediately apply stream throughput, considering negative values as unthrottled (CASSANDRA-16959)
* Do not release new SSTables in offline transactions (CASSANDRA-16975)
* ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995)
* CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384)
Expand Down
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1480,8 +1480,10 @@ public int getStreamingSocketTimeout()

public void setStreamThroughputMbPerSec(int value)
{
int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
logger.info("setstreamthroughput: throttle set to {}", value);
StreamManager.StreamRateLimiter.updateThroughput();
logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
}

public int getStreamThroughputMbPerSec()
Expand All @@ -1491,8 +1493,10 @@ public int getStreamThroughputMbPerSec()

public void setInterDCStreamThroughputMbPerSec(int value)
{
int oldValue = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(value);
logger.info("setinterdcstreamthroughput: throttle set to {}", value);
StreamManager.StreamRateLimiter.updateInterDCThroughput();
logger.info("setinterdcstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
}

public int getInterDCStreamThroughputMbPerSec()
Expand Down
58 changes: 40 additions & 18 deletions src/java/org/apache/cassandra/streaming/StreamManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -63,41 +64,62 @@ public static StreamRateLimiter getRateLimiter(InetAddress peer)

public static class StreamRateLimiter
{
private static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
public static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
private static final RateLimiter limiter = RateLimiter.create(calculateRateInBytes());
private static final RateLimiter interDCLimiter = RateLimiter.create(calculateInterDCRateInBytes());
private final boolean isLocalDC;

public StreamRateLimiter(InetAddress peer)
{
double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
mayUpdateThroughput(throughput, limiter);

double interDCThroughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
mayUpdateThroughput(interDCThroughput, interDCLimiter);

if (DatabaseDescriptor.getLocalDataCenter() != null && DatabaseDescriptor.getEndpointSnitch() != null)
isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
else
isLocalDC = true;
}

private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
{
// if throughput is set to 0, throttling is disabled
if (limit == 0)
limit = Double.MAX_VALUE;
if (rateLimiter.getRate() != limit)
rateLimiter.setRate(limit);
}

public void acquire(int toTransfer)
{
limiter.acquire(toTransfer);
if (!isLocalDC)
interDCLimiter.acquire(toTransfer);
}

public static void updateThroughput()
{
limiter.setRate(calculateRateInBytes());
}

public static void updateInterDCThroughput()
{
interDCLimiter.setRate(calculateInterDCRateInBytes());
}

private static double calculateRateInBytes()
{
return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() > 0
? DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
: Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
}

private static double calculateInterDCRateInBytes()
{
return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() > 0
? DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
: Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
}

@VisibleForTesting
public static double getRateLimiterRateInBytes()
{
return limiter.getRate();
}

@VisibleForTesting
public static double getInterDCRateLimiterRateInBytes()
{
return interDCLimiter.getRate();
}
}

private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
@Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming in the system, or 0 to disable throttling")
public class SetInterDCStreamThroughput extends NodeToolCmd
{
@SuppressWarnings("UnusedDeclaration")
@Arguments(title = "inter_dc_stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
private Integer interDCStreamThroughput = null;
private int interDCStreamThroughput;

@Override
public void execute(NodeProbe probe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
@Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming in the system, or 0 to disable throttling")
public class SetStreamThroughput extends NodeToolCmd
{
@SuppressWarnings("UnusedDeclaration")
@Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
private Integer streamThroughput = null;
private int streamThroughput;

@Override
public void execute(NodeProbe probe)
Expand Down
91 changes: 91 additions & 0 deletions test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.streaming;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.StorageService;

import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEGABIT;
import static org.junit.Assert.assertEquals;

public class StreamManagerTest
{
private static int defaultStreamThroughputMbPerSec;
private static int defaultInterDCStreamThroughputMbPerSec;

@BeforeClass
public static void setupClass()
{
Config c = DatabaseDescriptor.loadConfig();
defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec;
defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec;
DatabaseDescriptor.daemonInitialization(() -> c);
}

@Test
public void testUpdateStreamThroughput()
{
// Initialized value check
assertEquals(defaultStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);

// Positive value check
StorageService.instance.setStreamThroughputMbPerSec(500);
assertEquals(500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);

// Max positive value check
StorageService.instance.setStreamThroughputMbPerSec(Integer.MAX_VALUE);
assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);

// Zero value check
StorageService.instance.setStreamThroughputMbPerSec(0);
assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);

// Negative value check
StorageService.instance.setStreamThroughputMbPerSec(-200);
assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
}

@Test
public void testUpdateInterDCStreamThroughput()
{
// Initialized value check
assertEquals(defaultInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);

// Positive value check
StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
assertEquals(200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);

// Max positive value check
StorageService.instance.setInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE);
assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);

// Zero value check
StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);

// Negative value check
StorageService.instance.setInterDCStreamThroughputMbPerSec(-200);
assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
}
}

0 comments on commit c6e897d

Please sign in to comment.