diff --git a/transports/quic/CHANGELOG.md b/transports/quic/CHANGELOG.md index ed710a9af6a..54c0c689eee 100644 --- a/transports/quic/CHANGELOG.md +++ b/transports/quic/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.7.0-alpha.2 [unreleased] + +- Add opt-in support for the `/quic` codepoint, interpreted as QUIC version draft-29. + See [PR 3151]. + +[PR 3151]: https://github.com/libp2p/rust-libp2p/pull/3151 + # 0.7.0-alpha - Initial alpha release. diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 5929a630865..b759972b91c 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-quic" -version = "0.7.0-alpha" +version = "0.7.0-alpha.2" authors = ["Parity Technologies "] edition = "2021" rust-version = "1.62.0" diff --git a/transports/quic/src/endpoint.rs b/transports/quic/src/endpoint.rs index 318af8b71b9..3eab4c595a1 100644 --- a/transports/quic/src/endpoint.rs +++ b/transports/quic/src/endpoint.rs @@ -18,7 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{provider::Provider, transport::SocketFamily, ConnectError, Connection, Error}; +use crate::{ + provider::Provider, + transport::{ProtocolVersion, SocketFamily}, + ConnectError, Connection, Error, +}; use bytes::BytesMut; use futures::{ @@ -69,6 +73,16 @@ pub struct Config { /// of a connection. pub max_connection_data: u32, + /// Support QUIC version draft-29 for dialing and listening. + /// + /// Per default only QUIC Version 1 / [`libp2p_core::multiaddr::Protocol::QuicV1`] + /// is supported. + /// + /// If support for draft-29 is enabled servers support draft-29 and version 1 on all + /// QUIC listening addresses. + /// As client the version is chosen based on the remote's address. + pub support_draft_29: bool, + /// TLS client config for the inner [`quinn_proto::ClientConfig`]. client_tls_config: Arc, /// TLS server config for the inner [`quinn_proto::ServerConfig`]. @@ -83,6 +97,7 @@ impl Config { Self { client_tls_config, server_tls_config, + support_draft_29: false, handshake_timeout: Duration::from_secs(5), max_idle_timeout: 30 * 1000, max_concurrent_stream_limit: 256, @@ -113,6 +128,7 @@ impl From for QuinnConfig { keep_alive_interval, max_connection_data, max_stream_data, + support_draft_29, handshake_timeout: _, } = config; let mut transport = quinn_proto::TransportConfig::default(); @@ -138,7 +154,10 @@ impl From for QuinnConfig { let mut client_config = quinn_proto::ClientConfig::new(client_tls_config); client_config.transport_config(transport); - let endpoint_config = quinn_proto::EndpointConfig::default(); + let mut endpoint_config = quinn_proto::EndpointConfig::default(); + if !support_draft_29 { + endpoint_config.supported_versions(vec![1]); + } QuinnConfig { client_config, @@ -280,6 +299,8 @@ pub enum ToEndpoint { Dial { /// UDP address to connect to. addr: SocketAddr, + /// Version to dial the remote on. + version: ProtocolVersion, /// Channel to return the result of the dialing to. result: oneshot::Sender>, }, @@ -403,18 +424,25 @@ impl Driver

