diff --git a/ylong_runtime/src/sync/mpsc/bounded/array.rs b/ylong_runtime/src/sync/mpsc/bounded/array.rs index 872969a5e258c6f9dc42eedb354c40a6452aa24e..24cd8a4c7ad90cb44e3f29ca27accae5d9aede0b 100644 --- a/ylong_runtime/src/sync/mpsc/bounded/array.rs +++ b/ylong_runtime/src/sync/mpsc/bounded/array.rs @@ -198,7 +198,8 @@ impl Container for Array { } fn len(&self) -> usize { - let head = *self.head.borrow(); + // we are only reading the index of the array, it's safe + let head = unsafe { *self.head.as_ptr() }; let tail = self.tail.load(Acquire) >> INDEX_SHIFT; tail - head } diff --git a/ylong_runtime/src/sync/mpsc/bounded/mod.rs b/ylong_runtime/src/sync/mpsc/bounded/mod.rs index d585acaa17730443128c99d8e894abf9b4eccecf..8074d2902e92b7f9089f2232ba55b145f64b564d 100644 --- a/ylong_runtime/src/sync/mpsc/bounded/mod.rs +++ b/ylong_runtime/src/sync/mpsc/bounded/mod.rs @@ -521,3 +521,45 @@ impl Drop for BoundedReceiver { self.channel.close(); } } + +#[cfg(test)] +mod bounded_mpsc_test { + use crate::sync::mpsc::bounded_channel; + + /// UT test cases for `len` while calling `recv` + /// . + /// # Brief + /// 1. Receiver receives from the queue for multiple times + /// 2. Sender 1 gets the length of the queue while the receiver is receiving + /// 3. Sender 2 sends messages to the receiver + /// 4. Check if all the returns are correct + #[test] + fn ut_bounded_mpsc_get_len_while_receiving() { + let (sender, mut receiver) = bounded_channel(5); + let sender2 = sender.clone(); + + let handle1 = crate::spawn(async move { + for _ in 0..1000 { + let ret = receiver.recv().await.unwrap(); + assert_eq!(ret, 1); + } + }); + + let handle2 = crate::spawn(async move { + for _ in 0..2000 { + let _len = sender2.len(); + } + }); + + let handle3 = crate::spawn(async move { + for _ in 0..1000 { + let ret = sender.send(1).await; + assert!(ret.is_ok()); + } + }); + + crate::block_on(handle1).unwrap(); + crate::block_on(handle2).unwrap(); + crate::block_on(handle3).unwrap(); + } +} diff --git a/ylong_runtime/src/sync/mpsc/unbounded/mod.rs b/ylong_runtime/src/sync/mpsc/unbounded/mod.rs index 6da1dd862ce0fd46d61c40ba1d085ea5290789a8..efa3f50d13973e0b31ce0e411150c1c7d13d9a57 100644 --- a/ylong_runtime/src/sync/mpsc/unbounded/mod.rs +++ b/ylong_runtime/src/sync/mpsc/unbounded/mod.rs @@ -398,3 +398,45 @@ impl Drop for UnboundedReceiver { self.channel.close(); } } + +#[cfg(test)] +mod unbounded_mpsc_test { + use crate::sync::mpsc::unbounded_channel; + + /// UT test cases for `len` while calling `recv` + /// . + /// # Brief + /// 1. Receiver receives from the queue for multiple times + /// 2. Sender 1 gets the length of the queue while the receiver is receiving + /// 3. Sender 2 sends messages to the receiver + /// 4. Check if all the returns are correct + #[test] + fn ut_unbounded_mpsc_get_len_while_receiving() { + let (sender, mut receiver) = unbounded_channel(); + let sender2 = sender.clone(); + + let handle1 = crate::spawn(async move { + for _ in 0..1000 { + let ret = receiver.recv().await.unwrap(); + assert_eq!(ret, 1); + } + }); + + let handle2 = crate::spawn(async move { + for _ in 0..2000 { + let _len = sender2.len(); + } + }); + + let handle3 = crate::spawn(async move { + for _ in 0..1000 { + let ret = sender.send(1); + assert!(ret.is_ok()); + } + }); + + crate::block_on(handle1).unwrap(); + crate::block_on(handle2).unwrap(); + crate::block_on(handle3).unwrap(); + } +} diff --git a/ylong_runtime/src/sync/mpsc/unbounded/queue.rs b/ylong_runtime/src/sync/mpsc/unbounded/queue.rs index e44c122b36f8de836f1de82b3c464de6bd1b5ede..df6b60f75d63aa9944daf24ec5e524f3075cb3ab 100644 --- a/ylong_runtime/src/sync/mpsc/unbounded/queue.rs +++ b/ylong_runtime/src/sync/mpsc/unbounded/queue.rs @@ -244,7 +244,8 @@ impl Container for Queue { } fn len(&self) -> usize { - let head = self.head.borrow().index; + // we are only reading the index of the queue, it's safe + let head = unsafe { (*self.head.as_ptr()).index }; let mut tail = self.tail.index.load(Acquire) >> INDEX_SHIFT; if tail % (CAPACITY + 1) == CAPACITY { tail = tail.wrapping_add(1);