Skip to content

Commit ad6dc40

Browse files
committed
feat: real-time mempool detection via CipherScan WebSocket
Connect to CipherScan WS with service key, subscribe to raw_mempool channel for instant mempool tx push with raw_hex. Trial decryption happens immediately on push — zero HTTP overhead. Polling retained as 30s fallback for resilience. No behavior change when service key is not configured.
1 parent c844386 commit ad6dc40

7 files changed

Lines changed: 326 additions & 20 deletions

File tree

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
*.db
66
*.db-shm
77
*.db-wal
8-
8+
.bizdev-outreach.md

Cargo.lock

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ serde_json = "1"
2525
# HTTP client (for CipherScan API + webhooks)
2626
reqwest = { version = "0.12", features = ["json"] }
2727

28+
# WebSocket client (for CipherScan real-time mempool push)
29+
tokio-tungstenite = { version = "0.26", features = ["native-tls"] }
30+
2831
# Zcash crates (aligned with zcash-explorer WASM versions)
2932
zcash_primitives = { version = "0.25", default-features = false }
3033
zcash_note_encryption = "0.4"

src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub struct Config {
2929
pub billing_cycle_days_new: i64,
3030
pub billing_cycle_days_standard: i64,
3131
pub admin_key: Option<String>,
32+
pub cipherscan_service_key: Option<String>,
3233
}
3334

3435
impl Config {
@@ -85,6 +86,7 @@ impl Config {
8586
.unwrap_or_else(|_| "30".into())
8687
.parse()?,
8788
admin_key: env::var("ADMIN_KEY").ok().filter(|s| !s.is_empty()),
89+
cipherscan_service_key: env::var("CIPHERSCAN_SERVICE_KEY").ok().filter(|s| !s.is_empty()),
8890
})
8991
}
9092

