Skip to content

Commit a852e2a

Browse files
committed
Add fallback to previous version
1 parent 8f8a204 commit a852e2a

7 files changed

Lines changed: 163 additions & 16 deletions

File tree

lib/remote_persistent_term.ex

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ defmodule RemotePersistentTerm do
6969
7070
Currently only supports gzip (0x1F, 0x8B).
7171
"""
72+
],
73+
version_fallback?: [
74+
type: :boolean,
75+
required: false,
76+
default: false,
77+
doc: """
78+
If true, when deserialization fails, the system will attempt to use previous versions \
79+
of the term until a valid version is found or all versions are exhausted.
80+
"""
7281
]
7382
]
7483

@@ -77,15 +86,17 @@ defmodule RemotePersistentTerm do
7786
fetcher_state: term(),
7887
refresh_interval: pos_integer(),
7988
current_version: String.t(),
80-
auto_decompress?: boolean()
89+
auto_decompress?: boolean(),
90+
version_fallback?: boolean()
8191
}
8292
defstruct [
8393
:fetcher_mod,
8494
:fetcher_state,
8595
:refresh_interval,
8696
:current_version,
8797
:name,
88-
:auto_decompress?
98+
:auto_decompress?,
99+
:version_fallback?
89100
]
90101

91102
@doc """
@@ -138,7 +149,8 @@ defmodule RemotePersistentTerm do
138149
fetcher_state: fetcher_state,
139150
refresh_interval: opts[:refresh_interval],
140151
name: name(opts),
141-
auto_decompress?: opts[:auto_decompress?]
152+
auto_decompress?: opts[:auto_decompress?],
153+
version_fallback?: opts[:version_fallback?]
142154
}
143155

144156
if opts[:lazy_init?] do
@@ -307,9 +319,31 @@ defmodule RemotePersistentTerm do
307319

308320
defp download_and_store_term(state, deserialize_fun, put_fun) do
309321
with {:ok, term} <- state.fetcher_mod.download(state.fetcher_state),
310-
{:ok, decompressed} <- maybe_decompress(state, term),
311-
{:ok, deserialized} <- deserialize_fun.(decompressed) do
312-
put_fun.(deserialized)
322+
{:ok, decompressed} <- maybe_decompress(state, term) do
323+
try_deserialize_and_store(state, decompressed, deserialize_fun, put_fun)
324+
end
325+
end
326+
327+
defp try_deserialize_and_store(state, term, deserialize_fun, put_fun) do
328+
case deserialize_fun.(term) do
329+
{:ok, deserialized} ->
330+
put_fun.(deserialized)
331+
332+
{:error, _reason} = error when state.version_fallback? ->
333+
case state.fetcher_mod.previous_version(state.fetcher_state) do
334+
{:ok, previous_state} ->
335+
download_and_store_term(
336+
%{state | fetcher_state: previous_state},
337+
deserialize_fun,
338+
put_fun
339+
)
340+
341+
{:error, _} ->
342+
error
343+
end
344+
345+
error ->
346+
error
313347
end
314348
end
315349

