Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pingrip/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@bpinternal/pingrip",
"version": "0.1.1",
"version": "0.2.1",
"description": "Pushpin GRIP Websocket-over-HTTP",
"keywords": [
"pushpin",
Expand Down
58 changes: 58 additions & 0 deletions pingrip/src/outputs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { describe, it, expect } from 'vitest'
import { ResponseBuilder } from './outputs'
import { parse } from './messages'

describe('OpenResponseBuilder', () => {
it('text() prepends "m:" to the content', () => {
const { body } = new ResponseBuilder().open().text('hello').toResponse()
const parsed = parse(body)
expect(parsed).toEqual([{ type: 'open' }, { type: 'text', content: 'm:hello' }])
})

it('subscribe() emits one "c:" prefixed frame per channel', () => {
const { body } = new ResponseBuilder().open().subscribe(['conv_X', 'user_Y']).toResponse()
const parsed = parse(body)
expect(parsed).toEqual([
{ type: 'open' },
{ type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` },
{ type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'user_Y' })}` }
])
})

it('keepAlive() emits a "c:" prefixed keep-alive control frame', () => {
const { body } = new ResponseBuilder().open().keepAlive('ping', 30).toResponse()
const parsed = parse(body)
expect(parsed).toEqual([
{ type: 'open' },
{ type: 'text', content: `c:${JSON.stringify({ type: 'keep-alive', content: 'ping', timeout: 30 })}` }
])
})

it('subscribe() and text() emit independent frames', () => {
const { body } = new ResponseBuilder().open().subscribe(['conv_X']).text('hello').toResponse()
const parsed = parse(body)
expect(parsed).toEqual([
{ type: 'open' },
{ type: 'text', content: `c:${JSON.stringify({ type: 'subscribe', channel: 'conv_X' })}` },
{ type: 'text', content: 'm:hello' }
])
})

it('binary() prepends "m:" to the buffer', () => {
const payload = Buffer.from([0x01, 0x02, 0x03])
const { body } = new ResponseBuilder().open().binary(payload).toResponse()
const parsed = parse(body)
expect(parsed).toEqual([{ type: 'open' }, { type: 'binary', content: Buffer.concat([Buffer.from('m:'), payload]) }])
})
})

describe('ResponseBuilder.unsubscribe', () => {
it('emits "c:" prefixed frames', () => {
const { body } = new ResponseBuilder().close(1000).unsubscribe(['conv_X']).toResponse()
const parsed = parse(body)
expect(parsed).toEqual([
{ type: 'close', code: 1000 },
{ type: 'text', content: `c:${JSON.stringify({ type: 'unsubscribe', channel: 'conv_X' })}` }
])
})
})
60 changes: 29 additions & 31 deletions pingrip/src/outputs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,23 @@ export class ResponseBuilder {
}

text(content: string) {
this._messages.push({
type: 'text',
content
})
this._pushText(content, 'm')
return this
}

binary(content: Buffer) {
this._messages.push({
type: 'binary',
content
})
this._pushBinary(content, 'm')
return this
}

control(content: string) {
this._pushText(content, 'c')
return this
}

unsubscribe(channels: string[]) {
for (const channel of channels) {
this.text(`c:${JSON.stringify({ type: 'unsubscribe', channel })}`)
this.control(JSON.stringify({ type: 'unsubscribe', channel }))
}
return this
}
Expand All @@ -55,6 +54,20 @@ export class ResponseBuilder {
headers: {}
}
}

private _pushText(content: string, type: 'm' | 'c') {
this._messages.push({
type: 'text',
content: `${type}:${content}`
})
}

private _pushBinary(content: Buffer, type: 'm' | 'c') {
this._messages.push({
type: 'binary',
content: Buffer.concat([Buffer.from(`${type}:`), content])
})
}
}

class CloseResponseBuilder {
Expand All @@ -71,18 +84,13 @@ class CloseResponseBuilder {
}

class OpenResponseBuilder {
private _keepAliveMessage: string | null = null
private _keepAliveTimeout: number | null = null
private _channels: string[] = []

constructor(private _builder: ResponseBuilder) {}

keepAlive(message: string, timeout: number) {
keepAlive(content: string, timeout: number) {
if (timeout < 30) {
throw new Error(`Keep Alive timeout should be at least 30 secondes. ${timeout} was given.`)
}
this._keepAliveTimeout = timeout
this._keepAliveMessage = message
this._builder.control(JSON.stringify({ type: 'keep-alive', content, timeout }))
return this
}

Expand All @@ -98,29 +106,19 @@ class OpenResponseBuilder {

subscribe(channels: string[]) {
for (const channel of channels) {
this._channels.push(channel)
this.text(`c:${JSON.stringify({ type: 'subscribe', channel })}`)
this._builder.control(JSON.stringify({ type: 'subscribe', channel }))
}
return this
}

toResponse(): Response {
const { body } = this._builder.toResponse()
const headers: Record<string, string> = {
'Content-Type': 'application/websocket-events',
'Grip-Hold': 'stream',
'Sec-WebSocket-Extensions': 'grip'
}
if (this._channels.length > 0) {
headers['Grip-Channel'] = this._channels.join(',')
}
if (this._keepAliveMessage !== null && this._keepAliveTimeout) {
const b64KeepAlive = Buffer.from(this._keepAliveMessage, 'utf-8').toString('base64')
headers['Grip-Keep-Alive'] = `${b64KeepAlive}; format=base64; timeout=${this._keepAliveTimeout};`
}
return {
body,
headers
headers: {
'Content-Type': 'application/websocket-events',
'Sec-WebSocket-Extensions': 'grip'
}
}
}
}
Loading