feat(api): add pending transaction support in the eth subscription API#6941
feat(api): add pending transaction support in the eth subscription API#6941akaladarshi wants to merge 4 commits into
eth subscription API#6941Conversation
WalkthroughRefactors eth_subscribe to stream-based handlers (newHeads/logs/pendingTransactions), adds a broadcast->Stream adapter and tests, promotes a tx-hash helper, removes obsolete chain helpers, moves dead-code allows to narrower locations, and adds RPC subscription tests and a changelog entry. ChangesETH Pubsub Stream-based Refactoring
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Warning Review ran into problems🔥 ProblemsStopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a Comment |
c221f29 to
f4b122e
Compare
f4b122e to
6f51115
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/rpc/methods/eth/pubsub.rs`:
- Around line 126-128: The stream currently iterates only changes.applies so
reverted tipsets are ignored; update the flat_map over
subscription_stream(head_rx) to emit both applies and reverts (e.g., pair each
tipset with a boolean or enum indicating is_revert) instead of only
changes.applies, then adjust the subsequent filter_map closure that binds ts to
accept that (tipset, is_revert) and produce log removal events for reverts and
normal events for applies; target the subscription_stream/head_rx pipeline and
the closure capturing ts in pubsub.rs so reorg-driven log removals are emitted.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 023ea685-f51d-4356-8476-c6eea9c54e3d
📒 Files selected for processing (7)
src/message_pool/msgpool/events.rssrc/message_pool/msgpool/msg_pool.rssrc/rpc/methods/chain.rssrc/rpc/methods/eth.rssrc/rpc/methods/eth/pubsub.rssrc/utils/broadcast/mod.rssrc/utils/broadcast/tests.rs
💤 Files with no reviewable changes (2)
- src/message_pool/msgpool/msg_pool.rs
- src/rpc/methods/chain.rs
| let stream = subscription_stream(head_rx) | ||
| .flat_map(|changes| futures::stream::iter(changes.applies)) | ||
| .filter_map(move |ts| { |
There was a problem hiding this comment.
Handle revert tipsets in logs subscriptions.
On Line 127, the stream only iterates changes.applies, so reverted tipsets are dropped and reorg-driven log removals cannot be emitted.
💡 Suggested fix
- let stream = subscription_stream(head_rx)
- .flat_map(|changes| futures::stream::iter(changes.applies))
+ let stream = subscription_stream(head_rx)
+ .flat_map(|changes| {
+ futures::stream::iter(
+ changes
+ .reverts
+ .into_iter()
+ .chain(changes.applies.into_iter()),
+ )
+ })
.filter_map(move |ts| {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/rpc/methods/eth/pubsub.rs` around lines 126 - 128, The stream currently
iterates only changes.applies so reverted tipsets are ignored; update the
flat_map over subscription_stream(head_rx) to emit both applies and reverts
(e.g., pair each tipset with a boolean or enum indicating is_revert) instead of
only changes.applies, then adjust the subsequent filter_map closure that binds
ts to accept that (tipset, is_revert) and produce log removal events for reverts
and normal events for applies; target the subscription_stream/head_rx pipeline
and the closure capturing ts in pubsub.rs so reorg-driven log removals are
emitted.
Codecov Report❌ Patch coverage is
Additional details and impacted files
... and 8 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
6f51115 to
709de17
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/tool/subcommands/api_cmd/stateful_tests.rs`:
- Around line 529-551: The current loops call next_subscription_payload(&mut
ws_stream, &subscription_id, Duration::from_secs(...)).await with a
per-notification timeout which resets on every unrelated event; change to use a
total deadline by recording let deadline = Instant::now() + TOTAL_DURATION (e.g.
120s for pending tx loop, 300s for logs loop) before the loop, then each
iteration compute remaining = deadline.saturating_duration_since(Instant::now())
and pass that remaining duration into next_subscription_payload; if
remaining.is_zero() or elapsed >= TOTAL_DURATION return an error (or bail) so
the test fails instead of hanging. Apply this same pattern to both the
pendingTransactions loop (using tx_hash, ws_stream, subscription_id) and the
logs loop (the one using 300s budget).
- Around line 182-188: The connect_ws helper currently forces the URL scheme to
"ws" and breaks secure endpoints; update the connect_ws function to derive the
websocket scheme from the rpc::Client's base URL (use "wss" when original scheme
is "https", "ws" when "http", and preserve existing "ws"/"wss" if present), call
url.set_scheme(...) with that derived scheme (handling set_scheme errors as
before), then set_path("/rpc/v1") and proceed to connect_async(url.as_str()) to
return the EthSubStream.
- Around line 596-606: The test currently treats the `eth_subscribe("logs")`
payload as an array; instead require a single object: check
`payload.is_object()` (not `is_array()`), deserialize into a single `LogView`
(not `Vec<LogView>`), remove the empty-array check, and update the matching
logic to compare the single `log`'s `transaction_hash` and `topics` against
`tx_hash` and `tx.topic` (the code around `payload`, `LogView`, `tx_hash`,
`tx.topic`, and the `matched` variable).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 73ae3705-5e85-4aec-9388-62e6528b206f
📒 Files selected for processing (10)
CHANGELOG.mdsrc/message_pool/msgpool/events.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/pending_store.rssrc/rpc/methods/chain.rssrc/rpc/methods/eth.rssrc/rpc/methods/eth/pubsub.rssrc/tool/subcommands/api_cmd/stateful_tests.rssrc/utils/broadcast/mod.rssrc/utils/broadcast/tests.rs
💤 Files with no reviewable changes (2)
- src/message_pool/msgpool/pending_store.rs
- src/rpc/methods/chain.rs
✅ Files skipped from review due to trivial changes (2)
- CHANGELOG.md
- src/message_pool/msgpool/msg_pool.rs
| async fn connect_ws(client: &rpc::Client) -> anyhow::Result<EthSubStream> { | ||
| let mut url = client.base_url().clone(); | ||
| url.set_scheme("ws") | ||
| .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; | ||
| url.set_path("/rpc/v1"); | ||
| let (ws_stream, _) = connect_async(url.as_str()).await?; | ||
| Ok(ws_stream) |
There was a problem hiding this comment.
Preserve wss:// for secure RPC endpoints.
This helper forces every base URL onto ws://, so the new subscription tests also fail against https:// RPC servers that require wss://.
💡 Suggested fix
async fn connect_ws(client: &rpc::Client) -> anyhow::Result<EthSubStream> {
let mut url = client.base_url().clone();
- url.set_scheme("ws")
+ let ws_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ scheme => anyhow::bail!("unsupported RPC URL scheme: {scheme}"),
+ };
+ url.set_scheme(ws_scheme)
.map_err(|_| anyhow::anyhow!("failed to set scheme"))?;
url.set_path("/rpc/v1");
let (ws_stream, _) = connect_async(url.as_str()).await?;
Ok(ws_stream)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async fn connect_ws(client: &rpc::Client) -> anyhow::Result<EthSubStream> { | |
| let mut url = client.base_url().clone(); | |
| url.set_scheme("ws") | |
| .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; | |
| url.set_path("/rpc/v1"); | |
| let (ws_stream, _) = connect_async(url.as_str()).await?; | |
| Ok(ws_stream) | |
| async fn connect_ws(client: &rpc::Client) -> anyhow::Result<EthSubStream> { | |
| let mut url = client.base_url().clone(); | |
| let ws_scheme = match url.scheme() { | |
| "http" => "ws", | |
| "https" => "wss", | |
| scheme => anyhow::bail!("unsupported RPC URL scheme: {scheme}"), | |
| }; | |
| url.set_scheme(ws_scheme) | |
| .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; | |
| url.set_path("/rpc/v1"); | |
| let (ws_stream, _) = connect_async(url.as_str()).await?; | |
| Ok(ws_stream) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/tool/subcommands/api_cmd/stateful_tests.rs` around lines 182 - 188, The
connect_ws helper currently forces the URL scheme to "ws" and breaks secure
endpoints; update the connect_ws function to derive the websocket scheme from
the rpc::Client's base URL (use "wss" when original scheme is "https", "ws" when
"http", and preserve existing "ws"/"wss" if present), call url.set_scheme(...)
with that derived scheme (handling set_scheme errors as before), then
set_path("/rpc/v1") and proceed to connect_async(url.as_str()) to return the
EthSubStream.
| let outcome: anyhow::Result<()> = async { | ||
| loop { | ||
| let payload = next_subscription_payload( | ||
| &mut ws_stream, | ||
| &subscription_id, | ||
| Duration::from_secs(120), | ||
| ) | ||
| .await?; | ||
| // a pending tx is a single hash string. | ||
| anyhow::ensure!( | ||
| payload.is_string(), | ||
| "pendingTransactions must yield a tx-hash string, got: {payload}" | ||
| ); | ||
| let hash: EthHash = serde_json::from_value(payload) | ||
| .context("pendingTransactions payload is not an Eth hash")?; | ||
| // Identity: it must be the exact tx we just submitted. | ||
| if hash == tx_hash { | ||
| break; | ||
| } | ||
| } | ||
| Ok(()) | ||
| } | ||
| .await; |
There was a problem hiding this comment.
Use a total deadline, not a per-notification timeout.
Both loops reset the timeout after every unrelated event. On a busy network, that can hang the test indefinitely if the expected notification never arrives.
💡 Suggested fix pattern
- let outcome: anyhow::Result<()> = async {
- loop {
- let payload = next_subscription_payload(
- &mut ws_stream,
- &subscription_id,
- Duration::from_secs(120),
- )
- .await?;
+ let outcome: anyhow::Result<()> = async {
+ let deadline = tokio::time::Instant::now() + Duration::from_secs(120);
+ loop {
+ let remaining =
+ deadline.saturating_duration_since(tokio::time::Instant::now());
+ anyhow::ensure!(
+ !remaining.is_zero(),
+ "timeout waiting for matching pendingTransactions notification"
+ );
+ let payload =
+ next_subscription_payload(&mut ws_stream, &subscription_id, remaining)
+ .await?;
// a pending tx is a single hash string.
anyhow::ensure!(
payload.is_string(),
"pendingTransactions must yield a tx-hash string, got: {payload}"
);Apply the same pattern to the logs loop with its 300-second budget.
Also applies to: 588-613
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/tool/subcommands/api_cmd/stateful_tests.rs` around lines 529 - 551, The
current loops call next_subscription_payload(&mut ws_stream, &subscription_id,
Duration::from_secs(...)).await with a per-notification timeout which resets on
every unrelated event; change to use a total deadline by recording let deadline
= Instant::now() + TOTAL_DURATION (e.g. 120s for pending tx loop, 300s for logs
loop) before the loop, then each iteration compute remaining =
deadline.saturating_duration_since(Instant::now()) and pass that remaining
duration into next_subscription_payload; if remaining.is_zero() or elapsed >=
TOTAL_DURATION return an error (or bail) so the test fails instead of hanging.
Apply this same pattern to both the pendingTransactions loop (using tx_hash,
ws_stream, subscription_id) and the logs loop (the one using 300s budget).
| anyhow::ensure!( | ||
| payload.is_array(), | ||
| "logs must yield a JSON array, got: {payload}" | ||
| ); | ||
| let logs: Vec<LogView> = serde_json::from_value(payload) | ||
| .context("logs payload is not a list of Eth logs")?; | ||
| anyhow::ensure!(!logs.is_empty(), "received an empty logs notification"); | ||
| // Identity: a log carrying our event topic and `our` tx hash. | ||
| let matched = logs.iter().any(|log| { | ||
| log.transaction_hash == tx_hash && log.topics.contains(&tx.topic) | ||
| }); |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
For Ethereum JSON-RPC eth_subscribewith"logs", is each eth_subscriptionnotification'sparams.result a single log object or an array of logs?
💡 Result:
For an Ethereum JSON-RPC eth_subscribe subscription with the "logs" type, each eth_subscription notification's params.result is a single log object, not an array of logs [1][2][3]. When the subscription triggers, the JSON-RPC server sends a notification message with a params field containing the subscription ID and the result [1][2][3]. The result field contains the specific log object that matched the filter criteria [1][2][3]. This structure allows clients to process each log individually as it is emitted by the network [4][5].
Citations:
- 1: https://www.chainnodes.org/docs/polygon/eth_subscribe
- 2: https://geth.ethereum.org/docs/interacting-with-geth/rpc/pubsub
- 3: https://docs.clishachain.com/json-rpc-commands/rpc-pubsub
- 4: https://web3js.readthedocs.io/en/v1.8.1/web3-eth-subscribe.html
- 5: https://web3js.readthedocs.io/en/v1.2.1/web3-eth-subscribe.html
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== stateful_tests.rs (540-660) ==="
sed -n '540,660p' src/tool/subcommands/api_cmd/stateful_tests.rs
echo
echo "=== Search for LogView definition ==="
rg -n "struct LogView|type LogView|LogView" src/tool/subcommands/api_cmd/stateful_tests.rs
echo
echo "=== Search for payload parsing around logs subscription ==="
rg -n "eth_subscribe\\(\"logs\"\\)|payload\\.(is_array|is_object)|from_value\\(payload\\)|logs payload" src/tool/subcommands/api_cmd/stateful_tests.rsRepository: ChainSafe/forest
Length of output: 5154
Update the eth_subscribe("logs") test to expect a single log object, not a JSON array.
For eth_subscribe with "logs", params.result is a single log object; this test currently requires payload.is_array() and deserializes into Vec<LogView>, making it non-spec-compliant and brittle.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/tool/subcommands/api_cmd/stateful_tests.rs` around lines 596 - 606, The
test currently treats the `eth_subscribe("logs")` payload as an array; instead
require a single object: check `payload.is_object()` (not `is_array()`),
deserialize into a single `LogView` (not `Vec<LogView>`), remove the empty-array
check, and update the matching logic to compare the single `log`'s
`transaction_hash` and `topics` against `tx_hash` and `tx.topic` (the code
around `payload`, `LogView`, `tx_hash`, `tx.topic`, and the `matched` variable).
Summary of changes
Changes introduced in this pull request:
sinkReference issue to close (if applicable)
Closes #6031
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit
New Features
Refactor
Tests
Documentation