Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,24 @@ impl BitcoindRpcClient {
Ok(())
}

/// Returns two `Vec`s:
/// - mempool transactions, alongside their first-seen unix timestamps.
/// - transactions that have been evicted from the mempool, alongside the last time they were seen absent.
pub(crate) async fn get_updated_mempool_transactions(
&self, best_processed_height: u32, unconfirmed_txids: Vec<Txid>,
) -> std::io::Result<(Vec<(Transaction, u64)>, Vec<(Txid, u64)>)> {
let mempool_txs =
self.get_mempool_transactions_and_timestamp_at_height(best_processed_height).await?;
let evicted_txids = self.get_evicted_mempool_txids_and_timestamp(unconfirmed_txids).await?;
Ok((mempool_txs, evicted_txids))
}

/// Get mempool transactions, alongside their first-seen unix timestamps.
///
/// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each
/// transaction only once, unless we cannot assume the transaction's ancestors are already
/// emitted.
pub(crate) async fn get_mempool_transactions_and_timestamp_at_height(
async fn get_mempool_transactions_and_timestamp_at_height(
&self, best_processed_height: u32,
) -> std::io::Result<Vec<(Transaction, u64)>> {
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
Expand Down Expand Up @@ -252,6 +264,23 @@ impl BitcoindRpcClient {
}
Ok(txs_to_emit)
}

// Retrieve a list of Txids that have been evicted from the mempool.
//
// To this end, we first update our local mempool_entries_cache and then return all unconfirmed
// wallet `Txid`s that don't appear in the mempool still.
async fn get_evicted_mempool_txids_and_timestamp(
&self, unconfirmed_txids: Vec<Txid>,
) -> std::io::Result<Vec<(Txid, u64)>> {
let latest_mempool_timestamp = self.latest_mempool_timestamp.load(Ordering::Relaxed);
let mempool_entries_cache = self.mempool_entries_cache.lock().await;
let evicted_txids = unconfirmed_txids
.into_iter()
.filter(|txid| mempool_entries_cache.contains_key(txid))
.map(|txid| (txid, latest_mempool_timestamp))
.collect();
Ok(evicted_txids)
}
}

impl BlockSource for BitcoindRpcClient {
Expand Down
14 changes: 10 additions & 4 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,18 +1095,24 @@ impl ChainSource {
let cur_height = channel_manager.current_best_block().height;

let now = SystemTime::now();
let unconfirmed_txids = onchain_wallet.get_unconfirmed_txids();
match bitcoind_rpc_client
.get_mempool_transactions_and_timestamp_at_height(cur_height)
.get_updated_mempool_transactions(cur_height, unconfirmed_txids)
.await
{
Ok(unconfirmed_txs) => {
Ok((unconfirmed_txs, evicted_txids)) => {
log_trace!(
logger,
"Finished polling mempool of size {} in {}ms",
"Finished polling mempool of size {} and {} evicted transactions in {}ms",
unconfirmed_txs.len(),
evicted_txids.len(),
now.elapsed().unwrap().as_millis()
);
let _ = onchain_wallet.apply_unconfirmed_txs(unconfirmed_txs);
onchain_wallet
.apply_mempool_txs(unconfirmed_txs, evicted_txids)
.unwrap_or_else(|e| {
log_error!(logger, "Failed to apply mempool transactions: {:?}", e);
});
},
Err(e) => {
log_error!(logger, "Failed to poll for mempool transactions: {:?}", e);
Expand Down
15 changes: 13 additions & 2 deletions src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ where
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
}

pub(crate) fn get_unconfirmed_txids(&self) -> Vec<Txid> {
self.inner
.lock()
.unwrap()
.transactions()
.filter(|t| t.chain_position.is_unconfirmed())
.map(|t| t.tx_node.txid)
.collect()
}

pub(crate) fn current_best_block(&self) -> BestBlock {
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }
Expand Down Expand Up @@ -136,11 +146,12 @@ where
}
}

pub(crate) fn apply_unconfirmed_txs(
&self, unconfirmed_txs: Vec<(Transaction, u64)>,
pub(crate) fn apply_mempool_txs(
&self, unconfirmed_txs: Vec<(Transaction, u64)>, evicted_txids: Vec<(Txid, u64)>,
) -> Result<(), Error> {
let mut locked_wallet = self.inner.lock().unwrap();
locked_wallet.apply_unconfirmed_txs(unconfirmed_txs);
locked_wallet.apply_evicted_txs(evicted_txids);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't clear from the bdk_wallet docs or release page that this is a new requirement. When upgrading, how do you typically see what changes are needed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we knew that we were waiting on exactly that change, but I also went through all the commit history of all the BDK crates since the last upgrade to double-check I'm not overlooking something fundamental. So yeah, I agree the changelog could do a better job listing all the changes that need to be considered (cc @notmandatory).


let mut locked_persister = self.persister.lock().unwrap();
locked_wallet.persist(&mut locked_persister).map_err(|e| {
Expand Down
Loading