From 396355821379af08aee04f3885e20eb3fd657be2 Mon Sep 17 00:00:00 2001 From: fqwert Date: Mon, 10 Jul 2023 19:13:26 +0800 Subject: [PATCH] async_iter --- .../examples/ylong_runtime_stream.rs | 10 ++ .../src/iter/async_iter/adapters/all.rs | 50 ++++++++ .../src/iter/async_iter/adapters/any.rs | 50 ++++++++ .../src/iter/async_iter/adapters/chain.rs | 55 +++++++++ .../src/iter/async_iter/adapters/filter.rs | 55 +++++++++ .../src/iter/async_iter/adapters/fuse.rs | 51 +++++++++ .../src/iter/async_iter/adapters/map.rs | 42 +++++++ .../src/iter/async_iter/adapters/mod.rs | 108 ++++++++++++++++++ .../src/iter/async_iter/adapters/next.rs | 35 ++++++ .../src/iter/async_iter/adapters/skip.rs | 50 ++++++++ .../src/iter/async_iter/adapters/take.rs | 49 ++++++++ ylong_runtime/src/iter/async_iter/mod.rs | 22 ++++ ylong_runtime/src/iter/async_iter/traits.rs | 61 ++++++++++ .../src/iter/async_iter/wrappers/mod.rs | 19 +++ .../iter/async_iter/wrappers/tcp_listener.rs | 37 ++++++ ylong_runtime/src/iter/mod.rs | 13 +-- ylong_runtime/src/iter/{ => par_iter}/core.rs | 0 ylong_runtime/src/iter/par_iter/mod.rs | 25 ++++ .../src/iter/{ => par_iter}/parallel/array.rs | 0 .../parallel/collections/binary_heap.rs | 2 +- .../parallel/collections/btree_map.rs | 2 +- .../parallel/collections/btree_set.rs | 2 +- .../parallel/collections/hash_map.rs | 2 +- .../parallel/collections/hash_set.rs | 2 +- .../parallel/collections/linked_list.rs | 2 +- .../parallel/collections/mod.rs | 0 .../parallel/collections/vec_deque.rs | 2 +- .../src/iter/{ => par_iter}/parallel/mod.rs | 0 .../src/iter/{ => par_iter}/parallel/slice.rs | 0 .../src/iter/{ => par_iter}/parallel/vec.rs | 0 .../src/iter/{ => par_iter}/pariter/filter.rs | 0 .../iter/{ => par_iter}/pariter/for_each.rs | 0 .../src/iter/{ => par_iter}/pariter/map.rs | 0 .../src/iter/{ => par_iter}/pariter/mod.rs | 0 .../src/iter/{ => par_iter}/pariter/sum.rs | 0 .../src/iter/{ => par_iter}/pariter/zip.rs | 0 .../src/iter/{ => par_iter}/prelude.rs | 4 +- ylong_runtime/src/lib.rs | 4 +- 38 files changed, 733 insertions(+), 21 deletions(-) create mode 100644 ylong_runtime/examples/ylong_runtime_stream.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/all.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/any.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/chain.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/filter.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/fuse.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/map.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/mod.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/next.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/skip.rs create mode 100644 ylong_runtime/src/iter/async_iter/adapters/take.rs create mode 100644 ylong_runtime/src/iter/async_iter/mod.rs create mode 100644 ylong_runtime/src/iter/async_iter/traits.rs create mode 100644 ylong_runtime/src/iter/async_iter/wrappers/mod.rs create mode 100644 ylong_runtime/src/iter/async_iter/wrappers/tcp_listener.rs rename ylong_runtime/src/iter/{ => par_iter}/core.rs (100%) create mode 100644 ylong_runtime/src/iter/par_iter/mod.rs rename ylong_runtime/src/iter/{ => par_iter}/parallel/array.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/binary_heap.rs (93%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/btree_map.rs (94%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/btree_set.rs (93%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/hash_map.rs (94%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/hash_set.rs (93%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/linked_list.rs (94%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/mod.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/collections/vec_deque.rs (93%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/mod.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/slice.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/parallel/vec.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/pariter/filter.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/pariter/for_each.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/pariter/map.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/pariter/mod.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/pariter/sum.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/pariter/zip.rs (100%) rename ylong_runtime/src/iter/{ => par_iter}/prelude.rs (83%) diff --git a/ylong_runtime/examples/ylong_runtime_stream.rs b/ylong_runtime/examples/ylong_runtime_stream.rs new file mode 100644 index 0000000..714d49e --- /dev/null +++ b/ylong_runtime/examples/ylong_runtime_stream.rs @@ -0,0 +1,10 @@ +use ylong_runtime::stream::exts::AsyncIteratorExt; +use ylong_runtime::stream::from_iter; +fn main() { + ylong_runtime::block_on(async { + let v = vec![1, 2, 3, 4, 5, 6, 7]; + while let Some(i) = from_iter(v.iter()).filter(|x| **x > 4).next().await { + println!("{}", i); + } + }); +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/all.rs b/ylong_runtime/src/iter/async_iter/adapters/all.rs new file mode 100644 index 0000000..abd07a0 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/all.rs @@ -0,0 +1,50 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::future::Future; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +pub struct All { + iter: I, + f: F, +} + +impl All { + pub fn new(iter: I, f: F) -> Self { + All { iter, f } + } +} + +impl Future for All +where + I: AsyncIterator, + F: FnMut(I::Item) -> bool, +{ + type Output = bool; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + let Self { iter, f } = self.get_unchecked_mut(); + let mut iter = Pin::new_unchecked(iter); + while let Some(i) = ready!(iter.as_mut().poll_next(cx)) { + if !(f)(i) { + return Poll::Ready(false); + } + } + Poll::Ready(true) + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/any.rs b/ylong_runtime/src/iter/async_iter/adapters/any.rs new file mode 100644 index 0000000..c6478b0 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/any.rs @@ -0,0 +1,50 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::future::Future; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +pub struct Any { + iter: I, + f: F, +} + +impl Any { + pub fn new(iter: I, f: F) -> Self { + Any { iter, f } + } +} + +impl<'a, I, F> Future for Any +where + I: AsyncIterator, + F: FnMut(I::Item) -> bool, +{ + type Output = bool; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + let Self { iter, f } = self.get_unchecked_mut(); + let mut iter = Pin::new_unchecked(iter); + while let Some(i) = ready!(iter.as_mut().poll_next(cx)) { + if (f)(i) { + return Poll::Ready(true); + } + } + Poll::Ready(false) + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/chain.rs b/ylong_runtime/src/iter/async_iter/adapters/chain.rs new file mode 100644 index 0000000..a1124fc --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/chain.rs @@ -0,0 +1,55 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +pub struct Chain { + opt_a: Option, + b: B, +} + +impl Chain { + pub fn new(a: A, b: B) -> Self { + Chain { opt_a: Some(a), b } + } +} + +impl AsyncIterator for Chain +where + A: AsyncIterator, + B: AsyncIterator, +{ + type Item = A::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let Self { opt_a, b } = self.get_unchecked_mut(); + match opt_a.as_mut() { + Some(a) => match ready!(Pin::new_unchecked(a).poll_next(cx)) { + Some(i) => Poll::Ready(Some(i)), + None => { + *opt_a = None; + Poll::Ready(None) + } + }, + None => match ready!(Pin::new_unchecked(b).poll_next(cx)) { + Some(i) => Poll::Ready(Some(i)), + None => Poll::Ready(None), + }, + } + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/filter.rs b/ylong_runtime/src/iter/async_iter/adapters/filter.rs new file mode 100644 index 0000000..88768f9 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/filter.rs @@ -0,0 +1,55 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +pub struct Filter { + iter: I, + predicate: P, +} + +impl Filter { + pub fn new(iter: I, predicate: P) -> Self { + Filter { iter, predicate } + } +} + +impl AsyncIterator for Filter +where + I: AsyncIterator, + P: FnMut(&I::Item) -> bool, +{ + type Item = I::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let Self { iter, predicate } = self.get_unchecked_mut(); + let mut iter = Pin::new_unchecked(iter); + loop { + match ready!(iter.as_mut().poll_next(cx)) { + Some(i) => { + if (predicate)(&i) { + return Poll::Ready(Some(i)); + } + } + None => { + return Poll::Ready(None); + } + } + } + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/fuse.rs b/ylong_runtime/src/iter/async_iter/adapters/fuse.rs new file mode 100644 index 0000000..ebc10eb --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/fuse.rs @@ -0,0 +1,51 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +pub struct Fuse { + opt_iter: Option, +} + +impl Fuse { + pub fn new(iter: I) -> Self { + Fuse { + opt_iter: Some(iter), + } + } +} +impl AsyncIterator for Fuse +where + I: AsyncIterator, +{ + type Item = I::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let Self { opt_iter } = self.get_unchecked_mut(); + match opt_iter.as_mut() { + Some(iter) => match ready!(Pin::new_unchecked(iter).poll_next(cx)) { + Some(i) => Poll::Ready(Some(i)), + None => { + *opt_iter = None; + Poll::Ready(None) + } + }, + None => Poll::Ready(None), + } + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/map.rs b/ylong_runtime/src/iter/async_iter/adapters/map.rs new file mode 100644 index 0000000..064de65 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/map.rs @@ -0,0 +1,42 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; +pub struct Map { + iter: I, + f: F, +} + +impl Map { + pub fn new(iter: I, f: F) -> Self { + Map { iter, f } + } +} + +impl AsyncIterator for Map +where + I: AsyncIterator, + F: FnMut(I::Item) -> T + Unpin + Copy, +{ + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let Self { iter, f } = self.get_unchecked_mut(); + Poll::Ready(ready!(Pin::new_unchecked(iter).poll_next(cx)).map(|i| f(i))) + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/mod.rs b/ylong_runtime/src/iter/async_iter/adapters/mod.rs new file mode 100644 index 0000000..4c269d9 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/mod.rs @@ -0,0 +1,108 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; + +mod all; +pub use all::All; + +mod any; +pub use any::Any; + +mod chain; +pub use chain::Chain; + +mod filter; +pub use filter::Filter; + +mod fuse; +pub use fuse::Fuse; + +mod map; +pub use map::Map; + +mod next; +pub use next::Next; + +mod skip; +pub use skip::Skip; + +mod take; +pub use take::Take; + +pub trait AsyncIteratorExt: AsyncIterator { + fn all bool>(self, f: F) -> All + where + Self: Sized, + { + All::new(self, f) + } + + fn any bool>(self, f: F) -> Any + where + Self: Sized, + { + Any::new(self, f) + } + + fn next(&mut self) -> Next<'_, Self> { + Next::new(self) + } + + fn chain(self, b: B) -> Chain + where + Self: Sized, + B: AsyncIterator, + { + Chain::new(self, b) + } + + fn map(self, f: F) -> Map + where + Self: Sized, + F: FnMut(Self::Item) -> T, + { + Map::new(self, f) + } + + fn filter

(self, predicate: P) -> Filter + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + Filter::new(self, predicate) + } + + fn fuse(self) -> Fuse + where + Self: Sized, + { + Fuse::new(self) + } + + fn skip(self, n: usize) -> Skip + where + Self: Sized, + { + Skip::new(self, n) + } + + fn take(self, n: usize) -> Take + where + Self: Sized, + { + Take::new(self, n) + } +} + +impl AsyncIteratorExt for A {} diff --git a/ylong_runtime/src/iter/async_iter/adapters/next.rs b/ylong_runtime/src/iter/async_iter/adapters/next.rs new file mode 100644 index 0000000..8d8ed74 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/next.rs @@ -0,0 +1,35 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct Next<'a, I: ?Sized> { + iter: &'a mut I, +} +impl<'a, I: ?Sized> Next<'a, I> { + pub fn new(iter: &'a mut I) -> Self { + Next { iter } + } +} + +impl Future for Next<'_, I> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.iter).poll_next(cx) + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/skip.rs b/ylong_runtime/src/iter/async_iter/adapters/skip.rs new file mode 100644 index 0000000..d8f757a --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/skip.rs @@ -0,0 +1,50 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +pub struct Skip { + iter: I, + n: usize, +} + +impl Skip { + pub fn new(iter: I, n: usize) -> Self { + Skip { iter, n } + } +} + +impl AsyncIterator for Skip +where + I: AsyncIterator, +{ + type Item = I::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let Self { iter, n } = self.get_unchecked_mut(); + let mut iter = Pin::new_unchecked(iter); + while *n > 0 { + let _ = ready!(iter.as_mut().poll_next(cx)); + *n -= 1; + } + match ready!(iter.as_mut().poll_next(cx)) { + Some(i) => Poll::Ready(Some(i)), + None => Poll::Ready(None), + } + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/adapters/take.rs b/ylong_runtime/src/iter/async_iter/adapters/take.rs new file mode 100644 index 0000000..e28165c --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/adapters/take.rs @@ -0,0 +1,49 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::async_iter::traits::AsyncIterator; +use std::pin::Pin; +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +pub struct Take { + iter: I, + n: usize, +} + +impl Take { + pub fn new(iter: I, n: usize) -> Self { + Take { iter, n } + } +} + +impl AsyncIterator for Take +where + I: AsyncIterator, +{ + type Item = I::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + let Self { iter, n } = self.get_unchecked_mut(); + if *n == 0 { + return Poll::Ready(None); + } + *n -= 1; + match ready!(Pin::new_unchecked(iter).poll_next(cx)) { + Some(i) => Poll::Ready(Some(i)), + None => Poll::Ready(None), + } + } + } +} diff --git a/ylong_runtime/src/iter/async_iter/mod.rs b/ylong_runtime/src/iter/async_iter/mod.rs new file mode 100644 index 0000000..ae7b2c8 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/mod.rs @@ -0,0 +1,22 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Asynchronous iteration +//! +//! std::iter::Iterator 的异步版本,比如对于一个实现了async_iter的类型,其调用的next方法将是异步的。同时支持包括map,zip,生成不同的迭代器。 +//! +//! 目前rust并不支持async for loops。需要使用while let loop next + +mod adapters; +pub mod wrappers; +mod traits; diff --git a/ylong_runtime/src/iter/async_iter/traits.rs b/ylong_runtime/src/iter/async_iter/traits.rs new file mode 100644 index 0000000..a543697 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/traits.rs @@ -0,0 +1,61 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + ops::DerefMut, + pin::Pin, + task::{Context, Poll}, +}; + +pub trait AsyncIterator { + type Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} + +impl AsyncIterator for &mut S +where + S: ?Sized + AsyncIterator + Unpin, +{ + type Item = S::Item; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } +} + +impl

AsyncIterator for Pin

+where + P: DerefMut, + P::Target: AsyncIterator, +{ + type Item = ::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.get_unchecked_mut().as_mut().poll_next(cx) } + } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } +} + +pub trait FusedAsyncIterator: AsyncIterator {} diff --git a/ylong_runtime/src/iter/async_iter/wrappers/mod.rs b/ylong_runtime/src/iter/async_iter/wrappers/mod.rs new file mode 100644 index 0000000..28094d7 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/wrappers/mod.rs @@ -0,0 +1,19 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +cfg_net!{ + mod tcp_listener; + pub use tcp_listener::TcpListenerAsyncIter; + +} + diff --git a/ylong_runtime/src/iter/async_iter/wrappers/tcp_listener.rs b/ylong_runtime/src/iter/async_iter/wrappers/tcp_listener.rs new file mode 100644 index 0000000..ef7fb05 --- /dev/null +++ b/ylong_runtime/src/iter/async_iter/wrappers/tcp_listener.rs @@ -0,0 +1,37 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::net::{TcpListener, TcpStream}; +use crate::async_iter::traits::AsyncIterator; + +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct TcpListenerAsyncIter { + inner: TcpListener, +} + +impl TcpListenerAsyncIter {} + +impl AsyncIterator for TcpListenerAsyncIter { + type Item = io::Result<(TcpStream, SocketAddr)>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, addr))) => Poll::Ready(Some(Ok((stream, addr)))), + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/ylong_runtime/src/iter/mod.rs b/ylong_runtime/src/iter/mod.rs index 5d09b48..5437389 100644 --- a/ylong_runtime/src/iter/mod.rs +++ b/ylong_runtime/src/iter/mod.rs @@ -11,15 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! ParIter module, providing asynchronous iter lambda function implementations -//! such as `for_each`, `zip`, `map`, `filter`. -mod core; +pub mod async_iter; -/// Parallel type and implementation -pub mod parallel; - -/// Parallel iterator trait -pub mod pariter; - -/// Convenient Prelude -pub mod prelude; +pub mod par_iter; \ No newline at end of file diff --git a/ylong_runtime/src/iter/core.rs b/ylong_runtime/src/iter/par_iter/core.rs similarity index 100% rename from ylong_runtime/src/iter/core.rs rename to ylong_runtime/src/iter/par_iter/core.rs diff --git a/ylong_runtime/src/iter/par_iter/mod.rs b/ylong_runtime/src/iter/par_iter/mod.rs new file mode 100644 index 0000000..5d09b48 --- /dev/null +++ b/ylong_runtime/src/iter/par_iter/mod.rs @@ -0,0 +1,25 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! ParIter module, providing asynchronous iter lambda function implementations +//! such as `for_each`, `zip`, `map`, `filter`. +mod core; + +/// Parallel type and implementation +pub mod parallel; + +/// Parallel iterator trait +pub mod pariter; + +/// Convenient Prelude +pub mod prelude; diff --git a/ylong_runtime/src/iter/parallel/array.rs b/ylong_runtime/src/iter/par_iter/parallel/array.rs similarity index 100% rename from ylong_runtime/src/iter/parallel/array.rs rename to ylong_runtime/src/iter/par_iter/parallel/array.rs diff --git a/ylong_runtime/src/iter/parallel/collections/binary_heap.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/binary_heap.rs similarity index 93% rename from ylong_runtime/src/iter/parallel/collections/binary_heap.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/binary_heap.rs index f4208d1..7d50639 100644 --- a/ylong_runtime/src/iter/parallel/collections/binary_heap.rs +++ b/ylong_runtime/src/iter/par_iter/parallel/collections/binary_heap.rs @@ -14,7 +14,7 @@ use std::collections::BinaryHeap; use super::par_vec_impl; -use crate::iter::parallel::{IntoParIter, ParIter}; +use crate::par_iter::parallel::{IntoParIter, ParIter}; par_vec_impl!(BinaryHeap, Vec, into_iter, impl ); diff --git a/ylong_runtime/src/iter/parallel/collections/btree_map.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/btree_map.rs similarity index 94% rename from ylong_runtime/src/iter/parallel/collections/btree_map.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/btree_map.rs index 9e460f2..b5eb150 100644 --- a/ylong_runtime/src/iter/parallel/collections/btree_map.rs +++ b/ylong_runtime/src/iter/par_iter/parallel/collections/btree_map.rs @@ -14,7 +14,7 @@ use std::collections::BTreeMap; use super::par_vec_impl; -use crate::iter::parallel::{IntoParIter, ParIter}; +use crate::par_iter::parallel::{IntoParIter, ParIter}; par_vec_impl!(BTreeMap, Vec<(T,V)>, into_iter, impl ); diff --git a/ylong_runtime/src/iter/parallel/collections/btree_set.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/btree_set.rs similarity index 93% rename from ylong_runtime/src/iter/parallel/collections/btree_set.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/btree_set.rs index 1405fe6..bec37da 100644 --- a/ylong_runtime/src/iter/parallel/collections/btree_set.rs +++ b/ylong_runtime/src/iter/par_iter/parallel/collections/btree_set.rs @@ -14,7 +14,7 @@ use std::collections::BTreeSet; use super::par_vec_impl; -use crate::iter::parallel::{IntoParIter, ParIter}; +use crate::par_iter::parallel::{IntoParIter, ParIter}; par_vec_impl!(BTreeSet, Vec, into_iter, impl ); diff --git a/ylong_runtime/src/iter/parallel/collections/hash_map.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/hash_map.rs similarity index 94% rename from ylong_runtime/src/iter/parallel/collections/hash_map.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/hash_map.rs index b22e630..a942794 100644 --- a/ylong_runtime/src/iter/parallel/collections/hash_map.rs +++ b/ylong_runtime/src/iter/par_iter/parallel/collections/hash_map.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; use super::par_vec_impl; -use crate::iter::parallel::{IntoParIter, ParIter}; +use crate::par_iter::parallel::{IntoParIter, ParIter}; par_vec_impl!(HashMap, Vec<(T, V)>, into_iter, impl ); diff --git a/ylong_runtime/src/iter/parallel/collections/hash_set.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/hash_set.rs similarity index 93% rename from ylong_runtime/src/iter/parallel/collections/hash_set.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/hash_set.rs index 72d40a1..5ea9532 100644 --- a/ylong_runtime/src/iter/parallel/collections/hash_set.rs +++ b/ylong_runtime/src/iter/par_iter/parallel/collections/hash_set.rs @@ -14,7 +14,7 @@ use std::collections::HashSet; use super::par_vec_impl; -use crate::iter::parallel::{IntoParIter, ParIter}; +use crate::par_iter::parallel::{IntoParIter, ParIter}; par_vec_impl!(HashSet, Vec, into_iter, impl ); diff --git a/ylong_runtime/src/iter/parallel/collections/linked_list.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/linked_list.rs similarity index 94% rename from ylong_runtime/src/iter/parallel/collections/linked_list.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/linked_list.rs index 4bbd1d7..d61b860 100644 --- a/ylong_runtime/src/iter/parallel/collections/linked_list.rs +++ b/ylong_runtime/src/iter/par_iter/parallel/collections/linked_list.rs @@ -14,7 +14,7 @@ use std::collections::LinkedList; use super::par_vec_impl; -use crate::iter::parallel::{IntoParIter, ParIter}; +use crate::par_iter::parallel::{IntoParIter, ParIter}; par_vec_impl!(LinkedList, Vec, into_iter, impl ); diff --git a/ylong_runtime/src/iter/parallel/collections/mod.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/mod.rs similarity index 100% rename from ylong_runtime/src/iter/parallel/collections/mod.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/mod.rs diff --git a/ylong_runtime/src/iter/parallel/collections/vec_deque.rs b/ylong_runtime/src/iter/par_iter/parallel/collections/vec_deque.rs similarity index 93% rename from ylong_runtime/src/iter/parallel/collections/vec_deque.rs rename to ylong_runtime/src/iter/par_iter/parallel/collections/vec_deque.rs index 330b16c..5d6697f 100644 --- a/ylong_runtime/src/iter/parallel/collections/vec_deque.rs +++ b/ylong_runtime/src/iter/par_iter/parallel/collections/vec_deque.rs @@ -14,7 +14,7 @@ use std::collections::VecDeque; use super::par_vec_impl; -use crate::iter::parallel::{IntoParIter, ParIter}; +use crate::par_iter::parallel::{IntoParIter, ParIter}; par_vec_impl!(VecDeque, Vec, into_iter, impl ); par_vec_impl!(&'a VecDeque, Vec<&'a T>, iter, impl <'a, T>); diff --git a/ylong_runtime/src/iter/parallel/mod.rs b/ylong_runtime/src/iter/par_iter/parallel/mod.rs similarity index 100% rename from ylong_runtime/src/iter/parallel/mod.rs rename to ylong_runtime/src/iter/par_iter/parallel/mod.rs diff --git a/ylong_runtime/src/iter/parallel/slice.rs b/ylong_runtime/src/iter/par_iter/parallel/slice.rs similarity index 100% rename from ylong_runtime/src/iter/parallel/slice.rs rename to ylong_runtime/src/iter/par_iter/parallel/slice.rs diff --git a/ylong_runtime/src/iter/parallel/vec.rs b/ylong_runtime/src/iter/par_iter/parallel/vec.rs similarity index 100% rename from ylong_runtime/src/iter/parallel/vec.rs rename to ylong_runtime/src/iter/par_iter/parallel/vec.rs diff --git a/ylong_runtime/src/iter/pariter/filter.rs b/ylong_runtime/src/iter/par_iter/pariter/filter.rs similarity index 100% rename from ylong_runtime/src/iter/pariter/filter.rs rename to ylong_runtime/src/iter/par_iter/pariter/filter.rs diff --git a/ylong_runtime/src/iter/pariter/for_each.rs b/ylong_runtime/src/iter/par_iter/pariter/for_each.rs similarity index 100% rename from ylong_runtime/src/iter/pariter/for_each.rs rename to ylong_runtime/src/iter/par_iter/pariter/for_each.rs diff --git a/ylong_runtime/src/iter/pariter/map.rs b/ylong_runtime/src/iter/par_iter/pariter/map.rs similarity index 100% rename from ylong_runtime/src/iter/pariter/map.rs rename to ylong_runtime/src/iter/par_iter/pariter/map.rs diff --git a/ylong_runtime/src/iter/pariter/mod.rs b/ylong_runtime/src/iter/par_iter/pariter/mod.rs similarity index 100% rename from ylong_runtime/src/iter/pariter/mod.rs rename to ylong_runtime/src/iter/par_iter/pariter/mod.rs diff --git a/ylong_runtime/src/iter/pariter/sum.rs b/ylong_runtime/src/iter/par_iter/pariter/sum.rs similarity index 100% rename from ylong_runtime/src/iter/pariter/sum.rs rename to ylong_runtime/src/iter/par_iter/pariter/sum.rs diff --git a/ylong_runtime/src/iter/pariter/zip.rs b/ylong_runtime/src/iter/par_iter/pariter/zip.rs similarity index 100% rename from ylong_runtime/src/iter/pariter/zip.rs rename to ylong_runtime/src/iter/par_iter/pariter/zip.rs diff --git a/ylong_runtime/src/iter/prelude.rs b/ylong_runtime/src/iter/par_iter/prelude.rs similarity index 83% rename from ylong_runtime/src/iter/prelude.rs rename to ylong_runtime/src/iter/par_iter/prelude.rs index 48a3180..cd56926 100644 --- a/ylong_runtime/src/iter/prelude.rs +++ b/ylong_runtime/src/iter/par_iter/prelude.rs @@ -11,5 +11,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use crate::iter::parallel::{AsParIter, AsParIterMut, IntoParIter}; -pub use crate::iter::pariter::ParallelIterator; +pub use super::parallel::{AsParIter, AsParIterMut, IntoParIter}; +pub use super::pariter::ParallelIterator; diff --git a/ylong_runtime/src/lib.rs b/ylong_runtime/src/lib.rs index b3dfd19..96c118a 100644 --- a/ylong_runtime/src/lib.rs +++ b/ylong_runtime/src/lib.rs @@ -38,6 +38,9 @@ use crate::task::{JoinHandle, Task, TaskBuilder}; pub mod builder; pub mod error; pub mod executor; +mod iter; +pub use iter::async_iter; +pub use iter::par_iter; #[cfg(feature = "ffrt")] pub(crate) mod ffrt; @@ -45,7 +48,6 @@ pub(crate) mod ffrt; pub mod fs; pub mod futures; pub mod io; -pub mod iter; #[cfg(feature = "macros")] mod select; #[cfg(feature = "macros")] -- Gitee