Skip to content

Commit f26f646

Browse files
committed
fix outgoing requests
Signed-off-by: Jorge Prendes <jorge.prendes@gmail.com>
1 parent 915e0e3 commit f26f646

2 files changed

Lines changed: 42 additions & 17 deletions

File tree

src/resource.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ impl<T> Resource<T> {
4848
WriteGuard {
4949
guard: self.inner.clone().write_owned().await,
5050
resource: self.clone(),
51+
do_notify: true,
5152
}
5253
}
5354

@@ -56,7 +57,7 @@ impl<T> Resource<T> {
5657
let mut fut = pin!(fut);
5758
fut.as_mut().enable();
5859

59-
drop(guard);
60+
G::unlock(guard);
6061

6162
fut.await;
6263
G::lock(self.clone()).await
@@ -90,6 +91,12 @@ pub struct ReadGuard<T> {
9091
guard: tokio::sync::OwnedRwLockReadGuard<T>,
9192
}
9293

94+
impl<T> ReadGuard<T> {
95+
fn drop_no_notify(self) {
96+
drop(self);
97+
}
98+
}
99+
93100
impl<T> Deref for ReadGuard<T> {
94101
type Target = T;
95102
fn deref(&self) -> &Self::Target {
@@ -100,6 +107,14 @@ impl<T> Deref for ReadGuard<T> {
100107
pub struct WriteGuard<T> {
101108
guard: tokio::sync::OwnedRwLockWriteGuard<T>,
102109
resource: Resource<T>,
110+
do_notify: bool,
111+
}
112+
113+
impl<T> WriteGuard<T> {
114+
fn drop_no_notify(mut self) {
115+
self.do_notify = false;
116+
drop(self);
117+
}
103118
}
104119

105120
impl<T> Deref for WriteGuard<T> {
@@ -115,27 +130,36 @@ impl<T> DerefMut for WriteGuard<T> {
115130
}
116131
impl<T> Drop for WriteGuard<T> {
117132
fn drop(&mut self) {
118-
self.resource.notify();
133+
if self.do_notify {
134+
self.resource.notify();
135+
}
119136
}
120137
}
121138

122-
pub trait Guard: Sized {
139+
trait Guard: Sized {
123140
type Target;
124141
async fn lock(res: Resource<Self::Target>) -> Self;
142+
fn unlock(self);
125143
}
126144

127145
impl<T> Guard for ReadGuard<T> {
128146
type Target = T;
129147
async fn lock(res: Resource<Self::Target>) -> Self {
130148
res.read().await
131149
}
150+
fn unlock(self) {
151+
self.drop_no_notify();
152+
}
132153
}
133154

134155
impl<T> Guard for WriteGuard<T> {
135156
type Target = T;
136157
async fn lock(res: Resource<Self::Target>) -> Self {
137158
res.write().await
138159
}
160+
fn unlock(self) {
161+
self.drop_no_notify();
162+
}
139163
}
140164

141165
pub trait BlockOn: Future {

src/types/http_outgoing_handler.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,29 +43,29 @@ impl
4343
for (k, v) in headers {
4444
builder = builder.header(k, v);
4545
}
46-
let body = request.body.clone();
47-
let mut body = body.write_wait_until(|b| b.is_finished()).block_on();
4846

49-
// TODO: actually use the trailers
50-
let _trailers = body.trailers.clone();
47+
let future_response = Resource::new(FutureIncomingResponse::default());
48+
let future_response_clone = future_response.clone();
49+
async move {
50+
let body = request.body.clone();
51+
let mut body = body.write_wait_until(|b| b.is_finished()).await;
5152

52-
// TODO: use a streaming body instead of reading it all at once
53-
let body = body.read_all().block_on();
53+
// TODO: actually use the trailers
54+
let _trailers = body.trailers.clone();
5455

55-
let builder = builder.body(body);
56+
// TODO: use a streaming body instead of reading it all at once
57+
let body = body.read_all().await;
5658

57-
let future_response = Resource::new(FutureIncomingResponse::default());
58-
let future_response_clone = future_response.clone();
59+
let builder = builder.body(body);
5960

60-
async move {
61-
let response = builder.send().block_on();
61+
let response = builder.send().await;
6262

6363
let response = match response {
6464
Ok(resp) => resp,
6565
Err(err) => {
6666
future_response_clone
6767
.write()
68-
.block_on()
68+
.await
6969
.set(Err(ErrorCode::InternalError(Some(err.to_string()))));
7070
return;
7171
}
@@ -80,14 +80,15 @@ impl
8080
Err(err) => {
8181
future_response_clone
8282
.write()
83-
.block_on()
83+
.await
8484
.set(Err(ErrorCode::InternalError(Some(err.to_string()))));
8585
return;
8686
}
8787
};
8888

8989
let mut stream = Stream::new();
9090
let _ = stream.write(bytes);
91+
let _ = stream.close();
9192
let body = IncomingBody {
9293
stream: Resource::new(stream),
9394
trailers: Resource::default(),
@@ -103,7 +104,7 @@ impl
103104

104105
future_response_clone
105106
.write()
106-
.block_on()
107+
.await
107108
.set(Ok(Resource::new(response)));
108109
}
109110
.spawn();

0 commit comments

Comments
 (0)