11defmodule RemotePersistentTerm.Fetcher.S3 do
22 @ moduledoc """
33 A Fetcher implementation for AWS S3.
4+
5+ ## Versioned vs. non-versioned buckets
6+
7+ This fetcher works with both versioned and non-versioned buckets. It uses the object's
8+ `ETag` as a change token and performs conditional GETs with `If-None-Match` to avoid
9+ re-downloading unchanged data.
10+
11+ - **Versioned buckets**: `HEAD`/`GET` responses include `ETag`; the fetcher uses it for
12+ change detection. The latest object is always whatever S3 returns for the key (no explicit
13+ version ID required).
14+ - **Non-versioned buckets**: only `ETag` is available, which is sufficient to detect
15+ content changes. Overwriting an object with identical bytes may keep the same `ETag`,
16+ which is fine because the content is unchanged.
17+
18+ ## S3-compatible services
19+
20+ S3-compatible providers (e.g., DigitalOcean Spaces, Linode Object Storage) should work
21+ as long as they support standard S3 headers: `ETag`, `If-None-Match`, and `304 Not Modified`.
22+ If a provider ignores conditional requests, the fetcher will still function but will
23+ download on every refresh.
424 """
525 require Logger
626
@@ -69,6 +89,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
6989 """
7090 @ impl true
7191 def init ( opts ) do
92+ ensure_http_client ( )
93+
7294 with { :ok , valid_opts } <- NimbleOptions . validate ( opts , @ opts_schema ) do
7395 { :ok ,
7496 % __MODULE__ {
@@ -82,17 +104,20 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
82104
83105 @ impl true
84106 def current_version ( state ) do
85- with { :ok , versions } <- list_object_versions ( state ) ,
86- { :ok , % { etag: etag , version_id: version } } <- find_latest ( versions ) do
107+ with { :ok , % { headers: headers } } <- head_object ( state ) ,
108+ { :ok , version } <- extract_version ( headers ) do
87109 Logger . info (
88110 bucket: state . bucket ,
89111 key: state . key ,
90112 version: version ,
91113 message: "Found latest version of object"
92114 )
93115
94- { :ok , etag }
116+ { :ok , version }
95117 else
118+ { :error , { :http_error , 404 , _ } } ->
119+ { :error , "could not find s3://#{ state . bucket } /#{ state . key } " }
120+
96121 { :error , { :unexpected_response , % { body: reason } } } ->
97122 { :error , reason }
98123
@@ -133,60 +158,134 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
133158 end
134159 end
135160
136- defp list_object_versions ( state ) do
161+ @ impl true
162+ def download_if_changed ( state , current_version ) do
137163 res =
138- aws_client_request (
139- & ExAws.S3 . get_bucket_object_versions / 2 ,
164+ get_object_request (
140165 state ,
141- prefix: state . key
166+ if_none_match_opts ( current_version ) ,
167+ & failover_on_error? / 1
142168 )
143169
144- with { :ok , % { body: % { versions: versions } } } <- res do
145- { :ok , versions }
170+ case res do
171+ { :ok , % { status_code: 304 } } ->
172+ { :not_modified , current_version }
173+
174+ { :error , { :http_error , 304 , _ } } ->
175+ { :not_modified , current_version }
176+
177+ { :ok , % { body: body , headers: headers } } ->
178+ with { :ok , version } <- extract_version ( headers ) do
179+ { :ok , body , version }
180+ end
181+
182+ { :error , reason } ->
183+ { :error , inspect ( reason ) }
146184 end
147185 end
148186
149187 defp get_object ( state ) do
150- aws_client_request ( & ExAws.S3 . get_object / 2 , state , state . key )
188+ get_object_request ( state , [ ] )
189+ end
190+
191+ defp get_object_request ( state , opts , failover_on_error? \\ fn _ -> true end ) do
192+ aws_client_request (
193+ fn bucket , request_opts -> ExAws.S3 . get_object ( bucket , state . key , request_opts ) end ,
194+ state ,
195+ opts ,
196+ failover_on_error?
197+ )
198+ end
199+
200+ defp head_object ( state ) do
201+ aws_client_request ( & ExAws.S3 . head_object / 2 , state , state . key )
151202 end
152203
153- defp find_latest ( [ _ | _ ] = contents ) do
154- Enum . find ( contents , fn
155- % { is_latest: "true" } ->
156- true
204+ defp extract_version ( headers ) do
205+ case header_value ( headers , "etag" ) do
206+ nil -> { :error , :not_found }
207+ value -> { :ok , normalize_etag ( value ) }
208+ end
209+ end
210+
211+ defp header_value ( headers , name ) do
212+ downcased = String . downcase ( name )
213+
214+ Enum . find_value ( headers , fn
215+ { key , value } when is_binary ( key ) and is_binary ( value ) ->
216+ if String . downcase ( key ) == downcased , do: value , else: nil
157217
158218 _ ->
159- false
219+ nil
160220 end )
161- |> case do
162- res when is_map ( res ) -> { :ok , res }
163- _ -> { :error , :not_found }
221+ end
222+
223+ defp normalize_etag ( value ) when is_binary ( value ) do
224+ value
225+ |> String . trim ( )
226+ |> String . trim ( "\" " )
227+ end
228+
229+ defp if_none_match_opts ( nil ) , do: [ ]
230+ defp if_none_match_opts ( etag ) , do: [ if_none_match: quote_etag ( etag ) ]
231+
232+ defp quote_etag ( etag ) do
233+ etag = String . trim ( etag )
234+
235+ if String . starts_with? ( etag , "\" " ) and String . ends_with? ( etag , "\" " ) do
236+ etag
237+ else
238+ "\" #{ etag } \" "
239+ end
240+ end
241+
242+ defp failover_on_error? ( { :http_error , 304 , _ } ) , do: false
243+ defp failover_on_error? ( _reason ) , do: true
244+
245+ defp ensure_http_client do
246+ case Application . get_env ( :ex_aws , :http_client ) do
247+ nil ->
248+ Application . put_env ( :ex_aws , :http_client , RemotePersistentTerm.Fetcher.S3.HttpClient )
249+
250+ _ ->
251+ :ok
164252 end
165253 end
166254
167- defp find_latest ( _ ) , do: { :error , :not_found }
255+ defp aws_client_request ( op , state , opts ) do
256+ aws_client_request ( op , state , opts , fn _ -> true end )
257+ end
168258
169- defp aws_client_request ( op , % { failover_buckets: nil } = state , opts ) do
259+ defp aws_client_request ( op , % { failover_buckets: nil } = state , opts , _failover_on_error? ) do
170260 perform_request ( op , state . bucket , state . region , opts )
171261 end
172262
173263 defp aws_client_request (
174264 op ,
175265 % {
176- failover_buckets: [ _ | _ ] = failover_buckets
266+ failover_buckets: [ _ | _ ] = failover_buckets
177267 } = state ,
178- opts
268+ opts ,
269+ failover_on_error?
179270 ) do
180- with { :error , reason } <- perform_request ( op , state . bucket , state . region , opts ) do
181- Logger . error ( % {
182- bucket: state . bucket ,
183- key: state . key ,
184- region: state . region ,
185- reason: inspect ( reason ) ,
186- message: "Failed to fetch from primary bucket, attempting failover buckets"
187- } )
188-
189- try_failover_buckets ( op , failover_buckets , opts , state )
271+ case perform_request ( op , state . bucket , state . region , opts ) do
272+ { :error , reason } = error ->
273+ if failover_on_error? . ( reason ) do
274+ Logger . error ( % {
275+ bucket: state . bucket ,
276+ key: state . key ,
277+ region: state . region ,
278+ reason: inspect ( reason ) ,
279+ message: "Failed to fetch from primary bucket, attempting failover buckets"
280+ } )
281+
282+ try_failover_buckets ( op , failover_buckets , opts , state )
283+ else
284+ error
285+ end
286+
287+ result ->
288+ result
190289 end
191290 end
192291
0 commit comments