From 971bb3aca928373bc17149f3ca611b23037a7414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Tue, 5 May 2026 09:38:20 -0400 Subject: [PATCH 1/8] fix(pingrip): forward data text frames in OPEN response --- pingrip/package.json | 2 +- pingrip/src/outputs.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pingrip/package.json b/pingrip/package.json index f11f5945a..b0ab66164 100644 --- a/pingrip/package.json +++ b/pingrip/package.json @@ -1,6 +1,6 @@ { "name": "@bpinternal/pingrip", - "version": "0.1.1", + "version": "0.1.2", "description": "Pushpin GRIP Websocket-over-HTTP", "keywords": [ "pushpin", diff --git a/pingrip/src/outputs.ts b/pingrip/src/outputs.ts index dba01e694..16e1d03a5 100644 --- a/pingrip/src/outputs.ts +++ b/pingrip/src/outputs.ts @@ -109,7 +109,7 @@ class OpenResponseBuilder { const headers: Record = { 'Content-Type': 'application/websocket-events', 'Grip-Hold': 'stream', - 'Sec-WebSocket-Extensions': 'grip' + 'Sec-WebSocket-Extensions': 'grip; message-prefix=""' } if (this._channels.length > 0) { headers['Grip-Channel'] = this._channels.join(',') From 04fd5cbb1daa96df0178b070b3ac6ac93c9c1d99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Tue, 5 May 2026 10:09:09 -0400 Subject: [PATCH 2/8] update --- pingrip/src/outputs.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pingrip/src/outputs.ts b/pingrip/src/outputs.ts index 16e1d03a5..3861ac3b8 100644 --- a/pingrip/src/outputs.ts +++ b/pingrip/src/outputs.ts @@ -87,12 +87,12 @@ class OpenResponseBuilder { } text(content: string) { - this._builder.text(content) + this._builder.text(`m:${content}`) return this } binary(content: Buffer) { - this._builder.binary(content) + this._builder.binary(Buffer.concat([Buffer.from('m:'), content])) return this } @@ -109,7 +109,7 @@ class OpenResponseBuilder { const headers: Record = { 'Content-Type': 'application/websocket-events', 'Grip-Hold': 'stream', - 'Sec-WebSocket-Extensions': 'grip; message-prefix=""' + 'Sec-WebSocket-Extensions': 'grip' } if (this._channels.length > 0) { headers['Grip-Channel'] = this._channels.join(',') From 29221f317abfd9037555af60fa304410603eaf47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Tue, 5 May 2026 10:26:56 -0400 Subject: [PATCH 3/8] fix and add tests --- pingrip/src/outputs.test.ts | 58 +++++++++++++++++++++++++++++++++++++ pingrip/src/outputs.ts | 2 +- 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 pingrip/src/outputs.test.ts diff --git a/pingrip/src/outputs.test.ts b/pingrip/src/outputs.test.ts new file mode 100644 index 000000000..b34e1772d --- /dev/null +++ b/pingrip/src/outputs.test.ts @@ -0,0 +1,58 @@ +import { describe, it, expect } from 'vitest' +import { ResponseBuilder } from './outputs' +import { parse } from './messages' + +describe('OpenResponseBuilder', () => { + it('text() prepends "m:" prefix so pushpin forwards the frame to the client', () => { + const { body } = new ResponseBuilder().open().text('hello').toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'text', content: 'm:hello' }, + ]) + }) + + it('subscribe() emits "c:" control frames without the "m:" data prefix', () => { + const { body, headers } = new ResponseBuilder().open().subscribe(['conv_X', 'user_Y']).toResponse() + + expect(headers['Grip-Channel']).toBe('conv_X,user_Y') + + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` }, + { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'user_Y' })}` }, + ]) + }) + + it('subscribe() and text() can be combined without cross-contamination', () => { + const { body } = new ResponseBuilder().open().subscribe(['conv_X']).text('hello').toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` }, + { type: 'text', content: 'm:hello' }, + ]) + }) + + it('binary() prepends "m:" bytes so pushpin forwards the frame to the client', () => { + const payload = Buffer.from([0x01, 0x02, 0x03]) + const { body } = new ResponseBuilder().open().binary(payload).toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'binary', content: Buffer.concat([Buffer.from('m:'), payload]) }, + ]) + }) +}) + +describe('ResponseBuilder.unsubscribe', () => { + it('emits "c:" control frames without the "m:" data prefix', () => { + const { body } = new ResponseBuilder().close(1000).unsubscribe(['conv_X']).toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'close', code: 1000 }, + { type: 'text', content: `c:${JSON.stringify({ type: 'unsubscribe', channel: 'conv_X' })}` }, + ]) + }) +}) diff --git a/pingrip/src/outputs.ts b/pingrip/src/outputs.ts index 3861ac3b8..9fed82e23 100644 --- a/pingrip/src/outputs.ts +++ b/pingrip/src/outputs.ts @@ -99,7 +99,7 @@ class OpenResponseBuilder { subscribe(channels: string[]) { for (const channel of channels) { this._channels.push(channel) - this.text(`c:${JSON.stringify({ type: 'subscribe', channel })}`) + this._builder.text(`c:${JSON.stringify({ type: 'subscribe', channel })}`) } return this } From 734b470300db475cf2c4975e5b1d21c863d0f7b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Tue, 5 May 2026 10:28:09 -0400 Subject: [PATCH 4/8] fix format --- pingrip/src/outputs.test.ts | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/pingrip/src/outputs.test.ts b/pingrip/src/outputs.test.ts index b34e1772d..3f8e7754f 100644 --- a/pingrip/src/outputs.test.ts +++ b/pingrip/src/outputs.test.ts @@ -6,10 +6,7 @@ describe('OpenResponseBuilder', () => { it('text() prepends "m:" prefix so pushpin forwards the frame to the client', () => { const { body } = new ResponseBuilder().open().text('hello').toResponse() const parsed = parse(body) - expect(parsed).toEqual([ - { type: 'open' }, - { type: 'text', content: 'm:hello' }, - ]) + expect(parsed).toEqual([{ type: 'open' }, { type: 'text', content: 'm:hello' }]) }) it('subscribe() emits "c:" control frames without the "m:" data prefix', () => { @@ -21,7 +18,7 @@ describe('OpenResponseBuilder', () => { expect(parsed).toEqual([ { type: 'open' }, { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` }, - { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'user_Y' })}` }, + { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'user_Y' })}` } ]) }) @@ -31,7 +28,7 @@ describe('OpenResponseBuilder', () => { expect(parsed).toEqual([ { type: 'open' }, { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` }, - { type: 'text', content: 'm:hello' }, + { type: 'text', content: 'm:hello' } ]) }) @@ -39,10 +36,7 @@ describe('OpenResponseBuilder', () => { const payload = Buffer.from([0x01, 0x02, 0x03]) const { body } = new ResponseBuilder().open().binary(payload).toResponse() const parsed = parse(body) - expect(parsed).toEqual([ - { type: 'open' }, - { type: 'binary', content: Buffer.concat([Buffer.from('m:'), payload]) }, - ]) + expect(parsed).toEqual([{ type: 'open' }, { type: 'binary', content: Buffer.concat([Buffer.from('m:'), payload]) }]) }) }) @@ -52,7 +46,7 @@ describe('ResponseBuilder.unsubscribe', () => { const parsed = parse(body) expect(parsed).toEqual([ { type: 'close', code: 1000 }, - { type: 'text', content: `c:${JSON.stringify({ type: 'unsubscribe', channel: 'conv_X' })}` }, + { type: 'text', content: `c:${JSON.stringify({ type: 'unsubscribe', channel: 'conv_X' })}` } ]) }) }) From 718e415b87f0df773636dd6f9c83baac3c728caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Tue, 5 May 2026 10:31:56 -0400 Subject: [PATCH 5/8] fix tests name --- pingrip/src/outputs.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pingrip/src/outputs.test.ts b/pingrip/src/outputs.test.ts index 3f8e7754f..bc3d4f226 100644 --- a/pingrip/src/outputs.test.ts +++ b/pingrip/src/outputs.test.ts @@ -3,13 +3,13 @@ import { ResponseBuilder } from './outputs' import { parse } from './messages' describe('OpenResponseBuilder', () => { - it('text() prepends "m:" prefix so pushpin forwards the frame to the client', () => { + it('text() prepends "m:" to the content', () => { const { body } = new ResponseBuilder().open().text('hello').toResponse() const parsed = parse(body) expect(parsed).toEqual([{ type: 'open' }, { type: 'text', content: 'm:hello' }]) }) - it('subscribe() emits "c:" control frames without the "m:" data prefix', () => { + it('subscribe() emits "c:" prefixed frames and sets the Grip-Channel header', () => { const { body, headers } = new ResponseBuilder().open().subscribe(['conv_X', 'user_Y']).toResponse() expect(headers['Grip-Channel']).toBe('conv_X,user_Y') @@ -22,7 +22,7 @@ describe('OpenResponseBuilder', () => { ]) }) - it('subscribe() and text() can be combined without cross-contamination', () => { + it('subscribe() and text() emit independent frames', () => { const { body } = new ResponseBuilder().open().subscribe(['conv_X']).text('hello').toResponse() const parsed = parse(body) expect(parsed).toEqual([ @@ -32,7 +32,7 @@ describe('OpenResponseBuilder', () => { ]) }) - it('binary() prepends "m:" bytes so pushpin forwards the frame to the client', () => { + it('binary() prepends "m:" to the buffer', () => { const payload = Buffer.from([0x01, 0x02, 0x03]) const { body } = new ResponseBuilder().open().binary(payload).toResponse() const parsed = parse(body) @@ -41,7 +41,7 @@ describe('OpenResponseBuilder', () => { }) describe('ResponseBuilder.unsubscribe', () => { - it('emits "c:" control frames without the "m:" data prefix', () => { + it('emits "c:" prefixed frames', () => { const { body } = new ResponseBuilder().close(1000).unsubscribe(['conv_X']).toResponse() const parsed = parse(body) expect(parsed).toEqual([ From ad9d63168ec0d048ee08956d31ac63aee4788df5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Tue, 5 May 2026 15:40:12 -0400 Subject: [PATCH 6/8] update --- pingrip/package.json | 2 +- pingrip/src/outputs.test.ts | 16 ++++++--- pingrip/src/outputs.ts | 65 ++++++++++++++----------------------- 3 files changed, 36 insertions(+), 47 deletions(-) diff --git a/pingrip/package.json b/pingrip/package.json index b0ab66164..4126f354d 100644 --- a/pingrip/package.json +++ b/pingrip/package.json @@ -1,6 +1,6 @@ { "name": "@bpinternal/pingrip", - "version": "0.1.2", + "version": "0.2.1", "description": "Pushpin GRIP Websocket-over-HTTP", "keywords": [ "pushpin", diff --git a/pingrip/src/outputs.test.ts b/pingrip/src/outputs.test.ts index bc3d4f226..290972ed6 100644 --- a/pingrip/src/outputs.test.ts +++ b/pingrip/src/outputs.test.ts @@ -9,11 +9,8 @@ describe('OpenResponseBuilder', () => { expect(parsed).toEqual([{ type: 'open' }, { type: 'text', content: 'm:hello' }]) }) - it('subscribe() emits "c:" prefixed frames and sets the Grip-Channel header', () => { - const { body, headers } = new ResponseBuilder().open().subscribe(['conv_X', 'user_Y']).toResponse() - - expect(headers['Grip-Channel']).toBe('conv_X,user_Y') - + it('subscribe() emits one "c:" prefixed frame per channel', () => { + const { body } = new ResponseBuilder().open().subscribe(['conv_X', 'user_Y']).toResponse() const parsed = parse(body) expect(parsed).toEqual([ { type: 'open' }, @@ -22,6 +19,15 @@ describe('OpenResponseBuilder', () => { ]) }) + it('keepAlive() emits a "c:" prefixed keep-alive control frame', () => { + const { body } = new ResponseBuilder().open().keepAlive('ping', 30).toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'text', content: `c:${JSON.stringify({ type: 'keep-alive', content: 'ping', timeout: 30 })}` } + ]) + }) + it('subscribe() and text() emit independent frames', () => { const { body } = new ResponseBuilder().open().subscribe(['conv_X']).text('hello').toResponse() const parsed = parse(body) diff --git a/pingrip/src/outputs.ts b/pingrip/src/outputs.ts index 9fed82e23..1d9bd1860 100644 --- a/pingrip/src/outputs.ts +++ b/pingrip/src/outputs.ts @@ -26,25 +26,9 @@ export class ResponseBuilder { return new CloseResponseBuilder(this) } - text(content: string) { - this._messages.push({ - type: 'text', - content - }) - return this - } - - binary(content: Buffer) { - this._messages.push({ - type: 'binary', - content - }) - return this - } - unsubscribe(channels: string[]) { for (const channel of channels) { - this.text(`c:${JSON.stringify({ type: 'unsubscribe', channel })}`) + this._text(`c:${JSON.stringify({ type: 'unsubscribe', channel })}`) } return this } @@ -55,6 +39,20 @@ export class ResponseBuilder { headers: {} } } + + _text(content: string) { + this._messages.push({ + type: 'text', + content + }) + } + + _binary(content: Buffer) { + this._messages.push({ + type: 'binary', + content + }) + } } class CloseResponseBuilder { @@ -71,56 +69,41 @@ class CloseResponseBuilder { } class OpenResponseBuilder { - private _keepAliveMessage: string | null = null - private _keepAliveTimeout: number | null = null - private _channels: string[] = [] - constructor(private _builder: ResponseBuilder) {} - keepAlive(message: string, timeout: number) { + keepAlive(content: string, timeout: number) { if (timeout < 30) { throw new Error(`Keep Alive timeout should be at least 30 secondes. ${timeout} was given.`) } - this._keepAliveTimeout = timeout - this._keepAliveMessage = message + this._builder._text(`c:${JSON.stringify({ type: 'keep-alive', content, timeout })}`) return this } text(content: string) { - this._builder.text(`m:${content}`) + this._builder._text(`m:${content}`) return this } binary(content: Buffer) { - this._builder.binary(Buffer.concat([Buffer.from('m:'), content])) + this._builder._binary(Buffer.concat([Buffer.from('m:'), content])) return this } subscribe(channels: string[]) { for (const channel of channels) { - this._channels.push(channel) - this._builder.text(`c:${JSON.stringify({ type: 'subscribe', channel })}`) + this._builder._text(`c:${JSON.stringify({ type: 'subscribe', channel })}`) } return this } toResponse(): Response { const { body } = this._builder.toResponse() - const headers: Record = { - 'Content-Type': 'application/websocket-events', - 'Grip-Hold': 'stream', - 'Sec-WebSocket-Extensions': 'grip' - } - if (this._channels.length > 0) { - headers['Grip-Channel'] = this._channels.join(',') - } - if (this._keepAliveMessage !== null && this._keepAliveTimeout) { - const b64KeepAlive = Buffer.from(this._keepAliveMessage, 'utf-8').toString('base64') - headers['Grip-Keep-Alive'] = `${b64KeepAlive}; format=base64; timeout=${this._keepAliveTimeout};` - } return { body, - headers + headers: { + 'Content-Type': 'application/websocket-events', + 'Sec-WebSocket-Extensions': 'grip' + } } } } From 03e65cc65e157a26d033bc5b103edf20bd856fb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Tue, 5 May 2026 15:50:03 -0400 Subject: [PATCH 7/8] fix --- pingrip/src/outputs.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pingrip/src/outputs.ts b/pingrip/src/outputs.ts index 1d9bd1860..c097a5c1f 100644 --- a/pingrip/src/outputs.ts +++ b/pingrip/src/outputs.ts @@ -28,7 +28,7 @@ export class ResponseBuilder { unsubscribe(channels: string[]) { for (const channel of channels) { - this._text(`c:${JSON.stringify({ type: 'unsubscribe', channel })}`) + this._pushText(`c:${JSON.stringify({ type: 'unsubscribe', channel })}`) } return this } @@ -40,14 +40,14 @@ export class ResponseBuilder { } } - _text(content: string) { + _pushText(content: string) { this._messages.push({ type: 'text', content }) } - _binary(content: Buffer) { + _pushBinary(content: Buffer) { this._messages.push({ type: 'binary', content @@ -75,23 +75,23 @@ class OpenResponseBuilder { if (timeout < 30) { throw new Error(`Keep Alive timeout should be at least 30 secondes. ${timeout} was given.`) } - this._builder._text(`c:${JSON.stringify({ type: 'keep-alive', content, timeout })}`) + this._builder._pushText(`c:${JSON.stringify({ type: 'keep-alive', content, timeout })}`) return this } text(content: string) { - this._builder._text(`m:${content}`) + this._builder._pushText(`m:${content}`) return this } binary(content: Buffer) { - this._builder._binary(Buffer.concat([Buffer.from('m:'), content])) + this._builder._pushBinary(Buffer.concat([Buffer.from('m:'), content])) return this } subscribe(channels: string[]) { for (const channel of channels) { - this._builder._text(`c:${JSON.stringify({ type: 'subscribe', channel })}`) + this._builder._pushText(`c:${JSON.stringify({ type: 'subscribe', channel })}`) } return this } From f8d9d9ea7995a8776e06ccabb1e59bf6f8a11552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hubert=20B=C3=A9langer?= Date: Wed, 6 May 2026 08:03:24 -0400 Subject: [PATCH 8/8] fix PR comments --- pingrip/src/outputs.ts | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/pingrip/src/outputs.ts b/pingrip/src/outputs.ts index c097a5c1f..5e352aa7a 100644 --- a/pingrip/src/outputs.ts +++ b/pingrip/src/outputs.ts @@ -26,9 +26,24 @@ export class ResponseBuilder { return new CloseResponseBuilder(this) } + text(content: string) { + this._pushText(content, 'm') + return this + } + + binary(content: Buffer) { + this._pushBinary(content, 'm') + return this + } + + control(content: string) { + this._pushText(content, 'c') + return this + } + unsubscribe(channels: string[]) { for (const channel of channels) { - this._pushText(`c:${JSON.stringify({ type: 'unsubscribe', channel })}`) + this.control(JSON.stringify({ type: 'unsubscribe', channel })) } return this } @@ -40,17 +55,17 @@ export class ResponseBuilder { } } - _pushText(content: string) { + private _pushText(content: string, type: 'm' | 'c') { this._messages.push({ type: 'text', - content + content: `${type}:${content}` }) } - _pushBinary(content: Buffer) { + private _pushBinary(content: Buffer, type: 'm' | 'c') { this._messages.push({ type: 'binary', - content + content: Buffer.concat([Buffer.from(`${type}:`), content]) }) } } @@ -75,23 +90,23 @@ class OpenResponseBuilder { if (timeout < 30) { throw new Error(`Keep Alive timeout should be at least 30 secondes. ${timeout} was given.`) } - this._builder._pushText(`c:${JSON.stringify({ type: 'keep-alive', content, timeout })}`) + this._builder.control(JSON.stringify({ type: 'keep-alive', content, timeout })) return this } text(content: string) { - this._builder._pushText(`m:${content}`) + this._builder.text(content) return this } binary(content: Buffer) { - this._builder._pushBinary(Buffer.concat([Buffer.from('m:'), content])) + this._builder.binary(content) return this } subscribe(channels: string[]) { for (const channel of channels) { - this._builder._pushText(`c:${JSON.stringify({ type: 'subscribe', channel })}`) + this._builder.control(JSON.stringify({ type: 'subscribe', channel })) } return this }