Skip to content

Commit

Permalink
Fix bug in custom readiness queue (tokio-rs#571)
Browse files Browse the repository at this point in the history
A call to `Poll::poll` could corrupt the readiness queue's linked list
under certain conditions. This commit fixes the logic around inserting
the `sleep_token`, which allows producers to know when they need to
wakeup `Poll`.

Fixes tokio-rs#570
  • Loading branch information
carllerche authored Mar 16, 2017
1 parent 2d58be9 commit b1d9260
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 33 deletions.
49 changes: 16 additions & 33 deletions src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,8 +1057,6 @@ impl Poll {

#[inline]
fn poll2(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let mut sleep = false;

// Compute the timeout value passed to the system selector. If the
// readiness queue has pending nodes, we still want to poll the system
// selector for new events, but we don't want to block the thread to
Expand All @@ -1072,7 +1070,6 @@ impl Poll {
// inserts `sleep_marker` into the queue. This signals to any
// threads setting readiness that the `Poll::poll` is going to
// sleep, so the awakener should be used.
sleep = true;
timeout
} else {
// The readiness queue is not empty, so do not block the thread.
Expand All @@ -1082,18 +1079,6 @@ impl Poll {
// First get selector events
let res = self.selector.select(&mut events.inner, AWAKEN, timeout);

if sleep {
// Cleanup the sleep marker. Removing `sleep_marker` avoids
// unnecessary syscalls to the awakener. It also needs to be removed
// from the queue before it can be inserted again.
//
// Note, that this won't *guarantee* that the sleep marker is
// removed. If the sleep marker cannot be removed, it is no longer
// at the head of the queue, which still achieves the goal of
// avoiding extra awakener syscalls.
self.readiness_queue.try_remove_sleep_marker();
}

if try!(res) {
// Some awakeners require reading from a FD.
self.readiness_queue.inner.awakener.cleanup();
Expand Down Expand Up @@ -2009,6 +1994,22 @@ impl ReadinessQueue {
let end_marker = self.inner.end_marker();
let sleep_marker = self.inner.sleep_marker();

let tail = unsafe { *self.inner.tail_readiness.get() };

// If the tail is currently set to the sleep_marker, then check if the
// head is as well. If it is, then the queue is currently ready to
// sleep. If it is not, then the queue is not empty and there should be
// no sleeping.
if tail == sleep_marker {
return self.inner.head_readiness.load(Acquire) == sleep_marker;
}

// If the tail is not currently set to `end_marker`, then the queue is
// not empty.
if tail != end_marker {
return false;
}

self.inner.sleep_marker.next_readiness.store(ptr::null_mut(), Relaxed);

let actual = self.inner.head_readiness.compare_and_swap(
Expand All @@ -2030,24 +2031,6 @@ impl ReadinessQueue {
unsafe { *self.inner.tail_readiness.get() = sleep_marker; }
true
}

fn try_remove_sleep_marker(&self) {
let end_marker = self.inner.end_marker();
let sleep_marker = self.inner.sleep_marker();

// Set the next ptr to null
self.inner.end_marker.next_readiness.store(ptr::null_mut(), Relaxed);

let actual = self.inner.head_readiness.compare_and_swap(
sleep_marker, end_marker, AcqRel);

// If the swap is successful, then the queue is still empty.
if actual != sleep_marker {
return;
}

unsafe { *self.inner.tail_readiness.get() = end_marker; }
}
}

impl ReadinessQueueInner {
Expand Down
67 changes: 67 additions & 0 deletions test/test_custom_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,70 @@ fn drop_registration_from_non_main_thread() {
}
}
}

#[test]
fn stress_with_small_events_collection() {
const N: usize = 8;
const ITER: usize = 1_000;

use std::sync::{Arc, Barrier};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::thread;

let poll = Poll::new().unwrap();
let mut registrations = vec![];

let barrier = Arc::new(Barrier::new(N + 1));
let done = Arc::new(AtomicBool::new(false));

for i in 0..N {
let (registration, set_readiness) = Registration::new2();
poll.register(&registration, Token(i), Ready::readable(), PollOpt::edge()).unwrap();

registrations.push(registration);

let barrier = barrier.clone();
let done = done.clone();

thread::spawn(move || {
barrier.wait();

while !done.load(Acquire) {
set_readiness.set_readiness(Ready::readable()).unwrap();
}

// Set one last time
set_readiness.set_readiness(Ready::readable()).unwrap();
});
}

let mut events = Events::with_capacity(4);

barrier.wait();

for _ in 0..ITER {
poll.poll(&mut events, None).unwrap();
}

done.store(true, Release);

let mut final_ready = vec![false; N];


for i in 0..5 {
poll.poll(&mut events, None).unwrap();

for event in &events {
final_ready[event.token().0] = true;
}

if final_ready.iter().all(|v| *v) {
return;
}

thread::sleep(Duration::from_millis(10));
}

panic!("dead lock?");
}

0 comments on commit b1d9260

Please sign in to comment.