1 year ago

#352907

test-img

tavimori

How do I resolve the "`Unpin` is not implemented for `from_generator..." problem when implementing a TCP concurrent connector?

I want to implement a concurrent TCP connector using Tokio. The function should initialize multiple outbound connections and only return the one that finishes the handshake first (and drop all other connections).

At first, I want to use select! but as far as I understand, select! does not apply to a vector of futures. Therefore I choose to implement a VecSelect structure that somehow acts as select!(Vec<Future>).

My code is as follows:

use tokio::net::{TcpStream};
use std::net::SocketAddr;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};


// just a test
pub async fn my_tcp_connect(target: SocketAddr) -> std::io::Result<TcpStream> {
    let conn0 = TcpStream::connect(target);
    let conn1 = TcpStream::connect(target);
    let v = vec![conn0, conn1];
    VecSelect {
        futs: v,
    }.await
}

struct VecSelect<T: Future + Unpin> {
    futs: Vec<T>,
}

impl<T: Future + Unpin> Future for VecSelect<T> {
    type Output = T::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // check all futures
        for mut fut in self.futs.iter_mut() {
            if let Poll::Ready(val) = Pin::new(&mut fut).poll(cx) {
                return Poll::Ready(val);
            }
        }
       
        Poll::Pending
    }
}

And the following are the errors reported:

error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:13:5
    |
13  |     VecSelect {
    |     ^^^^^^^^^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                        --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
   --> src/tcp_connector.rs:18:30
    |
18  | struct VecSelect<T: Future + Unpin> {
    |                              ^^^^^ required by this bound in `VecSelect`

error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:13:5
    |
13  | /     VecSelect {
14  | |         futs: v,
15  | |     }.await
    | |_____^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |       pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                          --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
   --> src/tcp_connector.rs:18:30
    |
18  | struct VecSelect<T: Future + Unpin> {
    |                              ^^^^^ required by this bound in `VecSelect`

error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:15:6
    |
15  |     }.await
    |      ^^^^^^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                        --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
   --> src/tcp_connector.rs:18:30
    |
18  | struct VecSelect<T: Future + Unpin> {
    |                              ^^^^^ required by this bound in `VecSelect`

As I'm new to Rust and async programming, I'm not sure if my approach is in general correct.

And I'm wondering whether Unpin is necessary for my implementation and why cannot the compiler derive Unpin automatically?

rust

rust-tokio

0 Answers

Your Answer

Accepted video resources