From e616d374efd5efce5fe2361dee4e3075bf003678 Mon Sep 17 00:00:00 2001 From: li_junsong Date: Tue, 27 Jun 2023 17:24:15 +0800 Subject: [PATCH 1/2] =?UTF-8?q?time=E6=A8=A1=E5=9D=97=E9=80=82=E9=85=8Dlin?= =?UTF-8?q?ked=5Flist?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: li_junsong --- ylong_runtime/src/time/driver.rs | 15 ++++--- ylong_runtime/src/time/mod.rs | 28 ++++++++---- ylong_runtime/src/time/sleep.rs | 5 ++- ylong_runtime/src/time/timer_handle.rs | 31 ------------- ylong_runtime/src/time/wheel.rs | 62 +++++++++++++------------- 5 files changed, 61 insertions(+), 80 deletions(-) delete mode 100644 ylong_runtime/src/time/timer_handle.rs diff --git a/ylong_runtime/src/time/driver.rs b/ylong_runtime/src/time/driver.rs index f961347..bb056a3 100644 --- a/ylong_runtime/src/time/driver.rs +++ b/ylong_runtime/src/time/driver.rs @@ -13,11 +13,12 @@ * limitations under the License. */ -use crate::time::timer_handle::TimerHandle; use crate::time::wheel::Wheel; +use crate::time::Clock; use std::convert::TryInto; use std::fmt::Error; use std::mem::MaybeUninit; +use std::ptr::NonNull; use std::sync::{Mutex, Once}; use std::task::Waker; use std::time::Instant; @@ -49,9 +50,9 @@ impl Driver { self.start_time } - pub(crate) fn insert(&self, timer_handle: TimerHandle) -> Result { + pub(crate) fn insert(&self, clock_ptr: NonNull) -> Result { let mut lock = self.wheel.lock().unwrap(); - lock.insert(timer_handle) + lock.insert(clock_ptr) } pub(crate) fn run(&self) { @@ -68,17 +69,17 @@ impl Driver { let mut lock = self.wheel.lock().unwrap(); - while let Some(timer_handle) = lock.poll(now) { + while let Some(mut clock_ptr) = lock.poll(now) { let elapsed = lock.elapsed(); lock.set_last_elapsed(elapsed); // Unsafe access to timer_handle is only unsafe when Sleep Drop, // but does not let `Sleep` go to `Ready` before access to timer_handle fetched by poll. - let timer_handle = unsafe { timer_handle.inner().as_mut() }; - waker_list[waker_idx] = timer_handle.take_waker(); + let clock_handle = unsafe { clock_ptr.as_mut() }; + waker_list[waker_idx] = clock_handle.take_waker(); waker_idx += 1; - timer_handle.set_result(true); + clock_handle.set_result(true); if waker_idx == waker_list.len() { for waker in waker_list.iter_mut() { diff --git a/ylong_runtime/src/time/mod.rs b/ylong_runtime/src/time/mod.rs index 2578dc4..edf4907 100644 --- a/ylong_runtime/src/time/mod.rs +++ b/ylong_runtime/src/time/mod.rs @@ -20,7 +20,6 @@ mod error; mod sleep; mod timeout; mod timer; -mod timer_handle; mod wheel; pub(crate) use driver::Driver; @@ -28,8 +27,8 @@ pub use sleep::{sleep, sleep_until}; pub use timeout::timeout; pub use timer::{periodic_schedule, timer, timer_at, Timer}; -use crate::time::timer_handle::TimerHandle; -use std::ptr::NonNull; +use crate::util::link_list::{Link, Node}; +use std::ptr::{addr_of_mut, NonNull}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::task::Waker; @@ -52,6 +51,9 @@ pub(crate) struct Clock { // Corresponding waker, // which is used to wake up sleep coroutine. waker: Option, + + // Linked_list node. + node: Node, } impl Clock { @@ -63,6 +65,7 @@ impl Clock { duration: 0, result: AtomicBool::new(false), waker: None, + node: Node::new(), } } @@ -113,11 +116,20 @@ impl Clock { pub(crate) fn set_result(&mut self, result: bool) { self.result.store(result, Relaxed); } +} - // Creates a TimerHandle to point to the current structure. - pub(crate) fn handle(&self) -> TimerHandle { - TimerHandle { - inner: NonNull::from(self), - } +impl Default for Clock { + fn default() -> Self { + Clock::new() + } +} + +unsafe impl Link for Clock { + unsafe fn node(mut ptr: NonNull) -> NonNull> + where + Self: Sized, + { + let node_ptr = addr_of_mut!(ptr.as_mut().node); + NonNull::new_unchecked(node_ptr) } } diff --git a/ylong_runtime/src/time/sleep.rs b/ylong_runtime/src/time/sleep.rs index 87f47a7..c9f4af6 100644 --- a/ylong_runtime/src/time/sleep.rs +++ b/ylong_runtime/src/time/sleep.rs @@ -18,6 +18,7 @@ use crate::time::Driver; use std::convert::TryInto; use std::future::Future; use std::pin::Pin; +use std::ptr::NonNull; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -97,7 +98,7 @@ impl Sleep { fn cancel(&mut self) { let driver = Driver::get_ref(); let mut lock = driver.wheel.lock().unwrap(); - lock.cancel(&self.timer.handle()); + lock.cancel(NonNull::from(&self.timer)); } } @@ -117,7 +118,7 @@ impl Future for Sleep { self.timer.set_expiration(ms); self.timer.set_waker(cx.waker().clone()); - match driver.insert(self.timer.handle()) { + match driver.insert(NonNull::from(&self.timer)) { Ok(_) => self.need_insert = false, Err(_) => { // Even if the insertion fails, there is no need to insert again here, diff --git a/ylong_runtime/src/time/timer_handle.rs b/ylong_runtime/src/time/timer_handle.rs deleted file mode 100644 index 6ca90bd..0000000 --- a/ylong_runtime/src/time/timer_handle.rs +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2023 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::time::Clock; -use std::ptr::NonNull; - -// Pointer structure to Timer to circumvent lifecycle issues caused by references. -#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] -pub(crate) struct TimerHandle { - pub(crate) inner: NonNull, -} - -impl TimerHandle { - // Return inner, - // which is a non_null pointer to Timer. - pub(crate) fn inner(&self) -> NonNull { - self.inner - } -} diff --git a/ylong_runtime/src/time/wheel.rs b/ylong_runtime/src/time/wheel.rs index cbeb149..29fbf9d 100644 --- a/ylong_runtime/src/time/wheel.rs +++ b/ylong_runtime/src/time/wheel.rs @@ -13,10 +13,12 @@ * limitations under the License. */ -use crate::time::timer_handle::TimerHandle; -use std::collections::VecDeque; +use crate::time::Clock; +use crate::util::link_list::LinkedList; use std::fmt::Error; +use std::mem; use std::mem::MaybeUninit; +use std::ptr::NonNull; // In a slots, the number of slot. const SLOTS_NUM: usize = 64; @@ -51,7 +53,7 @@ pub(crate) struct Wheel { // These corresponding timers have expired, // and are ready to be triggered. - trigger: VecDeque, + trigger: LinkedList, } impl Wheel { @@ -111,8 +113,8 @@ impl Wheel { } // Insert the corresponding TimerHandle into the specified position in the timing wheel. - pub(crate) fn insert(&mut self, timer_handle: TimerHandle) -> Result { - let expiration = unsafe { timer_handle.inner().as_ref().expiration() }; + pub(crate) fn insert(&mut self, mut clock_ptr: NonNull) -> Result { + let expiration = unsafe { clock_ptr.as_ref().expiration() }; if expiration <= self.elapsed() { // This means that the timeout period has passed, @@ -123,23 +125,21 @@ impl Wheel { let level = self.find_level(expiration); // Unsafe access to timer_handle is only unsafe when Sleep Drop, // `Sleep` here does not go into `Ready`. - unsafe { timer_handle.inner().as_mut().set_level(level) }; + unsafe { clock_ptr.as_mut().set_level(level) }; - self.levels[level].insert(timer_handle, self.elapsed); + self.levels[level].insert(clock_ptr, self.elapsed); Ok(expiration) } - pub(crate) fn cancel(&mut self, timer_handle: &TimerHandle) { + pub(crate) fn cancel(&mut self, clock_ptr: NonNull) { // Unsafe access to timer_handle is only unsafe when Sleep Drop, // `Sleep` here does not go into `Ready`. - let level = unsafe { timer_handle.inner().as_ref().level() }; - self.levels[level].cancel(timer_handle); - for (index, handle) in self.trigger.iter().enumerate() { - if handle == timer_handle { - self.trigger.remove(index).unwrap(); - break; - } + let level = unsafe { clock_ptr.as_ref().level() }; + self.levels[level].cancel(clock_ptr); + + unsafe { + LinkedList::remove(clock_ptr); } } @@ -165,7 +165,7 @@ impl Wheel { } // Determine which timers have timed out at the current time. - pub(crate) fn poll(&mut self, now: u64) -> Option { + pub(crate) fn poll(&mut self, now: u64) -> Option> { loop { if let Some(handle) = self.trigger.pop_back() { return Some(handle); @@ -206,13 +206,13 @@ pub struct Level { occupied: u64, // slots in a level. - slots: [VecDeque; SLOTS_NUM], + slots: [LinkedList; SLOTS_NUM], } impl Level { // Specify the level and create a Level structure. pub(crate) fn new(level: usize) -> Self { - let mut slots: [MaybeUninit>; SLOTS_NUM] = + let mut slots: [MaybeUninit>; SLOTS_NUM] = unsafe { MaybeUninit::uninit().assume_init() }; for slot in slots.iter_mut() { @@ -220,7 +220,7 @@ impl Level { } unsafe { - let slots = std::mem::transmute::<_, [VecDeque; SLOTS_NUM]>(slots); + let slots = mem::transmute::<_, [LinkedList; SLOTS_NUM]>(slots); Self { level, occupied: 0, @@ -232,31 +232,28 @@ impl Level { // Based on the elapsed which the current time wheel is running, // and the expected expiration time of the timer_handle, // find the corresponding slot and insert it. - pub(crate) fn insert(&mut self, timer_handle: TimerHandle, elapsed: u64) { - let duration = unsafe { timer_handle.inner().as_ref().expiration() } - elapsed; + pub(crate) fn insert(&mut self, mut clock_ptr: NonNull, elapsed: u64) { + let duration = unsafe { clock_ptr.as_ref().expiration() } - elapsed; // Unsafe access to timer_handle is only unsafe when Sleep Drop, // `Sleep` here does not go into `Ready`. - unsafe { timer_handle.inner().as_mut().set_duration(duration) }; + unsafe { clock_ptr.as_mut().set_duration(duration) }; let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize; - self.slots[slot].push_front(timer_handle); + self.slots[slot].push_front(clock_ptr); self.occupied |= 1 << slot; } - pub(crate) fn cancel(&mut self, timer_handle: &TimerHandle) { + pub(crate) fn cancel(&mut self, clock_ptr: NonNull) { // Unsafe access to timer_handle is only unsafe when Sleep Drop, // `Sleep` here does not go into `Ready`. - let duration = unsafe { timer_handle.inner().as_ref().duration() }; + let duration = unsafe { clock_ptr.as_ref().duration() }; let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize; - for (index, handle) in self.slots[slot].iter().enumerate() { - if handle == timer_handle { - self.slots[slot].remove(index).unwrap(); - break; - } + unsafe { + LinkedList::remove(clock_ptr); } if self.slots[slot].is_empty() { @@ -295,9 +292,10 @@ impl Level { } // Fetch all timers in a slot of the corresponding level. - pub(crate) fn take_slot(&mut self, slot: usize) -> VecDeque { + pub(crate) fn take_slot(&mut self, slot: usize) -> LinkedList { self.occupied &= !(1 << slot); - std::mem::take(&mut self.slots[slot]) + let handle = mem::take(&mut self.slots[slot]); + handle } } -- Gitee From 56de66373451f0115847dc6ee7c667898f4aed94 Mon Sep 17 00:00:00 2001 From: li_junsong Date: Wed, 28 Jun 2023 10:54:27 +0800 Subject: [PATCH 2/2] =?UTF-8?q?async=5Fbuf=E6=B5=8B=E8=AF=95=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=88=A0=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: li_junsong --- ylong_runtime/tests/async_buf_read.rs | 2 ++ ylong_runtime/tests/async_buf_write.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/ylong_runtime/tests/async_buf_read.rs b/ylong_runtime/tests/async_buf_read.rs index f683f38..19198c7 100644 --- a/ylong_runtime/tests/async_buf_read.rs +++ b/ylong_runtime/tests/async_buf_read.rs @@ -13,6 +13,7 @@ * limitations under the License. */ +use std::fs; use std::io::SeekFrom; use ylong_runtime::fs::File; use ylong_runtime::io::{ @@ -236,4 +237,5 @@ fn sdv_buf_reader_seek() { assert_eq!(buf, "dolor".as_bytes()); }); ylong_runtime::block_on(handle1).unwrap(); + assert!(fs::remove_file("./tests/buf_reader_seek_file").is_ok()); } diff --git a/ylong_runtime/tests/async_buf_write.rs b/ylong_runtime/tests/async_buf_write.rs index c3d3430..091c208 100644 --- a/ylong_runtime/tests/async_buf_write.rs +++ b/ylong_runtime/tests/async_buf_write.rs @@ -13,6 +13,7 @@ * limitations under the License. */ +use std::fs; use std::io::{IoSlice, SeekFrom}; use ylong_runtime::fs::File; use ylong_runtime::io::{AsyncBufWriter, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; @@ -141,4 +142,5 @@ fn sdv_buf_writer_seek() { assert_eq!(buf, "dolor".as_bytes()); }); ylong_runtime::block_on(handle1).unwrap(); + assert!(fs::remove_file("./tests/buf_writer_seek_file").is_ok()); } -- Gitee