Skip to content

Commit

Permalink
simplify reactor
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Jul 22, 2024
1 parent 8a53ffa commit 4f71fa0
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 298 deletions.
4 changes: 2 additions & 2 deletions benches/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ fn spawn_server() -> SocketAddr {
while let Some(io) = incoming.next().await {
tokio::spawn(async move {
tokio::pin!(io);
// 20ms, one tick, from Minecraft
let mut ticker = tokio::time::interval(Duration::from_millis(20));
// 50ms = 1s/20, one tick, from Minecraft
let mut ticker = tokio::time::interval(Duration::from_millis(50));
loop {
tokio::select! {
None = io.next() => {
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#![feature(coroutines, proc_macro_hygiene, stmt_expr_attributes)]
#![feature(binary_heap_into_iter_sorted)]
#![feature(let_chains)]
#![feature(noop_waker)]

/// Protocol codec
mod codec;
Expand Down
35 changes: 6 additions & 29 deletions src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use concurrent_queue::ConcurrentQueue;
use futures::Stream;
use log::{debug, trace, warn};

use crate::packet::connected::{self, AckOrNack, Frame, FrameBody, FrameSet, FramesMut, Record};
use crate::packet::connected::{self, AckOrNack, Frame, FrameBody, FrameSet, FramesMut};
use crate::packet::unconnected;
use crate::resend_map::{reactor, ResendMap};
use crate::utils::u24;
use crate::resend_map::ResendMap;
use crate::utils::{u24, Reactor};
use crate::RoleContext;

/// Shared link between stream and sink
Expand Down Expand Up @@ -87,25 +87,6 @@ impl TransferLink {
}

pub(crate) fn incoming_ack(&self, records: AckOrNack) {
let to_wakes = if self.should_waking() {
let mut wakers = Vec::new();
let mut guard = reactor::Reactor::get().lock();
for record in &records.records {
match record {
Record::Range(start, end) => {
for seq_num in start.to_u32()..=end.to_u32() {
guard.cancel_timer(seq_num.into(), self.role.guid(), &mut wakers);
}
}
Record::Single(seq_num) => {
guard.cancel_timer(*seq_num, self.role.guid(), &mut wakers);
}
}
}
Some(wakers)
} else {
None
};
if let Some(dropped) = self.incoming_ack.force_push(records).unwrap() {
warn!(
"[{}] discard received ack {dropped:?}, total count: {}",
Expand All @@ -114,15 +95,11 @@ impl TransferLink {
);
}
// wake up after sends ack
if let Some(wakers) = to_wakes {
debug!(
"[{}] wake up {} wakers after receives ack",
self.role,
wakers.len()
);
for waker in wakers {
if self.should_waking() {
for waker in Reactor::get().cancel_all_timers(self.role.guid()) {
// safe to panic
waker.wake();
debug!("[{}] wake up a certain waker after receives ack", self.role,);
}
}
}
Expand Down
165 changes: 5 additions & 160 deletions src/resend_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::{Duration, Instant};
use log::trace;

use crate::packet::connected::{AckOrNack, Frame, Frames, Record};
use crate::utils::u24;
use crate::utils::{u24, Reactor};
use crate::RoleContext;

// TODO: use RTTEstimator to get adaptive RTO
Expand Down Expand Up @@ -126,177 +126,22 @@ impl ResendMap {
seq_num,
expired_at - now
);
reactor::Reactor::get().insert_timer(expired_at, seq_num, self.role.guid(), cx.waker());
Reactor::get().insert_timer(self.role.guid(), expired_at, cx.waker());
Poll::Pending
}
}

/// Specialized timer reactor for resend map
pub(crate) mod reactor {
use std::collections::{BTreeMap, HashMap};
use std::sync::OnceLock;
use std::task::Waker;
use std::time::{Duration, Instant};
use std::{mem, panic, thread};

use crate::utils::u24;

/// A unique sequence number with a global unique ID.
/// This is used to identify a timer across the different peers.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Default)]
struct UniqueSeq {
seq_num: u24,
guid: u64,
}

/// A simple timer reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
/// Inner state of the timer reactor.
inner: parking_lot::Mutex<ReactorInner>,
/// A condition variable to notify the reactor of new timers.
cond: parking_lot::Condvar,
}

/// Inner state of the timer reactor.
struct ReactorInner {
/// An ordered map of registered timers.
///
/// Timers are in the order in which they fire. The `UniqueSeq` in this type is relative to
/// the timer and plays a role in a unique ID for same timeout. The `Waker`
/// represents the task awaiting the timer.
timers: BTreeMap<(Instant, UniqueSeq), Waker>,
/// A mapping of unique seq num to their respective `Instant`s.
///
/// This is used to cancel timers with a given sequence number.
mapping: HashMap<UniqueSeq, Instant>,
}

impl Reactor {
pub(crate) fn get() -> &'static Reactor {
static REACTOR: OnceLock<Reactor> = OnceLock::new();

fn main_loop() {
let reactor = Reactor::get();
loop {
reactor.process_timers();
}
}

REACTOR.get_or_init(|| {
// Spawn the daemon thread to motivate the reactor.
thread::Builder::new()
.name("timer-reactor".to_string())
.spawn(main_loop)
.expect("cannot spawn timer-reactor thread");

Reactor {
inner: parking_lot::Mutex::new(ReactorInner {
timers: BTreeMap::new(),
mapping: HashMap::new(),
}),
cond: parking_lot::Condvar::new(),
}
})
}

/// Locks the reactor for exclusive access.
pub(crate) fn lock(&self) -> ReactorLock<'_> {
ReactorLock {
inner: self.inner.lock(),
cond: &self.cond,
}
}

/// Registers a timer in the reactor.
///
/// Returns the inserted timer's ID.
pub(crate) fn insert_timer(&self, when: Instant, seq_num: u24, guid: u64, waker: &Waker) {
let mut guard = self.inner.lock();
let unique_seq = UniqueSeq { seq_num, guid };
guard.mapping.insert(unique_seq, when);
guard.timers.insert((when, unique_seq), waker.clone());

// Notify that a timer has been inserted.
self.cond.notify_one();
}

/// Processes ready timers and waits for the next timer to be inserted.
fn process_timers(&self) {
let mut inner = self.inner.lock();

let now = Instant::now();

// Split timers into ready and pending timers.
//
// Careful to split just *after* `now`, so that a timer set for exactly `now` is
// considered ready.
let pending = inner
.timers
.split_off(&(now + Duration::from_nanos(1), UniqueSeq::default()));
let ready = mem::replace(&mut inner.timers, pending);

for ((_, seq_num), waker) in ready {
inner.mapping.remove(&seq_num);
// TODO: wake up maybe slow down the reactor
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| waker.wake()).ok();
}

// Calculate the duration until the next event.
let dur = inner
.timers
.keys()
.next()
.map(|(when, _)| when.saturating_duration_since(now));

if let Some(dur) = dur {
self.cond.wait_for(&mut inner, dur);
} else {
self.cond.wait(&mut inner);
}
}
}

pub(crate) struct ReactorLock<'a> {
inner: parking_lot::MutexGuard<'a, ReactorInner>,
cond: &'a parking_lot::Condvar,
}

impl Drop for ReactorLock<'_> {
fn drop(&mut self) {
// Notify the reactor that the inner state has changed.
self.cond.notify_one();
}
}

impl ReactorLock<'_> {
pub(crate) fn cancel_timer(&mut self, seq_num: u24, guid: u64, wakers: &mut Vec<Waker>) {
let unique_seq = UniqueSeq { seq_num, guid };
if let Some(when) = self.inner.mapping.remove(&unique_seq) {
wakers.push(
self.inner
.timers
.remove(&(when, unique_seq))
.expect("timer should exist"),
);
}
}
}
}

#[cfg(test)]
mod test {
use std::collections::VecDeque;
use std::task::{Context, Poll, Waker};
use std::task::{Context, Poll};
use std::time::Duration;

use bytes::Bytes;

use super::ResendMap;
use crate::packet::connected::{AckOrNack, Flags, Frame};
use crate::tests::test_trace_log_setup;
use crate::utils::tests::{test_trace_log_setup, TestWaker};
use crate::{Reliability, RoleContext};

const TEST_RTO: Duration = Duration::from_millis(1200);
Expand Down Expand Up @@ -382,7 +227,7 @@ mod test {

let mut buffer = VecDeque::default();

let res = map.poll_wait(&mut Context::from_waker(Waker::noop()));
let res = map.poll_wait(&mut Context::from_waker(&TestWaker::create()));
assert!(matches!(res, Poll::Ready(_)));

map.process_stales(&mut buffer);
Expand Down
2 changes: 1 addition & 1 deletion src/server/handler/offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ mod test {
use futures::StreamExt;

use super::*;
use crate::tests::test_trace_log_setup;
use crate::utils::tests::test_trace_log_setup;

struct TestCase {
addr: SocketAddr,
Expand Down
Loading

0 comments on commit 4f71fa0

Please sign in to comment.