1 year ago
#352907

tavimori
How do I resolve the "`Unpin` is not implemented for `from_generator...`" problem when implementing a concurrent TCP connector?
I want to implement a concurrent TCP connector using Tokio. The function should initialize multiple outbound connections and only return the one that finishes the handshake first (and drop all other connections).
At first, I wanted to use `select!`, but as far as I understand, `select!` does not apply to a vector of futures. Therefore I chose to implement a `VecSelect` structure that acts like `select!` over a `Vec` of futures.
My code is as follows:
use tokio::net::{TcpStream};
use std::net::SocketAddr;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// just a test
pub async fn my_tcp_connect(target: SocketAddr) -> std::io::Result<TcpStream> {
    let conn0 = TcpStream::connect(target);
    let conn1 = TcpStream::connect(target);
    let v = vec![conn0, conn1];

    VecSelect {
        futs: v,
    }.await
}

struct VecSelect<T: Future + Unpin> {
    futs: Vec<T>,
}

impl<T: Future + Unpin> Future for VecSelect<T> {
    type Output = T::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // check all futures
        for mut fut in self.futs.iter_mut() {
            if let Poll::Ready(val) = Pin::new(&mut fut).poll(cx) {
                return Poll::Ready(val);
            }
        }
        Poll::Pending
    }
}
And the following are the errors reported:
error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:13:5
    |
13  |     VecSelect {
    |     ^^^^^^^^^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                        --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
  --> src/tcp_connector.rs:18:30
   |
18 | struct VecSelect<T: Future + Unpin> {
   |                              ^^^^^ required by this bound in `VecSelect`
error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:13:5
    |
13  | /     VecSelect {
14  | |         futs: v,
15  | |     }.await
    | |_____^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                        --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
  --> src/tcp_connector.rs:18:30
   |
18 | struct VecSelect<T: Future + Unpin> {
   |                              ^^^^^ required by this bound in `VecSelect`
error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:15:6
    |
15  |     }.await
    |      ^^^^^^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                        --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
  --> src/tcp_connector.rs:18:30
   |
18 | struct VecSelect<T: Future + Unpin> {
   |                              ^^^^^ required by this bound in `VecSelect`
As I'm new to Rust and async programming, I'm not sure whether my approach is correct in general.
I'm also wondering whether `Unpin` is necessary for my implementation, and why the compiler cannot derive `Unpin` automatically.
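One possible direction, following the compiler's `Box::pin` hint: the sketch below is a hypothetical variant of `VecSelect` that stores each future as `Pin<Box<T>>`. Futures produced by an `async fn` (such as `TcpStream::connect`) are deliberately not `Unpin`, because they may hold references into their own state, so the compiler cannot derive it. `Pin<Box<T>>`, however, is `Unpin` for any `T`, so the `Unpin` bound on `T` can be dropped. The constructor `new`, the `NoopWaker` and `poll_once` helpers, and the `maybe_ready` stand-in for a connection attempt are assumptions made for illustration (std only, no Tokio); they are not part of the original code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical variant of `VecSelect`: every future is boxed and pinned,
/// so `T` itself does not need to be `Unpin`.
struct VecSelect<T: Future> {
    futs: Vec<Pin<Box<T>>>,
}

impl<T: Future> VecSelect<T> {
    /// Illustrative constructor (not in the original code).
    fn new(futs: impl IntoIterator<Item = T>) -> Self {
        VecSelect { futs: futs.into_iter().map(Box::pin).collect() }
    }
}

impl<T: Future> Future for VecSelect<T> {
    type Output = T::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Pin<Box<T>>` is `Unpin`, hence so is `VecSelect<T>`, and the
        // vector can be borrowed mutably through the pinned `self`.
        for fut in self.futs.iter_mut() {
            // `as_mut` turns `&mut Pin<Box<T>>` into `Pin<&mut T>`.
            if let Poll::Ready(val) = fut.as_mut().poll(cx) {
                return Poll::Ready(val);
            }
        }
        Poll::Pending
    }
}

/// Stand-in for a connection attempt: resolves immediately with `Some(v)`,
/// never resolves with `None`. Like any `async fn`, its future is `!Unpin`.
async fn maybe_ready(value: Option<u32>) -> u32 {
    match value {
        Some(v) => v,
        None => std::future::pending().await,
    }
}

/// Waker that does nothing; enough to poll a future once without a runtime.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once and returns the result.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

/// The first future stays pending, the second is ready, so the second wins.
fn demo() -> Poll<u32> {
    poll_once(VecSelect::new(vec![maybe_ready(None), maybe_ready(Some(7))]))
}
```

With Tokio, the same idea would presumably read `VecSelect::new(vec![conn0, conn1]).await`. Two caveats apply to this sketch: as in the original, an attempt that fails first (an `Err`) still wins, and the future must not be polled again after it has returned `Ready`. If the `futures` crate is an option, `futures::future::select_ok` over `Box::pin`ned futures may be a closer fit for "first successful handshake".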
rust
rust-tokio