diff --git a/ylong_runtime/benches/ylong_tokio_schedule.rs b/ylong_runtime/benches/ylong_tokio_schedule.rs
index 09dbebcb1542f5c2effb2d95137a5493afa34d9a..25af439d0a5c8ce9f780a69494f5d40aaeda8b91 100644
--- a/ylong_runtime/benches/ylong_tokio_schedule.rs
+++ b/ylong_runtime/benches/ylong_tokio_schedule.rs
@@ -88,7 +88,7 @@ mod ylong_schedule_bench {
     use test::Bencher;
     use ylong_runtime::task::yield_now;
 
-    pub use crate::task_helpers::{fibbo, tokio_runtime};
+    pub use crate::task_helpers::fibbo;
 
     ylong_schedule_task!(ylong_task_10_15, 10, 15);
     ylong_schedule_task!(ylong_task_120_15, 120, 15);
diff --git a/ylong_runtime/src/builder/common_builder.rs b/ylong_runtime/src/builder/common_builder.rs
index f663efdf4aca55c110903a0c7c9cab8d48509b9f..b176fe6d12a589f4306754e5f6ff2aca56a6e7dc 100644
--- a/ylong_runtime/src/builder/common_builder.rs
+++ b/ylong_runtime/src/builder/common_builder.rs
@@ -77,7 +77,7 @@ impl CommonBuilder {
     pub(crate) fn new() -> Self {
         CommonBuilder {
             worker_name: None,
-            is_affinity: true,
+            is_affinity: false,
             blocking_permanent_thread_num: BLOCKING_PERMANENT_THREAD_NUM,
             max_blocking_pool_size: Some(BLOCKING_MAX_THEAD_NUM),
             schedule_algo: ScheduleAlgo::FifoBound,
diff --git a/ylong_runtime/src/executor/async_pool.rs b/ylong_runtime/src/executor/async_pool.rs
index a8fc9942360591b3c3c803172799e404d9619132..07628605f5de67c2004614fe6bb2c9b119ac74e5 100644
--- a/ylong_runtime/src/executor/async_pool.rs
+++ b/ylong_runtime/src/executor/async_pool.rs
@@ -27,6 +27,7 @@ use super::worker::{get_current_ctx, run_worker, Worker};
 use super::{worker, Schedule};
 use crate::builder::multi_thread_builder::MultiThreadBuilder;
 use crate::builder::CallbackHook;
+use crate::executor::worker::WorkerContext;
 use crate::fastrand::fast_random;
 use crate::task::{JoinHandle, Task, TaskBuilder, VirtualTableType};
 #[cfg(not(target_os = "macos"))]
@@ -44,7 +45,7 @@ pub(crate) struct MultiThreadScheduler {
     /// Join Handles for all threads in the executor
     handles: RwLock<Vec<Parker>>,
     /// Used for idle and wakeup logic.
-    sleeper: Sleeper,
+    pub(crate) sleeper: Sleeper,
     /// The global queue of the executor
     pub(crate) global: GlobalQueue,
     /// A set of all the local queues in the executor
@@ -58,7 +59,7 @@ impl Schedule for MultiThreadScheduler {
     #[inline]
     fn schedule(&self, task: Task, lifo: bool) {
         if self.enqueue(task, lifo) {
-            self.wake_up_rand_one();
+            self.wake_up_rand_one(false);
         }
     }
 }
@@ -108,8 +109,25 @@ impl MultiThreadScheduler {
         self.sleeper.is_parked(worker_index)
     }
 
-    pub(crate) fn wake_up_rand_one(&self) {
-        if let Some(index) = self.sleeper.pop_worker() {
+    pub(crate) fn is_waked_by_last_search(&self, idx: usize) -> bool {
+        let mut search_list = self.sleeper.wake_by_search.lock().unwrap();
+        let is_waked_by_last_search = search_list[idx];
+        search_list[idx] = false;
+        if is_waked_by_last_search {
+            self.sleeper.inc_searching_num();
+            return true;
+        }
+        false
+    }
+
+    pub(crate) fn wake_up_rand_one_if_last_search(&self) {
+        if self.sleeper.dec_searching_num() {
+            self.wake_up_rand_one(true);
+        }
+    }
+
+    pub(crate) fn wake_up_rand_one(&self, last_search: bool) {
+        if let Some(index) = self.sleeper.pop_worker(last_search) {
             self.handles
                 .read()
                 .unwrap()
@@ -119,14 +137,25 @@ impl MultiThreadScheduler {
         }
     }
 
-    pub(crate) fn turn_to_sleep(&self, worker_index: usize) {
-        // If it's the last thread going to sleep, check if there are any tasks
-        // left. If yes, wakes up a thread.
-        if self.sleeper.push_worker(worker_index) && !self.has_no_work() {
-            self.wake_up_rand_one();
+    pub(crate) fn turn_to_sleep(&self, worker_inner: &mut worker::Inner, worker_index: usize) {
+        let is_last_search = if worker_inner.is_searching {
+            worker_inner.is_searching = false;
+            self.sleeper.dec_searching_num()
+        } else {
+            false
+        };
+        let is_last_active = self.sleeper.push_worker(worker_index);
+
+        if (is_last_search || is_last_active) && !self.has_no_work() {
+            self.wake_up_rand_one(true);
         }
     }
 
+    #[inline]
+    pub(crate) fn turn_from_sleep(&self, worker_index: &usize) {
+        self.sleeper.pop_worker_by_id(worker_index);
+    }
+
     pub(crate) fn create_local_queue(&self, index: usize) -> LocalQueue {
         let local_run_queue = self.locals.get(index).unwrap();
         LocalQueue {
@@ -146,38 +175,44 @@ impl MultiThreadScheduler {
         self.global.is_empty()
     }
 
+    // The returned value indicates whether or not to wake up another worker
+    fn enqueue_under_ctx(&self, mut task: Task, worker_ctx: &WorkerContext, lifo: bool) -> bool {
+        // if the current context is another runtime, push it to the global queue
+        if !std::ptr::eq(&self.global, &worker_ctx.worker.scheduler.global) {
+            self.global.push_back(task);
+            return true;
+        }
+
+        if lifo {
+            let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
+            let prev_task = lifo_slot.take();
+            if let Some(prev) = prev_task {
+                // there is some task in lifo slot, therefore we put the prev task
+                // into run queue, and put the current task into the lifo slot
+                *lifo_slot = Some(task);
+                task = prev;
+            } else {
+                // there is no task in lifo slot, return immediately
+                *lifo_slot = Some(task);
+                return false;
+            }
+        }
+
+        let local_run_queue = self.locals.get(worker_ctx.worker.index).unwrap();
+        local_run_queue.push_back(task, &self.global);
+        true
+    }
+
     // The returned value indicates whether or not to wake up another worker
     // We need to wake another worker under these circumstances:
     // 1. The task has been inserted into the global queue
     // 2. The lifo slot is taken, we push the old task into the local queue
-    pub(crate) fn enqueue(&self, mut task: Task, lifo: bool) -> bool {
+    pub(crate) fn enqueue(&self, task: Task, lifo: bool) -> bool {
         let cur_worker = get_current_ctx();
-        // WorkerContext::Curr will never enter here.
+        // currently we are inside a runtime's context
         if let Some(worker_ctx) = cur_worker {
-            if !std::ptr::eq(&self.global, &worker_ctx.worker.scheduler.global) {
-                self.global.push_back(task);
-                return true;
-            }
-
-            if lifo {
-                let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
-                let prev_task = lifo_slot.take();
-                if let Some(prev) = prev_task {
-                    // there is some task in lifo slot, therefore we put the prev task
-                    // into run queue, and put the current task into the lifo slot
-                    *lifo_slot = Some(task);
-                    task = prev;
-                } else {
-                    // there is no task in lifo slot, return immediately
-                    *lifo_slot = Some(task);
-                    return false;
-                }
-            }
-
-            let local_run_queue = self.locals.get(worker_ctx.worker.index).unwrap();
-            local_run_queue.push_back(task, &self.global);
-            return true;
+            return self.enqueue_under_ctx(task, worker_ctx, lifo);
         }
 
         // If the local queue of the current worker is full, push the task into the
@@ -186,90 +221,118 @@ impl MultiThreadScheduler {
         true
     }
 
-    pub(crate) fn dequeue(&self, index: usize, worker_inner: &mut worker::Inner) -> Option<Task> {
-        let local_run_queue = &worker_inner.run_queue;
+    // gets task from the global queue or the thread's own local queue
+    fn get_task_from_queues(&self, worker_inner: &mut worker::Inner) -> Option<Task> {
         let count = worker_inner.count;
+        let local_run_queue = &worker_inner.run_queue;
 
-        let task = {
-            // For every 61 times of execution, dequeue a task from the global queue first.
-            // Otherwise, dequeue a task from the local queue. However, if the local queue
-            // has no task, dequeue a task from the global queue instead.
-            if count % GLOBAL_POLL_INTERVAL as u32 == 0 {
-                let limit = local_run_queue.remaining() as usize;
-                // If the local queue is empty, multiple tasks are stolen from the global queue
-                // to the local queue. If the local queue has tasks, only dequeue one task from
-                // the global queue and run it.
-                let task = if limit == LOCAL_QUEUE_CAP {
+        // For every 61 times of execution, dequeue a task from the global queue first.
+        // Otherwise, dequeue a task from the local queue. However, if the local queue
+        // has no task, dequeue a task from the global queue instead.
+        if count % GLOBAL_POLL_INTERVAL as u32 == 0 {
+            let mut limit = local_run_queue.remaining() as usize;
+            // If the local queue is empty, multiple tasks are stolen from the global queue
+            // to the local queue. If the local queue has tasks, only dequeue one task from
+            // the global queue and run it.
+            if limit != LOCAL_QUEUE_CAP {
+                limit = 0;
+            }
+            let task = self
+                .global
+                .pop_batch(self.num_workers, local_run_queue, limit);
+            match task {
+                Some(task) => Some(task),
+                None => local_run_queue.pop_front(),
+            }
+        } else {
+            let local_task = local_run_queue.pop_front();
+            match local_task {
+                Some(task) => Some(task),
+                None => {
+                    let limit = local_run_queue.remaining() as usize;
                     self.global
                         .pop_batch(self.num_workers, local_run_queue, limit)
-                } else {
-                    self.global.pop_front()
-                };
-                match task {
-                    Some(task) => Some(task),
-                    None => local_run_queue.pop_front(),
-                }
-            } else {
-                let local_task = local_run_queue.pop_front();
-                match local_task {
-                    Some(task) => Some(task),
-                    None => {
-                        let limit = local_run_queue.remaining() as usize;
-                        if limit > 1 {
-                            self.global
-                                .pop_batch(self.num_workers, local_run_queue, limit)
-                        } else {
-                            self.global.pop_front()
-                        }
-                    }
                 }
             }
-        };
-
-        if task.is_some() {
-            return task;
         }
+    }
+
+    fn get_task_from_searching(&self, worker_inner: &mut worker::Inner) -> Option<Task> {
+        const STEAL_TIME: usize = 3;
 
         // There is no task in the local queue or the global queue, so we try to steal
         // tasks from another worker's local queue.
-        // The number of stealing worker should be less than half of the total worker
+        // The number of stealing workers should be less than half of the total worker
        // number.
+        // Only increase the searching number when the worker is not searching
+        if !worker_inner.is_searching && !self.sleeper.try_inc_searching_num() {
+            return None;
+        }
 
-        if !self.sleeper.try_inc_searching_num() {
+        worker_inner.is_searching = true;
+
+        let local_run_queue = &worker_inner.run_queue;
+        for i in 0..STEAL_TIME {
+            if let Some(task) = self.steal(local_run_queue) {
+                return Some(task);
+            }
+            if i < STEAL_TIME - 1 {
+                thread::sleep(Duration::from_micros(1));
+            }
+        }
+
+        None
+    }
+
+    pub(crate) fn dequeue(
+        &self,
+        worker_inner: &mut worker::Inner,
+        worker_ctx: &WorkerContext,
+    ) -> Option<Task> {
+        // dequeues from the global queue or the thread's own local queue
+        if let Some(task) = self.get_task_from_queues(worker_inner) {
+            return Some(task);
+        }
+
+        if let Ok(mut driver) = worker_inner.parker.get_driver().try_lock() {
+            driver.run_once();
+        }
+        worker_ctx.wake_yield();
+        if !worker_inner.run_queue.is_empty() {
             return None;
         }
-        // start to searching.
+        self.get_task_from_searching(worker_inner)
+    }
+
+    fn steal(&self, destination: &LocalQueue) -> Option<Task> {
         let num = self.locals.len();
         let start = (fast_random() >> 56) as usize;
         for i in 0..num {
             let i = (start + i) % num;
             // skip the current worker's local queue
-            if i == index {
+            let target = self.locals.get(i).unwrap();
+
+            if std::ptr::eq(target, destination) {
                 continue;
             }
-            let target = self.locals.get(i).unwrap();
-            if let Some(task) = target.steal_into(local_run_queue) {
+
+            if let Some(task) = target.steal_into(destination) {
                 #[cfg(feature = "metrics")]
                 self.steal_times
                     .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-                if self.sleeper.dec_searching_num() {
-                    self.wake_up_rand_one()
-                };
                 return Some(task);
             }
         }
-        // if there is no task to steal, we check global queue for one last time
-        let task_from_global = self.global.pop_front();
-        // end searching
-        // regardless of whether a task can be stolen from the global queue,
-        // wake_up_rand_one is not called.
-        self.sleeper.dec_searching_num();
-
-        task_from_global
+        // if there is no task to steal, we check global queue for tasks
+        self.global.pop_batch(
+            self.num_workers,
+            destination,
+            destination.remaining() as usize,
+        )
     }
 
     cfg_metrics!(
@@ -545,13 +608,14 @@ pub(crate) mod test {
     use std::sync::mpsc::channel;
     use std::sync::{Arc, Mutex};
     use std::task::{Context, Poll};
-    use std::thread::spawn;
+    use std::thread;
 
     use crate::builder::RuntimeBuilder;
     use crate::executor::async_pool::{get_cpu_core, AsyncPoolSpawner, MultiThreadScheduler};
     use crate::executor::driver::Driver;
     use crate::executor::parker::Parker;
-    use crate::executor::Schedule;
+    use crate::executor::queue::LocalQueue;
+    use crate::executor::{worker, Schedule};
     use crate::task::{JoinHandle, Task, TaskBuilder, VirtualTableType};
 
     pub struct TestFuture {
@@ -699,7 +763,7 @@ pub(crate) mod test {
         let mut parker = Parker::new(arc_driver);
         let parker_cpy = parker.clone();
 
-        let _ = spawn(move || {
+        let _ = thread::spawn(move || {
             parker.park();
             *flag_clone.lock().unwrap() = 1;
             tx.send(()).unwrap()
@@ -729,7 +793,7 @@ pub(crate) mod test {
         let mut parker = Parker::new(arc_driver);
         let parker_cpy = parker.clone();
 
-        let _ = spawn(move || {
+        let _ = thread::spawn(move || {
             parker.park();
             *flag_clone.lock().unwrap() = 1;
             tx.send(()).unwrap()
@@ -750,28 +814,33 @@ pub(crate) mod test {
     #[test]
     fn ut_executor_mng_info_wake_up_rand_one() {
         let (arc_handle, arc_driver) = Driver::initialize();
+        let mut parker = Parker::new(arc_driver);
         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
-        executor_mng_info.turn_to_sleep(0);
+        let local_queue = LocalQueue {
+            inner: executor_mng_info.locals[0].inner.clone(),
+        };
+        let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
+        worker_inner.is_searching = true;
+        executor_mng_info.sleeper.inc_searching_num();
+        executor_mng_info.turn_to_sleep(&mut worker_inner, 0);
 
         let flag = Arc::new(Mutex::new(0));
         let (tx, rx) = channel();
         let (flag_clone, tx) = (flag.clone(), tx);
-
-        let mut parker = Parker::new(arc_driver);
         let parker_cpy = parker.clone();
 
-        let _ = spawn(move || {
+        let _ = thread::spawn(move || {
             parker.park();
             *flag_clone.lock().unwrap() = 1;
             tx.send(()).unwrap()
         });
 
         executor_mng_info.handles.write().unwrap().push(parker_cpy);
-
-        executor_mng_info.wake_up_rand_one();
+        executor_mng_info.wake_up_rand_one(false);
 
         rx.recv().unwrap();
         assert_eq!(*flag.lock().unwrap(), 1);
+        assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
     }
 
     /// UT test cases for ExecutorMngInfo::wake_up_if_one_task_left()
@@ -782,19 +851,22 @@ pub(crate) mod test {
     #[test]
     fn ut_executor_mng_info_wake_up_if_one_task_left() {
         let (arc_handle, arc_driver) = Driver::initialize();
+        let mut parker = Parker::new(arc_driver);
         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
-        executor_mng_info.turn_to_sleep(0);
+        let local_queue = LocalQueue {
+            inner: executor_mng_info.locals[0].inner.clone(),
+        };
+        let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
+        executor_mng_info.turn_to_sleep(&mut worker_inner, 0);
 
         let flag = Arc::new(Mutex::new(0));
         let (tx, rx) = channel();
         let (flag_clone, tx) = (flag.clone(), tx);
-
-        let mut parker = Parker::new(arc_driver);
         let parker_cpy = parker.clone();
 
-        let _ = spawn(move || {
+        let _ = thread::spawn(move || {
             parker.park();
             *flag_clone.lock().unwrap() = 1;
             tx.send(()).unwrap()
@@ -814,11 +886,12 @@ pub(crate) mod test {
         executor_mng_info.enqueue(task, true);
 
         if !executor_mng_info.has_no_work() {
-            executor_mng_info.wake_up_rand_one();
+            executor_mng_info.wake_up_rand_one(false);
         }
 
         rx.recv().unwrap();
         assert_eq!(*flag.lock().unwrap(), 1);
+        assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
     }
 
     /// UT test cases for ExecutorMngInfo::from_woken_to_sleep()
@@ -828,19 +901,26 @@ pub(crate) mod test {
     ///
     /// # Brief
     /// 1. Generate an executor manage object. Check single-thread transitions
     /// to park state. If the last thread is in park state, check whether
     /// there is a task, and if so, wake up this thread.
     #[test]
-    fn ut_from_woken_to_sleep() {
+    fn ut_executor_mng_info_from_woken_to_sleep() {
         let (arc_handle, arc_driver) = Driver::initialize();
         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
         let flag = Arc::new(Mutex::new(0));
         let (tx, rx) = channel();
-
         let (flag_clone, tx) = (flag.clone(), tx);
         let mut parker = Parker::new(arc_driver);
+
+        let local_queue = LocalQueue {
+            inner: executor_mng_info.locals[0].inner.clone(),
+        };
+        let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
+        worker_inner.is_searching = true;
+        executor_mng_info.sleeper.inc_searching_num();
+
         let parker_cpy = parker.clone();
-        let _ = spawn(move || {
+        let _ = thread::spawn(move || {
             parker.park();
             *flag_clone.lock().unwrap() = 1;
             tx.send(()).unwrap()
@@ -858,9 +938,10 @@ pub(crate) mod test {
         );
         executor_mng_info.enqueue(task, true);
 
-        executor_mng_info.turn_to_sleep(0);
+        executor_mng_info.turn_to_sleep(&mut worker_inner, 0);
         rx.recv().unwrap();
         assert_eq!(*flag.lock().unwrap(), 1);
+        assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
     }
 
     /// UT test cases for AsyncPoolSpawner::new()
diff --git a/ylong_runtime/src/executor/parker.rs b/ylong_runtime/src/executor/parker.rs
index a655fb503f203b51d8b1f8ace5eb210073697ec8..ca9668cedeb34110043b63415f8ef8ac165ca405 100644
--- a/ylong_runtime/src/executor/parker.rs
+++ b/ylong_runtime/src/executor/parker.rs
@@ -49,7 +49,7 @@ impl Parker {
     }
 
     pub(crate) fn park(&mut self) {
-        self.inner.park();
+        self.inner.park()
     }
 
     pub(crate) fn unpark(&self, handle: Arc<Handle>) {
@@ -76,7 +76,7 @@ impl Inner {
         for _ in 0..3 {
             if self
                 .state
-                .compare_exchange_weak(NOTIFIED, IDLE, SeqCst, SeqCst)
+                .compare_exchange(NOTIFIED, IDLE, SeqCst, SeqCst)
                 .is_ok()
             {
                 return;
diff --git a/ylong_runtime/src/executor/sleeper.rs b/ylong_runtime/src/executor/sleeper.rs
index ef08211137cececb0532aba2f0042076d613bad8..7be47b5f532baef0f6e69ea51404f571807a7568 100644
--- a/ylong_runtime/src/executor/sleeper.rs
+++ b/ylong_runtime/src/executor/sleeper.rs
@@ -11,14 +11,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Mutex;
 
-pub(super) struct Sleeper {
+pub(crate) struct Sleeper {
     record: Record,
     idle_list: Mutex<Vec<usize>>,
     num_workers: usize,
+    pub(crate) wake_by_search: Mutex<Vec<bool>>,
 }
 
 impl Sleeper {
@@ -27,6 +27,7 @@ impl Sleeper {
             record: Record::new(num_workers),
             idle_list: Mutex::new(Vec::with_capacity(num_workers)),
             num_workers,
+            wake_by_search: Mutex::new(vec![false; num_workers]),
         }
     }
 
@@ -35,7 +36,19 @@ impl Sleeper {
         idle_list.contains(worker_index)
     }
 
-    pub fn pop_worker(&self) -> Option<usize> {
+    pub fn pop_worker_by_id(&self, worker_index: &usize) {
+        let mut idle_list = self.idle_list.lock().unwrap();
+
+        for i in 0..idle_list.len() {
+            if &idle_list[i] == worker_index {
+                idle_list.swap_remove(i);
+                self.record.inc_active_num();
+                break;
+            }
+        }
+    }
+
+    pub fn pop_worker(&self, last_search: bool) -> Option<usize> {
         let (active_num, searching_num) = self.record.load_state();
         if active_num >= self.num_workers || searching_num > 0 {
             return None;
@@ -44,31 +57,44 @@ impl Sleeper {
         let mut idle_list = self.idle_list.lock().unwrap();
         let res = idle_list.pop();
 
-        if res.is_some() {
+        drop(idle_list);
+        if let Some(worker_idx) = res.as_ref() {
+            if last_search {
+                let mut search_list = self.wake_by_search.lock().unwrap();
+                search_list[*worker_idx] = true;
+            }
             self.record.inc_active_num();
         }
+
         res
     }
 
     // return true if it's the last thread going to sleep.
     pub fn push_worker(&self, worker_index: usize) -> bool {
         let mut idle_list = self.idle_list.lock().unwrap();
-        idle_list.push(worker_index);
 
+        idle_list.push(worker_index);
         self.record.dec_active_num()
     }
 
+    #[inline]
+    pub fn inc_searching_num(&self) {
+        self.record.inc_searching_num();
+    }
+
     pub fn try_inc_searching_num(&self) -> bool {
         let (active_num, searching_num) = self.record.load_state();
+
         if searching_num * 2 < active_num {
             // increment searching worker number
-            self.record.inc_searching_num();
+            self.inc_searching_num();
             return true;
         }
         false
     }
 
-    // reutrn true if it's the last searching thread
+    // return true if it's the last searching thread
+    #[inline]
     pub fn dec_searching_num(&self) -> bool {
         self.record.dec_searching_num()
     }
@@ -88,30 +114,30 @@
 impl Record {
 
     // Return true if it is the last searching thread
     fn dec_searching_num(&self) -> bool {
-        let ret = self.0.fetch_sub(1, SeqCst);
+        let ret = self.0.fetch_sub(1, Ordering::SeqCst);
         (ret & SEARCHING_MASK) == 1
     }
 
     fn inc_searching_num(&self) {
-        self.0.fetch_add(1, SeqCst);
+        self.0.fetch_add(1, Ordering::SeqCst);
     }
 
     fn inc_active_num(&self) {
         let inc = 1 << ACTIVE_WORKER_SHIFT;
-        self.0.fetch_add(inc, SeqCst);
+        self.0.fetch_add(inc, Ordering::SeqCst);
     }
 
     fn dec_active_num(&self) -> bool {
         let dec = 1 << ACTIVE_WORKER_SHIFT;
-        let ret = self.0.fetch_sub(dec, SeqCst);
+        let ret = self.0.fetch_sub(dec, Ordering::SeqCst);
 
         let active_num = ((ret & ACTIVE_MASK) >> ACTIVE_WORKER_SHIFT) - 1;
         active_num == 0
     }
 
     fn load_state(&self) -> (usize, usize) {
-        let union_num = self.0.load(SeqCst);
+        let union_num = self.0.load(Ordering::SeqCst);
 
         let searching_num = union_num & SEARCHING_MASK;
         let active_num = (union_num & ACTIVE_MASK) >> ACTIVE_WORKER_SHIFT;
diff --git a/ylong_runtime/src/executor/worker.rs b/ylong_runtime/src/executor/worker.rs
index 42e5452140927d70e4713d1b5396d0117a028de6..75e34c337b5b649dbe51986ee2a30684eb1712f4 100644
--- a/ylong_runtime/src/executor/worker.rs
+++ b/ylong_runtime/src/executor/worker.rs
@@ -21,7 +21,6 @@ use crate::executor::async_pool::MultiThreadScheduler;
 use crate::executor::driver::Handle;
 use crate::executor::parker::Parker;
 use crate::executor::queue::LocalQueue;
-use crate::task::yield_now::wake_yielded_tasks;
 use crate::task::Task;
 
 thread_local! {
@@ -34,11 +33,24 @@ pub(crate) struct WorkerContext {
 }
 
 impl WorkerContext {
+    #[inline]
     fn run(&mut self) {
         let worker_ref = &self.worker;
         worker_ref.run(self);
     }
 
+    pub(crate) fn wake_yield(&self) -> bool {
+        let mut yielded = self.worker.yielded.borrow_mut();
+        if yielded.is_empty() {
+            return false;
+        }
+        for waker in yielded.drain(..) {
+            waker.wake();
+        }
+        true
+    }
+
+    #[inline]
     fn release(&mut self) {
         self.worker.release();
     }
@@ -128,16 +140,22 @@ impl Worker {
             inner.increment_count();
             inner.periodic_check(self);
 
-            // get a task from the queues and execute it
             if let Some(task) = self.get_task(inner, worker_ctx) {
+                if inner.is_searching {
+                    inner.is_searching = false;
+                    self.scheduler.wake_up_rand_one_if_last_search();
+                }
                 task.run();
                 continue;
             }
-            wake_yielded_tasks(worker_ctx);
 
+            // if there is no task, park the worker
             self.park_timeout(inner, worker_ctx);
-            self.check_cancel(inner);
+
+            if !inner.is_searching && self.scheduler.is_waked_by_last_search(self.index) {
+                inner.is_searching = true;
+            }
         }
     }
 
@@ -148,7 +166,7 @@ impl Worker {
             return Some(task);
         }
 
-        self.scheduler.dequeue(self.index, inner)
+        self.scheduler.dequeue(inner, worker_ctx)
     }
 
     #[inline]
@@ -156,16 +174,26 @@ impl Worker {
         inner.check_cancel(self)
     }
 
+    fn has_work(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> bool {
+        worker_ctx.worker.lifo.borrow().is_some() || !inner.run_queue.is_empty()
+    }
+
     fn park_timeout(&self, inner: &mut Inner, worker_ctx: &WorkerContext) {
         // still has works to do, go back to work
-        if worker_ctx.worker.lifo.borrow().is_some() || !inner.run_queue.is_empty() {
+        if self.has_work(inner, worker_ctx) {
             return;
         }
-
-        self.scheduler.turn_to_sleep(self.index);
+        self.scheduler.turn_to_sleep(inner, self.index);
+        inner.is_searching = false;
 
         while !inner.is_cancel {
             inner.parker.park();
+
+            if self.has_work(inner, worker_ctx) {
+                self.scheduler.turn_from_sleep(&self.index);
+                break;
+            }
+
             if self.scheduler.is_parked(&self.index) {
                 self.check_cancel(inner);
                 continue;
@@ -201,6 +229,7 @@ pub(crate) struct Inner {
     /// local queue
     pub(crate) run_queue: LocalQueue,
     pub(crate) parker: Parker,
+    pub(crate) is_searching: bool,
 }
 
 impl Inner {
@@ -210,6 +239,7 @@ impl Inner {
             is_cancel: false,
             run_queue: run_queues,
             parker,
+            is_searching: false,
         }
     }
 }
diff --git a/ylong_runtime/src/sync/mpsc/bounded/mod.rs b/ylong_runtime/src/sync/mpsc/bounded/mod.rs
index 84da091417024f150183ed94c33db1391da98df4..d585acaa17730443128c99d8e894abf9b4eccecf 100644
--- a/ylong_runtime/src/sync/mpsc/bounded/mod.rs
+++ b/ylong_runtime/src/sync/mpsc/bounded/mod.rs
@@ -390,9 +390,9 @@ impl<T> BoundedReceiver<T> {
     /// * `Poll::Pending` if no messages in the channel now, but the channel is
     ///   not closed.
     /// * `Poll::Ready(Ok(T))` if receiving a value successfully.
-    /// * `Poll::Ready(Err(RecvError))` in the following situations:
-    ///   1. All senders have been dropped or the channel is closed.
-    ///   2. No messages remaining.
+    /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All
+    ///   senders have been dropped or the channel is closed. 2. No messages
+    ///   remaining.
     ///
     /// # Examples
     ///
@@ -420,9 +420,8 @@ impl<T> BoundedReceiver<T> {
     ///
     /// # Return value
     /// * `Ok(T)` if receiving a value successfully.
-    /// * `Err(RecvError)` in the following situations:
-    ///   1. All senders have been dropped or the channel is closed.
-    ///   2. No messages remaining.
+    /// * `Err(RecvError)` in the following situations: 1. All senders have been
+    ///   dropped or the channel is closed. 2. No messages remaining.
     ///
     /// # Examples
     ///
diff --git a/ylong_runtime/src/sync/mpsc/unbounded/mod.rs b/ylong_runtime/src/sync/mpsc/unbounded/mod.rs
index af52e7ec8da712cdc185f2c15284ca4ec65ea7cf..6da1dd862ce0fd46d61c40ba1d085ea5290789a8 100644
--- a/ylong_runtime/src/sync/mpsc/unbounded/mod.rs
+++ b/ylong_runtime/src/sync/mpsc/unbounded/mod.rs
@@ -272,9 +272,9 @@ impl<T> UnboundedReceiver<T> {
     /// * `Poll::Pending` if no messages in the channel now, but the channel is
     ///   not closed.
     /// * `Poll::Ready(Ok(T))` if receiving a value successfully.
-    /// * `Poll::Ready(Err(RecvError))` in the following situations:
-    ///   1. All senders have been dropped or the channel is closed.
-    ///   2. No messages remaining.
+    /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All
+    ///   senders have been dropped or the channel is closed. 2. No messages
+    ///   remaining.
     ///
     /// # Examples
     ///
@@ -301,9 +301,8 @@ impl<T> UnboundedReceiver<T> {
     ///
     /// # Return value
     /// * `Ok(T)` if receiving a value successfully.
-    /// * `Err(RecvError)` in the following situations:
-    ///   1. All senders have been dropped or the channel is closed.
-    ///   2. No messages remaining.
+    /// * `Err(RecvError)` in the following situations: 1. All senders have been
+    ///   dropped or the channel is closed. 2. No messages remaining.
     ///
     /// # Examples
     ///
diff --git a/ylong_runtime/src/task/yield_now.rs b/ylong_runtime/src/task/yield_now.rs
index 0de2b2e94e0ea79ba4104d07f93abbe979161a55..ad56b9376d0ca370e65b6f57c5b312e962890621 100644
--- a/ylong_runtime/src/task/yield_now.rs
+++ b/ylong_runtime/src/task/yield_now.rs
@@ -18,7 +18,6 @@ use std::task::{Context, Poll};
 
 cfg_not_ffrt!(
     use crate::executor::worker;
-    use crate::executor::worker::WorkerContext;
 );
 
 /// Yields the current task and wakes it for a reschedule.
@@ -76,17 +75,6 @@ impl Future for YieldTask {
     }
 }
 
-#[cfg(not(feature = "ffrt"))]
-pub(crate) fn wake_yielded_tasks(worker_ctx: &WorkerContext) {
-    let mut yielded = worker_ctx.worker.yielded.borrow_mut();
-    if yielded.is_empty() {
-        return;
-    }
-    for waker in yielded.drain(..) {
-        waker.wake();
-    }
-}
-
 #[cfg(test)]
 mod test {
     use crate::task::yield_now;
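
A note on the dequeue path touched above: get_task_from_queues keeps the old dequeue's fairness rule, polling the global queue first on every 61st tick so globally queued tasks are not starved by a worker whose local queue never drains. The snippet below is a standalone illustration of that check only; the helper name is ours, and the interval value is taken from the "every 61 times" comment in async_pool.rs.

// Standalone sketch of the global-queue fairness check used in
// get_task_from_queues.
const GLOBAL_POLL_INTERVAL: u8 = 61;

fn polls_global_first(count: u32) -> bool {
    // every 61st poll looks at the global queue before the local one
    count % GLOBAL_POLL_INTERVAL as u32 == 0
}

fn main() {
    // Ticks 0, 61, 122 and 183 hit the global queue first.
    let hits: Vec<u32> = (0..200).filter(|c| polls_global_first(*c)).collect();
    assert_eq!(hits, vec![0, 61, 122, 183]);
}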
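The wake_by_search list added to Sleeper exists because this patch moves the "last searcher" wake-up out of steal(): when the last searching worker finds a task, it wakes one sleeper with wake_up_rand_one(true) and tags it, and the woken worker then inherits the searching role via is_waked_by_last_search; without the tag, a chain of wake-ups could die out while work remains. What follows is a minimal, self-contained model of that handoff, mirroring pop_worker/is_waked_by_last_search from the patch but dropping the atomic Record bookkeeping for brevity.

use std::sync::Mutex;

// Simplified model of the searcher-role handoff in sleeper.rs.
struct Sleeper {
    idle_list: Mutex<Vec<usize>>,
    wake_by_search: Mutex<Vec<bool>>,
}

impl Sleeper {
    fn new(num_workers: usize) -> Self {
        Sleeper {
            idle_list: Mutex::new(Vec::new()),
            wake_by_search: Mutex::new(vec![false; num_workers]),
        }
    }

    // Wake one idle worker; if the waker was the last searcher, tag the
    // woken worker so that it starts out in the searching state.
    fn pop_worker(&self, last_search: bool) -> Option<usize> {
        let idx = self.idle_list.lock().unwrap().pop()?;
        if last_search {
            self.wake_by_search.lock().unwrap()[idx] = true;
        }
        Some(idx)
    }

    // Called by a worker after it is unparked; consumes the tag.
    fn is_waked_by_last_search(&self, idx: usize) -> bool {
        let mut flags = self.wake_by_search.lock().unwrap();
        std::mem::take(&mut flags[idx])
    }
}

fn main() {
    let sleeper = Sleeper::new(2);
    sleeper.idle_list.lock().unwrap().push(1); // worker 1 is parked
    let woken = sleeper.pop_worker(true).unwrap(); // last searcher wakes it
    assert_eq!(woken, 1);
    assert!(sleeper.is_waked_by_last_search(1)); // worker 1 inherits the role
    assert!(!sleeper.is_waked_by_last_search(1)); // the tag is consumed
}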
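Finally, the Ordering::SeqCst edits in sleeper.rs all touch Record, which packs both scheduler counters into a single AtomicUsize: the searching-worker count lives in the low bits and the active-worker count in the bits at and above ACTIVE_WORKER_SHIFT. Below is a minimal sketch of that encoding; the shift width here is an assumed example value, not the constant defined in sleeper.rs.

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative constants; sleeper.rs defines its own shift and masks.
const ACTIVE_WORKER_SHIFT: usize = 16;
const SEARCHING_MASK: usize = (1 << ACTIVE_WORKER_SHIFT) - 1;
const ACTIVE_MASK: usize = !SEARCHING_MASK;

struct Record(AtomicUsize);

impl Record {
    // All workers start active; none are searching.
    fn new(active_num: usize) -> Self {
        Record(AtomicUsize::new(active_num << ACTIVE_WORKER_SHIFT))
    }

    fn inc_searching_num(&self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }

    // Returns true if the caller was the last searching worker — the exact
    // condition wake_up_rand_one_if_last_search tests in the patch.
    fn dec_searching_num(&self) -> bool {
        let prev = self.0.fetch_sub(1, Ordering::SeqCst);
        (prev & SEARCHING_MASK) == 1
    }

    // Returns (active workers, searching workers) decoded from one load.
    fn load_state(&self) -> (usize, usize) {
        let n = self.0.load(Ordering::SeqCst);
        ((n & ACTIVE_MASK) >> ACTIVE_WORKER_SHIFT, n & SEARCHING_MASK)
    }
}

fn main() {
    let record = Record::new(4);
    record.inc_searching_num(); // one worker starts searching
    assert_eq!(record.load_state(), (4, 1));
    assert!(record.dec_searching_num()); // it was the last searcher
}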