lib/remote_persistent_term/fetcher.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,10 @@ defmodule RemotePersistentTerm.Fetcher do
2626
Download the term from the remote source.
2727
"""
2828
@callback download(state()) :: {:ok, term()} | {:error, term()}
29+
30+
@doc """
31+
Get the previous version of the remote term.
32+
Returns a new state that can be used to fetch the previous version.
33+
"""
34+
@callback previous_version(state()) :: {:ok, state()} | {:error, term()}
2935
end

lib/remote_persistent_term/fetcher/http.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ defmodule RemotePersistentTerm.Fetcher.Http do
8181
end
8282
end
8383

84+
@impl true
85+
def previous_version(_state), do: {:error, :no_previous_version}
86+
8487
defp response_status(url, status) do
8588
if status < 300 do
8689
Logger.info("successfully downloaded remote term from #{url} with status #{status}")

lib/remote_persistent_term/fetcher/s3.ex

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
1414
bucket: bucket,
1515
key: String.t(),
1616
region: region,
17-
failover_buckets: [failover_bucket] | nil
17+
failover_buckets: [failover_bucket] | nil,
18+
version_id: String.t() | nil
1819
}
19-
defstruct [:bucket, :key, :region, :failover_buckets]
20+
defstruct [:bucket, :key, :region, :failover_buckets, :version_id]
2021

2122
@failover_bucket_schema [
2223
bucket: [
@@ -136,9 +137,9 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
136137
defp list_object_versions(state) do
137138
res =
138139
aws_client_request(
139-
&ExAws.S3.get_bucket_object_versions/2,
140+
:get_bucket_object_versions,
140141
state,
141-
prefix: state.key
142+
[[prefix: state.key]]
142143
)
143144

144145
with {:ok, %{body: %{versions: versions}}} <- res do
@@ -147,7 +148,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
147148
end
148149

149150
defp get_object(state) do
150-
aws_client_request(&ExAws.S3.get_object/2, state, state.key)
151+
# aws_client_request(&ExAws.S3.get_object/2, state, state.key)
152+
aws_client_request(:get_object, state, [state.key, [version_id: state.version_id]])
151153
end
152154

153155
defp find_latest([_ | _] = contents) do
@@ -166,14 +168,59 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
166168

167169
defp find_latest(_), do: {:error, :not_found}
168170

171+
@impl true
172+
def previous_version(state) do
173+
Logger.info(
174+
bucket: state.bucket,
175+
key: state.key,
176+
message: "About to fetch previous version of object",
177+
version_id: state.version_id
178+
)
179+
180+
with {:ok, versions} <- list_object_versions(state),
181+
{:ok, previous_version} <- find_previous_version(versions, state.version_id) do
182+
{:ok, %{state | version_id: previous_version.version_id}}
183+
else
184+
{:error, reason} ->
185+
Logger.error(%{
186+
bucket: state.bucket,
187+
key: state.key,
188+
reason: inspect(reason),
189+
message: "Failed to get previous version of object"
190+
})
191+
192+
{:error, reason}
193+
end
194+
end
195+
196+
defp find_previous_version(versions, current_version_id) do
197+
versions
198+
|> Enum.sort_by(
199+
fn version ->
200+
{:ok, datetime, _} = DateTime.from_iso8601(version.last_modified)
201+
datetime
202+
end,
203+
{:desc, DateTime}
204+
)
205+
|> Enum.find(fn version ->
206+
version.version_id != current_version_id
207+
end)
208+
|> case do
209+
nil -> {:error, :no_previous_version}
210+
version -> {:ok, version}
211+
end
212+
end
213+
214+
defp aws_client_request(op, state, opts \\ [])
215+
169216
defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do
170217
perform_request(op, state.bucket, state.region, opts)
171218
end
172219

173220
defp aws_client_request(
174221
op,
175222
%{
176-
failover_buckets: [_|_] = failover_buckets
223+
failover_buckets: [_ | _] = failover_buckets
177224
} = state,
178225
opts
179226
) do
@@ -222,8 +269,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
222269
end
223270
end
224271

225-
defp perform_request(op, bucket, region, opts) do
226-
op.(bucket, opts)
272+
defp perform_request(func, bucket, region, opts) do
273+
apply(ExAws.S3, func, [bucket | opts])
227274
|> client().request(region: region)
228275
end
229276

lib/remote_persistent_term/fetcher/static.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
defmodule RemotePersistentTerm.Fetcher.Static do
22
@moduledoc """
3-
A macro to help define a valid `RemotePersistentTerm.Fetcher` which
3+
A macro to help define a valid `RemotePersistentTerm.Fetcher` which
44
always returns some hardcoded static data.
55
66
Mostly intended for testing purposes.
@@ -26,6 +26,9 @@ defmodule RemotePersistentTerm.Fetcher.Static do
2626

2727
@impl true
2828
def download(_), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))}
29+
30+
@impl true
31+
def previous_version(_), do: {:error, :no_previous_version}
2932
end
3033
end
3134
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule RemotePersistentTerm.MixProject do
22
use Mix.Project
33

44
@name "RemotePersistentTerm"
5-
@version "0.12.0"
5+
@version "0.13.0"
66
@repo_url "https://github.com/AppMonet/remote_persistent_term"
77

88
def project do

test/remote_persistent_term/fetcher/s3_test.exs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,4 +253,58 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do
253253
assert log =~ "Downloaded object from S3"
254254
end
255255
end
256+
257+
describe "previous_version/1" do
258+
test "finds the correct previous version when given a current version ID" do
259+
versions = [
260+
%{
261+
version_id: "v3",
262+
last_modified: "2025-05-08T09:58:38.000Z",
263+
is_latest: "true"
264+
},
265+
%{
266+
version_id: "v2",
267+
last_modified: "2025-04-02T10:21:18.000Z",
268+
is_latest: "false"
269+
},
270+
%{
271+
version_id: "v1",
272+
last_modified: "2025-04-02T09:10:37.000Z",
273+
is_latest: "false"
274+
}
275+
]
276+
277+
expect(AwsClientMock, :request, fn operation, opts ->
278+
assert operation.bucket == @bucket
279+
assert operation.resource == "versions"
280+
assert operation.params == [prefix: @key]
281+
assert opts == [region: @region]
282+
{:ok, %{body: %{versions: versions}}}
283+
end)
284+
285+
state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v3"}
286+
assert {:ok, %{version_id: "v2"}} = S3.previous_version(state)
287+
end
288+
289+
test "returns error when there are no previous versions" do
290+
versions = [
291+
%{
292+
version_id: "v1",
293+
last_modified: "2025-04-02T09:10:37.000Z",
294+
is_latest: "true"
295+
}
296+
]
297+
298+
expect(AwsClientMock, :request, fn operation, opts ->
299+
assert operation.bucket == @bucket
300+
assert operation.resource == "versions"
301+
assert operation.params == [prefix: @key]
302+
assert opts == [region: @region]
303+
{:ok, %{body: %{versions: versions}}}
304+
end)
305+
306+
state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v1"}
307+
assert {:error, :no_previous_version} = S3.previous_version(state)
308+
end
309+
end
256310
end

0 commit comments

Comments
 (0)