{ to_endpoint: ToEndpoint, ) -> ControlFlow<(), Option> { match to_endpoint { - ToEndpoint::Dial { addr, result } => { + ToEndpoint::Dial { + addr, + result, + version, + } => { + let mut config = self.client_config.clone(); + if version == ProtocolVersion::Draft29 { + config.version(0xff00_001d); + } // This `"l"` seems necessary because an empty string is an invalid domain // name. While we don't use domain names, the underlying rustls library // is based upon the assumption that we do. - let (connection_id, connection) = - match self.endpoint.connect(self.client_config.clone(), addr, "l") { - Ok(c) => c, - Err(err) => { - let _ = result.send(Err(ConnectError::from(err).into())); - return ControlFlow::Continue(None); - } - }; + let (connection_id, connection) = match self.endpoint.connect(config, addr, "l") { + Ok(c) => c, + Err(err) => { + let _ = result.send(Err(ConnectError::from(err).into())); + return ControlFlow::Continue(None); + } + }; debug_assert_eq!(connection.side(), quinn_proto::Side::Client); let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY); diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 52eb7428ff1..01e52ec606f 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -49,12 +49,24 @@ use std::{ }; /// Implementation of the [`Transport`] trait for QUIC. +/// +/// By default only QUIC Version 1 (RFC 9000) is supported. In the [`Multiaddr`] this maps to +/// [`libp2p_core::multiaddr::Protocol::QuicV1`]. +/// The [`libp2p_core::multiaddr::Protocol::Quic`] codepoint is interpreted as QUIC version +/// draft-29 and only supported if [`Config::support_draft_29`] is set to `true`. +/// Note that in that case servers support both version an all QUIC listening addresses. +/// +/// Version draft-29 should only be used to connect to nodes from other libp2p implementations +/// that do not support `QuicV1` yet. Support for it will be removed long-term. +/// See . #[derive(Debug)] pub struct GenTransport { /// Config for the inner [`quinn_proto`] structs. quinn_config: QuinnConfig, /// Timeout for the [`Connecting`] future. handshake_timeout: Duration, + /// Whether draft-29 is supported for dialing and listening. + support_draft_29: bool, /// Streams of active [`Listener`]s. listeners: SelectAll>, /// Dialer for each socket family if no matching listener exists. @@ -65,12 +77,14 @@ impl GenTransport

{ /// Create a new [`GenTransport`] with the given [`Config`]. pub fn new(config: Config) -> Self { let handshake_timeout = config.handshake_timeout; + let support_draft_29 = config.support_draft_29; let quinn_config = config.into(); Self { listeners: SelectAll::new(), quinn_config, handshake_timeout, dialer: HashMap::new(), + support_draft_29, } } } @@ -82,14 +96,15 @@ impl Transport for GenTransport

{ type Dial = BoxFuture<'static, Result>; fn listen_on(&mut self, addr: Multiaddr) -> Result> { - let socket_addr = - multiaddr_to_socketaddr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?; + let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) + .ok_or(TransportError::MultiaddrNotSupported(addr))?; let listener_id = ListenerId::new(); let listener = Listener::new( listener_id, socket_addr, self.quinn_config.clone(), self.handshake_timeout, + version, )?; self.listeners.push(listener); @@ -120,7 +135,7 @@ impl Transport for GenTransport

{ } fn dial(&mut self, addr: Multiaddr) -> Result> { - let socket_addr = multiaddr_to_socketaddr(&addr) + let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { return Err(TransportError::MultiaddrNotSupported(addr)); @@ -161,8 +176,7 @@ impl Transport for GenTransport

{ &mut listeners[index].dialer_state } }; - - Ok(dialer_state.new_dial(socket_addr, self.handshake_timeout)) + Ok(dialer_state.new_dial(socket_addr, self.handshake_timeout, version)) } fn dial_as_listener( @@ -251,12 +265,14 @@ impl DialerState { &mut self, address: SocketAddr, timeout: Duration, + version: ProtocolVersion, ) -> BoxFuture<'static, Result<(PeerId, Connection), Error>> { let (rx, tx) = oneshot::channel(); let message = ToEndpoint::Dial { addr: address, result: rx, + version, }; self.pending_dials.push_back(message); @@ -299,6 +315,8 @@ struct Listener { /// Id of the listener. listener_id: ListenerId, + version: ProtocolVersion, + /// Channel to the endpoint to initiate dials. endpoint_channel: endpoint::Channel, /// Queued dials. @@ -327,6 +345,7 @@ impl Listener

{ socket_addr: SocketAddr, config: QuinnConfig, handshake_timeout: Duration, + version: ProtocolVersion, ) -> Result { let (endpoint_channel, new_connections_rx) = endpoint::Channel::new_bidirectional::

(config, socket_addr)?; @@ -338,7 +357,7 @@ impl Listener

{ pending_event = None; } else { if_watcher = None; - let ma = socketaddr_to_multiaddr(endpoint_channel.socket_addr()); + let ma = socketaddr_to_multiaddr(endpoint_channel.socket_addr(), version); pending_event = Some(TransportEvent::NewAddress { listener_id, listen_addr: ma, @@ -348,6 +367,7 @@ impl Listener

{ Ok(Listener { endpoint_channel, listener_id, + version, new_connections_rx, handshake_timeout, if_watcher, @@ -379,9 +399,11 @@ impl Listener

{ loop { match ready!(P::poll_if_event(if_watcher, cx)) { Ok(IfEvent::Up(inet)) => { - if let Some(listen_addr) = - ip_to_listenaddr(self.endpoint_channel.socket_addr(), inet.addr()) - { + if let Some(listen_addr) = ip_to_listenaddr( + self.endpoint_channel.socket_addr(), + inet.addr(), + self.version, + ) { log::debug!("New listen address: {}", listen_addr); return Poll::Ready(TransportEvent::NewAddress { listener_id: self.listener_id, @@ -390,9 +412,11 @@ impl Listener

{ } } Ok(IfEvent::Down(inet)) => { - if let Some(listen_addr) = - ip_to_listenaddr(self.endpoint_channel.socket_addr(), inet.addr()) - { + if let Some(listen_addr) = ip_to_listenaddr( + self.endpoint_channel.socket_addr(), + inet.addr(), + self.version, + ) { log::debug!("Expired listen address: {}", listen_addr); return Poll::Ready(TransportEvent::AddressExpired { listener_id: self.listener_id, @@ -445,8 +469,9 @@ impl Stream for Listener

{ } match self.new_connections_rx.poll_next_unpin(cx) { Poll::Ready(Some(connection)) => { - let local_addr = socketaddr_to_multiaddr(connection.local_addr()); - let send_back_addr = socketaddr_to_multiaddr(&connection.remote_addr()); + let local_addr = socketaddr_to_multiaddr(connection.local_addr(), self.version); + let send_back_addr = + socketaddr_to_multiaddr(&connection.remote_addr(), self.version); let event = TransportEvent::Incoming { upgrade: Connecting::new(connection, self.handshake_timeout), local_addr, @@ -486,6 +511,12 @@ impl Drop for Listener

{ } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProtocolVersion { + V1, // i.e. RFC9000 + Draft29, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum SocketFamily { Ipv4, @@ -518,18 +549,25 @@ impl From for SocketFamily { /// /// Returns `None` if the `ip` is not the same socket family as the /// address that the endpoint is bound to. -fn ip_to_listenaddr(endpoint_addr: &SocketAddr, ip: IpAddr) -> Option { +fn ip_to_listenaddr( + endpoint_addr: &SocketAddr, + ip: IpAddr, + version: ProtocolVersion, +) -> Option { // True if either both addresses are Ipv4 or both Ipv6. if !SocketFamily::is_same(&endpoint_addr.ip(), &ip) { return None; } let socket_addr = SocketAddr::new(ip, endpoint_addr.port()); - Some(socketaddr_to_multiaddr(&socket_addr)) + Some(socketaddr_to_multiaddr(&socket_addr, version)) } /// Tries to turn a QUIC multiaddress into a UDP [`SocketAddr`]. Returns None if the format /// of the multiaddr is wrong. -fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option { +fn multiaddr_to_socketaddr( + addr: &Multiaddr, + support_draft_29: bool, +) -> Option<(SocketAddr, ProtocolVersion)> { let mut iter = addr.iter(); let proto1 = iter.next()?; let proto2 = iter.next()?; @@ -541,13 +579,18 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option { _ => return None, } } + let version = match proto3 { + Protocol::QuicV1 => ProtocolVersion::V1, + Protocol::Quic if support_draft_29 => ProtocolVersion::Draft29, + _ => return None, + }; - match (proto1, proto2, proto3) { - (Protocol::Ip4(ip), Protocol::Udp(port), Protocol::QuicV1) => { - Some(SocketAddr::new(ip.into(), port)) + match (proto1, proto2) { + (Protocol::Ip4(ip), Protocol::Udp(port)) => { + Some((SocketAddr::new(ip.into(), port), version)) } - (Protocol::Ip6(ip), Protocol::Udp(port), Protocol::QuicV1) => { - Some(SocketAddr::new(ip.into(), port)) + (Protocol::Ip6(ip), Protocol::Udp(port)) => { + Some((SocketAddr::new(ip.into(), port), version)) } _ => None, } @@ -580,11 +623,15 @@ fn is_quic_addr(addr: &Multiaddr) -> bool { } /// Turns an IP address and port into the corresponding QUIC multiaddr. -fn socketaddr_to_multiaddr(socket_addr: &SocketAddr) -> Multiaddr { +fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) -> Multiaddr { + let quic_proto = match version { + ProtocolVersion::V1 => Protocol::QuicV1, + ProtocolVersion::Draft29 => Protocol::Quic, + }; Multiaddr::empty() .with(socket_addr.ip().into()) .with(Protocol::Udp(socket_addr.port())) - .with(Protocol::QuicV1) + .with(quic_proto) } #[cfg(test)] @@ -598,62 +645,88 @@ mod test { #[test] fn multiaddr_to_udp_conversion() { - assert!( - multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) - .is_none() - ); + assert!(multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/1234".parse::().unwrap(), + true + ) + .is_none()); assert_eq!( multiaddr_to_socketaddr( &"/ip4/127.0.0.1/udp/12345/quic-v1" .parse::() - .unwrap() + .unwrap(), + false ), - Some(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 12345, + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345,), + ProtocolVersion::V1 )) ); assert_eq!( multiaddr_to_socketaddr( &"/ip4/255.255.255.255/udp/8080/quic-v1" .parse::() - .unwrap() + .unwrap(), + false ), - Some(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), - 8080, + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080,), + ProtocolVersion::V1 )) ); assert_eq!( multiaddr_to_socketaddr( &"/ip4/127.0.0.1/udp/55148/quic-v1/p2p/12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ" .parse::() - .unwrap() + .unwrap(), false ), - Some(SocketAddr::new( + Some((SocketAddr::new( IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55148, - )) + ), ProtocolVersion::V1)) ); assert_eq!( - multiaddr_to_socketaddr(&"/ip6/::1/udp/12345/quic-v1".parse::().unwrap()), - Some(SocketAddr::new( - IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), - 12345, + multiaddr_to_socketaddr( + &"/ip6/::1/udp/12345/quic-v1".parse::().unwrap(), + false + ), + Some(( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345,), + ProtocolVersion::V1 )) ); assert_eq!( multiaddr_to_socketaddr( &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic-v1" .parse::() - .unwrap() + .unwrap(), + false + ), + Some(( + SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, + )), + 8080, + ), + ProtocolVersion::V1 + )) + ); + + assert!(multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/1234/quic".parse::().unwrap(), + false + ) + .is_none()); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/1234/quic".parse::().unwrap(), + true ), - Some(SocketAddr::new( - IpAddr::V6(Ipv6Addr::new( - 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, - )), - 8080, + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234,), + ProtocolVersion::Draft29 )) ); } @@ -755,6 +828,7 @@ mod test { ToEndpoint::Dial { addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), result: tx, + version: ProtocolVersion::V1, }, cx, ) diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 1df5ddf9e44..3a58b2a5068 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -36,8 +36,8 @@ async fn async_std_smoke() { #[async_std::test] async fn dial_failure() { let _ = env_logger::try_init(); - let mut a = create_transport::().1; - let mut b = create_transport::().1; + let mut a = create_default_transport::().1; + let mut b = create_default_transport::().1; let addr = start_listening(&mut a, "/ip4/127.0.0.1/udp/0/quic-v1").await; drop(a); // stop a so b can never reach it @@ -54,8 +54,8 @@ async fn dial_failure() { #[tokio::test] async fn endpoint_reuse() { let _ = env_logger::try_init(); - let (_, mut a_transport) = create_transport::(); - let (_, mut b_transport) = create_transport::(); + let (_, mut a_transport) = create_default_transport::(); + let (_, mut b_transport) = create_default_transport::(); let a_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; let ((_, b_send_back_addr, _), _) = @@ -79,8 +79,8 @@ async fn endpoint_reuse() { #[async_std::test] async fn ipv4_dial_ipv6() { let _ = env_logger::try_init(); - let (a_peer_id, mut a_transport) = create_transport::(); - let (b_peer_id, mut b_transport) = create_transport::(); + let (a_peer_id, mut a_transport) = create_default_transport::(); + let (b_peer_id, mut b_transport) = create_default_transport::(); let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic-v1").await; let ((a_connected, _, _), (b_connected, _)) = @@ -96,8 +96,8 @@ async fn ipv4_dial_ipv6() { async fn wrong_peerid() { use libp2p::PeerId; - let (a_peer_id, mut a_transport) = create_transport::(); - let (b_peer_id, mut b_transport) = create_transport::(); + let (a_peer_id, mut a_transport) = create_default_transport::(); + let (b_peer_id, mut b_transport) = create_default_transport::(); let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic-v1").await; let a_addr_random_peer = a_addr.with(Protocol::P2p(PeerId::random().into())); @@ -183,6 +183,69 @@ fn concurrent_connections_and_streams_tokio() { .quickcheck(prop:: as fn(_, _) -> _); } +#[cfg(feature = "tokio")] +#[tokio::test] +async fn draft_29_support() { + use std::task::Poll; + + use futures::{future::poll_fn, select}; + use libp2p::TransportError; + + let _ = env_logger::try_init(); + + let (_, mut a_transport) = + create_transport::(|cfg| cfg.support_draft_29 = true); + let (_, mut b_transport) = + create_transport::(|cfg| cfg.support_draft_29 = true); + + // If a server supports draft-29 all its QUIC addresses can be dialed on draft-29 or version-1 + let a_quic_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await; + let a_quic_mapped_addr = swap_protocol!(a_quic_addr, Quic => QuicV1); + let a_quic_v1_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + let a_quic_v1_mapped_addr = swap_protocol!(a_quic_v1_addr, QuicV1 => Quic); + + connect(&mut a_transport, &mut b_transport, a_quic_addr.clone()).await; + connect(&mut a_transport, &mut b_transport, a_quic_mapped_addr).await; + connect(&mut a_transport, &mut b_transport, a_quic_v1_addr).await; + connect(&mut a_transport, &mut b_transport, a_quic_v1_mapped_addr).await; + + let (_, mut c_transport) = + create_transport::(|cfg| cfg.support_draft_29 = false); + assert!(matches!( + c_transport.dial(a_quic_addr), + Err(TransportError::MultiaddrNotSupported(_)) + )); + + // Test disabling draft-29 on a server. + let (_, mut d_transport) = + create_transport::(|cfg| cfg.support_draft_29 = false); + assert!(matches!( + d_transport.listen_on("/ip4/127.0.0.1/udp/0/quic".parse().unwrap()), + Err(TransportError::MultiaddrNotSupported(_)) + )); + let d_quic_v1_addr = start_listening(&mut d_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + let d_quic_addr_mapped = swap_protocol!(d_quic_v1_addr, QuicV1 => Quic); + let dial = b_transport.dial(d_quic_addr_mapped).unwrap(); + let drive_transports = poll_fn::<(), _>(|cx| { + let _ = b_transport.poll_next_unpin(cx); + let _ = d_transport.poll_next_unpin(cx); + Poll::Pending + }); + select! { + _ = drive_transports.fuse() => {} + result = dial.fuse() => { + #[allow(clippy::single_match)] + match result { + Ok(_) => panic!("Unexpected success dialing version-1-only server with draft-29."), + // FIXME: We currently get a Handshake timeout if the server does not support our version. + // Correct would be to get an quinn error "VersionMismatch". + Err(_) => {} + // Err(e) => assert!(format!("{:?}", e).contains("VersionMismatch"), "Got unexpected error {}", e), + } + } + } +} + #[cfg(feature = "async-std")] #[async_std::test] async fn backpressure() { @@ -243,8 +306,8 @@ async fn write_after_peer_dropped_stream() { async fn smoke() { let _ = env_logger::try_init(); - let (a_peer_id, mut a_transport) = create_transport::

(); - let (b_peer_id, mut b_transport) = create_transport::

(); + let (a_peer_id, mut a_transport) = create_default_transport::

(); + let (b_peer_id, mut b_transport) = create_default_transport::

(); let addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; let ((a_connected, _, _), (b_connected, _)) = @@ -255,8 +318,8 @@ async fn smoke() { } async fn build_streams() -> (SubstreamBox, SubstreamBox) { - let (_, mut a_transport) = create_transport::

(); - let (_, mut b_transport) = create_transport::

(); + let (_, mut a_transport) = create_default_transport::

(); + let (_, mut b_transport) = create_default_transport::

(); let addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; let ((_, _, mut conn_a), (_, mut conn_b)) = @@ -303,15 +366,35 @@ async fn build_streams() -> (SubstreamBox, SubstreamBox) { (stream_a, stream_b) } +#[macro_export] +macro_rules! swap_protocol { + ($addr:expr, $From:ident => $To:ident) => { + $addr + .into_iter() + .map(|p| match p { + Protocol::$From => Protocol::$To, + _ => p, + }) + .collect::() + }; +} + fn generate_tls_keypair() -> libp2p::identity::Keypair { libp2p::identity::Keypair::generate_ed25519() } -fn create_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { +fn create_default_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { + create_transport::

(|_| {}) +} + +fn create_transport( + with_config: impl Fn(&mut quic::Config), +) -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { let keypair = generate_tls_keypair(); let peer_id = keypair.public().to_peer_id(); - - let transport = quic::GenTransport::

::new(quic::Config::new(&keypair)) + let mut config = quic::Config::new(&keypair); + with_config(&mut config); + let transport = quic::GenTransport::

::new(config) .map(|(p, c), _| (p, StreamMuxerBox::new(c))) .boxed(); @@ -349,7 +432,7 @@ fn prop( let mut listeners_tx = listeners_tx.clone(); async move { - let (peer_id, mut listener) = create_transport::

(); + let (peer_id, mut listener) = create_default_transport::

(); let addr = start_listening(&mut listener, "/ip4/127.0.0.1/udp/0/quic-v1").await; listeners_tx.send((peer_id, addr)).await.unwrap(); @@ -372,7 +455,7 @@ fn prop( // For each listener node start `number_streams` requests. P::spawn(async move { - let (_, mut dialer) = create_transport::

(); + let (_, mut dialer) = create_default_transport::

(); while let Some((_, listener_addr)) = listeners_rx.next().await { let (_, connection) = dial(&mut dialer, listener_addr.clone()).await.unwrap();