Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ and this project adheres to

### Fixed

- Fix version-stuck bug where the collaborative editor shows stale state after a
sandbox merge or CLI deploy.
[#4535](https://github.com/OpenFn/lightning/issues/4535)

## [2.16.0] - 2026-03-24

## [2.16.0-pre3] - 2026-03-23
Expand Down
19 changes: 18 additions & 1 deletion lib/lightning/collaboration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Lightning.Collaborate do

Collaborate.start(user: user, workflow: workflow)
"""
alias Lightning.Accounts.User
alias Lightning.Collaboration.DocumentSupervisor
alias Lightning.Collaboration.Registry
alias Lightning.Collaboration.Session
Expand All @@ -36,6 +37,20 @@ defmodule Lightning.Collaborate do

@spec start(opts :: Keyword.t()) :: GenServer.on_start()
def start(opts) do
case do_start(opts) do
{:error, {:error, :shared_doc_not_found}} ->
# A SharedDoc was registered in :pg but died before the Session could
# observe it (0ms auto_exit race). Yield one ms — enough for the timer
# to fire and clear :pg — then try once more from scratch.
Process.sleep(1)
do_start(opts)

result ->
result
end
end

defp do_start(opts) do
session_id = Ecto.UUID.generate()
parent_pid = Keyword.get(opts, :parent_pid, self())

Expand Down Expand Up @@ -64,13 +79,15 @@ defmodule Lightning.Collaborate do
end

# Start session for this user
user_id = if is_struct(user, User), do: user.id, else: nil

SessionSupervisor.start_child({
Session,
workflow: workflow,
user: user,
parent_pid: parent_pid,
document_name: document_name,
name: Registry.via({:session, "#{document_name}:#{session_id}", user.id})
name: Registry.via({:session, "#{document_name}:#{session_id}", user_id})
})
end

Expand Down
94 changes: 0 additions & 94 deletions lib/lightning/collaboration/persistence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule Lightning.Collaboration.Persistence do
alias Lightning.Collaboration.DocumentState
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Session
alias Lightning.Collaboration.WorkflowSerializer

require Logger

Expand All @@ -30,7 +29,6 @@ defmodule Lightning.Collaboration.Persistence do
case DocumentState.get_checkpoint_and_updates(doc_name) do
{:ok, checkpoint, updates} ->
apply_persisted_state(doc, doc_name, checkpoint, updates)
reconcile_or_reset(doc, doc_name, workflow)

{:error, :not_found} ->
Logger.info(
Expand Down Expand Up @@ -89,96 +87,4 @@ defmodule Lightning.Collaboration.Persistence do
DocumentState.apply_to_doc(doc, checkpoint, updates)
Logger.debug("Loaded #{length(updates)} updates. document=#{doc_name}")
end

defp reconcile_or_reset(doc, doc_name, workflow) do
workflow_map = Yex.Doc.get_map(doc, "workflow")
persisted_lock_version = extract_lock_version(workflow_map)
current_lock_version = workflow.lock_version

if stale?(persisted_lock_version, current_lock_version) do
Logger.warning("""
Persisted Y.Doc is stale (persisted: #{inspect(persisted_lock_version)}, \
current: #{current_lock_version})
Discarding persisted state and reloading from database.
document=#{doc_name}
""")

clear_and_reset_workflow(doc, workflow)
else
Logger.debug(
"Persisted Y.Doc is current (lock_version: #{current_lock_version}). document=#{doc_name}"
)

reconcile_workflow_metadata(doc, workflow)
end
end

defp extract_lock_version(workflow_map) do
case Yex.Map.fetch(workflow_map, "lock_version") do
{:ok, version} when is_float(version) -> trunc(version)
{:ok, version} when is_integer(version) -> version
{:ok, nil} -> nil
:error -> nil
end
end

defp stale?(nil, current_version), do: not is_nil(current_version)

defp stale?(persisted_version, current_version),
do: persisted_version != current_version

defp clear_and_reset_workflow(doc, workflow) do
# Same pattern as Session.clear_and_reset_doc
# Get all Yex collections BEFORE transaction to avoid VM deadlock
jobs_array = Yex.Doc.get_array(doc, "jobs")
edges_array = Yex.Doc.get_array(doc, "edges")
triggers_array = Yex.Doc.get_array(doc, "triggers")

# Transaction 1: Clear all arrays
Yex.Doc.transaction(doc, "clear_stale_workflow", fn ->
clear_array(jobs_array)
clear_array(edges_array)
clear_array(triggers_array)
end)

# Transaction 2: Re-serialize workflow from database
Session.initialize_workflow_document(doc, workflow)

:ok
end

defp clear_array(array) do
length = Yex.Array.length(array)

if length > 0 do
Yex.Array.delete_range(array, 0, length)
end
end

defp reconcile_workflow_metadata(doc, workflow) do
# Update workflow metadata fields to match current database state
# This is critical when loading persisted Y.Doc state that may be stale
workflow_map = Yex.Doc.get_map(doc, "workflow")

Yex.Doc.transaction(doc, "reconcile_metadata", fn ->
# Update lock_version to current database value
Yex.Map.set(workflow_map, "lock_version", workflow.lock_version)

# Update name in case it changed
Yex.Map.set(workflow_map, "name", workflow.name)

# Update deleted_at if present
Yex.Map.set(
workflow_map,
"deleted_at",
WorkflowSerializer.datetime_to_string(workflow.deleted_at)
)
end)

Logger.debug(
"Reconciled workflow metadata: lock_version=#{workflow.lock_version}, name=#{workflow.name}"
)

:ok
end
end
53 changes: 33 additions & 20 deletions lib/lightning/collaboration/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Lightning.Collaboration.Session do

alias Lightning.Accounts.User
alias Lightning.Collaboration.WorkflowSerializer
alias Lightning.VersionControl.ProjectRepoConnection
alias Lightning.Workflows.Presence
alias Lightning.Workflows.WorkflowUsageLimiter
alias Yex.Sync.SharedDoc
Expand All @@ -41,7 +42,7 @@ defmodule Lightning.Collaboration.Session do

@type start_opts :: [
workflow: Lightning.Workflows.Workflow.t(),
user: User.t(),
user: User.t() | ProjectRepoConnection.t(),
parent_pid: pid()
]

Expand Down Expand Up @@ -69,6 +70,10 @@ defmodule Lightning.Collaboration.Session do
GenServer.stop(session_pid)
end

def shared_doc_pid(session_pid) do
GenServer.call(session_pid, :shared_doc_pid)
end

def child_spec(opts) do
{opts, args} =
Keyword.put_new_lazy(opts, :session_id, fn -> Ecto.UUID.generate() end)
Expand Down Expand Up @@ -114,20 +119,26 @@ defmodule Lightning.Collaboration.Session do
{:stop, {:error, :shared_doc_not_found}}

shared_doc_pid ->
SharedDoc.observe(shared_doc_pid)
Logger.info("Joined SharedDoc for #{document_name}")

# We track the user presence here so the the original WorkflowLive.Edit
# can be stopped from editing the workflow when someone else is editing it.
# Note: Presence tracking uses workflow.id, not document_name, because
# presence is about showing who is editing the workflow, not which version
Presence.track_user_presence(
user,
workflow.id,
self()
)

{:ok, %{state | shared_doc_pid: shared_doc_pid}}
try do
SharedDoc.observe(shared_doc_pid)
Logger.info("Joined SharedDoc for #{document_name}")

# We track the user presence here so the the original WorkflowLive.Edit
# can be stopped from editing the workflow when someone else is editing it.
# Note: Presence tracking uses workflow.id, not document_name, because
# presence is about showing who is editing the workflow, not which version.
# Only track presence for real users — not system actors like ProjectRepoConnection.
if is_struct(user, User),
do: Presence.track_user_presence(user, workflow.id, self())

{:ok, %{state | shared_doc_pid: shared_doc_pid}}
catch
# GenServer.call raises an exit (not a rescuable exception) when the
# target process is dead. SharedDoc may have been registered in :pg
# but died before we could observe it (0ms auto_exit race).
# Return cleanly so Collaborate.start can retry.
:exit, _ -> {:stop, {:error, :shared_doc_not_found}}
end
end
end

Expand All @@ -144,11 +155,8 @@ defmodule Lightning.Collaboration.Session do
SharedDoc.unobserve(shared_doc_pid)
end

Presence.untrack_user_presence(
state.user,
state.workflow.id,
self()
)
if is_struct(state.user, User),
do: Presence.untrack_user_presence(state.user, state.workflow.id, self())

:ok
end
Expand Down Expand Up @@ -259,6 +267,11 @@ defmodule Lightning.Collaboration.Session do
GenServer.call(session_pid, {:reset_workflow, user}, 10_000)
end

@impl true
def handle_call(:shared_doc_pid, _from, state) do
{:reply, state.shared_doc_pid, state}
end

@impl true
def handle_call(:get_doc, _from, %{shared_doc_pid: shared_doc_pid} = state) do
{:reply, SharedDoc.get_doc(shared_doc_pid), state}
Expand Down
Loading