Skip to content

fix(storage-opendal): add TimeoutLayer inside RetryLayer to bound hangs#2455

Open
AlJohri wants to merge 1 commit into
apache:mainfrom
AlJohri:add-timeout-layer-to-opendal-storage
Open

fix(storage-opendal): add TimeoutLayer inside RetryLayer to bound hangs#2455
AlJohri wants to merge 1 commit into
apache:mainfrom
AlJohri:add-timeout-layer-to-opendal-storage

Conversation

@AlJohri
Copy link
Copy Markdown

@AlJohri AlJohri commented May 14, 2026

Which issue does this PR close?

What changes are included in this PR?

iceberg-storage-opendal::OpenDalStorage::create_operator currently wraps the built operator with RetryLayer::new(). That layer retries when its inner future returns Err, which is what was needed for the fast-failing transient errors that #788 set out to fix — and it works correctly for that case.

It does not bound futures that park indefinitely without producing an Err. The canonical example is an S3 Range-GET whose underlying TCP connection is silently dropped (NAT/conntrack eviction, route flap, server-side disconnect with no RST received). The response future stays Pending forever; RetryLayer has nothing to retry against because no error is ever produced.

This PR adds TimeoutLayer::new() inside RetryLayer to close that gap. Per opendal's docs:

While using TimeoutLayer with RetryLayer at the same time, please make sure timeout layer showed up before retry layer. Since timeout layer will drop future, leaving retry layer in a bad state.

opendal's defaults (60 s for non-IO ops like stat/list/delete, 10 s per IO chunk for read/write) are used; each retry attempt is now independently bounded, hung connections surface as a timeout error which RetryLayer then retries with backoff, and unrecoverable hangs propagate a clean error to the caller in seconds rather than the inner future parking forever.

The diff is two lines in crates/storage/opendal/src/lib.rs (the import and the layer composition) plus an updated comment explaining the ordering invariant.

How we hit this

In production: a Rust application using iceberg-storage-opendal::OpenDalStorageFactory::S3 to read iceberg tables on AWS hung for 24 hours when iceberg try_next() returned a Pending future whose underlying opendal Range-GET against S3 never completed. Core-dump analysis showed:

  • Two in-flight HTTP/1.1 Range-GETs in heap (one for ~723 KB, one for ~367 KB), both signed with valid temporary credentials.
  • No active TCP connection to any S3 IP at the time of the dump (/proc/<pid>/net/tcp had only the OTel collector socket).
  • gdb backtraces of all 35 threads showed the tokio runtime fully idle: workers parked in Condvar::wait_until_internal, main thread in Runtime::block_on.

So the response future was permanently Pending after the TCP connection silently died, with no error to propagate. The RetryLayer was in the chain but dormant because there was no error to react to. Adding TimeoutLayer would have produced a timeout Err within seconds, RetryLayer would have retried with backoff, and the operation would have surfaced cleanly within ~90 s instead of hanging until the pod's activeDeadlineSeconds killed it 24 h later.

Context on the original composition

RetryLayer::new() was added in #788 (Dec 2024) to bound transient "connection closed before message completed" errors. That PR's description explicitly noted that configurability could be a follow-up. It correctly addressed the fast-failing transient case; the silent-hang case wasn't in scope. This PR extends the layer composition to also cover that second class.

A user filed #1288 in May 2025 asking for IO-operation timeout support; it received no maintainer engagement and was auto-closed by the stale bot. This PR closes that issue with the minimal change: add a per-attempt bound so RetryLayer has a timeout error to retry against. opendal's docs explicitly document the ordering rule that applies when both layers are used together, which this PR follows.

Are these changes tested?

Upstream CI on this PR (already running)

The full project CI ran on this draft and the codebase-internal tests pass — including the S3 integration suite against MinIO that I couldn't run locally:

  • Tests (default) — full cargo nextest ✅ pass (7m55s)
  • Tests (doc) ✅ pass
  • check_standalone (every crate builds in isolation) ✅ pass
  • check, build on Linux + macOS ✅ pass
  • build_with_no_default_features on Linux + macOS + Windows ✅ pass
  • MSRV ✅ pass
  • CodeQL ✅ pass
  • Windows test job still pending at the time of writing

So healthy-path reads/writes/stats/deletes against MinIO still work with the new layer composition — opendal's defaults (60 s non-IO, 10 s per IO chunk) are not tight enough to false-positive on normal test traffic.

Local validation that the fix actually fixes the bug

The existing test suite doesn't have a "hung-connection" harness, so it can't directly validate the new behaviour. I wrote a small standalone reproducer that does: it spawns a TCP tarpit (accepts connections, never replies — exactly mimicking a silently-dropped TCP session), points opendal at it, and measures time-to-error under three layer stacks.

Result:

[                               NO LAYERS] reading... ABORTED by harness after 20.00s — still hanging
[        RetryLayer ONLY (upstream today)] reading... ABORTED by harness after 20.00s — still hanging
[      TimeoutLayer + RetryLayer (PR fix)] errored in 15.02s:
                                            Unexpected (persistent) at read,
                                            context: { timeout: 2 } => io timeout reached
  • NO LAYERS — operation hangs forever; harness aborts at 20 s.
  • RetryLayer only (this crate today) — also hangs forever, because RetryLayer has nothing to retry against when the inner future never produces an Err. This is the gap the PR closes.
  • TimeoutLayer + RetryLayer (this PR) — errors cleanly in 15.02 s with a structured io timeout reached error naming the timeout duration. Math checks out: 2 s io_timeout × 3 retry attempts + default exponential backoff between attempts (≈ 1+2+4 s) ≈ 13–15 s.

