Skip to content

Commit fbed31b

Browse files
spacebear21DanGould
andcommitted
Introduce HasReplyableError typestate
This enforces proper handling within the receiver state machine, such that they must attempt replying to the sender with an error response before the session can be considered closed. Co-authored-by: DanGould <d@ngould.dev>
1 parent 5882dbb commit fbed31b

3 files changed

Lines changed: 149 additions & 30 deletions

File tree

  • payjoin-cli/src/app/v2
  • payjoin-ffi/src/receive
  • payjoin/src/core/receive/v2

payjoin-cli/src/app/v2/mod.rs

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use payjoin::bitcoin::consensus::encode::serialize_hex;
66
use payjoin::bitcoin::{Amount, FeeRate};
77
use payjoin::persist::OptionalTransitionOutcome;
88
use payjoin::receive::v2::{
9-
process_err_res, replay_event_log as replay_receiver_event_log, Initialized, MaybeInputsOwned,
10-
MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession,
11-
Receiver, ReceiverBuilder, SessionHistory, UncheckedOriginalPayload, WantsFeeRange,
9+
replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
10+
MaybeInputsOwned, MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal,
11+
ReceiveSession, Receiver, ReceiverBuilder, UncheckedOriginalPayload, WantsFeeRange,
1212
WantsInputs, WantsOutputs,
1313
};
1414
use payjoin::send::v2::{
@@ -70,6 +70,7 @@ impl StatusText for ReceiveSession {
7070
| ReceiveSession::WantsFeeRange(_)
7171
| ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
7272
ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
73+
ReceiveSession::HasReplyableError(_) => "Session failure",
7374
ReceiveSession::TerminalFailure => "Session failure",
7475
}
7576
}
@@ -521,22 +522,13 @@ impl App {
521522
self.finalize_proposal(proposal, persister).await,
522523
ReceiveSession::PayjoinProposal(proposal) =>
523524
self.send_payjoin_proposal(proposal, persister).await,
525+
ReceiveSession::HasReplyableError(error) =>
526+
self.handle_error(error, persister).await,
524527
ReceiveSession::TerminalFailure =>
525528
return Err(anyhow!("Terminal receiver session")),
526529
}
527530
};
528-
529-
match res {
530-
Ok(_) => Ok(()),
531-
Err(e) => {
532-
let (_, session_history) = replay_receiver_event_log(persister)?;
533-
let pj_uri = session_history.pj_uri().extras.endpoint().clone();
534-
let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(pj_uri)).await?;
535-
self.handle_recoverable_error(&ohttp_relay, &session_history).await?;
536-
537-
Err(e)
538-
}
539-
}
531+
res
540532
}
541533

542534
#[allow(clippy::incompatible_msrv)]
@@ -702,20 +694,14 @@ impl App {
702694
Ok(ohttp_relay)
703695
}
704696

