Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions lib/remote_persistent_term.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ defmodule RemotePersistentTerm do

Currently only supports gzip (0x1F, 0x8B).
"""
],
version_fallback?: [
type: :boolean,
required: false,
default: false,
doc: """
If true, when deserialization fails, the system will attempt to use previous versions \
Comment thread
louiscb marked this conversation as resolved.
of the term until a valid version is found or all versions are exhausted.
"""
]
]

Expand All @@ -77,15 +86,17 @@ defmodule RemotePersistentTerm do
fetcher_state: term(),
refresh_interval: pos_integer(),
current_version: String.t(),
auto_decompress?: boolean()
auto_decompress?: boolean(),
version_fallback?: boolean()
}
defstruct [
:fetcher_mod,
:fetcher_state,
:refresh_interval,
:current_version,
:name,
:auto_decompress?
:auto_decompress?,
:version_fallback?
]

@doc """
Expand Down Expand Up @@ -138,7 +149,8 @@ defmodule RemotePersistentTerm do
fetcher_state: fetcher_state,
refresh_interval: opts[:refresh_interval],
name: name(opts),
auto_decompress?: opts[:auto_decompress?]
auto_decompress?: opts[:auto_decompress?],
version_fallback?: opts[:version_fallback?]
}

if opts[:lazy_init?] do
Expand Down Expand Up @@ -307,9 +319,31 @@ defmodule RemotePersistentTerm do

defp download_and_store_term(state, deserialize_fun, put_fun) do
with {:ok, term} <- state.fetcher_mod.download(state.fetcher_state),
{:ok, decompressed} <- maybe_decompress(state, term),
{:ok, deserialized} <- deserialize_fun.(decompressed) do
put_fun.(deserialized)
{:ok, decompressed} <- maybe_decompress(state, term) do
try_deserialize_and_store(state, decompressed, deserialize_fun, put_fun)
end
end

defp try_deserialize_and_store(state, term, deserialize_fun, put_fun) do
case deserialize_fun.(term) do
{:ok, deserialized} ->
put_fun.(deserialized)

{:error, _reason} = error when state.version_fallback? ->
case state.fetcher_mod.previous_version(state.fetcher_state) do
Comment thread
sneako marked this conversation as resolved.
Outdated
Comment thread
sneako marked this conversation as resolved.
Outdated
{:ok, previous_state} ->
download_and_store_term(
%{state | fetcher_state: previous_state},
deserialize_fun,
put_fun
)

{:error, _} ->
error
end

error ->
error
end
end

