diff --git a/.gitignore b/.gitignore index 674cf51..aded70f 100644 --- a/.gitignore +++ b/.gitignore @@ -113,4 +113,8 @@ API_DOCUMENTATION.md .kimchi/ -.codegraph/ \ No newline at end of file +.codegraph/ +# local e2e verification (do not commit) +.env.local +.secrets/ +docker-compose.override.yml diff --git a/go.mod b/go.mod index fc97cc8..97919fb 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,10 @@ go 1.25.0 require ( cloud.google.com/go/storage v1.58.0 + github.com/aws/aws-sdk-go-v2 v1.42.0 + github.com/aws/aws-sdk-go-v2/config v1.32.25 + github.com/aws/aws-sdk-go-v2/credentials v1.19.24 + github.com/aws/aws-sdk-go-v2/service/s3 v1.103.3 github.com/go-chi/chi/v5 v5.2.3 github.com/go-chi/cors v1.2.2 github.com/google/uuid v1.6.0 @@ -24,6 +28,23 @@ require ( google.golang.org/grpc v1.77.0 ) +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.13 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.30 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.22 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.29 // indirect + github.com/aws/aws-sdk-go-v2/service/signin v1.2.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.31.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.36.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.43.3 // indirect + github.com/aws/smithy-go v1.27.1 // indirect +) + require ( cel.dev/expr v0.24.0 // indirect cloud.google.com/go v0.123.0 // indirect @@ -66,7 +87,7 @@ require ( golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.39.0 // indirect 
golang.org/x/text v0.31.0 // indirect - golang.org/x/time v0.14.0 // indirect + golang.org/x/time v0.14.0 google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect diff --git a/go.sum b/go.sum index 95d8d55..b24f082 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,42 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/bymhA= +github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.13 h1:p1BBrg/Hhp6uK7zpejeI8QFXHJeC/mynzi04Sl03k9g= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.13/go.mod h1:8cIfkE9MDhkRZGpQ22aV6/lkYeYSozpz16Smrs5x4Ls= +github.com/aws/aws-sdk-go-v2/config v1.32.25 h1:ACCejvStYoilgwrfegSt5ZntCbPrk52qfwyNcnl3omM= +github.com/aws/aws-sdk-go-v2/config v1.32.25/go.mod h1:LJyU8sDRbXUxFn8xMJIGP+v9QYYwveNLI8a/giAOiAs= +github.com/aws/aws-sdk-go-v2/credentials v1.19.24 h1:2hQqYCV9yqyePQ9o6dCrZc/zO8U3TwPr9mIKlZnPu/I= +github.com/aws/aws-sdk-go-v2/credentials v1.19.24/go.mod h1:IDwpACtwqHLISdzfwUUNq4P9DsB/h5BLg4FwJPNfqFY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.29 h1:r6qZHbT+wxgWO/e9vYNUEtg7lv5+UN3pRqKhLXvnArg= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.29/go.mod h1:QRnaRcTVGKPGRy8w78HMQtKUGRYcnMZAANATkeVA6Mo= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko= 
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.30 h1:VTGy885W5DKBxWRUJbym9hytNaYzsyaPkCHGRRMAOhU= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.30/go.mod h1:AS0HycUvJRFvTt613AYDOgO2jzw+00cVSMny8XB3yMY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 h1:ZD2+BSw9vFsNlKYIasSNt3uDbjqqXIBcM13UJv/Lx2k= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12/go.mod h1:Ms4zlcVBbXbiP7EVLhl+lgjvA/a7YphqQ3Ih3174EmI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.22 h1:V51LGlOq/1VsDsHUdoklAQi7rMmx4qQubvFYAlP2254= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.22/go.mod h1:4Pzhyz8hJOm2bepgl+NjvRx8vlUFAIIvJnZ/MkcNPpU= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 h1:DRebniUGZ2MqiiIVmQJ04vIXr918hubdHMnarSLEWyU= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29/go.mod h1:LfRkPCD8YHDM2E5eTkos2UpwYeZnBcVarTa8L59bJHA= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.29 h1:hiME6pBzC7OTl9LMtlyTWBuEl1f4QBcUmFDKC7MLXtc= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.29/go.mod h1:G7RP+uhagpKtKhd1BM9N6JQqjCcGEU47K5lBVZQyRQw= +github.com/aws/aws-sdk-go-v2/service/s3 v1.103.3 h1:JRseEu/vIDMaWis4bSw0QbXL+cvIGc1XnX076H5ZXLE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.103.3/go.mod h1:77ZAgynvx1txMvDG8gGWoWkO1augYDxkp9JElWFgjQU= +github.com/aws/aws-sdk-go-v2/service/signin v1.2.0 h1:3nXpRcFwRCW8n7HgO2QGy0Dc20eQNfBuUemGQhpF8m8= +github.com/aws/aws-sdk-go-v2/service/signin v1.2.0/go.mod h1:LxYujSTLPRlp2vTtcUO/+1ilrew8ytt6SvQyOgejzFQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.31.3 
h1:ey1XLTYXb9PcLt4535632o5kCGXNXEhNb620Dqwuylo= +github.com/aws/aws-sdk-go-v2/service/sso v1.31.3/go.mod h1:Lk7PlmoTYryQmyBG0EXqj5BcUbj3whXdU2s3yGI3EAc= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.36.6 h1:yLr03zQE/5Eu5l3QU0Si+xMbLMbSDF2YXsigqXngs6g= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.36.6/go.mod h1:Q5N6icH+KJZDLh+ESNwzdv6cZ6vLFF/egy3IOxWhmz4= +github.com/aws/aws-sdk-go-v2/service/sts v1.43.3 h1:VrIhKRCSK1umelSgB9RghvA9RTUYeQffyAS5ApXehNI= +github.com/aws/aws-sdk-go-v2/service/sts v1.43.3/go.mod h1:r8wkDOuLaaMFqFiYAb8dGY2A3gJCOujMc6CFOVC4Zhc= +github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8= +github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -145,7 +181,6 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.6 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0/go.mod h1:fvPi2qXDqFs8M4B4fmJhE92TyQs9Ydjlg3RvfUp+NbQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0/go.mod h1:GQ/474YrbE4Jx8gZ4q5I4hrhUzM6UPzyrqJYV2AqPoQ= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU= go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 h1:cEf8jF6WbuGQWUVcqgyWtTR0kOOAWY1DYZ+UhvdmQPw= @@ -160,14 +195,12 @@ go.opentelemetry.io/otel/log v0.20.0 h1:/5i0vuHxCLWUfChWG41K9wkM0jafruPw9NU1/RCJ go.opentelemetry.io/otel/log v0.20.0/go.mod 
// S3Config holds the settings for an S3 or S3-compatible (e.g. MinIO) object
// store. GetEnvConfig populates it from the S3_* environment variables.
type S3Config struct {
	// Bucket is read from S3_BUCKET_NAME, falling back to BUCKET_NAME and
	// finally to "mpiper".
	Bucket string
	// Region is read from S3_REGION.
	Region string
	// AccessKeyID is read from S3_ACCESS_KEY_ID.
	AccessKeyID string
	// SecretAccessKey is read from S3_SECRET_ACCESS_KEY.
	SecretAccessKey string
	// EndpointURL is read from S3_ENDPOINT_URL.
	EndpointURL string // optional — set for MinIO / S3-compatible stores
}