705-
/// Handle request error by sending an error response over the directory
706-
async fn handle_recoverable_error(
697+
/// Handle error by attempting to send an error response over the directory
698+
async fn handle_error(
707699
&self,
708-
ohttp_relay: &payjoin::Url,
709-
session_history: &SessionHistory,
700+
session: Receiver<HasReplyableError>,
701+
persister: &ReceiverPersister,
710702
) -> Result<()> {
711-
let e = match session_history.terminal_error() {
712-
Some(e) => e,
713-
_ => return Ok(()),
714-
};
715-
let (err_req, err_ctx) = session_history
716-
.extract_err_req(ohttp_relay.as_str())?
717-
.expect("If JsonReply is Some, then err_req and err_ctx should be Some");
718-
let to_return = anyhow!("Replied with error: {}", e.to_json());
703+
let (err_req, err_ctx) =
704+
session.create_error_request(self.unwrap_relay_or_else_fetch(None).await?.as_str())?;
719705

720706
let err_response = match self.post_request(err_req).await {
721707
Ok(response) => response,
@@ -727,11 +713,11 @@ impl App {
727713
Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
728714
};
729715

730-
if let Err(e) = process_err_res(&err_bytes, err_ctx) {
716+
if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
731717
return Err(anyhow!("Failed to process error response: {}", e));
732718
}
733719

734-
Err(to_return)
720+
Ok(())
735721
}
736722

737723
async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {

payjoin-ffi/src/receive/mod.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub enum ReceiveSession {
7878
WantsFeeRange { inner: Arc<WantsFeeRange> },
7979
ProvisionalProposal { inner: Arc<ProvisionalProposal> },
8080
PayjoinProposal { inner: Arc<PayjoinProposal> },
81+
HasReplyableError { inner: Arc<HasReplyableError> },
8182
TerminalFailure,
8283
}
8384

@@ -105,6 +106,8 @@ impl From<payjoin::receive::v2::ReceiveSession> for ReceiveSession {
105106
Self::ProvisionalProposal { inner: Arc::new(inner.into()) },
106107
ReceiveSession::PayjoinProposal(inner) =>
107108
Self::PayjoinProposal { inner: Arc::new(inner.into()) },
109+
ReceiveSession::HasReplyableError(inner) =>
110+
Self::HasReplyableError { inner: Arc::new(inner.into()) },
108111
ReceiveSession::TerminalFailure => Self::TerminalFailure,
109112
}
110113
}
@@ -1003,6 +1006,80 @@ impl PayjoinProposal {
10031006
}
10041007
}
10051008

1009+
#[derive(Clone, uniffi::Object)]
1010+
pub struct HasReplyableError(
1011+
pub payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
1012+
);
1013+
1014+
impl From<HasReplyableError>
1015+
for payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>
1016+
{
1017+
fn from(value: HasReplyableError) -> Self { value.0 }
1018+
}
1019+
1020+
impl From<payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>>
1021+
for HasReplyableError
1022+
{
1023+
fn from(
1024+
value: payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
1025+
) -> Self {
1026+
Self(value)
1027+
}
1028+
}
1029+
1030+
#[derive(uniffi::Object)]
1031+
pub struct HasReplyableErrorTransition(
1032+
Arc<
1033+
RwLock<
1034+
Option<
1035+
payjoin::persist::MaybeSuccessTransition<
1036+
payjoin::receive::v2::SessionEvent,
1037+
(),
1038+
payjoin::receive::Error,
1039+
>,
1040+
>,
1041+
>,
1042+
>,
1043+
);
1044+
1045+
#[uniffi::export]
1046+
impl HasReplyableErrorTransition {
1047+
pub fn save(
1048+
&self,
1049+
persister: Arc<dyn JsonReceiverSessionPersister>,
1050+
) -> Result<(), ReceiverPersistedError> {
1051+
let adapter = CallbackPersisterAdapter::new(persister);
1052+
let mut inner = self.0.write().expect("Lock should not be poisoned");
1053+
1054+
let value = inner.take().expect("Already saved or moved");
1055+
1056+
value.save(&adapter).map_err(ReceiverPersistedError::from)?;
1057+
Ok(())
1058+
}
1059+
}
1060+
1061+
#[uniffi::export]
1062+
impl HasReplyableError {
1063+
pub fn create_error_request(
1064+
&self,
1065+
ohttp_relay: String,
1066+
) -> Result<RequestResponse, SessionError> {
1067+
self.0.clone().create_error_request(ohttp_relay).map_err(Into::into).map(|(req, ctx)| {
1068+
RequestResponse { request: req.into(), client_response: Arc::new(ctx.into()) }
1069+
})
1070+
}
1071+
1072+
pub fn process_error_response(
1073+
&self,
1074+
body: &[u8],
1075+
ohttp_context: &ClientResponse,
1076+
) -> PayjoinProposalTransition {
1077+
PayjoinProposalTransition(Arc::new(RwLock::new(Some(
1078+
self.0.clone().process_error_response(body, ohttp_context.into()),
1079+
))))
1080+
}
1081+
}
1082+
10061083
/// Session persister that should save and load events as JSON strings.
10071084
#[uniffi::export(with_foreign)]
10081085
pub trait JsonReceiverSessionPersister: Send + Sync {

payjoin/src/core/receive/v2/mod.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ pub enum ReceiveSession {
138138
WantsFeeRange(Receiver<WantsFeeRange>),
139139
ProvisionalProposal(Receiver<ProvisionalProposal>),
140140
PayjoinProposal(Receiver<PayjoinProposal>),
141+
HasReplyableError(Receiver<HasReplyableError>),
141142
TerminalFailure,
142143
}
143144

@@ -188,7 +189,25 @@ impl ReceiveSession {
188189
SessionEvent::FinalizedProposal(payjoin_proposal),
189190
) => Ok(state.apply_finalized_proposal(payjoin_proposal)),
190191

191-
(_, SessionEvent::GotReplyableError(_)) => Ok(ReceiveSession::TerminalFailure),
192+
(session, SessionEvent::GotReplyableError(error)) =>
193+
Ok(ReceiveSession::HasReplyableError(Receiver {
194+
state: HasReplyableError { error_reply: error.clone() },
195+
session_context: match session {
196+
ReceiveSession::Initialized(r) => r.session_context,
197+
ReceiveSession::UncheckedOriginalPayload(r) => r.session_context,
198+
ReceiveSession::MaybeInputsOwned(r) => r.session_context,
199+
ReceiveSession::MaybeInputsSeen(r) => r.session_context,
200+
ReceiveSession::OutputsUnknown(r) => r.session_context,
201+
ReceiveSession::WantsOutputs(r) => r.session_context,
202+
ReceiveSession::WantsInputs(r) => r.session_context,
203+
ReceiveSession::WantsFeeRange(r) => r.session_context,
204+
ReceiveSession::ProvisionalProposal(r) => r.session_context,
205+
ReceiveSession::PayjoinProposal(r) => r.session_context,
206+
ReceiveSession::HasReplyableError(r) => r.session_context,
207+
ReceiveSession::TerminalFailure =>
208+
return Ok(ReceiveSession::TerminalFailure),
209+
},
210+
})),
192211

193212
(current_state, SessionEvent::Closed(_)) => Ok(current_state),
194213

@@ -214,6 +233,7 @@ mod sealed {
214233
impl State for super::WantsFeeRange {}
215234
impl State for super::ProvisionalProposal {}
216235
impl State for super::PayjoinProposal {}
236+
impl State for super::HasReplyableError {}
217237
}
218238

219239
/// Sealed trait for V2 receive session states.
@@ -1083,6 +1103,42 @@ impl Receiver<PayjoinProposal> {
10831103
}
10841104
}
10851105

1106+
#[derive(Debug, Clone, PartialEq)]
1107+
pub struct HasReplyableError {
1108+
error_reply: JsonReply,
1109+
}
1110+
1111+
impl Receiver<HasReplyableError> {
1112+
pub fn create_error_request(
1113+
&self,
1114+
ohttp_relay: impl IntoUrl,
1115+
) -> Result<(Request, ohttp::ClientResponse), SessionError> {
1116+
extract_err_req(&self.error_reply, ohttp_relay, &self.session_context)
1117+
}
1118+
1119+
pub fn process_error_response(
1120+
&self,
1121+
res: &[u8],
1122+
ohttp_context: ohttp::ClientResponse,
1123+
) -> MaybeSuccessTransition<SessionEvent, (), ProtocolError> {
1124+
match process_post_res(res, ohttp_context) {
1125+
Ok(_) =>
1126+
MaybeSuccessTransition::success(SessionEvent::Closed(SessionOutcome::Failure), ()),
1127+
Err(e) =>
1128+
if e.is_fatal() {
1129+
MaybeSuccessTransition::fatal(
1130+
SessionEvent::Closed(SessionOutcome::Failure),
1131+
ProtocolError::V2(InternalSessionError::DirectoryResponse(e).into()),
1132+
)
1133+
} else {
1134+
MaybeSuccessTransition::transient(ProtocolError::V2(
1135+
InternalSessionError::DirectoryResponse(e).into(),
1136+
))
1137+
},
1138+
}
1139+
}
1140+
}
1141+
10861142
/// Derive a mailbox endpoint on a directory given a [`ShortId`].
10871143
/// It consists of a directory URL and the session ShortID in the path.
10881144
fn mailbox_endpoint(directory: &Url, id: &ShortId) -> Url {

0 commit comments

Comments
 (0)