From 5fd35b149996f5dd739d0b5ff4a4ca947b46ddd1 Mon Sep 17 00:00:00 2001 From: Ruslan Kasheparov Date: Mon, 30 Mar 2026 18:39:08 +0200 Subject: [PATCH] use sequence zmq event instead of hashblock --- src/bin/electrs.rs | 18 +++++++++--------- src/new_index/zmq.rs | 25 +++++++++++-------------- src/signal.rs | 8 ++++---- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index f8178fbf7..3648d6ec3 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -4,13 +4,8 @@ extern crate log; extern crate electrs; -use crossbeam_channel::{self as channel}; -use error_chain::ChainedError; -use std::{env, process, thread}; -use std::sync::{Arc, RwLock}; -use std::time::Duration; use bitcoin::hex::DisplayHex; -use rand::{rng, RngCore}; +use crossbeam_channel::{self as channel}; use electrs::{ config::Config, daemon::Daemon, @@ -21,6 +16,11 @@ use electrs::{ rest, signal::Waiter, }; +use error_chain::ChainedError; +use rand::{rng, RngCore}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use std::{env, process, thread}; #[cfg(feature = "otlp-tracing")] use electrs::otlp_trace; @@ -49,13 +49,13 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom { } fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<()> { - let (block_hash_notify, block_hash_receive) = channel::bounded(1); - let signal = Waiter::start(block_hash_receive); + let (zmq_event_notify, zmq_event_receive) = channel::bounded(1); + let signal = Waiter::start(zmq_event_receive); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); if let Some(zmq_addr) = config.zmq_addr.as_ref() { - zmq::start(&format!("tcp://{zmq_addr}"), block_hash_notify); + zmq::start(&format!("tcp://{zmq_addr}"), zmq_event_notify); } let daemon = Arc::new(Daemon::new( diff --git a/src/new_index/zmq.rs b/src/new_index/zmq.rs index 73d5d7964..99256e3b9 100644 --- a/src/new_index/zmq.rs +++ b/src/new_index/zmq.rs @@ -1,9 +1,12 @@ -use bitcoin::{hashes::Hash, BlockHash}; use crossbeam_channel::Sender; use crate::util::spawn_thread; -pub fn start(url: &str, block_hash_notify: Sender) { +pub enum ZmqEvent { + Sequence, +} + +pub fn start(url: &str, zmq_event_notify: Sender) { log::debug!("Starting ZMQ thread"); let ctx = zmq::Context::new(); let subscriber: zmq::Socket = ctx.socket(zmq::SUB).expect("failed creating subscriber"); @@ -11,24 +14,18 @@ pub fn start(url: &str, block_hash_notify: Sender) { .connect(url) .expect("failed connecting subscriber"); - // subscriber.set_subscribe(b"rawtx").unwrap(); subscriber - .set_subscribe(b"hashblock") - .expect("failed subscribing to hashblock"); + .set_subscribe(b"sequence") + .expect("failed subscribing to sequence"); spawn_thread("zmq", move || loop { match subscriber.recv_multipart(0) { Ok(data) => match (data.get(0), data.get(1)) { (Some(topic), Some(data)) => { - if &topic[..] == &[114, 97, 119, 116, 120] { - //rawtx - } else if &topic[..] == &[104, 97, 115, 104, 98, 108, 111, 99, 107] { - //hashblock - let mut reversed = data.to_vec(); - reversed.reverse(); - if let Ok(block_hash) = BlockHash::from_slice(&reversed[..]) { - log::debug!("New block from ZMQ: {block_hash}"); - let _ = block_hash_notify.send(block_hash); + if &topic[..] == b"sequence" { + log::debug!("New ZMQ sequence event"); + if let Err(e) = zmq_event_notify.send(ZmqEvent::Sequence) { + log::error!("Failed to send ZMQ event: {e}"); } } } diff --git a/src/signal.rs b/src/signal.rs index ee319c3f6..4e6a64879 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -1,4 +1,3 @@ -use bitcoin::BlockHash; use crossbeam_channel::{self as channel, after, select}; use std::thread; use std::time::{Duration, Instant}; @@ -6,11 +5,12 @@ use std::time::{Duration, Instant}; use signal_hook::consts::{SIGINT, SIGTERM, SIGUSR1}; use crate::errors::*; +use crate::new_index::zmq::ZmqEvent; #[derive(Clone)] // so multiple threads could wait on signals pub struct Waiter { receiver: channel::Receiver, - zmq_receiver: channel::Receiver, + zmq_receiver: channel::Receiver, } fn notify(signals: &[i32]) -> channel::Receiver { @@ -27,13 +27,13 @@ fn notify(signals: &[i32]) -> channel::Receiver { } impl Waiter { - pub fn start(block_hash_receive: channel::Receiver) -> Waiter { + pub fn start(zmq_event_receive: channel::Receiver) -> Waiter { Waiter { receiver: notify(&[ SIGINT, SIGTERM, SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`) ]), - zmq_receiver: block_hash_receive, + zmq_receiver: zmq_event_receive, } }