src/main.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,9 @@ async fn main() -> anyhow::Result<()> {
4444
db::migrate_blind_index_to_hmac(&pool, &config.encryption_key).await?;
4545
let mut default_headers = reqwest::header::HeaderMap::new();
4646
default_headers.insert("User-Agent", reqwest::header::HeaderValue::from_static("CipherPay/1.0"));
47-
if let Ok(key) = std::env::var("CIPHERSCAN_SERVICE_KEY") {
48-
if !key.is_empty() {
49-
if let Ok(val) = reqwest::header::HeaderValue::from_str(&key) {
50-
default_headers.insert("X-Service-Key", val);
51-
}
47+
if let Some(ref key) = config.cipherscan_service_key {
48+
if let Ok(val) = reqwest::header::HeaderValue::from_str(key) {
49+
default_headers.insert("X-Service-Key", val);
5250
}
5351
}
5452
let http_client = reqwest::Client::builder()

src/scanner/mod.rs

Lines changed: 146 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod mempool;
22
pub mod blocks;
33
pub mod decrypt;
4+
pub mod ws;
45

56
use std::collections::HashMap;
67
use std::sync::Arc;
@@ -43,33 +44,83 @@ pub async fn run(config: Config, pool: SqlitePool, http: reqwest::Client) {
4344
"Scanner started"
4445
);
4546

47+
// Spawn WS client if service key is configured
48+
let mut ws_rx: Option<tokio::sync::mpsc::Receiver<ws::MempoolPush>> = None;
49+
if let Some(ref key) = config.cipherscan_service_key {
50+
let ws_url = ws::api_url_to_ws(&config.cipherscan_api_url);
51+
let (tx, rx) = tokio::sync::mpsc::channel(256);
52+
ws_rx = Some(rx);
53+
let ws_key = key.clone();
54+
tokio::spawn(async move {
55+
ws::run(ws_url, ws_key, tx).await;
56+
});
57+
}
58+
4659
let mempool_config = config.clone();
4760
let mempool_pool = pool.clone();
4861
let mempool_http = http.clone();
4962
let mempool_seen = seen_txids.clone();
5063
let mempool_cb = circuit_breaker.clone();
64+
let has_ws = ws_rx.is_some();
5165

5266
let mempool_handle = tokio::spawn(async move {
5367
let mut key_cache: Option<KeyCache> = None;
68+
let mut ws_receiver = ws_rx;
69+
70+
// With WS: poll every 30s as a slow fallback. Without: use configured interval.
71+
let poll_secs = if has_ws { 30 } else { mempool_config.mempool_poll_interval_secs };
5472
let mut interval = tokio::time::interval(
55-
std::time::Duration::from_secs(mempool_config.mempool_poll_interval_secs),
73+
std::time::Duration::from_secs(poll_secs),
5674
);
75+
76+
if has_ws {
77+
tracing::info!(poll_fallback_secs = poll_secs, "Mempool: WebSocket mode + polling fallback");
78+
}
79+
5780
loop {
58-
interval.tick().await;
59-
if mempool_cb.is_open() {
60-
tracing::debug!("CipherScan circuit breaker open, skipping mempool scan");
61-
continue;
62-
}
63-
match scan_mempool(&mempool_config, &mempool_pool, &mempool_http, &mempool_seen, &mut key_cache).await {
64-
Ok(_) => mempool_cb.record_success(),
65-
Err(e) => {
66-
mempool_cb.record_failure();
67-
tracing::error!(error = %e, "Mempool scan error");
81+
tokio::select! {
82+
result = async {
83+
match ws_receiver.as_mut() {
84+
Some(rx) => rx.recv().await,
85+
None => std::future::pending().await,
86+
}
87+
} => {
88+
match result {
89+
Some(push) => {
90+
{
91+
let mut seen_set = mempool_seen.write().await;
92+
seen_set.insert(push.txid.clone(), Instant::now());
93+
}
94+
if let Err(e) = process_ws_mempool_tx(
95+
&mempool_config, &mempool_pool, &mempool_http,
96+
&push, &mut key_cache,
97+
).await {
98+
tracing::error!(error = %e, txid = %push.txid, "WS mempool tx error");
99+
}
100+
}
101+
None => {
102+
tracing::warn!("[WS] Channel closed, falling back to polling only");
103+
ws_receiver = None;
104+
}
105+
}
68106
}
69-
}
107+
_ = interval.tick() => {
108+
if mempool_cb.is_open() {
109+
tracing::debug!("CipherScan circuit breaker open, skipping mempool scan");
110+
continue;
111+
}
112+
match scan_mempool(&mempool_config, &mempool_pool, &mempool_http, &mempool_seen, &mut key_cache).await {
113+
Ok(_) => mempool_cb.record_success(),
114+
Err(e) => {
115+
mempool_cb.record_failure();
116+
tracing::error!(error = %e, "Mempool scan error");
117+
}
118+
}
70119

71-
if mempool_config.fee_enabled() {
72-
let _ = billing::check_settlement_payments(&mempool_pool).await;
120+
if mempool_config.fee_enabled() {
121+
let _ = billing::check_settlement_payments(&mempool_pool).await;
122+
}
123+
}
73124
}
74125
}
75126
});
@@ -293,6 +344,87 @@ async fn scan_mempool(
293344
Ok(())
294345
}
295346

347+
/// Process a single mempool transaction pushed via WebSocket (with raw_hex included).
348+
/// Skips the HTTP fetch entirely — goes straight to trial decryption.
349+
async fn process_ws_mempool_tx(
350+
config: &Config,
351+
pool: &SqlitePool,
352+
http: &reqwest::Client,
353+
push: &ws::MempoolPush,
354+
key_cache: &mut Option<KeyCache>,
355+
) -> anyhow::Result<()> {
356+
let pending = invoices::get_pending_invoices(pool).await?;
357+
if pending.is_empty() {
358+
return Ok(());
359+
}
360+
361+
let merchants = crate::merchants::get_all_merchants(pool, &config.encryption_key).await?;
362+
if merchants.is_empty() {
363+
return Ok(());
364+
}
365+
366+
let cached_keys = refresh_key_cache(key_cache, &merchants);
367+
let invoice_index = matching::InvoiceIndex::build(&pending);
368+
369+
let mut invoice_totals: HashMap<String, (invoices::Invoice, i64)> = HashMap::new();
370+
371+
for (_merchant_id, keys) in cached_keys {
372+
match decrypt::try_decrypt_with_keys(&push.raw_hex, keys) {
373+
Ok(outputs) => {
374+
for output in &outputs {
375+
let recipient_hex = hex::encode(output.recipient_raw);
376+
tracing::info!(txid = %push.txid, "[WS] Decrypted mempool output");
377+
tracing::debug!(
378+
txid = %push.txid, memo = %output.memo,
379+
amount = output.amount_zec, "Decrypted output details"
380+
);
381+
382+
if let Some(invoice) = invoice_index.find(&recipient_hex, &output.memo) {
383+
let entry = invoice_totals.entry(invoice.id.clone())
384+
.or_insert((invoice.clone(), 0));
385+
entry.1 += output.amount_zatoshis as i64;
386+
}
387+
}
388+
}
389+
Err(_) => {}
390+
}
391+
}
392+
393+
for (invoice_id, (invoice, tx_total)) in &invoice_totals {
394+
let dust_min = std::cmp::max(
395+
(invoice.price_zatoshis as f64 * decrypt::DUST_THRESHOLD_FRACTION) as i64,
396+
decrypt::DUST_THRESHOLD_MIN_ZATOSHIS,
397+
);
398+
if *tx_total < dust_min && *tx_total < invoice.price_zatoshis {
399+
tracing::debug!(invoice_id, tx_total, dust_min, "Ignoring dust payment");
400+
continue;
401+
}
402+
403+
let new_received = if invoice.status == "underpaid" {
404+
invoices::accumulate_payment(pool, invoice_id, *tx_total).await?
405+
} else {
406+
*tx_total
407+
};
408+
409+
let min = (invoice.price_zatoshis as f64 * decrypt::SLIPPAGE_TOLERANCE) as i64;
410+
411+
if new_received >= min {
412+
let changed = invoices::mark_detected(pool, invoice_id, &push.txid, new_received).await?;
413+
if changed {
414+
let overpaid = new_received > invoice.price_zatoshis + 1000;
415+
spawn_payment_webhook(pool, http, invoice_id, "detected", &push.txid,
416+
invoice.price_zatoshis, new_received, overpaid, &config.encryption_key);
417+
}
418+
} else if invoice.status == "pending" {
419+
invoices::mark_underpaid(pool, invoice_id, new_received, &push.txid).await?;
420+
spawn_payment_webhook(pool, http, invoice_id, "underpaid", &push.txid,
421+
invoice.price_zatoshis, new_received, false, &config.encryption_key);
422+
}
423+
}
424+
425+
Ok(())
426+
}
427+
296428
/// Max blocks to process per iteration. Keeps each call short so the
297429
/// confirmation check at the top runs every ~block_interval seconds.
298430
const MAX_BLOCKS_PER_SCAN: u64 = 100;

0 commit comments

Comments
 (0)