From 12b6a883a807d7696b5b1180238cf05b3ad680d0 Mon Sep 17 00:00:00 2001 From: MingyuChen Date: Tue, 4 Jul 2023 19:49:21 +0800 Subject: [PATCH] ffrt io poller adaptation Signed-off-by: MingyuChen --- ylong_ffrt/src/lib.rs | 4 +- ylong_ffrt/src/sys_event.rs | 14 +- ylong_ffrt/src/task.rs | 14 +- ylong_io/src/source.rs | 9 + ylong_io/src/sys/linux/tcp/listener.rs | 11 +- ylong_io/src/sys/linux/tcp/stream.rs | 11 +- ylong_io/src/sys/linux/udp/udp_socket.rs | 50 ++--- ylong_io/src/sys/windows/tcp/listener.rs | 15 +- ylong_io/src/sys/windows/tcp/stream.rs | 5 + ylong_io/src/sys/windows/udp/udp_socket.rs | 9 + ylong_runtime/Cargo.toml | 1 + ylong_runtime/src/builder/mod.rs | 3 +- ylong_runtime/src/executor/mod.rs | 3 +- ylong_runtime/src/executor/netpoller.rs | 12 -- ylong_runtime/src/iter/core.rs | 2 +- ylong_runtime/src/lib.rs | 22 --- ylong_runtime/src/macros.rs | 22 +++ ylong_runtime/src/net/async_source.rs | 42 ++++- ylong_runtime/src/net/driver.rs | 210 ++++++++++++++------- ylong_runtime/src/net/mod.rs | 6 +- ylong_runtime/src/net/ready.rs | 51 ++++- ylong_runtime/src/task/raw.rs | 3 +- ylong_runtime/src/task/state.rs | 2 +- ylong_runtime/src/task/task_handle.rs | 10 +- 24 files changed, 354 insertions(+), 177 deletions(-) diff --git a/ylong_ffrt/src/lib.rs b/ylong_ffrt/src/lib.rs index b4187f7..ee57a10 100644 --- a/ylong_ffrt/src/lib.rs +++ b/ylong_ffrt/src/lib.rs @@ -41,8 +41,6 @@ use libc::{c_int, c_void}; pub enum Qos { /// Inherits parent's qos level Inherent = -1, - /// Unspecific qos - Unspecific, /// Lowest qos Background, /// Utility qos @@ -51,6 +49,8 @@ pub enum Qos { Default, /// User initialiated qos UserInitiated, + /// Deadline qos + DeadlineRequest, /// Highest qos UserInteractive, } diff --git a/ylong_ffrt/src/sys_event.rs b/ylong_ffrt/src/sys_event.rs index 8590851..63ba6e5 100644 --- a/ylong_ffrt/src/sys_event.rs +++ b/ylong_ffrt/src/sys_event.rs @@ -11,12 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use libc::{c_int, c_void}; +use libc::{c_int, c_uint, c_void}; // Unstable interface, rust encapsulation temporarily not provided type FfrtSysEventHandleT = *mut c_void; type DestroyFunc = extern "C" fn(*mut c_void); +type FfrtFdCallBack = extern "C" fn(*const c_void, c_uint); #[link(name = "ffrt")] // sys_event.h @@ -25,4 +26,15 @@ extern "C" { fn ffrt_sys_event_create(ty: c_int, fd: usize, filter: usize) -> FfrtSysEventHandleT; fn ffrt_sys_event_wait(event: FfrtSysEventHandleT, sec: i64) -> c_int; fn ffrt_sys_event_destroy(event: FfrtSysEventHandleT, func: DestroyFunc, arg: *mut c_void); + + /// Registers the fd to ffrt's epoll. Callback will be called when io events arrived. + pub fn ffrt_poller_register( + fd: c_int, + events: c_uint, + data: *const c_void, + callback: FfrtFdCallBack, + ) -> c_int; + + /// Deregisters the fd from ffrt's epoll. + pub fn ffrt_poller_deregister(fd: c_int) -> c_int; } diff --git a/ylong_ffrt/src/task.rs b/ylong_ffrt/src/task.rs index d8b7242..2a0cf3c 100644 --- a/ylong_ffrt/src/task.rs +++ b/ylong_ffrt/src/task.rs @@ -136,13 +136,13 @@ impl FfrtTaskAttr { } } -impl Drop for FfrtTaskAttr { - fn drop(&mut self) { - unsafe { - ffrt_task_attr_destroy(self as _); - } - } -} +// impl Drop for FfrtTaskAttr { +// fn drop(&mut self) { +// unsafe { +// ffrt_task_attr_destroy(self as _); +// } +// } +// } #[link(name = "ffrt")] // task.h diff --git a/ylong_io/src/source.rs b/ylong_io/src/source.rs index a23de00..fa5f2f6 100644 --- a/ylong_io/src/source.rs +++ b/ylong_io/src/source.rs @@ -14,6 +14,12 @@ use crate::{Interest, Selector, Token}; use std::io; +#[cfg(target_os = "linux")] +pub(crate) type Fd = i32; + +#[cfg(target_os = "windows")] +pub(crate) type Fd = std::os::windows::io::RawSocket; + /// Source Trait that every connection requires async polling in [`crate::Poll`] needs to implement. /// [`crate::Poll`] will asynchronously poll out connections, and handle it. pub trait Source { @@ -35,4 +41,7 @@ pub trait Source { /// Deregisters the connection from [`crate::Poll`]. fn deregister(&mut self, selector: &Selector) -> io::Result<()>; + + /// Returns the raw fd of this IO. + fn as_raw_fd(&self) -> Fd; } diff --git a/ylong_io/src/sys/linux/tcp/listener.rs b/ylong_io/src/sys/linux/tcp/listener.rs index bcf76fc..2450856 100644 --- a/ylong_io/src/sys/linux/tcp/listener.rs +++ b/ylong_io/src/sys/linux/tcp/listener.rs @@ -20,6 +20,7 @@ use std::io; use std::mem::{size_of, MaybeUninit}; use std::net::{self, SocketAddr}; use std::os::unix::io::{AsRawFd, FromRawFd}; +use crate::source::Fd; /// A socket server. pub struct TcpListener { @@ -90,7 +91,7 @@ impl Source for TcpListener { token: Token, interests: Interest, ) -> io::Result<()> { - selector.register(self.inner.as_raw_fd(), token, interests) + selector.register(self.as_raw_fd(), token, interests) } fn reregister( @@ -99,11 +100,15 @@ impl Source for TcpListener { token: Token, interests: Interest, ) -> io::Result<()> { - selector.reregister(self.inner.as_raw_fd(), token, interests) + selector.reregister(self.as_raw_fd(), token, interests) } fn deregister(&mut self, selector: &Selector) -> io::Result<()> { - selector.deregister(self.inner.as_raw_fd()) + selector.deregister(self.as_raw_fd()) + } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_fd() } } diff --git a/ylong_io/src/sys/linux/tcp/stream.rs b/ylong_io/src/sys/linux/tcp/stream.rs index 1a5811e..37d92d2 100644 --- a/ylong_io/src/sys/linux/tcp/stream.rs +++ b/ylong_io/src/sys/linux/tcp/stream.rs @@ -16,6 +16,7 @@ use crate::{Interest, Selector, Source, Token}; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::{self, Shutdown, SocketAddr}; use std::os::unix::io::AsRawFd; +use crate::source::Fd; /// A non-blocking TCP Stream between a local socket and a remote socket. pub struct TcpStream { @@ -120,7 +121,7 @@ impl Source for TcpStream { token: Token, interests: Interest, ) -> io::Result<()> { - selector.register(self.inner.as_raw_fd(), token, interests) + selector.register(self.as_raw_fd(), token, interests) } fn reregister( @@ -129,10 +130,14 @@ impl Source for TcpStream { token: Token, interests: Interest, ) -> io::Result<()> { - selector.reregister(self.inner.as_raw_fd(), token, interests) + selector.reregister(self.as_raw_fd(), token, interests) } fn deregister(&mut self, selector: &Selector) -> io::Result<()> { - selector.deregister(self.inner.as_raw_fd()) + selector.deregister(self.as_raw_fd()) + } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_fd() } } diff --git a/ylong_io/src/sys/linux/udp/udp_socket.rs b/ylong_io/src/sys/linux/udp/udp_socket.rs index 413c7b7..d8bf933 100644 --- a/ylong_io/src/sys/linux/udp/udp_socket.rs +++ b/ylong_io/src/sys/linux/udp/udp_socket.rs @@ -13,6 +13,7 @@ use super::UdpSock; use crate::{Interest, Selector, Source, Token}; +use crate::source::Fd; use std::fmt::Formatter; use std::net::SocketAddr; use std::os::unix::io::AsRawFd; @@ -398,48 +399,37 @@ impl fmt::Debug for ConnectedUdpSocket { } impl Source for UdpSocket { - fn register( - &mut self, - selector: &Selector, - token: Token, - interests: Interest, - ) -> io::Result<()> { - selector.register(self.inner.as_raw_fd(), token, interests) + fn register(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + selector.register(self.as_raw_fd(), token, interests) } - fn reregister( - &mut self, - selector: &Selector, - token: Token, - interests: Interest, - ) -> io::Result<()> { - selector.reregister(self.inner.as_raw_fd(), token, interests) + fn reregister(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + selector.reregister(self.as_raw_fd(), token, interests) } fn deregister(&mut self, selector: &Selector) -> io::Result<()> { - selector.deregister(self.inner.as_raw_fd()) + selector.deregister(self.as_raw_fd()) + } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_fd() } } + impl Source for ConnectedUdpSocket { - fn register( - &mut self, - selector: &Selector, - token: Token, - interests: Interest, - ) -> io::Result<()> { - selector.register(self.inner.as_raw_fd(), token, interests) + fn register(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + selector.register(self.as_raw_fd(), token, interests) } - fn reregister( - &mut self, - selector: &Selector, - token: Token, - interests: Interest, - ) -> io::Result<()> { - selector.reregister(self.inner.as_raw_fd(), token, interests) + fn reregister(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + selector.reregister(self.as_raw_fd(), token, interests) } fn deregister(&mut self, selector: &Selector) -> io::Result<()> { - selector.deregister(self.inner.as_raw_fd()) + selector.deregister(self.as_raw_fd()) + } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_fd() } } diff --git a/ylong_io/src/sys/windows/tcp/listener.rs b/ylong_io/src/sys/windows/tcp/listener.rs index 199d38a..90ba172 100644 --- a/ylong_io/src/sys/windows/tcp/listener.rs +++ b/ylong_io/src/sys/windows/tcp/listener.rs @@ -14,6 +14,7 @@ use crate::sys::windows::tcp::TcpSocket; use crate::sys::NetState; use crate::{Interest, Selector, Source, TcpStream, Token}; +use crate::source::Fd; use std::fmt::Formatter; use std::net::SocketAddr; use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; @@ -73,14 +74,8 @@ impl TcpListener { } impl Source for TcpListener { - fn register( - &mut self, - selector: &Selector, - token: Token, - interests: Interest, - ) -> io::Result<()> { - self.state - .register(selector, token, interests, self.inner.as_raw_socket()) + fn register(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + self.state.register(selector, token, interests, self.as_raw_socket()) } fn reregister( @@ -95,6 +90,10 @@ impl Source for TcpListener { fn deregister(&mut self, _selector: &Selector) -> io::Result<()> { self.state.deregister() } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_socket() + } } impl fmt::Debug for TcpListener { diff --git a/ylong_io/src/sys/windows/tcp/stream.rs b/ylong_io/src/sys/windows/tcp/stream.rs index 55da9c0..d10f71e 100644 --- a/ylong_io/src/sys/windows/tcp/stream.rs +++ b/ylong_io/src/sys/windows/tcp/stream.rs @@ -14,6 +14,7 @@ use crate::sys::windows::tcp::TcpSocket; use crate::sys::NetState; use crate::{Interest, Selector, Source, Token}; +use crate::source::Fd; use std::fmt::Formatter; use std::io::{IoSlice, IoSliceMut, Read, Write}; use std::net::{Shutdown, SocketAddr}; @@ -298,4 +299,8 @@ impl Source for TcpStream { fn deregister(&mut self, _selector: &Selector) -> io::Result<()> { self.state.deregister() } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_socket() + } } diff --git a/ylong_io/src/sys/windows/udp/udp_socket.rs b/ylong_io/src/sys/windows/udp/udp_socket.rs index 1f7962c..a40cde7 100644 --- a/ylong_io/src/sys/windows/udp/udp_socket.rs +++ b/ylong_io/src/sys/windows/udp/udp_socket.rs @@ -14,6 +14,7 @@ use crate::sys::windows::udp::UdpSock; use crate::sys::NetState; use crate::{Interest, Selector, Source, Token}; +use crate::source::Fd; use std::fmt::Formatter; use std::net::SocketAddr; use std::os::windows::io::AsRawSocket; @@ -291,6 +292,10 @@ impl Source for UdpSocket { fn deregister(&mut self, _selector: &Selector) -> io::Result<()> { self.state.deregister() } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_socket() + } } impl Source for ConnectedUdpSocket { @@ -316,4 +321,8 @@ impl Source for ConnectedUdpSocket { fn deregister(&mut self, _selector: &Selector) -> io::Result<()> { self.state.deregister() } + + fn as_raw_fd(&self) -> Fd { + self.inner.as_raw_socket() + } } diff --git a/ylong_runtime/Cargo.toml b/ylong_runtime/Cargo.toml index ec3cac2..e958289 100644 --- a/ylong_runtime/Cargo.toml +++ b/ylong_runtime/Cargo.toml @@ -28,6 +28,7 @@ ffrt_full = [ "time", "fs", "ffrt", + "macros" ] # This feature controls the executor type runs below the runtime. diff --git a/ylong_runtime/src/builder/mod.rs b/ylong_runtime/src/builder/mod.rs index cc9a19b..e511cc1 100644 --- a/ylong_runtime/src/builder/mod.rs +++ b/ylong_runtime/src/builder/mod.rs @@ -33,7 +33,6 @@ use std::io; use std::sync::Arc; pub(crate) use crate::builder::common_builder::CommonBuilder; -use crate::cfg_not_ffrt; use crate::error::ScheduleError; use crate::executor::blocking_pool::BlockPoolSpawner; #[cfg(any(feature = "net", feature = "time"))] @@ -43,7 +42,7 @@ pub use current_thread_builder::CurrentThreadBuilder; pub use multi_thread_builder::MultiThreadBuilder; #[cfg(any(feature = "net", feature = "time"))] use std::sync::Once; -cfg_not_ffrt!( +crate::macros::cfg_not_ffrt!( use crate::executor::async_pool::AsyncPoolSpawner; ); diff --git a/ylong_runtime/src/executor/mod.rs b/ylong_runtime/src/executor/mod.rs index 8d03820..3e9ba91 100644 --- a/ylong_runtime/src/executor/mod.rs +++ b/ylong_runtime/src/executor/mod.rs @@ -23,8 +23,9 @@ pub(crate) mod current_thread; pub(crate) mod netpoller; use crate::builder::{initialize_blocking_spawner, RuntimeBuilder}; use crate::executor::blocking_pool::BlockPoolSpawner; +use crate::macros::{cfg_ffrt, cfg_not_ffrt}; use crate::task::TaskBuilder; -use crate::{cfg_ffrt, cfg_not_ffrt, JoinHandle, Task}; +use crate::{JoinHandle, Task}; use std::future::Future; use crate::builder::multi_thread_builder::GLOBAL_BUILDER; diff --git a/ylong_runtime/src/executor/netpoller.rs b/ylong_runtime/src/executor/netpoller.rs index 4ff035f..f3520f6 100644 --- a/ylong_runtime/src/executor/netpoller.rs +++ b/ylong_runtime/src/executor/netpoller.rs @@ -11,8 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(all(feature = "net", feature = "ffrt"))] -use crate::net::Driver as NetDriver; #[cfg(feature = "time")] use crate::time::Driver as TimerDriver; use std::cell::RefCell; @@ -60,20 +58,10 @@ impl NetLooper { fn run(&self) { loop { - // run io driver - // For ffrt feature, only monitor thread would access the net-driver, - // therefore this unwrap is safe. - #[cfg(all(feature = "net", feature = "ffrt"))] - NetDriver::try_get_mut() - .expect("get io driver failed") - .drive(Some(NET_POLL_INTERVAL_TIME)) - .expect("io driver running failed"); - // run time driver #[cfg(feature = "time")] TimerDriver::get_ref().run(); - #[cfg(not(feature = "ffrt"))] thread::sleep(NET_POLL_INTERVAL_TIME); } } diff --git a/ylong_runtime/src/iter/core.rs b/ylong_runtime/src/iter/core.rs index 5afea25..50d6001 100644 --- a/ylong_runtime/src/iter/core.rs +++ b/ylong_runtime/src/iter/core.rs @@ -16,8 +16,8 @@ use super::pariter::ParallelIterator; use crate::error::ScheduleError; use crate::executor::{global_default_async, AsyncHandle}; +use crate::macros::{cfg_ffrt, cfg_not_ffrt}; use crate::task::{JoinHandle, TaskBuilder}; -use crate::{cfg_ffrt, cfg_not_ffrt}; cfg_not_ffrt! { use crate::executor::{async_pool::AsyncPoolSpawner}; } diff --git a/ylong_runtime/src/lib.rs b/ylong_runtime/src/lib.rs index 381d326..e8878d1 100644 --- a/ylong_runtime/src/lib.rs +++ b/ylong_runtime/src/lib.rs @@ -89,25 +89,3 @@ where let rt = executor::global_default_async(); rt.block_on(task) } - -macro_rules! cfg_ffrt { - ($($item:item)*) => { - $( - #[cfg(feature = "ffrt")] - $item - )* - } -} - -pub(crate) use cfg_ffrt; - -macro_rules! cfg_not_ffrt { - ($($item:item)*) => { - $( - #[cfg(not(feature = "ffrt"))] - $item - )* - } -} - -pub(crate) use cfg_not_ffrt; diff --git a/ylong_runtime/src/macros.rs b/ylong_runtime/src/macros.rs index 39c7dd2..5bd89ba 100644 --- a/ylong_runtime/src/macros.rs +++ b/ylong_runtime/src/macros.rs @@ -21,3 +21,25 @@ macro_rules! cfg_io { } pub(crate) use cfg_io; + +macro_rules! cfg_ffrt { + ($($item:item)*) => { + $( + #[cfg(feature = "ffrt")] + $item + )* + } +} + +pub(crate) use cfg_ffrt; + +macro_rules! cfg_not_ffrt { + ($($item:item)*) => { + $( + #[cfg(not(feature = "ffrt"))] + $item + )* + } +} + +pub(crate) use cfg_not_ffrt; diff --git a/ylong_runtime/src/net/async_source.rs b/ylong_runtime/src/net/async_source.rs index 15400ed..27b9758 100644 --- a/ylong_runtime/src/net/async_source.rs +++ b/ylong_runtime/src/net/async_source.rs @@ -49,9 +49,8 @@ impl AsyncSource { /// # Error /// /// If no reactor is found or fd registration fails, an error will be returned. - /// + #[cfg(not(feature = "ffrt"))] pub fn new(mut io: E, interest: Option) -> io::Result> { - #[cfg(not(feature = "ffrt"))] let inner = { let context = get_current_ctx() .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "get_current_ctx() fail"))?; @@ -60,9 +59,28 @@ impl AsyncSource { WorkerContext::Curr(ctx) => &ctx.handle, } }; - #[cfg(feature = "ffrt")] + + let interest = interest.unwrap_or_else(|| Interest::WRITABLE.add(Interest::READABLE)); + let entry = inner.register_source(&mut io, interest)?; + Ok(AsyncSource { + io: Some(io), + entry, + }) + } + + /// Wraps a `Source` object into an `AsyncSource`. When the `AsyncSource` object is created, + /// it's fd will be registered into runtime's reactor. + /// + /// If `interest` passed in is None, the interested event for fd registration will be both + /// readable and writable. + /// + /// # Error + /// + /// If no reactor is found or fd registration fails, an error will be returned. + #[cfg(feature = "ffrt")] + pub fn new(mut io: E, interest: Option) -> io::Result> { let inner = crate::net::Handle::get_ref(); - // let inner = Driver::inner(); + let interest = interest.unwrap_or_else(|| Interest::WRITABLE.add(Interest::READABLE)); let entry = inner.register_source(&mut io, interest)?; Ok(AsyncSource { @@ -221,10 +239,10 @@ impl Deref for AsyncSource { } // Deregisters fd when the `AsyncSource` object get dropped. +#[cfg(not(feature = "ffrt"))] impl Drop for AsyncSource { fn drop(&mut self) { if let Some(mut io) = self.io.take() { - #[cfg(not(feature = "ffrt"))] let inner = { let context = get_current_ctx().expect("AsyncSource drop get_current_ctx() fail"); match context { @@ -232,9 +250,19 @@ impl Drop for AsyncSource { WorkerContext::Curr(ctx) => &ctx.handle, } }; - #[cfg(feature = "ffrt")] - let inner = crate::net::Handle::get_ref(); let _ = inner.deregister_source(&mut io); } } } + +// Deregisters fd when the `AsyncSource` object get dropped. +#[cfg(feature = "ffrt")] +impl Drop for AsyncSource { + fn drop(&mut self) { + if let Some(io) = self.io.take() { + unsafe { + ylong_ffrt::ffrt_poller_deregister(io.as_raw_fd() as libc::c_int); + } + } + } +} diff --git a/ylong_runtime/src/net/driver.rs b/ylong_runtime/src/net/driver.rs index a44b63d..569f2b9 100644 --- a/ylong_runtime/src/net/driver.rs +++ b/ylong_runtime/src/net/driver.rs @@ -11,19 +11,29 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::cfg_ffrt; +use crate::macros::{cfg_ffrt, cfg_not_ffrt}; use crate::net::{Ready, ScheduleIO, Tick}; use crate::util::bit::{Bit, Mask}; use crate::util::slab::{Address, Ref, Slab}; use std::io; use std::ops::Deref; use std::sync::{Arc, Mutex}; -use std::time::Duration; -use ylong_io::{EventTrait, Events, Interest, Poll, Source, Token}; +use ylong_io::{Interest, Source, Token}; + +cfg_ffrt! { + use libc::{c_void, c_int, c_uint}; +} + +cfg_not_ffrt! { + use ylong_io::{Events, Poll}; + use std::time::Duration; + + const EVENTS_MAX_CAPACITY: usize = 1024; + const WAKE_TOKEN: Token = Token(1 << 31); +} const DRIVER_TICK_INIT: u8 = 0; -const EVENTS_MAX_CAPACITY: usize = 1024; // Token structure // | reserved | generation | address | @@ -33,21 +43,21 @@ const EVENTS_MAX_CAPACITY: usize = 1024; const GENERATION: Mask = Mask::new(7, 24); const ADDRESS: Mask = Mask::new(24, 0); -const WAKE_TOKEN: Token = Token(1 << 31); - /// IO reactor that listens to fd events and wakes corresponding tasks. pub(crate) struct Driver { /// Stores every IO source that is ready resources: Option>, - /// Stores IO events that need to be handled - events: Option, - /// Counter used for slab struct to compact tick: u8, /// Used for epoll + #[cfg(not(feature = "ylong_ffrt"))] poll: Arc, + + /// Stores IO events that need to be handled + #[cfg(not(feature = "ylong_ffrt"))] + events: Option, } pub(crate) struct Handle { @@ -58,9 +68,8 @@ pub(crate) struct Handle { cfg_ffrt!( use std::mem::MaybeUninit; - static mut DRIVER: MaybeUninit> = MaybeUninit::uninit(); + static mut DRIVER: MaybeUninit = MaybeUninit::uninit(); static mut HANDLE: MaybeUninit = MaybeUninit::uninit(); - use std::sync::MutexGuard; ); #[cfg(feature = "ffrt")] @@ -108,11 +117,40 @@ pub(crate) struct Inner { allocator: Slab, /// Used to register fd + #[cfg(not(feature = "ylong_ffrt"))] registry: Arc, } impl Driver { - #[cfg(not(feature = "ffrt"))] + /// IO dispatch function. Wakes the task through the token getting from the epoll events. + fn dispatch(&mut self, token: Token, ready: Ready) { + let addr_bit = Bit::from_usize(token.0); + let addr = addr_bit.get_by_mask(ADDRESS); + + let io = match self + .resources + .as_mut() + .unwrap() + .get(Address::from_usize(addr)) + { + Some(io) => io, + None => return, + }; + + if io + .set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready) + .is_err() + { + return; + } + + // Wake the io task + io.wake(ready) + } +} + +#[cfg(not(feature = "ffrt"))] +impl Driver { pub(crate) fn initialize() -> (Arc, Arc>) { let poll = Poll::new().unwrap(); let waker = @@ -140,44 +178,13 @@ impl Driver { ) } - #[cfg(feature = "ffrt")] - fn initialize() { - static ONCE: std::sync::Once = std::sync::Once::new(); - ONCE.call_once(|| unsafe { - let poll = Poll::new().unwrap(); - let arc_poll = Arc::new(poll); - let events = Events::with_capacity(EVENTS_MAX_CAPACITY); - let slab = Slab::new(); - let allocator = slab.handle(); - let inner = Arc::new(Inner { - resources: Mutex::new(None), - allocator, - registry: arc_poll.clone(), - }); - - let driver = Driver { - resources: Some(slab), - events: Some(events), - tick: DRIVER_TICK_INIT, - poll: arc_poll, - }; - HANDLE = MaybeUninit::new(Handle::new(inner)); - DRIVER = MaybeUninit::new(Mutex::new(driver)); - }); - } - - /// Initializes the single instance IO driver. - #[cfg(feature = "ffrt")] - pub(crate) fn try_get_mut() -> Option> { - Driver::initialize(); - unsafe { &*DRIVER.as_ptr() }.try_lock().ok() - } - /// Runs the driver. This method will blocking wait for fd events to come in and then /// wakes the corresponding tasks through the events. /// /// In linux environment, the driver uses epoll. pub(crate) fn drive(&mut self, time_out: Option) -> io::Result { + use ylong_io::EventTrait; + // For every 255 ticks, cleans the redundant entries inside the slab const COMPACT_INTERVAL: u8 = 255; @@ -217,42 +224,72 @@ impl Driver { self.events = Some(events); Ok(has_events) } +} - /// IO dispatch function. Wakes the task through the token getting from the epoll events. - fn dispatch(&mut self, token: Token, ready: Ready) { - let addr_bit = Bit::from_usize(token.0); - let addr = addr_bit.get_by_mask(ADDRESS); +#[cfg(feature = "ffrt")] +impl Driver { + fn initialize() { + static ONCE: std::sync::Once = std::sync::Once::new(); + ONCE.call_once(|| unsafe { + let slab = Slab::new(); + let allocator = slab.handle(); + let inner = Arc::new(Inner { + resources: Mutex::new(None), + allocator, + }); - let io = match self - .resources - .as_mut() - .unwrap() - .get(Address::from_usize(addr)) - { - Some(io) => io, - None => return, - }; + let driver = Driver { + resources: Some(slab), + tick: DRIVER_TICK_INIT, + }; + HANDLE = MaybeUninit::new(Handle::new(inner)); + DRIVER = MaybeUninit::new(driver); + }); + } - if io - .set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready) - .is_err() - { - return; + /// Initializes the single instance IO driver. + pub(crate) fn get_mut_ref() -> &'static mut Driver { + Driver::initialize(); + unsafe { + &mut *DRIVER.as_mut_ptr() } + } +} - // Wake the io task - io.wake(ready) +#[cfg(feature = "ffrt")] +extern "C" fn ffrt_dispatch_event(data: *const c_void, ready: c_uint) { + const COMPACT_INTERVAL: u8 = 255; + + let driver = Driver::get_mut_ref(); + driver.tick = driver.tick.wrapping_add(1); + if driver.tick == COMPACT_INTERVAL { + unsafe { + driver.resources.as_mut().unwrap().compact(); + } } + + let token = Token::from_usize(data as usize); + let ready = crate::net::ready::from_event_inner(ready as i32); + driver.dispatch(token, ready); } impl Inner { /// Registers the fd of the `Source` object + #[cfg(not(feature = "ffrt"))] pub(crate) fn register_source( &self, io: &mut impl Source, interest: Interest, ) -> io::Result> { // Allocates space for the slab. If reaches maximum capacity, error will be returned + let (schedule_io, token) = self.allocate_schedule_io_pair()?; + + self.registry + .register(io, Token::from_usize(token), interest)?; + Ok(schedule_io) + } + + fn allocate_schedule_io_pair(&self) -> io::Result<(Ref, usize)> { let (addr, schedule_io) = unsafe { self.allocator.allocate().ok_or_else(|| { io::Error::new( @@ -261,19 +298,52 @@ impl Inner { ) })? }; - - // Initializes the token for finding the task in the slab. let mut base = Bit::from_usize(0); base.set_by_mask(GENERATION, schedule_io.generation()); base.set_by_mask(ADDRESS, addr.as_usize()); - let token = base.as_usize(); + Ok((schedule_io, base.as_usize())) + } + + /// Registers the fd of the `Source` object + #[cfg(feature = "ffrt")] + pub(crate) fn register_source( + &self, + io: &mut impl Source, + interest: Interest, + ) -> io::Result> { + // Allocates space for the slab. If reaches maximum capacity, error will be returned + let (schedule_io, token) = self.allocate_schedule_io_pair()?; + + fn interests_to_io_event(interests: Interest) -> c_uint { + let mut io_event = libc::EPOLLET as u32; + + if interests.is_readable() { + io_event |= libc::EPOLLIN as u32; + io_event |= libc::EPOLLRDHUP as u32; + } + + if interests.is_writable() { + io_event |= libc::EPOLLOUT as u32; + } + + io_event as c_uint + } + + let event = interests_to_io_event(interest); + unsafe { + ylong_ffrt::ffrt_poller_register( + io.as_raw_fd() as c_int, + event, + token as *const c_void, + ffrt_dispatch_event, + ); + } - self.registry - .register(io, Token::from_usize(token), interest)?; Ok(schedule_io) } /// Deregisters the fd of the `Source` object. + #[cfg(not(feature = "ffrt"))] pub(crate) fn deregister_source(&self, io: &mut impl Source) -> io::Result<()> { self.registry.deregister(io) } diff --git a/ylong_runtime/src/net/mod.rs b/ylong_runtime/src/net/mod.rs index c3287d7..0de3c5e 100644 --- a/ylong_runtime/src/net/mod.rs +++ b/ylong_runtime/src/net/mod.rs @@ -14,7 +14,8 @@ //! Asynchronous TCP/UDP binding for `ylong_runtime` pub(crate) use crate::schedule_io::{ScheduleIO, Tick}; -pub(crate) use driver::{Driver, Handle}; +pub(crate) use driver::Handle; + pub(crate) use linked_list::{LinkedList, Node}; pub(crate) use ready::{Ready, ReadyEvent}; pub use sys::{Listener, Stream}; @@ -32,3 +33,6 @@ cfg_io! { pub use sys::{UdpSocket, ConnectedUdpSocket}; pub use sys::{SplitReadHalf, SplitWriteHalf}; } + +#[cfg(not(feature = "ffrt"))] +pub(crate) use driver::Driver; \ No newline at end of file diff --git a/ylong_runtime/src/net/ready.rs b/ylong_runtime/src/net/ready.rs index 066fe29..048390b 100644 --- a/ylong_runtime/src/net/ready.rs +++ b/ylong_runtime/src/net/ready.rs @@ -12,13 +12,17 @@ // limitations under the License. use core::ops; -use ylong_io::{Event, EventTrait, Interest}; +use ylong_io::Interest; const READABLE: usize = 0b0_01; const WRITABLE: usize = 0b0_10; const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; +crate::macros::cfg_not_ffrt! { + use ylong_io::{Event, EventTrait}; +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] pub(crate) struct Ready(usize); @@ -41,6 +45,7 @@ impl Ready { pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + #[cfg(not(feature = "ffrt"))] pub(crate) fn from_event(event: &Event) -> Ready { let mut ready = Ready::EMPTY; @@ -163,6 +168,50 @@ impl ReadyEvent { } } +crate::macros::cfg_ffrt! { + fn is_readable(event: i32) -> bool { + (event as libc::c_int & libc::EPOLLIN) != 0 + || (event as libc::c_int & libc::EPOLLPRI) != 0 + } + + fn is_writable(event: i32) -> bool { + (event as libc::c_int & libc::EPOLLOUT) != 0 + } + + fn is_read_closed(event: i32) -> bool { + event as libc::c_int & libc::EPOLLHUP != 0 + || (event as libc::c_int & libc::EPOLLIN != 0 + && event as libc::c_int & libc::EPOLLRDHUP != 0) + } + + fn is_write_closed(event: i32) -> bool { + event as libc::c_int & libc::EPOLLHUP != 0 + || (event as libc::c_int & libc::EPOLLOUT != 0 + && event as libc::c_int & libc::EPOLLERR != 0) + || event as libc::c_int == libc::EPOLLERR + } + + pub(crate) fn from_event_inner(event: i32) -> Ready { + let mut ready = Ready::EMPTY; + if is_readable(event) { + ready |= Ready::READABLE; + } + + if is_writable(event) { + ready |= Ready::WRITABLE; + } + + if is_read_closed(event) { + ready |= Ready::READ_CLOSED; + } + + if is_write_closed(event) { + ready |= Ready::WRITE_CLOSED; + } + ready + } +} + /* * @title ready from_event function ut test * @design conditions of use override diff --git a/ylong_runtime/src/task/raw.rs b/ylong_runtime/src/task/raw.rs index 1cb43ab..27e22d4 100644 --- a/ylong_runtime/src/task/raw.rs +++ b/ylong_runtime/src/task/raw.rs @@ -12,10 +12,11 @@ // limitations under the License. use crate::executor::Schedule; +use crate::macros::cfg_ffrt; use crate::task::state::TaskState; use crate::task::task_handle::TaskHandle; use crate::task::{TaskBuilder, VirtualTableType}; -use crate::{cfg_ffrt, ScheduleError}; +use crate::ScheduleError; use std::cell::UnsafeCell; use std::future::Future; use std::mem; diff --git a/ylong_runtime/src/task/state.rs b/ylong_runtime/src/task/state.rs index 1466d07..7399603 100644 --- a/ylong_runtime/src/task/state.rs +++ b/ylong_runtime/src/task/state.rs @@ -11,8 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::cfg_not_ffrt; use crate::error::ErrorKind; +use crate::macros::cfg_not_ffrt; /// Task state, include SCHEDULED RUNNING COMPLETED CLOSED and so on and transform method use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed}; diff --git a/ylong_runtime/src/task/task_handle.rs b/ylong_runtime/src/task/task_handle.rs index a8e06a8..941f7c2 100644 --- a/ylong_runtime/src/task/task_handle.rs +++ b/ylong_runtime/src/task/task_handle.rs @@ -13,11 +13,11 @@ use crate::error::{ErrorKind, ScheduleError}; use crate::executor::Schedule; +use crate::macros::{cfg_ffrt, cfg_not_ffrt}; use crate::task::raw::{Header, Inner, TaskMngInfo}; use crate::task::state::StateAction; use crate::task::waker::WakerRefHeader; use crate::task::{state, Task}; -use crate::{cfg_ffrt, cfg_not_ffrt}; use std::future::Future; use std::panic; use std::ptr::NonNull; @@ -392,9 +392,11 @@ where } pub(crate) fn ffrt_wake_by_ref(&self) { - self.header().state.turn_to_scheduling(); - let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; - ffrt_task.wake_task(); + let prev = self.header().state.turn_to_scheduling(); + if !state::is_scheduling(prev) { + let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; + ffrt_task.wake_task(); + } } // Actually cancels the task during running -- Gitee