diff --git a/ylong_runtime/src/io/mod.rs b/ylong_runtime/src/io/mod.rs index 1bf50bf81a9f92c4656efc6372e05370518797fd..ef70d0d14a9040b0cbb515eb0fb7bd4ed519b5d2 100644 --- a/ylong_runtime/src/io/mod.rs +++ b/ylong_runtime/src/io/mod.rs @@ -23,6 +23,10 @@ mod buffered; mod read_buf; mod read_task; mod seek_task; +mod stderr; +mod stdin; +mod stdio; +mod stdout; mod write_task; pub use async_buf_read::{AsyncBufRead, AsyncBufReadExt}; @@ -32,7 +36,10 @@ pub use async_write::{AsyncWrite, AsyncWriteExt}; pub use buffered::{AsyncBufReader, AsyncBufWriter}; pub use read_buf::ReadBuf; pub use read_task::ReadTask; -pub use write_task::WriteTask; +pub use stderr::{stderr, Stderr}; +pub use stdin::{stdin, Stdin}; +pub(crate) use stdio::State; +pub use stdout::{stdout, Stdout}; macro_rules! poll_ready { ($e:expr) => { diff --git a/ylong_runtime/src/io/stderr.rs b/ylong_runtime/src/io/stderr.rs new file mode 100644 index 0000000000000000000000000000000000000000..a1d16fd3df63ee4b569668833a8d98f8790cca9f --- /dev/null +++ b/ylong_runtime/src/io/stderr.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::io; +use std::io::Write; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::io::{AsyncWrite, State}; +use crate::spawn_blocking; + +/// A handle to the global standard err stream of the current process. +/// +/// `Stderr` implements the [`AsyncWrite`] trait. +pub struct Stderr { + std: Option, + state: State, + has_written: bool, +} + +/// Constructs a new handle to the standard output of the current process. +/// +/// # Example +/// ``` +/// use ylong_runtime::io::stderr; +/// let _stderr = stderr(); +/// ``` +pub fn stderr() -> Stderr { + let std = io::stderr(); + Stderr { + std: Some(std), + state: State::init(), + has_written: false, + } +} + +impl AsyncWrite for Stderr { + crate::io::stdio::std_async_write!(); +} + +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; + +#[cfg(unix)] +impl AsRawFd for Stderr { + fn as_raw_fd(&self) -> RawFd { + io::stdout().as_raw_fd() + } +} + +#[cfg(unix)] +impl AsFd for Stderr { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} + +#[cfg(windows)] +use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, RawHandle}; + +#[cfg(windows)] +impl AsRawHandle for Stderr { + fn as_raw_handle(&self) -> RawHandle { + io::stdout().as_raw_handle() + } +} + +#[cfg(windows)] +impl AsHandle for Stderr { + fn as_handle(&self) -> BorrowedHandle<'_> { + unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) } + } +} diff --git a/ylong_runtime/src/io/stdin.rs b/ylong_runtime/src/io/stdin.rs new file mode 100644 index 0000000000000000000000000000000000000000..19d1ff83bd90202d138b093de9976efe8d5c9b2f --- /dev/null +++ b/ylong_runtime/src/io/stdin.rs @@ -0,0 +1,129 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::io::{AsyncRead, ReadBuf, State}; +use crate::spawn_blocking; + +/// A handle to the standard input stream of a process. +/// +/// `Stdin` implements the [`AsyncRead`] trait. +pub struct Stdin { + std: Option, + state: State, +} + +/// Constructs a new handle to the standard input of the current process. +/// +/// # Example +/// ``` +/// use ylong_runtime::io::stdin; +/// let _stdin = stdin(); +/// ``` +pub fn stdin() -> Stdin { + let stdin = io::stdin(); + Stdin { + std: Some(stdin), + state: State::init(), + } +} + +impl AsyncRead for Stdin { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + loop { + match self.state { + State::Idle(ref mut buf_op) => { + let mut buf_inner = buf_op.take().unwrap(); + + if !buf_inner.is_empty() { + buf_inner.clone_into(buf); + *buf_op = Some(buf_inner); + return Poll::Ready(Ok(())); + } + + buf_inner.set_len(buf); + + let mut std = self.std.take().unwrap(); + + let handle = spawn_blocking(move || { + let res = buf_inner.read_from(&mut std); + (res, buf_inner, std) + }); + + self.state = State::Poll(handle); + } + State::Poll(ref mut join_handle) => { + let (res, mut buf_inner, std) = match Pin::new(join_handle).poll(cx)? { + Poll::Ready(t) => t, + Poll::Pending => return Poll::Pending, + }; + self.std = Some(std); + + return match res { + Ok(_) => { + buf_inner.clone_into(buf); + self.state = State::Idle(Some(buf_inner)); + Poll::Ready(Ok(())) + } + Err(e) => { + self.state = State::Idle(Some(buf_inner)); + Poll::Ready(Err(e)) + } + }; + } + } + } + } +} + +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; + +#[cfg(unix)] +impl AsRawFd for Stdin { + fn as_raw_fd(&self) -> RawFd { + io::stdin().as_raw_fd() + } +} + +#[cfg(unix)] +impl AsFd for Stdin { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} + +#[cfg(windows)] +use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, RawHandle}; + +#[cfg(windows)] +impl AsRawHandle for Stdin { + fn as_raw_handle(&self) -> RawHandle { + io::stdin().as_raw_handle() + } +} + +#[cfg(windows)] +impl AsHandle for Stdin { + fn as_handle(&self) -> BorrowedHandle<'_> { + unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) } + } +} diff --git a/ylong_runtime/src/io/stdio.rs b/ylong_runtime/src/io/stdio.rs new file mode 100644 index 0000000000000000000000000000000000000000..5c1967428a980f64091e74cd33b4a4adc05fed25 --- /dev/null +++ b/ylong_runtime/src/io/stdio.rs @@ -0,0 +1,193 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::{Read, Write}; +use std::{cmp, io}; + +use crate::io::ReadBuf; +use crate::task::JoinHandle; + +const MAX_BUF: usize = 2 * 1024 * 1024; + +pub(crate) enum State { + Idle(Option), + Poll(JoinHandle<(io::Result, BufInner, T)>), +} + +impl State { + pub(crate) fn init() -> Self { + State::Idle(Some(BufInner::new())) + } +} + +pub(crate) struct BufInner { + inner: Vec, + pos: usize, +} + +impl BufInner { + fn new() -> Self { + BufInner { + inner: Vec::with_capacity(0), + pos: 0, + } + } + + pub(crate) fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub(crate) fn len(&self) -> usize { + self.inner.len() - self.pos + } + + fn bytes(&self) -> &[u8] { + &self.inner[self.pos..] + } + + pub(crate) fn set_len(&mut self, buf: &mut ReadBuf<'_>) { + let len = cmp::min(buf.remaining(), MAX_BUF); + if self.inner.len() < len { + self.inner.reserve(len - self.len()); + } + unsafe { + self.inner.set_len(len); + } + } + + pub(crate) fn clone_from(&mut self, buf: &[u8]) -> usize { + let n = cmp::min(buf.len(), MAX_BUF); + self.inner.extend_from_slice(&buf[..n]); + n + } + + pub(crate) fn clone_into(&mut self, buf: &mut ReadBuf<'_>) -> usize { + let n = cmp::min(self.len(), buf.remaining()); + buf.append(&self.bytes()[..n]); + self.pos += n; + + if self.pos == self.inner.len() { + self.inner.truncate(0); + self.pos = 0; + } + n + } + + pub(crate) fn read_from(&mut self, std: &mut T) -> io::Result { + let res = loop { + match std.read(&mut self.inner) { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + res => break res, + } + }; + + match res { + Ok(n) => self.inner.truncate(n), + Err(_) => self.inner.clear(), + } + + res + } + + pub(crate) fn write_into(&mut self, std: &mut T) -> io::Result<()> { + let res = std.write_all(&self.inner); + self.inner.clear(); + res + } +} + +macro_rules! std_async_write { + () => { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + loop { + match self.state { + State::Idle(ref mut buf_op) => { + let mut buf_inner = buf_op.take().unwrap(); + + if !buf_inner.is_empty() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "inner Buf must be empty before poll!" + ))); + } + + let n = buf_inner.clone_from(buf); + + let mut std = self.std.take().unwrap(); + + let handle = spawn_blocking(move || { + let res = buf_inner.write_into(&mut std).map(|_| n); + + (res, buf_inner, std) + }); + + self.state = State::Poll(handle); + self.has_written = true; + } + State::Poll(ref mut join_handle) => { + let (res, buf_inner, std) = match Pin::new(join_handle).poll(cx)? { + Poll::Ready(t) => t, + Poll::Pending => return Poll::Pending, + }; + self.state = State::Idle(Some(buf_inner)); + self.std = Some(std); + + let n = res?; + return Poll::Ready(Ok(n)); + } + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let has_written = self.has_written; + match self.state { + State::Idle(ref mut buf_cell) => { + if !has_written { + return Poll::Ready(Ok(())); + } + let buf = buf_cell.take().unwrap(); + let mut inner = self.std.take().unwrap(); + + self.state = State::Poll(spawn_blocking(move || { + let res = inner.flush().map(|_| 0); + (res, buf, inner) + })); + + self.has_written = false; + } + State::Poll(ref mut join_handle) => { + let (res, buf, std) = match Pin::new(join_handle).poll(cx)? { + Poll::Ready(t) => t, + Poll::Pending => return Poll::Pending, + }; + self.state = State::Idle(Some(buf)); + self.std = Some(std); + + res?; + } + } + } + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + }; +} +pub(crate) use std_async_write; diff --git a/ylong_runtime/src/io/stdout.rs b/ylong_runtime/src/io/stdout.rs new file mode 100644 index 0000000000000000000000000000000000000000..f2004dcc94e9e63ad4bc6493c3546d30b76fa2d4 --- /dev/null +++ b/ylong_runtime/src/io/stdout.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::io; +use std::io::Write; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::io::{AsyncWrite, State}; +use crate::spawn_blocking; + +/// A handle to the global standard output stream of the current process. +/// +/// `Stdout` implements the [`AsyncWrite`] trait. +pub struct Stdout { + std: Option, + state: State, + has_written: bool, +} + +/// Constructs a new handle to the standard output of the current process. +/// +/// # Example +/// ``` +/// use ylong_runtime::io::stdout; +/// let _stdout = stdout(); +/// ``` +pub fn stdout() -> Stdout { + let std = io::stdout(); + Stdout { + std: Some(std), + state: State::init(), + has_written: false, + } +} + +impl AsyncWrite for Stdout { + crate::io::stdio::std_async_write!(); +} + +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; + +#[cfg(unix)] +impl AsRawFd for Stdout { + fn as_raw_fd(&self) -> RawFd { + io::stdout().as_raw_fd() + } +} + +#[cfg(unix)] +impl AsFd for Stdout { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} + +#[cfg(windows)] +use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, RawHandle}; + +#[cfg(windows)] +impl AsRawHandle for Stdout { + fn as_raw_handle(&self) -> RawHandle { + io::stdout().as_raw_handle() + } +} + +#[cfg(windows)] +impl AsHandle for Stdout { + fn as_handle(&self) -> BorrowedHandle<'_> { + unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) } + } +} diff --git a/ylong_runtime/src/util/core_affinity/windows.rs b/ylong_runtime/src/util/core_affinity/windows.rs index 6a7c5dd8c05779fe6de405ccc95f5d2bf229bc5e..577a548910a0e3db4b6d2ee1a0cef8dbbbdaa790 100644 --- a/ylong_runtime/src/util/core_affinity/windows.rs +++ b/ylong_runtime/src/util/core_affinity/windows.rs @@ -50,4 +50,4 @@ pub fn set_current_affinity(cpu: usize) -> Result<()> { 0 => Err(Error::last_os_error()), _ => Ok(()), } -} \ No newline at end of file +}