Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/lightning/runs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ defmodule Lightning.Runs do
end

@spec complete_run(Run.t(), %{optional(any()) => any()}) ::
{:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())}
{:ok, Run.t(), map() | nil} | {:error, Ecto.Changeset.t(Run.t())}
def complete_run(run, params) do
Handlers.CompleteRun.call(run, params)
end
Expand Down Expand Up @@ -394,7 +394,7 @@ defmodule Lightning.Runs do
result =
Repo.transaction(fn ->
case complete_run(run, %{state: "lost", error_type: error_type}) do
{:ok, updated_run} ->
{:ok, updated_run, _body} ->
Ecto.assoc(run, :steps)
|> where([r], is_nil(r.exit_reason))
|> mark_steps_lost()
Expand Down
52 changes: 32 additions & 20 deletions lib/lightning/runs/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,26 @@ defmodule Lightning.Runs.Handlers do

def call(run, params) do
with {:ok, complete_run} <- params |> new() |> apply_action(:validate) do
Repo.transact(fn ->
with {:ok, run_params} <-
resolve_final_dataclip(complete_run, run.options) do
run
|> Run.complete(run_params)
|> case do
%{valid?: false} = changeset ->
{:error, changeset}

changeset ->
Runs.update_run(changeset)
end
end
end)
case Repo.transact(fn ->
with {:ok, run_params, final_body} <-
resolve_final_dataclip(complete_run, run.options) do
run
|> Run.complete(run_params)
|> case do
%{valid?: false} = changeset ->
{:error, changeset}

changeset ->
case Runs.update_run(changeset) do
{:ok, run} -> {:ok, {run, final_body}}
error -> error
end
end
end
end) do
{:ok, {run, final_body}} -> {:ok, run, final_body}
{:error, _} = error -> error
end
end
end

Expand Down Expand Up @@ -154,10 +160,15 @@ defmodule Lightning.Runs.Handlers do
_options
)
when is_binary(id) do
if Repo.exists?(from d in Dataclip, where: d.id == ^id) do
{:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id)}
else
{:error, %{errors: %{final_dataclip_id: ["does not exist"]}}}
case Repo.one(
from d in Dataclip, where: d.id == ^id, select: {d.id, d.body}
) do
nil ->
{:error, %{errors: %{final_dataclip_id: ["does not exist"]}}}

{_id, body} ->
{:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id),
body}
end
end

Expand All @@ -170,7 +181,8 @@ defmodule Lightning.Runs.Handlers do
when is_map(final_state) and is_binary(project_id) do
case save_final_dataclip(final_state, project_id, options) do
{:ok, %Dataclip{id: id}} ->
{:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id)}
{:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id),
final_state}

error ->
error
Expand All @@ -179,7 +191,7 @@ defmodule Lightning.Runs.Handlers do

# Neither provided (e.g., mark_run_lost, or worker didn't send final state).
defp resolve_final_dataclip(complete_run, _options) do
{:ok, to_run_params(complete_run)}
{:ok, to_run_params(complete_run), nil}
end

defp save_final_dataclip(
Expand Down
2 changes: 1 addition & 1 deletion lib/lightning/setup_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ defmodule Lightning.SetupUtils do
reason -> reason
end

{:ok, _} = Runs.complete_run(run, %{state: state})
{:ok, _, _} = Runs.complete_run(run, %{state: state})

workorder
end)
Expand Down
8 changes: 4 additions & 4 deletions lib/lightning_web/channels/run_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ defmodule LightningWeb.RunChannel do
payload = Map.put(payload, "project_id", socket.assigns.project_id)

case Runs.complete_run(socket.assigns.run, payload) do
{:ok, run} ->
{:ok, run, final_body} ->
# TODO: Turn FailureAlerter into an Oban worker and process async
# instead of blocking the channel.
run_with_preloads =
Expand All @@ -121,7 +121,7 @@ defmodule LightningWeb.RunChannel do
|> Lightning.FailureAlerter.alert_on_failure()

# Broadcast webhook response if after_completion is enabled
maybe_broadcast_webhook_response(run_with_preloads, payload)
maybe_broadcast_webhook_response(run_with_preloads, final_body)

socket |> assign(run: run) |> reply_with({:ok, nil})

Expand Down Expand Up @@ -306,7 +306,7 @@ defmodule LightningWeb.RunChannel do
# Ignore other messages
def handle_info(_msg, socket), do: {:noreply, socket}

defp maybe_broadcast_webhook_response(run, payload) do
defp maybe_broadcast_webhook_response(run, final_body) do
work_order = run.work_order
trigger = work_order.trigger

Expand All @@ -319,7 +319,7 @@ defmodule LightningWeb.RunChannel do
status_code = determine_status_code(run.state)

body = %{
data: payload["final_state"],
data: final_body,
meta: %{
work_order_id: work_order.id,
run_id: run.id,
Expand Down
4 changes: 2 additions & 2 deletions test/lightning/runs_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ defmodule Lightning.RunsTest do

Lightning.WorkOrders.subscribe(workflow.project_id)

{:ok, run} = Runs.complete_run(run, %{state: "success"})
{:ok, run, _body} = Runs.complete_run(run, %{state: "success"})

assert run.state == :success
assert run.finished_at == Lightning.current_time()
Expand Down Expand Up @@ -760,7 +760,7 @@ defmodule Lightning.RunsTest do
|> Ecto.Changeset.change(state: :claimed)
|> Repo.update()

{:ok, run} =
{:ok, run, _body} =
Runs.complete_run(run, %{state: "lost", error_type: "Lost"})

assert run.state == :lost
Expand Down
4 changes: 2 additions & 2 deletions test/lightning_web/live/run_live/show_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ defmodule LightningWeb.RunLive.ShowTest do
assert {_attrs, %{"dataclipId" => ^output_dataclip_id}} =
output_dataclip_viewer

{:ok, _} = Lightning.Runs.complete_run(run, %{state: "failed"})
{:ok, _, _} = Lightning.Runs.complete_run(run, %{state: "failed"})

html =
view
Expand Down Expand Up @@ -259,7 +259,7 @@ defmodule LightningWeb.RunLive.ShowTest do
reason: "success"
})

{:ok, _} =
{:ok, _, _} =
Lightning.Runs.complete_run(run, %{
state: "success",
final_dataclip_id: output_dataclip_id
Expand Down
2 changes: 1 addition & 1 deletion test/lightning_web/live/workflow_live/editor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ defmodule LightningWeb.WorkflowLive.EditorTest do
|> Lightning.Repo.update!()

# lets crash the run
{:ok, _run} =
{:ok, _run, _body} =
Lightning.Runs.complete_run(run, %{
"error_message" => "Unexpected token (6:9)",
"error_type" => "CompileError",
Expand Down