Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3a016be
fix: submit cluster jobs as the authenticated user's euid
allison-truhlar Mar 19, 2026
543d9cb
fix: resolve job work directory under the user's home, not root's
allison-truhlar Mar 19, 2026
95742f4
fix: remove any existing repo_link symlink before creating it
allison-truhlar Mar 19, 2026
db6f047
test: resolve bsub to full path
allison-truhlar Mar 19, 2026
99fda28
fix: submit with username because bsub doesn't use geteuid internally
allison-truhlar Mar 19, 2026
e2affc7
chore: bump alpha version for a pre-release to test changes
allison-truhlar Mar 19, 2026
95b301f
feat: add cluster.extra_paths setting to configure scheduler PATH
allison-truhlar Mar 20, 2026
eff3c09
chore: new alpha version to create a release testing more changes to …
allison-truhlar Mar 20, 2026
c85bade
feat: add cluster.extra_env config for setting scheduler environment …
allison-truhlar Mar 20, 2026
fc2a569
chore: new alpha version to test changes for using apps while running…
allison-truhlar Mar 20, 2026
ab33f34
test: take out previously added -U username
allison-truhlar Mar 20, 2026
6ebf98b
chore: bump alpha version for another test pre-release
allison-truhlar Mar 20, 2026
8b57085
refactor: move extra_paths, extra_env out of cluster settings to top …
allison-truhlar Mar 23, 2026
be57371
feat: add env_source_script setting to source shell env at startup
allison-truhlar Mar 23, 2026
a276bb3
feat: add pre_run to pixi task entry points for PATH setup
allison-truhlar Mar 23, 2026
045ff9c
chore: bump alpha version for test release
allison-truhlar Mar 23, 2026
52d6c84
fix: use per-user repo cache and submit LSF jobs as authenticated user
allison-truhlar Mar 23, 2026
4638fd5
chore: bump alpha version for test release
allison-truhlar Mar 23, 2026
a9b6c89
test: remove -U; this is only for advance submissions
allison-truhlar Mar 23, 2026
ce1c0d8
chore: bump alpha version
allison-truhlar Mar 23, 2026
422170f
cleanup: remove now unused top level extra_env and extra_paths env vars
allison-truhlar Mar 23, 2026
486489f
run lsf operations in separate worker with setuid
krokicki Mar 23, 2026
bb301a8
wrap other file operations in user contexts
krokicki Mar 23, 2026
e83338c
fix error serializing cached_repo_dir
krokicki Mar 23, 2026
1c69ef2
only switch identity when running as root
krokicki Mar 23, 2026
69a0303
Merge remote-tracking branch 'refs/remotes/origin/further-fixes-to-jo…
krokicki Mar 23, 2026
0bc19b7
chore: bump alpha version for test release
allison-truhlar Mar 24, 2026
198ef8e
updated to py_cluster_api 0.4.0
krokicki Mar 25, 2026
594cadc
move bjobs monitoring into worker process
krokicki Mar 25, 2026
e93c7fc
chore: bump alpha version for test release
allison-truhlar Mar 26, 2026
a8f5dba
fix: if cluster rejects a submission, clean up job entry in db
allison-truhlar Mar 26, 2026
06c0ccd
refactor: show job submission errs on form, not as toast
allison-truhlar Mar 26, 2026
d507051
fix: run git/manifest operations in worker subprocess instead of Effe…
allison-truhlar Mar 26, 2026
e1a58e2
fix: add user context to submit_job response and validate_paths endpoint
allison-truhlar Mar 26, 2026
97fc166
fix: seed poll stubs with current DB status to prevent status toggling
allison-truhlar Mar 26, 2026
5e571d1
chore: add debug logging for effective user identity in apps/jobs flow
allison-truhlar Mar 26, 2026
804295c
chore: bump alpha version for test release
allison-truhlar Mar 26, 2026
df1c5d0
fix: use file lock so only one uvicorn worker polls bjobs per cycle
allison-truhlar Mar 26, 2026
07c129c
test: add tests for poll lock election and status-update logic
allison-truhlar Mar 26, 2026
5105d12
chore: new alpha version for test release
allison-truhlar Mar 26, 2026
134a760
fix: hold poll lock through sleep interval and fix misleading log
allison-truhlar Mar 26, 2026
391f838
fix(tests): hold the file lock for longer to prevent other worker fro…
allison-truhlar Mar 27, 2026
312fe46
fix: only run job polling if there are active jobs in the user's db
allison-truhlar Mar 27, 2026
c1e4bc7
chore: new alpha version for test release
allison-truhlar Apr 6, 2026
8c89296
Merge branch 'main' into further-fixes-to-job-submission
allison-truhlar Apr 8, 2026
e631cb9
Merge branch 'main' into further-fixes-to-job-submission
allison-truhlar Apr 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/config.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ session_cookie_name: fg_session
#
session_cookie_secure: true

