Skip to content

Commit

Permalink
Rewrite the internode messaging subsystem
Browse files Browse the repository at this point in the history
patch by Aleksey Yeschenko and Benedict Elliott Smith; reviewed by Alex
Petrov, Aleksey Yeschenko, and Benedict Elliott Smith for CASSANDRA-15066

Co-authored-by: Aleksey Yeschenko <[email protected]>
Co-authored-by: Benedict Elliott Smith <[email protected]>
  • Loading branch information
belliottsmith and iamaleksey committed Jun 12, 2019
1 parent dcabf7e commit 310a48e
Show file tree
Hide file tree
Showing 451 changed files with 30,729 additions and 16,729 deletions.
15 changes: 15 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,7 @@
<target name="burn-testsome" depends="build-test" description="Execute specific burn unit tests" >
<testmacro inputdir="${test.burn.src}" timeout="${test.burn.timeout}">
<test name="${test.name}" methods="${test.methods}"/>
<jvmarg value="-Dlogback.configurationFile=test/conf/logback-burntest.xml"/>
</testmacro>
</target>

Expand Down Expand Up @@ -1945,6 +1946,20 @@
<delete file="${test.distributed.listfile}"/>
</target>

<!-- Build a self-contained jar for e.g. remote execution; not currently used for running burn tests with this build script -->
<target name="burn-test-jar" depends="build-test, build" description="Create dtest-compatible jar, including all dependencies">
<jar jarfile="${build.dir}/burntest.jar">
<zipgroupfileset dir="${build.lib}" includes="*.jar" excludes="META-INF/*.SF"/>
<fileset dir="${build.classes.main}"/>
<fileset dir="${test.classes}"/>
<fileset dir="${test.conf}" excludes="logback*.xml"/>
<fileset dir="${basedir}/conf" includes="logback*.xml"/>
<zipgroupfileset dir="${build.dir.lib}/jars">
<include name="junit*.jar"/>
</zipgroupfileset>
</jar>
</target>

<target name="dtest-jar" depends="build-test, build" description="Create dtest-compatible jar, including all dependencies">
<jar jarfile="${build.dir}/dtest-${base.version}.jar">
<zipgroupfileset dir="${build.lib}" includes="*.jar" excludes="META-INF/*.SF"/>
Expand Down
26 changes: 26 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,32 @@ request_timeout_in_ms: 10000
# which picks up the OS default and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
# internode_tcp_user_timeout_in_ms = 30000

# The maximum continuous period a connection may be unwritable in application space
# internode_application_timeout_in_ms = 30000

# Global, per-endpoint and per-connection limits imposed on messages queued for delivery to other nodes
# and waiting to be processed on arrival from other nodes in the cluster. These limits are applied to the on-wire
# size of the message being sent or received.
#
# The basic per-link limit is consumed in isolation before any endpoint or global limit is imposed.
# Each node-pair has three links: urgent, small and large. So any given node may have a maximum of
# N*3*(internode_application_send_queue_capacity_in_bytes+internode_application_receive_queue_capacity_in_bytes)
# messages queued without any coordination between them although in practice, with token-aware routing, only RF*tokens
# nodes should need to communicate with significant bandwidth.
#
# The per-endpoint limit is imposed on all messages exceeding the per-link limit, simultaneously with the global limit,
# on all links to or from a single node in the cluster.
# The global limit is imposed on all messages exceeding the per-link limit, simultaneously with the per-endpoint limit,
# on all links to or from any node in the cluster.
#
# internode_application_send_queue_capacity_in_bytes: 4194304 #4MiB
# internode_application_send_queue_reserve_endpoint_capacity_in_bytes: 134217728 #128MiB
# internode_application_send_queue_reserve_global_capacity_in_bytes: 536870912 #512MiB
# internode_application_receive_queue_capacity_in_bytes: 4194304 #4MiB
# internode_application_receive_queue_reserve_endpoint_capacity_in_bytes: 134217728 #128MiB
# internode_application_receive_queue_reserve_global_capacity_in_bytes: 536870912 #512MiB


