diff --git a/ylong_io/src/sys/windows/events.rs b/ylong_io/src/sys/windows/events.rs index 69905edf69c50573154bd516cb8b32d867eef49d..09453271b6a557d63317a7df3f16757c5a3276ea 100644 --- a/ylong_io/src/sys/windows/events.rs +++ b/ylong_io/src/sys/windows/events.rs @@ -89,6 +89,11 @@ impl Events { pub fn is_empty(&self) -> bool { self.events.is_empty() } + + /// Returns len of Events. + pub fn len(&self) -> usize { + self.events.len() + } } impl fmt::Debug for Events { diff --git a/ylong_runtime/Cargo.toml b/ylong_runtime/Cargo.toml index ec3cac2d78812e4d9ea3fa859b56fc1cdaab2a8c..1b035acb11ddff8886c9ac0a18113d779ba61daa 100644 --- a/ylong_runtime/Cargo.toml +++ b/ylong_runtime/Cargo.toml @@ -57,6 +57,9 @@ net = ["ylong_io/tcp", "ylong_io/udp"] # Macro components macros = ["ylong_runtime_macros"] +# DFX component +dfx = [] + [dependencies] libc = "0.2.134" ylong_io = { path = "../ylong_io", optional = true } diff --git a/ylong_runtime/src/builder/current_thread_builder.rs b/ylong_runtime/src/builder/current_thread_builder.rs index 0c166218dfb1d8f40b18dd06b34146f5f064c548..dc192494a41fbf0dd57a90d71d41cf8abbd8c490 100644 --- a/ylong_runtime/src/builder/current_thread_builder.rs +++ b/ylong_runtime/src/builder/current_thread_builder.rs @@ -33,7 +33,7 @@ impl CurrentThreadBuilder { pub fn build(&mut self) -> io::Result { let async_spawner = CurrentThreadSpawner::new(); Ok(Runtime { - async_spawner: AsyncHandle::CurrentThread(async_spawner), + async_spawner: AsyncHandle::CurrentThread(Arc::new(async_spawner)), }) } } diff --git a/ylong_runtime/src/builder/multi_thread_builder.rs b/ylong_runtime/src/builder/multi_thread_builder.rs index ba99d8a00014aba0e4d97390db3fc18197a17034..40b05f8d95b7494cff962249fdf6cc189eb7a7b8 100644 --- a/ylong_runtime/src/builder/multi_thread_builder.rs +++ b/ylong_runtime/src/builder/multi_thread_builder.rs @@ -57,7 +57,7 @@ impl MultiThreadBuilder { let async_spawner = initialize_async_spawner(self)?; Ok(Runtime { - async_spawner: AsyncHandle::MultiThread(async_spawner), + async_spawner: AsyncHandle::MultiThread(Arc::new(async_spawner)), }) } diff --git a/ylong_runtime/src/dfx/metrics.rs b/ylong_runtime/src/dfx/metrics.rs new file mode 100644 index 0000000000000000000000000000000000000000..39e43a1c35a12a030601f124319b3410d18a2d42 --- /dev/null +++ b/ylong_runtime/src/dfx/metrics.rs @@ -0,0 +1,271 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::executor::async_pool::AsyncPoolSpawner; +use crate::executor::{AsyncHandle, Runtime}; + +/// User can get some message from Runtime during running. +/// +/// # Example +/// ```no_run +/// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread().build().unwrap(); +/// let metrics = runtime.metrics(); +/// ``` +pub struct Metrics { + runtime: Runtime, +} + +impl Metrics { + pub(crate) fn new(runtime: Runtime) -> Self { + Metrics { runtime } + } + + /// Returns workers num + pub fn workers_num(&self) -> usize { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => 1, + AsyncHandle::MultiThread(spawner) => spawner.exe_mng_info.num_workers, + } + } + + /// Returns searching workers num + /// + /// Runtime build by `new_current_thread()` will return None. + pub fn searching_workers_num(&self) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => Some(spawner.exe_mng_info.record.load_state().1), + } + } + + /// Returns unpark workers num + /// + /// Runtime build by `new_current_thread()` will return None. + pub fn unpark_workers_num(&self) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => Some(spawner.exe_mng_info.record.load_state().0), + } + } + + /// Returns idle workers index list + /// + /// Runtime build by `new_current_thread()` will return None. + pub fn idle_workers_list(&self) -> Option> { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => Some(Self::park_statistic(spawner).0), + } + } + + /// Returns park workers index list + /// + /// Runtime build by `new_current_thread()` will return None. + pub fn park_workers_list(&self) -> Option> { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => Some(Self::park_statistic(spawner).1), + } + } + + /// Returns notified workers index list + /// + /// Runtime build by `new_current_thread()` will return None. + pub fn notified_workers_list(&self) -> Option> { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => Some(Self::park_statistic(spawner).2), + } + } + + /// Returns idle/park/notified workers index list + /// + /// Runtime build by `new_current_thread()` will return None. + pub fn idle_park_notified_workers_list(&self) -> Option<(Vec, Vec, Vec)> { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => Some(Self::park_statistic(spawner)), + } + } + + fn park_statistic(spawner: &AsyncPoolSpawner) -> (Vec, Vec, Vec) { + let mut idle = vec![]; + let mut park = vec![]; + let mut notified = vec![]; + + let parkers = spawner.exe_mng_info.get_handles().read().unwrap(); + for i in 0..parkers.len() { + match parkers.get(i).unwrap().get_state() { + 0 => idle.push(i), + 3 => notified.push(i), + _ => park.push(i), + } + } + (idle, park, notified) + } + + /// Returns global queue length + pub fn global_queue_depth(&self) -> usize { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(spawner) => spawner.scheduler.inner.lock().unwrap().len(), + AsyncHandle::MultiThread(spawner) => spawner.exe_mng_info.get_global().get_len(), + } + } + + /// Returns the total number of task which has entered global queue + /// + /// This value will only increment, not decrease. + pub fn global_queue_task_count(&self) -> usize { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(spawner) => + spawner.scheduler.count.load(std::sync::atomic::Ordering::Acquire), + AsyncHandle::MultiThread(spawner) => { + spawner.exe_mng_info.get_global().get_count() + } + } + } + + /// Returns the given worker thread length + /// + /// Runtime build by `new_current_thread()` will return None. + pub fn workers_task_len(&self, index: u8) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => match spawner.get_worker(index) { + Ok(worker) => { + let len = unsafe { worker.get_inner_ptr().run_queue.len() as usize}; + Some(len) + }, + Err(_) => panic!("out of index"), + }, + } + } + + /// Returns the total number of task which has entered the given worker thread + /// + /// This value will only increment, not decrease. + /// Runtime build by `new_current_thread()` will return None. + pub fn workers_task_count(&self, index: u8) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => match spawner.get_worker(index) { + Ok(worker) => { + let len = unsafe { worker.get_inner_ptr().run_queue.count() }; + Some(len) + }, + Err(_) => panic!("out of index"), + }, + } + } + + /// Returns the given worker thread length + /// + /// This value will only increment, not decrease. + /// Runtime build by `new_current_thread()` will return None. + pub fn workers_poll_count(&self, index: u8) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => match spawner.get_worker(index) { + Ok(worker) => { + let len = unsafe { worker.get_inner_ptr().count as usize}; + Some(len) + }, + Err(_) => panic!("out of index"), + }, + } + } + + /// Returns the number of steals. + /// + /// This value will only increment, not decrease. + /// Runtime build by `new_current_thread()` will return None. + pub fn steal_count(&self) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => Some(spawner.exe_mng_info.get_steal_count()), + } + } + + /// Returns the number of times the given worker get tasks from the global queue. + /// + /// This value will only increment, not decrease. + /// Runtime build by `new_current_thread()` will return None. + pub fn workers_get_task_from_global_count(&self, index: u8) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => match spawner.get_worker(index) { + Ok(worker) => { + let len = unsafe { worker.get_inner_ptr().run_queue.task_from_global_count()}; + Some(len) + }, + Err(_) => panic!("out of index"), + }, + } + } + + /// Returns the number of times the given worker push a task on the global queue. + /// + /// This value will only increment, not decrease. + /// Runtime build by `new_current_thread()` will return None. + pub fn workers_push_task_to_global_count(&self, index: u8) -> Option { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(_) => None, + AsyncHandle::MultiThread(spawner) => match spawner.get_worker(index) { + Ok(worker) => { + let len = unsafe { worker.get_inner_ptr().run_queue.task_to_global_count() }; + Some(len) + }, + Err(_) => panic!("out of index"), + }, + } + } + + /// Returns the number of IO events which has been registered in Driver. + /// + /// This value will only increment, not decrease. + #[cfg(feature = "net")] + pub fn driver_register_count(&self) -> usize { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(spawner) => spawner.handle.get_register_count(), + AsyncHandle::MultiThread(spawner) => spawner.exe_mng_info.io_handle.get_register_count(), + } + } + + /// Returns the number of IO events which has been readied in Driver. + /// + /// This value will only increment, not decrease. + #[cfg(feature = "net")] + pub fn driver_ready_count(&self) -> usize { + match &self.runtime.async_spawner { + #[cfg(feature = "current_thread_runtime")] + AsyncHandle::CurrentThread(spawner) => spawner.handle.get_ready_count(), + AsyncHandle::MultiThread(spawner) => spawner.exe_mng_info.io_handle.get_ready_count(), + } + } +} diff --git a/ylong_runtime/src/dfx/mod.rs b/ylong_runtime/src/dfx/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..999c146ea8311d5f95c8740b8717d04737177ef5 --- /dev/null +++ b/ylong_runtime/src/dfx/mod.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod metrics; +pub use metrics::Metrics; \ No newline at end of file diff --git a/ylong_runtime/src/executor/async_pool.rs b/ylong_runtime/src/executor/async_pool.rs index 3c19b170c3da51950de358a2deb5cc1a3767effb..8b31476d2382f3a2f2891d2615355a75a480d164 100644 --- a/ylong_runtime/src/executor/async_pool.rs +++ b/ylong_runtime/src/executor/async_pool.rs @@ -17,8 +17,8 @@ use crate::executor::{worker, Schedule}; use crate::executor::worker::{get_current_ctx, run_worker, Worker, WorkerContext}; use crate::task::{Task, TaskBuilder, VirtualTableType}; use crate::util::num_cpus::get_cpu_num; +use crate::cfg_dfx_not_ffrt; use std::cell::RefCell; -use std::collections::LinkedList; use std::sync::atomic::Ordering::{Acquire, SeqCst}; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, Condvar, Mutex, RwLock}; @@ -56,6 +56,8 @@ pub(crate) struct MultiThreadScheduler { locals: Vec, #[cfg(feature = "net")] pub(crate) io_handle: Arc, + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + steal_count: AtomicUsize, } const ACTIVE_WORKER_SHIFT: usize = 16; @@ -139,7 +141,9 @@ impl MultiThreadScheduler { global: GlobalQueue::new(), locals, #[cfg(feature = "net")] - io_handle + io_handle, + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + steal_count: AtomicUsize::new(0), } } @@ -327,6 +331,8 @@ impl MultiThreadScheduler { } let target = self.locals.get(i).unwrap(); if let Some(task) = target.steal_into(local_run_queue) { + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.steal_count.fetch_add(1, SeqCst); return Some(task); } } @@ -334,9 +340,19 @@ impl MultiThreadScheduler { self.global.pop_front() } - pub(crate) fn get_global(&self) -> &Mutex> { - self.global.get_global() + pub(crate) fn get_global(&self) -> &GlobalQueue { + &self.global } + + cfg_dfx_not_ffrt!( + pub(crate) fn get_handles(&self) -> &RwLock>{ + &self.handles + } + + pub(crate) fn get_steal_count(&self) -> usize { + self.steal_count.load(Acquire) + } + ); } #[derive(Clone)] @@ -367,6 +383,9 @@ pub(crate) struct Inner { worker_name: Option, /// Stack size of each thread stack_size: Option, + /// Workers + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + workers: Mutex>> } fn get_cpu_core() -> u8 { @@ -413,6 +432,8 @@ impl AsyncPoolSpawner { before_stop: builder.common.before_stop.clone(), worker_name: builder.common.worker_name.clone(), stack_size: builder.common.stack_size, + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + workers: Mutex::new(Vec::with_capacity(thread_num.into())) }), exe_mng_info: Arc::new( MultiThreadScheduler::new( @@ -456,6 +477,8 @@ impl AsyncPoolSpawner { } for (worker_id, worker) in workers.drain(..).enumerate() { + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.inner.workers.lock().unwrap().push(worker.clone()); #[cfg(feature = "net")] let work_arc_handle = self.exe_mng_info.io_handle.clone(); // set up thread attributes @@ -611,6 +634,18 @@ impl AsyncPoolSpawner { } } } + + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) fn get_worker(&self, index: u8) -> Result, ()>{ + let vec = self.inner.workers.lock().unwrap(); + for i in 0..vec.len() { + let worker = vec.get(i).expect("worker index out of range"); + if worker.index == index { + return Ok(worker.clone()) + } + } + Err(()) + } } #[cfg(all(test))] diff --git a/ylong_runtime/src/executor/current_thread.rs b/ylong_runtime/src/executor/current_thread.rs index 5dee6a18b20af97b3a161d03b08413cd78865f31..89605ed4fb968c9b0f3174229f1e7f2c1246c7fe 100644 --- a/ylong_runtime/src/executor/current_thread.rs +++ b/ylong_runtime/src/executor/current_thread.rs @@ -19,6 +19,8 @@ use std::future::Future; use std::mem; use std::pin::Pin; use std::sync::atomic::AtomicBool; +#[cfg(all(feature = "dfx", not(feature = "ffrt")))] +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, Release}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; @@ -30,7 +32,7 @@ cfg_io!( pub(crate) struct CurrentThreadSpawner { pub(crate) scheduler: Arc, - parker: Arc, + pub(crate) parker: Arc, #[cfg(feature = "net")] pub(crate) handle: Arc, } @@ -38,6 +40,8 @@ pub(crate) struct CurrentThreadSpawner { #[derive(Default)] pub(crate) struct CurrentThreadScheduler { pub(crate) inner: Mutex>, + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) count: AtomicUsize, } unsafe impl Sync for CurrentThreadScheduler {} @@ -46,6 +50,8 @@ impl Schedule for CurrentThreadScheduler { #[inline] fn schedule(&self, task: Task, _lifo: bool) { let mut queue = self.inner.lock().unwrap(); + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.count.fetch_add(1, std::sync::atomic::Ordering::AcqRel); queue.push_back(task); } } @@ -145,6 +151,10 @@ impl CurrentThreadSpawner { let mut queue = self.scheduler.inner.lock().unwrap(); queue.push_back(task); + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.scheduler + .count + .fetch_add(1, std::sync::atomic::Ordering::AcqRel); handle } diff --git a/ylong_runtime/src/executor/mod.rs b/ylong_runtime/src/executor/mod.rs index 63f70af0f1c3313aef7f62b383482b6ecd94edc9..3b299d4f50b3950c3931833927ac453161f73462 100644 --- a/ylong_runtime/src/executor/mod.rs +++ b/ylong_runtime/src/executor/mod.rs @@ -24,13 +24,15 @@ pub(crate) mod netpoller; use crate::builder::{initialize_blocking_spawner, RuntimeBuilder}; use crate::executor::blocking_pool::BlockPoolSpawner; use crate::task::TaskBuilder; -use crate::{cfg_ffrt, cfg_not_ffrt, JoinHandle, Task}; +use crate::{cfg_dfx_not_ffrt, cfg_ffrt, cfg_not_ffrt, JoinHandle, Task}; use std::future::Future; use crate::builder::multi_thread_builder::GLOBAL_BUILDER; #[cfg(feature = "current_thread_runtime")] use crate::executor::current_thread::CurrentThreadSpawner; use std::mem::MaybeUninit; +#[cfg(any(feature = "current_thread_runtime", not(feature = "ffrt")))] +use std::sync::Arc; use std::sync::Once; cfg_ffrt! { use crate::builder::initialize_ffrt_spawner; @@ -60,11 +62,12 @@ impl Schedule for PlaceholderScheduler { } } +#[derive(Clone)] pub(crate) enum AsyncHandle { #[cfg(feature = "current_thread_runtime")] - CurrentThread(CurrentThreadSpawner), + CurrentThread(Arc), #[cfg(not(feature = "ffrt"))] - MultiThread(AsyncPoolSpawner), + MultiThread(Arc), #[cfg(feature = "ffrt")] FfrtMultiThread, } @@ -79,6 +82,7 @@ pub(crate) enum AsyncHandle { /// The async and blocking pools working when calling methods of this struct are stored in the /// global static executor instance. Here, keep the empty struct for compatibility /// and possibility for function extension in the future. +#[derive(Clone)] pub struct Runtime { pub(crate) async_spawner: AsyncHandle, } @@ -109,7 +113,7 @@ pub(crate) fn global_default_async() -> &'static Runtime { #[cfg(not(feature = "ffrt"))] let runtime = match initialize_async_spawner(global_builder.as_ref().unwrap()) { Ok(s) => Runtime { - async_spawner: AsyncHandle::MultiThread(s), + async_spawner: AsyncHandle::MultiThread(Arc::new(s)), }, Err(e) => panic!("initialize runtime failed: {:?}", e), }; @@ -305,3 +309,12 @@ impl Runtime { ret } } + +cfg_dfx_not_ffrt!( + use crate::dfx::Metrics; + impl Runtime { + pub fn metrics(&self) -> Metrics { + Metrics::new(self.clone()) + } + } +); diff --git a/ylong_runtime/src/executor/parker.rs b/ylong_runtime/src/executor/parker.rs index 0b3b1d4598c015c3cc4d9bf32be33bb8b5ec3243..79f7a72492a1dc7e541a64f6b50f4bda235b5b24 100644 --- a/ylong_runtime/src/executor/parker.rs +++ b/ylong_runtime/src/executor/parker.rs @@ -77,6 +77,11 @@ impl Parker { pub(crate) fn release(&self) { self.inner.release(); } + + #[cfg(all(feature="dfx", not(feature="ffrt")))] + pub(crate) fn get_state(&self) -> usize { + self.inner.get_state() + } } impl Inner { @@ -176,6 +181,11 @@ impl Inner { &self.driver } + #[cfg(all(feature="dfx", not(feature="ffrt")))] + pub(crate) fn get_state(&self) -> usize { + self.state.load(SeqCst) + } + fn release(&self) { self.condvar.notify_all(); } diff --git a/ylong_runtime/src/executor/queue.rs b/ylong_runtime/src/executor/queue.rs index a697500f377f871f8cbf881181d420b1bf2c10b5..67d0c04b30d7c350f4ef595e6f61be2892115239 100644 --- a/ylong_runtime/src/executor/queue.rs +++ b/ylong_runtime/src/executor/queue.rs @@ -80,6 +80,30 @@ impl LocalQueue { self.inner.steal_into(dst) } + #[inline] + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) fn len(&self) -> u16 { + self.inner.len() + } + + #[inline] + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) fn count(&self) -> usize { + self.inner.count() + } + + #[inline] + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) fn task_from_global_count(&self) -> usize { + self.inner.task_from_global_count() + } + + #[inline] + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) fn task_to_global_count(&self) -> usize { + self.inner.task_to_global_count() + } + #[inline] pub(crate) fn is_empty(&self) -> bool { self.inner.is_empty() @@ -97,6 +121,12 @@ pub(crate) struct InnerBuffer { rear: AtomicU16, cap: u16, buffer: Box<[UnsafeCell>]>, + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + count: AtomicUsize, + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + task_from_global_count: AtomicUsize, + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + task_to_global_count: AtomicUsize, } impl InnerBuffer { @@ -111,9 +141,41 @@ impl InnerBuffer { rear: AtomicU16::new(0), cap, buffer: buffer.into(), + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + count: AtomicUsize::new(0), + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + task_from_global_count: AtomicUsize::new(0), + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + task_to_global_count: AtomicUsize::new(0), } } + /// Return queue's len. + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + fn len(&self) -> u16 { + let rear = self.rear.load(Acquire); + let (_, head) = unwrap(self.front.load(Acquire)); + rear.wrapping_sub(head) + } + + /// Returns the total number of task which has entered this LocalQueue + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + fn count(&self) -> usize { + self.count.load(Acquire) + } + + /// Returns the total number of task which has entered this LocalQueue + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + fn task_from_global_count(&self) -> usize { + self.task_from_global_count.load(Acquire) + } + + /// Returns the total number of task which has entered this LocalQueue + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + fn task_to_global_count(&self) -> usize { + self.task_to_global_count.load(Acquire) + } + /// Checks whether the queue is empty fn is_empty(&self) -> bool { let (_, head) = unwrap(self.front.load(Acquire)); @@ -182,6 +244,8 @@ impl InnerBuffer { ptr::write((*ptr).as_mut_ptr(), task); } self.rear.store(rear.wrapping_add(1), Release); + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.count.fetch_add(1, AcqRel); return; } else { match self.push_overflowed(task, global, real_pos) { @@ -230,6 +294,10 @@ impl InnerBuffer { } global.push_batch(tmp_buf, task); + + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.task_to_global_count.fetch_add(1, AcqRel); + Ok(()) } @@ -333,7 +401,11 @@ impl Drop for InnerBuffer { } pub(crate) struct GlobalQueue { + /// Current number of tasks len: AtomicUsize, + /// The total number of tasks which has entered global queue. + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + count: AtomicUsize, globals: Mutex>, } @@ -341,6 +413,8 @@ impl GlobalQueue { pub(crate) fn new() -> Self { GlobalQueue { len: AtomicUsize::new(0_usize), + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + count: AtomicUsize::new(0_usize), globals: Mutex::new(LinkedList::new()), } } @@ -357,6 +431,8 @@ impl GlobalQueue { } list.push_back(task); self.len.fetch_add(len, AcqRel); + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.count.fetch_add(len, AcqRel); } pub(super) fn pop_batch( @@ -394,6 +470,10 @@ impl GlobalQueue { drop(list); self.len.fetch_sub(count, AcqRel); inner_buf.rear.store(curr, Release); + + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + inner_buf.task_from_global_count.fetch_add(1, AcqRel); + Some(first_task) } @@ -415,18 +495,30 @@ impl GlobalQueue { list.push_back(task); drop(list); self.len.fetch_add(1, AcqRel); + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.count.fetch_add(1, AcqRel); } pub(super) fn get_global(&self) -> &Mutex> { &self.globals } + + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) fn get_len(&self) -> usize { + self.len.load(Acquire) + } + + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) fn get_count(&self) -> usize { + self.count.load(Acquire) + } } #[cfg(feature = "multi_instance_runtime")] #[cfg(all(test))] mod test { use crate::executor::async_pool::MultiThreadScheduler; - use crate::executor::queue::{unwrap, GlobalQueue, InnerBuffer, LocalQueue, LOCAL_QUEUE_CAP}; + use crate::executor::queue::{GlobalQueue, InnerBuffer, LocalQueue, LOCAL_QUEUE_CAP}; use crate::task::{Task, TaskBuilder, VirtualTableType}; #[cfg(feature = "net")] use crate::net::Driver; @@ -437,16 +529,18 @@ mod test { use std::task::{Context, Poll}; use std::thread::park; + #[cfg(any(not(feature = "dfx"), feature = "ffrt"))] impl InnerBuffer { fn len(&self) -> u16 { let front = self.front.load(Acquire); - let (_, real_pos) = unwrap(front); + let (_, real_pos) = crate::executor::queue::unwrap(front); let rear = self.rear.load(Acquire); rear.wrapping_sub(real_pos) } } + #[cfg(any(not(feature = "dfx"), feature = "ffrt"))] impl LocalQueue { pub fn len(&self) -> u16 { self.inner.len() diff --git a/ylong_runtime/src/executor/worker.rs b/ylong_runtime/src/executor/worker.rs index cf641b7c58af43120078c616bb3fc9c6bdd70c31..879f228306b41f8065c31608bc5f8280f48b4df6 100644 --- a/ylong_runtime/src/executor/worker.rs +++ b/ylong_runtime/src/executor/worker.rs @@ -169,7 +169,7 @@ impl Worker { } // thread 0 is responsible for dropping the tasks inside the global queue if self.index == 0 { - let mut global = self.scheduler.get_global().lock().unwrap(); + let mut global = self.scheduler.get_global().get_global().lock().unwrap(); loop { if let Some(task) = global.pop_front() { task.shutdown(); @@ -238,6 +238,18 @@ impl Worker { } } + /// Gets Worker's Inner with ptr. + /// + /// # Safety + /// We can't get Inner with `RefCell::borrow()`, because the worker will + /// always hold the borrow_mut until drop. So we can only get Inner by ptr. + /// This method can only be used to obtain values + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + pub(crate) unsafe fn get_inner_ptr(&self) -> &Inner { + let ptr = self.inner.as_ptr(); + &*ptr + } + #[inline] fn release(&self) { // wait for tasks in queue to finish diff --git a/ylong_runtime/src/lib.rs b/ylong_runtime/src/lib.rs index 5850a65465e2c8b5f90944060dd7169ed9380f23..121a38e5fd0e24af8f7a29c23e942e3d79edd5f3 100644 --- a/ylong_runtime/src/lib.rs +++ b/ylong_runtime/src/lib.rs @@ -46,6 +46,10 @@ pub mod task; #[cfg(feature = "time")] pub mod time; pub mod util; +cfg_dfx_not_ffrt!( + mod dfx; + pub use dfx::Metrics; +); cfg_io! { pub mod net; @@ -102,3 +106,14 @@ macro_rules! cfg_not_ffrt { } pub(crate) use cfg_not_ffrt; + +macro_rules! cfg_dfx_not_ffrt { + ($($item:item)*) => { + $( + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + $item + )* + } +} + +pub(crate) use cfg_dfx_not_ffrt; diff --git a/ylong_runtime/src/net/driver.rs b/ylong_runtime/src/net/driver.rs index 969919e071b5f14758be3b5434515afbeb9fddf4..182a7d412da1f0bdd7946573e955a6a23cf40e34 100644 --- a/ylong_runtime/src/net/driver.rs +++ b/ylong_runtime/src/net/driver.rs @@ -16,6 +16,8 @@ use crate::net::{Ready, ScheduleIO, Tick}; use std::io; use std::ops::Deref; use std::sync::{Arc, Mutex}; +#[cfg(all(feature = "dfx", not(feature = "ffrt")))] +use std::sync::atomic::AtomicUsize; use std::time::Duration; use crate::util::bit::{Bit, Mask}; use ylong_io::{EventTrait, Events, Interest, Poll, Source, Token}; @@ -48,6 +50,10 @@ pub(crate) struct Driver { /// Used for epoll poll: Arc, + + /// Register count. This value will only increment, not decrease. + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + handle: Arc, } pub(crate) struct Handle { @@ -90,6 +96,16 @@ impl Handle { pub(crate) fn wake(&self) { self.waker.wake().expect("ylong_io wake failed"); } + + #[cfg(feature = "dfx")] + pub(crate) fn get_register_count(&self) -> usize { + self.inner.register_count.load(std::sync::atomic::Ordering::Acquire) + } + + #[cfg(feature = "dfx")] + pub(crate) fn get_ready_count(&self) -> usize { + self.inner.ready_count.load(std::sync::atomic::Ordering::Acquire) + } } impl Deref for Handle { @@ -115,32 +131,48 @@ pub(crate) struct Inner { /// Used to register fd registry: Arc, + + /// Register count. This value will only increment, not decrease. + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + register_count: AtomicUsize, + + /// Ready events count. This value will only increment, not decrease. + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + ready_count: AtomicUsize, } impl Driver { #[cfg(not(feature = "ffrt"))] pub(crate) fn initialize() -> (Arc, Arc>) { - let poll = Poll::new().unwrap(); - let waker = ylong_io::Waker::new(&poll, WAKE_TOKEN) - .expect("ylong_io waker construction failed"); - let arc_poll = Arc::new(poll); - let events = Events::with_capacity(EVENTS_MAX_CAPACITY); - let slab = Slab::new(); - let allocator = slab.handle(); - let inner = Arc::new(Inner { - resources: Mutex::new(None), - allocator, - registry: arc_poll.clone(), - }); + let poll = Poll::new().unwrap(); + let waker = ylong_io::Waker::new(&poll, WAKE_TOKEN) + .expect("ylong_io waker construction failed"); + let arc_poll = Arc::new(poll); + let events = Events::with_capacity(EVENTS_MAX_CAPACITY); + let slab = Slab::new(); + let allocator = slab.handle(); + let inner = Arc::new(Inner { + resources: Mutex::new(None), + allocator, + registry: arc_poll.clone(), + #[cfg(feature = "dfx")] + register_count: AtomicUsize::new(0), + #[cfg(feature = "dfx")] + ready_count: AtomicUsize::new(0), + }); + let arc_handle = Arc::new(Handle::new(inner, waker)); + + let driver = Driver { + resources: Some(slab), + events: Some(events), + tick: DRIVER_TICK_INIT, + poll: arc_poll, + #[cfg(feature = "dfx")] + handle: arc_handle.clone() + }; + let arc_driver = Arc::new(Mutex::new(driver)); - let driver = Driver { - resources: Some(slab), - events: Some(events), - tick: DRIVER_TICK_INIT, - poll: arc_poll, - }; - - (Arc::new(Handle::new(inner, waker)), Arc::new(Mutex::new(driver))) + (arc_handle, arc_driver) } #[cfg(feature = "ffrt")] @@ -219,6 +251,8 @@ impl Driver { let ready = Ready::from_event(event); self.dispatch(token, ready); } + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.handle.ready_count.fetch_add(events.len(), std::sync::atomic::Ordering::AcqRel); self.events = Some(events); Ok(has_events) @@ -276,6 +310,8 @@ impl Inner { self.registry .register(io, Token::from_usize(token), interest)?; + #[cfg(all(feature = "dfx", not(feature = "ffrt")))] + self.register_count.fetch_add(1, std::sync::atomic::Ordering::AcqRel); Ok(schedule_io) } diff --git a/ylong_runtime/tests/async_fs.rs b/ylong_runtime/tests/async_fs.rs index 841d24a9a2463a0c02998f05622943721cbce6dd..43739c4268067bc999a75465abf420240fb4afcf 100644 --- a/ylong_runtime/tests/async_fs.rs +++ b/ylong_runtime/tests/async_fs.rs @@ -306,6 +306,7 @@ fn sdv_async_fs_set_permission() { let ret = file.set_permissions(perms).await; assert!(ret.is_ok()); let mut perms = file.metadata().await.unwrap().permissions(); + #[allow(clippy::permissions_set_readonly_false)] perms.set_readonly(false); let ret = file.set_permissions(perms).await; assert!(ret.is_ok());