Expand Down
6 changes: 6 additions & 0 deletions lib/remote_persistent_term/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ defmodule RemotePersistentTerm.Fetcher do
Download the term from the remote source.
"""
@callback download(state()) :: {:ok, term()} | {:error, term()}

@doc """
Get the previous version of the remote term.
Returns a new state that can be used to fetch the previous version.
"""
@callback previous_version(state()) :: {:ok, state()} | {:error, term()}
end
3 changes: 3 additions & 0 deletions lib/remote_persistent_term/fetcher/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ defmodule RemotePersistentTerm.Fetcher.Http do
end
end

@impl true
def previous_version(_state), do: {:error, :no_previous_version}
Comment thread
sneako marked this conversation as resolved.
Outdated

defp response_status(url, status) do
if status < 300 do
Logger.info("successfully downloaded remote term from #{url} with status #{status}")
Expand Down
62 changes: 54 additions & 8 deletions lib/remote_persistent_term/fetcher/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
bucket: bucket,
key: String.t(),
region: region,
failover_buckets: [failover_bucket] | nil
failover_buckets: [failover_bucket] | nil,
version_id: String.t() | nil
}
defstruct [:bucket, :key, :region, :failover_buckets]
defstruct [:bucket, :key, :region, :failover_buckets, :version_id]

@failover_bucket_schema [
bucket: [
Expand Down Expand Up @@ -136,9 +137,9 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
defp list_object_versions(state) do
res =
aws_client_request(
&ExAws.S3.get_bucket_object_versions/2,
:get_bucket_object_versions,
state,
prefix: state.key
[[prefix: state.key]]
Comment thread
louiscb marked this conversation as resolved.
)

with {:ok, %{body: %{versions: versions}}} <- res do
Expand All @@ -147,7 +148,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
end

defp get_object(state) do
aws_client_request(&ExAws.S3.get_object/2, state, state.key)
aws_client_request(:get_object, state, [state.key, [version_id: state.version_id]])
end

defp find_latest([_ | _] = contents) do
Expand All @@ -166,14 +167,59 @@ defmodule RemotePersistentTerm.Fetcher.S3 do

defp find_latest(_), do: {:error, :not_found}

@impl true
def previous_version(state) do
Logger.info(
bucket: state.bucket,
key: state.key,
message: "About to fetch previous version of object",
version_id: state.version_id
)

with {:ok, versions} <- list_object_versions(state),
{:ok, previous_version} <- find_previous_version(versions, state.version_id) do
{:ok, %{state | version_id: previous_version.version_id}}
else
{:error, reason} ->
Logger.error(%{
bucket: state.bucket,
key: state.key,
reason: inspect(reason),
message: "Failed to get previous version of object"
})

{:error, reason}
end
end

defp find_previous_version(versions, current_version_id) do
versions
|> Enum.sort_by(
fn version ->
{:ok, datetime, _} = DateTime.from_iso8601(version.last_modified)
Comment thread
sneako marked this conversation as resolved.
Outdated
datetime
end,
{:desc, DateTime}
)
|> Enum.find(fn version ->
version.version_id != current_version_id
end)
|> case do
nil -> {:error, :no_previous_version}
version -> {:ok, version}
end
end

defp aws_client_request(op, state, opts \\ [])

defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do
perform_request(op, state.bucket, state.region, opts)
end

defp aws_client_request(
op,
%{
failover_buckets: [_|_] = failover_buckets
failover_buckets: [_ | _] = failover_buckets
} = state,
opts
) do
Expand Down Expand Up @@ -222,8 +268,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
end
end

defp perform_request(op, bucket, region, opts) do
op.(bucket, opts)
defp perform_request(func, bucket, region, opts) do
apply(ExAws.S3, func, [bucket | opts])
|> client().request(region: region)
end

Expand Down
5 changes: 4 additions & 1 deletion lib/remote_persistent_term/fetcher/static.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule RemotePersistentTerm.Fetcher.Static do
@moduledoc """
A macro to help define a valid `RemotePersistentTerm.Fetcher` which
A macro to help define a valid `RemotePersistentTerm.Fetcher` which
always returns some hardcoded static data.

Mostly intended for testing purposes.
Expand All @@ -26,6 +26,9 @@ defmodule RemotePersistentTerm.Fetcher.Static do

@impl true
def download(_), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))}

@impl true
def previous_version(_), do: {:error, :no_previous_version}
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule RemotePersistentTerm.MixProject do
use Mix.Project

@name "RemotePersistentTerm"
@version "0.12.0"
@version "0.13.0"
@repo_url "https://github.com/AppMonet/remote_persistent_term"

def project do
Expand Down
54 changes: 54 additions & 0 deletions test/remote_persistent_term/fetcher/s3_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,58 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do
assert log =~ "Downloaded object from S3"
end
end

describe "previous_version/1" do
test "finds the correct previous version when given a current version ID" do
versions = [
%{
version_id: "v3",
last_modified: "2025-05-08T09:58:38.000Z",
is_latest: "true"
},
%{
version_id: "v2",
last_modified: "2025-04-02T10:21:18.000Z",
is_latest: "false"
},
%{
version_id: "v1",
last_modified: "2025-04-02T09:10:37.000Z",
is_latest: "false"
}
]

expect(AwsClientMock, :request, fn operation, opts ->
assert operation.bucket == @bucket
assert operation.resource == "versions"
assert operation.params == [prefix: @key]
assert opts == [region: @region]
{:ok, %{body: %{versions: versions}}}
end)

state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v3"}
assert {:ok, %{version_id: "v2"}} = S3.previous_version(state)
end

test "returns error when there are no previous versions" do
versions = [
%{
version_id: "v1",
last_modified: "2025-04-02T09:10:37.000Z",
is_latest: "true"
}
]

expect(AwsClientMock, :request, fn operation, opts ->
assert operation.bucket == @bucket
assert operation.resource == "versions"
assert operation.params == [prefix: @key]
assert opts == [region: @region]
{:ok, %{body: %{versions: versions}}}
end)

state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v1"}
assert {:error, :no_previous_version} = S3.previous_version(state)
end
end
end