Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
602 changes: 537 additions & 65 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = [
{name = "Shantanu Mane",email = "shantanu.mane.200@outlook.com"}
]
license = {file = "LICENSE"}
readme = "README"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"psycopg[binary]",
Expand All @@ -30,6 +30,7 @@ requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
package-mode = false
name = "mpiper"
version = "0.1.0"
description = "A consumer for the mpiper queue system"
Expand All @@ -42,6 +43,7 @@ redis = "^7.1.0"
pillow = "^12.0.0"
psycopg-pool = "^3.3.0"
google-cloud-storage = "^3.7.0"
boto3 = "^1.35.0"
python-dotenv = "^1.2.1"
opentelemetry-api = "^1.39.1"
opentelemetry-sdk = "^1.39.1"
Expand Down
16 changes: 10 additions & 6 deletions worker/consumer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,18 @@ class BucketConfig:

@staticmethod
def from_env() -> "BucketConfig":
# S3_* names mirror the Go server (internal/config/env.go); they take
# precedence over the generic BUCKET_* names so a single .env drives
# both services. BUCKET_* remains the fallback / GCS default.
bucket_name = os.getenv("S3_BUCKET_NAME") or os.getenv("BUCKET_NAME", "media-bucket")
return BucketConfig(
provider=os.getenv("BUCKET_PROVIDER", "gcs"),
bucket_name=os.getenv("BUCKET_NAME", "media-bucket"),
region=os.getenv("BUCKET_REGION", "us-east-1"),
access_key=os.getenv("BUCKET_ACCESS_KEY", ""),
secret_key=os.getenv("BUCKET_SECRET_KEY", ""),
endpoint_url=os.getenv("BUCKET_ENDPOINT_URL"),
sa_path=os.getenv("BUCKET_SA_PATH"),
bucket_name=bucket_name,
region=os.getenv("S3_REGION") or os.getenv("BUCKET_REGION", "us-east-1"),
access_key=os.getenv("S3_ACCESS_KEY_ID") or os.getenv("BUCKET_ACCESS_KEY", ""),
secret_key=os.getenv("S3_SECRET_ACCESS_KEY") or os.getenv("BUCKET_SECRET_KEY", ""),
endpoint_url=os.getenv("S3_ENDPOINT_URL") or os.getenv("BUCKET_ENDPOINT_URL"),
sa_path=os.getenv("GCS_SA_PATH") or os.getenv("BUCKET_SA_PATH"),
)


Expand Down
13 changes: 2 additions & 11 deletions worker/consumer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

from urllib.parse import quote_plus

from worker.consumer.config import WorkerConfig, get_config
from worker.consumer.config import get_config
from worker.consumer.consumer import Consumer
from worker.consumer.db import PgPool
from worker.consumer.migrations import run_migrations
from worker.storage.base import StorageX
from worker.storage.gcs import GCSStorage
from worker.storage import get_storage
from worker.utils import metrics as worker_metrics

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -69,11 +68,3 @@ def _term(signum, frame):

if __name__ == "__main__":
main()


