Skip to content

Commit 085bd84

Browse files
committed
Add endpoint to cancel a submission
1 parent 6b34ce9 commit 085bd84

5 files changed

Lines changed: 66 additions & 2 deletions

File tree

libs/opsqueue_python/python/opsqueue/producer.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,15 @@ def count_submissions(self) -> int:
314314
"""
315315
return self.inner.count_submissions() # type: ignore[no-any-return]
316316

317+
def cancel_submission(self, submission_id: SubmissionId) -> None:
318+
"""
319+
Cancel a specific submission if it's in progress.
320+
321+
Raises:
322+
- `InternalProducerClientError` if there is a low-level internal error.
323+
"""
324+
return self.inner.cancel_submission(submission_id)
325+
317326
def get_submission_status(
318327
self, submission_id: SubmissionId
319328
) -> SubmissionStatus | None:

libs/opsqueue_python/src/producer.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,25 @@ impl ProducerClient {
118118
})
119119
}
120120

121+
/// TODO docstring
122+
pub fn cancel_submission(
123+
&self,
124+
py: Python<'_>,
125+
id: SubmissionId,
126+
) -> CPyResult<(), E<FatalPythonException, InternalProducerClientError>>
127+
{
128+
py.allow_threads(|| {
129+
self.block_unless_interrupted(async {
130+
self.producer_client
131+
.cancel_submission(id.into())
132+
.await
133+
.map_err(|e| CError(R(e)))
134+
})
135+
// .map(|opt| opt.map(Into::into))
136+
// .map_err(|e| ProducerClientError::new_err(e.to_string()))
137+
})
138+
}
139+
121140
/// Retrieve the status (in progress, completed or failed) of a specific submission.
122141
///
123142
/// The returned SubmissionStatus object also includes the number of chunks finished so far,

opsqueue/src/common/submission.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,9 +668,9 @@ pub mod db {
668668
"
669669
INSERT INTO submissions_cancelled
670670
(id, chunks_total, prefix, metadata, cancelled_at, chunks_done)
671-
SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $3;
671+
SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $2;
672672
673-
DELETE FROM submissions WHERE id = $4 RETURNING *;
673+
DELETE FROM submissions WHERE id = $3 RETURNING *;
674674
",
675675
now,
676676
id,

opsqueue/src/producer/client.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,29 @@ impl Client {
101101
})
102102
.retry(retry_policy())
103103
.when(InternalProducerClientError::is_ephemeral)
104+
.notify(|err, dur| {
105+
tracing::debug!("retrying error {err:?} with sleeping {dur:?}");}) .await
106+
}
107+
108+
/// TODO docstring
109+
pub async fn cancel_submission(
110+
&self,
111+
submission_id: SubmissionId,
112+
) -> Result<(), InternalProducerClientError> {
113+
(|| async {
114+
let base_url = &self.base_url;
115+
self
116+
.http_client
117+
.post(format!("{base_url}/submissions/cancel/{submission_id}"))
118+
.send()
119+
.await?
120+
.error_for_status()?;
121+
// let bytes = resp.bytes().await?;
122+
// let body = serde_json::from_slice(&bytes)?;
123+
Ok(())
124+
})
125+
.retry(retry_policy())
126+
.when(InternalProducerClientError::is_ephemeral)
104127
.notify(|err, dur| {
105128
tracing::debug!("retrying error {err:?} with sleeping {dur:?}");
106129
})

opsqueue/src/producer/server.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ impl ServerState {
4646
pub fn build_router(self) -> Router<()> {
4747
Router::new()
4848
.route("/submissions", post(insert_submission))
49+
.route(
50+
"/submissions/cancel/{submission_id}",
51+
post(cancel_submission),
52+
)
4953
.route(
5054
"/submissions/count_completed",
5155
get(submissions_count_completed),
@@ -85,6 +89,15 @@ where
8589
}
8690
}
8791

92+
async fn cancel_submission(
93+
State(state): State<ServerState>,
94+
Path(submission_id): Path<SubmissionId>,
95+
) -> Result<(), ServerError> {
96+
let mut conn = state.pool.writer_conn().await?;
97+
submission::db::cancel_submission(submission_id, &mut conn).await?;
98+
Ok(())
99+
}
100+
88101
async fn submission_status(
89102
State(state): State<ServerState>,
90103
Path(submission_id): Path<SubmissionId>,

0 commit comments

Comments
 (0)