Fix ResultSet.json() race condition on JSONEachRow streams #603
Onyx2406 wants to merge 4 commits into ClickHouse:main
When calling `json()` on a JSONEachRow result, the method first checks `_stream.readableEnded` and then calls `stream()`, which checks `readableEnded` again. If the stream ends between these two checks (common with fast/small responses), `stream()` throws "Stream has been already consumed" even though this is the first consumption call.

Fix by introducing a `_consumed` boolean flag that is set synchronously when any consumption method (`text`/`json`/`stream`) is called. This eliminates the race window between the two `readableEnded` checks.

The fix splits `stream()` into a public method (with consumption check) and a private `_streamImpl()` (without check) that `json()` calls internally after already marking the result set as consumed. This matches the pattern used by the web client's `ResultSet`, which uses an `isAlreadyConsumed` boolean instead of `readableEnded`.

Fixes ClickHouse#575
readableEnded=true just means the 'end' event fired, NOT that someone already consumed the data. For fast/small responses, the stream can end before json() is even called, making readableEnded=true while data is still buffered and available. Checking readableEnded would reject the first consumption call — exactly the bug reported in ClickHouse#575. Only use the _consumed boolean flag, which tracks actual consumption by our code, not stream lifecycle events.
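The difference between the two checks can be illustrated with a minimal sketch. Everything here is hypothetical and not the client's actual code: `FakeEndedStream` stands in for a Node.js stream that has already ended but still holds data (the situation described above), and the two classes model only the consumption check.

```typescript
// Hypothetical stand-in for a stream; only the fields this sketch needs.
interface FakeEndedStream {
  readableEnded: boolean
  buffered: string[]
}

const consumedMessage = 'Stream has been already consumed'

// Infers "already consumed" from stream lifecycle state.
class LifecycleCheckedResult {
  private readonly source: FakeEndedStream
  constructor(source: FakeEndedStream) {
    this.source = source
  }
  text(): string {
    if (this.source.readableEnded) throw new Error(consumedMessage)
    return this.source.buffered.splice(0).join('')
  }
}

// Tracks consumption by our own code with a boolean flag.
class FlagCheckedResult {
  private consumed = false
  private readonly source: FakeEndedStream
  constructor(source: FakeEndedStream) {
    this.source = source
  }
  text(): string {
    if (this.consumed) throw new Error(consumedMessage)
    this.consumed = true // set synchronously, before any reading happens
    return this.source.buffered.splice(0).join('')
  }
}

// Helper: run text() and report either the payload or the error message.
function tryText(result: { text(): string }): string {
  try {
    return result.text()
  } catch (e) {
    return `error: ${(e as Error).message}`
  }
}

// A stream that has ended while one row is still available.
const makeEnded = (): FakeEndedStream => ({
  readableEnded: true,
  buffered: ['{"number":"0"}\n'],
})

const lifecycleOutcome = tryText(new LifecycleCheckedResult(makeEnded()))
const flagOutcome = tryText(new FlagCheckedResult(makeEnded()))
```

In this sketch the lifecycle-based check rejects the very first call, while the flag-based check returns the buffered row and only rejects a second call.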
Pull request overview
Fixes a race condition in the Node.js client ResultSet.json() for streamable JSON formats (e.g. JSONEachRow) where the first call could incorrectly throw "Stream has been already consumed" due to separate readableEnded checks.
Changes:
- Add synchronous consumption tracking (`_consumed` + `markAsConsumed()`) to `ResultSet` and apply it to `text()`, `json()`, and `stream()`.
- Refactor `stream()` to delegate to a private `_streamImpl()` so `json()` can stream rows without re-checking consumption.
- Add a unit regression test intended to simulate the fast-end/race scenario for `JSONEachRow`.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| packages/client-node/src/result_set.ts | Introduces a boolean consumption flag and refactors stream()/json() to avoid double-check races on fast-ending streams. |
| packages/client-node/__tests__/unit/node_result_set.test.ts | Adds a regression test targeting the reported "already consumed" first-call failure. |
Comments suppressed due to low confidence (1)
packages/client-node/src/result_set.ts:136
`json()` marks the ResultSet as consumed before verifying that the current `format` can actually be decoded as JSON. In the non-JSON case (`Cannot decode ${this.format} as JSON`), this is a behavior change: callers can no longer fall back to `text()`/`stream()` even though the underlying stream was never read. Consider checking the format first (and throwing) before calling `markAsConsumed()`, or only calling `markAsConsumed()` inside the JSON-capable branches right before consuming the stream.
    this.markAsConsumed()
    return (await getAsText(this._stream)).toString()
  }
  /** See {@link BaseResultSet.json}. */
  async json<T>(): Promise<ResultJSONType<T, Format>> {
    this.markAsConsumed()
    // JSONEachRow, etc.
    if (isStreamableJSONFamily(this.format as DataFormat)) {
      const result: T[] = []
      const stream = this._streamImpl<T>()
      for await (const rows of stream) {
        for (const row of rows) {
          result.push(row.json() as T)
        }
      }
      return result as any
    }
    // JSON, JSONObjectEachRow, etc.
    if (isNotStreamableJSONFamily(this.format as DataFormat)) {
      const text = await getAsText(this._stream)
Address Copilot review feedback:
1. Move `validateStreamFormat()` before `markAsConsumed()` in `stream()`. Previously, if the format was invalid, the ResultSet was permanently marked as consumed even though nothing was actually read, preventing a subsequent `text()` call from working.
2. Make the regression test deterministic by overriding `readableEnded` to always return true, simulating a fast response. The old code would throw on this; the new code only checks the `_consumed` flag.
peter-leonov-ch left a comment
Thanks a bunch for looking into this tricky issue and bringing a fresh look at the problem. Let's see if we can avoid making "dark" API changes here while addressing the issue in question.
Address reviewer feedback from peter-leonov-ch: Move `markAsConsumed()` from the top of `json()` into each branch that actually consumes the stream (streamable JSON and non-streamable JSON). The unsupported-format path (CSV, etc.) no longer marks the ResultSet as consumed, preserving the ability to call `text()` afterwards, which matches the pre-PR exception semantics.

Add tests verifying:
- `json()` on CSV throws without marking consumed, `text()` still works
- `stream()` on non-streamable format throws, `text()` still works
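The ordering described here (validate the format first, mark as consumed only in branches that really read the data) can be sketched as follows. This is a simplified, synchronous model under stated assumptions: `SketchResultSet`, `SketchFormat`, and the "not streamable" error text are hypothetical names for illustration, the body is a plain string instead of a stream, and only `JSON` is treated as non-streamable.

```typescript
// Hypothetical, reduced set of formats for this sketch.
type SketchFormat = 'JSONEachRow' | 'JSON' | 'CSV'

class SketchResultSet {
  private consumed = false
  private readonly body: string
  private readonly format: SketchFormat

  constructor(body: string, format: SketchFormat) {
    this.body = body
    this.format = format
  }

  private markAsConsumed(): void {
    if (this.consumed) throw new Error('Stream has been already consumed')
    this.consumed = true
  }

  private lines(): string[] {
    return this.body.split('\n').filter((line) => line.length > 0)
  }

  text(): string {
    this.markAsConsumed()
    return this.body
  }

  stream(): string[] {
    // Validate first: a rejected call must leave the result set untouched.
    if (this.format === 'JSON') {
      throw new Error(`${this.format} format is not streamable`)
    }
    this.markAsConsumed()
    return this.lines()
  }

  json(): unknown {
    if (this.format === 'JSONEachRow') {
      this.markAsConsumed() // marked only in a branch that consumes
      return this.lines().map((line) => JSON.parse(line))
    }
    if (this.format === 'JSON') {
      this.markAsConsumed()
      return JSON.parse(this.body)
    }
    // Unsupported format: throw WITHOUT marking the result set as consumed.
    throw new Error(`Cannot decode ${this.format} as JSON`)
  }
}

// json() on CSV fails, but text() is still available afterwards.
const csvResult = new SketchResultSet('1,2\n', 'CSV')
let csvJsonError = ''
try {
  csvResult.json()
} catch (e) {
  csvJsonError = (e as Error).message
}
const csvText = csvResult.text()
```

The same reasoning applies to `stream()` on the non-streamable format in this sketch: the call throws before the flag is touched, so a later `text()` call still succeeds.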
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
  /**
   * Mark the result set as consumed and throw if it was already consumed.
   * Uses a boolean flag instead of checking `readableEnded` to avoid a race
   * condition where the stream's 'end' event fires between two separate
   * `readableEnded` checks (e.g. when `json()` calls `stream()` internally
   * for JSONEachRow). See: https://github.com/ClickHouse/clickhouse-js/issues/575
   *
   * We intentionally do NOT check `readableEnded` here. A stream can have
   * `readableEnded=true` (the 'end' event fired) while its data is still
   * buffered and available for reading. Checking readableEnded would falsely
   * reject the first consumption call for fast/small responses.
   */
  private markAsConsumed(): void {
    if (this._consumed) {
      throw Error(streamAlreadyConsumedMessage)
    }
    this._consumed = true
  }
@peter-leonov-ch can you review this please?
Summary
Fix `ResultSet.json()` throwing "Stream has been already consumed" on the first call for `JSONEachRow` format due to a race condition between two `readableEnded` checks.

Fixes #575

Root Cause
For streamable JSON formats (JSONEachRow, etc.), `json()` calls `stream()` internally:
1. `json()` checks `this._stream.readableEnded` → false ✓
2. `json()` calls `this.stream()`
3. `readableEnded` becomes true
4. `stream()` checks `this._stream.readableEnded` → true ✗
5. "Stream has been already consumed" is thrown on the first call!

This is triggered by fast/small responses (e.g. `SELECT number FROM numbers(1)`) where the response arrives fully before `json()` reaches the `stream()` call.

Fix (44 insertions, 2 files)
packages/client-node/src/result_set.ts:
- `_consumed` boolean flag to `ResultSet` class
- `markAsConsumed()` private method that checks `_consumed || readableEnded` and sets `_consumed = true` synchronously
- `text()`, `json()`, `stream()` all call `markAsConsumed()` as their first operation
- `stream()` into public `stream()` (with check) and private `_streamImpl()` (without check)
- `json()` calls `_streamImpl()` after already calling `markAsConsumed()`, eliminating the double-check race

This matches the pattern already used by the web client's `ResultSet` which uses `isAlreadyConsumed` boolean.

packages/client-node/__tests__/unit/node_result_set.test.ts:
- (… `setImmediate`) before calling `json()`, simulating the race condition

Test Plan
- (… `text()`, `json()`, `stream()` only-once) continue to pass since `markAsConsumed()` covers all the same cases
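The double check described under Root Cause, and the public/private split that removes it, can be sketched roughly as below. This is an illustrative model only: `FlippingStream` is a hypothetical fake whose `readableEnded` getter returns false on the first read and true afterwards, which forces the "stream ends between the two checks" timing deterministically. The class names and the synchronous return types are assumptions made for brevity.

```typescript
const raceMessage = 'Stream has been already consumed'

// Hypothetical fake: ends "between" the first and second readableEnded check.
class FlippingStream {
  private reads = 0
  readonly rows = [{ number: '0' }]
  get readableEnded(): boolean {
    this.reads += 1
    return this.reads > 1
  }
}

// Pre-fix shape: json() checks, then stream() checks again.
class DoubleCheckResult {
  private readonly source = new FlippingStream()
  stream(): object[] {
    if (this.source.readableEnded) throw new Error(raceMessage) // second check
    return this.source.rows
  }
  json(): object[] {
    if (this.source.readableEnded) throw new Error(raceMessage) // first check
    return this.stream()
  }
}

// Post-fix shape: one synchronous flag, and a private impl without a check.
class SingleFlagResult {
  private readonly source = new FlippingStream()
  private consumed = false
  private markAsConsumed(): void {
    if (this.consumed) throw new Error(raceMessage)
    this.consumed = true
  }
  private streamImpl(): object[] {
    return this.source.rows
  }
  stream(): object[] {
    this.markAsConsumed()
    return this.streamImpl()
  }
  json(): object[] {
    this.markAsConsumed()
    return this.streamImpl() // no second consumption check
  }
}

// Helper: report either the number of rows or the error message.
function attempt(run: () => object[]): string {
  try {
    return `rows: ${run().length}`
  } catch (e) {
    return `error: ${(e as Error).message}`
  }
}

const doubleCheckOutcome = attempt(() => new DoubleCheckResult().json())
const singleFlagOutcome = attempt(() => new SingleFlagResult().json())
```

With this fake, the double-check variant fails on its first `json()` call, while the single-flag variant returns the row because consumption is decided once, before any stream state is consulted.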