diff --git a/pingrip/package.json b/pingrip/package.json index f11f5945a..4126f354d 100644 --- a/pingrip/package.json +++ b/pingrip/package.json @@ -1,6 +1,6 @@ { "name": "@bpinternal/pingrip", - "version": "0.1.1", + "version": "0.2.1", "description": "Pushpin GRIP Websocket-over-HTTP", "keywords": [ "pushpin", diff --git a/pingrip/src/outputs.test.ts b/pingrip/src/outputs.test.ts new file mode 100644 index 000000000..290972ed6 --- /dev/null +++ b/pingrip/src/outputs.test.ts @@ -0,0 +1,58 @@ +import { describe, it, expect } from 'vitest' +import { ResponseBuilder } from './outputs' +import { parse } from './messages' + +describe('OpenResponseBuilder', () => { + it('text() prepends "m:" to the content', () => { + const { body } = new ResponseBuilder().open().text('hello').toResponse() + const parsed = parse(body) + expect(parsed).toEqual([{ type: 'open' }, { type: 'text', content: 'm:hello' }]) + }) + + it('subscribe() emits one "c:" prefixed frame per channel', () => { + const { body } = new ResponseBuilder().open().subscribe(['conv_X', 'user_Y']).toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` }, + { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'user_Y' })}` } + ]) + }) + + it('keepAlive() emits a "c:" prefixed keep-alive control frame', () => { + const { body } = new ResponseBuilder().open().keepAlive('ping', 30).toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'text', content: `c:${JSON.stringify({ type: 'keep-alive', content: 'ping', timeout: 30 })}` } + ]) + }) + + it('subscribe() and text() emit independent frames', () => { + const { body } = new ResponseBuilder().open().subscribe(['conv_X']).text('hello').toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'open' }, + { type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` }, + { type: 'text', content: 'm:hello' } + ]) + }) + + it('binary() prepends "m:" to the buffer', () => { + const payload = Buffer.from([0x01, 0x02, 0x03]) + const { body } = new ResponseBuilder().open().binary(payload).toResponse() + const parsed = parse(body) + expect(parsed).toEqual([{ type: 'open' }, { type: 'binary', content: Buffer.concat([Buffer.from('m:'), payload]) }]) + }) +}) + +describe('ResponseBuilder.unsubscribe', () => { + it('emits "c:" prefixed frames', () => { + const { body } = new ResponseBuilder().close(1000).unsubscribe(['conv_X']).toResponse() + const parsed = parse(body) + expect(parsed).toEqual([ + { type: 'close', code: 1000 }, + { type: 'text', content: `c:${JSON.stringify({ type: 'unsubscribe', channel: 'conv_X' })}` } + ]) + }) +}) diff --git a/pingrip/src/outputs.ts b/pingrip/src/outputs.ts index dba01e694..5e352aa7a 100644 --- a/pingrip/src/outputs.ts +++ b/pingrip/src/outputs.ts @@ -27,24 +27,23 @@ export class ResponseBuilder { } text(content: string) { - this._messages.push({ - type: 'text', - content - }) + this._pushText(content, 'm') return this } binary(content: Buffer) { - this._messages.push({ - type: 'binary', - content - }) + this._pushBinary(content, 'm') + return this + } + + control(content: string) { + this._pushText(content, 'c') return this } unsubscribe(channels: string[]) { for (const channel of channels) { - this.text(`c:${JSON.stringify({ type: 'unsubscribe', channel })}`) + this.control(JSON.stringify({ type: 'unsubscribe', channel })) } return this } @@ -55,6 +54,20 @@ export class ResponseBuilder { headers: {} } } + + private _pushText(content: string, type: 'm' | 'c') { + this._messages.push({ + type: 'text', + content: `${type}:${content}` + }) + } + + private _pushBinary(content: Buffer, type: 'm' | 'c') { + this._messages.push({ + type: 'binary', + content: Buffer.concat([Buffer.from(`${type}:`), content]) + }) + } } class CloseResponseBuilder { @@ -71,18 +84,13 @@ class CloseResponseBuilder { } class OpenResponseBuilder { - private _keepAliveMessage: string | null = null - private _keepAliveTimeout: number | null = null - private _channels: string[] = [] - constructor(private _builder: ResponseBuilder) {} - keepAlive(message: string, timeout: number) { + keepAlive(content: string, timeout: number) { if (timeout < 30) { throw new Error(`Keep Alive timeout should be at least 30 secondes. ${timeout} was given.`) } - this._keepAliveTimeout = timeout - this._keepAliveMessage = message + this._builder.control(JSON.stringify({ type: 'keep-alive', content, timeout })) return this } @@ -98,29 +106,19 @@ class OpenResponseBuilder { subscribe(channels: string[]) { for (const channel of channels) { - this._channels.push(channel) - this.text(`c:${JSON.stringify({ type: 'subscribe', channel })}`) + this._builder.control(JSON.stringify({ type: 'subscribe', channel })) } return this } toResponse(): Response { const { body } = this._builder.toResponse() - const headers: Record = { - 'Content-Type': 'application/websocket-events', - 'Grip-Hold': 'stream', - 'Sec-WebSocket-Extensions': 'grip' - } - if (this._channels.length > 0) { - headers['Grip-Channel'] = this._channels.join(',') - } - if (this._keepAliveMessage !== null && this._keepAliveTimeout) { - const b64KeepAlive = Buffer.from(this._keepAliveMessage, 'utf-8').toString('base64') - headers['Grip-Keep-Alive'] = `${b64KeepAlive}; format=base64; timeout=${this._keepAliveTimeout};` - } return { body, - headers + headers: { + 'Content-Type': 'application/websocket-events', + 'Sec-WebSocket-Extensions': 'grip' + } } } }