// StorageConfig selects the object-storage backend and carries the
// per-provider settings.
type StorageConfig struct {
	// Provider is read from BUCKET_PROVIDER and defaults to "gcs".
	Provider string
	// Bucket is read from BUCKET_NAME and defaults to "mpiper".
	Bucket string
	// GCS holds the Google Cloud Storage settings.
	GCS GCSConfig
	// S3 holds the S3 / S3-compatible settings.
	S3 S3Config
}
S3Config{ + Bucket: envOr("S3_BUCKET_NAME", envOr("BUCKET_NAME", "mpiper")), + Region: os.Getenv("S3_REGION"), + AccessKeyID: os.Getenv("S3_ACCESS_KEY_ID"), + SecretAccessKey: os.Getenv("S3_SECRET_ACCESS_KEY"), + EndpointURL: os.Getenv("S3_ENDPOINT_URL"), + }, }, CORSAllowedOrigins: corsOrigins, LogLevel: envOr("LOG_LEVEL", "INFO"), diff --git a/internal/router/router.go b/internal/router/router.go index d8a90b0..64bb72d 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -19,7 +19,6 @@ import ( "github.com/rndmcodeguy20/mpiper/internal/service" applogger "github.com/rndmcodeguy20/mpiper/pkg/logger" "github.com/rndmcodeguy20/mpiper/pkg/utils" - "github.com/rndmcodeguy20/mpiper/pkg/utils/storagex" "golang.org/x/time/rate" ) @@ -114,7 +113,7 @@ func NewRouter(cfg config.EnvConfig, db *sqlx.DB, m *metrics.Metrics) *chi.Mux { r.Use(appMiddleware.SlowRequestMiddleware(logger, 2*time.Second)) assetRepo := repository.NewAssetRepository(db, logger, m) - assetSvc := service.NewAssetService(&cfg.Redis, storagex.GCPProvider, assetRepo, logger, m) + assetSvc := service.NewAssetService(&cfg.Redis, assetRepo, logger, m) assetHandler := handler.NewAssetHandler(assetSvc, logger, m) // Routes diff --git a/internal/service/asset.go b/internal/service/asset.go index b5a5d8a..8e1d48a 100644 --- a/internal/service/asset.go +++ b/internal/service/asset.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/google/uuid" @@ -30,25 +31,31 @@ type assetService struct { assetRepo repository.AssetRepository logger *zap.Logger storageClient storagex.StorageX + bucket string queue *queue.RedisQueue m *metrics.Metrics } -func NewAssetService(redisCfg *config.RedisConfig, provider storagex.Provider, assetRepo repository.AssetRepository, logger *zap.Logger, m *metrics.Metrics) AssetService { - var storageClient storagex.StorageX - var err error +func NewAssetService(redisCfg *config.RedisConfig, assetRepo repository.AssetRepository, logger 
*zap.Logger, m *metrics.Metrics) AssetService { ctx := context.Background() - switch provider { - case storagex.GCPProvider: - saPath := config.MustGet().Storage.GCS.SAPath - if saPath == "" { - logger.Sugar().Fatalf("GCS_SA_PATH is not set") - } - storageClient, err = storagex.NewGCSStorageFromServiceAccountJSON(ctx, saPath, m, logger) - default: - logger.Sugar().Fatalf("Unsupported storage provider: %v", provider) + storeCfg := config.MustGet().Storage + + // Effective bucket: S3 may override via S3_BUCKET_NAME, otherwise BUCKET_NAME. + bucket := storeCfg.Bucket + switch storagex.Provider(strings.ToLower(storeCfg.Provider)) { + case storagex.S3Provider, storagex.MinIOProvider: + bucket = storeCfg.S3.Bucket } + storageClient, err := storagex.New(ctx, storagex.Config{ + Provider: storagex.Provider(storeCfg.Provider), + Bucket: bucket, + Region: storeCfg.S3.Region, + Endpoint: storeCfg.S3.EndpointURL, + AccessKeyID: storeCfg.S3.AccessKeyID, + SecretAccessKey: storeCfg.S3.SecretAccessKey, + GCPServiceAccount: storeCfg.GCS.SAPath, + }, m, logger) if err != nil { logger.Sugar().Fatalf("Failed to initialize storage client: %v", err) } @@ -67,6 +74,7 @@ func NewAssetService(redisCfg *config.RedisConfig, provider storagex.Provider, a assetRepo: assetRepo, logger: logger, storageClient: storageClient, + bucket: bucket, queue: rq, m: m, } @@ -94,7 +102,7 @@ func (s *assetService) CreateAsset(ctx context.Context, request models.UploadAss // GeneratePresignedURL creates a temporary signed URL for uploading an object to the storage bucket. // It generates a PUT presigned URL valid for 5 minutes that allows clients to upload files // with the specified content type to the "mpiper" bucket at the given object key. 
- signedUrl, err := s.storageClient.GeneratePresignedURL(spanStorageCtx, "mpiper", objectKey, &storagex.PresignedURLOptions{ + signedUrl, err := s.storageClient.GeneratePresignedURL(spanStorageCtx, s.bucket, objectKey, &storagex.PresignedURLOptions{ Method: "PUT", ContentType: request.ContentType, ExpiresInSeconds: 60 * 5, // 5 minutes @@ -112,7 +120,7 @@ func (s *assetService) CreateAsset(ctx context.Context, request models.UploadAss spanStorageCtx, spanStorage = tracer.Start(ctx, "StorageClient.PublicURL") spanStorage.SetAttributes(attribute.String("object_key", objectKey)) - publicUrl, err := s.storageClient.PublicURL(spanStorageCtx, "mpiper", objectKey) + publicUrl, err := s.storageClient.PublicURL(spanStorageCtx, s.bucket, objectKey) spanStorage.End() if err != nil { @@ -181,7 +189,7 @@ func (s *assetService) MarkAssetUploaded(ctx context.Context, assetID uuid.UUID) ctxStorage, spanStorage := tracer.Start(ctx, "StorageClient.GetObjectAttrs") spanStorage.SetAttributes(attribute.String("object_key", objectKey)) - _, err := s.storageClient.GetObjectAttrs(ctxStorage, "mpiper", objectKey) + _, err := s.storageClient.GetObjectAttrs(ctxStorage, s.bucket, objectKey) spanStorage.End() if err != nil { diff --git a/pkg/utils/storagex/config.go b/pkg/utils/storagex/config.go index 0e878b9..78c4ccd 100644 --- a/pkg/utils/storagex/config.go +++ b/pkg/utils/storagex/config.go @@ -4,6 +4,8 @@ type Provider string const ( GCPProvider Provider = "gcp" + GCSProvider Provider = "gcs" + S3Provider Provider = "s3" MinIOProvider Provider = "minio" ) diff --git a/pkg/utils/storagex/factory.go b/pkg/utils/storagex/factory.go new file mode 100644 index 0000000..5319b15 --- /dev/null +++ b/pkg/utils/storagex/factory.go @@ -0,0 +1,27 @@ +package storagex + +import ( + "context" + "fmt" + "strings" + + "github.com/rndmcodeguy20/mpiper/internal/metrics" + "go.uber.org/zap" +) + +// New builds a StorageX for the configured provider. 
"gcs"/"gcp" use the GCS +// service-account client; "s3"/"minio" use the S3 client (MinIO when an +// endpoint is set). +func New(ctx context.Context, cfg Config, m *metrics.Metrics, logger *zap.Logger) (StorageX, error) { + switch Provider(strings.ToLower(string(cfg.Provider))) { + case GCSProvider, GCPProvider: + if cfg.GCPServiceAccount == "" { + return nil, fmt.Errorf("GCS service account path (GCS_SA_PATH) is not set") + } + return NewGCSStorageFromServiceAccountJSON(ctx, cfg.GCPServiceAccount, m, logger) + case S3Provider, MinIOProvider: + return NewS3Storage(ctx, cfg, m, logger) + default: + return nil, fmt.Errorf("unknown storage provider: %q", cfg.Provider) + } +} diff --git a/pkg/utils/storagex/gcs.go b/pkg/utils/storagex/gcs.go index 9199c6c..cae66bd 100644 --- a/pkg/utils/storagex/gcs.go +++ b/pkg/utils/storagex/gcs.go @@ -148,7 +148,7 @@ func (g *gcsStorage) GetObject(ctx context.Context, bucket, key string) (io.Read return rc, nil } -func (g *gcsStorage) GetObjectAttrs(ctx context.Context, bucket, key string) (*storage.ObjectAttrs, error) { +func (g *gcsStorage) GetObjectAttrs(ctx context.Context, bucket, key string) (*ObjectAttrs, error) { tracer := otel.Tracer("mpiper-api") ctx, span := tracer.Start(ctx, "GCS.GetObjectAttrs") defer span.End() @@ -171,7 +171,11 @@ func (g *gcsStorage) GetObjectAttrs(ctx context.Context, bucket, key string) (*s attribute.String("object.content_type", attrs.ContentType), ) span.SetStatus(codes.Ok, "Object attributes retrieved") - return attrs, nil + return &ObjectAttrs{ + Size: attrs.Size, + ContentType: attrs.ContentType, + ETag: attrs.Etag, + }, nil } func (g *gcsStorage) Close() error { diff --git a/pkg/utils/storagex/s3.go b/pkg/utils/storagex/s3.go new file mode 100644 index 0000000..a999ab5 --- /dev/null +++ b/pkg/utils/storagex/s3.go @@ -0,0 +1,273 @@ +package storagex + +import ( + "context" + "fmt" + "io" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + v4 
"github.com/aws/aws-sdk-go-v2/aws/signer/v4" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/rndmcodeguy20/mpiper/internal/metrics" + "github.com/rndmcodeguy20/mpiper/pkg/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + "go.uber.org/zap" +) + +type s3Storage struct { + client *s3.Client + presign *s3.PresignClient + region string + endpoint string // non-empty for MinIO / S3-compatible endpoints + logger *zap.Logger + m *metrics.Metrics +} + +// NewS3Storage builds an S3-backed StorageX. An empty cfg.Endpoint targets AWS +// S3; a non-empty one (with path-style addressing) targets MinIO or any +// S3-compatible store. +func NewS3Storage(ctx context.Context, cfg Config, m *metrics.Metrics, logger *zap.Logger) (StorageX, error) { + region := cfg.Region + if region == "" { + region = "us-east-1" + } + + loadOpts := []func(*awsconfig.LoadOptions) error{awsconfig.WithRegion(region)} + if cfg.AccessKeyID != "" { + loadOpts = append(loadOpts, awsconfig.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, cfg.SessionToken), + )) + } + + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, loadOpts...) 
+ if err != nil { + return nil, errors.NewInternalServerError("Failed to load AWS config", err) + } + + client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { + if cfg.Endpoint != "" { + o.BaseEndpoint = aws.String(cfg.Endpoint) + o.UsePathStyle = true // MinIO and most S3-compatible stores need path-style + } + }) + + return &s3Storage{ + client: client, + presign: s3.NewPresignClient(client), + region: region, + endpoint: cfg.Endpoint, + logger: logger, + m: m, + }, nil +} + +func (s *s3Storage) PutObject(ctx context.Context, bucket, key string, data io.Reader, options *PutOptions) error { + tracer := otel.Tracer("mpiper-api") + ctx, span := tracer.Start(ctx, "S3.PutObject") + defer span.End() + + start := time.Now() + span.SetAttributes( + attribute.String("storage.bucket", bucket), + attribute.String("storage.key", key), + attribute.String("storage.provider", "s3"), + ) + + in := &s3.PutObjectInput{Bucket: aws.String(bucket), Key: aws.String(key), Body: data} + if options != nil { + if options.ContentType != "" { + in.ContentType = aws.String(options.ContentType) + } + if options.Metadata != nil { + in.Metadata = options.Metadata + } + } + + if _, err := s.client.PutObject(ctx, in); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "Failed to put object") + s.recordOperationMetrics(ctx, "put", false, time.Since(start)) + return err + } + + s.recordOperationMetrics(ctx, "put", true, time.Since(start)) + span.SetStatus(codes.Ok, "Object uploaded successfully") + return nil +} + +func (s *s3Storage) GetObject(ctx context.Context, bucket, key string) (io.ReadCloser, error) { + tracer := otel.Tracer("mpiper-api") + ctx, span := tracer.Start(ctx, "S3.GetObject") + defer span.End() + + span.SetAttributes( + attribute.String("storage.bucket", bucket), + attribute.String("storage.key", key), + attribute.String("storage.provider", "s3"), + ) + + out, err := s.client.GetObject(ctx, &s3.GetObjectInput{Bucket: aws.String(bucket), Key: aws.String(key)}) + 
if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "Failed to get object") + return nil, err + } + + span.SetStatus(codes.Ok, "Object reader created") + return out.Body, nil +} + +func (s *s3Storage) GetObjectAttrs(ctx context.Context, bucket, key string) (*ObjectAttrs, error) { + tracer := otel.Tracer("mpiper-api") + ctx, span := tracer.Start(ctx, "S3.GetObjectAttrs") + defer span.End() + + span.SetAttributes( + attribute.String("storage.bucket", bucket), + attribute.String("storage.key", key), + attribute.String("storage.provider", "s3"), + ) + + head, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{Bucket: aws.String(bucket), Key: aws.String(key)}) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "Failed to get object attributes") + return nil, err + } + + attrs := &ObjectAttrs{ + Size: aws.ToInt64(head.ContentLength), + ContentType: aws.ToString(head.ContentType), + ETag: aws.ToString(head.ETag), + } + span.SetAttributes( + attribute.Int64("object.size", attrs.Size), + attribute.String("object.content_type", attrs.ContentType), + ) + span.SetStatus(codes.Ok, "Object attributes retrieved") + return attrs, nil +} + +func (s *s3Storage) GeneratePresignedURL(ctx context.Context, bucket, key string, options *PresignedURLOptions) (string, error) { + tracer := otel.Tracer("mpiper-api") + ctx, span := tracer.Start(ctx, "S3.GeneratePresignedURL") + defer span.End() + + span.SetAttributes( + attribute.String("storage.bucket", bucket), + attribute.String("storage.key", key), + attribute.String("storage.provider", "s3"), + ) + + if options == nil { + options = &PresignedURLOptions{} + } + + expiresIn := time.Duration(options.ExpiresInSeconds) * time.Second + if expiresIn == 0 { + expiresIn = 15 * time.Minute + } + withExpiry := s3.WithPresignExpires(expiresIn) + + var ( + url string + err error + ) + switch strings.ToUpper(options.Method) { + case "", "PUT": + in := &s3.PutObjectInput{Bucket: aws.String(bucket), Key: 
aws.String(key)} + if options.ContentType != "" { + in.ContentType = aws.String(options.ContentType) + } + var req *v4.PresignedHTTPRequest + if req, err = s.presign.PresignPutObject(ctx, in, withExpiry); req != nil { + url = req.URL + } + case "GET": + var req *v4.PresignedHTTPRequest + if req, err = s.presign.PresignGetObject(ctx, &s3.GetObjectInput{Bucket: aws.String(bucket), Key: aws.String(key)}, withExpiry); req != nil { + url = req.URL + } + default: + return "", errors.NewInternalServerError("Unsupported presign method", fmt.Errorf("method %q", options.Method)) + } + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "Failed to generate presigned URL") + return "", errors.NewInternalServerError("Failed to generate presigned URL", err) + } + + span.SetStatus(codes.Ok, "Presigned URL generated") + return url, nil +} + +func (s *s3Storage) PublicURL(ctx context.Context, bucket, key string) (string, error) { + _, span := otel.Tracer("mpiper-api").Start(ctx, "S3.PublicURL") + defer span.End() + + span.SetAttributes( + attribute.String("storage.bucket", bucket), + attribute.String("storage.key", key), + attribute.String("storage.provider", "s3"), + ) + + var url string + if s.endpoint != "" { + // path-style for MinIO / S3-compatible endpoints + url = fmt.Sprintf("%s/%s/%s", strings.TrimRight(s.endpoint, "/"), bucket, key) + } else { + url = fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", bucket, s.region, key) + } + span.SetStatus(codes.Ok, "Public URL generated") + return url, nil +} + +func (s *s3Storage) DeleteObject(ctx context.Context, bucket, key string) error { + tracer := otel.Tracer("mpiper-api") + ctx, span := tracer.Start(ctx, "S3.DeleteObject") + defer span.End() + + span.SetAttributes( + attribute.String("storage.bucket", bucket), + attribute.String("storage.key", key), + attribute.String("storage.provider", "s3"), + ) + + if _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{Bucket: aws.String(bucket), Key: 
aws.String(key)}); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "Failed to delete object") + return err + } + + span.SetStatus(codes.Ok, "Object deleted successfully") + return nil +} + +func (s *s3Storage) recordOperationMetrics(ctx context.Context, operation string, success bool, duration time.Duration) { + if s.m == nil { + return + } + status := "success" + if !success { + status = "error" + } + attrs := []attribute.KeyValue{ + attribute.String("storage.operation", operation), + attribute.String("storage.provider", "s3"), + attribute.String("storage.status", status), + } + if success { + s.m.StorageOperationTotal.Add(ctx, 1, metric.WithAttributes(attrs...)) + } else { + s.m.StorageOperationErrors.Add(ctx, 1, metric.WithAttributes(attrs...)) + } + s.m.StorageOperationDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} diff --git a/pkg/utils/storagex/s3_test.go b/pkg/utils/storagex/s3_test.go new file mode 100644 index 0000000..2dcf6d7 --- /dev/null +++ b/pkg/utils/storagex/s3_test.go @@ -0,0 +1,109 @@ +package storagex + +import ( + "context" + "io" + "os" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "go.uber.org/zap" +) + +// TestS3StorageRoundTrip exercises the S3 impl against a real S3-compatible +// store (MinIO). It is skipped unless S3_TEST_ENDPOINT is set, so it never +// runs in CI without an endpoint. +// +// S3_TEST_ENDPOINT=http://localhost:9000 S3_TEST_ACCESS_KEY=... \ +// S3_TEST_SECRET_KEY=... 
S3_TEST_BUCKET=test-bucket go test ./pkg/utils/storagex/ -run TestS3 +func TestS3StorageRoundTrip(t *testing.T) { + endpoint := os.Getenv("S3_TEST_ENDPOINT") + if endpoint == "" { + t.Skip("set S3_TEST_ENDPOINT to run the MinIO integration test") + } + ak := os.Getenv("S3_TEST_ACCESS_KEY") + sk := os.Getenv("S3_TEST_SECRET_KEY") + bucket := os.Getenv("S3_TEST_BUCKET") + ctx := context.Background() + + cfg := Config{ + Provider: S3Provider, + Region: "us-east-1", + Endpoint: endpoint, + Bucket: bucket, + AccessKeyID: ak, + SecretAccessKey: sk, + } + + // Ensure the bucket exists (create via a raw client; ignore if it already does). + createTestBucket(t, ctx, cfg, bucket) + + st, err := NewS3Storage(ctx, cfg, nil, zap.NewNop()) + if err != nil { + t.Fatalf("NewS3Storage: %v", err) + } + + key := "test/roundtrip.txt" + body := "hello mpiper s3" + + if err := st.PutObject(ctx, bucket, key, strings.NewReader(body), &PutOptions{ContentType: "text/plain"}); err != nil { + t.Fatalf("PutObject: %v", err) + } + + attrs, err := st.GetObjectAttrs(ctx, bucket, key) + if err != nil { + t.Fatalf("GetObjectAttrs: %v", err) + } + if attrs.Size != int64(len(body)) { + t.Errorf("size = %d, want %d", attrs.Size, len(body)) + } + if attrs.ContentType != "text/plain" { + t.Errorf("contentType = %q, want text/plain", attrs.ContentType) + } + + rc, err := st.GetObject(ctx, bucket, key) + if err != nil { + t.Fatalf("GetObject: %v", err) + } + got, _ := io.ReadAll(rc) + _ = rc.Close() + if string(got) != body { + t.Errorf("body = %q, want %q", got, body) + } + + url, err := st.GeneratePresignedURL(ctx, bucket, key, &PresignedURLOptions{Method: "GET", ExpiresInSeconds: 60}) + if err != nil { + t.Fatalf("GeneratePresignedURL: %v", err) + } + if !strings.Contains(url, key) || !strings.Contains(url, "X-Amz-Signature") { + t.Errorf("presigned url looks wrong: %s", url) + } + + if err := st.DeleteObject(ctx, bucket, key); err != nil { + t.Fatalf("DeleteObject: %v", err) + } + if _, err := 
st.GetObjectAttrs(ctx, bucket, key); err == nil { + t.Error("GetObjectAttrs after delete: want error, got nil") + } +} + +func createTestBucket(t *testing.T, ctx context.Context, cfg Config, bucket string) { + t.Helper() + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, + awsconfig.WithRegion(cfg.Region), + awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, "")), + ) + if err != nil { + t.Fatalf("load aws config: %v", err) + } + client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String(cfg.Endpoint) + o.UsePathStyle = true + }) + // Ignore error: bucket may already exist. + _, _ = client.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(bucket)}) +} diff --git a/pkg/utils/storagex/storagex.go b/pkg/utils/storagex/storagex.go index 55aacbc..68a4afc 100644 --- a/pkg/utils/storagex/storagex.go +++ b/pkg/utils/storagex/storagex.go @@ -4,8 +4,6 @@ import ( "context" "io" "time" - - "cloud.google.com/go/storage" ) type PutOptions struct { @@ -21,10 +19,19 @@ type PresignedURLOptions struct { // Add other options as needed: Headers, QueryParams, etc. } +// ObjectAttrs is a provider-agnostic subset of object metadata. Concrete +// providers (GCS, S3) map their native attributes into this shape so the +// StorageX interface stays free of provider-specific types. 
// ObjectAttrs is the object metadata every storage backend can report. Each
// provider (GCS, S3) converts its native attributes into this type, which
// keeps provider-specific types out of the StorageX interface.
type ObjectAttrs struct {
	// Size is the object's size as reported by the backend.
	Size int64
	// ContentType and ETag are taken from the backend's object metadata.
	ContentType, ETag string
}