This is reproducible in ~5 minutes by anyone with a Rust toolchain. Happy to upstream this as an integration test in crates/storage/opendal/tests/ if maintainers want — the harness is ~70 lines and depends only on opendal + tokio + anyhow.

Click to expand: full tarpit harness source

Cargo.toml:

[package]
name = "opendal-timeout-test"
version = "0.0.0"
edition = "2024"

[[bin]]
name = "tarpit"
path = "src/main.rs"

[dependencies]
opendal = { version = "0.56", features = ["services-s3"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1"

src/main.rs:

//! Spawns a TCP tarpit (accepts connections, never responds) and tests
//! whether an opendal S3 operator hangs forever vs times out cleanly
//! depending on whether TimeoutLayer is in the layer stack.

use std::sync::Arc;
use std::time::{Duration, Instant};

use opendal::Operator;
use opendal::layers::{RetryLayer, TimeoutLayer};
use opendal::services::S3;
use tokio::net::TcpListener;

async fn run_tarpit(listener: Arc<TcpListener>) {
    // Accept connections, hold them open, never read or write.
    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(async move {
                    let _hold = stream;
                    tokio::time::sleep(Duration::from_secs(3600)).await;
                });
            }
            Err(_) => return,
        }
    }
}

fn make_s3_builder(endpoint: &str) -> S3 {
    let mut b = S3::default();
    b = b.endpoint(endpoint);
    b = b.region("us-east-1");
    b = b.bucket("test-bucket");
    b = b.access_key_id("AKIATEST");
    b = b.secret_access_key("secrettest");
    b = b.allow_anonymous();
    b
}

async fn try_read(op: Operator, label: &str, abort_after: Duration) {
    print!("[{label:>40}] reading... ");
    let start = Instant::now();
    let result = tokio::time::timeout(abort_after, op.read("does-not-exist")).await;
    let elapsed = start.elapsed();
    match result {
        Ok(Ok(_))  => println!("succeeded?! elapsed={elapsed:.2?}"),
        Ok(Err(e)) => println!("errored in {elapsed:.2?}: {e}"),
        Err(_)     => println!("ABORTED by harness after {abort_after:.2?} — still hanging"),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;
    let endpoint = format!("http://{addr}");
    println!("tarpit listening on {endpoint}");

    let listener = Arc::new(listener);
    tokio::spawn(run_tarpit(listener.clone()));

    let harness_abort = Duration::from_secs(20);

    // Case A: no layers.
    {
        let op = Operator::new(make_s3_builder(&endpoint))?.finish();
        try_read(op, "NO LAYERS", harness_abort).await;
    }
    // Case B: today's upstream composition (RetryLayer only).
    {
        let op = Operator::new(make_s3_builder(&endpoint))?
            .layer(RetryLayer::new())
            .finish();
        try_read(op, "RetryLayer ONLY (upstream today)", harness_abort).await;
    }
    // Case C: this PR (TimeoutLayer inside RetryLayer).
    {
        let op = Operator::new(make_s3_builder(&endpoint))?
            .layer(
                TimeoutLayer::new()
                    .with_timeout(Duration::from_secs(2))
                    .with_io_timeout(Duration::from_secs(2)),
            )
            .layer(RetryLayer::new().with_max_times(3))
            .finish();
        try_read(op, "TimeoutLayer + RetryLayer (PR fix)", harness_abort).await;
    }

    Ok(())
}

Notes

  • No API change. No new builder methods, no new fields, no breaking changes for current users.
  • The same one-line composition runs for all storage variants (Memory, Fs, S3, Gcs, Oss, Azdls). TimeoutLayer applies uniformly. For in-memory/fs backends the timeout is effectively never hit; the cost is negligible. For network backends it is the actual fix.
  • If a user genuinely needs longer-than-default bounds (e.g. fetching very large files over a slow link), the post-rfc: Making Storage a Trait #1885 approach is to implement iceberg::Storage themselves and inject their preferred layer stack — but the default path should not hang silently, which is what this PR addresses. Happy to expand into a configurable form (e.g. accept a TimeoutLayer on the OpenDalStorageFactory::S3 variant) in review if maintainers prefer.

The current `create_operator` applies `RetryLayer` with no
`TimeoutLayer`. Per opendal's docs, `TimeoutLayer` must be inside
`RetryLayer`; without it, a future parked indefinitely on a silently
dropped TCP connection never produces an `Err` and `RetryLayer` cannot
retry — the caller hangs forever.

We hit this in production: an iceberg `try_next()` Range-GET parked for
24h until a Kubernetes Job's `activeDeadlineSeconds` killed the pod.
Core-dump analysis confirmed two in-flight Range-GETs sitting in heap
with no live TCP connection, no error propagated, no retries attempted.

This adds `TimeoutLayer::new()` with opendal's defaults (60s for
non-IO ops; 10s per IO chunk) inside `RetryLayer`. Each retry attempt
is now independently bounded; a hung connection times out, RetryLayer
sees the error, and either succeeds on a subsequent attempt or
propagates a clean error to the caller.

Refs: apache#1288 (auto-closed by stale bot without engagement), apache#788 (the
PR that added the unbounded RetryLayer).
@AlJohri AlJohri closed this May 14, 2026
@AlJohri AlJohri reopened this May 14, 2026
@AlJohri AlJohri marked this pull request as ready for review May 14, 2026 19:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature request: timeout support for IO operations

1 participant