Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ export function createDownloadUrlPlugin(
: null;
if (withPgClient) {
const resolved = await withPgClient(null, async (pgClient: any) => {
const dbResult = await pgClient.query(
`SELECT jwt_private.current_database_id() AS id`,
);
const dbResult = await pgClient.query({
text: `SELECT jwt_private.current_database_id() AS id`,
});
const databaseId = dbResult.rows[0]?.id;
if (!databaseId) return null;
const config = await getStorageModuleConfig(pgClient, databaseId);
Expand Down
101 changes: 44 additions & 57 deletions graphile/graphile-presigned-url-plugin/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ function buildS3Key(contentHash: string): string {
* metaschema query needed.
*/
async function resolveDatabaseId(pgClient: any): Promise<string | null> {
  // Reconstructed post-change version: the diff paste had duplicated the
  // pre-change positional-argument call alongside this one, redeclaring
  // `const result`. Only the query-config object form ({ text }) is kept,
  // consistent with every other pgClient.query call site in this plugin.
  const result = await pgClient.query({
    text: `SELECT jwt_private.current_database_id() AS id`,
  });
  // rows[0] is absent when the helper returns no row; `?.` plus `??` folds
  // both "no row" and a NULL id into a single `null` result.
  return result.rows[0]?.id ?? null;
}

Expand Down Expand Up @@ -235,15 +235,14 @@ export function createPresignedUrlPlugin(
}

return withPgClient(pgSettings, async (pgClient: any) => {
await pgClient.query('BEGIN');
try {
return pgClient.withTransaction(async (txClient: any) => {
// --- Resolve storage module config (all limits come from here) ---
const databaseId = await resolveDatabaseId(pgClient);
const databaseId = await resolveDatabaseId(txClient);
if (!databaseId) {
throw new Error('DATABASE_NOT_FOUND');
}

const storageConfig = await getStorageModuleConfig(pgClient, databaseId);
const storageConfig = await getStorageModuleConfig(txClient, databaseId);
if (!storageConfig) {
throw new Error('STORAGE_MODULE_NOT_PROVISIONED');
}
Expand All @@ -259,7 +258,7 @@ export function createPresignedUrlPlugin(
}

// --- Look up the bucket (cached; first miss queries via RLS) ---
const bucket = await getBucketConfig(pgClient, storageConfig, databaseId, bucketKey);
const bucket = await getBucketConfig(txClient, storageConfig, databaseId, bucketKey);
if (!bucket) {
throw new Error('BUCKET_NOT_FOUND');
}
Expand Down Expand Up @@ -288,29 +287,28 @@ export function createPresignedUrlPlugin(
const s3Key = buildS3Key(contentHash);

// --- Dedup check: look for existing file with same content_hash in this bucket ---
const dedupResult = await pgClient.query(
`SELECT id, status
const dedupResult = await txClient.query({
text: `SELECT id, status
FROM ${storageConfig.filesQualifiedName}
WHERE content_hash = $1
AND bucket_id = $2
AND status IN ('ready', 'processed')
LIMIT 1`,
[contentHash, bucket.id],
);
values: [contentHash, bucket.id],
});

if (dedupResult.rows.length > 0) {
const existingFile = dedupResult.rows[0];
log.info(`Dedup hit: file ${existingFile.id} for hash ${contentHash}`);

// Track the dedup request
await pgClient.query(
`INSERT INTO ${storageConfig.uploadRequestsQualifiedName}
await txClient.query({
text: `INSERT INTO ${storageConfig.uploadRequestsQualifiedName}
(file_id, bucket_id, key, content_type, content_hash, size, status, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, 'confirmed', NOW())`,
[existingFile.id, bucket.id, s3Key, contentType, contentHash, size],
);
values: [existingFile.id, bucket.id, s3Key, contentType, contentHash, size],
});

await pgClient.query('COMMIT');
return {
uploadUrl: null,
fileId: existingFile.id,
Expand All @@ -321,12 +319,12 @@ export function createPresignedUrlPlugin(
}

// --- Create file record (status=pending) ---
const fileResult = await pgClient.query(
`INSERT INTO ${storageConfig.filesQualifiedName}
const fileResult = await txClient.query({
text: `INSERT INTO ${storageConfig.filesQualifiedName}
(bucket_id, key, content_type, content_hash, size, filename, owner_id, is_public, status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending')
RETURNING id`,
[
values: [
bucket.id,
s3Key,
contentType,
Expand All @@ -336,7 +334,7 @@ export function createPresignedUrlPlugin(
bucket.owner_id,
bucket.is_public,
],
);
});

const fileId = fileResult.rows[0].id;

Expand All @@ -356,25 +354,21 @@ export function createPresignedUrlPlugin(
const expiresAt = new Date(Date.now() + storageConfig.uploadUrlExpirySeconds * 1000).toISOString();

// --- Track the upload request ---
await pgClient.query(
`INSERT INTO ${storageConfig.uploadRequestsQualifiedName}
await txClient.query({
text: `INSERT INTO ${storageConfig.uploadRequestsQualifiedName}
(file_id, bucket_id, key, content_type, content_hash, size, status, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, 'issued', $7)`,
[fileId, bucket.id, s3Key, contentType, contentHash, size, expiresAt],
);
values: [fileId, bucket.id, s3Key, contentType, contentHash, size, expiresAt],
});

await pgClient.query('COMMIT');
return {
uploadUrl,
fileId,
key: s3Key,
deduplicated: false,
expiresAt,
};
} catch (err) {
await pgClient.query('ROLLBACK');
throw err;
}
});
});
});
},
Expand All @@ -397,27 +391,26 @@ export function createPresignedUrlPlugin(
}

return withPgClient(pgSettings, async (pgClient: any) => {
await pgClient.query('BEGIN');
try {
return pgClient.withTransaction(async (txClient: any) => {
// --- Resolve storage module config ---
const databaseId = await resolveDatabaseId(pgClient);
const databaseId = await resolveDatabaseId(txClient);
if (!databaseId) {
throw new Error('DATABASE_NOT_FOUND');
}

const storageConfig = await getStorageModuleConfig(pgClient, databaseId);
const storageConfig = await getStorageModuleConfig(txClient, databaseId);
if (!storageConfig) {
throw new Error('STORAGE_MODULE_NOT_PROVISIONED');
}

// --- Look up the file (RLS enforced) ---
const fileResult = await pgClient.query(
`SELECT id, key, content_type, status, bucket_id
const fileResult = await txClient.query({
text: `SELECT id, key, content_type, status, bucket_id
FROM ${storageConfig.filesQualifiedName}
WHERE id = $1
LIMIT 1`,
[fileId],
);
values: [fileId],
});

if (fileResult.rows.length === 0) {
throw new Error('FILE_NOT_FOUND');
Expand All @@ -427,7 +420,6 @@ export function createPresignedUrlPlugin(

if (file.status !== 'pending') {
// File is already confirmed or processed — idempotent success
await pgClient.query('COMMIT');
return {
fileId: file.id,
status: file.status,
Expand All @@ -446,45 +438,40 @@ export function createPresignedUrlPlugin(
// --- Content-type verification ---
if (s3Head.contentType && s3Head.contentType !== file.content_type) {
// Mark upload_request as rejected
await pgClient.query(
`UPDATE ${storageConfig.uploadRequestsQualifiedName}
await txClient.query({
text: `UPDATE ${storageConfig.uploadRequestsQualifiedName}
SET status = 'rejected'
WHERE file_id = $1 AND status = 'issued'`,
[fileId],
);
values: [fileId],
});

await pgClient.query('COMMIT');
throw new Error(
`CONTENT_TYPE_MISMATCH: expected ${file.content_type}, got ${s3Head.contentType}`,
);
}

// --- Transition file to 'ready' ---
await pgClient.query(
`UPDATE ${storageConfig.filesQualifiedName}
await txClient.query({
text: `UPDATE ${storageConfig.filesQualifiedName}
SET status = 'ready'
WHERE id = $1`,
[fileId],
);
values: [fileId],
});

// --- Update upload_request to 'confirmed' ---
await pgClient.query(
`UPDATE ${storageConfig.uploadRequestsQualifiedName}
await txClient.query({
text: `UPDATE ${storageConfig.uploadRequestsQualifiedName}
SET status = 'confirmed', confirmed_at = NOW()
WHERE file_id = $1 AND status = 'issued'`,
[fileId],
);
values: [fileId],
});

await pgClient.query('COMMIT');
return {
fileId: file.id,
status: 'ready',
success: true,
};
} catch (err) {
await pgClient.query('ROLLBACK');
throw err;
}
});
});
});
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ interface StorageModuleRow {
* @returns StorageModuleConfig or null if no storage module is provisioned
*/
export async function getStorageModuleConfig(
pgClient: { query: (sql: string, params: unknown[]) => Promise<{ rows: unknown[] }> },
pgClient: { query: (opts: { text: string; values?: unknown[] }) => Promise<{ rows: unknown[] }> },
databaseId: string,
): Promise<StorageModuleConfig | null> {
const cacheKey = `storage:${databaseId}`;
Expand All @@ -102,7 +102,7 @@ export async function getStorageModuleConfig(

log.debug(`Cache miss for database ${databaseId}, querying metaschema...`);

const result = await pgClient.query(STORAGE_MODULE_QUERY, [databaseId]);
const result = await pgClient.query({ text: STORAGE_MODULE_QUERY, values: [databaseId] });
if (result.rows.length === 0) {
log.warn(`No storage module found for database ${databaseId}`);
return null;
Expand Down Expand Up @@ -172,7 +172,7 @@ const bucketCache = new LRUCache<string, BucketConfig>({
* @returns BucketConfig or null if the bucket doesn't exist / isn't accessible
*/
export async function getBucketConfig(
pgClient: { query: (sql: string, params: unknown[]) => Promise<{ rows: unknown[] }> },
pgClient: { query: (opts: { text: string; values?: unknown[] }) => Promise<{ rows: unknown[] }> },
storageConfig: StorageModuleConfig,
databaseId: string,
bucketKey: string,
Expand All @@ -185,13 +185,13 @@ export async function getBucketConfig(

log.debug(`Bucket cache miss for ${databaseId}:${bucketKey}, querying DB...`);

const result = await pgClient.query(
`SELECT id, key, type, is_public, owner_id, allowed_mime_types, max_file_size
const result = await pgClient.query({
text: `SELECT id, key, type, is_public, owner_id, allowed_mime_types, max_file_size
FROM ${storageConfig.bucketsQualifiedName}
WHERE key = $1
LIMIT 1`,
[bucketKey],
);
values: [bucketKey],
});

if (result.rows.length === 0) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
-- Schema creation for simple-seed-storage test scenario
-- Creates the app schema with storage tables (buckets, files, upload_requests)
-- NOTE(review): uuid_generate_v4() requires the "uuid-ossp" extension —
-- presumably created by earlier scenario setup; verify before running standalone.

-- Create app schemas
CREATE SCHEMA IF NOT EXISTS "simple-storage-public";

-- Grant schema usage
GRANT USAGE ON SCHEMA "simple-storage-public" TO administrator, authenticated, anonymous;

-- Set default privileges
-- (these apply to objects created later in this schema, not to existing ones)
ALTER DEFAULT PRIVILEGES IN SCHEMA "simple-storage-public"
GRANT ALL ON TABLES TO administrator;
ALTER DEFAULT PRIVILEGES IN SCHEMA "simple-storage-public"
GRANT USAGE ON SEQUENCES TO administrator, authenticated;
ALTER DEFAULT PRIVILEGES IN SCHEMA "simple-storage-public"
GRANT ALL ON FUNCTIONS TO administrator, authenticated, anonymous;

-- =====================================================
-- STORAGE TABLES (mirroring what the storage module generator creates)
-- =====================================================

-- Buckets table
-- One row per logical bucket; `key` is the stable lookup handle (UNIQUE below).
-- allowed_mime_types / max_file_size are NULLable: NULL means "no restriction".
CREATE TABLE IF NOT EXISTS "simple-storage-public".buckets (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
key text NOT NULL,
type text NOT NULL DEFAULT 'private',
is_public boolean NOT NULL DEFAULT false,
owner_id uuid,
allowed_mime_types text[] NULL,
max_file_size bigint NULL,
created_at timestamptz DEFAULT now(),
updated_at timestamptz DEFAULT now(),
UNIQUE (key)
);

-- Smart-tag comment (@storageBuckets) — presumably parsed by the storage
-- module plugin to locate this table; keep the tag text exactly as-is.
COMMENT ON TABLE "simple-storage-public".buckets IS E'@storageBuckets\nStorage buckets table';

-- Files table
-- One row per uploaded object. status defaults to 'pending' until an upload
-- is confirmed; content_hash/size are nullable (filled in by the upload flow).
CREATE TABLE IF NOT EXISTS "simple-storage-public".files (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
bucket_id uuid NOT NULL REFERENCES "simple-storage-public".buckets(id),
key text NOT NULL,
content_type text NOT NULL,
content_hash text,
size bigint,
filename text,
owner_id uuid,
is_public boolean NOT NULL DEFAULT false,
status text NOT NULL DEFAULT 'pending',
created_at timestamptz DEFAULT now(),
updated_at timestamptz DEFAULT now()
);

-- Smart-tag comment (@storageFiles) — keep the tag text exactly as-is.
COMMENT ON TABLE "simple-storage-public".files IS E'@storageFiles\nStorage files table';

-- Upload requests table
-- Tracks issued presigned-upload URLs; status starts at 'issued' and
-- confirmed_at is set when the upload is confirmed.
-- NOTE(review): unlike buckets/files, this table has no smart-tag COMMENT —
-- confirm whether the generator expects one (e.g. @storageUploadRequests).
CREATE TABLE IF NOT EXISTS "simple-storage-public".upload_requests (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
file_id uuid NOT NULL REFERENCES "simple-storage-public".files(id),
bucket_id uuid NOT NULL REFERENCES "simple-storage-public".buckets(id),
key text NOT NULL,
content_type text NOT NULL,
content_hash text,
size bigint,
status text NOT NULL DEFAULT 'issued',
expires_at timestamptz,
confirmed_at timestamptz,
created_at timestamptz DEFAULT now(),
updated_at timestamptz DEFAULT now()
);

-- Grant table permissions (allow anonymous to do CRUD for tests — no RLS)
GRANT SELECT, INSERT, UPDATE, DELETE ON "simple-storage-public".buckets TO administrator, authenticated, anonymous;
GRANT SELECT, INSERT, UPDATE, DELETE ON "simple-storage-public".files TO administrator, authenticated, anonymous;
GRANT SELECT, INSERT, UPDATE, DELETE ON "simple-storage-public".upload_requests TO administrator, authenticated, anonymous;
Loading
Loading