Skip to content

Commit 4768106

Browse files
committed
Fix review issues in mirror feature
- Fix race where runJob could overwrite canceled state set by Cancel()
- Fix Debian ecosystem name inconsistency ("deb" -> "debian")
- Stream metadata responses when caching is disabled to avoid buffering
- Add metadata_cache table to initial schema strings for consistency
- Gate mirror API behind mirror_api config flag (disabled by default)
- Fix goconst lint in metadata_cache_test.go
1 parent 0273865 commit 4768106

7 files changed

Lines changed: 110 additions & 9 deletions

File tree

docs/configuration.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,18 @@ Or via environment variable: `PROXY_CACHE_METADATA=true`.
225225

226226
The `proxy mirror` command always enables metadata caching regardless of this setting.
227227

228+
## Mirror API
229+
230+
The `/api/mirror` endpoints are disabled by default. Enable them to allow starting mirror jobs via HTTP:
231+
232+
```yaml
233+
mirror_api: true
234+
```
235+
236+
Or via environment variable: `PROXY_MIRROR_API=true`.
237+
238+
When disabled, the endpoints are not registered, so requests to them return 404.
239+
228240
## Mirror Command
229241

230242
The `proxy mirror` command pre-populates the cache from various sources. It accepts the same storage and database flags as `serve`.

internal/config/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ type Config struct {
8888
// When enabled, metadata is stored in the database and storage backend.
8989
// The mirror command always enables this regardless of this setting.
9090
CacheMetadata bool `json:"cache_metadata" yaml:"cache_metadata"`
91+
92+
// MirrorAPI enables the /api/mirror endpoints for starting mirror jobs via HTTP.
93+
// Disabled by default to prevent unauthenticated users from triggering downloads.
94+
MirrorAPI bool `json:"mirror_api" yaml:"mirror_api"`
9195
}
9296

9397
// CooldownConfig configures version cooldown periods.
@@ -314,6 +318,9 @@ func (c *Config) LoadFromEnv() {
314318
if v := os.Getenv("PROXY_CACHE_METADATA"); v != "" {
315319
c.CacheMetadata = v == "true" || v == "1"
316320
}
321+
if v := os.Getenv("PROXY_MIRROR_API"); v != "" {
322+
c.MirrorAPI = v == "true" || v == "1"
323+
}
317324
}
318325

319326
// Validate checks the configuration for errors.

