From a32f6efd2dbc6e885cede8995b4a579ac997ae66 Mon Sep 17 00:00:00 2001 From: MingyuChen Date: Thu, 18 Apr 2024 11:48:31 +0800 Subject: [PATCH] =?UTF-8?q?FFRT=20poller=E6=8E=A5=E5=8F=A3=E5=8F=98?= =?UTF-8?q?=E6=9B=B4=E9=80=82=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: MingyuChen --- ylong_ffrt/src/lib.rs | 1 + ylong_ffrt/src/sys_event.rs | 229 +++++++++++++++++-- ylong_ffrt/src/task.rs | 7 + ylong_runtime/src/ffrt/ffrt_timer.rs | 10 +- ylong_runtime/src/net/driver.rs | 7 +- ylong_runtime/src/sync/mpsc/bounded/mod.rs | 11 +- ylong_runtime/src/sync/mpsc/unbounded/mod.rs | 11 +- ylong_runtime/src/time/sleep.rs | 2 +- 8 files changed, 233 insertions(+), 45 deletions(-) diff --git a/ylong_ffrt/src/lib.rs b/ylong_ffrt/src/lib.rs index 406058e..f578af7 100644 --- a/ylong_ffrt/src/lib.rs +++ b/ylong_ffrt/src/lib.rs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![cfg(target_os = "linux")] //! A FFI crate for FFRT runtime. mod config; diff --git a/ylong_ffrt/src/sys_event.rs b/ylong_ffrt/src/sys_event.rs index a0974dc..1f88040 100644 --- a/ylong_ffrt/src/sys_event.rs +++ b/ylong_ffrt/src/sys_event.rs @@ -11,52 +11,233 @@ // See the License for the specific language governing permissions and // limitations under the License. -use libc::{c_int, c_uchar, c_uint, c_ulonglong, c_void}; +use std::collections::HashMap; +use std::mem::MaybeUninit; +use std::ptr::null; +use std::sync::{Arc, Mutex, Once}; -// Unstable interface, rust encapsulation temporarily not provided +use libc::{c_int, c_uchar, c_uint, c_ulonglong, c_void, EPOLL_CTL_ADD, EPOLL_CTL_DEL}; + +use crate::{ffrt_this_task_get_qos, Qos}; -type FfrtSysEventHandleT = *mut c_void; -type DestroyFunc = extern "C" fn(*mut c_void); -type FfrtFdCallBack = extern "C" fn(*const c_void, c_uint, c_uchar); +// Unstable interface, rust encapsulation temporarily not provided +type FfrtFdCallBack = extern "C" fn(*mut c_void, c_uint); type FfrtExecHook = extern "C" fn(*mut c_void); type FfrtTimerHandle = *mut c_void; +/// Ffrt Timer query result +#[repr(C)] +pub enum FfrtTimerQueryT { + /// cannot find the timer in the poller + FfrtTimerNotfound = -1, + /// the timer is not expired + FfrtTimerNotExecuted = 0, + /// the timer is expired + FfrtTimerExecuted = 1, +} + +struct FdRegisterMap { + map: Arc>>, +} + +impl FdRegisterMap { + fn get_instance() -> &'static FdRegisterMap { + static mut FD_REGISTER_MAP: MaybeUninit = MaybeUninit::uninit(); + static ONCE: Once = Once::new(); + + unsafe { + ONCE.call_once(|| { + let global = FdRegisterMap { + map: Arc::new(Mutex::new(HashMap::new())), + }; + FD_REGISTER_MAP = MaybeUninit::new(global); + }); + &*FD_REGISTER_MAP.as_ptr() + } + } +} + +struct TimerReigsterMap { + map: Arc>>, +} + +impl TimerReigsterMap { + fn get_instance() -> &'static TimerReigsterMap { + static mut TIMER_REGISTER_MAP: MaybeUninit = MaybeUninit::uninit(); + static ONCE: Once = Once::new(); + + unsafe { + ONCE.call_once(|| { + let global = TimerReigsterMap { + map: Arc::new(Mutex::new(HashMap::new())), + }; + TIMER_REGISTER_MAP = MaybeUninit::new(global); + }); + &*TIMER_REGISTER_MAP.as_ptr() + } + } +} + +/// Registers the fd to ffrt's epoll. Callback will be called when io events +/// arrived. +/// +/// # Panics +/// This function panics when epoll register fails +pub unsafe fn ffrt_poller_register( + fd: c_int, + events: c_uint, + data: *const c_void, + callback: FfrtFdCallBack, +) { + let qos = ffrt_this_task_get_qos(); + let ret = ffrt_epoll_ctl(qos, EPOLL_CTL_ADD, fd, events, data, callback); + if ret == 0 { + let fd_map = FdRegisterMap::get_instance(); + let mut map = fd_map.map.lock().unwrap(); + map.insert(fd, qos); + } else { + panic!( + "ffrt poller register failed! error: {:?}, ret: {}", + std::io::Error::last_os_error(), + ret + ); + } +} + +/// Deregisters the fd from ffrt's poller. +/// +/// # Panics +/// This function panics when the global FD map doesn't contain the fd or epoll +/// deregister fails. +pub unsafe fn ffrt_poller_deregister(fd: c_int) { + let qos = { + let fd_map = FdRegisterMap::get_instance(); + let mut map = fd_map.map.lock().unwrap(); + map.remove(&fd).expect(&format!( + "ffrt poller deregister fd {} which has not been registered", + fd + )) + }; + + extern "C" fn ffrt_callback(_data: *mut c_void, _ready: c_uint) {} + + let ret = ffrt_epoll_ctl(qos, EPOLL_CTL_DEL, fd, 0, null(), ffrt_callback); + + if ret != 0 { + panic!( + "ffrt poller deregister failed! error: {:?}, ret: {}", + std::io::Error::last_os_error(), + ret + ); + } +} + +/// Gets the number of times ffrt's poller has been polled. +pub unsafe fn ffrt_poller_get_tick() -> u8 { + let qos = ffrt_this_task_get_qos(); + ffrt_epoll_get_count(qos) +} + +/// Starts a ffrt timer +pub unsafe fn ffrt_timer_register( + duration: c_ulonglong, + waker: *mut c_void, + callback: FfrtExecHook, +) -> FfrtTimerHandle { + let qos = ffrt_this_task_get_qos(); + let handle = ffrt_timer_start(qos, duration, waker, callback, false); + if handle.is_null() { + panic!("ffrt timer register failed!"); + } + let timer_map = TimerReigsterMap::get_instance(); + let mut map = timer_map.map.lock().unwrap(); + map.insert(handle as usize, qos); + handle +} + +/// Deregisters a ffrt timer +pub unsafe fn ffrt_timer_deregister(handle: FfrtTimerHandle) { + let qos = { + let timer_map = TimerReigsterMap::get_instance(); + let mut map = timer_map.map.lock().unwrap(); + map.remove(&(handle as usize)) + .expect("ffrt timer deregister a timer which has not been registered") + }; + if ffrt_timer_stop(qos, handle) == -1 { + panic!("ffrt timer deregister failed!"); + } +} + +/// Wakes up the worker currently holding the poller +pub unsafe fn ffrt_poller_worker_wakeup() { + let qos = ffrt_this_task_get_qos(); + ffrt_poller_wakeup(qos); +} + +/// Checks if the timer has expired +pub unsafe fn ffrt_is_timer_finished(handle: FfrtTimerHandle) -> bool { + let qos = { + let timer_map = TimerReigsterMap::get_instance(); + let map = timer_map.map.lock().unwrap(); + map.get(&(handle as usize)) + .expect("ffrt query a timer that has not been registered") + .clone() + }; + match ffrt_timer_query(qos, handle) { + FfrtTimerQueryT::FfrtTimerNotfound => panic!("ffrt query a timer that cannot be found"), + FfrtTimerQueryT::FfrtTimerNotExecuted => false, + FfrtTimerQueryT::FfrtTimerExecuted => true, + } +} + #[link(name = "ffrt")] -// sys_event.h extern "C" { - #![allow(unused)] - fn ffrt_sys_event_create(ty: c_int, fd: usize, filter: usize) -> FfrtSysEventHandleT; - fn ffrt_sys_event_wait(event: FfrtSysEventHandleT, sec: i64) -> c_int; - fn ffrt_sys_event_destroy(event: FfrtSysEventHandleT, func: DestroyFunc, arg: *mut c_void); - - /// Registers the fd to ffrt's epoll. Callback will be called when io events - /// arrived. - pub fn ffrt_poller_register( + /// Controls ffrt's epoll + fn ffrt_epoll_ctl( + qos: Qos, + op: c_int, fd: c_int, events: c_uint, data: *const c_void, callback: FfrtFdCallBack, ) -> c_int; - /// Deregisters the fd from ffrt's epoll. - pub fn ffrt_poller_deregister(fd: c_int) -> c_int; - - /// Registers a timer to ffrt's timer poller. Callback will be called when - /// timer events arrived. - pub fn ffrt_timer_start( + /// @brief Start a timer on ffrt worker + /// + /// @param qos Indicates qos of the worker that runs timer. + /// @param timeout Indicates the number of milliseconds that specifies + /// timeout. @param data Indicates user data used in cb. + /// @param cb Indicates user cb which will be executed when timeout. + /// @param repeat Indicates whether to repeat this timer. + /// @return Returns a timer handle. + /// @since 12 + /// @version 1.0 + fn ffrt_timer_start( + qos: Qos, duration: c_ulonglong, waker: *mut c_void, callback: FfrtExecHook, + repeat: bool, ) -> FfrtTimerHandle; - /// Deregisters the timer from ffrt's timer poller - pub fn ffrt_timer_stop(handle: FfrtTimerHandle); + /// @brief Stop a target timer on ffrt worker + /// + /// @param qos Indicates qos of the worker that runs timer. + /// @param handle Indicates the target timer handle. + /// @return Returns 0 if success; + /// returns -1 otherwise. + /// @since 12 + /// @version 1.0 + fn ffrt_timer_stop(qos: Qos, handle: FfrtTimerHandle) -> c_int; /// Checks whether the timer has expired. A returned value of 1 indicates /// the timer has reached its deadline, otherwise, the timer has not expired /// yet. - pub fn ffrt_timer_query(handle: FfrtTimerHandle) -> c_int; + fn ffrt_timer_query(qos: Qos, handle: FfrtTimerHandle) -> FfrtTimerQueryT; /// Wakes up the poller to poll timer/io events. - pub fn ffrt_poller_wakeup(); + fn ffrt_poller_wakeup(qos: Qos); + + /// Gets the number of times the poller has been polled. + fn ffrt_epoll_get_count(qos: Qos) -> c_uchar; } diff --git a/ylong_ffrt/src/task.rs b/ylong_ffrt/src/task.rs index 7d648e0..76e6c67 100644 --- a/ylong_ffrt/src/task.rs +++ b/ylong_ffrt/src/task.rs @@ -133,4 +133,11 @@ extern "C" { /// Wakes the task pub fn ffrt_wake_coroutine(task: RawTaskCtx); + + /// @brief Obtains the qos of this task. + /// + /// @return Returns the task qos. + /// @since 12 + /// @version 1.0 + pub fn ffrt_this_task_get_qos() -> Qos; } diff --git a/ylong_runtime/src/ffrt/ffrt_timer.rs b/ylong_runtime/src/ffrt/ffrt_timer.rs index ada81ff..71b0b75 100644 --- a/ylong_runtime/src/ffrt/ffrt_timer.rs +++ b/ylong_runtime/src/ffrt/ffrt_timer.rs @@ -30,19 +30,19 @@ impl FfrtTimerEntry { let data = waker as *mut c_void; unsafe { - let ptr = ylong_ffrt::ffrt_timer_start(dur, data, timer_wake_hook); - ylong_ffrt::ffrt_poller_wakeup(); + let ptr = ylong_ffrt::ffrt_timer_register(dur, data, timer_wake_hook); + ylong_ffrt::ffrt_poller_worker_wakeup(); FfrtTimerEntry(ptr) } } - pub(crate) fn result(&self) -> bool { - unsafe { ylong_ffrt::ffrt_timer_query(self.0) == 1 } + pub(crate) fn is_finished(&self) -> bool { + unsafe { ylong_ffrt::ffrt_is_timer_finished(self.0) } } pub(crate) fn timer_deregister(&self) { unsafe { - ylong_ffrt::ffrt_timer_stop(self.0); + ylong_ffrt::ffrt_timer_deregister(self.0); } } } diff --git a/ylong_runtime/src/net/driver.rs b/ylong_runtime/src/net/driver.rs index 95af324..9437a81 100644 --- a/ylong_runtime/src/net/driver.rs +++ b/ylong_runtime/src/net/driver.rs @@ -26,7 +26,7 @@ use crate::util::slab::{Address, Ref, Slab}; cfg_ffrt! { #[cfg(all(feature = "signal", target_os = "linux"))] use crate::signal::unix::SignalDriver; - use libc::{c_void, c_int, c_uint, c_uchar}; + use libc::{c_void, c_int, c_uint}; } cfg_not_ffrt! { @@ -318,7 +318,7 @@ impl IoDriver { } #[cfg(all(feature = "ffrt", feature = "signal", target_os = "linux"))] -extern "C" fn ffrt_dispatch_signal_event(data: *const c_void, _ready: c_uint, _new_tick: c_uchar) { +extern "C" fn ffrt_dispatch_signal_event(data: *mut c_void, _ready: c_uint) { let token = Token::from_usize(data as usize); if token == SIGNAL_TOKEN { SignalDriver::get_mut_ref().broadcast(); @@ -328,11 +328,12 @@ extern "C" fn ffrt_dispatch_signal_event(data: *const c_void, _ready: c_uint, _n } #[cfg(feature = "ffrt")] -extern "C" fn ffrt_dispatch_event(data: *const c_void, ready: c_uint, new_tick: c_uchar) { +extern "C" fn ffrt_dispatch_event(data: *mut c_void, ready: c_uint) { const COMPACT_INTERVAL: u8 = 255; let driver = IoDriver::get_mut_ref(); + let new_tick = unsafe { ylong_ffrt::ffrt_poller_get_tick() }; if new_tick == COMPACT_INTERVAL && driver.tick != new_tick { unsafe { driver.resources.as_mut().unwrap().compact(); diff --git a/ylong_runtime/src/sync/mpsc/bounded/mod.rs b/ylong_runtime/src/sync/mpsc/bounded/mod.rs index 84da091..d585aca 100644 --- a/ylong_runtime/src/sync/mpsc/bounded/mod.rs +++ b/ylong_runtime/src/sync/mpsc/bounded/mod.rs @@ -390,9 +390,9 @@ impl BoundedReceiver { /// * `Poll::Pending` if no messages in the channel now, but the channel is /// not closed. /// * `Poll::Ready(Ok(T))` if receiving a value successfully. - /// * `Poll::Ready(Err(RecvError))` in the following situations: - /// 1. All senders have been dropped or the channel is closed. - /// 2. No messages remaining. + /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All + /// senders have been dropped or the channel is closed. 2. No messages + /// remaining. /// /// # Examples /// @@ -420,9 +420,8 @@ impl BoundedReceiver { /// /// # Return value /// * `Ok(T)` if receiving a value successfully. - /// * `Err(RecvError)` in the following situations: - /// 1. All senders have been dropped or the channel is closed. - /// 2. No messages remaining. + /// * `Err(RecvError)` in the following situations: 1. All senders have been + /// dropped or the channel is closed. 2. No messages remaining. /// /// # Examples /// diff --git a/ylong_runtime/src/sync/mpsc/unbounded/mod.rs b/ylong_runtime/src/sync/mpsc/unbounded/mod.rs index af52e7e..6da1dd8 100644 --- a/ylong_runtime/src/sync/mpsc/unbounded/mod.rs +++ b/ylong_runtime/src/sync/mpsc/unbounded/mod.rs @@ -272,9 +272,9 @@ impl UnboundedReceiver { /// * `Poll::Pending` if no messages in the channel now, but the channel is /// not closed. /// * `Poll::Ready(Ok(T))` if receiving a value successfully. - /// * `Poll::Ready(Err(RecvError))` in the following situations: - /// 1. All senders have been dropped or the channel is closed. - /// 2. No messages remaining. + /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All + /// senders have been dropped or the channel is closed. 2. No messages + /// remaining. /// /// # Examples /// @@ -301,9 +301,8 @@ impl UnboundedReceiver { /// /// # Return value /// * `Ok(T)` if receiving a value successfully. - /// * `Err(RecvError)` in the following situations: - /// 1. All senders have been dropped or the channel is closed. - /// 2. No messages remaining. + /// * `Err(RecvError)` in the following situations: 1. All senders have been + /// dropped or the channel is closed. 2. No messages remaining. /// /// # Examples /// diff --git a/ylong_runtime/src/time/sleep.rs b/ylong_runtime/src/time/sleep.rs index 687bf4f..4329f96 100644 --- a/ylong_runtime/src/time/sleep.rs +++ b/ylong_runtime/src/time/sleep.rs @@ -156,7 +156,7 @@ cfg_ffrt!( // this unwrap is safe since we have already insert the timer into the entry let timer = this.inner.timer.as_ref().unwrap(); - if timer.result() { + if timer.is_finished() { Poll::Ready(()) } else { Poll::Pending -- Gitee