Skip to content

Commit c4132e8

Browse files
committed
feat: improve listening to tcp stream
1 parent 732ca71 commit c4132e8

2 files changed

Lines changed: 186 additions & 129 deletions

File tree

cli/v2/src/main.rs

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ use bdk_sp::{
1010
self,
1111
address::NetworkUnchecked,
1212
bip32,
13-
consensus::Decodable,
13+
consensus::{deserialize, Decodable},
1414
hex::{DisplayHex, FromHex},
1515
key::Secp256k1,
1616
script::PushBytesBuf,
17-
secp256k1::{PublicKey, Scalar, SecretKey},
17+
secp256k1::{PublicKey, Scalar},
1818
Address, Amount, Block, BlockHash, FeeRate, Network, OutPoint, PrivateKey, ScriptBuf,
1919
Sequence, Transaction, TxOut, Txid,
2020
},
@@ -34,7 +34,7 @@ use bdk_sp_oracles::{
3434
TrustedPeer, UnboundedReceiver, Warning,
3535
},
3636
filters::kyoto::{FilterEvent, FilterSubscriber},
37-
frigate::{FrigateClient, FrigateListener, History, SubscribeRequest},
37+
frigate::{FrigateClient, History, SubscribeRequest},
3838
tweaks::blindbit::{BlindbitSubscriber, TweakEvent},
3939
};
4040
use bdk_sp_wallet::{
@@ -168,16 +168,7 @@ pub enum Commands {
168168
rpc_args: RpcArgs,
169169
/// The scan private key for which outputs will be scanned for.
170170
#[clap(long)]
171-
scan_priv_key: SecretKey,
172-
/// The spend public key for which outputs will be scanned for.
173-
#[clap(long)]
174-
spend_pub_key: PublicKey,
175-
/// An optional start parameter from where the scanning will start. When not specified it starts from Taproot activation height.
176-
#[clap(long)]
177171
start: Option<u32>,
178-
/// Optional list of labels to scan for. It always scan for change index with label 0.
179-
#[clap(long)]
180-
labels: Option<Vec<u32>>,
181172
},
182173

183174
Create {
@@ -586,42 +577,51 @@ async fn main() -> anyhow::Result<()> {
586577
);
587578
}
588579
}
589-
Commands::ScanFrigate {
590-
rpc_args,
591-
scan_priv_key,
592-
spend_pub_key,
593-
start,
594-
labels,
595-
} => {
596-
let client = FrigateClient::new(
597-
&rpc_args.url,
598-
rpc_args.rpc_user.as_deref(),
599-
rpc_args.rpc_password.as_deref(),
600-
)?;
580+
Commands::ScanFrigate { rpc_args, start } => {
581+
let mut client = FrigateClient::connect(&rpc_args.url).await.unwrap();
582+
let labels = wallet
583+
.indexer()
584+
.index()
585+
.num_to_label
586+
.clone()
587+
.into_keys()
588+
.collect::<Vec<u32>>();
589+
let labels = if !labels.is_empty() {
590+
Some(labels)
591+
} else {
592+
None
593+
};
601594

602-
let rpc_client = rpc_args.new_client()?;
603595
// send a subscribe request
604596
let subscribe_params = SubscribeRequest {
605-
scan_priv_key,
606-
spend_pub_key: spend_pub_key.into(),
597+
scan_priv_key: *wallet.indexer().scan_sk(),
598+
spend_pub_key: *wallet.indexer().spend_pk(),
607599
start_height: start,
608600
labels,
609601
};
610602

611-
let (history_tx, mut history_rx) =
612-
tokio::sync::mpsc::unbounded_channel::<(Vec<History>, f32)>();
613-
let frigate_listener = std::sync::Arc::new(FrigateListener::new(client, history_tx));
614-
let frigate_listener_task = frigate_listener.clone();
615-
tokio::task::spawn(async move { frigate_listener_task.run(subscribe_params).await });
603+
client.version().await.unwrap();
604+
client.subscribe(&subscribe_params).await.unwrap();
616605

617-
tracing::info!("Starting frigate scanning loop... {rpc_args:#?}");
606+
tracing::info!("Starting frigate scanning loop...");
618607
loop {
619-
if let Some(history) = history_rx.recv().await {
608+
609+
let subscribe_result = client.read_from_stream(4096).await.unwrap();
610+
611+
if subscribe_result["params"].is_object() {
612+
let histories: Vec<History> =
613+
serde_json::from_value(subscribe_result["params"]["history"].clone())?;
614+
let progress = subscribe_result["params"]["progress"]
615+
.as_f64()
616+
.unwrap_or(0.0) as f32;
617+
620618
// Group by block height, then iterate over by fetching block details and apply
621619
let mut secrets_by_height: HashMap<u32, HashMap<Txid, PublicKey>> =
622620
HashMap::new();
623621

624-
history.0.iter().for_each(|h| {
622+
tracing::debug!("Received history {:#?}", histories);
623+
624+
histories.iter().for_each(|h| {
625625
secrets_by_height
626626
.entry(h.height)
627627
.and_modify(|v| {
@@ -632,13 +632,36 @@ async fn main() -> anyhow::Result<()> {
632632

633633
// Filter when the height is 0, because that would mean mempool transaction
634634
for secret in secrets_by_height.into_iter().filter(|v| v.0 > 0) {
635-
let block_hash = rpc_client.get_block_hash(secret.0 as u64)?;
636-
let block = rpc_client.get_block(&block_hash)?;
635+
// Since frigate doesn't provide a blockchain.getblock we will mimick that here
636+
// By constructing a block from the block header and the list of transactions
637+
// received from the scan request
638+
let mut raw_blk = client.get_block_header(secret.0).await.unwrap();
639+
raw_blk.push_str("00");
640+
641+
// Push dummy coinbase
642+
let dummy_coinbase = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000";
643+
let coinbase: Transaction =
644+
deserialize(&Vec::<u8>::from_hex(dummy_coinbase).unwrap()).unwrap();
645+
let mut block: Block =
646+
deserialize(&Vec::<u8>::from_hex(&raw_blk).unwrap()).unwrap();
647+
648+
let mut txs: Vec<Transaction> = vec![coinbase];
649+
for key in secret.1.keys() {
650+
let raw_tx = client.get_transaction(key.to_string()).await.unwrap();
651+
let tx: Transaction =
652+
deserialize(&Vec::<u8>::from_hex(&raw_tx).unwrap()).unwrap();
653+
txs.push(tx);
654+
}
655+
656+
block.txdata = txs;
657+
tracing::debug!("Final block {:?}", block);
637658
wallet.apply_block_relevant(&block, secret.1, secret.0);
638659
}
639660

661+
tracing::info!("Progress {progress}");
640662
// Check the progress
641-
if history.1 >= 1.0 {
663+
if progress >= 1.0 {
664+
tracing::warn!("Scanning completed");
642665
break;
643666
}
644667
}

0 commit comments

Comments
 (0)