def get_storage(cfg: WorkerConfig) -> StorageX:
return GCSStorage(cfg.bucket.bucket_name, cfg.bucket.sa_path)


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion worker/processing/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def process_image_file(

variant_hash = compute_variant_hash(content_hash, params)
key = f"media/processed/{content_hash}/{variant_hash}.{v['format']}"
url = f"https://storage.googleapis.com/{cfg.bucket.bucket_name}/{key}"
url = storage.public_url(key)

with pg_pool.get_pg_conn() as conn:
cur = conn.cursor()
Expand Down
2 changes: 1 addition & 1 deletion worker/processing/videos.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def ensure_variant_exists(
# CORRECT: Storage key uses variant_hash, not content_hash in path
# This way, identical variants from different content share the same file
key = f"media/variants/{variant_hash[:2]}/{variant_hash}.{ext}"
url = f"https://storage.googleapis.com/{cfg.bucket.bucket_name}/{key}"
url = storage.public_url(key)

with pg_pool.get_pg_conn() as conn:
cur = conn.cursor()
Expand Down
20 changes: 20 additions & 0 deletions worker/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from worker.consumer.config import WorkerConfig
from worker.storage.base import StorageX
from worker.storage.gcs import GCSStorage
from worker.storage.s3 import S3Storage


def get_storage(cfg: WorkerConfig) -> StorageX:
    """Build the storage backend selected by ``cfg.bucket.provider``.

    Supported providers are ``"gcs"`` (Google Cloud Storage) and ``"s3"``
    (AWS S3 or an S3-compatible endpoint).

    Raises:
        ValueError: if the configured provider is neither of the above.
    """
    bucket_cfg = cfg.bucket
    provider = bucket_cfg.provider

    # Reject unsupported providers up front so the happy paths stay flat.
    if provider not in ("gcs", "s3"):
        raise ValueError(f"unknown storage provider: {provider}")

    if provider == "s3":
        return S3Storage(
            bucket_name=bucket_cfg.bucket_name,
            region=bucket_cfg.region,
            access_key=bucket_cfg.access_key,
            secret_key=bucket_cfg.secret_key,
            endpoint_url=bucket_cfg.endpoint_url,
        )

    return GCSStorage(bucket_cfg.bucket_name, bucket_cfg.sa_path)
5 changes: 5 additions & 0 deletions worker/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ def get_metadata(self, key: str) -> Dict[str, Any]:
def exists(self, key: str) -> bool:
"""Check if a key exists in storage."""
pass

@abstractmethod
def public_url(self, key: str) -> str:
    """Build the public, unsigned URL that addresses the object ``key``.

    Concrete backends decide the host and path layout; this abstract
    declaration only fixes the signature.
    """
4 changes: 4 additions & 0 deletions worker/storage/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class GCSStorage(StorageX):
def __init__(self, bucket_name: str, sa_path: str):
    """Connect to GCS and bind this instance to ``bucket_name``.

    ``sa_path`` is forwarded unchanged to ``_create_gcs_client``
    (presumably a service-account key path; confirm against that helper).
    """
    # Kept as a plain string so public_url() can format URLs from it.
    self.bucket_name = bucket_name

    gcs_client = _create_gcs_client(sa_path)
    self.client = gcs_client
    self.bucket = gcs_client.bucket(bucket_name)

def upload_bytes(
self, key: str, data: bytes, content_type: Optional[Any] = None
Expand Down Expand Up @@ -47,3 +48,6 @@ def get_metadata(self, key: str) -> Dict[str, Any]:
def exists(self, key: str) -> bool:
    """Report whether an object named ``key`` is present in the bucket."""
    return self.bucket.blob(key).exists()

def public_url(self, key: str) -> str:
    """Return the public (unsigned) GCS URL for the object ``key``.

    The key is percent-encoded (``/`` is preserved as the path separator)
    so that characters such as spaces, ``#`` or ``?`` cannot truncate or
    corrupt the URL. Keys made only of URL-safe characters are returned
    exactly as before. This only formats the URL; it does not check that
    the object exists or is publicly readable.
    """
    from urllib.parse import quote

    return f"https://storage.googleapis.com/{self.bucket_name}/{quote(key, safe='/')}"
78 changes: 78 additions & 0 deletions worker/storage/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from typing import Any, Dict, List, Optional

import boto3
from botocore.client import Config
from botocore.exceptions import ClientError

from worker.storage.base import StorageX


class S3Storage(StorageX):
    """S3 / S3-compatible (MinIO) storage backed by boto3.

    When ``endpoint_url`` is set, path-style addressing is used so the same
    client works against MinIO and other S3-compatible stores.
    """

    # Error codes that head_object may report for a missing object.
    _NOT_FOUND_CODES = frozenset({"404", "NoSuchKey", "NotFound"})

    def __init__(
        self,
        bucket_name: str,
        region: str,
        access_key: str,
        secret_key: str,
        endpoint_url: Optional[str] = None,
    ):
        """Create the boto3 S3 client bound to ``bucket_name``.

        Args:
            bucket_name: Bucket every operation on this instance targets.
            region: AWS region; an empty value is passed to boto3 as ``None``.
            access_key: Access key id; empty is passed to boto3 as ``None``.
            secret_key: Secret access key; empty is passed as ``None``.
            endpoint_url: Custom endpoint (e.g. MinIO). An empty string is
                treated the same as ``None``.
        """
        self.bucket_name = bucket_name
        self.region = region
        # Normalise "" to None so a blank env var means "no custom endpoint".
        self.endpoint_url = endpoint_url or None

        self.client = boto3.client(
            "s3",
            region_name=region or None,
            aws_access_key_id=access_key or None,
            aws_secret_access_key=secret_key or None,
            endpoint_url=self.endpoint_url,
            # Path-style addressing is only forced for custom endpoints.
            config=Config(s3={"addressing_style": "path"}) if self.endpoint_url else None,
        )

    def upload_bytes(
        self, key: str, data: bytes, content_type: Optional[Any] = None
    ) -> None:
        """Store ``data`` under ``key``, setting ContentType when provided."""
        extra = {"ContentType": content_type} if content_type else {}
        self.client.put_object(Bucket=self.bucket_name, Key=key, Body=data, **extra)

    def download_bytes(self, key: str) -> bytes:
        """Return the full contents of the object ``key`` as bytes."""
        resp = self.client.get_object(Bucket=self.bucket_name, Key=key)
        body = resp["Body"]
        try:
            return body.read()
        finally:
            # Always release the underlying HTTP stream, even if read() fails.
            body.close()

    def download_to_file(self, key: str, file_path: str) -> None:
        """Download the object ``key`` to the local path ``file_path``."""
        self.client.download_file(self.bucket_name, key, file_path)

    def delete(self, key: str) -> None:
        """Delete the object ``key`` from the bucket."""
        self.client.delete_object(Bucket=self.bucket_name, Key=key)

    def list_keys(self) -> List[str]:
        """Return every object key in the bucket, following pagination."""
        keys: List[str] = []
        paginator = self.client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.bucket_name):
            # Pages for an empty listing carry no "Contents" entry.
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        return keys

    def get_metadata(self, key: str) -> Dict[str, Any]:
        """Return the ``Metadata`` mapping from a HEAD request on ``key``."""
        resp = self.client.head_object(Bucket=self.bucket_name, Key=key)
        return resp.get("Metadata", {})

    def exists(self, key: str) -> bool:
        """Return True if ``key`` exists, False if the store reports not-found.

        Raises:
            ClientError: for any failure other than a not-found response
                (e.g. access denied), so real errors are not masked.
        """
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=key)
        except ClientError as e:
            # Use .get so an unusual error payload re-raises the original
            # ClientError instead of failing with a KeyError here.
            code = e.response.get("Error", {}).get("Code")
            if code in self._NOT_FOUND_CODES:
                return False
            raise
        return True

    def public_url(self, key: str) -> str:
        """Return the public (unsigned) URL for the object ``key``.

        The key is percent-encoded (``/`` is kept as the path separator) so
        characters such as spaces, ``#`` or ``?`` cannot corrupt the URL;
        keys made only of URL-safe characters are returned as before.
        With a custom endpoint the URL is path-style; otherwise it is the
        AWS virtual-hosted form, falling back to the region-less host when
        no region is configured. This only formats the URL; it does not
        check that the object exists or is publicly readable.
        """
        from urllib.parse import quote

        encoded_key = quote(key, safe="/")
        if self.endpoint_url:
            # path-style for MinIO / S3-compatible endpoints
            return f"{self.endpoint_url.rstrip('/')}/{self.bucket_name}/{encoded_key}"
        if not self.region:
            # Avoid emitting "s3..amazonaws.com" / "s3.None.amazonaws.com".
            return f"https://{self.bucket_name}.s3.amazonaws.com/{encoded_key}"
        return f"https://{self.bucket_name}.s3.{self.region}.amazonaws.com/{encoded_key}"