Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions fileglancer/apps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,13 @@ def validate_path_in_filestore(path_value: str, session) -> str | None:
mounts via the database. Returns an error message string if invalid,
or None if valid.
"""
# Resolve Mac/Windows/alternate-Linux paths to mount_path format first,
# before syntax validation rejects them.
from fileglancer.database import resolve_any_path_format
resolved = resolve_any_path_format(session, path_value)
if resolved is not None:
path_value = resolved

# Syntax check first
error = validate_path_for_shell(path_value)
if error:
Expand Down Expand Up @@ -515,6 +522,21 @@ def _validate_parameter_value(param: AppParameter, value, session=None) -> str:

if param.type in ("file", "directory"):
str_val = str_val.replace("\\", "/")
# Resolve Mac (smb://), Windows (//server/share), or alternate Linux
# paths to the server's mount_path so validation and shell commands
# use the canonical server path.
if session is not None:
from fileglancer.database import resolve_any_path_format
resolved = resolve_any_path_format(session, str_val)
Comment thread
allison-truhlar marked this conversation as resolved.
Outdated
if resolved is not None:
str_val = resolved
# Expand ~ so the path works inside shlex.quote() single quotes,
# where the shell would not perform tilde expansion.
# Use euid so this works when the server runs as root with seteuid.
if str_val.startswith("~/") or str_val == "~":
import pwd
home = pwd.getpwuid(os.geteuid()).pw_dir
str_val = home + str_val[1:]
if session is not None:
error = validate_path_in_filestore(str_val, session)
else:
Expand Down
113 changes: 93 additions & 20 deletions fileglancer/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,86 @@ def _clear_sharing_key_cache():
logger.debug(f"Cleared entire sharing key cache, removed {old_size} entries")


def _find_best_fsp_match(
fsps: list[FileSharePath],
normalized_input: str,
get_candidates: callable,
separator: str = "/",
) -> Optional[tuple[FileSharePath, str]]:
"""Find the FSP whose candidate path is the longest prefix of *normalized_input*.

This is the shared core of both ``resolve_any_path_format`` (which checks
all OS path fields) and ``find_fsp_from_absolute_path`` (which checks
filesystem-resolved mount paths).

Args:
fsps: All file share paths to search.
normalized_input: The input path, already normalised by the caller.
get_candidates: ``fn(fsp) -> list[str | None]`` returning the candidate
prefix strings to test for each FSP.
separator: The path separator used for the boundary check (``/`` or
``os.sep``).

Returns:
``(best_fsp, subpath)`` for the longest match, or *None*.
"""
best_fsp: Optional[FileSharePath] = None
best_len = 0

for fsp in fsps:
for candidate in get_candidates(fsp):
if not candidate:
continue
if (
normalized_input.startswith(candidate)
and len(candidate) > best_len
):
rest = normalized_input[len(candidate):]
if rest == "" or rest.startswith(separator):
best_fsp = fsp
best_len = len(candidate)

if best_fsp is None:
return None

subpath = normalized_input[best_len:]
if subpath.startswith(separator):
subpath = subpath.lstrip(separator)

return (best_fsp, subpath)


def resolve_any_path_format(session: Session, path_value: str) -> Optional[str]:
"""
Resolve a path in any OS format (Mac smb://, Windows UNC, Linux) to the
server's mount_path equivalent.

Normalises backslashes to forward slashes, then checks the input against
every FSP's mount_path, linux_path, mac_path, and windows_path. Returns
the mount_path-based equivalent (mount_path + subpath) if a match is found,
or None if no FSP matches.
"""
normalized = path_value.replace("\\", "/")
paths = get_file_share_paths(session)

def _all_path_formats(fsp: FileSharePath):
return [
fsp.mount_path,
fsp.linux_path,
fsp.mac_path,
fsp.windows_path.replace("\\", "/") if fsp.windows_path else None,
]

result = _find_best_fsp_match(paths, normalized, _all_path_formats)
if result is None:
return None

fsp, subpath = result
if subpath:
return fsp.mount_path + "/" + subpath
return fsp.mount_path


def find_fsp_from_absolute_path(session: Session, absolute_path: str) -> Optional[tuple[FileSharePath, str]]:
"""
Find the file share path that exactly matches the given absolute path.
Expand All @@ -546,27 +626,20 @@ def find_fsp_from_absolute_path(session: Session, absolute_path: str) -> Optiona
# Get all file share paths
paths = get_file_share_paths(session)

# Pre-compute expanded mount paths so the helper can use them
expanded_mounts: dict[str, str] = {}
for fsp in paths:
# Expand ~ to user's home directory and resolve symlinks to match Filestore behavior
expanded_mount_path = os.path.expanduser(fsp.mount_path)
expanded_mount_path = os.path.realpath(expanded_mount_path)

# Check if the normalized path starts with this mount path
if normalized_path.startswith(expanded_mount_path):
# Calculate the relative subpath
if normalized_path == expanded_mount_path:
subpath = ""
logger.trace(f"Found exact match for path: {absolute_path} in fsp: {fsp.name} with subpath: {subpath}")
return (fsp, subpath)
else:
# Ensure we're matching on a directory boundary
remainder = normalized_path[len(expanded_mount_path):]
if remainder.startswith(os.sep):
subpath = remainder.lstrip(os.sep)
logger.trace(f"Found exact match for path: {absolute_path} in fsp: {fsp.name} with subpath: {subpath}")
return (fsp, subpath)

return None
expanded = os.path.expanduser(fsp.mount_path)
expanded_mounts[fsp.name] = os.path.realpath(expanded)

def _expanded_mount(fsp: FileSharePath):
return [expanded_mounts[fsp.name]]

result = _find_best_fsp_match(paths, normalized_path, _expanded_mount, separator=os.sep)
if result is not None:
fsp, subpath = result
logger.trace(f"Found exact match for path: {absolute_path} in fsp: {fsp.name} with subpath: {subpath}")
return result


def _validate_proxied_path(session: Session, fsp_name: str, path: str) -> None:
Expand Down
Loading
Loading