diff --git a/apps/backend/package.json b/apps/backend/package.json index 5d8b3bd..fd3338f 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -25,6 +25,8 @@ "license": "ISC", "packageManager": "pnpm@10.28.1", "dependencies": { + "@aws-sdk/client-s3": "^3.500.0", + "@aws-sdk/s3-request-presigner": "^3.500.0", "@socket.io/redis-adapter": "^8.3.0", "@stellar/stellar-sdk": "^15.1.0", "cors": "^2.8.6", diff --git a/apps/backend/src/app.ts b/apps/backend/src/app.ts index 2f2339b..e32ede9 100644 --- a/apps/backend/src/app.ts +++ b/apps/backend/src/app.ts @@ -11,6 +11,7 @@ import { devicesRouter } from './routes/devices.js'; import { messagesRouter } from './routes/messages.js'; import { usersRouter } from './routes/users.js'; import { treasuryRouter } from './routes/treasury.js'; +import { uploadsRouter } from './routes/uploads.js'; import { requireAuth, type AuthRequest } from './middleware/auth.js'; const packageJson = JSON.parse( @@ -51,6 +52,7 @@ app.use('/devices', devicesRouter); app.use('/messages', messagesRouter); app.use('/users', usersRouter); app.use('/treasury', treasuryRouter); +app.use('/uploads', uploadsRouter); app.get('/me', requireAuth, (req, res) => { res.json({ user: (req as AuthRequest).auth }); diff --git a/apps/backend/src/cron/garbageCollection.ts b/apps/backend/src/cron/garbageCollection.ts new file mode 100644 index 0000000..feaadfa --- /dev/null +++ b/apps/backend/src/cron/garbageCollection.ts @@ -0,0 +1,52 @@ +import { eq, and, lt } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { files } from '../db/schema.js'; +import { DeleteObjectCommand } from '@aws-sdk/client-s3'; +import { s3Client, S3_BUCKET } from '../lib/s3.js'; + +const ONE_HOUR_MS = 60 * 60 * 1000; + +export async function garbageCollectPendingFiles() { + try { + const oneHourAgo = new Date(Date.now() - ONE_HOUR_MS); + + // Find unconfirmed pending files older than 1 hour + const pendingFiles = await db.query.files.findMany({ + where: and( + eq(files.status, 'pending'), + lt(files.createdAt, oneHourAgo) + ) + }); + + if (pendingFiles.length === 0) return; + + for (const file of pendingFiles) { + try { + // Attempt to delete from S3 just in case the file was uploaded but not confirmed + const command = new DeleteObjectCommand({ + Bucket: S3_BUCKET, + Key: file.objectKey, + }); + await s3Client.send(command).catch(err => { + console.warn(`Failed to delete object from S3: ${file.objectKey}`, err); + }); + + // Delete from database + await db.delete(files).where(eq(files.id, file.id)); + + console.log(`Garbage collected pending file: ${file.id}`); + } catch (err) { + console.error(`Error garbage collecting file ${file.id}:`, err); + } + } + } catch (error) { + console.error('Error in garbage collection:', error); + } +} + +export function startGarbageCollectionCron() { + // Run every 10 minutes + setInterval(() => { + void garbageCollectPendingFiles(); + }, 10 * 60 * 1000); +} diff --git a/apps/backend/src/db/schema.ts b/apps/backend/src/db/schema.ts index 9e09e99..35e8c57 100644 --- a/apps/backend/src/db/schema.ts +++ b/apps/backend/src/db/schema.ts @@ -57,6 +57,21 @@ export const conversationMembers = pgTable('conversation_members', { joinedAt: timestamp('joined_at').notNull().defaultNow(), }); +export const fileStatusEnum = pgEnum('file_status', ['pending', 'ready']); + +export const files = pgTable('files', { + id: uuid('id').primaryKey().defaultRandom(), + uploaderId: uuid('uploader_id') + .notNull() + .references(() => users.id, { onDelete: 'cascade' }), + objectKey: text('object_key').notNull(), + status: fileStatusEnum('status').notNull().default('pending'), + size: integer('size').notNull(), + sha256: text('sha256'), + createdAt: timestamp('created_at').notNull().defaultNow(), + updatedAt: timestamp('updated_at').notNull().defaultNow(), +}); + export const messages = pgTable( 'messages', { @@ -68,6 +83,7 @@ export const messages = pgTable( .notNull() .references(() => users.id, { onDelete: 'cascade' }), content: text('content').notNull(), + fileId: uuid('file_id').references(() => files.id, { onDelete: 'set null' }), createdAt: timestamp('created_at').notNull().defaultNow(), deletedAt: timestamp('deleted_at'), }, @@ -236,6 +252,7 @@ export const usersRelations = relations(users, ({ many }) => ({ wallets: many(wallets), memberships: many(conversationMembers), messages: many(messages), + files: many(files), transfers: many(tokenTransfers), devices: many(devices), })); @@ -265,6 +282,12 @@ export const messagesRelations = relations(messages, ({ one }) => ({ references: [conversations.id], }), sender: one(users, { fields: [messages.senderId], references: [users.id] }), + file: one(files, { fields: [messages.fileId], references: [files.id] }), +})); + +export const filesRelations = relations(files, ({ one, many }) => ({ + uploader: one(users, { fields: [files.uploaderId], references: [users.id] }), + messages: many(messages), })); export const tokenTransfersRelations = relations(tokenTransfers, ({ one }) => ({ @@ -311,3 +334,5 @@ export type SignedPreKey = typeof signedPreKeys.$inferSelect; export type NewSignedPreKey = typeof signedPreKeys.$inferInsert; export type OneTimePreKey = typeof oneTimePreKeys.$inferSelect; export type NewOneTimePreKey = typeof oneTimePreKeys.$inferInsert; +export type File = typeof files.$inferSelect; +export type NewFile = typeof files.$inferInsert; diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index f8d60b7..8285959 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -18,6 +18,7 @@ import { runForever as runStellarListener, } from './services/stellarListener.js'; import { loadEnv } from './config.js'; +import { startGarbageCollectionCron } from './cron/garbageCollection.js'; dotenv.config(); @@ -123,6 +124,8 @@ httpServer.listen(PORT, () => { // Redis is unreachable; on failure we fall back to the in-process adapter. void attachRedisAdapter(); +startGarbageCollectionCron(); + // #46 — Stellar transfer event listener. Only spin up when the contract // id is configured so local-dev and unit-test runs don't try to talk to // Soroban RPC. The listener never throws out of runForever, so a failed diff --git a/apps/backend/src/lib/s3.ts b/apps/backend/src/lib/s3.ts new file mode 100644 index 0000000..530ecb2 --- /dev/null +++ b/apps/backend/src/lib/s3.ts @@ -0,0 +1,14 @@ +import { S3Client } from '@aws-sdk/client-s3'; + +export const s3Client = new S3Client({ + region: process.env.AWS_REGION || 'us-east-1', + credentials: { + accessKeyId: process.env.AWS_ACCESS_KEY_ID || '', + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || '', + }, + // If using MinIO or R2, an endpoint can be provided + endpoint: process.env.S3_ENDPOINT, + forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true', +}); + +export const S3_BUCKET = process.env.S3_BUCKET || 'clicked-uploads'; diff --git a/apps/backend/src/routes/uploads.ts b/apps/backend/src/routes/uploads.ts new file mode 100644 index 0000000..bc919f8 --- /dev/null +++ b/apps/backend/src/routes/uploads.ts @@ -0,0 +1,127 @@ +import { Router } from 'express'; +import type { IRouter } from 'express'; +import { eq, and } from 'drizzle-orm'; +import { HeadObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3'; +import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; +import { z } from 'zod'; +import { db } from '../db/index.js'; +import { files } from '../db/schema.js'; +import { requireAuth, type AuthRequest } from '../middleware/auth.js'; +import { s3Client, S3_BUCKET } from '../lib/s3.js'; +import { validate } from '../middleware/validate.js'; + +export const uploadsRouter: IRouter = Router(); + +uploadsRouter.use(requireAuth); + +const RequestUploadSchema = z.object({ + size: z.number().int().positive(), + sha256: z.string().optional(), + mimeType: z.string().optional().default('application/octet-stream'), +}); + +// 1. Request upload URL +uploadsRouter.post('/', validate(RequestUploadSchema), async (req: AuthRequest, res) => { + const userId = req.auth!.userId; + const { size, sha256, mimeType } = req.body as z.infer; + + try { + // We insert a pending file record + const [file] = await db + .insert(files) + .values({ + uploaderId: userId, + objectKey: `uploads/${userId}/${Date.now()}_${Math.random().toString(36).substring(7)}`, + size, + sha256, + status: 'pending', + }) + .returning(); + + // Generate a presigned URL for upload + const command = new PutObjectCommand({ + Bucket: S3_BUCKET, + Key: file.objectKey, + ContentType: mimeType, + ...(sha256 ? { ChecksumSHA256: sha256 } : {}), + }); + + const presignedUrl = await getSignedUrl(s3Client, command, { expiresIn: 3600 }); + + res.status(201).json({ + fileId: file.id, + uploadUrl: presignedUrl, + objectKey: file.objectKey, + }); + } catch (error) { + console.error('Error creating presigned URL:', error); + res.status(500).json({ error: 'Failed to request upload' }); + } +}); + +// 2. Complete/confirm upload +uploadsRouter.post('/:id/complete', async (req: AuthRequest, res) => { + const userId = req.auth!.userId; + const fileId = req.params['id'] as string; + + try { + const file = await db.query.files.findFirst({ + where: and(eq(files.id, fileId), eq(files.uploaderId, userId)), + }); + + if (!file) { + res.status(404).json({ error: 'File not found' }); + return; + } + + if (file.status === 'ready') { + res.status(200).json({ fileId: file.id, status: 'ready' }); + return; + } + + // Server HEADs the object + const command = new HeadObjectCommand({ + Bucket: S3_BUCKET, + Key: file.objectKey, + }); + + const headResponse = await s3Client.send(command); + + // Verify size matches + if (headResponse.ContentLength !== file.size) { + res.status(400).json({ + error: 'Size mismatch', + expected: file.size, + actual: headResponse.ContentLength + }); + return; + } + + // Optionally verify ciphertext sha256 (if provided in the database and by S3) + // S3 might return it in ChecksumSHA256 depending on how it was uploaded + if (file.sha256 && headResponse.ChecksumSHA256 && headResponse.ChecksumSHA256 !== file.sha256) { + res.status(400).json({ + error: 'Hash mismatch', + expected: file.sha256, + actual: headResponse.ChecksumSHA256 + }); + return; + } + + // Flip the file to ready + const [updatedFile] = await db + .update(files) + .set({ status: 'ready', updatedAt: new Date() }) + .where(eq(files.id, fileId)) + .returning(); + + res.status(200).json(updatedFile); + } catch (error: any) { + if (error.name === 'NotFound') { + res.status(400).json({ error: 'Object not found in storage. Ensure upload is complete.' }); + return; + } + console.error('Error completing upload:', error); + res.status(500).json({ error: 'Failed to complete upload' }); + } +}); diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 17d3bab..ef05f99 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -1,7 +1,7 @@ import type { Server } from 'socket.io'; import { and, eq, lt, desc, sql } from 'drizzle-orm'; import { db } from '../db/index.js'; -import { conversations, conversationMembers, messages } from '../db/schema.js'; +import { conversations, conversationMembers, messages, files } from '../db/schema.js'; import type { AuthSocket } from '../middleware/socketAuth.js'; import { invalidateConversationCaches } from '../lib/conversationCache.js'; import { serializeMessage } from '../lib/messages.js'; @@ -37,11 +37,11 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void // ── send_message ─────────────────────────────────────────────────────────── // Payload: { conversationId: string; content: string } // Persists the message and broadcasts it to all room members. - socket.on('send_message', async (payload: { conversationId: string; content: string }) => { - const { conversationId, content } = payload; + socket.on('send_message', async (payload: { conversationId: string; content: string; fileId?: string }) => { + const { conversationId, content, fileId } = payload; - if (!content?.trim()) { - socket.emit('error', { event: 'send_message', message: 'Content must not be empty' }); + if (!content?.trim() && !fileId) { + socket.emit('error', { event: 'send_message', message: 'Content or file must be provided' }); return; } @@ -57,9 +57,23 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } + if (fileId) { + const file = await db.query.files.findFirst({ + where: eq(files.id, fileId), + }); + if (!file) { + socket.emit('error', { event: 'send_message', message: 'File not found' }); + return; + } + if (file.status !== 'ready') { + socket.emit('error', { event: 'send_message', message: 'File is not ready' }); + return; + } + } + const [message] = await db .insert(messages) - .values({ conversationId, senderId: userId, content: content.trim() }) + .values({ conversationId, senderId: userId, content: content?.trim() ?? '', fileId }) .returning(); io.to(conversationId).emit('new_message', message);