Skip to content

Commit af87ae2

Browse files
authored
feat: update message pending tasks (#1716)
## CLA - [ ] I have signed the [Stream CLA](https://docs.google.com/forms/d/e/1FAIpQLScFKsKkAJI7mhCr7K9rEIOpqIDThrWxuvxnwUq2XkHyG154vQ/viewform) (required). - [ ] Code changes are tested ## Description of the changes, What, Why and How? This PR introduces support for adding message updates as a pending task whenever we are offline and persisting it. It goes hand in hand with an extended pending task API through `updatePendingTask` (living in the UI SDK implementation currently). The flow is as follows: - If we're online, we simply update the message - If we're offline and the message is an already successful one, we simply queue up the pending task - If we're offline and the message is a failed one (for example, we send a message while offline and then immediately edit it), we update the `send-message` pending task with the new data - When this happens, we make sure to not change the `message_text_changed_at` property so that the newly sent message does not appear as edited This comes with a couple of caveats: - Attachments are out of scope, since currently our async uploads feature still lives on the UI SDK (once that gets moved to the LLC we can support that too) - Offline updates containing attachments will simply be skipped - The optimistic updates still live in the UI SDK, their move towards the LLC will come in the next 2 quarters gradually ## Changelog -
1 parent 19f344b commit af87ae2

10 files changed

Lines changed: 678 additions & 27 deletions

File tree

src/client.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ import { ReminderManager } from './reminders';
265265
import { StateStore } from './store';
266266
import type { MessageComposer } from './messageComposer';
267267
import type { AbstractOfflineDB } from './offline-support';
268+
import { getPendingTaskChannelData } from './offline-support/util';
268269

269270
function isString(x: unknown): x is string {
270271
return typeof x === 'string' || x instanceof String;
@@ -3114,6 +3115,38 @@ export class StreamChat {
31143115
throw Error('Please specify the message.id when calling updateMessage');
31153116
}
31163117

3118+
const messageId = message.id as string;
3119+
3120+
try {
3121+
if (this.offlineDb) {
3122+
return await this.offlineDb.queueTask<UpdateMessageAPIResponse>({
3123+
task: {
3124+
...getPendingTaskChannelData(message.cid),
3125+
messageId,
3126+
payload: [message, partialUserOrUserId, options],
3127+
type: 'update-message',
3128+
},
3129+
});
3130+
}
3131+
} catch (error) {
3132+
this.logger('error', `offlineDb:updateMessage`, {
3133+
tags: ['channel', 'offlineDb'],
3134+
error,
3135+
});
3136+
}
3137+
3138+
return await this._updateMessage(message, partialUserOrUserId, options);
3139+
}
3140+
3141+
async _updateMessage(
3142+
message: LocalMessage | Partial<MessageResponse>,
3143+
partialUserOrUserId?: string | { id: string },
3144+
options?: UpdateMessageOptions,
3145+
) {
3146+
if (!message.id) {
3147+
throw Error('Please specify the message.id when calling updateMessage');
3148+
}
3149+
31173150
// should not include user object
31183151
const payload = toUpdatedMessagePayload(message);
31193152

src/offline-support/offline_support_api.ts

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
import type { APIErrorResponse, ChannelResponse, Event } from '../types';
1+
import type {
2+
APIErrorResponse,
3+
ChannelResponse,
4+
Event,
5+
LocalMessage,
6+
Message,
7+
MessageResponse,
8+
} from '../types';
29

310
import type {
411
OfflineDBApi,
@@ -11,7 +18,8 @@ import type { StreamChat } from '../client';
1118
import type { AxiosError } from 'axios';
1219
import { OfflineDBSyncManager } from './offline_sync_manager';
1320
import { StateStore } from '../store';
14-
import { runDetached } from '../utils';
21+
import { localMessageToNewMessagePayload, runDetached } from '../utils';
22+
import { isMessageUpdateReplayable } from './util';
1523

1624
/**
1725
* Abstract base class for an offline database implementation used with StreamChat.
@@ -310,6 +318,16 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
310318
*/
311319
abstract addPendingTask: OfflineDBApi['addPendingTask'];
312320

321+
/**
322+
* @abstract
323+
* Updates a pending task in the DB, given its ID.
324+
* Will return the prepared queries for delayed execution (even if they are
325+
* already executed).
326+
* @param {DBUpdatePendingTaskType} options
327+
* @returns {Promise<ExecuteBatchDBQueriesType>}
328+
*/
329+
abstract updatePendingTask: OfflineDBApi['updatePendingTask'];
330+
313331
/**
314332
* @abstract
315333
* Deletes a pending task from the DB, given its ID.
@@ -1076,7 +1094,7 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
10761094
return await attemptTaskExecution();
10771095
} catch (e) {
10781096
if (!this.shouldSkipQueueingTask(e as AxiosError<APIErrorResponse>)) {
1079-
await this.addPendingTask(task);
1097+
await this.handleAddPendingTask({ task });
10801098
}
10811099
throw e;
10821100
}
@@ -1092,13 +1110,112 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
10921110
private shouldSkipQueueingTask = (error: AxiosError<APIErrorResponse>) =>
10931111
error?.response?.data?.code === 4 || error?.response?.data?.code === 17;
10941112

1113+
private mergeFailedMessageUpdateIntoPendingSendMessage = ({
1114+
editedMessage,
1115+
pendingMessage,
1116+
}: {
1117+
editedMessage: LocalMessage | Partial<MessageResponse>;
1118+
pendingMessage: Message;
1119+
}) => {
1120+
const normalizedEditedMessageSource = {
1121+
...editedMessage,
1122+
} as LocalMessage & { message_text_updated_at?: string };
1123+
1124+
if (editedMessage.status === 'failed') {
1125+
delete normalizedEditedMessageSource.message_text_updated_at;
1126+
}
1127+
1128+
const normalizedEditedMessage = localMessageToNewMessagePayload(
1129+
normalizedEditedMessageSource,
1130+
);
1131+
const pendingMessageStatus = (pendingMessage as { status?: string }).status;
1132+
1133+
return {
1134+
...pendingMessage,
1135+
...normalizedEditedMessage,
1136+
...(typeof pendingMessageStatus !== 'undefined'
1137+
? { status: pendingMessageStatus }
1138+
: {}),
1139+
} as Message;
1140+
};
1141+
1142+
private isPendingSendMessageTask = (
1143+
task: PendingTask,
1144+
): task is Extract<PendingTask, { type: 'send-message' }> =>
1145+
task.type === 'send-message';
1146+
1147+
private handleOfflineFailedUpdateMessagePendingTask = async (
1148+
task: Extract<PendingTask, { type: 'update-message' }>,
1149+
) => {
1150+
const [message] = task.payload;
1151+
if (!message.id) {
1152+
return;
1153+
}
1154+
1155+
const pendingTasks = await this.getPendingTasks({ messageId: message.id });
1156+
const pendingSendMessageTask = pendingTasks.find(this.isPendingSendMessageTask);
1157+
1158+
if (!pendingSendMessageTask) {
1159+
return;
1160+
}
1161+
1162+
const updatedPendingSendMessage = this.mergeFailedMessageUpdateIntoPendingSendMessage(
1163+
{
1164+
editedMessage: message,
1165+
pendingMessage: pendingSendMessageTask.payload[0],
1166+
},
1167+
);
1168+
1169+
const updatedPendingTask: Extract<PendingTask, { type: 'send-message' }> = {
1170+
...pendingSendMessageTask,
1171+
payload: [updatedPendingSendMessage, pendingSendMessageTask.payload[1]],
1172+
};
1173+
1174+
if (pendingSendMessageTask.id) {
1175+
await this.updatePendingTask({
1176+
id: pendingSendMessageTask.id,
1177+
task: updatedPendingTask,
1178+
});
1179+
return;
1180+
}
1181+
1182+
await this.addPendingTask({
1183+
...updatedPendingTask,
1184+
id: undefined,
1185+
});
1186+
};
1187+
1188+
/**
1189+
* Central ingress for persisting pending tasks. It either stores the task as-is
1190+
* or rewrites an existing pending `send-message` task for offline edits of failed messages.
1191+
*/
1192+
public handleAddPendingTask = async ({ task }: { task: PendingTask }) => {
1193+
if (task.type === 'update-message' && !isMessageUpdateReplayable(task.payload[0])) {
1194+
return;
1195+
}
1196+
1197+
if (
1198+
task.type === 'update-message' &&
1199+
!this.client.wsConnection?.isHealthy &&
1200+
task.payload[0].status === 'failed'
1201+
) {
1202+
await this.handleOfflineFailedUpdateMessagePendingTask(task);
1203+
return;
1204+
}
1205+
1206+
await this.addPendingTask(task);
1207+
};
1208+
10951209
/**
10961210
* Executes a task from the list of supported pending tasks. Currently supported pending tasks
10971211
* are:
1212+
* - Updating a message
10981213
* - Deleting a message
10991214
* - Sending a reaction
11001215
* - Removing a reaction
11011216
* - Sending a message
1217+
* - Creating a draft
1218+
* - Deleting a draft
11021219
* It will throw if we try to execute a pending task that is not supported.
11031220
* @param task - The task we want to execute
11041221
* @param isPendingTask - a control value telling us if it's an actual pending task being executed
@@ -1108,6 +1225,10 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
11081225
{ task }: { task: PendingTask },
11091226
isPendingTask = false,
11101227
) => {
1228+
if (task.type === 'update-message') {
1229+
return await this.client._updateMessage(...task.payload);
1230+
}
1231+
11111232
if (task.type === 'delete-message') {
11121233
return await this.client._deleteMessage(...task.payload);
11131234
}

src/offline-support/types.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,16 @@ export type DBDeletePendingTaskType = {
227227
id: number;
228228
};
229229

230+
/**
231+
* Update a pending task by ID.
232+
*/
233+
export type DBUpdatePendingTaskType = {
234+
/** ID of the pending task. */
235+
id: number;
236+
/** The next task payload to persist. */
237+
task: PendingTask;
238+
};
239+
230240
/**
231241
* Options to delete a reaction from a message.
232242
*/
@@ -372,6 +382,9 @@ export interface OfflineDBApi {
372382
addPendingTask: (task: PendingTask) => Promise<() => Promise<void>>;
373383
getPendingTasks: (conditions?: DBGetPendingTasksType) => Promise<PendingTask[]>;
374384
deleteDraft: (options: DBDeleteDraftType) => Promise<ExecuteBatchDBQueriesType>;
385+
updatePendingTask: (
386+
options: DBUpdatePendingTaskType,
387+
) => Promise<ExecuteBatchDBQueriesType>;
375388
deletePendingTask: (
376389
options: DBDeletePendingTaskType,
377390
) => Promise<ExecuteBatchDBQueriesType>;
@@ -397,6 +410,7 @@ export type OfflineDBState = {
397410
};
398411

399412
export type PendingTaskTypes = {
413+
updateMessage: 'update-message';
400414
deleteMessage: 'delete-message';
401415
deleteReaction: 'delete-reaction';
402416
sendReaction: 'send-reaction';
@@ -417,6 +431,10 @@ export type PendingTask = {
417431
payload: Parameters<Channel['sendReaction']>;
418432
type: PendingTaskTypes['sendReaction'];
419433
}
434+
| {
435+
payload: Parameters<StreamChat['updateMessage']>;
436+
type: PendingTaskTypes['updateMessage'];
437+
}
420438
| {
421439
payload: Parameters<StreamChat['deleteMessage']>;
422440
type: PendingTaskTypes['deleteMessage'];

src/offline-support/util.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import type { Attachment, LocalMessage, MessageResponse } from '../types';
2+
3+
export const isLocalUrl = (value: string | undefined) =>
4+
!!value && !value.startsWith('http');
5+
6+
export const isAttachmentReplayable = (attachment: Attachment) => {
7+
if (!attachment || typeof attachment !== 'object') {
8+
return true;
9+
}
10+
11+
return !isLocalUrl(attachment.asset_url) && !isLocalUrl(attachment.image_url);
12+
};
13+
14+
export const isMessageUpdateReplayable = (
15+
message: LocalMessage | Partial<MessageResponse>,
16+
) => !message.attachments?.some((attachment) => !isAttachmentReplayable(attachment));
17+
18+
export const getPendingTaskChannelData = (cid?: string) => {
19+
if (!cid) {
20+
return {};
21+
}
22+
23+
const separatorIndex = cid.indexOf(':');
24+
if (separatorIndex <= 0 || separatorIndex === cid.length - 1) {
25+
return {};
26+
}
27+
28+
return {
29+
channelId: cid.slice(separatorIndex + 1),
30+
channelType: cid.slice(0, separatorIndex),
31+
};
32+
};

0 commit comments

Comments
 (0)