# How long before a node logs slow queries. Select queries that take longer than
# this timeout to execute, will generate an aggregated log message, so that slow queries
# can be identified. Set this value to zero to disable slow query logging.
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/auth/IAuthenticator.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ default SaslNegotiator newSaslNegotiator(InetAddress clientAddress, X509Certific
public interface SaslNegotiator
{
/**
* Evaluates the client response data and generates a byte[] reply which may be a further challenge or purely
* Evaluates the client response data and generates a byte[] response which may be a further challenge or purely
* informational in the case that the negotiation is completed on this round.
*
* This method is called each time a {@link org.apache.cassandra.transport.messages.AuthResponse} is received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import java.util.UUID;

import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.Message;

public final class BatchRemoveVerbHandler implements IVerbHandler<UUID>
{
public void doVerb(MessageIn<UUID> message, int id)
public static final BatchRemoveVerbHandler instance = new BatchRemoveVerbHandler();

public void doVerb(Message<UUID> message)
{
BatchlogManager.remove(message.payload);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
*/
package org.apache.cassandra.batchlog;

import org.apache.cassandra.db.WriteResponse;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;

public final class BatchStoreVerbHandler implements IVerbHandler<Batch>
{
public void doVerb(MessageIn<Batch> message, int id)
public static final BatchStoreVerbHandler instance = new BatchStoreVerbHandler();

public void doVerb(Message<Batch> message)
{
BatchlogManager.store(message.payload);
MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
MessagingService.instance().send(message.emptyResponse(), message.from());
}
}
26 changes: 14 additions & 12 deletions src/java/org/apache/cassandra/batchlog/BatchlogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
Expand All @@ -77,8 +77,10 @@
import org.apache.cassandra.utils.UUIDGen;

import static com.google.common.collect.Iterables.transform;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
import static org.apache.cassandra.net.Verb.MUTATION_REQ;

public class BatchlogManager implements BatchlogManagerMBean
{
Expand All @@ -88,7 +90,7 @@ public class BatchlogManager implements BatchlogManagerMBean

private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
public static final BatchlogManager instance = new BatchlogManager();
public static final long BATCHLOG_REPLAY_TIMEOUT = Long.getLong("cassandra.batchlog.replay_timeout_in_ms", DatabaseDescriptor.getWriteRpcTimeout() * 2);
public static final long BATCHLOG_REPLAY_TIMEOUT = Long.getLong("cassandra.batchlog.replay_timeout_in_ms", DatabaseDescriptor.getWriteRpcTimeout(MILLISECONDS) * 2);

private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
Expand All @@ -112,7 +114,7 @@ public void start()
batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
StorageService.RING_DELAY,
REPLAY_INTERVAL,
TimeUnit.MILLISECONDS);
MILLISECONDS);
}

public void shutdown() throws InterruptedException
Expand Down Expand Up @@ -356,7 +358,7 @@ public int replay(RateLimiter rateLimiter, Set<InetAddressAndPort> hintedNodes)
return 0;

int gcgs = gcgs(mutations);
if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
if (MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
return 0;

replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
Expand Down Expand Up @@ -419,7 +421,7 @@ private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddressAnd
int gcgs = gcgs(mutations);

// expired
if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
if (MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
return;

for (int i = startFrom; i < replayHandlers.size(); i++)
Expand Down Expand Up @@ -490,9 +492,9 @@ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(fin
ReplicaPlan.ForTokenWrite replicaPlan = new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all());
ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime());
MessageOut<Mutation> message = mutation.createMessage();
Message<Mutation> message = Message.outWithFlag(MUTATION_REQ, mutation, MessageFlag.CALL_BACK_ON_FAILURE);
for (Replica replica : liveRemoteOnly.all())
MessagingService.instance().sendWriteRR(message, replica, handler, false);
MessagingService.instance().sendWriteWithCallback(message, replica, handler, false);
return handler;
}

Expand All @@ -506,7 +508,7 @@ private static int gcgs(Collection<Mutation> mutations)

/**
* A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
* which we did not receive a successful reply.
* which we did not receive a successful response.
*/
private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
{
Expand All @@ -525,11 +527,11 @@ protected int blockFor()
}

@Override
public void response(MessageIn<T> m)
public void onResponse(Message<T> m)
{
boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddressAndPort() : m.from);
boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddressAndPort() : m.from());
assert removed;
super.response(m);
super.onResponse(m);
}
}
}
Expand Down
56 changes: 56 additions & 0 deletions src/java/org/apache/cassandra/concurrent/ImmediateExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.concurrent;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

