Skip to content

Commit

Permalink
Refactor RTMP event logic to ensure ordered processing with minimal l…
Browse files Browse the repository at this point in the history
…ocking
  • Loading branch information
mondain committed Feb 7, 2024
1 parent e5f8387 commit fd4a5ec
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* @author Jon Valliere
*/
public class RTMPClient extends BaseRTMPClientHandler {

private static final Logger log = LoggerFactory.getLogger(RTMPClient.class);

protected static final int CONNECTOR_WORKER_TIMEOUT = 7000; // milliseconds
Expand Down
8 changes: 6 additions & 2 deletions common/src/main/java/org/red5/server/api/Red5.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public final class Red5 {

private static Logger log = Red5LoggerFactory.getLogger(Red5.class);

private static boolean isDebug = log.isDebugEnabled();

/**
* Connection associated with the current thread. Each connection runs in a separate thread.
*/
Expand Down Expand Up @@ -118,7 +120,7 @@ public Red5() {
* Thread local connection
*/
public static void setConnectionLocal(IConnection connection) {
if (log.isDebugEnabled()) {
if (isDebug) {
log.debug("Set connection: {} with thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
try {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
Expand Down Expand Up @@ -148,7 +150,9 @@ public static IConnection getConnectionLocal() {
WeakReference<IConnection> ref = connThreadLocal.get();
if (ref != null) {
IConnection connection = ref.get();
log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
if (isDebug) {
log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
}
return connection;
} else {
return null;
Expand Down
137 changes: 101 additions & 36 deletions common/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.mina.core.session.IoSession;
Expand Down Expand Up @@ -58,8 +58,8 @@ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, Status

private static boolean isDebug = log.isDebugEnabled();

// single thread pool for handling receive
protected final static ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool();
// thread pool for handling receive
protected final ExecutorService recvDispatchExecutor = Executors.newCachedThreadPool();

/** {@inheritDoc} */
public void connectionOpened(RTMPConnection conn) {
Expand Down Expand Up @@ -106,10 +106,12 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
// NOTE: If we respond to "publish" with "NetStream.Publish.BadName",
// the client sends a few stream packets before stopping; we need to ignore them.
if (stream != null) {
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null && stream != null) {
EnsuresPacketExecutionOrder epeo = null;
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
} else {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo);
}
epeo.addPacket(message);
}
Expand All @@ -133,12 +135,16 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
case TYPE_FLEX_STREAM_SEND:
if (((Notify) message).getData() != null && stream != null) {
// Stream metadata
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null) {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
if (stream != null) {
EnsuresPacketExecutionOrder epeo = null;
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
} else {
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER, epeo);
}
epeo.addPacket(message);
}
epeo.addPacket(message);
} else {
onCommand(conn, channel, header, (Notify) message);
}
Expand Down Expand Up @@ -185,6 +191,18 @@ public void connectionClosed(RTMPConnection conn) {
if (conn.getStateCode() != RTMP.STATE_DISCONNECTED) {
// inform any callbacks for pending calls that the connection is closed
conn.sendPendingServiceCallsCloseError();
// clean up / remove the packet execution attribute
if (conn.hasAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER);
if (epeo != null) {
epeo.shutdown();
}
if (conn.removeAttribute(RTMPConnection.RTMP_EXECUTION_ORDERER)) {
log.debug("Removed packet execution attribute");
}
}
// shutdown the executor
recvDispatchExecutor.shutdownNow();
// close the connection
if (conn.getStateCode() != RTMP.STATE_DISCONNECTING) {
conn.close();
Expand Down Expand Up @@ -367,60 +385,107 @@ protected void onStreamBytesRead(RTMPConnection conn, Channel channel, Header so
* Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream
* and keeps all incoming events in order.
*/
private static class EnsuresPacketExecutionOrder implements Runnable {
private class EnsuresPacketExecutionOrder implements Runnable {

public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder";
private final IEventDispatcher stream;

private LinkedBlockingQueue<IRTMPEvent> events = new LinkedBlockingQueue<>();
private final RTMPConnection conn;

private AtomicBoolean state = new AtomicBoolean();
private ConcurrentLinkedQueue<IRTMPEvent> events = new ConcurrentLinkedQueue<>();

private final IEventDispatcher stream;
private AtomicBoolean submitted = new AtomicBoolean();

private final RTMPConnection conn;
private volatile String threadName;

private int iter;
private boolean shutdown;

public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) {
log.debug("Created for stream: {} connection: {}", stream, conn);
this.stream = stream;
this.conn = conn;
}

/**
* Shutdown and clean up.
*/
public void shutdown() {
log.debug("Shutdown; events: {}", events.size());
// set shutdown flag preventing further adds
shutdown = true;
// release all events
events.forEach(event -> {
event.release();
});
// clear the queue
events.clear();
}

/**
* Add packet to the stream's incoming queue.
*
* @param packet
*/
public void addPacket(IRTMPEvent packet) {
events.offer(packet);
if (state.compareAndSet(false, true)) {
recvDispatchExecutor.submit(this);
if (!shutdown) {
log.debug("addPacket: {}", packet);
// add to queue
events.offer(packet);
// if we are not already running, submit for execution
if (submitted.compareAndSet(false, true)) {
// use last 3 digits of nano time to identify different thread instance
threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L));
log.debug("Submit: {}", threadName);
recvDispatchExecutor.submit(this);
}
} else {
log.debug("Shutdown, not adding packet");
}
}

@Override
public void run() {
// use int to identify different thread instance
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++));
iter &= 7;
Thread.currentThread().setName(threadName);
// always set connection local on dispatch threads
Red5.setConnectionLocal(conn);
// we were created for a reason, grab the event
IRTMPEvent message = events.poll();
// null check just in case queue was drained before we woke
if (message != null) {
// dispatch to stream
stream.dispatchEvent(message);
// release / clean up
message.release();
try {
// we were created for a reason, grab the event; add short timeout just in case
IRTMPEvent packet = events.peek();
// null check just in case queue was drained before we woke
if (packet != null && events.remove(packet)) {
if (isDebug) {
log.debug("Taken packet: {}", packet);
}
// dispatch to stream
stream.dispatchEvent(packet);
// release / clean up
packet.release();
}
} catch (Exception e) {
log.warn("Exception polling for next message", e);
}
// set null before resubmit
Red5.setConnectionLocal(null);
// resubmit for another go if we have more
if (!events.isEmpty()) {
recvDispatchExecutor.submit(this);
// check for shutdown and then submit or resubmit
if (!shutdown) {
if (events.isEmpty()) {
log.debug("Queue is empty");
if (submitted.compareAndSet(true, false)) {
// false state will allow resubmit at the next add
log.debug("Allow new submit");
}
} else {
// use last 3 digits of nano time to identify different thread instance
threadName = String.format("RTMPRecvDispatch@%s-%03d", conn.getSessionId(), (System.nanoTime() % 1000L));
if (isDebug) {
log.debug("Resubmit: {}", threadName);
}
// resubmitting rather than looping until empty plays nice with other threads
recvDispatchExecutor.submit(this);
}
} else {
state.set(false);
log.debug("Shutdown, no more submits");
}
// resubmitting rather than looping until empty plays nice with other threads
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa

public static final Object RTMP_HANDLER = "rtmp.handler";

public final static String RTMP_EXECUTION_ORDERER = "rtmp.execution.orderer";

/**
* Marker byte for standard or non-encrypted RTMP data.
*/
Expand Down
10 changes: 5 additions & 5 deletions io/src/main/java/org/red5/io/utils/ConversionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ public static Object convert(Object source, Class<?> target) throws ConversionEx
return source;
}
final Class<?> sourceClass = source.getClass();
log.info("Source: {} target: {}", sourceClass, target);
log.debug("Source: {} target: {}", sourceClass, target);
if (target.isInstance(source) || target.isAssignableFrom(sourceClass)) {
log.info("Source: {} is already an instance of: {}", source, target);
log.debug("Source: {} is already an instance of: {}", source, target);
return source;
}
if (target.isArray()) {
log.info("Source: {} to target array: {}", source, target);
log.debug("Source: {} to target array: {}", source, target);
return convertToArray(source, target);
}
if (target.equals(String.class)) {
Expand All @@ -134,10 +134,10 @@ public static Object convert(Object source, Class<?> target) throws ConversionEx
return convertMapToList((LinkedHashMap<?, ?>) source);
} else if (sourceClass.isArray()) {
if (List.class.isAssignableFrom(target)) {
log.info("Source: {} to target list: {}", source, target);
log.debug("Source: {} to target list: {}", source, target);
return Arrays.stream((Object[]) source).collect(Collectors.toCollection(ArrayList::new));
} else if (Set.class.isAssignableFrom(target)) {
log.info("Source: {} to target set: {}", source, target);
log.debug("Source: {} to target set: {}", source, target);
// special handling for sets when the source is a list
if (source instanceof List) {
return ((List<?>) source).stream().collect(Collectors.toCollection(HashSet::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import javax.websocket.Extension;
import javax.websocket.Session;
import javax.websocket.RemoteEndpoint.Basic;

import org.apache.commons.lang3.StringUtils;
import org.apache.tomcat.websocket.Constants;
Expand Down

0 comments on commit fd4a5ec

Please sign in to comment.