@@ -10,7 +10,8 @@ use bdk_sp::{
1010 self ,
1111 address:: NetworkUnchecked ,
1212 bip32,
13- consensus:: Decodable ,
13+ consensus:: { deserialize, Decodable } ,
14+ hashes:: Hash ,
1415 hex:: { DisplayHex , FromHex } ,
1516 key:: Secp256k1 ,
1617 script:: PushBytesBuf ,
@@ -34,6 +35,7 @@ use bdk_sp_oracles::{
3435 TrustedPeer , UnboundedReceiver , Warning ,
3536 } ,
3637 filters:: kyoto:: { FilterEvent , FilterSubscriber } ,
38+ frigate:: { FrigateClient , History , SubscribeRequest , UnsubscribeRequest , DUMMY_COINBASE } ,
3739 tweaks:: blindbit:: { BlindbitSubscriber , TweakEvent } ,
3840} ;
3941use bdk_sp_wallet:: {
@@ -161,6 +163,16 @@ pub enum Commands {
161163 #[ clap( long) ]
162164 hash : Option < BlockHash > ,
163165 } ,
166+
167+ ScanFrigate {
168+ #[ clap( flatten) ]
169+ rpc_args : RpcArgs ,
170+ #[ clap( long) ]
171+ height : Option < u32 > ,
172+ #[ clap( long) ]
173+ hash : Option < BlockHash > ,
174+ } ,
175+
164176 Create {
165177 /// Network
166178 #[ clap( long, short, default_value = "signet" ) ]
@@ -567,6 +579,167 @@ async fn main() -> anyhow::Result<()> {
567579 ) ;
568580 }
569581 }
582+ Commands :: ScanFrigate {
583+ rpc_args,
584+ height,
585+ hash,
586+ } => {
587+ // The implementation done here differs from what is mentioned in the section
588+ // https://github.com/sparrowwallet/frigate/tree/master?tab=readme-ov-file#blockchainsilentpaymentssubscribe
589+ // This implementation is doing a one time scanning only. So instead of calling
590+ // `blockchain.scripthash.subscribe` on each script from the wallet, we just subscribe
591+ // and read the scanning result from the stream. On each result received we update the
592+ // wallet state and once scanning progress reaches 1.0 (100%) we stop.
593+ let sync_point = if let ( Some ( height) , Some ( hash) ) = ( height, hash) {
594+ HeaderCheckpoint :: new ( height, hash)
595+ } else if wallet. birthday . height <= wallet. chain ( ) . tip ( ) . height ( ) {
596+ let height = wallet. chain ( ) . tip ( ) . height ( ) ;
597+ let hash = wallet. chain ( ) . tip ( ) . hash ( ) ;
598+ HeaderCheckpoint :: new ( height, hash)
599+ } else {
600+ let checkpoint = wallet
601+ . chain ( )
602+ . get ( wallet. birthday . height )
603+ . expect ( "should be something" ) ;
604+ let height = checkpoint. height ( ) ;
605+ let hash = checkpoint. hash ( ) ;
606+ HeaderCheckpoint :: new ( height, hash)
607+ } ;
608+
609+ let mut client = FrigateClient :: connect ( & rpc_args. url )
610+ . await
611+ . unwrap ( )
612+ . with_timeout ( tokio:: time:: Duration :: from_secs ( 60 ) ) ;
613+
614+ let labels = wallet
615+ . indexer ( )
616+ . index ( )
617+ . num_to_label
618+ . clone ( )
619+ . into_keys ( )
620+ . collect :: < Vec < u32 > > ( ) ;
621+ let labels = if !labels. is_empty ( ) {
622+ Some ( labels)
623+ } else {
624+ None
625+ } ;
626+
627+ let subscribe_params = SubscribeRequest {
628+ scan_priv_key : * wallet. indexer ( ) . scan_sk ( ) ,
629+ spend_pub_key : * wallet. indexer ( ) . spend_pk ( ) ,
630+ start_height : Some ( sync_point. height ) ,
631+ labels,
632+ } ;
633+
634+ // Attempt to subscribe; any timeout will trigger unsubscribe automatically.
635+ match client. subscribe_with_timeout ( & subscribe_params) . await {
636+ Ok ( Some ( ( histories, progress) ) ) => {
637+ tracing:: info!(
638+ "Initial subscription result: {} histories, progress {}" ,
639+ histories. len( ) ,
640+ progress
641+ ) ;
642+ }
643+ Ok ( None ) => {
644+ tracing:: info!( "Subscription acknowledged, awaiting notifications" ) ;
645+ }
646+ Err ( e) => {
647+ tracing:: error!( "Subscribe failed: {}" , e) ;
648+ return Err ( e. into ( ) ) ;
649+ }
650+ }
651+
652+ tracing:: info!( "Starting frigate scanning loop..." ) ;
653+ loop {
654+ match client. read_from_stream ( 4096 ) . await {
655+ Ok ( subscribe_result) => {
656+ if subscribe_result[ "params" ] . is_object ( ) {
657+ let histories: Vec < History > = serde_json:: from_value (
658+ subscribe_result[ "params" ] [ "history" ] . clone ( ) ,
659+ ) ?;
660+ let progress = subscribe_result[ "params" ] [ "progress" ]
661+ . as_f64 ( )
662+ . unwrap_or ( 0.0 ) as f32 ;
663+
664+ let mut secrets_by_height: HashMap < u32 , HashMap < Txid , PublicKey > > =
665+ HashMap :: new ( ) ;
666+
667+ tracing:: debug!( "Received history {:#?}" , histories) ;
668+
669+ histories. iter ( ) . for_each ( |h| {
670+ secrets_by_height
671+ . entry ( h. height )
672+ . and_modify ( |v| {
673+ v. insert ( h. tx_hash , h. tweak_key ) ;
674+ } )
675+ . or_insert ( HashMap :: from ( [ ( h. tx_hash , h. tweak_key ) ] ) ) ;
676+ } ) ;
677+
678+ // Filter when the height is 0, because that would mean mempool transaction
679+ for secret in secrets_by_height. into_iter ( ) . filter ( |v| v. 0 > 0 ) {
680+ // Since frigate doesn't provide a blockchain.getblock we will mimick that here
681+ // By constructing a block from the block header and the list of transactions
682+ // received from the scan request
683+ let mut raw_blk = client. get_block_header ( secret. 0 ) . await . unwrap ( ) ;
684+ raw_blk. push_str ( "00" ) ;
685+
686+ // Push dummy coinbase
687+ let coinbase: Transaction =
688+ deserialize ( & Vec :: < u8 > :: from_hex ( DUMMY_COINBASE ) . unwrap ( ) )
689+ . unwrap ( ) ;
690+ let mut block: Block =
691+ deserialize ( & Vec :: < u8 > :: from_hex ( & raw_blk) . unwrap ( ) ) . unwrap ( ) ;
692+
693+ let mut blockhash = BlockHash :: all_zeros ( ) ;
694+
695+ let mut txs: Vec < Transaction > = vec ! [ coinbase] ;
696+ for key in secret. 1 . keys ( ) {
697+ let tx_result =
698+ client. get_transaction ( key. to_string ( ) ) . await . unwrap ( ) ;
699+ let tx: Transaction =
700+ deserialize ( & Vec :: < u8 > :: from_hex ( & tx_result. 1 ) . unwrap ( ) )
701+ . unwrap ( ) ;
702+ txs. push ( tx) ;
703+
704+ blockhash = BlockHash :: from_str ( & tx_result. 0 ) . unwrap ( ) ;
705+ }
706+
707+ block. txdata = txs;
708+ tracing:: debug!( "Final block {:?}" , block) ;
709+ wallet. apply_block_relevant ( & block, secret. 1 , secret. 0 ) ;
710+
711+ tracing:: debug!( "Checkpoint hash {blockhash:?}" ) ;
712+ let checkpoint = wallet. chain ( ) . tip ( ) . insert ( BlockId {
713+ height : secret. 0 ,
714+ hash : blockhash,
715+ } ) ;
716+ wallet. update_chain ( checkpoint) ;
717+ }
718+
719+ tracing:: info!( "Progress {progress}" ) ;
720+ // Check the progress
721+ if progress >= 1.0 {
722+ tracing:: info!( "Scanning completed" ) ;
723+ break ;
724+ }
725+ }
726+ }
727+ Err ( e) if e. to_string ( ) . contains ( "timed out" ) => {
728+ tracing:: warn!( "read_from_stream timeout, exiting scan" ) ;
729+ let unsubscribe_request = UnsubscribeRequest {
730+ scan_privkey : * wallet. indexer ( ) . scan_sk ( ) ,
731+ spend_pubkey : * wallet. indexer ( ) . spend_pk ( ) ,
732+ } ;
733+ let _ = client. unsubscribe ( & unsubscribe_request) . await ;
734+ break ;
735+ }
736+ Err ( e) => {
737+ tracing:: error!( "read_from_stream error: {}" , e) ;
738+ return Err ( e. into ( ) ) ;
739+ }
740+ }
741+ }
742+ }
570743 Commands :: Balance => {
571744 fn print_balances < ' a > (
572745 title_str : & ' a str ,
0 commit comments