From 574a6ac2d26c845e310ab96820ff727ba92a56d9 Mon Sep 17 00:00:00 2001 From: Homer Quan Date: Mon, 18 May 2026 13:41:05 -0400 Subject: [PATCH] Require operator token for gRPC pause resume --- README.md | 1 + lib/mirror_neuron_grpc/auth.ex | 122 +++++++++++++++++++++++++++++++ lib/mirror_neuron_grpc/server.ex | 8 +- tests/unit/grpc/auth_test.exs | 48 ++++++++++++ 4 files changed, 177 insertions(+), 2 deletions(-) create mode 100644 lib/mirror_neuron_grpc/auth.ex create mode 100644 tests/unit/grpc/auth_test.exs diff --git a/README.md b/README.md index 2569174..666d06e 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,7 @@ Runtime configuration is read from environment variables in `config/runtime.exs` | `MN_NODE_GPU` | Auto-detected | Optional override for whether this runtime node advertises GPU capacity. | | `MN_CORE_HOST` | `localhost` | Host/IP used by the gRPC listener. | | `MN_GRPC_PORT` | `50051` | gRPC port. | +| `MN_GRPC_OPERATOR_TOKEN` | Empty | Required bearer or `x-mirror-neuron-operator-token` metadata token for gRPC operator control RPCs such as pause and resume. | | `MN_API_ENABLED` | `true` | Enables API-related runtime config. | | `MN_API_PORT` | `4000` | Core API config port. The separate `mn-api` package uses its own defaults. | | `MN_TEMP_DIR` | `/tmp/mirror_neuron` | Temporary runtime directory. | diff --git a/lib/mirror_neuron_grpc/auth.ex b/lib/mirror_neuron_grpc/auth.ex new file mode 100644 index 0000000..3a392b0 --- /dev/null +++ b/lib/mirror_neuron_grpc/auth.ex @@ -0,0 +1,122 @@ +defmodule MirrorNeuron.Grpc.Auth do + @moduledoc false + + import Bitwise + + @operator_token_env "MN_GRPC_OPERATOR_TOKEN" + @token_headers [ + "authorization", + "x-mirror-neuron-operator-token", + "mn-operator-token" + ] + + def authorize_operator!(stream) do + case operator_token() do + {:ok, expected_token} -> + if authorized?(stream, expected_token) do + :ok + else + raise GRPC.RPCError, + status: GRPC.Status.unauthenticated(), + message: "operator token is required for this RPC" + end + + :error -> + raise GRPC.RPCError, + status: GRPC.Status.unauthenticated(), + message: "#{@operator_token_env} must be set before operator control RPCs can be used" + end + end + + def authorized?(stream, expected_token) when is_binary(expected_token) do + expected_token = String.trim(expected_token) + + expected_token != "" and + stream + |> metadata() + |> token_candidates() + |> Enum.any?(&secure_compare(&1, expected_token)) + end + + def authorized?(_stream, _expected_token), do: false + + defp operator_token do + case System.get_env(@operator_token_env, "") |> String.trim() do + "" -> :error + token -> {:ok, token} + end + end + + defp metadata(stream) when is_map(stream) do + stream_map = if Map.has_key?(stream, :__struct__), do: Map.from_struct(stream), else: stream + + headers = + stream_map + |> adapter_headers() + |> Kernel.++( + [:http_request_headers, :headers, :metadata, :request_headers] + |> Enum.flat_map(&metadata_values(Map.get(stream_map, &1))) + ) + + Enum.reduce(headers, %{}, fn {key, value}, acc -> Map.put(acc, normalize_key(key), value) end) + end + + defp metadata(_stream), do: %{} + + defp adapter_headers(%{adapter: adapter, payload: payload}) when is_atom(adapter) do + if match?({:module, ^adapter}, Code.ensure_loaded(adapter)) and + function_exported?(adapter, :get_headers, 1) do + adapter.get_headers(payload) |> metadata_values() + else + [] + end + end + + defp adapter_headers(_stream_map), do: [] + + defp metadata_values(nil), do: [] + defp metadata_values(metadata) when is_map(metadata), do: Map.to_list(metadata) + defp metadata_values(metadata) when is_list(metadata), do: metadata + defp metadata_values(_metadata), do: [] + + defp token_candidates(metadata) do + @token_headers + |> Enum.flat_map(fn header -> metadata |> Map.get(header) |> header_values() end) + |> Enum.flat_map(&authorization_values/1) + end + + defp header_values(nil), do: [] + defp header_values(values) when is_list(values), do: values + defp header_values(value), do: [value] + + defp authorization_values(value) when is_binary(value) do + value = String.trim(value) + + case String.split(value, " ", parts: 2) do + [scheme, token] -> + if String.downcase(scheme) == "bearer", do: [String.trim(token)], else: [value] + + _ -> + [value] + end + end + + defp authorization_values(_value), do: [] + + defp normalize_key(key) when is_atom(key), do: key |> Atom.to_string() |> String.downcase() + defp normalize_key(key) when is_binary(key), do: String.downcase(key) + defp normalize_key(key), do: key |> to_string() |> String.downcase() + + defp secure_compare(left, right) when is_binary(left) and is_binary(right) do + byte_size(left) == byte_size(right) and compare_bytes(left, right) == 0 + end + + defp secure_compare(_left, _right), do: false + + defp compare_bytes(left, right), do: compare_bytes(left, right, 0) + defp compare_bytes(<<>>, <<>>, acc), do: acc + + defp compare_bytes(<>, <>, acc) do + compare_bytes(left_rest, right_rest, bor(acc, bxor(left, right))) + end +end diff --git a/lib/mirror_neuron_grpc/server.ex b/lib/mirror_neuron_grpc/server.ex index 41e98d4..af14446 100644 --- a/lib/mirror_neuron_grpc/server.ex +++ b/lib/mirror_neuron_grpc/server.ex @@ -121,7 +121,9 @@ defmodule MirrorNeuron.Grpc.JobServer do end end - def pause_job(request, _stream) do + def pause_job(request, stream) do + MirrorNeuron.Grpc.Auth.authorize_operator!(stream) + job_id = request.job_id case MirrorNeuron.pause(job_id) do @@ -136,7 +138,9 @@ defmodule MirrorNeuron.Grpc.JobServer do end end - def resume_job(request, _stream) do + def resume_job(request, stream) do + MirrorNeuron.Grpc.Auth.authorize_operator!(stream) + job_id = request.job_id case MirrorNeuron.resume(job_id) do diff --git a/tests/unit/grpc/auth_test.exs b/tests/unit/grpc/auth_test.exs new file mode 100644 index 0000000..367946e --- /dev/null +++ b/tests/unit/grpc/auth_test.exs @@ -0,0 +1,48 @@ +defmodule MirrorNeuron.Grpc.AuthTest do + use ExUnit.Case, async: true + + alias MirrorNeuron.Grpc.Auth + + defmodule Stream do + defstruct [:adapter, :payload, :http_request_headers, :headers, :metadata, :request_headers] + end + + defmodule Adapter do + def get_headers(:payload), do: %{"authorization" => "Bearer secret-token"} + end + + test "authorizes bearer tokens from request headers" do + stream = %Stream{headers: %{"authorization" => "Bearer secret-token"}} + + assert Auth.authorized?(stream, "secret-token") + end + + test "authorizes tokens from grpc adapter request headers" do + stream = %Stream{adapter: Adapter, payload: :payload} + + assert Auth.authorized?(stream, "secret-token") + end + + test "authorizes tokens from grpc http request headers" do + stream = %Stream{http_request_headers: %{"authorization" => "Bearer secret-token"}} + + assert Auth.authorized?(stream, "secret-token") + end + + test "authorizes explicit operator token metadata" do + stream = %Stream{metadata: [{"x-mirror-neuron-operator-token", "secret-token"}]} + + assert Auth.authorized?(stream, "secret-token") + end + + test "rejects missing, blank, and mismatched tokens" do + refute Auth.authorized?(%Stream{headers: %{}}, "secret-token") + + refute Auth.authorized?( + %Stream{headers: %{"authorization" => "Bearer wrong"}}, + "secret-token" + ) + + refute Auth.authorized?(%Stream{headers: %{"authorization" => "Bearer secret-token"}}, "") + end +end