internal/handler/debian.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (h *DebianHandler) handlePackageDownload(w http.ResponseWriter, r *http.Req
9494
// These change frequently so we don't cache them.
9595
func (h *DebianHandler) handleMetadata(w http.ResponseWriter, r *http.Request, path string) {
9696
cacheKey := strings.ReplaceAll(path, "/", "_")
97-
h.proxy.ProxyCached(w, r, fmt.Sprintf("%s/%s", h.upstreamURL, path), "deb", cacheKey, "*/*")
97+
h.proxy.ProxyCached(w, r, fmt.Sprintf("%s/%s", h.upstreamURL, path), "debian", cacheKey, "*/*")
9898
}
9999

100100
// proxyFile proxies any file directly without caching.

internal/handler/handler.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,15 @@ func (p *Proxy) cacheMetadataBlob(ctx context.Context, ecosystem, cacheKey, stor
517517

518518
// ProxyCached fetches metadata from upstream (with optional caching for offline fallback)
519519
// and writes it to the response. Optional acceptHeaders specify the Accept header to send.
520+
// When metadata caching is disabled, the response is streamed directly to avoid buffering
521+
// large metadata responses (e.g. npm packages with many versions) in memory.
520522
func (p *Proxy) ProxyCached(w http.ResponseWriter, r *http.Request, upstreamURL, ecosystem, cacheKey string, acceptHeaders ...string) {
523+
if !p.CacheMetadata {
524+
// Stream directly without buffering when caching is off.
525+
p.proxyMetadataStream(w, r, upstreamURL, acceptHeaders...)
526+
return
527+
}
528+
521529
body, contentType, err := p.FetchOrCacheMetadata(r.Context(), ecosystem, cacheKey, upstreamURL, acceptHeaders...)
522530
if err != nil {
523531
if errors.Is(err, ErrUpstreamNotFound) {
@@ -534,6 +542,44 @@ func (p *Proxy) ProxyCached(w http.ResponseWriter, r *http.Request, upstreamURL,
534542
_, _ = w.Write(body)
535543
}
536544

545+
// proxyMetadataStream forwards an upstream metadata response by streaming it to the client
546+
// without buffering the full body in memory.
547+
func (p *Proxy) proxyMetadataStream(w http.ResponseWriter, r *http.Request, upstreamURL string, acceptHeaders ...string) {
548+
req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, upstreamURL, nil)
549+
if err != nil {
550+
http.Error(w, "failed to create request", http.StatusInternalServerError)
551+
return
552+
}
553+
554+
accept := contentTypeJSON
555+
if len(acceptHeaders) > 0 && acceptHeaders[0] != "" {
556+
accept = acceptHeaders[0]
557+
}
558+
req.Header.Set("Accept", accept)
559+
560+
for _, header := range []string{"Accept-Encoding", "If-Modified-Since", "If-None-Match"} {
561+
if v := r.Header.Get(header); v != "" {
562+
req.Header.Set(header, v)
563+
}
564+
}
565+
566+
resp, err := p.HTTPClient.Do(req)
567+
if err != nil {
568+
http.Error(w, "failed to fetch from upstream", http.StatusBadGateway)
569+
return
570+
}
571+
defer func() { _ = resp.Body.Close() }()
572+
573+
for _, header := range []string{"Content-Type", "Content-Length", "Last-Modified", "ETag"} {
574+
if v := resp.Header.Get(header); v != "" {
575+
w.Header().Set(header, v)
576+
}
577+
}
578+
579+
w.WriteHeader(resp.StatusCode)
580+
_, _ = io.Copy(w, resp.Body)
581+
}
582+
537583
// GetOrFetchArtifactFromURL retrieves an artifact from cache or fetches from a specific URL.
538584
// This is useful for registries where download URLs are determined from metadata.
539585
func (p *Proxy) GetOrFetchArtifactFromURL(ctx context.Context, ecosystem, name, version, filename, downloadURL string) (*CacheResult, error) {

internal/mirror/job.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ func (js *JobStore) runJob(ctx context.Context, cancel context.CancelFunc, job *
151151
defer cancel()
152152

153153
js.mu.Lock()
154+
if job.State == JobStateCanceled {
155+
js.mu.Unlock()
156+
return
157+
}
154158
job.State = JobStateRunning
155159
js.mu.Unlock()
156160

@@ -159,6 +163,11 @@ func (js *JobStore) runJob(ctx context.Context, cancel context.CancelFunc, job *
159163
js.mu.Lock()
160164
defer js.mu.Unlock()
161165

166+
// Cancel() may have already set the state; don't overwrite it.
167+
if job.State == JobStateCanceled {
168+
return
169+
}
170+
162171
if err != nil {
163172
job.State = JobStateFailed
164173
job.Error = err.Error()

internal/mirror/job_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,31 @@ func TestJobStoreCleanup(t *testing.T) {
149149
}
150150
}
151151

152+
// TestJobStoreCancelPreservesStateAfterRunJob creates a job, cancels it right away, waits
// a fixed interval, and then checks that the job still reports JobStateCanceled.
func TestJobStoreCancelPreservesStateAfterRunJob(t *testing.T) {
153+
m := setupTestMirror(t, 1)
154+
js := NewJobStore(context.Background(), m)
155+
156+
// Create a job that is expected to fail (registry enumeration is not implemented).
157+
id, err := js.Create(JobRequest{Registry: "npm"})
158+
if err != nil {
159+
t.Fatalf("Create() error = %v", err)
160+
}
161+
162+
// Cancel immediately; the job may or may not have started running yet.
163+
js.Cancel(id)
164+
165+
// Give the runJob goroutine time to finish.
// NOTE(review): this is a fixed delay, not a completion signal, so the check below
// relies on runJob finishing within 200ms -- confirm that holds on slow CI.
166+
time.Sleep(200 * time.Millisecond)
167+
168+
job := js.Get(id)
169+
if job == nil {
170+
t.Fatal("Get() returned nil")
171+
}
172+
// The state written by Cancel() must survive runJob's own final state update.
if job.State != JobStateCanceled {
173+
t.Errorf("state = %q, want %q (cancel should not be overwritten by runJob)", job.State, JobStateCanceled)
174+
}
175+
}
176+
152177
func TestNewJobIDUnique(t *testing.T) {
153178
ids := make(map[string]bool)
154179
for range 100 {

internal/server/server.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,16 @@ func (s *Server) Start() error {
235235
bgCtx, bgCancel := context.WithCancel(context.Background())
236236
s.cancel = bgCancel
237237

238-
// Mirror API endpoints
239-
mirrorSvc := mirror.New(proxy, s.db, s.storage, s.logger, 4) //nolint:mnd // default concurrency
240-
jobStore := mirror.NewJobStore(bgCtx, mirrorSvc)
241-
mirrorAPI := NewMirrorAPIHandler(jobStore)
242-
r.Post("/api/mirror", mirrorAPI.HandleCreate)
243-
r.Get("/api/mirror/{id}", mirrorAPI.HandleGet)
244-
r.Delete("/api/mirror/{id}", mirrorAPI.HandleCancel)
238+
// Mirror API endpoints (opt-in via mirror_api config or PROXY_MIRROR_API env)
239+
if s.cfg.MirrorAPI {
240+
mirrorSvc := mirror.New(proxy, s.db, s.storage, s.logger, 4) //nolint:mnd // default concurrency
241+
jobStore := mirror.NewJobStore(bgCtx, mirrorSvc)
242+
mirrorAPI := NewMirrorAPIHandler(jobStore)
243+
r.Post("/api/mirror", mirrorAPI.HandleCreate)
244+
r.Get("/api/mirror/{id}", mirrorAPI.HandleGet)
245+
r.Delete("/api/mirror/{id}", mirrorAPI.HandleCancel)
246+
go jobStore.StartCleanup(bgCtx)
247+
}
245248

246249
s.http = &http.Server{
247250
Addr: s.cfg.Listen,
@@ -257,7 +260,6 @@ func (s *Server) Start() error {
257260
"storage", s.storage.URL(),
258261
"database", s.cfg.Database.Path)
259262
go s.updateCacheStatsMetrics()
260-
go jobStore.StartCleanup(bgCtx)
261263

262264
return s.http.ListenAndServe()
263265
}

0 commit comments

Comments
 (0)