#
# Environment setup — sourced at startup before any scheduler commands.
# Useful when running as a systemd service where login scripts don't run.
#
# env_source_script: /misc/lsf/conf/profile.lsf # Source a shell script to set env vars
# extra_paths: # Directories prepended to the server's PATH
# - /opt/lsf/bin
# extra_env: # Extra environment variables set at startup
# LSF_ENVDIR: /misc/lsf/conf

#
# Settings for the Apps feature
#
Expand Down
88 changes: 57 additions & 31 deletions fileglancer/apps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import shlex
import shutil
import subprocess
from contextlib import nullcontext
from pathlib import Path
from datetime import datetime, UTC
from typing import Optional
Expand All @@ -26,7 +27,13 @@

_MANIFEST_FILENAME = "runnables.yaml"

_REPO_CACHE_BASE = Path(os.path.expanduser("~/.fileglancer/apps"))
def _repo_cache_base(username: str | None = None) -> Path:
"""Return the repo cache base directory, optionally for a specific user."""
if username:
home = os.path.expanduser(f"~{username}")
else:
home = os.path.expanduser("~")
return Path(home) / ".fileglancer" / "apps"
_repo_locks: dict[str, asyncio.Lock] = {}


Expand Down Expand Up @@ -120,18 +127,21 @@
return "main"


async def _ensure_repo_cache(url: str, pull: bool = False) -> Path:
async def _ensure_repo_cache(url: str, pull: bool = False,
username: str | None = None) -> Path:
"""Clone or update the GitHub repo in per-user cache. Returns repo path.

Cache is keyed by owner/repo/branch to avoid checkout races between branches.
An asyncio lock serializes git operations for the same repo+branch.
When username is provided, the cache is placed under ~username/.fileglancer/apps.
"""
owner, repo, branch = _parse_github_url(url)
clone_url = f"https://github.com/{owner}/{repo}.git"
if not branch:
branch = await _resolve_default_branch(clone_url)
repo_dir = (_REPO_CACHE_BASE / owner / repo / branch).resolve()
repo_dir.relative_to(_REPO_CACHE_BASE.resolve())
cache_base = _repo_cache_base(username)
repo_dir = (cache_base / owner / repo / branch).resolve()
Comment thread Dismissed
repo_dir.relative_to(cache_base.resolve())
Comment thread Dismissed
lock = _get_repo_lock(owner, repo, branch)

async with lock:
Expand Down Expand Up @@ -226,22 +236,24 @@
MANIFEST_FILENAME = _MANIFEST_FILENAME


async def discover_app_manifests(url: str,
                                 username: str | None = None) -> list[tuple[str, AppManifest]]:
    """Clone/pull a GitHub repo and discover all manifest files.

    Args:
        url: GitHub repository URL to clone or update.
        username: when given, the repo cache is placed under that user's
            home directory (``~username/.fileglancer/apps``) rather than
            the server process's home.

    Returns:
        A list of (relative_dir_path, AppManifest) tuples.

    Raises:
        ValueError: if the URL is invalid or the clone fails.
    """
    # pull=True so discovery always reflects the latest branch contents.
    repo_dir = await _ensure_repo_cache(url, pull=True, username=username)
    return _find_manifests_in_repo(repo_dir)


