From b9b8ef086958f50bb3a8b77d3c735edb4601b7f8 Mon Sep 17 00:00:00 2001 From: li_junsong Date: Mon, 10 Jul 2023 11:13:40 +0800 Subject: [PATCH] =?UTF-8?q?net=E6=A8=A1=E5=9D=97=E8=A7=A3=E9=99=A4?= =?UTF-8?q?=E9=9A=94=E7=A6=BB=E5=90=8E=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: li_junsong --- ylong_io/src/sys/linux/tcp/listener.rs | 2 +- ylong_io/src/sys/linux/tcp/mod.rs | 2 +- ylong_io/src/sys/linux/tcp/socket.rs | 22 +++----- ylong_io/src/sys/linux/tcp/stream.rs | 4 +- ylong_io/src/sys/linux/udp/mod.rs | 2 +- ylong_io/src/sys/linux/udp/socket.rs | 20 +++---- ylong_io/src/sys/linux/udp/udp_socket.rs | 51 +++++++++++------ ylong_runtime/src/net/async_source.rs | 9 +-- ylong_runtime/src/net/driver.rs | 69 ++++++++++------------- ylong_runtime/src/net/mod.rs | 6 +- ylong_runtime/src/net/sys/mod.rs | 2 +- ylong_runtime/src/net/sys/tcp/listener.rs | 9 +-- ylong_runtime/src/net/sys/tcp/mod.rs | 2 +- ylong_runtime/src/net/sys/tcp/stream.rs | 18 +++--- ylong_runtime/src/net/sys/udp.rs | 47 +++++++++------ 15 files changed, 138 insertions(+), 127 deletions(-) diff --git a/ylong_io/src/sys/linux/tcp/listener.rs b/ylong_io/src/sys/linux/tcp/listener.rs index 38f0eb2..bcf76fc 100644 --- a/ylong_io/src/sys/linux/tcp/listener.rs +++ b/ylong_io/src/sys/linux/tcp/listener.rs @@ -12,7 +12,7 @@ // limitations under the License. use super::{TcpSocket, TcpStream}; -use crate::{Interest, Source, Selector, Token}; +use crate::{Interest, Selector, Source, Token}; use libc::{ c_int, sockaddr_in, sockaddr_in6, sockaddr_storage, socklen_t, SOCK_CLOEXEC, SOCK_NONBLOCK, }; diff --git a/ylong_io/src/sys/linux/tcp/mod.rs b/ylong_io/src/sys/linux/tcp/mod.rs index 3ae7950..3538498 100644 --- a/ylong_io/src/sys/linux/tcp/mod.rs +++ b/ylong_io/src/sys/linux/tcp/mod.rs @@ -20,4 +20,4 @@ mod socket; pub(crate) use socket::TcpSocket; mod stream; -pub use stream::TcpStream; \ No newline at end of file +pub use stream::TcpStream; diff --git a/ylong_io/src/sys/linux/tcp/socket.rs b/ylong_io/src/sys/linux/tcp/socket.rs index aee0595..e210da8 100644 --- a/ylong_io/src/sys/linux/tcp/socket.rs +++ b/ylong_io/src/sys/linux/tcp/socket.rs @@ -11,16 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use super::super::socket_addr::socket_addr_trans; use super::{TcpListener, TcpStream}; use libc::{ - c_int, c_void, socklen_t, AF_INET, AF_INET6, SOCK_CLOEXEC, SOCK_NONBLOCK, - SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR, + c_int, c_void, socklen_t, AF_INET, AF_INET6, SOCK_CLOEXEC, SOCK_NONBLOCK, SOCK_STREAM, + SOL_SOCKET, SO_REUSEADDR, }; use std::io; use std::mem::{self, size_of}; use std::net::{self, SocketAddr}; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; -use super::super::socket_addr::socket_addr_trans; pub(crate) struct TcpSocket { socket: c_int, @@ -41,9 +41,7 @@ impl TcpSocket { Ok(socket) => Ok(TcpSocket { socket: socket as c_int, }), - Err(err) => { - Err(err) - } + Err(err) => Err(err), } } @@ -57,9 +55,7 @@ impl TcpSocket { &set_value as *const c_int as *const c_void, size_of::() as socklen_t )) { - Err(err) => { - Err(err) - } + Err(err) => Err(err), Ok(_) => Ok(()), } } @@ -67,9 +63,7 @@ impl TcpSocket { pub(crate) fn bind(&self, addr: SocketAddr) -> io::Result<()> { let (raw_addr, addr_length) = socket_addr_trans(&addr); match syscall!(bind(self.socket, raw_addr.as_ptr(), addr_length)) { - Err(err) => { - Err(err) - } + Err(err) => Err(err), Ok(_) => Ok(()), } } @@ -89,9 +83,7 @@ impl TcpSocket { pub(crate) fn connect(self, addr: SocketAddr) -> io::Result { let (raw_addr, addr_length) = socket_addr_trans(&addr); match syscall!(connect(self.socket, raw_addr.as_ptr(), addr_length)) { - Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => { - Err(err) - } + Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), _ => { let tcp_stream = Ok(TcpStream { inner: unsafe { net::TcpStream::from_raw_fd(self.socket) }, diff --git a/ylong_io/src/sys/linux/tcp/stream.rs b/ylong_io/src/sys/linux/tcp/stream.rs index 41cb27f..1a5811e 100644 --- a/ylong_io/src/sys/linux/tcp/stream.rs +++ b/ylong_io/src/sys/linux/tcp/stream.rs @@ -12,9 +12,9 @@ // limitations under the License. use super::TcpSocket; -use crate::{Interest, Source, Selector, Token}; +use crate::{Interest, Selector, Source, Token}; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; -use std::net::{self, SocketAddr, Shutdown}; +use std::net::{self, Shutdown, SocketAddr}; use std::os::unix::io::AsRawFd; /// A non-blocking TCP Stream between a local socket and a remote socket. diff --git a/ylong_io/src/sys/linux/udp/mod.rs b/ylong_io/src/sys/linux/udp/mod.rs index d632547..ecc1faa 100644 --- a/ylong_io/src/sys/linux/udp/mod.rs +++ b/ylong_io/src/sys/linux/udp/mod.rs @@ -17,4 +17,4 @@ mod socket; pub(crate) use socket::UdpSock; mod udp_socket; -pub use udp_socket::{UdpSocket, ConnectedUdpSocket}; \ No newline at end of file +pub use udp_socket::{ConnectedUdpSocket, UdpSocket}; diff --git a/ylong_io/src/sys/linux/udp/socket.rs b/ylong_io/src/sys/linux/udp/socket.rs index 5c18628..fd2bdb2 100644 --- a/ylong_io/src/sys/linux/udp/socket.rs +++ b/ylong_io/src/sys/linux/udp/socket.rs @@ -11,15 +11,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{io, mem, net}; -use std::net::SocketAddr; -use libc::{c_int, SOCK_DGRAM, AF_INET, AF_INET6, SOCK_CLOEXEC, SOCK_NONBLOCK}; -use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use super::super::socket_addr::socket_addr_trans; use crate::UdpSocket; +use libc::{c_int, AF_INET, AF_INET6, SOCK_CLOEXEC, SOCK_DGRAM, SOCK_NONBLOCK}; +use std::net::SocketAddr; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::{io, mem, net}; pub(crate) struct UdpSock { - socket: c_int + socket: c_int, } impl UdpSock { @@ -37,18 +37,14 @@ impl UdpSock { Ok(socket) => Ok(UdpSock { socket: socket as c_int, }), - Err(err) => { - Err(err) - } + Err(err) => Err(err), } } pub(crate) fn bind(self, addr: SocketAddr) -> io::Result { let (raw_addr, addr_length) = socket_addr_trans(&addr); match syscall!(bind(self.socket, raw_addr.as_ptr(), addr_length)) { - Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => { - Err(err) - } + Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), _ => { let udp_socket = Ok(UdpSocket { inner: unsafe { net::UdpSocket::from_raw_fd(self.socket) }, @@ -80,4 +76,4 @@ impl Drop for UdpSock { fn drop(&mut self) { self.close(); } -} \ No newline at end of file +} diff --git a/ylong_io/src/sys/linux/udp/udp_socket.rs b/ylong_io/src/sys/linux/udp/udp_socket.rs index ae04623..dfd24e1 100644 --- a/ylong_io/src/sys/linux/udp/udp_socket.rs +++ b/ylong_io/src/sys/linux/udp/udp_socket.rs @@ -11,12 +11,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, io, net}; +use super::UdpSock; +use crate::{Interest, Selector, Source, Token}; use std::fmt::Formatter; use std::net::SocketAddr; -use crate::{Interest, Selector, Source, Token}; use std::os::unix::io::AsRawFd; -use super::UdpSock; +use std::{fmt, io, net}; /// UdpSocket. The bottom layer uses std::net::UdpSocket。 /// UdpSocket supports bind\connect\send\recv\send_to\recv_from\broadcast. @@ -96,9 +96,7 @@ impl UdpSocket { /// about the underlying socket; it is left up to the user to set it in /// non-blocking mode. pub fn from_std(socket: net::UdpSocket) -> UdpSocket { - UdpSocket{ - inner: socket - } + UdpSocket { inner: socket } } /// Returns the local address that this socket is bound to. @@ -147,7 +145,8 @@ impl UdpSocket { inner.send_to(buf, target) } - /// Receives a single datagram message on the socket. On success, returns the number of bytes read and the origin. + /// Receives a single datagram message on the socket. On success, returns the number of bytes + /// read and the origin. /// The function must be called with valid byte array buf of sufficient size to hold the message bytes. /// If a message is too long to fit in the supplied buffer, excess bytes may be discarded. /// @@ -224,7 +223,7 @@ impl UdpSocket { /// Ok(()) /// } /// ``` - pub fn set_broadcast(&self, on: bool) ->io::Result<()> { + pub fn set_broadcast(&self, on: bool) -> io::Result<()> { self.inner.set_broadcast(on) } @@ -261,9 +260,7 @@ impl ConnectedUdpSocket { /// about the underlying socket; it is left up to the user to set it in /// non-blocking mode. pub fn from_std(socket: net::UdpSocket) -> ConnectedUdpSocket { - ConnectedUdpSocket{ - inner: socket - } + ConnectedUdpSocket { inner: socket } } /// Returns the local address that this socket is bound to. @@ -350,7 +347,8 @@ impl ConnectedUdpSocket { inner.send(buf) } - /// Receives a single datagram message on the socket from the remote address to which it is connected. On success, returns the number of bytes read. + /// Receives a single datagram message on the socket from the remote address to which it is connected. + /// On success, returns the number of bytes read. /// The function must be called with valid byte array buf of sufficient size to hold the message bytes. /// If a message is too long to fit in the supplied buffer, excess bytes may be discarded. /// The connect method will connect this socket to a remote address. @@ -402,11 +400,21 @@ impl fmt::Debug for ConnectedUdpSocket { } impl Source for UdpSocket { - fn register(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + fn register( + &mut self, + selector: &Selector, + token: Token, + interests: Interest, + ) -> io::Result<()> { selector.register(self.inner.as_raw_fd(), token, interests) } - fn reregister(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + fn reregister( + &mut self, + selector: &Selector, + token: Token, + interests: Interest, + ) -> io::Result<()> { selector.reregister(self.inner.as_raw_fd(), token, interests) } @@ -415,11 +423,21 @@ impl Source for UdpSocket { } } impl Source for ConnectedUdpSocket { - fn register(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + fn register( + &mut self, + selector: &Selector, + token: Token, + interests: Interest, + ) -> io::Result<()> { selector.register(self.inner.as_raw_fd(), token, interests) } - fn reregister(&mut self, selector: &Selector, token: Token, interests: Interest) -> io::Result<()> { + fn reregister( + &mut self, + selector: &Selector, + token: Token, + interests: Interest, + ) -> io::Result<()> { selector.reregister(self.inner.as_raw_fd(), token, interests) } @@ -427,4 +445,3 @@ impl Source for ConnectedUdpSocket { selector.deregister(self.inner.as_raw_fd()) } } - diff --git a/ylong_runtime/src/net/async_source.rs b/ylong_runtime/src/net/async_source.rs index 831b33a..15400ed 100644 --- a/ylong_runtime/src/net/async_source.rs +++ b/ylong_runtime/src/net/async_source.rs @@ -11,12 +11,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::macros::cfg_io; use crate::net::ScheduleIO; +use crate::util::slab::Ref; use std::io; use std::ops::Deref; use ylong_io::{Interest, Source}; -use crate::macros::cfg_io; -use crate::util::slab::Ref; cfg_io!( use std::task::{Context, Poll}; @@ -53,7 +53,8 @@ impl AsyncSource { pub fn new(mut io: E, interest: Option) -> io::Result> { #[cfg(not(feature = "ffrt"))] let inner = { - let context = get_current_ctx().ok_or_else(|| io::Error::new(io::ErrorKind::Other, "get_current_ctx() fail"))?; + let context = get_current_ctx() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "get_current_ctx() fail"))?; match context { WorkerContext::Multi(ctx) => &ctx.handle, WorkerContext::Curr(ctx) => &ctx.handle, @@ -236,4 +237,4 @@ impl Drop for AsyncSource { let _ = inner.deregister_source(&mut io); } } -} \ No newline at end of file +} diff --git a/ylong_runtime/src/net/driver.rs b/ylong_runtime/src/net/driver.rs index 5bbe53e..a44b63d 100644 --- a/ylong_runtime/src/net/driver.rs +++ b/ylong_runtime/src/net/driver.rs @@ -13,13 +13,13 @@ use crate::cfg_ffrt; use crate::net::{Ready, ScheduleIO, Tick}; +use crate::util::bit::{Bit, Mask}; +use crate::util::slab::{Address, Ref, Slab}; use std::io; use std::ops::Deref; use std::sync::{Arc, Mutex}; use std::time::Duration; -use crate::util::bit::{Bit, Mask}; use ylong_io::{EventTrait, Events, Interest, Poll, Source, Token}; -use crate::util::slab::{Address, Ref, Slab}; const DRIVER_TICK_INIT: u8 = 0; @@ -66,26 +66,19 @@ cfg_ffrt!( #[cfg(feature = "ffrt")] impl Handle { fn new(inner: Arc) -> Self { - Handle { - inner, - } + Handle { inner } } pub(crate) fn get_ref() -> &'static Self { Driver::initialize(); - unsafe { - &*HANDLE.as_ptr() - } + unsafe { &*HANDLE.as_ptr() } } } #[cfg(not(feature = "ffrt"))] impl Handle { fn new(inner: Arc, waker: ylong_io::Waker) -> Self { - Handle { - inner, - waker, - } + Handle { inner, waker } } pub(crate) fn wake(&self) { @@ -121,27 +114,30 @@ pub(crate) struct Inner { impl Driver { #[cfg(not(feature = "ffrt"))] pub(crate) fn initialize() -> (Arc, Arc>) { - let poll = Poll::new().unwrap(); - let waker = ylong_io::Waker::new(&poll, WAKE_TOKEN) - .expect("ylong_io waker construction failed"); - let arc_poll = Arc::new(poll); - let events = Events::with_capacity(EVENTS_MAX_CAPACITY); - let slab = Slab::new(); - let allocator = slab.handle(); - let inner = Arc::new(Inner { - resources: Mutex::new(None), - allocator, - registry: arc_poll.clone(), - }); + let poll = Poll::new().unwrap(); + let waker = + ylong_io::Waker::new(&poll, WAKE_TOKEN).expect("ylong_io waker construction failed"); + let arc_poll = Arc::new(poll); + let events = Events::with_capacity(EVENTS_MAX_CAPACITY); + let slab = Slab::new(); + let allocator = slab.handle(); + let inner = Arc::new(Inner { + resources: Mutex::new(None), + allocator, + registry: arc_poll.clone(), + }); - let driver = Driver { - resources: Some(slab), - events: Some(events), - tick: DRIVER_TICK_INIT, - poll: arc_poll, - }; - - (Arc::new(Handle::new(inner, waker)), Arc::new(Mutex::new(driver))) + let driver = Driver { + resources: Some(slab), + events: Some(events), + tick: DRIVER_TICK_INIT, + poll: arc_poll, + }; + + ( + Arc::new(Handle::new(inner, waker)), + Arc::new(Mutex::new(driver)), + ) } #[cfg(feature = "ffrt")] @@ -172,11 +168,9 @@ impl Driver { /// Initializes the single instance IO driver. #[cfg(feature = "ffrt")] - pub(crate) fn try_get_mut() -> Option>{ + pub(crate) fn try_get_mut() -> Option> { Driver::initialize(); - unsafe { - & *DRIVER.as_ptr() - }.try_lock().ok() + unsafe { &*DRIVER.as_ptr() }.try_lock().ok() } /// Runs the driver. This method will blocking wait for fd events to come in and then @@ -209,7 +203,6 @@ impl Driver { Err(err) => return Err(err), } - let has_events = !events.is_empty(); for event in events.iter() { @@ -296,4 +289,4 @@ impl Drop for Inner { }); } } -} \ No newline at end of file +} diff --git a/ylong_runtime/src/net/mod.rs b/ylong_runtime/src/net/mod.rs index eca40aa..c3287d7 100644 --- a/ylong_runtime/src/net/mod.rs +++ b/ylong_runtime/src/net/mod.rs @@ -13,15 +13,15 @@ //! Asynchronous TCP/UDP binding for `ylong_runtime` -pub(crate) use driver::{Handle, Driver}; +pub(crate) use crate::schedule_io::{ScheduleIO, Tick}; +pub(crate) use driver::{Driver, Handle}; pub(crate) use linked_list::{LinkedList, Node}; pub(crate) use ready::{Ready, ReadyEvent}; pub use sys::{Listener, Stream}; -pub(crate) use crate::schedule_io::{ScheduleIO, Tick}; pub(crate) mod async_source; pub(crate) mod sys; -pub(crate) use async_source::AsyncSource; use crate::macros::cfg_io; +pub(crate) use async_source::AsyncSource; pub(crate) mod driver; mod linked_list; diff --git a/ylong_runtime/src/net/sys/mod.rs b/ylong_runtime/src/net/sys/mod.rs index 32b6586..fc29e65 100644 --- a/ylong_runtime/src/net/sys/mod.rs +++ b/ylong_runtime/src/net/sys/mod.rs @@ -11,10 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::macros::cfg_io; use std::io; use std::net::SocketAddr; use std::sync::Arc; -use crate::macros::cfg_io; cfg_io! { mod tcp; diff --git a/ylong_runtime/src/net/sys/tcp/listener.rs b/ylong_runtime/src/net/sys/tcp/listener.rs index 67a0787..18bf8ef 100644 --- a/ylong_runtime/src/net/sys/tcp/listener.rs +++ b/ylong_runtime/src/net/sys/tcp/listener.rs @@ -11,10 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ylong_io::Interest; +use crate::net::{AsyncSource, TcpStream}; use std::io; use std::net::SocketAddr; -use crate::net::{AsyncSource, TcpStream}; +use ylong_io::Interest; /// An asynchronous version of [`std::net::TcpListener`]. Provides async bind/accept methods. /// @@ -78,10 +78,7 @@ impl TcpListener { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { let (stream, addr) = self .source - .async_process( - Interest::READABLE, - || self.source.accept() - ) + .async_process(Interest::READABLE, || self.source.accept()) .await?; let stream = TcpStream::new(stream)?; Ok((stream, addr)) diff --git a/ylong_runtime/src/net/sys/tcp/mod.rs b/ylong_runtime/src/net/sys/tcp/mod.rs index 5bed685..a92f5ac 100644 --- a/ylong_runtime/src/net/sys/tcp/mod.rs +++ b/ylong_runtime/src/net/sys/tcp/mod.rs @@ -15,4 +15,4 @@ mod listener; pub use listener::TcpListener; mod stream; -pub use stream::TcpStream; \ No newline at end of file +pub use stream::TcpStream; diff --git a/ylong_runtime/src/net/sys/tcp/stream.rs b/ylong_runtime/src/net/sys/tcp/stream.rs index a5e91ff..d94c57b 100644 --- a/ylong_runtime/src/net/sys/tcp/stream.rs +++ b/ylong_runtime/src/net/sys/tcp/stream.rs @@ -11,15 +11,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::net::AsyncSource; -use ylong_io::Interest; +use std::fmt::{Debug, Formatter}; use std::io; -use std::io::{IoSlice}; +use std::io::IoSlice; use std::net::{Shutdown, SocketAddr}; -use crate::io::{AsyncRead, ReadBuf, AsyncWrite}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::fmt::{Debug, Formatter}; +use ylong_io::Interest; /// An asynchronous version of [`std::net::TcpStream`] /// @@ -80,7 +80,7 @@ impl TcpStream { .async_process( // Wait until the stream is writable Interest::WRITABLE, - || Ok(()) + || Ok(()), ) .await?; @@ -107,14 +107,18 @@ impl AsyncRead for TcpStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut ReadBuf<'_> + buf: &mut ReadBuf<'_>, ) -> Poll> { self.source.poll_read(cx, buf) } } impl AsyncWrite for TcpStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { self.source.poll_write(cx, buf) } diff --git a/ylong_runtime/src/net/sys/udp.rs b/ylong_runtime/src/net/sys/udp.rs index f526ee3..7f56317 100644 --- a/ylong_runtime/src/net/sys/udp.rs +++ b/ylong_runtime/src/net/sys/udp.rs @@ -11,14 +11,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::io::ReadBuf; +use crate::net::AsyncSource; use std::fmt::{Debug, Formatter}; use std::io; +use std::mem::MaybeUninit; use std::net::SocketAddr; use std::task::{Context, Poll}; use ylong_io::Interest; -use crate::io::ReadBuf; -use crate::net::AsyncSource; -use std::mem::MaybeUninit; /// Asynchronous UdpSocket. /// @@ -138,7 +138,7 @@ impl UdpSocket { let socket = ylong_io::UdpSocket::bind(local_addr)?; let connected_socket = match socket.connect(addr) { Ok(socket) => socket, - Err(e) => return Err(e) + Err(e) => return Err(e), }; ConnectedUdpSocket::new(connected_socket) } @@ -163,7 +163,8 @@ impl UdpSocket { } /// Sends data on the socket to the given address. On success, returns the number of bytes written. - /// This will return an error when the IP version of the local socket does not match that returned from SocketAddr. + /// This will return an error when the IP version of the local socket does not match that returned + /// from SocketAddr. /// /// # Return value /// The function returns: @@ -224,7 +225,8 @@ impl UdpSocket { /// Attempts to send data on the socket to a given address. /// Note that on multiple calls to a poll_* method in the send direction, - /// only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup + /// only the Waker from the Context passed to the most recent call will be scheduled to receive + /// a wakeup /// /// # Return value /// The function returns: @@ -254,7 +256,8 @@ impl UdpSocket { buf: &[u8], target: SocketAddr, ) -> Poll> { - self.source.poll_write_io(cx, || self.source.send_to(buf, target)) + self.source + .poll_write_io(cx, || self.source.send_to(buf, target)) } /// Receives a single datagram message on the socket. On success, returns the number of bytes @@ -283,8 +286,8 @@ impl UdpSocket { /// } /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.source. - async_process(Interest::READABLE, || self.source.recv_from(buf)) + self.source + .async_process(Interest::READABLE, || self.source.recv_from(buf)) .await } @@ -368,7 +371,8 @@ impl UdpSocket { /// Attempts to receive a single datagram on the socket. /// Note that on multiple calls to a poll_* method in the recv direction, - /// only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup. + /// only the Waker from the Context passed to the most recent call will be scheduled to receive + /// a wakeup. /// /// # Return value /// The function returns: @@ -586,12 +590,12 @@ impl ConnectedUdpSocket { .try_io(Interest::WRITABLE, || self.source.send(buf)) } - /// Attempts to send data on the socket to the remote address to which it was previously connected. /// The connect method will connect this socket to a remote address. /// This method will fail if the socket is not connected. /// Note that on multiple calls to a poll_* method in the send direction, - /// only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup. + /// only the Waker from the Context passed to the most recent call will be scheduled to receive + /// a wakeup. /// /// # Return value /// The function returns: @@ -704,11 +708,13 @@ impl ConnectedUdpSocket { .try_io(Interest::READABLE, || self.source.recv(buf)) } - /// Attempts to receive a single datagram message on the socket from the remote address to which it is connected. + /// Attempts to receive a single datagram message on the socket from the remote address to which + /// it is connected. /// The connect method will connect this socket to a remote address. /// This method resolves to an error if the socket is not connected. /// Note that on multiple calls to a poll_* method in the recv direction, - /// only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup. + /// only the Waker from the Context passed to the most recent call will be scheduled to receive + /// a wakeup. /// /// # Return value /// The function returns: @@ -823,9 +829,9 @@ impl ConnectedUdpSocket { #[cfg(test)] mod tests { + use crate::futures::poll_fn; use crate::io::ReadBuf; use crate::net::UdpSocket; - use crate::futures::poll_fn; use crate::{block_on, spawn}; /// UT test for `poll_send()` and `poll_recv()`. @@ -960,7 +966,9 @@ mod tests { panic!("Bind Socket Failed {}", e); } }; - broadcast_socket.set_broadcast(true).expect("set_broadcast failed"); + broadcast_socket + .set_broadcast(true) + .expect("set_broadcast failed"); assert!(broadcast_socket.broadcast().expect("get broadcast failed")); }); @@ -1026,8 +1034,11 @@ mod tests { panic!("Connect Socket Failed {}", e); } }; - assert_eq!(connected_sock.peer_addr().expect("peer_addr failed"), peer_addr); + assert_eq!( + connected_sock.peer_addr().expect("peer_addr failed"), + peer_addr + ); }); block_on(handle).expect("block_on failed"); } -} \ No newline at end of file +} -- Gitee