@@ -27,6 +27,7 @@ namespace server {
2727#define CLASS protocol_electrum
2828
2929using namespace system ;
30+ using namespace system ::chain;
3031using namespace network ::rpc;
3132using namespace std ::placeholders;
3233constexpr auto relaxed = std::memory_order_relaxed;
@@ -104,10 +105,12 @@ void protocol_electrum::handle_blockchain_outpoint_get_status(const code& ec,
104105 return ;
105106 }
106107
107- chain::point prevout{ hash, index };
108- send_outpoint_status (prevout, spk_hint);
108+ send_outpoint_status ({ hash, index }, spk_hint);
109109}
110110
111+ // subscribe
112+ // ----------------------------------------------------------------------------
113+
111114void protocol_electrum::handle_blockchain_outpoint_subscribe (const code& ec,
112115 rpc_interface::blockchain_outpoint_subscribe, const std::string& tx_hash,
113116 double txout_idx, const std::string& spk_hint) NOEXCEPT
@@ -130,16 +133,54 @@ void protocol_electrum::handle_blockchain_outpoint_subscribe(const code& ec,
130133 return ;
131134 }
132135
133- // TODO: subscribe.
134- // /////////////////////////////////////////////////////////////////////////
135- subscribed_outpoint_.store (true , relaxed);
136- // /////////////////////////////////////////////////////////////////////////
136+ // Outpoint status is not long-running so the notification strand is only
137+ // used to guard the notifications set. No need for the monitor, as
138+ // do_outpoint_subscribe() is trivial and pointless to cancel.
139+ // //monitor(true);
140+ NOTIFY (do_outpoint_subscribe, point{ hash, index }, spk_hint);
141+ }
142+
143+ void protocol_electrum::do_outpoint_subscribe (const point& prevout,
144+ const std::string& hint) NOEXCEPT
145+ {
146+ // Cancellability is preserved because not on channel strand.
147+ BC_ASSERT (notification_strand_.running_in_this_thread ());
148+
149+ code ec{};
150+ if (outpoint_subscriptions_.size () < options_.maximum_subscriptions )
151+ {
152+ // Subscription response is idempotent.
153+ outpoint_subscriptions_.insert (prevout);
154+ subscribed_outpoint_.store (true , relaxed);
155+ }
156+ else
157+ {
158+ ec = error::subscription_limit;
159+ }
160+
161+ POST (complete_outpoint_subscribe, ec, prevout, hint);
162+ }
137163
138- chain::point prevout{ hash, index };
139- if (!send_outpoint_status (prevout, spk_hint))
164+ void protocol_electrum::complete_outpoint_subscribe (const code& ec,
165+ const point& prevout, const std::string& hint) NOEXCEPT
166+ {
167+ BC_ASSERT (stranded ());
168+ // //monitor(false);
169+ if (stopped ())
140170 return ;
171+
172+ if (ec)
173+ {
174+ send_code (ec);
175+ return ;
176+ }
177+
178+ send_outpoint_status (prevout, hint);
141179}
142180
181+ // unsubscribe
182+ // ----------------------------------------------------------------------------
183+
143184void protocol_electrum::handle_blockchain_outpoint_unsubscribe (const code& ec,
144185 rpc_interface::blockchain_outpoint_unsubscribe, const std::string& tx_hash,
145186 double txout_idx) NOEXCEPT
@@ -162,51 +203,69 @@ void protocol_electrum::handle_blockchain_outpoint_unsubscribe(const code& ec,
162203 return ;
163204 }
164205
165- chain::point prevout{ hash, index };
206+ NOTIFY (do_outpoint_unsubscribe, point{ hash, index });
207+ }
208+
209+ void protocol_electrum::do_outpoint_unsubscribe (const point& prevout) NOEXCEPT
210+ {
211+ BC_ASSERT (notification_strand_.running_in_this_thread ());
166212
167- // TODO: unsubscribe.
168- // /////////////////////////////////////////////////////////////////////////
169- const auto prior = subscribed_outpoint_.load (relaxed);
170- // /////////////////////////////////////////////////////////////////////////
213+ const auto found = to_bool (outpoint_subscriptions_.erase (prevout));
214+ if (is_zero (outpoint_subscriptions_.size ()))
215+ subscribed_outpoint_.store (false , relaxed);
171216
172- send_result (prior, 16 , BIND (complete, _1) );
217+ POST (complete_outpoint_unsubscribe, found );
173218}
174219
175- // notification.
176- // ============================================================================
220+ void protocol_electrum::complete_outpoint_unsubscribe (bool found) NOEXCEPT
221+ {
222+ send_result (found, 16 , BIND (complete, _1));
223+ }
224+
225+ // notify
226+ // ----------------------------------------------------------------------------
177227
178228// Notifier for blockchain_outpoint_subscribe events.
179- void protocol_electrum::do_outpoint (node::header_t link ) NOEXCEPT
229+ void protocol_electrum::do_outpoint (node::header_t ) NOEXCEPT
180230{
231+ // Cancellability is preserved because not on channel strand.
181232 BC_ASSERT (notification_strand_.running_in_this_thread ());
182233
183- // TODO: get prevout from event.
184- // /////////////////////////////////////////////////////////////////////////
185- chain::point prevout{};
186- // /////////////////////////////////////////////////////////////////////////
187-
188- object_t status{};
189- if (!get_outpoint_status (status, prevout))
234+ for (const auto & prevout: outpoint_subscriptions_)
190235 {
191- LOGF (" Electrum::do_outpoint, outpoint not found (" << link << " )." );
192- return ;
236+ if (stopping_)
237+ return ;
238+
239+ auto status = std::make_unique<object_t >();
240+ if (!get_outpoint_status (*status, prevout))
241+ {
242+ LOGV (" Electrum::do_outpoint, outpoint not found." );
243+ }
244+ else
245+ {
246+ // Asio-buffered message (small, not under caller control).
247+ POST (outpoint_notify, std::move (status), prevout);
248+ }
193249 }
250+ }
251+
252+ void protocol_electrum::outpoint_notify (const std::unique_ptr<object_t >& status,
253+ const point& prevout) NOEXCEPT
254+ {
255+ BC_ASSERT (stranded ());
194256
195- // TODO: post_notification(bounce to network strand).
196257 send_notification (" blockchain.outpoint.subscribe" , array_t
197258 {
198259 array_t { encode_hash (prevout.hash ()), prevout.index () },
199- std::move (status)
260+ std::move (* status)
200261 }, 128 , BIND (handle_send, _1));
201262}
202263
203- // ============================================================================
204-
205- // utility.
264+ // utility
206265// ----------------------------------------------------------------------------
207266
208267bool protocol_electrum::get_outpoint_status (object_t & status,
209- const chain:: point& prevout) const NOEXCEPT
268+ const point& prevout) const NOEXCEPT
210269{
211270 // May be on either network or notification strand (thread safe).
212271
@@ -225,17 +284,17 @@ bool protocol_electrum::get_outpoint_status(object_t& status,
225284 return true ;
226285}
227286
228- bool protocol_electrum::send_outpoint_status (const chain:: point& prevout,
229- const std::string& spk_hint ) NOEXCEPT
287+ bool protocol_electrum::send_outpoint_status (const point& prevout,
288+ const std::string& hint ) NOEXCEPT
230289{
231290 BC_ASSERT (stranded ());
232291
233292 // This is parsed for correctness but is not used.
234293 // Script is advisory, and should match output script.
235- if (!spk_hint .empty ())
294+ if (!hint .empty ())
236295 {
237296 data_chunk bytes{};
238- if (!decode_base16 (bytes, spk_hint ))
297+ if (!decode_base16 (bytes, hint ))
239298 {
240299 send_code (error::invalid_argument);
241300 return false ;
0 commit comments