Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ogscope/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ class Settings(BaseSettings):
description="大尺度背景减除:小图长边上限(像素),越小越快 / Large-scale BG downsample max side",
)
star_analysis_target_fps: float = Field(
default=1.5,
description="星空分析目标帧率(1–2),仅用于前端节流 / Target star-analysis FPS for UI throttle",
default=2 / 3,
description="星空分析目标帧率(约 1.5 秒 1 帧),仅用于前端节流 / Target star-analysis FPS for UI throttle (~1.5s per frame)",
)

model_config = SettingsConfigDict(
Expand Down
35 changes: 35 additions & 0 deletions ogscope/web/api/analysis/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
素材分析路由 / Asset analysis routes
"""

import json
import mimetypes

from fastapi import APIRouter, File, Form, HTTPException, Query, UploadFile
Expand Down Expand Up @@ -268,6 +269,40 @@ async def solve_analysis_frame(body: AnalysisSolveVideoFrameRequest):
raise HTTPException(status_code=404, detail=str(exc)) from exc
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc


@router.post("/analysis/solve/frame_upload")
async def solve_uploaded_frame(
file: UploadFile = File(...),
payload: str = Form(
...,
description="JSON 字符串,字段与 AnalysisSolveImageRequest 对齐 / JSON payload aligned with AnalysisSolveImageRequest",
),
):
"""上传单帧 JPEG/PNG 并解算 / Solve a single uploaded frame (multipart)."""
try:
raw = await file.read()
obj = json.loads(payload)
if not isinstance(obj, dict):
raise ValueError("payload 必须为 JSON 对象 / payload must be a JSON object")
topn = obj.get("overlay_topn_count")
enable_polar = obj.get("enable_polar_guide")
obj.pop("overlay_topn_count", None)
obj.pop("enable_polar_guide", None)
# 前端调试元数据,不参与 Pydantic 模型 / Client metadata not in schema
obj.pop("time_sec", None)
obj.pop("frame_width", None)
obj.pop("frame_height", None)
obj.setdefault("input_name", "__frame_upload__.jpg")
data = AnalysisSolveImageRequest.model_validate(obj)
return await analysis_service.solve_uploaded_frame(
image_bytes=raw,
solve_params=data,
overlay_topn_count=topn,
enable_polar_guide=enable_polar,
)
except HTTPException:
raise
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=400, detail=str(exc)) from exc

Expand Down
244 changes: 241 additions & 3 deletions ogscope/web/api/analysis/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import asyncio
import json
import math
import shutil
import time
import uuid
Expand All @@ -16,6 +17,7 @@
from typing import Any

import cv2
import numpy as np

from ogscope.algorithms.plate_solve import (
CentroidExtractionParams,
Expand Down Expand Up @@ -161,6 +163,135 @@ def __init__(self) -> None:
self.default_hint_dec = settings.solver_hint_dec_deg
self._jobs: dict[str, AnalysisJob] = {}
self._lab = AnalysisLabStore(settings)
self._overlay_topn_default = 3
self._polar_guide_default = True

def _build_topn_labels(
self,
row: dict[str, Any],
*,
topn_count: int,
) -> list[dict[str, Any]]:
"""从 matched 星点构建 Top-N 标注 / Build Top-N labels from matched stars."""
overlay = row.get("solve_overlay")
if not isinstance(overlay, dict):
return []
matched = overlay.get("stars_matched")
if not isinstance(matched, list):
return []
labels: list[dict[str, Any]] = []
for star in matched:
if not isinstance(star, dict):
continue
mag_raw = star.get("mag")
try:
mag_val = float(mag_raw) if mag_raw is not None else None
except (TypeError, ValueError):
mag_val = None
cat_id = star.get("cat_id")
if isinstance(cat_id, list):
cat_id_text = "-".join(str(v) for v in cat_id if v is not None)
elif cat_id is None:
cat_id_text = ""
else:
cat_id_text = str(cat_id)
# 目前基于 Tetra3 匹配 ID 提供可读名占位,后续可接入正式星表映射
# Build readable placeholder name from Tetra3 cat_id; can be replaced by real catalog lookup later.
name = f"CAT-{cat_id_text}" if cat_id_text else "Unnamed"
item = {
"x": star.get("x"),
"y": star.get("y"),
"name": name,
"mag": mag_val,
"ra_deg": star.get("ra_deg"),
"dec_deg": star.get("dec_deg"),
}
labels.append(item)
labels.sort(
key=lambda x: (
x["mag"] is None,
float(x["mag"]) if x["mag"] is not None else 999.0,
)
)
n = max(1, int(topn_count))
return labels[:n]

def _build_polar_guide(self, row: dict[str, Any]) -> dict[str, Any] | None:
"""构建极轴引导向量 / Build polar guide vector from solve center."""
overlay = row.get("solve_overlay")
if not isinstance(overlay, dict):
return None
frame_shape = overlay.get("frame_shape")
if (
not isinstance(frame_shape, list)
or len(frame_shape) < 2
or frame_shape[0] in (None, 0)
or frame_shape[1] in (None, 0)
):
return None
try:
h = float(frame_shape[0])
w = float(frame_shape[1])
ra_center = float(row.get("ra_deg"))
dec_center = float(row.get("dec_deg"))
except (TypeError, ValueError):
return None
fov_deg = row.get("fov_deg")
roll_deg = row.get("roll_deg")
if fov_deg is None:
return None
try:
fov = float(fov_deg)
except (TypeError, ValueError):
return None
roll = 0.0
try:
if roll_deg is not None:
roll = float(roll_deg)
except (TypeError, ValueError):
roll = 0.0

# 北天极近似目标:RA 与当前中心相同,Dec=+90,减少 RA wrap 影响
# Approximate north celestial pole target with same RA and Dec=+90.
target_ra = ra_center
target_dec = 90.0
d_ra = target_ra - ra_center
while d_ra > 180.0:
d_ra -= 360.0
while d_ra < -180.0:
d_ra += 360.0
d_dec = target_dec - dec_center
east_deg = d_ra * math.cos(math.radians(dec_center))
north_deg = d_dec
roll_rad = math.radians(roll)
x_deg = east_deg * math.cos(roll_rad) + north_deg * math.sin(roll_rad)
y_deg = -east_deg * math.sin(roll_rad) + north_deg * math.cos(roll_rad)

px_per_deg = (min(w, h) / max(fov, 1e-6)) if fov > 0 else 1.0
dx_px = x_deg * px_per_deg
dy_px = -y_deg * px_per_deg
cx = w * 0.5
cy = h * 0.5
tx = cx + dx_px
ty = cy + dy_px

c_dec = math.radians(dec_center)
t_dec = math.radians(target_dec)
d_ra_rad = math.radians(d_ra)
cos_ang = (
math.sin(c_dec) * math.sin(t_dec)
+ math.cos(c_dec) * math.cos(t_dec) * math.cos(d_ra_rad)
)
cos_ang = max(-1.0, min(1.0, cos_ang))
angular_sep_deg = math.degrees(math.acos(cos_ang))

return {
"target_kind": "north_celestial_pole",
"frame_center": {"x": cx, "y": cy, "ra_deg": ra_center, "dec_deg": dec_center},
"target": {"x": tx, "y": ty, "ra_deg": target_ra, "dec_deg": target_dec},
"delta_px": {"dx": dx_px, "dy": dy_px},
"angular_sep_deg": angular_sep_deg,
}

def _centroid_params_from_payload(
self, payload: CentroidParamsPayload | None
Expand Down Expand Up @@ -213,6 +344,22 @@ def resolve_upload_path(self, filename: str) -> Path:
raise ValueError("路径非法 / Invalid path") from exc
return path

def _resolve_frame_source_path(self, input_name: str) -> Path:
"""单帧视频解算源路径:优先素材池,其次调试录制目录 / Resolve frame-solve source path."""
name = Path(input_name.strip()).name
if not name or name != input_name.strip():
raise ValueError("文件名无效 / Invalid filename")
try:
up = self.resolve_upload_path(name)
if up.is_file():
return up
except ValueError:
pass
dbg = Path.home() / "dev_captures" / name
if dbg.is_file():
return dbg
raise FileNotFoundError("上传文件不存在 / Uploaded file not found")

def get_upload_file_info(self, filename: str) -> dict[str, Any]:
"""从上传目录读取文件与 stem.txt 侧车 / File + optional sidecar from upload pool."""
path = self.resolve_upload_path(filename)
Expand Down Expand Up @@ -612,6 +759,76 @@ def delete_upload(
self._lab.remove_manifest_entry(path.name)
return {"success": True, "filename": path.name, "deleted_experiments": n_exp}

async def solve_uploaded_frame(
self,
*,
image_bytes: bytes,
solve_params: AnalysisSolveImageRequest,
overlay_topn_count: int | None = None,
enable_polar_guide: bool | None = None,
) -> dict[str, Any]:
"""解析上传的单帧图像并解算 / Solve a single uploaded frame (multipart)."""
if not image_bytes:
raise ValueError("空图像数据 / Empty image payload")
buf = np.frombuffer(image_bytes, dtype=np.uint8)
frame = cv2.imdecode(buf, cv2.IMREAD_COLOR)
if frame is None:
raise ValueError("无法解码图像 / Cannot decode image")

centroid_params, max_stars, timeout_ms, effective_profile = (
self._resolve_solve_profile(
solve_params.solve_profile,
solve_params.centroid,
solve_params.solve_timeout_ms,
)
)
loop = asyncio.get_running_loop()

def _run() -> dict[str, Any]:
return self._solve_bgr_to_row(
frame,
solve_params.hint_ra_deg,
solve_params.hint_dec_deg,
solve_params.fov_estimate,
solve_params.fov_max_error,
timeout_ms,
centroid_params,
solve_params.max_image_side,
max_stars,
bool(solve_params.large_scale_bg_subtract),
)

row = await loop.run_in_executor(self._solver_executor, _run)
# 统一 overlay_ext 结构,便于前端复用渲染逻辑
topn = (
int(overlay_topn_count)
if overlay_topn_count is not None
else self._overlay_topn_default
)
enable_polar = (
bool(enable_polar_guide)
if enable_polar_guide is not None
else self._polar_guide_default
)
overlay_ext: dict[str, Any] = {}
try:
overlay_ext["labels_topn"] = self._build_topn_labels(
row, topn_count=topn
)
except Exception:
overlay_ext["labels_topn"] = []
if enable_polar:
try:
overlay_ext["polar_guide"] = self._build_polar_guide(row)
except Exception:
overlay_ext["polar_guide"] = None
row["overlay_ext"] = overlay_ext
row["solve_profile"] = effective_profile
detail_level = getattr(solve_params, "detail_level", None) or "summary"
if detail_level != "full":
row.pop("tetra", None)
return {"success": True, "result": row}

def delete_experiment(self, experiment_id: str) -> None:
"""删除一条实验记录 / Delete one experiment record."""
self._lab.delete_experiment(experiment_id)
Expand Down Expand Up @@ -828,9 +1045,7 @@ async def solve_video_frame(
raise ValueError(
"需要 input_name / input_name required for file source"
)
path = self.resolve_upload_path(body.input_name)
if not path.is_file():
raise FileNotFoundError("上传文件不存在 / Uploaded file not found")
path = self._resolve_frame_source_path(body.input_name)
t_decode = time.perf_counter()
cap = cv2.VideoCapture(str(path))
if not cap.isOpened():
Expand Down Expand Up @@ -868,6 +1083,29 @@ def _run() -> dict[str, Any]:
)

row = await loop.run_in_executor(self._solver_executor, _run)
# 二次分析与极轴引导(失败降级,不影响基础解算)
topn = (
int(body.overlay_topn_count)
if getattr(body, "overlay_topn_count", None) is not None
else self._overlay_topn_default
)
enable_polar = (
bool(body.enable_polar_guide)
if getattr(body, "enable_polar_guide", None) is not None
else self._polar_guide_default
)
overlay_ext: dict[str, Any] = {}
try:
overlay_ext["labels_topn"] = self._build_topn_labels(row, topn_count=topn)
except Exception:
overlay_ext["labels_topn"] = []
if enable_polar:
try:
overlay_ext["polar_guide"] = self._build_polar_guide(row)
except Exception:
overlay_ext["polar_guide"] = None
row["overlay_ext"] = overlay_ext

if t_open_decode_ms is not None:
row["t_open_decode_ms"] = round(t_open_decode_ms, 3)
row["t_backend_total_ms"] = round((time.perf_counter() - t_total) * 1000.0, 3)
Expand Down
12 changes: 12 additions & 0 deletions ogscope/web/api/models/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,12 @@ class AnalysisSolveVideoFrameRequest(BaseModel):

model_config = ConfigDict(extra="forbid")

# 基本输入来源 / Basic input source
source: Literal["camera", "file"]
input_name: Optional[str] = None
frame_index: int = 0
time_sec: Optional[float] = None
# 解算参数 / Solve parameters
hint_ra_deg: Optional[float] = None
hint_dec_deg: Optional[float] = None
fov_estimate: Optional[float] = None
Expand All @@ -257,6 +259,16 @@ class AnalysisSolveVideoFrameRequest(BaseModel):
large_scale_bg_subtract: Optional[bool] = False
detail_level: Optional[Literal["summary", "full"]] = "summary"

# 叠加与引导选项(可选,未提供则使用后端默认)/ Optional overlay & guidance options
overlay_topn_count: Optional[int] = Field(
default=None,
description="自动标注的星点数量上限(Top-N),未填用服务器默认 / Max number of stars to label (Top-N); server default if omitted",
)
enable_polar_guide: Optional[bool] = Field(
default=None,
description="是否计算极轴引导信息;未填用服务器默认 / Whether to compute polar guide info; server default if omitted",
)


class ImportFromDebugRequest(BaseModel):
"""从调试采集目录导入到分析素材池 / Import capture into analysis pool."""
Expand Down
Loading
Loading