async def fetch_app_manifest(url: str, manifest_path: str = "",
                             username: str | None = None) -> AppManifest:
    """Fetch and validate an app manifest from a cloned repo.

    Clones the repo if needed (no pull), then reads the manifest from disk.

    Args:
        url: GitHub repository URL containing the app.
        manifest_path: optional subdirectory within the repo holding the
            manifest; empty string means the repo root.
        username: when given, the repo cache is placed under that user's
            home directory rather than the server process's home.

    Returns:
        The parsed AppManifest.
    """
    repo_dir = await _ensure_repo_cache(url, username=username)
    target_dir = repo_dir / manifest_path if manifest_path else repo_dir
    return _read_manifest_file(target_dir)

Expand Down Expand Up @@ -788,12 +800,18 @@


def _build_work_dir(job_id: int, app_name: str, entry_point_id: str,
                    job_name_prefix: Optional[str] = None,
                    username: Optional[str] = None) -> Path:
    """Build a working directory path under ~/.fileglancer/jobs/.

    When ``username`` is provided, expands ``~username`` to the user's home
    directory instead of the server process's home (which is typically root),
    so job artifacts are owned by and visible to the submitting user.

    Raises:
        ValueError: if ``username`` has no resolvable home directory
            (``os.path.expanduser`` leaves unknown users unexpanded, which
            would otherwise yield a relative ``~user/...`` path).
    """
    safe_app = _sanitize_for_path(app_name)
    safe_ep = _sanitize_for_path(entry_point_id)
    prefix = f"{_sanitize_for_path(job_name_prefix)}-" if job_name_prefix else ""
    home = os.path.expanduser(f"~{username}") if username else os.path.expanduser("~")
    if home.startswith("~"):
        raise ValueError(f"Cannot determine home directory for user {username!r}")
    return Path(home) / ".fileglancer" / "jobs" / f"{prefix}{job_id}-{safe_app}-{safe_ep}"