public class ImmediateExecutor extends AbstractExecutorService implements LocalAwareExecutorService
{
public static final ImmediateExecutor INSTANCE = new ImmediateExecutor();

private ImmediateExecutor() {}

public void execute(Runnable command, ExecutorLocals locals)
{
command.run();
}

public void maybeExecuteImmediately(Runnable command)
{
command.run();
}

public void execute(Runnable command)
{
command.run();
}

public int getActiveTaskCount() { return 0; }
public long getCompletedTaskCount() { return 0; }
public int getPendingTaskCount() { return 0; }
public int getMaximumPoolSize() { return 0; }
public void shutdown() { }
public List<Runnable> shutdownNow() { return Collections.emptyList(); }
public boolean isShutdown() { return false; }
public boolean isTerminated() { return false; }
public boolean awaitTermination(long timeout, TimeUnit unit) { return true; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public InfiniteLoopExecutor start()
return this;
}

public void shutdown()
public void shutdownNow()
{
isShutdown = true;
thread.interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.utils.memory.BufferPool;

/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
Expand All @@ -35,6 +36,7 @@ public class NamedThreadFactory implements ThreadFactory
{
private static volatile String globalPrefix;
public static void setGlobalPrefix(String prefix) { globalPrefix = prefix; }
public static String globalPrefix() { return globalPrefix == null ? "" : globalPrefix; }

public final String id;
private final int priority;
Expand Down
15 changes: 9 additions & 6 deletions src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
*/
package org.apache.cassandra.concurrent;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import org.apache.cassandra.utils.ExecutorUtils;

/**
* Centralized location for shared executors
Expand Down Expand Up @@ -48,12 +53,10 @@ public class ScheduledExecutors
public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");

@VisibleForTesting
public static void shutdownAndWait() throws InterruptedException
public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
for (ExecutorService executor : executors)
executor.shutdownNow();
for (ExecutorService executor : executors)
executor.awaitTermination(60, TimeUnit.SECONDS);
List<ExecutorService> executors = ImmutableList.of(scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks);
ExecutorUtils.shutdownNow(executors);
ExecutorUtils.awaitTermination(timeout, unit, executors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
Expand Down Expand Up @@ -114,17 +115,21 @@ public LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTa
return executor;
}

public void shutdown() throws InterruptedException
public void shutdown(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
shuttingDown = true;
for (SEPExecutor executor : executors)
executor.shutdownNow();

terminateWorkers();

long until = System.nanoTime() + TimeUnit.MINUTES.toNanos(1L);
long until = System.nanoTime() + unit.toNanos(timeout);
for (SEPExecutor executor : executors)
{
executor.shutdown.await(until - System.nanoTime(), TimeUnit.NANOSECONDS);
if (!executor.isTerminated())
throw new TimeoutException(executor.name + " not terminated");
}
}

void terminateWorkers()
Expand Down
20 changes: 2 additions & 18 deletions src/java/org/apache/cassandra/concurrent/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
*/
package org.apache.cassandra.concurrent;

import java.util.Arrays;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;

public enum Stage
{
READ,
Expand All @@ -35,18 +30,7 @@ public enum Stage
MISC,
TRACING,
INTERNAL_RESPONSE,
READ_REPAIR;

public static Iterable<Stage> jmxEnabledStages()
{
return Iterables.filter(Arrays.asList(values()), new Predicate<Stage>()
{
public boolean apply(Stage stage)
{
return stage != TRACING;
}
});
}
IMMEDIATE;

public String getJmxType()
{
Expand All @@ -58,13 +42,13 @@ public String getJmxType()
case MISC:
case TRACING:
case INTERNAL_RESPONSE:
case IMMEDIATE:
return "internal";
case MUTATION:
case COUNTER_MUTATION:
case VIEW_MUTATION:
case READ:
case REQUEST_RESPONSE:
case READ_REPAIR:
return "request";
default:
throw new AssertionError("Unknown stage " + this);
Expand Down
Loading

0 comments on commit 310a48e

Please sign in to comment.