New version of crossbeam-deque (tokio-rs#468)
Stjepan Glavina authored and carllerche committed Jul 11, 2018
1 parent 36c817f commit 19da6ff
Showing 4 changed files with 35 additions and 47 deletions.
15 changes: 7 additions & 8 deletions ci/tsan
@@ -12,15 +12,14 @@ race:std*mpsc_queue
 # Probably more fences in std.
 race:__call_tls_dtors
 
-# The crossbeam deque uses fences.
-race:crossbeam_deque
+# The epoch-based GC uses fences.
+race:crossbeam_epoch
 
-# This is excluded as this race shows up due to using the stealing features of
-# the deque. Unfortunately, the implementation uses a fence, which makes tsan
-# unhappy.
-#
-# TODO: It would be nice to not have to filter this out.
-race:try_steal_task
+# Push and steal operations in crossbeam-deque may cause data races, but such
+# data races are safe. If a data race happens, the value read by `steal` is
+# forgotten and the steal operation is then retried.
+race:crossbeam_deque*push
+race:crossbeam_deque*steal
 
 # This filters out expected data race in the treiber stack implementations.
 # Treiber stacks are inherently racy. The pop operation will attempt to access
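To see why these suppressions exist, here is a minimal sketch (assuming the crossbeam-deque 0.5 API this commit adopts) of the cross-thread push/steal interleaving that trips TSan's fence analysis. The race it reports is benign for the reason given in the comment above: a steal that loses the race forgets the value it read and retries.

    extern crate crossbeam_deque as deque;

    use std::thread;

    fn main() {
        let (worker, stealer) = deque::fifo();

        // A second thread steals concurrently with the owner's pushes; the
        // fence-based synchronization inside the deque is what TSan flags
        // as a data race on `push`/`steal`.
        let thief = thread::spawn(move || {
            loop {
                // `steal` retries contended operations internally, so `None`
                // only means the deque looked empty; keep spinning until an
                // element arrives.
                if let Some(n) = stealer.steal() {
                    return n;
                }
            }
        });

        for i in 0..1_000 {
            worker.push(i);
        }

        assert!(thief.join().unwrap() < 1_000);
    }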
2 changes: 1 addition & 1 deletion tokio-threadpool/Cargo.toml
@@ -19,7 +19,7 @@ categories = ["concurrency", "asynchronous"]
 [dependencies]
 tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
 futures = "0.1.19"
-crossbeam-deque = "0.3"
+crossbeam-deque = "0.5.0"
 num_cpus = "1.2"
 rand = "0.5"
 log = "0.4"
27 changes: 13 additions & 14 deletions tokio-threadpool/src/worker/entry.rs
@@ -25,10 +25,10 @@ pub(crate) struct WorkerEntry {
     next_sleeper: UnsafeCell<usize>,
 
     // Worker half of deque
-    deque: deque::Deque<Arc<Task>>,
+    worker: deque::Worker<Arc<Task>>,
 
     // Stealer half of deque
-    steal: deque::Stealer<Arc<Task>>,
+    stealer: deque::Stealer<Arc<Task>>,
 
     // Thread parker
     pub park: UnsafeCell<BoxPark>,
@@ -42,14 +42,13 @@ pub(crate) struct WorkerEntry {
 
 impl WorkerEntry {
     pub fn new(park: BoxPark, unpark: BoxUnpark) -> Self {
-        let w = deque::Deque::new();
-        let s = w.stealer();
+        let (w, s) = deque::fifo();
 
         WorkerEntry {
             state: AtomicUsize::new(State::default().into()),
             next_sleeper: UnsafeCell::new(0),
-            deque: w,
-            steal: s,
+            worker: w,
+            stealer: s,
             inbound: Queue::new(),
             park: UnsafeCell::new(park),
             unpark,
@@ -188,23 +187,23 @@ impl WorkerEntry {
     ///
     /// This **must** only be called by the thread that owns the worker entry.
     /// This function is not `Sync`.
-    pub fn pop_task(&self) -> deque::Steal<Arc<Task>> {
-        self.deque.steal()
+    pub fn pop_task(&self) -> Option<Arc<Task>> {
+        self.worker.pop()
     }
 
     /// Steal a task
     ///
     /// This is called by *other* workers to steal a task for processing. This
     /// function is `Sync`.
-    pub fn steal_task(&self) -> deque::Steal<Arc<Task>> {
-        self.steal.steal()
+    pub fn steal_task(&self) -> Option<Arc<Task>> {
+        self.stealer.steal()
     }
 
     /// Drain (and drop) all tasks that are queued for work.
     ///
     /// This is called when the pool is shutting down.
     pub fn drain_tasks(&self) {
-        while let Some(_) = self.deque.pop() {
+        while let Some(_) = self.worker.pop() {
         }
     }
 
@@ -215,7 +214,7 @@
 
     #[inline]
     pub fn push_internal(&self, task: Arc<Task>) {
-        self.deque.push(task);
+        self.worker.push(task);
     }
 
     #[inline]
@@ -239,8 +238,8 @@ impl fmt::Debug for WorkerEntry {
         fmt.debug_struct("WorkerEntry")
             .field("state", &self.state.load(Relaxed))
             .field("next_sleeper", &"UnsafeCell<usize>")
-            .field("deque", &self.deque)
-            .field("steal", &self.steal)
+            .field("worker", &self.worker)
+            .field("stealer", &self.stealer)
             .field("park", &"UnsafeCell<BoxPark>")
             .field("unpark", &"BoxUnpark")
             .field("inbound", &self.inbound)
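The entry.rs changes track the crossbeam-deque API overhaul between 0.3 and 0.5: the two halves are now created together by `deque::fifo()`, and both `pop` and `steal` return `Option<T>` instead of the old `Steal<T>` enum. A minimal sketch of the new API, under those assumptions:

    extern crate crossbeam_deque as deque;

    fn main() {
        // 0.3 built the halves in two steps (`Deque::new()`, then
        // `.stealer()`); 0.5 returns both at once.
        let (worker, stealer) = deque::fifo();

        worker.push("a");
        worker.push("b");

        // In the FIFO flavor the owner pops from the front and other threads
        // steal from the front as well; contended operations are retried
        // internally rather than surfacing a `Retry` variant.
        assert_eq!(worker.pop(), Some("a"));
        assert_eq!(stealer.steal(), Some("b"));
        assert_eq!(stealer.steal(), None); // empty
    }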
38 changes: 14 additions & 24 deletions tokio-threadpool/src/worker/mod.rs
@@ -377,53 +377,43 @@ impl Worker {
     ///
     /// Returns `true` if work was found.
     fn try_run_owned_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool {
-        use deque::Steal::*;
-
         // Poll the internal queue for a task to run
         match self.entry().pop_task() {
-            Data(task) => {
+            Some(task) => {
                 self.run_task(task, notify, sender);
                 true
             }
-            Empty => false,
-            Retry => true,
+            None => false,
         }
     }
 
     /// Tries to steal a task from another worker.
     ///
     /// Returns `true` if work was found
     fn try_steal_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool {
-        use deque::Steal::*;
-
         debug_assert!(!self.is_blocking.get());
 
         let len = self.inner.workers.len();
         let mut idx = self.inner.rand_usize() % len;
-        let mut found_work = false;
         let start = idx;
 
         loop {
             if idx < len {
-                match self.inner.workers[idx].steal_task() {
-                    Data(task) => {
-                        trace!("stole task");
+                if let Some(task) = self.inner.workers[idx].steal_task() {
+                    trace!("stole task");
 
-                        self.run_task(task, notify, sender);
+                    self.run_task(task, notify, sender);
 
-                        trace!("try_steal_task -- signal_work; self={}; from={}",
-                               self.id.0, idx);
+                    trace!("try_steal_task -- signal_work; self={}; from={}",
+                           self.id.0, idx);
 
-                        // Signal other workers that work is available
-                        //
-                        // TODO: Should this be called here or before
-                        // `run_task`?
-                        self.inner.signal_work(&self.inner);
+                    // Signal other workers that work is available
+                    //
+                    // TODO: Should this be called here or before
+                    // `run_task`?
+                    self.inner.signal_work(&self.inner);
 
-                        return true;
-                    }
-                    Empty => {}
-                    Retry => found_work = true,
+                    return true;
                 }
 
                 idx += 1;
@@ -436,7 +426,7 @@
             }
         }
 
-        found_work
+        false
     }
 
     fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>, sender: &mut Sender) {
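Dropping the `Retry` bookkeeping leaves `try_steal_task` with a plain scan: start at a random victim so workers don't all hammer the same queue, visit each worker once with wrap-around, and report failure only after a full pass. A hypothetical standalone version of that scan (the `steal_from_any` helper and its signature are illustrative, not tokio's API; rand 0.5 is assumed, matching the Cargo.toml above):

    extern crate crossbeam_deque as deque;
    extern crate rand;

    use rand::Rng;

    fn steal_from_any<T>(stealers: &[deque::Stealer<T>]) -> Option<T> {
        let len = stealers.len();
        let start = rand::thread_rng().gen_range(0, len);

        for i in 0..len {
            // Wrap around so every worker is visited exactly once.
            let idx = (start + i) % len;
            if let Some(task) = stealers[idx].steal() {
                return Some(task);
            }
        }

        // A full pass found nothing. `steal` already retried contended
        // operations internally, so there is no `Retry` state to track.
        None
    }

    fn main() {
        let (worker, stealer) = deque::fifo();
        worker.push(42);

        assert_eq!(steal_from_any(&[stealer]), Some(42));
    }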
