Skip to content

Commit

Permalink
add multithreaded test and apply fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
adwhit committed Oct 2, 2024
1 parent 665dc81 commit dab170a
Showing 1 changed file with 106 additions and 9 deletions.
115 changes: 106 additions & 9 deletions crux_core/src/capability/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,34 +118,39 @@ impl QueuingExecutor {
// This strategy should avoid dropping work or busy-looping
self.ready_sender.send(task_id).expect("could not requeue");
}
RunTask::Missing => {
// This is possible if a naughty future sends a wake notification while
// still running, then runs to completion and is evicted from the slab.
// Nothing to be done.
}
RunTask::Suspended | RunTask::Completed => did_some_work = true,
}
}
}
}

fn run_task(&self, task_id: TaskId) -> RunTask {
let task = self
.tasks
.lock()
.expect("Task slab poisoned")
.get_mut(*task_id as usize)
.expect("Task slot is missing")
.take();
let Some(mut task) = task else {
let mut lock = self.tasks.lock().expect("Task slab poisoned");
let Some(task) = lock.get_mut(*task_id as usize) else {
return RunTask::Missing;
};
let Some(mut task) = task.take() else {
// the slot exists but the task is missing - presumably it
// is being executed on another thread
return RunTask::Unavailable;
};

// free the mutex so other threads can make progress
drop(lock);

let waker = Arc::new(TaskWaker {
task_id,
sender: self.ready_sender.clone(),
})
.into();
let context = &mut Context::from_waker(&waker);

// ...and poll it
// poll the task
if task.as_mut().poll(context).is_pending() {
// If it's still pending, put the future back in the slot
self.tasks
Expand All @@ -164,6 +169,7 @@ impl QueuingExecutor {
}

enum RunTask {
Missing,
Unavailable,
Suspended,
Completed,
Expand All @@ -174,6 +180,12 @@ enum RunTask {
#[cfg(test)]
mod tests {

use rand::Rng;
use std::{
sync::atomic::{AtomicI32, Ordering},
task::Poll,
};

use super::*;
use crate::capability::shell_request::ShellRequest;

Expand All @@ -199,4 +211,89 @@ mod tests {
drop(spawner);
assert_eq!(Arc::strong_count(&counter), 1);
}

#[test]
fn test_multithreaded_executor() {
// We define a future which chaotically sends notifications to wake up the task
// The future has a random chance to suspend or to defer to its children which
// may also suspend. However it will ultimately resolve to `Ready` and once it
// has done so will stay finished
struct Chaotic {
ready_once: bool,
children: Vec<Chaotic>,
}

static CHAOS_COUNT: AtomicI32 = AtomicI32::new(0);

impl Chaotic {
fn new_with_children(num_children: usize) -> Self {
CHAOS_COUNT.fetch_add(1, Ordering::SeqCst);
Self {
ready_once: false,
children: (0..num_children)
.map(|_| Chaotic::new_with_children(num_children - 1))
.collect(),
}
}
}

impl Future for Chaotic {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// once we're done, we're done
if self.ready_once {
return Poll::Ready(());
}
if rand::thread_rng().gen_bool(0.1) {
cx.waker().wake_by_ref();
return Poll::Pending;
} else {
let mut ready = true;
let this = self.get_mut();
for child in &mut this.children {
if let Poll::Pending = child.poll_unpin(cx) {
ready = false;
}
}
if ready {
this.ready_once = true;
// throw a wake in for extra chaos
cx.waker().wake_by_ref();
CHAOS_COUNT.fetch_sub(1, Ordering::SeqCst);
Poll::Ready(())
} else {
Poll::Pending
}
}
}
}

let (executor, spawner) = executor_and_spawner();
// 100 futures with many (1957) children each equals lots of chaos
for _ in 0..100 {
let future = Chaotic::new_with_children(6);
spawner.spawn(future);
}
assert_eq!(CHAOS_COUNT.load(Ordering::SeqCst), 195700);
let executor = Arc::new(executor);
assert_eq!(executor.spawn_queue.len(), 100);

// Spawn 10 threads and run all
let handles = (0..10)
.map(|_| {
let executor = executor.clone();
std::thread::spawn(move || {
executor.run_all();
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.join().unwrap();
}
// nothing left in queue, all futures resolved
assert_eq!(executor.spawn_queue.len(), 0);
assert_eq!(executor.ready_queue.len(), 0);
assert_eq!(CHAOS_COUNT.load(Ordering::SeqCst), 0);
}
}

0 comments on commit dab170a

Please sign in to comment.