Skip to content

Commit 3dd89a9

Browse files
committed
stream: cache chunk data between 1st and 2nd pass in Writev
Signed-off-by: Daijiro Wachi <daijiro.wachi@gmail.com>
1 parent c3a00e5 commit 3dd89a9

3 files changed

Lines changed: 197 additions & 37 deletions

File tree

Lines changed: 43 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,43 @@
1+
'use strict';
2+
3+
const common = require('../common.js');
4+
const { Writable } = require('stream');
5+
6+
// Benchmarks StreamBase::Writev with string chunks, exercising the chunk
7+
// cache that avoids redundant V8 array accesses, ToString, and ParseEncoding
8+
// calls between the sizing pass and the write pass.
9+
// Benchmark matrix: every combination of the values below is run once.
//   n        - number of cork/uncork batches per run
//   chunks   - number of write() calls inside each corked batch
//   encoding - encoding passed with string writes (and used to build the Buffer)
//   type     - what each batch contains: only strings, only Buffers, or both
const bench = common.createBenchmark(main, {
  n: [1e4],
  chunks: [4, 16, 64],
  encoding: ['utf8', 'latin1'],
  type: ['string', 'buffer', 'mixed'],
});
15+
16+
/**
 * Benchmark body: issues `n` corked batches of `chunks` writes each against a
 * Writable whose sinks complete immediately, then reports `n` operations.
 *
 * NOTE(review): `wr` is a plain JS `Writable` with a user-supplied `writev`;
 * confirm that this actually reaches the native write path the file header
 * says it measures.
 *
 * @param {object} conf
 * @param {number} conf.n Number of cork/uncork batches.
 * @param {number} conf.chunks Writes issued per batch.
 * @param {string} conf.encoding Encoding used for string writes and for `buf`.
 * @param {string} conf.type 'string', 'buffer', or 'mixed'.
 */
function main({ n, chunks, encoding, type }) {
  const str = 'Hello, benchmark! '.repeat(4);
  const buf = Buffer.from(str, encoding);

  const wr = new Writable({
    // Both sinks acknowledge immediately so no I/O cost is measured.
    // The batch argument is named `batch` to avoid shadowing `chunks` above.
    writev(batch, cb) { cb(); },
    write(chunk, enc, cb) { cb(); },
  });

  bench.start();
  for (let i = 0; i < n; i++) {
    wr.cork();
    for (let j = 0; j < chunks; j++) {
      if (type === 'buffer') {
        wr.write(buf);
      } else if (type === 'string') {
        wr.write(str, encoding);
      } else if (j % 2 === 0) {
        // Alternate buffer and string to hit the mixed (non-all_buffers) path.
        wr.write(buf);
      } else {
        wr.write(str, encoding);
      }
    }
    wr.uncork();
  }
  bench.end(n);
}

src/stream_base.cc

Lines changed: 43 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -201,12 +201,24 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
201201
size_t offset;
202202

