|
| 1 | +""" |
| 2 | +OpenCut Worker Pool |
| 3 | +
|
| 4 | +Priority-based thread pool for job execution. Replaces raw thread spawning |
| 5 | +with bounded concurrency and job priority support. |
| 6 | +""" |
| 7 | + |
import itertools
import logging
import queue
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from enum import IntEnum
| 12 | + |
| 13 | +logger = logging.getLogger("opencut") |
| 14 | + |
| 15 | + |
class JobPriority(IntEnum):
    """Job priority levels. Lower value = higher priority.

    These values act as sort keys for the worker pool's queue, so
    CRITICAL work is dispatched ahead of BACKGROUND work whenever the
    pool is saturated.
    """

    CRITICAL = 0  # system operations (health, model management)
    HIGH = 10  # quick CPU operations (silence detect, beat markers)
    NORMAL = 50  # standard operations (transcribe, denoise, export)
    LOW = 100  # heavy AI operations (upscale, style transfer, music gen)
    BACKGROUND = 200  # batch/indexing operations
| 23 | + |
| 24 | + |
class WorkerPool:
    """Thread pool with a real priority queue for OpenCut job execution.

    Jobs are placed on a ``queue.PriorityQueue`` and consumed by a fixed
    set of worker threads, so lower :class:`JobPriority` values actually
    run first when the pool is saturated. (The previous
    ``ThreadPoolExecutor``-based version accepted a ``priority`` argument
    but silently ignored it.) This version also fixes a deadlock in
    ``cancel()``: cancelling a pending Future synchronously fires its
    done callbacks, and ``_on_done`` re-acquires ``self._lock``, so the
    lock must be released before calling ``Future.cancel()``.
    """

    # Sentinel priority: sorts after every JobPriority value, so shutdown
    # markers are consumed only after all queued jobs have drained.
    _SENTINEL_PRIORITY = float("inf")

    def __init__(self, max_workers=10):
        """Start *max_workers* daemon worker threads serving the queue."""
        # Entries: (priority, seq, job_id, fn, args, kwargs, future).
        # seq is a unique tie-breaker, so comparison never reaches the
        # (non-comparable) payload fields and FIFO holds within a level.
        self._queue = queue.PriorityQueue()
        self._futures: dict[str, Future] = {}  # job_id -> Future
        self._lock = threading.Lock()
        self._shutdown = False
        self._seq = itertools.count()
        self._threads: list[threading.Thread] = []
        for i in range(max_workers):
            t = threading.Thread(target=self._worker, name=f"oc-worker_{i}", daemon=True)
            t.start()
            self._threads.append(t)
        logger.info("WorkerPool initialized with %d max workers", max_workers)

    def submit(self, job_id: str, fn, *args, priority=JobPriority.NORMAL, **kwargs) -> Future:
        """Queue *fn* for execution; lower *priority* values run first.

        Returns a Future for the job's result.
        Raises RuntimeError if the pool has been shut down.
        """
        with self._lock:
            # Checked under the lock so a concurrent shutdown() cannot
            # accept a job that no worker will ever pick up.
            if self._shutdown:
                raise RuntimeError("WorkerPool is shut down")
            future = Future()
            self._futures[job_id] = future
            # Enqueue under the lock too: a racing shutdown() then always
            # places its sentinels strictly after this job.
            self._queue.put((int(priority), next(self._seq), job_id, fn, args, kwargs, future))
        # Registered outside the lock; if the job already finished, the
        # callback fires immediately and _on_done can take the lock.
        future.add_done_callback(lambda f: self._on_done(job_id, f))
        return future

    def _worker(self):
        """Worker-thread main loop: execute queued jobs in priority order."""
        while True:
            _prio, _seq, _job_id, fn, args, kwargs, future = self._queue.get()
            if fn is None:  # shutdown sentinel: one per worker thread
                return
            if not future.set_running_or_notify_cancel():
                continue  # job was cancelled while still queued
            try:
                result = fn(*args, **kwargs)
            except BaseException as exc:  # report any failure via the Future
                future.set_exception(exc)
            else:
                future.set_result(result)

    def cancel(self, job_id: str) -> bool:
        """Cancel a still-queued job. Returns True if cancelled."""
        with self._lock:
            future = self._futures.get(job_id)
        # Cancel OUTSIDE the lock: a successful cancel fires done
        # callbacks synchronously and _on_done re-acquires self._lock.
        return future.cancel() if future is not None else False

    def is_running(self, job_id: str) -> bool:
        """Check if a job is currently executing on a worker thread."""
        with self._lock:
            future = self._futures.get(job_id)
        return future is not None and future.running()

    def _on_done(self, job_id: str, future: Future):
        """Cleanup callback when a job completes, fails, or is cancelled."""
        with self._lock:
            self._futures.pop(job_id, None)

    def active_count(self) -> int:
        """Number of currently running/pending jobs."""
        with self._lock:
            return len(self._futures)

    def shutdown(self, wait=True):
        """Stop accepting jobs. Called on server exit.

        Already-queued jobs still run (sentinels sort last); with
        ``wait=True`` this blocks until all workers have exited.
        """
        with self._lock:
            if self._shutdown:
                return  # idempotent
            self._shutdown = True
        for _ in self._threads:
            self._queue.put((self._SENTINEL_PRIORITY, next(self._seq), None, None, (), {}, None))
        if wait:
            for t in self._threads:
                t.join()
        logger.info("WorkerPool shut down")
| 74 | + |
| 75 | + |
# Module-level singleton: one WorkerPool shared by the whole process.
_pool: WorkerPool | None = None  # lazily created by get_pool()
_pool_lock = threading.Lock()  # guards creation/teardown of _pool
| 79 | + |
| 80 | + |
def get_pool(max_workers=10) -> WorkerPool:
    """Get or create the global WorkerPool singleton.

    *max_workers* only takes effect on the call that first creates the
    pool; later calls return the existing instance unchanged.
    """
    global _pool
    if _pool is not None:  # fast path: already built, no lock needed
        return _pool
    with _pool_lock:
        # Re-check inside the lock: another thread may have won the race.
        if _pool is None:
            _pool = WorkerPool(max_workers=max_workers)
    return _pool
| 89 | + |
| 90 | + |
def shutdown_pool(wait=True):
    """Shut down the global pool. Called on server exit.

    Fix: the original read and cleared ``_pool`` without holding
    ``_pool_lock``, racing the double-checked creation in ``get_pool``.
    The singleton is now swapped out under the lock; the (potentially
    slow, ``wait=True``) shutdown itself runs outside the lock so
    ``get_pool`` stays responsive.
    """
    global _pool
    with _pool_lock:
        pool, _pool = _pool, None
    if pool is not None:
        pool.shutdown(wait=wait)