async def submit_job(
Expand All @@ -810,6 +828,7 @@
post_run: Optional[str] = None,
container: Optional[str] = None,
container_args: Optional[str] = None,
user_context=None,
) -> db.JobDB:
"""Submit a new job to the cluster.

Expand All @@ -820,7 +839,7 @@
settings = get_settings()

# Fetch and validate manifest
manifest = await fetch_app_manifest(app_url, manifest_path)
manifest = await fetch_app_manifest(app_url, manifest_path, username=username)

# Find entry point
entry_point = None
Expand All @@ -844,6 +863,12 @@
overrides["extra_args"] = extra_args
resource_spec = _build_resource_spec(entry_point, overrides or None, settings)

# When running as root, tell LSF to execute the job as the actual user.
if user_context is not None:
if resource_spec.extra_args is None:
resource_spec.extra_args = []
resource_spec.extra_args.append(f"-U {username}")

# Merge env/pre_run/post_run: manifest defaults overridden by user values
merged_env = dict(entry_point.env or {})
if env:
Expand Down Expand Up @@ -891,25 +916,19 @@

# Compute and persist work_dir now that we have the job ID
work_dir = _build_work_dir(job_id, manifest.name, entry_point.id,
job_name_prefix=settings.cluster.job_name_prefix)
job_name_prefix=settings.cluster.job_name_prefix,
username=username)
db_job.work_dir = str(work_dir)
session.commit()

# Create work directory on disk
work_dir.mkdir(parents=True, exist_ok=True)

# Determine which repo to symlink and where to cd
# Clone/pull repo into the user's cache (~username/.fileglancer/apps).
if manifest.repo_url:
# Tool code lives in a separate repo — clone it and cd to its root
tool_repo_dir = await _ensure_repo_cache(manifest.repo_url, pull=pull_latest)
repo_link = work_dir / "repo"
repo_link.symlink_to(tool_repo_dir)
cached_repo_dir = await _ensure_repo_cache(manifest.repo_url, pull=pull_latest,
username=username)
cd_suffix = "repo"
else:
# Tool code is in the discovery repo — cd into manifest's subdirectory
repo_dir = await _ensure_repo_cache(app_url, pull=pull_latest)
repo_link = work_dir / "repo"
repo_link.symlink_to(repo_dir)
cached_repo_dir = await _ensure_repo_cache(app_url, pull=pull_latest,
username=username)
cd_suffix = f"repo/{manifest_path}" if manifest_path else "repo"

# Build environment variable export lines
Expand Down Expand Up @@ -984,14 +1003,21 @@
resource_spec.stdout_path = str(work_dir / "stdout.log")
resource_spec.stderr_path = str(work_dir / "stderr.log")

# Submit to executor
# Create work directory, symlink the cached repo, and submit to the cluster —
# all as the authenticated user so the job runs with correct ownership.
executor = await get_executor()
job_name = f"{manifest.name}-{entry_point.id}"
cluster_job = await executor.submit(
command=full_command,
name=job_name,
resources=resource_spec,
)
with user_context if user_context is not None else nullcontext():
work_dir.mkdir(parents=True, exist_ok=True)
Comment thread Fixed
repo_link = work_dir / "repo"
if repo_link.is_symlink() or repo_link.exists():
Comment thread Fixed
Comment thread Fixed
repo_link.unlink()
Comment thread Fixed
repo_link.symlink_to(cached_repo_dir)
Comment thread Fixed
cluster_job = await executor.submit(
command=full_command,
name=job_name,
resources=resource_spec,
)

# Register callback to update DB when job reaches terminal state
cluster_job.on_exit(_on_job_exit)
Expand Down
1 change: 1 addition & 0 deletions fileglancer/apps/pixi.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def _task_to_entry_point(name: str, task: dict) -> AppEntryPoint | None:
description=description,
command=command,
parameters=parameters,
pre_run='export PATH="$HOME/.pixi/bin:$PATH"',
)


Expand Down
101 changes: 78 additions & 23 deletions fileglancer/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,52 @@ def mask_password(url: str) -> str:
logger.debug(f" external_proxy_url: {settings.external_proxy_url}")
logger.debug(f" atlassian_url: {settings.atlassian_url}")

# Source a shell script to import environment variables
# (e.g., /misc/lsf/conf/profile.lsf). This runs the script
# in a bash subshell and captures the resulting environment,
# applying any new/changed vars to this process. Pixi strips
# inherited env vars, so they must be set inside the process.
# Runs before extra_paths/extra_env so those can override.
# Source a shell script to import environment variables
# (e.g., /misc/lsf/conf/profile.lsf) into this process.
# Runs before extra_paths/extra_env so those can override.
if settings.env_source_script:
    import shlex
    import subprocess as _sp
    script = settings.env_source_script
    try:
        # shlex.quote the script path: an unquoted f-string interpolation
        # would break on spaces and allow shell metacharacters from the
        # config file to be executed by bash.
        result = _sp.run(
            ["bash", "-c", f". {shlex.quote(script)} && env -0"],
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode == 0:
            # env -0 NUL-separates entries so values containing newlines
            # parse correctly; the trailing empty entry is skipped by the
            # "=" membership check.
            sourced_env = dict(
                line.split("=", 1)
                for line in result.stdout.split("\0")
                if "=" in line
            )
            for key, value in sourced_env.items():
                if os.environ.get(key) != value:
                    os.environ[key] = value
                    # NOTE(review): debug-logs env values; sourced scripts
                    # may export secrets — consider logging keys only.
                    logger.debug(f" env_source_script set: {key}={value}")
        else:
            logger.warning(
                f"env_source_script failed (rc={result.returncode}): "
                f"{result.stderr.strip()}"
            )
    except Exception as e:
        logger.warning(f"env_source_script error: {e}")

# Prepend extra_paths to PATH so commands (e.g. bsub, bjobs,
# bkill) are findable without relying on the system service's
# default PATH.
if settings.extra_paths:
    extra = os.pathsep.join(settings.extra_paths)
    os.environ["PATH"] = extra + os.pathsep + os.environ.get("PATH", "")
    logger.debug(f"extra_paths prepended to PATH: {extra}")

# Set extra environment variables at startup.
if settings.extra_env:
    for key, value in settings.extra_env.items():
        os.environ[key] = value
        logger.debug(f"extra_env set: {key}={value}")

# Initialize database (run migrations once at startup)
db.initialize_database(settings.db_url)

Expand Down Expand Up @@ -1492,7 +1538,9 @@ async def fetch_manifest(body: ManifestFetchRequest,
username: str = Depends(get_current_user)):
try:
logger.info(f"Fetching manifest for URL: '{body.url}' path: '{body.manifest_path}'")
manifest = await apps_module.fetch_app_manifest(body.url, body.manifest_path)
with _get_user_context(username):
manifest = await apps_module.fetch_app_manifest(body.url, body.manifest_path,
username=username)
return manifest
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
Expand All @@ -1518,9 +1566,11 @@ async def get_user_apps(username: str = Depends(get_current_user)):
)
# Try to fetch manifest from local clone
try:
user_app.manifest = await apps_module.fetch_app_manifest(
app_entry["url"], app_entry.get("manifest_path", "")
)
with _get_user_context(username):
user_app.manifest = await apps_module.fetch_app_manifest(
app_entry["url"], app_entry.get("manifest_path", ""),
username=username,
)
# Update name/description from manifest
user_app.name = user_app.manifest.name
user_app.description = user_app.manifest.description
Expand All @@ -1536,7 +1586,9 @@ async def add_user_app(body: AppAddRequest,
username: str = Depends(get_current_user)):
# Clone the repo and discover all manifests
try:
discovered = await apps_module.discover_app_manifests(body.url)
with _get_user_context(username):
discovered = await apps_module.discover_app_manifests(body.url,
username=username)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
Expand Down Expand Up @@ -1620,12 +1672,15 @@ async def remove_user_app(url: str = Query(..., description="URL of the app to r
async def update_user_app(body: ManifestFetchRequest,
username: str = Depends(get_current_user)):
try:
await apps_module._ensure_repo_cache(body.url, pull=True)
with _get_user_context(username):
await apps_module._ensure_repo_cache(body.url, pull=True, username=username)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to pull latest code: {str(e)}")

try:
manifest = await apps_module.fetch_app_manifest(body.url, body.manifest_path)
with _get_user_context(username):
manifest = await apps_module.fetch_app_manifest(body.url, body.manifest_path,
username=username)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
Expand Down Expand Up @@ -1687,22 +1742,22 @@ async def submit_job(body: JobSubmitRequest,
if body.resources:
resources_dict = body.resources.model_dump(exclude_none=True)

with _get_user_context(username):
db_job = await apps_module.submit_job(
username=username,
app_url=body.app_url,
entry_point_id=body.entry_point_id,
parameters=body.parameters,
resources=resources_dict,
extra_args=body.extra_args,
pull_latest=body.pull_latest,
manifest_path=body.manifest_path,
env=body.env,
pre_run=body.pre_run,
post_run=body.post_run,
container=body.container,
container_args=body.container_args,
)
db_job = await apps_module.submit_job(
username=username,
app_url=body.app_url,
entry_point_id=body.entry_point_id,
parameters=body.parameters,
resources=resources_dict,
extra_args=body.extra_args,
pull_latest=body.pull_latest,
manifest_path=body.manifest_path,
env=body.env,
pre_run=body.pre_run,
post_run=body.post_run,
container=body.container,
container_args=body.container_args,
user_context=_get_user_context(username),
)
return _convert_job(db_job)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
Expand Down
13 changes: 12 additions & 1 deletion fileglancer/settings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Dict, List, Optional
from functools import cache
import sys

Expand Down Expand Up @@ -94,6 +94,17 @@ class Settings(BaseSettings):

# CLI mode - enables auto-login endpoint for standalone CLI usage
cli_mode: bool = False

# Shell script sourced at startup to import environment variables.
# Useful for setting up scheduler env (e.g., /misc/lsf/conf/profile.lsf).
env_source_script: Optional[str] = None

# Directories prepended to the server's PATH at startup.
extra_paths: List[str] = []

# Extra environment variables set at startup.
extra_env: Dict[str, str] = {}

# Cluster / Apps settings (mirrors py-cluster-api ClusterConfig)
cluster: ClusterSettings = ClusterSettings()

Expand Down
Loading
Loading