From 28ba1ae6ecb8afe3964797dd42fbf616ea451caa Mon Sep 17 00:00:00 2001 From: ljy9810 Date: Tue, 5 Nov 2024 11:22:05 +0800 Subject: [PATCH] =?UTF-8?q?metrics=20=E8=A1=A5=E5=BC=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/BUILD.gn | 2 + ylong_runtime/src/executor/async_pool.rs | 14 +- ylong_runtime/src/executor/mod.rs | 14 +- ylong_runtime/src/metrics/metrics_task.rs | 67 +++ ylong_runtime/src/metrics/mod.rs | 3 + ylong_runtime/src/metrics/runtime.rs | 655 ++++++++++++++++++---- ylong_runtime/src/spawn.rs | 6 + ylong_runtime/src/task/task_handle.rs | 74 ++- ylong_runtime/tests/metrics.rs | 164 ++++++ 9 files changed, 850 insertions(+), 149 deletions(-) create mode 100644 ylong_runtime/src/metrics/metrics_task.rs create mode 100644 ylong_runtime/tests/metrics.rs diff --git a/test/BUILD.gn b/test/BUILD.gn index 60d87ac..19b7b38 100644 --- a/test/BUILD.gn +++ b/test/BUILD.gn @@ -24,6 +24,7 @@ ohos_rust_unittest("rust_ylong_runtime_test_ut") { "--cfg=feature=\"sync\"", "--cfg=feature=\"signal\"", "--cfg=feature=\"time\"", + "--cfg=feature=\"metrics\"", ] sources = [ "../ylong_runtime/src/lib.rs" ] @@ -47,6 +48,7 @@ ohos_rust_systemtest("rust_ylong_runtime_test_sdv") { "--cfg=feature=\"sync\"", "--cfg=feature=\"signal\"", "--cfg=feature=\"time\"", + "--cfg=feature=\"metrics\"", ] sources = [ "../ylong_runtime/tests/entry.rs" ] diff --git a/ylong_runtime/src/executor/async_pool.rs b/ylong_runtime/src/executor/async_pool.rs index 963c5ea..fa8a849 100644 --- a/ylong_runtime/src/executor/async_pool.rs +++ b/ylong_runtime/src/executor/async_pool.rs @@ -27,6 +27,8 @@ use super::worker::{get_current_ctx, run_worker, Worker}; use super::{worker, Schedule}; use crate::builder::multi_thread_builder::MultiThreadBuilder; use crate::builder::CallbackHook; +#[cfg(feature = "metrics")] +use crate::executor::global_default_metrics_task; use crate::executor::worker::WorkerContext; use crate::fastrand::fast_random; use crate::task::{JoinHandle, Task, TaskBuilder, VirtualTableType}; @@ -51,8 +53,6 @@ pub(crate) struct MultiThreadScheduler { /// A set of all the local queues in the executor locals: Vec, pub(crate) handle: Arc, - #[cfg(feature = "metrics")] - steal_times: std::sync::atomic::AtomicU64, } impl Schedule for MultiThreadScheduler { @@ -79,8 +79,6 @@ impl MultiThreadScheduler { global: GlobalQueue::new(), locals, handle, - #[cfg(feature = "metrics")] - steal_times: std::sync::atomic::AtomicU64::new(0), } } @@ -325,9 +323,7 @@ impl MultiThreadScheduler { if let Some(task) = target.steal_into(destination) { #[cfg(feature = "metrics")] - self.steal_times - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - + global_default_metrics_task().increment_steal(); return Some(task); } } @@ -344,10 +340,6 @@ impl MultiThreadScheduler { pub(crate) fn get_handles(&self) -> &RwLock> { &self.handles } - - pub(crate) fn get_steal_times(&self) -> u64 { - self.steal_times.load(Acquire) - } ); } diff --git a/ylong_runtime/src/executor/mod.rs b/ylong_runtime/src/executor/mod.rs index 65cf0ba..1fbf133 100644 --- a/ylong_runtime/src/executor/mod.rs +++ b/ylong_runtime/src/executor/mod.rs @@ -94,10 +94,21 @@ impl Runtime { } } +#[cfg(feature = "metrics")] +pub(crate) fn global_default_metrics_task() -> &'static TaskMetrics { + static mut GLOBAL_DEFAULT_TASK: MaybeUninit = MaybeUninit::uninit(); + static ONCE: Once = Once::new(); + unsafe { + ONCE.call_once(|| { + GLOBAL_DEFAULT_TASK = MaybeUninit::new(TaskMetrics::new()); + }); + &*GLOBAL_DEFAULT_TASK.as_ptr() + } +} + pub(crate) fn global_default_async() -> &'static Runtime { static mut GLOBAL_DEFAULT_ASYNC: MaybeUninit = MaybeUninit::uninit(); static ONCE: Once = Once::new(); - unsafe { ONCE.call_once(|| { let mut global_builder = GLOBAL_BUILDER.lock().unwrap(); @@ -336,6 +347,7 @@ impl Runtime { cfg_metrics!( use crate::metrics::Metrics; + use crate::metrics::TaskMetrics; impl Runtime { /// User can get some message from Runtime during running. /// diff --git a/ylong_runtime/src/metrics/metrics_task.rs b/ylong_runtime/src/metrics/metrics_task.rs new file mode 100644 index 0000000..7881ab8 --- /dev/null +++ b/ylong_runtime/src/metrics/metrics_task.rs @@ -0,0 +1,67 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::AcqRel; + +pub struct TaskMetrics { + // Total number of "spawn" or "spawn_blocking" tasks + pub(crate) task_total_count: AtomicU64, + // Total number of finished tasks + pub(crate) task_total_finish: AtomicU64, + // Total number of "poll" has been executed + pub(crate) task_total_poll: AtomicU64, + // Total number of "Pending" has been returned + pub(crate) task_total_pending: AtomicU64, + // Total number of "wake" has been executed + pub(crate) task_total_wake: AtomicU64, + // Total number of work stealing happened + pub(crate) task_steal_times: AtomicU64, +} + +impl TaskMetrics { + pub(crate) fn new() -> TaskMetrics { + TaskMetrics { + task_total_count: AtomicU64::new(0), + task_total_finish: AtomicU64::new(0), + task_total_poll: AtomicU64::new(0), + task_total_pending: AtomicU64::new(0), + task_total_wake: AtomicU64::new(0), + task_steal_times: AtomicU64::new(0), + } + } + + pub(crate) fn increment_task(&self) { + self.task_total_count.fetch_add(1, AcqRel); + } + + pub(crate) fn increment_finish(&self) { + self.task_total_finish.fetch_add(1, AcqRel); + } + + pub(crate) fn increment_poll(&self) { + self.task_total_poll.fetch_add(1, AcqRel); + } + + pub(crate) fn increment_pending(&self) { + self.task_total_pending.fetch_add(1, AcqRel); + } + + pub(crate) fn increment_wake(&self) { + self.task_total_wake.fetch_add(1, AcqRel); + } + + pub(crate) fn increment_steal(&self) { + self.task_steal_times.fetch_add(1, AcqRel); + } +} diff --git a/ylong_runtime/src/metrics/mod.rs b/ylong_runtime/src/metrics/mod.rs index 9092f0a..73e61bf 100644 --- a/ylong_runtime/src/metrics/mod.rs +++ b/ylong_runtime/src/metrics/mod.rs @@ -11,5 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod metrics_task; mod runtime; + +pub use metrics_task::TaskMetrics; pub use runtime::Metrics; diff --git a/ylong_runtime/src/metrics/runtime.rs b/ylong_runtime/src/metrics/runtime.rs index 15c2fe1..143079a 100644 --- a/ylong_runtime/src/metrics/runtime.rs +++ b/ylong_runtime/src/metrics/runtime.rs @@ -11,20 +11,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::Ordering::Acquire; + use crate::executor::async_pool::AsyncPoolSpawner; -use crate::executor::{AsyncHandle, Runtime}; +use crate::executor::{global_default_metrics_task, AsyncHandle, Runtime}; +use crate::metrics::TaskMetrics; /// User can get some message from Runtime during running. /// /// # Example -/// ```no_run -/// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() -/// .build() -/// .unwrap(); -/// let metrics = runtime.metrics(); +/// ``` +/// use ylong_runtime::executor::get_global_runtime_metrics; +/// +/// let metrics = get_global_runtime_metrics(); /// ``` pub struct Metrics<'a> { runtime: &'a Runtime, + metrics_task: &'a TaskMetrics, } /// List of workers state. @@ -40,18 +43,20 @@ impl Metrics<'_> { const ACTIVE_STATE: usize = 3; pub(crate) fn new(runtime: &Runtime) -> Metrics { - Metrics { runtime } + Metrics { + runtime, + metrics_task: global_default_metrics_task(), + } } /// Returns workers num /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!("Runtime's workers_num:{}", metrics.workers_num()); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The workers_num:{}", metrics.workers_num()); /// ``` pub fn workers_num(&self) -> usize { match &self.runtime.async_spawner { @@ -67,14 +72,10 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!( - /// "Runtime's park_workers_num:{:?}", - /// metrics.park_workers_num() - /// ); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The park_workers_num:{:?}", metrics.park_workers_num()); /// ``` pub fn park_workers_num(&self) -> Option { match &self.runtime.async_spawner { @@ -92,14 +93,10 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!( - /// "Runtime's active_workers_num:{:?}", - /// metrics.active_workers_num() - /// ); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The active_workers_num:{:?}", metrics.active_workers_num()); /// ``` pub fn active_workers_num(&self) -> Option { match &self.runtime.async_spawner { @@ -117,14 +114,10 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!( - /// "Runtime's park_workers_list:{:?}", - /// metrics.park_workers_list() - /// ); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The park_workers_list:{:?}", metrics.park_workers_list()); /// ``` pub fn park_workers_list(&self) -> Option> { match &self.runtime.async_spawner { @@ -140,12 +133,11 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); /// println!( - /// "Runtime's active_workers_list:{:?}", + /// "The active_workers_list:{:?}", /// metrics.active_workers_list() /// ); /// ``` @@ -165,12 +157,11 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); /// println!( - /// "Runtime's overall_workers_list:{:?}", + /// "The overall_workers_list:{:?}", /// metrics.overall_workers_list() /// ); /// ``` @@ -201,14 +192,10 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!( - /// "Runtime's global_queue_length:{}", - /// metrics.global_queue_length() - /// ); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The global_queue_length:{}", metrics.global_queue_length()); /// ``` pub fn global_queue_length(&self) -> usize { match &self.runtime.async_spawner { @@ -224,12 +211,11 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); /// println!( - /// "Runtime's global_queue_total_task_count:{}", + /// "The global_queue_total_task_count:{}", /// metrics.global_queue_total_task_count() /// ); /// ``` @@ -250,11 +236,10 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!("Runtime's worker_task_len:{:?}", metrics.worker_task_len(0)); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The worker_task_len:{:?}", metrics.worker_task_len(0)); /// ``` pub fn worker_task_len(&self, index: usize) -> Option { match &self.runtime.async_spawner { @@ -278,12 +263,11 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); /// println!( - /// "Runtime's worker_total_task_count:{:?}", + /// "The worker_total_task_count:{:?}", /// metrics.worker_total_task_count(0) /// ); /// ``` @@ -309,14 +293,10 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!( - /// "Runtime's worker_poll_count:{:?}", - /// metrics.worker_poll_count(0) - /// ); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The worker_poll_count:{:?}", metrics.worker_poll_count(0)); /// ``` pub fn worker_poll_count(&self, index: usize) -> Option { match &self.runtime.async_spawner { @@ -339,18 +319,13 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!("Runtime's steal_times:{:?}", metrics.steal_times()); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The steal_times:{:?}", metrics.steal_times()); /// ``` - pub fn steal_times(&self) -> Option { - match &self.runtime.async_spawner { - #[cfg(feature = "current_thread_runtime")] - AsyncHandle::CurrentThread(_) => None, - AsyncHandle::MultiThread(spawner) => Some(spawner.exe_mng_info.get_steal_times()), - } + pub fn steal_times(&self) -> u64 { + self.metrics_task.task_steal_times.load(Acquire) } /// Returns the number of times the given worker get tasks from the global @@ -361,12 +336,11 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); /// println!( - /// "Runtime's worker_get_task_from_global_count:{:?}", + /// "The worker_get_task_from_global_count:{:?}", /// metrics.worker_get_task_from_global_count(0) /// ); /// ``` @@ -384,20 +358,19 @@ impl Metrics<'_> { } } - /// Returns the number of times the given worker push a task on the global - /// queue. + /// Returns the number of times the given worker had pushed tasks to the + /// global queue. /// /// This value will only increment, not decrease. /// Runtime build by `new_current_thread()` will return None. /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); /// println!( - /// "Runtime's worker_push_task_to_global_count:{:?}", + /// "The worker_push_task_to_global_count:{:?}", /// metrics.worker_push_task_to_global_count(0) /// ); /// ``` @@ -421,14 +394,10 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); - /// println!( - /// "Runtime's fd_registered_count:{}", - /// metrics.fd_registered_count() - /// ); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!("The fd_registered_count:{}", metrics.fd_registered_count()); /// ``` #[cfg(feature = "net")] pub fn fd_registered_count(&self) -> u64 { @@ -441,12 +410,11 @@ impl Metrics<'_> { /// /// # Example /// ``` - /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread() - /// .build() - /// .unwrap(); - /// let metrics = runtime.metrics(); + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); /// println!( - /// "Runtime's io_driver_ready_count:{}", + /// "The io_driver_ready_count:{}", /// metrics.io_driver_ready_count() /// ); /// ``` @@ -454,4 +422,465 @@ impl Metrics<'_> { pub fn io_driver_ready_count(&self) -> u64 { self.runtime.get_handle().get_ready_count() } + + /// Returns the number of tasks spawned + /// + /// # Example + /// ``` + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!( + /// "The numbers of the task which has been spawned:{}", + /// metrics.get_task_total_count() + /// ); + /// ``` + pub fn get_task_total_count(&self) -> u64 { + self.metrics_task.task_total_count.load(Acquire) + } + + /// Returns the numbers of the tasks have finished + /// + /// # Example + /// ``` + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!( + /// "The numbers of the task which has been finished:{}", + /// metrics.get_task_finish_count() + /// ); + /// ``` + pub fn get_task_finish_count(&self) -> u64 { + self.metrics_task.task_total_finish.load(Acquire) + } + + /// Returns the total numbers of tasks have been polled + /// + /// # Example + /// ``` + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!( + /// "The numbers of the task which has been ran:{}", + /// metrics.get_task_poll_count() + /// ); + /// ``` + pub fn get_task_poll_count(&self) -> u64 { + self.metrics_task.task_total_poll.load(Acquire) + } + + /// Returns the numbers of times the tasks have returned pending + /// + /// # Example + /// ``` + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!( + /// "The numbers of the task which return Pending:{}", + /// metrics.get_task_pending_count() + /// ); + /// ``` + pub fn get_task_pending_count(&self) -> u64 { + self.metrics_task.task_total_pending.load(Acquire) + } + + /// Returns the numbers of times the task which has been woken + /// + /// # Example + /// ``` + /// use ylong_runtime::executor::get_global_runtime_metrics; + /// + /// let metrics = get_global_runtime_metrics(); + /// println!( + /// "The numbers of the task which has been wake:{}", + /// metrics.get_task_wake_count() + /// ); + /// ``` + pub fn get_task_wake_count(&self) -> u64 { + self.metrics_task.task_total_wake.load(Acquire) + } +} + +#[cfg(test)] +mod test { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::thread; + use std::time::Duration; + + use crate::executor::get_global_runtime_metrics; + #[cfg(feature = "net")] + use crate::net::TcpListener; + #[cfg(target_os = "linux")] + use crate::util::num_cpus::get_cpu_num; + use crate::{block_on, spawn}; + + /// UT test cases for Metrics::workers_num() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Calibrate the workers_num + #[test] + #[cfg(target_os = "linux")] + fn ut_workers_num() { + let metrics = get_global_runtime_metrics(); + let workers_num = metrics.workers_num(); + assert_eq!(workers_num, get_cpu_num() as usize); + } + + /// UT test cases for Metrics::park_workers_num() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Calibrate the park_workers_num is not "None" + #[test] + #[cfg(target_os = "linux")] + fn ut_park_workers_num() { + let metrics = get_global_runtime_metrics(); + let park_workers_num = metrics.park_workers_num(); + assert_ne!(park_workers_num, None); + } + + /// UT test cases for Metrics::active_workers_num() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Calibrate the active_workers_num is not "None" + #[test] + #[cfg(target_os = "linux")] + fn ut_active_workers_num() { + let metrics = get_global_runtime_metrics(); + let active_workers_num = metrics.active_workers_num(); + assert_ne!(active_workers_num, None); + } + + /// UT test cases for Metrics::park_workers_list() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Calibrate the park_workers_list is not "None" + #[test] + fn ut_park_workers_list() { + let metrics = get_global_runtime_metrics(); + let park_workers_list = metrics.park_workers_list(); + assert_ne!(park_workers_list, None); + } + + /// UT test cases for Metrics::active_workers_list() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Calibrate the active_workers_list is not "None" + #[test] + fn ut_active_workers_list() { + let metrics = get_global_runtime_metrics(); + let active_workers_list = metrics.active_workers_list(); + assert_ne!(active_workers_list, None); + } + + /// UT test cases for Metrics::overall_workers_list() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Calibrate the overall_workers_list is not "None" + #[test] + fn ut_overall_workers_list() { + let metrics = get_global_runtime_metrics(); + let workers_num = metrics.workers_num(); + let workers_list = metrics.overall_workers_list(); + let active_len = workers_list.as_ref().unwrap().active.len(); + let park_len = workers_list.as_ref().unwrap().park.len(); + let list_sum = active_len + park_len; + assert!(list_sum >= workers_num); + } + + /// UT test cases for Metrics::global_queue_length() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Add tasks to the global queue + /// 3. Calibrate the current global queue length + #[test] + fn ut_global_queue_length() { + let metrics = get_global_runtime_metrics(); + for _ in 0..10 { + thread::sleep(Duration::from_millis(10)); + spawn(async { thread::sleep(Duration::from_millis(200)) }); + } + let global_queue_length = metrics.global_queue_length(); + assert!(global_queue_length > 0); + } + + /// UT test cases for Metrics::global_queue_total_task_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Add tasks to the global queue + /// 3. Calibrate the total number of tasks entering the global queue + #[test] + fn ut_global_queue_total_task_count() { + let metrics = get_global_runtime_metrics(); + let before_global = metrics.global_queue_total_task_count(); + for _ in 0..10 { + spawn(async { 1 }); + } + let after_global = metrics.global_queue_total_task_count(); + assert!(after_global - before_global >= 10); + } + + /// UT test cases for Metrics::worker_task_len() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Add tasks to the local queue + /// 3. Calibrate the current local queue length + #[test] + fn ut_worker_task_len() { + block_on(spawn(async { + let metrics = get_global_runtime_metrics(); + let len = metrics.workers_num(); + let mut task_len = 0; + for _ in 0..10 { + thread::sleep(Duration::from_millis(10)); + spawn(async { thread::sleep(Duration::from_millis(200)) }); + } + for i in 0..len { + task_len += metrics.worker_task_len(i).unwrap(); + } + assert!(task_len > 0); + })) + .unwrap(); + } + + /// UT test cases for Metrics::worker_total_task_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Add tasks to the local queue + /// 3. Calibrate the total number of tasks entering the local queue + #[test] + fn ut_worker_total_task_count() { + let metrics = get_global_runtime_metrics(); + let len = metrics.workers_num(); + let mut sum_before = 0; + for i in 0..len { + sum_before += metrics.worker_total_task_count(i).unwrap(); + } + block_on(spawn(async { + for _ in 0..10 { + spawn(async { 1 }); + } + })) + .unwrap(); + let mut sum_after = 0; + for i in 0..len { + sum_after += metrics.worker_total_task_count(i).unwrap(); + } + assert!(sum_after - sum_before >= 10); + } + + /// UT test cases for Metrics::worker_poll_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Add tasks to the local queue + /// 3. Calibrate the total number of tasks that have been polled in the + /// local queue + #[test] + fn ut_worker_poll_count() { + let metrics = get_global_runtime_metrics(); + let len = metrics.workers_num(); + let mut poll_before = 0; + for i in 0..len { + poll_before += metrics.worker_poll_count(i).unwrap(); + } + block_on(spawn(async { + for _ in 0..100 { + spawn(async { 1 }); + } + })) + .unwrap(); + thread::sleep(Duration::from_millis(10)); + let mut poll_after = 0; + for i in 0..len { + poll_after += metrics.worker_poll_count(i).unwrap(); + } + assert!(poll_after - poll_before >= 100); + } + + /// UT test cases for Metrics::worker_push_task_to_global_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Add tasks to the local queue + /// 3. Calibrate the num of push task to global queue + #[test] + fn ut_worker_push_task_to_global_count() { + let metrics = get_global_runtime_metrics(); + let len = metrics.workers_num(); + let handle = spawn(async { + for _ in 0..10000 { + spawn(async { 1 }); + } + }); + let _ = block_on(handle); + for i in 0..len { + let push_task_to_global = metrics.worker_push_task_to_global_count(i); + assert_ne!(push_task_to_global, None); + } + } + + /// UT test cases for Metrics::fd_registered_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Registering IO events in Driver + /// 3. Calibrate the num of IO events which has been registered in Driver. + #[test] + #[cfg(feature = "net")] + fn ut_fd_registered_count() { + const ADDR: &str = "127.0.0.1:0"; + let metrics = get_global_runtime_metrics(); + let registered_count_before = metrics.fd_registered_count(); + for _ in 0..10 { + block_on(async { + let _server = TcpListener::bind(ADDR).await.unwrap(); + }); + } + let registered_count_after = metrics.fd_registered_count(); + assert!(registered_count_after - registered_count_before >= 10); + } + + /// UT test cases for Metrics::get_task_total_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Obtain the current total number of submitted task + /// 3. Submit asynchronous tasks + /// 4. Calibrate the difference in the number of submitted task before and + /// after + #[test] + fn ut_get_task_total_count() { + let metrics = get_global_runtime_metrics(); + let before = metrics.get_task_total_count(); + for _ in 0..50 { + spawn(async { 1 }); + } + let after = metrics.get_task_total_count(); + assert!(after - before >= 50); + } + + /// UT test cases for Metrics::get_task_finish_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Obtain the current total number of completed tasks + /// 3. Submit asynchronous tasks + /// 4. Calibrate the difference in the number of completed tasks before and + /// after + #[test] + fn ut_get_task_total_finish_count() { + let metrics = get_global_runtime_metrics(); + let mut future_handlers = Vec::new(); + let before = metrics.get_task_finish_count(); + for _ in 0..20 { + let handler = spawn(async { 10 }); + future_handlers.push(handler); + } + for handler in future_handlers { + let _rs = block_on(handler); + } + let after = metrics.get_task_finish_count(); + assert!(after - before >= 20); + } + + struct MyFuture { + num: i32, + } + + impl MyFuture { + fn new() -> MyFuture { + MyFuture { num: 0 } + } + } + + impl Future for MyFuture { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.num < 3 { + self.get_mut().num += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(()) + } + } + } + + /// UT test cases for MetricsTask::get_task_poll_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Obtain the current total number of executed tasks + /// 3. Submit asynchronous tasks + /// 4. Calibrate the difference in the number of executed tasks before and + /// after + #[test] + fn ut_get_task_total_poll_count() { + let metrics = get_global_runtime_metrics(); + let before = metrics.get_task_poll_count(); + let my_future = MyFuture::new(); + let handler = spawn(async move { + my_future.await; + }); + let _ = block_on(handler); + let after = metrics.get_task_poll_count(); + assert!(after - before >= 4); + } + + /// UT test cases for MetricsTask::get_task_pending_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Obtain the total number of current tasks that have been switched + /// 3. Submit asynchronous tasks + /// 4. Calibrate the difference in the number of tasks switched before and + /// after + #[test] + fn ut_get_task_total_pending_count() { + let metrics = get_global_runtime_metrics(); + let before = metrics.get_task_pending_count(); + let my_future = MyFuture::new(); + let handler = spawn(async move { + my_future.await; + }); + let _ = block_on(handler); + let after = metrics.get_task_pending_count(); + assert!(after - before >= 3); + } + + /// UT test cases for MetricsTask::get_task_wake_count() interface + /// + /// # Brief + /// 1. Getting Metrics + /// 2. Obtain the total number of waked tasks + /// 3. Submit asynchronous tasks + /// 4. Calibrate the difference in the number of waked tasks before and + /// after + #[test] + fn ut_get_task_total_wake_count() { + let metrics = get_global_runtime_metrics(); + let before = metrics.get_task_wake_count(); + let my_future = MyFuture::new(); + let _handler = spawn(async move { + my_future.await; + }); + let _ = block_on(_handler); + let after = metrics.get_task_wake_count(); + assert!(after - before >= 3); + } } diff --git a/ylong_runtime/src/spawn.rs b/ylong_runtime/src/spawn.rs index 2481539..52cf805 100644 --- a/ylong_runtime/src/spawn.rs +++ b/ylong_runtime/src/spawn.rs @@ -14,6 +14,8 @@ use std::future::Future; use crate::executor::global_default_async; +#[cfg(feature = "metrics")] +use crate::executor::global_default_metrics_task; use crate::task::join_handle::JoinHandle; use crate::task::TaskBuilder; @@ -64,6 +66,8 @@ where R: Send + 'static, { let rt = global_default_blocking(); + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_task(); rt.spawn_blocking(builder, task) } @@ -77,5 +81,7 @@ where R: Send + 'static, { let rt = global_default_async(); + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_task(); rt.spawn_with_attr(task, builder) } diff --git a/ylong_runtime/src/task/task_handle.rs b/ylong_runtime/src/task/task_handle.rs index 4c1fba7..021033a 100644 --- a/ylong_runtime/src/task/task_handle.rs +++ b/ylong_runtime/src/task/task_handle.rs @@ -17,6 +17,8 @@ use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; use crate::error::{ErrorKind, ScheduleError}; +#[cfg(feature = "metrics")] +use crate::executor::global_default_metrics_task; use crate::executor::Schedule; use crate::task::raw::{Header, Inner, TaskMngInfo}; use crate::task::state; @@ -65,7 +67,11 @@ where } let cur = match self.header().state.turning_to_finish() { - Ok(cur) => cur, + Ok(cur) => { + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_finish(); + cur + } Err(e) => panic!("{}", e.as_str()), }; @@ -191,23 +197,31 @@ where })); let cur = self.header().state.get_current_state(); + + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_poll(); + match res { Ok(Poll::Ready(output)) => { // send result if the JoinHandle is not dropped self.finish(cur, output); } - Ok(Poll::Pending) => match self.header().state.turning_to_idle() { - StateAction::Enqueue => { - self.get_scheduled(true); + Ok(Poll::Pending) => { + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_pending(); + match self.header().state.turning_to_idle() { + StateAction::Enqueue => { + self.get_scheduled(true); + } + StateAction::Failed(state) => panic!("task state invalid: {state}"), + StateAction::Canceled(state) => { + let output = self.get_canceled(); + self.finish(state, Err(output)); + } + _ => {} } - StateAction::Failed(state) => panic!("task state invalid: {state}"), - StateAction::Canceled(state) => { - let output = self.get_canceled(); - self.finish(state, Err(output)); - } - _ => {} - }, + } Err(_) => { let output = Err(ScheduleError::new(ErrorKind::Panic, "panic happen")); @@ -233,6 +247,8 @@ where } pub(crate) fn wake_by_ref(&self) { + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_wake(); let prev = self.header().state.turn_to_scheduling(); if state::need_enqueue(prev) { self.get_scheduled(false); @@ -293,6 +309,10 @@ where })); let cur = self.header().state.get_current_state(); + + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_poll(); + match res { Ok(Poll::Ready(output)) => { // send result if the JoinHandle is not dropped @@ -300,20 +320,24 @@ where true } - Ok(Poll::Pending) => match self.header().state.turning_to_idle() { - StateAction::Enqueue => { - let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; - ffrt_task.wake_task(); - false + Ok(Poll::Pending) => { + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_pending(); + match self.header().state.turning_to_idle() { + StateAction::Enqueue => { + let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; + ffrt_task.wake_task(); + false + } + StateAction::Failed(state) => panic!("task state invalid: {:b}", state), + StateAction::Canceled(state) => { + let output = self.ffrt_get_canceled(); + self.finish(state, Err(output)); + true + } + _ => false, } - StateAction::Failed(state) => panic!("task state invalid: {:b}", state), - StateAction::Canceled(state) => { - let output = self.ffrt_get_canceled(); - self.finish(state, Err(output)); - true - } - _ => false, - }, + } Err(_) => { let output = Err(ScheduleError::new(ErrorKind::Panic, "panic happen")); @@ -333,6 +357,8 @@ where if state::need_enqueue(prev) { let ffrt_task = unsafe { (*self.inner().task.get()).as_ref().unwrap() }; ffrt_task.wake_task(); + #[cfg(feature = "metrics")] + global_default_metrics_task().increment_wake(); } } diff --git a/ylong_runtime/tests/metrics.rs b/ylong_runtime/tests/metrics.rs new file mode 100644 index 0000000..fd2782c --- /dev/null +++ b/ylong_runtime/tests/metrics.rs @@ -0,0 +1,164 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg(feature = "metrics")] + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use ylong_runtime::executor::get_global_runtime_metrics; +use ylong_runtime::{block_on, spawn}; + +/// SDV test cases for Metrics. +/// +/// # Brief +/// 1. Getting Metrics. +/// 2. Check workers num. +/// 3. Check the length of park_workers_list. +/// 4. Check the length of active_workers_list. +#[test] +fn sdv_metrics_workers_num_test() { + let metrics = get_global_runtime_metrics(); + let workers_num = metrics.workers_num(); + let park_workers_num = metrics.park_workers_num().unwrap(); + let active_workers_num = metrics.active_workers_num().unwrap(); + assert!(active_workers_num + park_workers_num >= workers_num); + let park_workers_list = metrics.park_workers_list(); + let active_workers_list = metrics.active_workers_list(); + assert_ne!(park_workers_list, None); + assert_ne!(active_workers_list, None); +} + +/// SDV test cases for Metrics. +/// +/// # Brief +/// 1. Getting Metrics. +/// 2. Generate asynchronous tasks in the global queue. +/// 3. Check the total number of tasks entering the global queue and the times +/// of get task from global queue. +/// 4. Generate asynchronous tasks in the local queue. +/// 5. Check the num of push task to global queue,the total number of tasks that +/// have been polled in the local queue, the total number of tasks entering +/// the local queue and current local queue length +#[test] +fn sdv_metrics_runtime_test() { + let metrics = get_global_runtime_metrics(); + let len = metrics.workers_num(); + let mut get_task_from_global_before = 0; + let mut worker_task_len = 0; + let global_task_count_before = metrics.global_queue_total_task_count(); + let mut get_task_from_global_after = 0; + for i in 0..len { + get_task_from_global_before += metrics.worker_get_task_from_global_count(i).unwrap(); + } + for _ in 0..3000 { + spawn(async { 1 }); + } + for i in 0..len { + get_task_from_global_after += metrics.worker_get_task_from_global_count(i).unwrap(); + } + let global_task_count_after = metrics.global_queue_total_task_count(); + assert!(get_task_from_global_after - get_task_from_global_before > 0); + assert!(global_task_count_after - global_task_count_before >= 3000); + + let mut worker_task_count_before = 0; + let mut worker_poll_count_before = 0; + + for i in 0..len { + worker_task_count_before += metrics.worker_total_task_count(i).unwrap(); + worker_poll_count_before += metrics.worker_poll_count(i).unwrap(); + } + + let _ = block_on(spawn(async move { + for _ in 0..10 { + spawn(async { 1 }); + } + let metrics_new = get_global_runtime_metrics(); + for i in 0..len { + worker_task_len += metrics_new.worker_task_len(i).unwrap(); + } + assert!(worker_task_len > 0); + })); + thread::sleep(Duration::from_millis(10)); + + let mut worker_task_count_after = 0; + let mut worker_poll_count_after = 0; + for i in 0..len { + worker_task_count_after += metrics.worker_total_task_count(i).unwrap(); + worker_poll_count_after += metrics.worker_poll_count(i).unwrap(); + let push_task_to_global = metrics.worker_push_task_to_global_count(i); + assert_ne!(push_task_to_global, None); + } + assert!(worker_task_count_after - worker_task_count_before >= 10); + assert!(worker_poll_count_after - worker_poll_count_before >= 10); +} + +struct MyFuture { + num: i32, +} + +impl MyFuture { + fn new() -> MyFuture { + MyFuture { num: 0 } + } +} + +impl Future for MyFuture { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.num < 3 { + self.get_mut().num += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +/// SDV test cases for Metrics. +/// +/// # Brief +/// 1. Getting Metrics. +/// 2. Generate asynchronous tasks in the global queue. +/// 3. Check the task metrics. +#[test] +fn sdv_metrics_task_test() { + let metrics = get_global_runtime_metrics(); + + let task_total_count_before = metrics.get_task_total_count(); + let finish_total_count_before = metrics.get_task_finish_count(); + let poll_total_count_before = metrics.get_task_poll_count(); + let pending_total_count_before = metrics.get_task_pending_count(); + let wake_total_count_before = metrics.get_task_wake_count(); + + let my_future = MyFuture::new(); + let handler = spawn(async move { + my_future.await; + }); + let _ = block_on(handler); + + let task_total_count_after = metrics.get_task_total_count(); + let finish_total_count_after = metrics.get_task_finish_count(); + let poll_total_count_after = metrics.get_task_poll_count(); + let pending_total_count_after = metrics.get_task_pending_count(); + let wake_total_count_after = metrics.get_task_wake_count(); + assert!(task_total_count_after - task_total_count_before >= 1); + assert!(finish_total_count_after - finish_total_count_before >= 1); + assert!(poll_total_count_after - poll_total_count_before >= 4); + assert!(pending_total_count_after - pending_total_count_before >= 3); + assert!(wake_total_count_after - wake_total_count_before >= 3) +} -- Gitee