-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathlib.rs
More file actions
101 lines (93 loc) · 3.95 KB
/
lib.rs
File metadata and controls
101 lines (93 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use anyhow::anyhow;
use aws_smithy_async::rt::sleep::{AsyncSleep, Sleep};
use aws_smithy_runtime_api::client::http::{
HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
use aws_smithy_runtime_api::client::result::ConnectorError;
use aws_smithy_runtime_api::client::retries::ErrorKind;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::http::Response;
use aws_smithy_types::body::SdkBody;
use http_body_util::{BodyStream, StreamBody};
use std::time::Duration;
use sync_wrapper::SyncStream;
use wstd::http::{Body as WstdBody, BodyExt, Client};
pub fn sleep_impl() -> impl AsyncSleep + 'static {
WstdSleep
}
#[derive(Debug)]
struct WstdSleep;
impl AsyncSleep for WstdSleep {
fn sleep(&self, duration: Duration) -> Sleep {
Sleep::new(async move {
wstd::task::sleep(wstd::time::Duration::from(duration)).await;
})
}
}
pub fn http_client() -> impl HttpClient + 'static {
WstdHttpClient
}
#[derive(Debug)]
struct WstdHttpClient;
impl HttpClient for WstdHttpClient {
fn http_connector(
&self,
settings: &HttpConnectorSettings,
// afaict, none of these components are relevant to this
// implementation.
_components: &RuntimeComponents,
) -> SharedHttpConnector {
let mut client = Client::new();
if let Some(timeout) = settings.connect_timeout() {
client.set_connect_timeout(timeout);
}
if let Some(timeout) = settings.read_timeout() {
client.set_first_byte_timeout(timeout);
}
SharedHttpConnector::new(WstdHttpConnector(client))
}
}
#[derive(Debug)]
struct WstdHttpConnector(Client);
impl HttpConnector for WstdHttpConnector {
fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
let client = self.0.clone();
HttpConnectorFuture::new(async move {
let request = request
.try_into_http1x()
// This can only fail if the Extensions fail to convert
.map_err(|e| ConnectorError::other(Box::new(e), None))?;
// smithy's SdkBody Error is a non-'static boxed dyn stderror.
// Anyhow can't represent that, so convert it to the debug impl.
let request =
request.map(|body| WstdBody::from_http_body(body.map_err(|e| anyhow!("{e:?}"))));
// Any error given by send is considered a "ClientError" kind
// which should prevent smithy from retrying like it would for a
// throttling error
let response = client
.send(request)
.await
.map_err(|e| ConnectorError::other(e.into(), Some(ErrorKind::ClientError)))?;
Response::try_from(response.map(|wstd_body| {
// You'd think that an SdkBody would just be an impl Body with
// the usual error type dance.
let nonsync_body = wstd_body
.into_boxed_body()
.map_err(|e| e.into_boxed_dyn_error());
// But we have to do this weird dance: because Axum insists
// bodies are not Sync, wstd settled on non-Sync bodies.
// Smithy insists on Sync bodies. The SyncStream type exists
// to assert, because all Stream operations are on &mut self,
// all Streams are Sync. So, turn the Body into a Stream, make
// it sync, then back to a Body.
let nonsync_stream = BodyStream::new(nonsync_body);
let sync_stream = SyncStream::new(nonsync_stream);
let sync_body = StreamBody::new(sync_stream);
SdkBody::from_body_1_x(sync_body)
}))
// This can only fail if the Extensions fail to convert
.map_err(|e| ConnectorError::other(Box::new(e), None))
})
}
}