From e3bddee1f300f4899766e942f869af0cde6dfaf5 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Sat, 19 Aug 2023 17:59:47 +0800 Subject: [PATCH] windows signal support part1 --- open-coroutine-core/Cargo.toml | 11 ++- open-coroutine-core/src/event_loop/mod.rs | 4 +- open-coroutine-core/src/lib.rs | 3 +- open-coroutine-core/src/monitor/mod.rs | 112 +++++++++++++++++----- open-coroutine-core/src/monitor/node.rs | 19 ++++ open-coroutine-core/src/scheduler/mod.rs | 67 ++++++++++++- 6 files changed, 183 insertions(+), 33 deletions(-) diff --git a/open-coroutine-core/Cargo.toml b/open-coroutine-core/Cargo.toml index 173a15dd..271f87c6 100644 --- a/open-coroutine-core/Cargo.toml +++ b/open-coroutine-core/Cargo.toml @@ -23,14 +23,17 @@ uuid = { version = "1.3.0", features = [ crossbeam-utils = "0.8.15" crossbeam-deque = "0.8.2" polling = "2.8.0" +core_affinity = "0.8.0" open-coroutine-timer = { version = "0.1.0", path = "../open-coroutine-timer" } open-coroutine-queue = { version = "0.1.2", path = "../open-coroutine-queue" } [target."cfg(windows)".dependencies] -windows-sys = { version = "0.48.0", features = ["Win32_System_SystemInformation", "Win32_System_Diagnostics_Debug"] } - -[target.'cfg(target_os = "linux")'.dependencies] -core_affinity = "0.8.0" +windows-sys = { version = "0.48.0", features = [ + "Win32_System_SystemInformation", + "Win32_System_Diagnostics_Debug", + "Win32_Foundation", + "Win32_System_Console" +] } [dev-dependencies] backtrace = "0.3.67" diff --git a/open-coroutine-core/src/event_loop/mod.rs b/open-coroutine-core/src/event_loop/mod.rs index 8f6cf132..9d78fef9 100644 --- a/open-coroutine-core/src/event_loop/mod.rs +++ b/open-coroutine-core/src/event_loop/mod.rs @@ -21,7 +21,7 @@ pub type UserFunc = extern "C" fn(*const Suspender<(), ()>, usize) -> usize; #[derive(Debug, Copy, Clone)] pub struct EventLoops {} -#[cfg(target_os = "linux")] +#[cfg(any(target_os = "linux", windows))] static BIND: Lazy = Lazy::new(|| unsafe { EVENT_LOOPS.len() } <= num_cpus::get()); static mut INDEX: Lazy = Lazy::new(|| AtomicUsize::new(0)); @@ -82,7 +82,7 @@ impl EventLoops { std::thread::Builder::new() .name(format!("open-coroutine-event-loop-{i}")) .spawn(move || { - #[cfg(target_os = "linux")] + #[cfg(any(target_os = "linux", windows))] if *BIND { assert!( core_affinity::set_for_current(core_affinity::CoreId { diff --git a/open-coroutine-core/src/lib.rs b/open-coroutine-core/src/lib.rs index 363d2083..b4dcc24c 100644 --- a/open-coroutine-core/src/lib.rs +++ b/open-coroutine-core/src/lib.rs @@ -77,7 +77,8 @@ macro_rules! unbreakable { }; } -#[cfg(all(unix, feature = "preemptive-schedule"))] +#[allow(dead_code)] +#[cfg(feature = "preemptive-schedule")] mod monitor; #[allow( diff --git a/open-coroutine-core/src/monitor/mod.rs b/open-coroutine-core/src/monitor/mod.rs index 9716ad45..2240e8c8 100644 --- a/open-coroutine-core/src/monitor/mod.rs +++ b/open-coroutine-core/src/monitor/mod.rs @@ -28,6 +28,8 @@ impl Monitor { target_os = "android", target_os = "emscripten"))] { libc::SIGRTMIN() + } else if #[cfg(windows)] { + windows_sys::Win32::System::Console::CTRL_C_EVENT.try_into().unwrap() } else { libc::SIGURG } @@ -48,34 +50,51 @@ impl Monitor { } } - fn new() -> Self { - #[allow(clippy::fn_to_numeric_cast)] - unsafe extern "C" fn sigurg_handler(_signal: libc::c_int) { - // invoke by Monitor::signal() - if let Some(s) = crate::coroutine::suspender::Suspender::<(), ()>::current() { - //获取当前信号屏蔽集 - let mut current_mask: libc::sigset_t = std::mem::zeroed(); - assert_eq!( - 0, - libc::pthread_sigmask(libc::SIG_BLOCK, std::ptr::null(), &mut current_mask), - ); - //删除对Monitor::signum()信号的屏蔽,使信号处理函数即使在处理中,也可以再次进入信号处理函数 - assert_eq!(0, libc::sigdelset(&mut current_mask, Monitor::signum())); - assert_eq!( - 0, - libc::pthread_sigmask(libc::SIG_SETMASK, ¤t_mask, std::ptr::null_mut()) - ); - s.suspend(); + fn init_signal_handler() { + cfg_if::cfg_if! { + if #[cfg(windows)] { + unsafe extern "system" fn sigint_handler(_: u32) -> windows_sys::Win32::Foundation::BOOL { + // invoke by Monitor::signal() + if let Some(s) = crate::coroutine::suspender::Suspender::<(), ()>::current() { + s.suspend(); + } + windows_sys::Win32::Foundation::TRUE + } + assert_eq!(windows_sys::Win32::Foundation::TRUE, + unsafe{ windows_sys::Win32::System::Console::SetConsoleCtrlHandler(Some(sigint_handler), windows_sys::Win32::Foundation::TRUE) }); + } else { + #[allow(clippy::fn_to_numeric_cast)] + unsafe extern "C" fn sigurg_handler(_signal: libc::c_int) { + // invoke by Monitor::signal() + if let Some(s) = crate::coroutine::suspender::Suspender::<(), ()>::current() { + //删除对Monitor::signum()信号的屏蔽,使信号处理函数即使在处理中,也可以再次进入信号处理函数 + let mut current_mask: libc::sigset_t = std::mem::zeroed(); + assert_eq!( + 0, + libc::pthread_sigmask(libc::SIG_BLOCK, std::ptr::null(), &mut current_mask), + ); + assert_eq!(0, libc::sigdelset(&mut current_mask, Monitor::signum())); + assert_eq!( + 0, + libc::pthread_sigmask(libc::SIG_SETMASK, ¤t_mask, std::ptr::null_mut()) + ); + s.suspend(); + } + } + Monitor::register_handler(sigurg_handler as libc::sighandler_t); } } - Monitor::register_handler(sigurg_handler as libc::sighandler_t); + } + + fn new() -> Self { + Monitor::init_signal_handler(); //通过这种方式来初始化monitor线程 _ = MONITOR.get_or_init(|| { std::thread::Builder::new() .name("open-coroutine-monitor".to_string()) .spawn(|| { // todo pin this thread to the CPU core closest to the network card - #[cfg(target_os = "linux")] + #[cfg(any(target_os = "linux", windows))] assert!( core_affinity::set_for_current(core_affinity::CoreId { id: 0 }), "pin monitor thread to a single CPU core failed !" @@ -117,10 +136,19 @@ impl Monitor { if CoroutineState::Running == (*coroutine).get_state() { //只对陷入重度计算的协程发送信号抢占,对陷入执行系统调用的协程 //不发送信号(如果发送信号,会打断系统调用,进而降低总体性能) + #[cfg(unix)] assert_eq!( 0, libc::pthread_kill(node.get_pthread(), Monitor::signum()) ); + #[cfg(windows)] + assert_ne!( + 0, + windows_sys::Win32::System::Console::GenerateConsoleCtrlEvent( + Monitor::signum().try_into().unwrap(), + 0 + ) + ); } } } @@ -130,7 +158,13 @@ impl Monitor { pub(crate) fn add_task(time: u64, coroutine: Option<*const SchedulableCoroutine>) { unsafe { - let pthread = libc::pthread_self(); + cfg_if::cfg_if! { + if #[cfg(windows)] { + let pthread = windows_sys::Win32::System::Threading::GetCurrentThread(); + } else { + let pthread = libc::pthread_self(); + } + } Monitor::global() .task .insert(time, TaskNode::new(pthread, coroutine)); @@ -140,7 +174,13 @@ impl Monitor { pub(crate) fn clean_task(time: u64) { if let Some(entry) = Monitor::global().task.get_entry(time) { unsafe { - let pthread = libc::pthread_self(); + cfg_if::cfg_if! { + if #[cfg(windows)] { + let pthread = windows_sys::Win32::System::Threading::GetCurrentThread(); + } else { + let pthread = libc::pthread_self(); + } + } if !entry.is_empty() { _ = entry.remove(&TaskNode::new(pthread, None)); } @@ -149,12 +189,12 @@ impl Monitor { } } -#[cfg(all(test, unix))] +#[cfg(test)] mod tests { use super::*; - use std::time::Duration; #[ignore] + #[cfg(unix)] #[test] fn test() { extern "C" fn sigurg_handler(_signal: libc::c_int) { @@ -168,6 +208,30 @@ mod tests { } #[ignore] + #[cfg(windows)] + #[test] + fn test() { + unsafe extern "system" fn sigint_handler(_: u32) -> windows_sys::Win32::Foundation::BOOL { + println!("sigint handled"); + windows_sys::Win32::Foundation::TRUE + } + unsafe { + assert_eq!( + windows_sys::Win32::Foundation::TRUE, + windows_sys::Win32::System::Console::SetConsoleCtrlHandler( + Some(sigint_handler), + windows_sys::Win32::Foundation::TRUE + ) + ) + }; + let time = open_coroutine_timer::get_timeout_time(Duration::from_millis(10)); + Monitor::add_task(time, None); + std::thread::sleep(Duration::from_millis(20)); + Monitor::clean_task(time); + } + + #[ignore] + #[cfg(unix)] #[test] fn test_clean() { extern "C" fn sigurg_handler(_signal: libc::c_int) { diff --git a/open-coroutine-core/src/monitor/node.rs b/open-coroutine-core/src/monitor/node.rs index 5a1f0cb9..f89fa887 100644 --- a/open-coroutine-core/src/monitor/node.rs +++ b/open-coroutine-core/src/monitor/node.rs @@ -2,19 +2,38 @@ use crate::scheduler::SchedulableCoroutine; #[derive(Debug)] pub(crate) struct TaskNode { + #[cfg(windows)] + pthread: windows_sys::Win32::Foundation::HANDLE, + #[cfg(unix)] pthread: libc::pthread_t, coroutine: Option<*const SchedulableCoroutine>, } +#[allow(dead_code)] impl TaskNode { + #[cfg(unix)] pub fn new(pthread: libc::pthread_t, coroutine: Option<*const SchedulableCoroutine>) -> Self { TaskNode { pthread, coroutine } } + #[cfg(windows)] + pub fn new( + pthread: windows_sys::Win32::Foundation::HANDLE, + coroutine: Option<*const SchedulableCoroutine>, + ) -> Self { + TaskNode { pthread, coroutine } + } + + #[cfg(unix)] pub fn get_pthread(&self) -> libc::pthread_t { self.pthread } + #[cfg(windows)] + pub fn get_pthread(&self) -> windows_sys::Win32::Foundation::HANDLE { + self.pthread + } + pub fn get_coroutine(&self) -> Option<*const SchedulableCoroutine> { self.coroutine } diff --git a/open-coroutine-core/src/scheduler/mod.rs b/open-coroutine-core/src/scheduler/mod.rs index 5ff0d662..fba7bfca 100644 --- a/open-coroutine-core/src/scheduler/mod.rs +++ b/open-coroutine-core/src/scheduler/mod.rs @@ -140,7 +140,7 @@ impl Scheduler { Some(mut coroutine) => { _ = coroutine.set_scheduler(self); cfg_if::cfg_if! { - if #[cfg(all(unix, feature = "preemptive-schedule"))] { + if #[cfg(feature = "preemptive-schedule")] { let start = open_coroutine_timer::get_timeout_time(Duration::from_millis(10)); crate::monitor::Monitor::add_task(start, Some(&coroutine)); } @@ -175,7 +175,7 @@ impl Scheduler { _ => unreachable!("should never execute to here"), }; cfg_if::cfg_if! { - if #[cfg(all(unix, feature = "preemptive-schedule"))] { + if #[cfg(feature = "preemptive-schedule")] { //还没执行到10ms就主动yield或者执行完毕了,此时需要清理任务 //否则下一个协程执行不到10ms就会被抢占调度 crate::monitor::Monitor::clean_task(start); @@ -319,6 +319,69 @@ mod tests { scheduler.try_schedule(); } + #[cfg(windows)] + #[test] + fn simple_preemptive_schedule() -> std::io::Result<()> { + //fixme not success now + use std::sync::{Arc, Condvar, Mutex}; + static mut TEST_FLAG0: bool = true; + let pair = Arc::new((Mutex::new(true), Condvar::new())); + let pair2 = Arc::clone(&pair); + let handler = std::thread::Builder::new() + .name("test_preemptive_schedule".to_string()) + .spawn(move || { + let scheduler = Box::leak(Box::new(Scheduler::new())); + _ = scheduler.submit( + |_, _| { + unsafe { + while TEST_FLAG0 { + windows_sys::Win32::System::Threading::Sleep(10); + } + } + 1 + }, + None, + ); + _ = scheduler.submit( + |_, _| { + unsafe { TEST_FLAG0 = false }; + 2 + }, + None, + ); + scheduler.try_schedule(); + + let (lock, cvar) = &*pair2; + let mut pending = lock.lock().unwrap(); + *pending = false; + // notify the condvar that the value has changed. + cvar.notify_one(); + }) + .expect("failed to spawn thread"); + + // wait for the thread to start up + let (lock, cvar) = &*pair; + let result = cvar + .wait_timeout_while( + lock.lock().unwrap(), + Duration::from_millis(3000), + |&mut pending| pending, + ) + .unwrap(); + if result.1.timed_out() { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "preemptive schedule failed", + )) + } else { + unsafe { + handler.join().unwrap(); + assert!(!TEST_FLAG0); + } + Ok(()) + } + } + #[cfg(all(unix, feature = "preemptive-schedule"))] #[test] fn preemptive_schedule() -> std::io::Result<()> {