Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ Runtime configuration is read from environment variables in `config/runtime.exs`
| `MN_NODE_GPU` | Auto-detected | Optional override for whether this runtime node advertises GPU capacity. |
| `MN_CORE_HOST` | `localhost` | Host/IP used by the gRPC listener. |
| `MN_GRPC_PORT` | `50051` | gRPC port. |
| `MN_GRPC_OPERATOR_TOKEN` | Empty | Required bearer or `x-mirror-neuron-operator-token` metadata token for gRPC operator control RPCs such as pause and resume. |
| `MN_API_ENABLED` | `true` | Enables API-related runtime config. |
| `MN_API_PORT` | `4000` | Core API config port. The separate `mn-api` package uses its own defaults. |
| `MN_TEMP_DIR` | `/tmp/mirror_neuron` | Temporary runtime directory. |
Expand Down
122 changes: 122 additions & 0 deletions lib/mirror_neuron_grpc/auth.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
defmodule MirrorNeuron.Grpc.Auth do
@moduledoc false

import Bitwise

@operator_token_env "MN_GRPC_OPERATOR_TOKEN"
@token_headers [
"authorization",
"x-mirror-neuron-operator-token",
"mn-operator-token"
]

def authorize_operator!(stream) do
case operator_token() do
{:ok, expected_token} ->
if authorized?(stream, expected_token) do
:ok
else
raise GRPC.RPCError,
status: GRPC.Status.unauthenticated(),
message: "operator token is required for this RPC"
end

:error ->
raise GRPC.RPCError,
status: GRPC.Status.unauthenticated(),
message: "#{@operator_token_env} must be set before operator control RPCs can be used"
end
end

def authorized?(stream, expected_token) when is_binary(expected_token) do
expected_token = String.trim(expected_token)

expected_token != "" and
stream
|> metadata()
|> token_candidates()
|> Enum.any?(&secure_compare(&1, expected_token))
end

def authorized?(_stream, _expected_token), do: false

defp operator_token do
case System.get_env(@operator_token_env, "") |> String.trim() do
"" -> :error
token -> {:ok, token}
end
end

defp metadata(stream) when is_map(stream) do
stream_map = if Map.has_key?(stream, :__struct__), do: Map.from_struct(stream), else: stream

headers =
stream_map
|> adapter_headers()
|> Kernel.++(
[:http_request_headers, :headers, :metadata, :request_headers]
|> Enum.flat_map(&metadata_values(Map.get(stream_map, &1)))
)

Enum.reduce(headers, %{}, fn {key, value}, acc -> Map.put(acc, normalize_key(key), value) end)
end

defp metadata(_stream), do: %{}

defp adapter_headers(%{adapter: adapter, payload: payload}) when is_atom(adapter) do
if match?({:module, ^adapter}, Code.ensure_loaded(adapter)) and
function_exported?(adapter, :get_headers, 1) do
adapter.get_headers(payload) |> metadata_values()
else
[]
end
end

defp adapter_headers(_stream_map), do: []

defp metadata_values(nil), do: []
defp metadata_values(metadata) when is_map(metadata), do: Map.to_list(metadata)
defp metadata_values(metadata) when is_list(metadata), do: metadata
defp metadata_values(_metadata), do: []

defp token_candidates(metadata) do
@token_headers
|> Enum.flat_map(fn header -> metadata |> Map.get(header) |> header_values() end)
|> Enum.flat_map(&authorization_values/1)
end

defp header_values(nil), do: []
defp header_values(values) when is_list(values), do: values
defp header_values(value), do: [value]

defp authorization_values(value) when is_binary(value) do
value = String.trim(value)

case String.split(value, " ", parts: 2) do
[scheme, token] ->
if String.downcase(scheme) == "bearer", do: [String.trim(token)], else: [value]

_ ->
[value]
end
end

defp authorization_values(_value), do: []

defp normalize_key(key) when is_atom(key), do: key |> Atom.to_string() |> String.downcase()
defp normalize_key(key) when is_binary(key), do: String.downcase(key)
defp normalize_key(key), do: key |> to_string() |> String.downcase()

defp secure_compare(left, right) when is_binary(left) and is_binary(right) do
byte_size(left) == byte_size(right) and compare_bytes(left, right) == 0
end

defp secure_compare(_left, _right), do: false

defp compare_bytes(left, right), do: compare_bytes(left, right, 0)
defp compare_bytes(<<>>, <<>>, acc), do: acc

defp compare_bytes(<<left, left_rest::binary>>, <<right, right_rest::binary>>, acc) do
compare_bytes(left_rest, right_rest, bor(acc, bxor(left, right)))
end
end
8 changes: 6 additions & 2 deletions lib/mirror_neuron_grpc/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ defmodule MirrorNeuron.Grpc.JobServer do
end
end

def pause_job(request, _stream) do
def pause_job(request, stream) do
MirrorNeuron.Grpc.Auth.authorize_operator!(stream)

job_id = request.job_id

case MirrorNeuron.pause(job_id) do
Expand All @@ -136,7 +138,9 @@ defmodule MirrorNeuron.Grpc.JobServer do
end
end

def resume_job(request, _stream) do
def resume_job(request, stream) do
MirrorNeuron.Grpc.Auth.authorize_operator!(stream)

job_id = request.job_id

case MirrorNeuron.resume(job_id) do
Expand Down
48 changes: 48 additions & 0 deletions tests/unit/grpc/auth_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
defmodule MirrorNeuron.Grpc.AuthTest do
use ExUnit.Case, async: true

alias MirrorNeuron.Grpc.Auth

defmodule Stream do
defstruct [:adapter, :payload, :http_request_headers, :headers, :metadata, :request_headers]
end

defmodule Adapter do
def get_headers(:payload), do: %{"authorization" => "Bearer secret-token"}
end

test "authorizes bearer tokens from request headers" do
stream = %Stream{headers: %{"authorization" => "Bearer secret-token"}}

assert Auth.authorized?(stream, "secret-token")
end

test "authorizes tokens from grpc adapter request headers" do
stream = %Stream{adapter: Adapter, payload: :payload}

assert Auth.authorized?(stream, "secret-token")
end

test "authorizes tokens from grpc http request headers" do
stream = %Stream{http_request_headers: %{"authorization" => "Bearer secret-token"}}

assert Auth.authorized?(stream, "secret-token")
end

test "authorizes explicit operator token metadata" do
stream = %Stream{metadata: [{"x-mirror-neuron-operator-token", "secret-token"}]}

assert Auth.authorized?(stream, "secret-token")
end

test "rejects missing, blank, and mismatched tokens" do
refute Auth.authorized?(%Stream{headers: %{}}, "secret-token")

refute Auth.authorized?(
%Stream{headers: %{"authorization" => "Bearer wrong"}},
"secret-token"
)

refute Auth.authorized?(%Stream{headers: %{"authorization" => "Bearer secret-token"}}, "")
end
end
Loading