203203
if (!all_buffers) {
204+
// Cache per-chunk data from the first pass so the second pass avoids
205+
// redundant V8 array accesses, ToString conversions, and ParseEncoding
206+
// calls. Local<> handles remain valid for the duration of this scope.
207+
struct CachedChunk {
208+
Local<Value> value;
209+
Local<String> string; // empty for Buffer chunks
210+
enum encoding enc;
211+
};
212+
MaybeStackBuffer<CachedChunk, 16> chunk_cache(count);
213+
204214
// Determine storage size first
205215
for (size_t i = 0; i < count; i++) {
206216
Local<Value> chunk;
207217
if (!chunks->Get(context, i * 2).ToLocal(&chunk))
208218
return -1;
209219

220+
chunk_cache[i].value = chunk;
221+
210222
if (Buffer::HasInstance(chunk))
211223
continue;
212224
// Buffer chunk, no additional storage required
@@ -219,6 +231,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
219231
if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
220232
return -1;
221233
enum encoding encoding = ParseEncoding(isolate, next_chunk);
234+
chunk_cache[i].string = string;
235+
chunk_cache[i].enc = encoding;
222236
size_t chunk_size;
223237
if ((encoding == UTF8 &&
224238
string->Length() > 65535 &&
@@ -230,35 +244,23 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
230244
storage_size += chunk_size;
231245
}
232246

233-
if (storage_size > INT_MAX)
234-
return UV_ENOBUFS;
235-
} else {
236-
for (size_t i = 0; i < count; i++) {
237-
Local<Value> chunk;
238-
if (!chunks->Get(context, i).ToLocal(&chunk))
239-
return -1;
240-
bufs[i].base = Buffer::Data(chunk);
241-
bufs[i].len = Buffer::Length(chunk);
242-
}
243-
}
247+
if (storage_size > INT_MAX) return UV_ENOBUFS;
244248

245-
std::unique_ptr<BackingStore> bs;
246-
if (storage_size > 0) {
247-
bs = ArrayBuffer::NewBackingStore(
248-
isolate, storage_size, BackingStoreInitializationMode::kUninitialized);
249-
}
249+
std::unique_ptr<BackingStore> bs;
250+
if (storage_size > 0) {
251+
bs = ArrayBuffer::NewBackingStore(
252+
isolate,
253+
storage_size,
254+
BackingStoreInitializationMode::kUninitialized);
255+
}
250256

251-
offset = 0;
252-
if (!all_buffers) {
257+
offset = 0;
253258
for (size_t i = 0; i < count; i++) {
254-
Local<Value> chunk;
255-
if (!chunks->Get(context, i * 2).ToLocal(&chunk))
256-
return -1;
257-
258-
// Write buffer
259-
if (Buffer::HasInstance(chunk)) {
260-
bufs[i].base = Buffer::Data(chunk);
261-
bufs[i].len = Buffer::Length(chunk);
259+
// string.IsEmpty() signals a Buffer chunk; enc is uninitialised in
260+
// that case so we must not read it.
261+
if (chunk_cache[i].string.IsEmpty()) {
262+
bufs[i].base = Buffer::Data(chunk_cache[i].value);
263+
bufs[i].len = Buffer::Length(chunk_cache[i].value);
262264
continue;
263265
}
264266

@@ -268,28 +270,32 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
268270
static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
269271
size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
270272

271-
Local<String> string;
272-
if (!chunk->ToString(context).ToLocal(&string))
273-
return -1;
274-
Local<Value> next_chunk;
275-
if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
276-
return -1;
277-
enum encoding encoding = ParseEncoding(isolate, next_chunk);
278273
str_size = StringBytes::Write(isolate,
279274
str_storage,
280275
str_size,
281-
string,
282-
encoding);
276+
chunk_cache[i].string,
277+
chunk_cache[i].enc);
283278
bufs[i].base = str_storage;
284279
bufs[i].len = str_size;
285280
offset += str_size;
286281
}
282+
283+
StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
284+
SetWriteResult(res);
285+
if (res.wrap != nullptr && storage_size > 0)
286+
res.wrap->SetBackingStore(std::move(bs));
287+
return res.err;
288+
} else {
289+
for (size_t i = 0; i < count; i++) {
290+
Local<Value> chunk;
291+
if (!chunks->Get(context, i).ToLocal(&chunk)) return -1;
292+
bufs[i].base = Buffer::Data(chunk);
293+
bufs[i].len = Buffer::Length(chunk);
294+
}
287295
}
288296

289297
StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
290298
SetWriteResult(res);
291-
if (res.wrap != nullptr && storage_size > 0)
292-
res.wrap->SetBackingStore(std::move(bs));
293299
return res.err;
294300
}
295301

Lines changed: 111 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,111 @@
1+
'use strict';
2+
3+
// Tests that StreamBase::Writev correctly handles string chunks by verifying
4+
// data integrity when string chunks, buffer chunks, and mixed chunks are
5+
// written via cork/uncork. This exercises the chunk-cache path (all_buffers=false)
6+
// introduced to avoid redundant V8 array accesses between the sizing and write
7+
// passes.
8+
9+
const common = require('../common');
10+
const assert = require('assert');
11+
const { Writable } = require('stream');
12+
13+
/**
 * Builds a Writable that records every chunk it is handed, whether it arrives
 * through `writev` (batched) or `write` (single chunk).
 *
 * @param {boolean} decodeStrings Forwarded to the Writable constructor.
 * @returns {{ w: Writable, received: Array<{ chunk: any, encoding: string }> }}
 *   The stream and the live array that its sinks append to, in arrival order.
 */
function collectWritev(decodeStrings) {
  const received = [];
  const w = new Writable({
    decodeStrings,
    writev(chunks, cb) {
      // Flatten the batch so callers can index chunks in write order.
      for (const { chunk, encoding } of chunks)
        received.push({ chunk, encoding });
      cb();
    },
    write(chunk, encoding, cb) {
      received.push({ chunk, encoding });
      cb();
    },
  });
  return { w, received };
}
29+
30+
// String chunks only — various encodings, decodeStrings: false so we can
31+
// inspect the raw string/encoding pairs the stream layer received.
32+
{
  // Three string writes, each with a different encoding, flushed as one
  // corked batch. decodeStrings is false, so the recorded chunks are compared
  // against the original strings and encoding labels.
  const { w, received } = collectWritev(false);
  w.cork();
  w.write('hello', 'utf8');
  w.write('world', 'latin1');
  w.write('cafebabe', 'hex');
  w.uncork();
  w.end();

  w.on('finish', common.mustCall(() => {
    // Check the count first so a dropped chunk fails with a clear assertion
    // instead of a TypeError when indexing `received`.
    assert.strictEqual(received.length, 3);
    assert.strictEqual(received[0].chunk, 'hello');
    assert.strictEqual(received[0].encoding, 'utf8');
    assert.strictEqual(received[1].chunk, 'world');
    assert.strictEqual(received[1].encoding, 'latin1');
    assert.strictEqual(received[2].chunk, 'cafebabe');
    assert.strictEqual(received[2].encoding, 'hex');
  }));
}
50+
51+
// Mixed buffer + string chunks — the non-all_buffers branch in Writev.
52+
{
  // Buffer, string, Buffer in a single corked batch: order and type of each
  // recorded chunk must match what was written.
  const { w, received } = collectWritev(false);
  const buf = Buffer.from('buffered');
  w.cork();
  w.write(buf);
  w.write('stringy', 'utf8');
  w.write(buf);
  w.uncork();
  w.end();

  w.on('finish', common.mustCall(() => {
    // Check the count first so a dropped chunk fails with a clear assertion.
    assert.strictEqual(received.length, 3);
    assert(Buffer.isBuffer(received[0].chunk));
    assert.strictEqual(received[0].chunk.toString(), 'buffered');
    assert.strictEqual(received[1].chunk, 'stringy');
    assert.strictEqual(received[1].encoding, 'utf8');
    assert(Buffer.isBuffer(received[2].chunk));
    assert.strictEqual(received[2].chunk.toString(), 'buffered');
  }));
}
71+
72+
// More than 16 chunks so MaybeStackBuffer<CachedChunk, 16> must heap-allocate.
73+
// Verify that every string arrives correctly after the realloc.
74+
{
  // Write more chunks than the 16-entry inline capacity mentioned in the
  // section comment, and verify every string comes back intact and in order.
  const COUNT = 20;
  const expected = [];
  const { w, received } = collectWritev(false);
  w.cork();
  for (let i = 0; i < COUNT; i++) {
    const s = `chunk-${i}`;
    expected.push(s);
    w.write(s, 'utf8');
  }
  w.uncork();
  w.end();

  w.on('finish', common.mustCall(() => {
    assert.strictEqual(received.length, COUNT);
    for (let i = 0; i < COUNT; i++) {
      assert.strictEqual(received[i].chunk, expected[i]);
      assert.strictEqual(received[i].encoding, 'utf8');
    }
  }));
}
95+
96+
// decodeStrings: true — all strings should arrive as Buffers with correct data.
97+
{
  // With decodeStrings enabled, the string write is expected to reach the
  // sink as a Buffer whose bytes decode back to the original text.
  const { w, received } = collectWritev(true);
  w.cork();
  w.write('café', 'utf8');
  w.write(Buffer.from('raw'));
  w.uncork();
  w.end();

  w.on('finish', common.mustCall(() => {
    // Check the count first so a dropped chunk fails with a clear assertion.
    assert.strictEqual(received.length, 2);
    assert(Buffer.isBuffer(received[0].chunk));
    assert.strictEqual(received[0].chunk.toString('utf8'), 'café');
    assert(Buffer.isBuffer(received[1].chunk));
    assert.strictEqual(received[1].chunk.toString(), 'raw');
  }));
}

0 commit comments

Comments
 (0)