From 422b200731f3bdeb38c48660476d8ca336d1aef2 Mon Sep 17 00:00:00 2001 From: Gil Pedersen Date: Mon, 11 May 2026 22:57:44 +0200 Subject: [PATCH] Fix stream payload error handling --- lib/index.js | 6 ++++-- lib/payload.js | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/request.js | 49 +++++++++------------------------------------- test/index.js | 18 +++++++++++++++++ 4 files changed, 84 insertions(+), 42 deletions(-) create mode 100644 lib/payload.js diff --git a/lib/index.js b/lib/index.js index c752be1..53406ba 100755 --- a/lib/index.js +++ b/lib/index.js @@ -47,12 +47,14 @@ exports.inject = async function (dispatchFunc, options) { // eslint-disable Validate.assert(options ?? null, internals.options, 'Invalid options:'); } - return new Promise((resolve) => { + return new Promise((resolve, reject) => { const req = new Request(options); const res = new Response(req, resolve); - req.prepare(() => dispatchFunc(req, res)); + req.prepare() + .then(() => dispatchFunc(req, res)) + .catch(reject); }); }; diff --git a/lib/payload.js b/lib/payload.js new file mode 100644 index 0000000..9143765 --- /dev/null +++ b/lib/payload.js @@ -0,0 +1,53 @@ +'use strict'; + +const Events = require('events'); +const Stream = require('stream'); + + +const internals = {}; + + +exports.encode = function (payload) { + + if (payload instanceof Stream) { + return internals.encodeStreamPayload(payload); + } + + const headers = Object.create(null); + + if (payload) { + if (typeof payload !== 'string' && + !Buffer.isBuffer(payload)) { + + payload = JSON.stringify(payload); + headers['content-type'] = 'application/json'; + } + + // Compute the content-length for the corresponding payload in case none set + + headers['content-length'] = (Buffer.isBuffer(payload) ? payload.length : Buffer.byteLength(payload)).toString(); + } + + return { payload, headers }; +}; + + +internals.encodeStreamPayload = function (stream) { + + const headers = Object.create(null); + + const deferredPayload = (async () => { + + const chunks = []; + stream.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + + await Events.once(stream, 'end'); + + const payload = Buffer.concat(chunks); + headers['content-length'] = headers['content-length'] || payload.length; + + return payload; + })(); + + return { payload: deferredPayload, headers }; +}; diff --git a/lib/request.js b/lib/request.js index d5d71c8..546795e 100755 --- a/lib/request.js +++ b/lib/request.js @@ -4,6 +4,7 @@ const Events = require('events'); const Stream = require('stream'); const Url = require('url'); +const Payload = require('./payload'); const Symbols = require('./symbols'); @@ -32,13 +33,14 @@ exports = module.exports = internals.Request = class extends Stream.Readable { this.httpVersion = '1.1'; this.method = (options.method ? options.method.toUpperCase() : 'GET'); - this.headers = {}; - const headers = options.headers ?? {}; - const fields = Object.keys(headers); - fields.forEach((field) => { + const { payload, headers: baseHeaders } = Payload.encode(options.payload ?? null); + + this.headers = baseHeaders; + const headers = options.headers ?? {}; + for (const field of Object.keys(headers)) { this.headers[field.toLowerCase()] = headers[field]; - }); + } this.headers['user-agent'] = this.headers['user-agent'] ?? 'shot'; @@ -61,25 +63,6 @@ exports = module.exports = internals.Request = class extends Stream.Readable { this.socket = this.connection = new internals.MockSocket(options); - let payload = options.payload ?? null; - if (payload && - typeof payload !== 'string' && - !(payload instanceof Stream) && - !Buffer.isBuffer(payload)) { - - payload = JSON.stringify(payload); - this.headers['content-type'] = this.headers['content-type'] || 'application/json'; - } - - // Set the content-length for the corresponding payload if none set - - if (payload && - !(payload instanceof Stream) && - !this.headers.hasOwnProperty('content-length')) { - - this.headers['content-length'] = (Buffer.isBuffer(payload) ? payload.length : Buffer.byteLength(payload)).toString(); - } - // Use _shot namespace to avoid collision with Node this._shot = { @@ -91,23 +74,9 @@ exports = module.exports = internals.Request = class extends Stream.Readable { return this; } - prepare(next) { - - if (this._shot.payload instanceof Stream === false) { - return next(); - } + async prepare() { - const chunks = []; - - this._shot.payload.on('data', (chunk) => chunks.push(Buffer.from(chunk))); - - this._shot.payload.on('end', () => { - - const payload = Buffer.concat(chunks); - this.headers['content-length'] = this.headers['content-length'] || payload.length; - this._shot.payload = payload; - return next(); - }); + this._shot.payload = await this._shot.payload; } _read(size) { diff --git a/test/index.js b/test/index.js index b69cec7..08b5df4 100755 --- a/test/index.js +++ b/test/index.js @@ -514,6 +514,24 @@ describe('inject()', () => { expect(res.payload).to.equal('100'); }); + it('rejects on stream payload errors', async () => { + + const dispatch = function (req, res) { + + internals.readStream(req, (buff) => { + + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.end(buff); + }); + }; + + const payload = internals.getTestStream(); + payload.destroy(new Error('ERROR')); + + const promise = Shot.inject(dispatch, { method: 'post', url: '/', payload }); + await expect(promise).to.reject('ERROR'); + }); + it('iterates over payload', async () => { const dispatch = async function (req, res) {