This repository was archived by the owner on Feb 3, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathchain.rs
More file actions
378 lines (334 loc) · 14 KB
/
chain.rs
File metadata and controls
378 lines (334 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid};
use crate::error::MutinyError;
use crate::wallet::MutinyWallet;
use bdk::blockchain::Blockchain;
use bdk_macros::maybe_await;
use lightning::chain::chaininterface::{
BroadcasterInterface, ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW,
};
use lightning::chain::{Confirm, Filter, WatchedOutput};
use log::error;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use wasm_bindgen_futures::spawn_local;
/// Chain-facing state of the node: tracks which transactions and outputs were registered via
/// the `Filter` interface and remembers the chain tip seen by the last `sync` run.
pub struct MutinyChain {
/// Wallet handle; its `blockchain` client is used for all chain queries and broadcasts.
wallet: Arc<MutinyWallet>,
// Transactions that were registered via the `Filter` interface and have to be processed.
queued_transactions: Mutex<HashSet<Txid>>,
// Transactions that were previously processed, but must not be forgotten yet.
watched_transactions: Mutex<HashSet<Txid>>,
// Outputs that were registered via the `Filter` interface and have to be processed.
queued_outputs: Mutex<HashSet<WatchedOutput>>,
// Outputs that were previously processed, but must not be forgotten yet.
watched_outputs: Mutex<HashSet<WatchedOutput>>,
// The tip hash observed during our last sync. `sync` holds this async lock for its whole
// run, so it also ensures only one sync is in progress at a time.
last_sync_hash: futures::lock::Mutex<Option<BlockHash>>,
}
impl MutinyChain {
/// Creates a chain tracker for `wallet` with empty registration queues, empty watch sets and
/// no previously synced tip.
pub(crate) fn new(wallet: Arc<MutinyWallet>) -> Self {
    Self {
        wallet,
        queued_transactions: Mutex::new(HashSet::new()),
        watched_transactions: Mutex::new(HashSet::new()),
        queued_outputs: Mutex::new(HashSet::new()),
        watched_outputs: Mutex::new(HashSet::new()),
        last_sync_hash: futures::lock::Mutex::new(None),
    }
}
/// Syncs the LDK wallet via the `Confirm` interface. We run in a loop until we completed a
/// full iteration without encountering new registrations or a new chain tip.
///
/// `confirmables` are the objects that get notified about the best block as well as about
/// confirmed and unconfirmed transactions.
///
/// # Errors
///
/// Propagates errors from the chain client. A `MutinyError::ChainAccessFailed` returned by
/// `sync_best_block_updated` or `get_confirmed_transactions` is treated as an inconsistency
/// and restarts the iteration instead of aborting the sync.
pub(crate) async fn sync(
    &self,
    confirmables: Vec<&(dyn Confirm + Sync)>,
) -> Result<(), MutinyError> {
    // This lock makes sure we're syncing once at a time.
    let mut locked_last_sync_hash = self.last_sync_hash.lock().await;

    let client = &*self.wallet.blockchain;
    let mut tip_hash = client.get_tip_hash().await?;

    loop {
        let registrations_are_pending = self.process_queues();
        let tip_is_new = Some(tip_hash) != *locked_last_sync_hash;

        // We loop until any registered transactions have been processed at least once, or the
        // tip hasn't been updated during the last iteration.
        if !registrations_are_pending && !tip_is_new {
            // Nothing to do.
            break;
        }

        if tip_is_new {
            // First check for any unconfirmed transactions and act on it immediately.
            self.sync_unconfirmed_transactions(&confirmables).await?;

            match self.sync_best_block_updated(&confirmables, &tip_hash).await {
                Ok(()) => {}
                Err(MutinyError::ChainAccessFailed) => {
                    // The tip we hold may no longer be part of the best chain. Retrying with
                    // the very same hash could not make progress, so fetch the current tip
                    // before restarting.
                    tip_hash = client.get_tip_hash().await?;
                    continue;
                }
                Err(err) => {
                    // (Semi-)permanent failure, retry later.
                    return Err(err);
                }
            }
        }

        match self.get_confirmed_transactions().await {
            Ok((confirmed_txs, unconfirmed_registered_txs, unspent_registered_outputs)) => {
                // Double-check tip hash. If something changed, restart last-minute.
                //
                // The comparison must be against the tip this iteration worked with, not
                // against the last *synced* tip: the latter is only updated at the end of a
                // successful iteration, so comparing against it would restart forever
                // whenever the tip is new.
                let check_tip_hash = client.get_tip_hash().await?;
                if check_tip_hash != tip_hash {
                    tip_hash = check_tip_hash;
                    continue;
                }

                self.sync_confirmed_transactions(
                    &confirmables,
                    confirmed_txs,
                    unconfirmed_registered_txs,
                    unspent_registered_outputs,
                );
            }
            Err(MutinyError::ChainAccessFailed) => {
                // Immediately restart syncing when we encounter any inconsistencies.
                continue;
            }
            Err(err) => {
                // (Semi-)permanent failure, retry later.
                return Err(err);
            }
        }

        // The iteration completed against `tip_hash`; remember it as the synced tip.
        *locked_last_sync_hash = Some(tip_hash);
    }
    Ok(())
}
// Processes the transaction and output queues, returns `true` if new items had been
// registered.
fn process_queues(&self) -> bool {
let mut pending_registrations = false;
{
let mut locked_queued_transactions = self.queued_transactions.lock().unwrap();
if !locked_queued_transactions.is_empty() {
let mut locked_watched_transactions = self.watched_transactions.lock().unwrap();
pending_registrations = true;
locked_watched_transactions.extend(locked_queued_transactions.iter());
*locked_queued_transactions = HashSet::new();
}
}
{
let mut locked_queued_outputs = self.queued_outputs.lock().unwrap();
if !locked_queued_outputs.is_empty() {
let mut locked_watched_outputs = self.watched_outputs.lock().unwrap();
pending_registrations = true;
locked_watched_outputs.extend(locked_queued_outputs.iter().cloned());
*locked_queued_outputs = HashSet::new();
}
}
pending_registrations
}
/// Informs every confirmable about the chain tip `tip_hash`.
///
/// Fetches the header and the block status of `tip_hash`. If the block is in the best chain
/// and its height is reported, `best_block_updated` is called on each entry of
/// `confirmables`.
///
/// # Errors
///
/// Returns `MutinyError::ChainAccessFailed` if the block is not part of the best chain, and
/// propagates any error of the chain client.
async fn sync_best_block_updated(
    &self,
    confirmables: &[&(dyn Confirm + Sync)],
    tip_hash: &BlockHash,
) -> Result<(), MutinyError> {
    let client = &*self.wallet.blockchain;

    let tip_header = client.get_header_by_hash(tip_hash).await?;
    let tip_status = client.get_block_status(tip_hash).await?;

    if !tip_status.in_best_chain {
        return Err(MutinyError::ChainAccessFailed);
    }

    // A best-chain block without a reported height results in no notification at all.
    if let Some(tip_height) = tip_status.height {
        for c in confirmables {
            c.best_block_updated(&tip_header, tip_height);
        }
    }
    Ok(())
}
fn sync_confirmed_transactions(
&self,
confirmables: &Vec<&(dyn Confirm + Sync)>,
confirmed_txs: Vec<ConfirmedTx>,
unconfirmed_registered_txs: HashSet<Txid>,
unspent_registered_outputs: HashSet<WatchedOutput>,
) {
for ctx in confirmed_txs {
for c in confirmables {
c.transactions_confirmed(
&ctx.block_header,
&[(ctx.pos, &ctx.tx)],
ctx.block_height,
);
}
}
*self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs;
*self.watched_outputs.lock().unwrap() = unspent_registered_outputs;
}
/// Checks every watched transaction and every watched output against the chain.
///
/// Returns a tuple of
/// 1. the confirmed transactions found (watched transactions that confirmed plus confirmed
///    spends of watched outputs), sorted by block height and then by in-block position,
/// 2. the watched transactions that are not confirmed, and
/// 3. the watched outputs for which no confirmed spend was found.
///
/// The watched sets themselves are not modified here.
async fn get_confirmed_transactions(
    &self,
) -> Result<(Vec<ConfirmedTx>, HashSet<Txid>, HashSet<WatchedOutput>), MutinyError> {
    let client = &*self.wallet.blockchain;
    let mut confirmed_txs = Vec::new();

    // Work on a snapshot: the lock guard is released at the end of this statement, before
    // any of the awaits below.
    let txids_to_check = self.watched_transactions.lock().unwrap().clone();
    let mut still_unconfirmed = HashSet::new();
    for txid in txids_to_check {
        match self.get_confirmed_tx(&txid, None, None).await? {
            Some(confirmed_tx) => confirmed_txs.push(confirmed_tx),
            None => {
                still_unconfirmed.insert(txid);
            }
        }
    }

    // Same snapshot approach for the watched outputs.
    let outputs_to_check = self.watched_outputs.lock().unwrap().clone();
    let mut still_unspent = HashSet::new();
    for output in outputs_to_check {
        let output_status = client
            .get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
            .await?;

        // Only look up a spend when both the spending txid and its status are reported.
        let mut confirmed_spend = None;
        if let Some(status) = output_status {
            if let (Some(spending_txid), Some(spending_tx_status)) =
                (status.txid, status.status)
            {
                confirmed_spend = self
                    .get_confirmed_tx(
                        &spending_txid,
                        spending_tx_status.block_hash,
                        spending_tx_status.block_height,
                    )
                    .await?;
            }
        }

        match confirmed_spend {
            Some(confirmed_tx) => confirmed_txs.push(confirmed_tx),
            // No confirmed spend found: keep watching this output.
            None => {
                still_unspent.insert(output);
            }
        }
    }

    // Order by block height first, then by position inside the block.
    confirmed_txs.sort_unstable_by_key(|confirmed| (confirmed.block_height, confirmed.pos));

    Ok((confirmed_txs, still_unconfirmed, still_unspent))
}
/// Looks up the confirmation data of `txid`.
///
/// Returns `Ok(None)` if the chain client has no merkle proof for the transaction or cannot
/// return the transaction itself. Otherwise returns the transaction together with the header
/// of its block, the block height and the position taken from the merkle proof.
///
/// # Errors
///
/// Returns `MutinyError::ChainAccessFailed` if `expected_block_hash` is given and differs
/// from the hash of the block at the proof's height, or if `known_block_height` is given and
/// differs from the proof's height. Errors of the chain client are propagated.
async fn get_confirmed_tx(
    &self,
    txid: &Txid,
    expected_block_hash: Option<BlockHash>,
    known_block_height: Option<u32>,
) -> Result<Option<ConfirmedTx>, MutinyError> {
    let client = &*self.wallet.blockchain;

    let merkle_proof = match client.get_merkle_proof(txid).await? {
        Some(proof) => proof,
        None => return Ok(None),
    };

    let block_hash = client.get_block_hash(merkle_proof.block_height).await?;
    let hash_mismatch = expected_block_hash.map_or(false, |expected| expected != block_hash);
    if hash_mismatch {
        return Err(MutinyError::ChainAccessFailed);
    }

    let block_header = client.get_header_by_hash(&block_hash).await?;

    let tx = match client.get_tx(txid).await? {
        Some(tx) => tx,
        None => return Ok(None),
    };

    // If a previous call already told us the height, it has to agree with the proof;
    // mismatched heights mean something probably went wrong.
    let height_mismatch =
        known_block_height.map_or(false, |known| known != merkle_proof.block_height);
    if height_mismatch {
        return Err(MutinyError::ChainAccessFailed);
    }

    Ok(Some(ConfirmedTx {
        tx,
        block_header,
        pos: merkle_proof.pos,
        block_height: merkle_proof.block_height,
    }))
}
/// Asks every confirmable for its relevant txids and calls `transaction_unconfirmed` on all
/// confirmables for each txid that the chain client does not report as confirmed.
///
/// A txid is also treated as unconfirmed when the client has no status for it or when the
/// status lookup fails. The function itself currently always returns `Ok(())`.
async fn sync_unconfirmed_transactions(
    &self,
    confirmables: &[&(dyn Confirm + Sync)],
) -> Result<(), MutinyError> {
    let client = &*self.wallet.blockchain;

    // Collect eagerly so the list of txids is fixed before any `transaction_unconfirmed`
    // call below is made.
    let relevant_txids: Vec<Txid> = confirmables
        .iter()
        .flat_map(|c| c.get_relevant_txids())
        .collect();

    for txid in relevant_txids {
        match client.get_tx_status(&txid).await {
            // Skip if the tx in question is still confirmed.
            Ok(Some(status)) if status.confirmed => continue,
            // Reported as not confirmed, no longer known, or the lookup errored: in all
            // of these cases we consider the tx unconfirmed.
            Ok(Some(_)) | Ok(None) | Err(_) => {}
        }

        for c in confirmables {
            c.transaction_unconfirmed(&txid);
        }
    }
    Ok(())
}
}
/// A transaction found confirmed on chain, bundled with the data that is passed on to
/// `Confirm::transactions_confirmed`.
struct ConfirmedTx {
/// The confirmed transaction itself.
tx: Transaction,
/// Header of the block the transaction was found in.
block_header: BlockHeader,
/// Height of that block.
block_height: u32,
/// Position of the transaction as reported by the merkle proof; used as the in-block
/// ordering key when sorting confirmed transactions.
pos: usize,
}
impl Filter for MutinyChain {
    /// Queues `txid` so that it is picked up the next time the queues are processed. The
    /// script pubkey is not used.
    fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
        let mut pending = self.queued_transactions.lock().unwrap();
        pending.insert(*txid);
    }

    /// Queues `output` so that it is picked up the next time the queues are processed.
    fn register_output(&self, output: WatchedOutput) {
        let mut pending = self.queued_outputs.lock().unwrap();
        pending.insert(output);
    }
}
impl BroadcasterInterface for MutinyChain {
fn broadcast_transaction(&self, tx: &Transaction) {
let blockchain = self.wallet.blockchain.clone();
let tx_clone = tx.clone();
spawn_local(async move {
maybe_await!(blockchain.broadcast(&tx_clone))
.unwrap_or_else(|_| error!("failed to broadcast tx! {}", tx_clone.txid()))
});
}
}
impl FeeEstimator for MutinyChain {
/// Returns the fee rate, in satoshis per 1000 weight units, for `confirmation_target`.
///
/// Currently this always delegates to the static values of
/// `fallback_fee_from_conf_target`; no live estimate is queried.
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
// TODO: fetch live fee estimates from esplora instead of using the static fallback.
fallback_fee_from_conf_target(confirmation_target)
}
}
/// Static fallback fee rate, in satoshis per 1000 weight units, for each confirmation target.
///
/// `Background` maps to LDK's `FEERATE_FLOOR_SATS_PER_KW`; `Normal` and `HighPriority` use
/// the hard-coded rates below.
fn fallback_fee_from_conf_target(confirmation_target: ConfirmationTarget) -> u32 {
    const NORMAL_SATS_PER_KW: u32 = 2000;
    const HIGH_PRIORITY_SATS_PER_KW: u32 = 5000;

    match confirmation_target {
        ConfirmationTarget::Background => FEERATE_FLOOR_SATS_PER_KW,
        ConfirmationTarget::Normal => NORMAL_SATS_PER_KW,
        ConfirmationTarget::HighPriority => HIGH_PRIORITY_SATS_PER_KW,
    }
}