diff --git a/ylong_ffrt/src/sys_event.rs b/ylong_ffrt/src/sys_event.rs index c44deaa3524bff23a4a6da872f5a42dba7cdbfd6..a314b2f3377197936918c1525a5bb343ee33941b 100644 --- a/ylong_ffrt/src/sys_event.rs +++ b/ylong_ffrt/src/sys_event.rs @@ -18,6 +18,7 @@ use libc::{c_int, c_uint, c_void}; type FfrtSysEventHandleT = *mut c_void; type DestroyFunc = extern "C" fn(*mut c_void); type FfrtFdCallBack = extern "C" fn(*const c_void, c_uint); +type TimerFunc = extern "C" fn() -> c_int; #[link(name = "ffrt")] // sys_event.h @@ -38,4 +39,8 @@ extern "C" { /// Deregisters the fd from ffrt's epoll. pub fn ffrt_poller_deregister(fd: c_int) -> c_int; + + pub fn ffrt_poller_register_timerfunc(func: TimerFunc) -> bool; + + pub fn ffrt_poller_wakeup(); } diff --git a/ylong_runtime/src/builder/mod.rs b/ylong_runtime/src/builder/mod.rs index 4ff5bd206a035cf852cb1a2a8bab9e9cc1914dac..7e420575b4e0c8ebdc60dac68cf0ed6ccf5077f4 100644 --- a/ylong_runtime/src/builder/mod.rs +++ b/ylong_runtime/src/builder/mod.rs @@ -30,10 +30,7 @@ pub(crate) mod current_thread_builder; pub(crate) mod multi_thread_builder; use std::fmt::Debug; -use std::io; use std::sync::Arc; -#[cfg(all(any(feature = "time", feature = "net"), feature = "ffrt"))] -use std::sync::Once; #[cfg(feature = "current_thread_runtime")] pub use current_thread_builder::CurrentThreadBuilder; @@ -42,10 +39,9 @@ pub use multi_thread_builder::MultiThreadBuilder; pub(crate) use crate::builder::common_builder::CommonBuilder; use crate::error::ScheduleError; use crate::executor::blocking_pool::BlockPoolSpawner; -#[cfg(all(feature = "time", feature = "ffrt"))] -use crate::executor::netpoller::NetLooper; cfg_not_ffrt!( + use std::io; use crate::executor::async_pool::AsyncPoolSpawner; ); @@ -138,14 +134,6 @@ pub(crate) fn initialize_async_spawner( Ok(async_spawner) } -#[cfg(feature = "ffrt")] -pub(crate) fn initialize_ffrt_spawner(_builder: &MultiThreadBuilder) -> io::Result<()> { - // initialize reactor - #[cfg(any(feature = "time", feature = "net"))] - initialize_reactor()?; - Ok(()) -} - pub(crate) fn initialize_blocking_spawner( builder: &CommonBuilder, ) -> Result { @@ -154,16 +142,6 @@ pub(crate) fn initialize_blocking_spawner( Ok(blocking_spawner) } -#[cfg(all(feature = "time", feature = "ffrt"))] -pub(crate) fn initialize_reactor() -> io::Result<()> { - static ONCE: Once = Once::new(); - ONCE.call_once(|| { - let net_poller = NetLooper::new(); - net_poller.create_net_poller_thread(); - }); - Ok(()) -} - #[cfg(test)] mod test { use crate::builder::RuntimeBuilder; diff --git a/ylong_runtime/src/executor/driver.rs b/ylong_runtime/src/executor/driver.rs index 17f7fe1e611dac231358afa4f3380c1df194167a..08d0d20dce8bfdfb1acaa729cef9eb6a6cb477ab 100644 --- a/ylong_runtime/src/executor/driver.rs +++ b/ylong_runtime/src/executor/driver.rs @@ -110,12 +110,12 @@ impl Handle { } pub(crate) fn timer_register(&self, clock_entry: NonNull) -> Result { - let res = self.time.timer_register(clock_entry); + let res = self.time.insert(clock_entry); self.wake(); res } pub(crate) fn timer_cancel(&self, clock_entry: NonNull) { - self.time.timer_cancel(clock_entry); + self.time.cancel(clock_entry); } } diff --git a/ylong_runtime/src/executor/mod.rs b/ylong_runtime/src/executor/mod.rs index 03e18dcde99d86241a05f77c87f8295e6881f447..b2bcc92ceb68540f4b5cdb36448bc9cfd737ffa8 100644 --- a/ylong_runtime/src/executor/mod.rs +++ b/ylong_runtime/src/executor/mod.rs @@ -19,8 +19,6 @@ pub(crate) mod blocking_pool; #[cfg(feature = "current_thread_runtime")] pub(crate) mod current_thread; -#[cfg(all(any(feature = "time", feature = "net"), feature = "ffrt"))] -pub(crate) mod netpoller; use std::future::Future; use std::mem::MaybeUninit; use std::sync::Once; @@ -33,7 +31,6 @@ use crate::executor::current_thread::CurrentThreadSpawner; use crate::task::TaskBuilder; use crate::{JoinHandle, Task}; cfg_ffrt! { - use crate::builder::initialize_ffrt_spawner; use crate::ffrt::spawner::spawn; } cfg_not_ffrt! { @@ -116,11 +113,8 @@ pub(crate) fn global_default_async() -> &'static Runtime { Err(e) => panic!("initialize runtime failed: {:?}", e), }; #[cfg(feature = "ffrt")] - let runtime = match initialize_ffrt_spawner(global_builder.as_ref().unwrap()) { - Ok(()) => Runtime { - async_spawner: AsyncHandle::FfrtMultiThread, - }, - Err(e) => panic!("initialize runtime failed: {:?}", e), + let runtime = Runtime { + async_spawner: AsyncHandle::FfrtMultiThread, }; GLOBAL_DEFAULT_ASYNC = MaybeUninit::new(runtime); }); diff --git a/ylong_runtime/src/executor/netpoller.rs b/ylong_runtime/src/executor/netpoller.rs deleted file mode 100644 index 4bad258a8aa7d1ee39284400dd6cb85ec878f6e3..0000000000000000000000000000000000000000 --- a/ylong_runtime/src/executor/netpoller.rs +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright (c) 2023 Huawei Device Co., Ltd. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::cell::RefCell; -use std::sync::Arc; -use std::thread; - -#[cfg(feature = "time")] -use crate::time::TimeDriver; - -#[cfg(any(not(feature = "ffrt"), all(feature = "net", feature = "ffrt")))] -const NET_POLL_INTERVAL_TIME: std::time::Duration = std::time::Duration::from_millis(10); - -/// Net poller thread creation and management -#[derive(Clone)] -pub(crate) struct NetLooper { - inner: Arc, -} - -unsafe impl Send for NetLooper {} -unsafe impl Sync for NetLooper {} - -struct Inner { - join_handle: RefCell>>, -} - -impl NetLooper { - pub(crate) fn new() -> Self { - NetLooper { - inner: Arc::new(Inner { - join_handle: RefCell::new(None), - }), - } - } - - pub(crate) fn create_net_poller_thread(&self) { - // todo: now we use the default thread stack size, could be smaller - let builder = thread::Builder::new().name("yl_net_poller".to_string()); - let netpoller_handle = self.clone(); - - let result = builder.spawn(move || netpoller_handle.run()); - match result { - Ok(join_handle) => { - *self.inner.join_handle.borrow_mut() = Some(join_handle); - } - Err(e) => panic!("os cannot spawn the monitor thread: {}", e), - } - } - - fn run(&self) { - loop { - // run time driver - #[cfg(feature = "time")] - TimeDriver::get_ref().run(); - - thread::sleep(NET_POLL_INTERVAL_TIME); - } - } -} diff --git a/ylong_runtime/src/time/driver.rs b/ylong_runtime/src/time/driver.rs index dd3bb628bf6533201c9a637126d0824761072538..6b66c9af16aa74682bc8ef8d831bbd0b667f7697 100644 --- a/ylong_runtime/src/time/driver.rs +++ b/ylong_runtime/src/time/driver.rs @@ -24,6 +24,7 @@ use crate::time::Clock; cfg_ffrt! { use std::mem::MaybeUninit; use std::sync::Once; + use libc::c_int; } // Time Driver @@ -36,6 +37,11 @@ pub(crate) struct TimeHandle { inner: Arc, } +#[cfg(feature = "ffrt")] +extern "C" fn ffrt_time_run() -> c_int { + TimeDriver::get_driver_ref().run().map(|time| time.as_millis() as c_int).unwrap_or(-1) +} + impl TimeDriver { #[cfg(not(feature = "ffrt"))] pub(crate) fn initialize() -> (TimeHandle, Arc) { @@ -52,7 +58,7 @@ impl TimeDriver { } #[cfg(feature = "ffrt")] - pub(crate) fn get_ref() -> &'static Self { + pub(crate) fn get_driver_ref() -> &'static Self { static mut DRIVER: MaybeUninit = MaybeUninit::uninit(); static ONCE: Once = Once::new(); @@ -68,16 +74,42 @@ impl TimeDriver { } } + #[cfg(feature = "ffrt")] + pub(crate) fn get_ref() -> &'static Self { + let driver = TimeDriver::get_driver_ref(); + static FFRT_ONCE: Once = Once::new(); + unsafe { + FFRT_ONCE.call_once(|| { + ylong_ffrt::ffrt_poller_register_timerfunc(ffrt_time_run); + }); + } + driver + } + + #[cfg(feature = "ffrt")] + pub(crate) fn timer_register(&self, clock_entry: NonNull) -> Result { + let res = self.insert(clock_entry); + unsafe { + ylong_ffrt::ffrt_poller_wakeup(); + } + res + } + + #[cfg(feature = "ffrt")] + pub(crate) fn timer_cancel(&self, clock_entry: NonNull) { + self.cancel(clock_entry); + } + pub(crate) fn start_time(&self) -> Instant { self.start_time } - pub(crate) fn timer_register(&self, clock_entry: NonNull) -> Result { + pub(crate) fn insert(&self, clock_entry: NonNull) -> Result { let mut lock = self.wheel.lock().unwrap(); lock.insert(clock_entry) } - pub(crate) fn timer_cancel(&self, clock_entry: NonNull) { + pub(crate) fn cancel(&self, clock_entry: NonNull) { let mut lock = self.wheel.lock().unwrap(); lock.cancel(clock_entry); } @@ -93,6 +125,7 @@ impl TimeDriver { let mut waker_list: [Option; 32] = Default::default(); let mut waker_idx = 0; + let mut is_wake = false; let mut lock = self.wheel.lock().unwrap(); @@ -103,6 +136,7 @@ impl TimeDriver { TimeOut::ClockEntry(mut clock_entry) => { let elapsed = lock.elapsed(); lock.set_last_elapsed(elapsed); + is_wake = true; // Unsafe access to timer_handle is only unsafe when Sleep Drop, // but does not let `Sleep` go to `Ready` before access to timer_handle fetched @@ -133,6 +167,9 @@ impl TimeDriver { for waker in waker_list[0..waker_idx].iter_mut() { waker.take().unwrap().wake(); } + if is_wake { + timeout = Some(Duration::new(0 ,0)); + } timeout } }