diff --git a/app/services/active_storage/staging_bucket_backfill.rb b/app/services/active_storage/staging_bucket_backfill.rb
new file mode 100644
index 000000000..b746238f7
--- /dev/null
+++ b/app/services/active_storage/staging_bucket_backfill.rb
@@ -0,0 +1,139 @@
+require 'aws-sdk-s3'
+require 'cgi'
+require 'set'
+
+module ActiveStorage
+  # Copies objects that blob rows reference but the destination (staging)
+  # bucket lacks from the source (production) bucket. Defaults to a dry run
+  # that only logs the keys it would copy.
+  class StagingBucketBackfill
+    DEFAULT_DB_BATCH_SIZE = 1_000
+
+    Result = Struct.new(
+      :scanned_blob_keys_count,
+      :missing_in_staging_count,
+      :copied_objects_count,
+      :missing_in_production_count,
+      :dry_run,
+      keyword_init: true
+    )
+
+    # Builds the service from the given keyword arguments and runs it.
+    def self.call(...)
+      new(...).call
+    end
+
+    def initialize(
+      s3_client:,
+      source_bucket_name:,
+      destination_bucket_name:,
+      blob_scope: ::ActiveStorage::Blob.all,
+      dry_run: true,
+      db_batch_size: DEFAULT_DB_BATCH_SIZE,
+      logger: Rails.logger
+    )
+      @s3_client = s3_client
+      @source_bucket_name = source_bucket_name
+      @destination_bucket_name = destination_bucket_name
+      @blob_scope = blob_scope
+      @dry_run = dry_run
+      @db_batch_size = db_batch_size
+      @logger = logger
+    end
+
+    # Walks every blob key in +blob_scope+ and copies the ones missing from the
+    # destination bucket. Returns a Result with the counters; raises unless the
+    # Rails environment is staging.
+    def call
+      ensure_supported_environment!
+
+      # The one-off restore repair should avoid one S3 existence check per blob.
+      # Listing the destination bucket once and diffing against DB keys keeps
+      # the repair cost proportional to bucket pages plus actual copies.
+      destination_keys = load_bucket_keys(destination_bucket_name)
+      scanned_blob_keys_count = 0
+      missing_in_staging_count = 0
+      copied_objects_count = 0
+      missing_in_production_count = 0
+
+      blob_scope.select(:key).in_batches(of: db_batch_size) do |relation|
+        relation.pluck(:key).each do |key|
+          scanned_blob_keys_count += 1
+          next if destination_keys.include?(key)
+
+          missing_in_staging_count += 1
+
+          if dry_run
+            logger.info "ActiveStorage staging bucket backfill dry run would copy key=#{key}"
+            next
+          end
+
+          copied_objects_count += copy_key_from_production(key)
+        rescue Aws::S3::Errors::NoSuchKey
+          # The source bucket has no object for this key; count it and keep
+          # going so a single gap does not abort the whole backfill.
+          missing_in_production_count += 1
+          logger.warn "ActiveStorage staging bucket backfill could not find production key=#{key}"
+        end
+      end
+
+      Result.new(
+        scanned_blob_keys_count:,
+        missing_in_staging_count:,
+        copied_objects_count:,
+        missing_in_production_count:,
+        dry_run:
+      )
+    end
+
+    private
+
+    attr_reader :blob_scope, :db_batch_size, :destination_bucket_name, :dry_run,
+      :logger, :s3_client, :source_bucket_name
+
+    def ensure_supported_environment!
+      # This repair copies production-backed files into staging to match a
+      # restored staging DB. Restricting it to staging prevents accidental
+      # cross-environment copying in production.
+      return if Rails.env.staging?
+
+      raise "#{self.class.name} only supports staging"
+    end
+
+    # Returns a Set of every object key in +bucket_name+, following
+    # continuation tokens until the listing is no longer truncated.
+    def load_bucket_keys(bucket_name)
+      keys = Set.new
+      continuation_token = nil
+
+      loop do
+        response = s3_client.list_objects_v2(
+          bucket: bucket_name,
+          continuation_token:
+        )
+
+        response.contents.each { |object| keys.add(object.key) }
+        break unless response.is_truncated
+
+        continuation_token = response.next_continuation_token
+      end
+
+      keys
+    end
+
+    # Copies +key+ from the source bucket into the destination bucket and
+    # returns 1 so the caller can add it to the copied counter.
+    def copy_key_from_production(key)
+      s3_client.copy_object(
+        bucket: destination_bucket_name,
+        key:,
+        # CGI.escape encodes spaces as "+"; they are rewritten to "%20" for
+        # the URL-encoded copy source.
+        copy_source: "#{source_bucket_name}/#{CGI.escape(key).gsub('+', '%20')}"
+      )
+
+      logger.info "ActiveStorage staging bucket backfill copied key=#{key}"
+      1
+    end
+  end
+end
diff --git a/app/services/active_storage/staging_bucket_cleanup.rb b/app/services/active_storage/staging_bucket_cleanup.rb
new file mode 100644
index 000000000..23adf8718
--- /dev/null
+++ b/app/services/active_storage/staging_bucket_cleanup.rb
@@ -0,0 +1,174 @@
+require 'aws-sdk-s3'
+require 'set'
+
+module ActiveStorage
+  # Deletes objects from the staging bucket that no blob row in +blob_scope+
+  # references. Defaults to a dry run that only logs the candidate keys.
+  class StagingBucketCleanup
+    DEFAULT_DELETE_BATCH_SIZE = 1_000
+    DEFAULT_DB_BATCH_SIZE = 1_000
+    # Largest number of keys S3 accepts in a single DeleteObjects request.
+    S3_MAX_KEYS_PER_DELETE = 1_000
+
+    Result = Struct.new(
+      :scanned_objects_count,
+      :orphaned_objects_count,
+      :deleted_objects_count,
+      :dry_run,
+      keyword_init: true
+    )
+
+    # Builds the service from the given keyword arguments and runs it.
+    def self.call(...)
+      new(...).call
+    end
+
+    def initialize(
+      s3_client:,
+      bucket_name:,
+      blob_scope: ::ActiveStorage::Blob.all,
+      dry_run: true,
+      delete_batch_size: DEFAULT_DELETE_BATCH_SIZE,
+      db_batch_size: DEFAULT_DB_BATCH_SIZE,
+      delete_only_if_last_modified_before: nil,
+      logger: Rails.logger
+    )
+      @s3_client = s3_client
+      @bucket_name = bucket_name
+      @blob_scope = blob_scope
+      @dry_run = dry_run
+      @delete_batch_size = delete_batch_size
+      @db_batch_size = db_batch_size
+      @delete_only_if_last_modified_before = delete_only_if_last_modified_before
+      @logger = logger
+    end
+
+    # Scans the bucket, collects keys with no matching blob row (and, when a
+    # cutoff is set, last modified before it) and deletes them in batches.
+    # Returns a Result with the counters; raises unless the Rails environment
+    # is staging.
+    def call
+      ensure_supported_environment!
+
+      # We compare the staging bucket against the current staging DB because a
+      # production DB restore can leave behind objects that Rails no longer
+      # knows about. Cleaning from the DB view avoids one S3 existence check per
+      # blob, which keeps the restore-time task cheap enough to run routinely.
+      valid_blob_keys = load_valid_blob_keys
+      orphan_keys_to_delete = []
+      scanned_objects_count = 0
+      orphaned_objects_count = 0
+      deleted_objects_count = 0
+
+      each_bucket_object do |object|
+        scanned_objects_count += 1
+        next if valid_blob_keys.include?(object.key)
+        next if delete_only_if_last_modified_before.present? &&
+          object.last_modified >= delete_only_if_last_modified_before
+
+        orphan_keys_to_delete << object.key
+        orphaned_objects_count += 1
+
+        next unless orphan_keys_to_delete.size >= delete_batch_size
+
+        deleted_objects_count += delete_keys(orphan_keys_to_delete)
+        orphan_keys_to_delete.clear
+      end
+
+      deleted_objects_count += delete_keys(orphan_keys_to_delete) if orphan_keys_to_delete.any?
+
+      Result.new(
+        scanned_objects_count:,
+        orphaned_objects_count:,
+        deleted_objects_count:,
+        dry_run:
+      )
+    end
+
+    private
+
+    attr_reader :blob_scope, :bucket_name, :db_batch_size, :delete_batch_size,
+      :delete_only_if_last_modified_before, :dry_run, :logger, :s3_client
+
+    def ensure_supported_environment!
+      # This cleanup is intentionally limited to staging
+      # because its job is to discard bucket objects that are no longer
+      # represented in the current DB snapshot. That is appropriate for staging
+      # restore hygiene, but it would be too destructive to allow elsewhere.
+      return if Rails.env.staging?
+
+      raise "#{self.class.name} only supports staging"
+    end
+
+    # Returns a Set with the key of every blob in +blob_scope+.
+    def load_valid_blob_keys
+      keys = Set.new
+
+      blob_scope.select(:key).in_batches(of: db_batch_size) do |relation|
+        relation.pluck(:key).each { |key| keys.add(key) }
+      end
+
+      keys
+    end
+
+    # Yields every object in the bucket, following continuation tokens until
+    # the listing is no longer truncated.
+    def each_bucket_object
+      continuation_token = nil
+
+      loop do
+        response = s3_client.list_objects_v2(
+          bucket: bucket_name,
+          continuation_token:
+        )
+
+        response.contents.each { |object| yield object }
+
+        break unless response.is_truncated
+
+        continuation_token = response.next_continuation_token
+      end
+    end
+
+    # Deletes +keys+ from the bucket, or only logs them in dry-run mode.
+    # Returns the number of objects S3 did not report a failure for (always 0
+    # for a dry run).
+    def delete_keys(keys)
+      if dry_run
+        keys.each do |key|
+          # Dry-run output needs to show the exact candidate keys so operators
+          # can validate the deletion set before any irreversible cleanup.
+          logger.info "ActiveStorage staging bucket cleanup dry run would delete key=#{key}"
+        end
+        logger.info "ActiveStorage staging bucket cleanup dry run would delete #{keys.size} objects"
+        return 0
+      end
+
+      # S3 supports deleting up to 1,000 objects per request. Slicing on that
+      # boundary keeps request volume low and stays within the limit even when
+      # the caller configured a larger delete_batch_size.
+      deleted_count = keys.each_slice(S3_MAX_KEYS_PER_DELETE).sum do |slice|
+        response = s3_client.delete_objects(
+          bucket: bucket_name,
+          delete: {
+            objects: slice.map { |key| { key: } },
+            quiet: true
+          }
+        )
+
+        # DeleteObjects can succeed as a request while individual keys fail;
+        # quiet mode returns only those failures. The safe navigation keeps
+        # clients that return no response object (e.g. test doubles) working.
+        failures = Array(response&.errors)
+        failures.each do |failure|
+          logger.warn "ActiveStorage staging bucket cleanup failed to delete key=#{failure.key} code=#{failure.code} message=#{failure.message}"
+        end
+
+        slice.size - failures.size
+      end
+
+      logger.info "ActiveStorage staging bucket cleanup deleted #{deleted_count} objects"
+      deleted_count
+    end
+  end
+end
diff --git a/lib/tasks/active_storage.rake b/lib/tasks/active_storage.rake
new file mode 100644
index 000000000..9978c2159
--- /dev/null
+++ b/lib/tasks/active_storage.rake
@@ -0,0 +1,159 @@
+# Usage:
+#   bundle exec rake active_storage:cleanup_orphaned_objects
+#   DRY_RUN=false bundle exec rake active_storage:cleanup_orphaned_objects
+#   bundle exec rake active_storage:backfill_missing_objects_from_production
+#   DRY_RUN=false bundle exec rake active_storage:backfill_missing_objects_from_production
+#
+# Requirements:
+# - Only run this in staging. Every other environment, production included,
+#   is hard-blocked because the cleanup task deletes bucket objects that are
+#   not represented by the current DB.
+# - The ActiveStorage service must be S3-backed.
+# - Run this before any new file upload happens in staging after the DB restore.
+#   The task uses the latest ActiveStorage blob timestamp in the restored DB as
+#   the safety cutoff, so new staging uploads after the restore move that
+#   cutoff forward and make newer replicated objects eligible for deletion.
+# - The task assumes a restore workflow where staging can contain:
+#   - old staging-only objects that disappeared from the restored DB and should
+#     be deleted
+#   - newer replicated production objects that are not in the restored DB yet
+#     and must be preserved for the next restore
+# - To preserve those newer replicated objects, the task only deletes orphaned
+#   objects older than the latest ActiveStorage blob row.
+# - With DRY_RUN=false the cleanup task refuses to run when the DB has no blob
+#   rows, because every object in the bucket would then count as an orphan.
+# - The backfill task assumes the staging bucket name ends with `-staging` and
+#   the corresponding production bucket uses the same prefix with `-production`.
+#
+namespace :active_storage do
+  desc 'Delete S3 objects that no longer have an active_storage_blobs row'
+  task cleanup_orphaned_objects: :environment do
+    unless Rails.env.staging?
+      raise 'active_storage:cleanup_orphaned_objects only supports staging'
+    end
+
+    service_name = Rails.application.config.active_storage.service
+    service_config = Rails.application.config.active_storage
+      .service_configurations
+      .fetch(service_name.to_s)
+    service_type = service_config.fetch('service')
+
+    unless service_type == 'S3'
+      raise "active_storage:cleanup_orphaned_objects requires an S3-backed ActiveStorage service, got #{service_type.inspect}"
+    end
+
+    # After a production DB restore, the latest blob row timestamp is a
+    # practical snapshot boundary: objects replicated into staging after
+    # that point will not exist in the restored DB yet, so deleting them
+    # would recreate the missing-file problem on the next restore.
+    delete_only_if_last_modified_before = ActiveStorage::Blob.maximum(:created_at)
+    dry_run = ENV.fetch('DRY_RUN', 'true') != 'false'
+
+    # With no blob rows there is no cutoff and every object in the bucket
+    # counts as an orphan. Refuse to delete in that state: it is far more
+    # likely a failed or incomplete restore than a deliberately empty DB.
+    if delete_only_if_last_modified_before.nil? && !dry_run
+      raise 'active_storage:cleanup_orphaned_objects found no active_storage_blobs rows; refusing to delete every object in the bucket'
+    end
+
+    # Only pass the options that are actually configured: an explicit nil
+    # can take precedence over the SDK's own defaults (environment, shared
+    # config, instance profile) instead of falling back to them.
+    s3_client = Aws::S3::Client.new(
+      **{
+        access_key_id: service_config['access_key_id'],
+        secret_access_key: service_config['secret_access_key'],
+        session_token: service_config['session_token'],
+        region: service_config['region'],
+        endpoint: service_config['endpoint'],
+        force_path_style: service_config['force_path_style']
+      }.compact
+    )
+
+    # This task is meant for post-restore cleanup where clear operator
+    # feedback matters more than silent success, so we log the exact bucket
+    # and safety settings on every run.
+    Rails.logger.info(
+      "ActiveStorage staging bucket cleanup configured for bucket=#{service_config['bucket']} dry_run=#{dry_run} delete_only_if_last_modified_before=#{delete_only_if_last_modified_before || 'none'} delete_only_if_last_modified_before_source=active_storage_blobs.maximum(:created_at)"
+    )
+
+    result = ActiveStorage::StagingBucketCleanup.call(
+      s3_client:,
+      bucket_name: service_config.fetch('bucket'),
+      dry_run:,
+      delete_only_if_last_modified_before:
+    )
+
+    summary = [
+      "scanned=#{result.scanned_objects_count}",
+      "orphaned=#{result.orphaned_objects_count}",
+      "deleted=#{result.deleted_objects_count}",
+      "dry_run=#{result.dry_run}"
+    ].join(' ')
+
+    Rails.logger.info("ActiveStorage staging bucket cleanup complete #{summary}")
+    puts summary
+  end
+
+  desc 'Copy ActiveStorage objects referenced by the staging DB but missing from the staging bucket'
+  task backfill_missing_objects_from_production: :environment do
+    unless Rails.env.staging?
+      raise 'active_storage:backfill_missing_objects_from_production only supports staging'
+    end
+
+    service_name = Rails.application.config.active_storage.service
+    service_config = Rails.application.config.active_storage
+      .service_configurations
+      .fetch(service_name.to_s)
+    service_type = service_config.fetch('service')
+
+    unless service_type == 'S3'
+      raise "active_storage:backfill_missing_objects_from_production requires an S3-backed ActiveStorage service, got #{service_type.inspect}"
+    end
+
+    destination_bucket_name = service_config.fetch('bucket')
+    source_bucket_name = destination_bucket_name.sub(/-staging\z/, '-production')
+
+    if source_bucket_name == destination_bucket_name
+      raise "active_storage:backfill_missing_objects_from_production could not derive a production bucket name from #{destination_bucket_name.inspect}"
+    end
+
+    dry_run = ENV.fetch('DRY_RUN', 'true') != 'false'
+
+    # Only pass the options that are actually configured: an explicit nil
+    # can take precedence over the SDK's own defaults (environment, shared
+    # config, instance profile) instead of falling back to them.
+    s3_client = Aws::S3::Client.new(
+      **{
+        access_key_id: service_config['access_key_id'],
+        secret_access_key: service_config['secret_access_key'],
+        session_token: service_config['session_token'],
+        region: service_config['region'],
+        endpoint: service_config['endpoint'],
+        force_path_style: service_config['force_path_style']
+      }.compact
+    )
+
+    Rails.logger.info(
+      "ActiveStorage staging bucket backfill configured for source_bucket=#{source_bucket_name} destination_bucket=#{destination_bucket_name} dry_run=#{dry_run}"
+    )
+
+    result = ActiveStorage::StagingBucketBackfill.call(
+      s3_client:,
+      source_bucket_name:,
+      destination_bucket_name:,
+      dry_run:
+    )
+
+    summary = [
+      "scanned=#{result.scanned_blob_keys_count}",
+      "missing_in_staging=#{result.missing_in_staging_count}",
+      "copied=#{result.copied_objects_count}",
+      "missing_in_production=#{result.missing_in_production_count}",
+      "dry_run=#{result.dry_run}"
+    ].join(' ')
+
+    Rails.logger.info("ActiveStorage staging bucket backfill complete #{summary}")
+    puts summary
+  end
+end
diff --git
a/spec/services/active_storage/staging_bucket_backfill_spec.rb b/spec/services/active_storage/staging_bucket_backfill_spec.rb
new file mode 100644
index 000000000..6c4e730ae
--- /dev/null
+++ b/spec/services/active_storage/staging_bucket_backfill_spec.rb
@@ -0,0 +1,100 @@
+require 'spec_helper'
+
+describe ActiveStorage::StagingBucketBackfill do
+  let(:s3_client) { instance_double(Aws::S3::Client) }
+  let(:logger) { instance_double(ActiveSupport::Logger, info: true, warn: true) }
+  let(:blob_scope) { class_double(ActiveStorage::Blob) }
+  let(:source_bucket_name) { 'species-plus-production' }
+  let(:destination_bucket_name) { 'species-plus-staging' }
+  let(:first_relation) { instance_double(ActiveRecord::Relation) }
+  let(:second_relation) { instance_double(ActiveRecord::Relation) }
+  let(:existing_destination_object) do
+    instance_double(Aws::S3::Types::Object, key: 'already-present')
+  end
+  let(:destination_page_response) do
+    instance_double(
+      Aws::S3::Types::ListObjectsV2Output,
+      contents: [ existing_destination_object ],
+      is_truncated: false,
+      next_continuation_token: nil
+    )
+  end
+
+  before do
+    allow(blob_scope).to receive(:select).with(:key).and_return(blob_scope)
+    allow(blob_scope).to receive(:in_batches).with(of: 2).and_yield(first_relation).and_yield(second_relation)
+    allow(first_relation).to receive(:pluck).with(:key).and_return([ 'already-present', 'missing-in-staging' ])
+    allow(second_relation).to receive(:pluck).with(:key).and_return([ 'missing-in-production' ])
+    allow(Rails).to receive(:env).and_return(ActiveSupport::StringInquirer.new('staging'))
+    allow(s3_client).to receive(:list_objects_v2).with(
+      bucket: destination_bucket_name,
+      continuation_token: nil
+    ).and_return(destination_page_response)
+  end
+
+  it 'lists missing staging keys in dry run mode' do
+    result = described_class.call(
+      s3_client:,
+      source_bucket_name:,
+      destination_bucket_name:,
+      blob_scope:,
+      dry_run: true,
+      db_batch_size: 2,
+      logger:
+    )
+
+    expect(result.scanned_blob_keys_count).to eq(3)
+    expect(result.missing_in_staging_count).to eq(2)
+    expect(result.copied_objects_count).to eq(0)
+    expect(result.missing_in_production_count).to eq(0)
+    expect(result.dry_run).to be(true)
+  end
+
+  it 'copies only keys missing from staging and reports missing production objects' do
+    expect(s3_client).to receive(:copy_object).with(
+      bucket: destination_bucket_name,
+      key: 'missing-in-staging',
+      copy_source: 'species-plus-production/missing-in-staging'
+    )
+    expect(s3_client).to receive(:copy_object).with(
+      bucket: destination_bucket_name,
+      key: 'missing-in-production',
+      copy_source: 'species-plus-production/missing-in-production'
+    ).and_raise(Aws::S3::Errors::NoSuchKey.new(nil, 'missing'))
+
+    result = described_class.call(
+      s3_client:,
+      source_bucket_name:,
+      destination_bucket_name:,
+      blob_scope:,
+      dry_run: false,
+      db_batch_size: 2,
+      logger:
+    )
+
+    expect(result.scanned_blob_keys_count).to eq(3)
+    expect(result.missing_in_staging_count).to eq(2)
+    expect(result.copied_objects_count).to eq(1)
+    expect(result.missing_in_production_count).to eq(1)
+    expect(result.dry_run).to be(false)
+  end
+
+  # Same guard the cleanup service spec covers: the backfill must refuse to
+  # run when the Rails environment is not staging.
+  it 'rejects unsupported environments' do
+    allow(Rails).to receive(:env).and_return(ActiveSupport::StringInquirer.new('production'))
+
+    expect do
+      described_class.call(
+        s3_client:,
+        source_bucket_name:,
+        destination_bucket_name:,
+        blob_scope:,
+        logger:
+      )
+    end.to raise_error(
+      RuntimeError,
+      'ActiveStorage::StagingBucketBackfill only supports staging'
+    )
+  end
+end
diff --git a/spec/services/active_storage/staging_bucket_cleanup_spec.rb b/spec/services/active_storage/staging_bucket_cleanup_spec.rb
new file mode 100644
index 000000000..d265b9157
--- /dev/null
+++ b/spec/services/active_storage/staging_bucket_cleanup_spec.rb
@@ -0,0 +1,160 @@
+require 'spec_helper'
+
+describe ActiveStorage::StagingBucketCleanup do
+  let(:s3_client) { instance_double(Aws::S3::Client) }
+  let(:logger) { instance_double(ActiveSupport::Logger, info: true, warn: true) }
+  let(:blob_scope) { class_double(ActiveStorage::Blob) }
+  let(:bucket_name) { 'species-plus-staging' }
+  let(:first_relation) { instance_double(ActiveRecord::Relation) }
+  let(:second_relation) { instance_double(ActiveRecord::Relation) }
+  let(:first_page_object) { instance_double(Aws::S3::Types::Object, key: 'kept-key', last_modified: 3.days.ago) }
+  let(:second_page_object) { instance_double(Aws::S3::Types::Object, key: 'orphan-key', last_modified: 3.days.ago) }
+  let(:first_page_response) do
+    instance_double(
+      Aws::S3::Types::ListObjectsV2Output,
+      contents: [ first_page_object ],
+      is_truncated: true,
+      next_continuation_token: 'page-2'
+    )
+  end
+  let(:second_page_response) do
+    instance_double(
+      Aws::S3::Types::ListObjectsV2Output,
+      contents: [ second_page_object ],
+      is_truncated: false,
+      next_continuation_token: nil
+    )
+  end
+
+  before do
+    allow(blob_scope).to receive(:select).with(:key).and_return(blob_scope)
+    allow(blob_scope).to receive(:in_batches).with(of: 2).and_yield(first_relation).and_yield(second_relation)
+    allow(first_relation).to receive(:pluck).with(:key).and_return([ 'kept-key' ])
+    allow(second_relation).to receive(:pluck).with(:key).and_return([ 'another-kept-key' ])
+    allow(Rails).to receive(:env).and_return(ActiveSupport::StringInquirer.new('staging'))
+  end
+
+  it 'scans the bucket once and batch deletes only orphaned keys' do
+    expect(s3_client).to receive(:list_objects_v2).with(
+      bucket: bucket_name,
+      continuation_token: nil
+    ).and_return(first_page_response)
+    expect(s3_client).to receive(:list_objects_v2).with(
+      bucket: bucket_name,
+      continuation_token: 'page-2'
+    ).and_return(second_page_response)
+    expect(s3_client).to receive(:delete_objects).with(
+      bucket: bucket_name,
+      delete: {
+        objects: [ { key: 'orphan-key' } ],
+        quiet: true
+      }
+    )
+
+    result = described_class.call(
+      s3_client:,
+      bucket_name:,
+      blob_scope:,
+      dry_run: false,
+      db_batch_size: 2,
+      logger:
+    )
+
+    expect(result.scanned_objects_count).to eq(2)
+    expect(result.orphaned_objects_count).to eq(1)
+    expect(result.deleted_objects_count).to eq(1)
+    expect(result.dry_run).to be(false)
+  end
+
+  it 'respects dry run safeguards' do
+    first_object = instance_double(Aws::S3::Types::Object, key: 'first-orphan', last_modified: 12.hours.ago)
+    second_object = instance_double(Aws::S3::Types::Object, key: 'second-orphan', last_modified: 5.days.ago)
+    response = instance_double(
+      Aws::S3::Types::ListObjectsV2Output,
+      contents: [ first_object, second_object ],
+      is_truncated: false,
+      next_continuation_token: nil
+    )
+
+    expect(s3_client).to receive(:list_objects_v2).with(
+      bucket: bucket_name,
+      continuation_token: nil
+    ).and_return(response)
+    expect(s3_client).not_to receive(:delete_objects)
+
+    result = described_class.call(
+      s3_client:,
+      bucket_name:,
+      blob_scope:,
+      dry_run: true,
+      db_batch_size: 2,
+      logger:
+    )
+
+    expect(result.scanned_objects_count).to eq(2)
+    expect(result.orphaned_objects_count).to eq(2)
+    expect(result.deleted_objects_count).to eq(0)
+    expect(result.dry_run).to be(true)
+  end
+
+  it 'keeps orphaned objects that were created after the restore cutoff' do
+    pre_restore_object = instance_double(
+      Aws::S3::Types::Object,
+      key: 'old-orphan',
+      last_modified: Time.zone.parse('2026-05-13 08:00:00 UTC')
+    )
+    post_restore_replica = instance_double(
+      Aws::S3::Types::Object,
+      key: 'future-prod-replica',
+      last_modified: Time.zone.parse('2026-05-13 10:00:00 UTC')
+    )
+    response = instance_double(
+      Aws::S3::Types::ListObjectsV2Output,
+      contents: [ pre_restore_object, post_restore_replica ],
+      is_truncated: false,
+      next_continuation_token: nil
+    )
+
+    expect(s3_client).to receive(:list_objects_v2).with(
+      bucket: bucket_name,
+      continuation_token: nil
+    ).and_return(response)
+    expect(s3_client).to receive(:delete_objects).with(
+      bucket: bucket_name,
+      delete: {
+        objects: [ { key: 'old-orphan' } ],
+        quiet: true
+      }
+    )
+
+    result = described_class.call(
+      s3_client:,
+      bucket_name:,
+      blob_scope:,
+      dry_run: false,
+      db_batch_size: 2,
+      delete_only_if_last_modified_before: Time.zone.parse('2026-05-13 09:00:00 UTC'),
+      logger:
+    )
+
+    expect(result.scanned_objects_count).to eq(2)
+    expect(result.orphaned_objects_count).to eq(1)
+    expect(result.deleted_objects_count).to eq(1)
+  end
+
+  it 'rejects unsupported environments' do
+    allow(Rails).to receive(:env).and_return(ActiveSupport::StringInquirer.new('production'))
+
+    expect do
+      described_class.call(
+        s3_client:,
+        bucket_name:,
+        blob_scope:,
+        logger:
+      )
+    end.to raise_error(
+      RuntimeError,
+      'ActiveStorage::StagingBucketCleanup only supports staging'
+    )
+  end
+end