代码之家  ›  专栏  ›  技术社区  ›  Akiner Alkan

如何使用Tokio的tcpstream发送数据流?

  •  1
  • Akiner Alkan  · 技术社区  · 7 年前

    我正在尝试在Rust中实现TCP客户机。我可以读取来自服务器的数据,但无法发送数据。

    下面是我正在研究的代码:

    extern crate bytes;
    extern crate futures;
    extern crate tokio_core;
    extern crate tokio_io;
    
    use self::bytes::BytesMut;
    use self::futures::{Future, Poll, Stream};
    use self::tokio_core::net::TcpStream;
    use self::tokio_core::reactor::Core;
    use self::tokio_io::AsyncRead;
    use std::io;
    
    #[derive(Default)]
    pub struct TcpClient {}
    
    struct AsWeGetIt<R>(R);
    
    impl<R> Stream for AsWeGetIt<R>
    where
        R: AsyncRead,
    {
        type Item = BytesMut;
        type Error = io::Error;
    
        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            let mut buf = BytesMut::with_capacity(1000);
    
            self.0
                .read_buf(&mut buf)
                .map(|async| async.map(|_| Some(buf)))
        }
    }
    
    impl TcpClient {
        pub fn new() -> Self {
            Self {}
        }
    
        pub fn connectToTcpServer(&mut self) -> bool {
            let mut core = Core::new().unwrap();
            let handle = core.handle();
    
            let address = "127.0.0.1:2323".parse().expect("Unable to parse address");
            let connection = TcpStream::connect(&address, &handle);
    
            let client = connection
                .and_then(|tcp_stream| {
                    AsWeGetIt(tcp_stream).for_each(|buf| {
                        println!("{:?}", buf);
                        Ok(())
                    })
                })
                .map_err(|e| eprintln!("Error: {}", e));
    
            core.run(client).expect("Unable to run the event loop");
            return true;
        }
    }
    

    如何添加异步数据发送功能?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Shepmaster Tim Diekmann    6 年前

    如果希望在套接字上有两个完全独立的数据流,则可以使用 split() 方法 TcpStream 在当前版本的东京:

    let connection = TcpStream::connect(&address);
    connection.and_then(|socket| {
        let (rx, tx) = socket.split();
        //Independently use tx/rx for sending/receiving
        return Ok(());
    });
    

    拆分后,可以使用 rx (接收端)和 tx (发送端)独立。下面是一个将发送和接收视为完全独立的小例子。发送方的一半只是周期性地发送相同的消息,而接收方的一半只是打印所有输入的数据:

    extern crate futures;
    extern crate tokio;
    
    use self::futures::{Future, Poll, Stream};
    use self::tokio::net::TcpStream;
    use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf};
    use tokio::prelude::*;
    use tokio::timer::Interval;
    
    //Receiver struct that implements the future trait
    //this exclusively handles incomming data and prints it to stdout
    struct Receiver {
        rx: ReadHalf<TcpStream>, //receiving half of the socket stream
    }
    impl Future for Receiver {
        type Item = ();
        type Error = Error;
    
        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            let mut buffer = vec![0u8; 1000]; //reserve 1000 bytes in the receive buffer
                                              //get all data that is available to us at the moment...
            while let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? {
                if num_bytes_read == 0 {
                    return Ok(Async::Ready(()));
                } //socket closed
                print!("{}", String::from_utf8_lossy(&buffer[..num_bytes_read]));
            }
            return Ok(Async::NotReady);
        }
    }
    
    fn main() {
        let address = "127.0.0.1:2323".parse().expect("Unable to parse address");
        let connection = TcpStream::connect(&address);
        //wait for the connection to be established
        let client = connection
            .and_then(|socket| {
                //split the successfully connected socket in half (receive / send)
                let (rx, mut tx) = socket.split();
    
                //set up a simple sender, that periodically (1sec) sends the same message
                let sender = Interval::new_interval(std::time::Duration::from_millis(1000))
                    .for_each(move |_| {
                        //this lambda is invoked once per passed second
                        tx.poll_write(&vec![82, 117, 115, 116, 10]).map_err(|_| {
                            //shut down the timer if an error occured (e.g. socket was closed)
                            tokio::timer::Error::shutdown()
                        })?;
                        return Ok(());
                    }).map_err(|e| println!("{}", e));
                //start the sender
                tokio::spawn(sender);
    
                //start the receiver
                let receiver = Receiver { rx };
                tokio::spawn(receiver.map_err(|e| println!("{}", e)));
    
                return Ok(());
            }).map_err(|e| println!("{}", e));
    
        tokio::run(client);
    }
    

    对于某些应用程序,这就足够了。但是,在连接上通常会有一个已定义的协议/格式。例如,HTTP连接总是由请求和响应组成,每个请求和响应都由一个头部和主体组成。Tokio没有直接处理字节级,而是提供了 Encoder Decoder 您可以安装在一个套接字上,该套接字对协议进行解码,并直接为您提供要使用的实体。例如,您可以查看 HTTP implementation line-based 编解码器。

    当传入消息触发传出消息时,它会变得更复杂一些。对于最简单的情况(每个传入消息只会导致一个传出消息),您可以查看 this 官方请求/响应示例。