|
1 | 1 | import asyncio |
| 2 | +from datetime import datetime, timezone |
2 | 3 |
|
3 | 4 | from crowdgit.database.crud import ( |
4 | 5 | acquire_repo_for_processing, |
|
8 | 9 | update_last_processed_commit, |
9 | 10 | ) |
10 | 11 | from crowdgit.enums import RepositoryState |
11 | | -from crowdgit.errors import InternalError, ParentRepoInvalidError |
| 12 | +from crowdgit.errors import ( |
| 13 | + InternalError, |
| 14 | + ParentRepoInvalidError, |
| 15 | + ReOnboardingRequiredError, |
| 16 | + StuckRepoError, |
| 17 | +) |
12 | 18 |
|
13 | 19 | # Import configured loguru logger from crowdgit.logger |
14 | 20 | from crowdgit.logger import logger |
15 | | -from crowdgit.models.repository import Repository |
| 21 | +from crowdgit.models import Repository |
16 | 22 | from crowdgit.services import ( |
17 | 23 | CloneService, |
18 | 24 | CommitService, |
|
21 | 27 | SoftwareValueService, |
22 | 28 | ) |
23 | 29 | from crowdgit.services.utils import get_default_branch, get_repo_name |
24 | | -from crowdgit.settings import WORKER_ERROR_BACKOFF_SEC, WORKER_POLLING_INTERVAL_SEC |
| 30 | +from crowdgit.settings import ( |
| 31 | + STUCK_ONBOARDING_REPO_TIMEOUT_HOURS, |
| 32 | + STUCK_RECURRENT_REPO_TIMEOUT_HOURS, |
| 33 | + WORKER_ERROR_BACKOFF_SEC, |
| 34 | + WORKER_POLLING_INTERVAL_SEC, |
| 35 | +) |
25 | 36 |
|
26 | 37 |
|
27 | 38 | class RepositoryWorker: |
@@ -78,6 +89,36 @@ async def shutdown(self): |
78 | 89 |
|
79 | 90 | logger.info("Worker services shutdown triggered") |
80 | 91 |
|
async def _ensure_repo_not_stuck(self, repository: Repository) -> None:
    """
    Raise the appropriate exception if the repo has been locked for too long.

    Repos can get stuck in processing state for different reasons:
    - Worker crash or restart (e.g. pod eviction due to OOM, deployment after timeout, ...)
    - `last_processed_commit` is no longer valid due to force-push, dangling-commit, or so...
    - Race condition: remote is going under breaking changes at the same time we're processing it
    - Network issues breaking the clone/pull operation

    The time elapsed since `repository.locked_at` is compared against
    STUCK_RECURRENT_REPO_TIMEOUT_HOURS when the repo already has a
    `last_processed_commit`, and against STUCK_ONBOARDING_REPO_TIMEOUT_HOURS
    when `last_processed_commit` is None (onboarding).

    Returns:
        None when the repo has not exceeded its timeout.

    Raises:
        ReOnboardingRequiredError: the repo is stuck and `forked_from`
            equals its own `url`.
        StuckRepoError: the repo is stuck in any other situation.
    """
    # detection
    lock_age = datetime.now(timezone.utc) - repository.locked_at.astimezone(timezone.utc)
    processing_duration_hours = lock_age.total_seconds() / 3600
    # bool() so the flag is a real boolean rather than whatever the
    # and/or chain short-circuits to (e.g. None or an empty string).
    repo_stuck = bool(
        (
            repository.last_processed_commit
            and processing_duration_hours >= STUCK_RECURRENT_REPO_TIMEOUT_HOURS
        )
        or (
            repository.last_processed_commit is None  # onboarding
            and processing_duration_hours >= STUCK_ONBOARDING_REPO_TIMEOUT_HOURS
        )
    )

    # Healthy repo: nothing to report. Without this guard every call would
    # end in StuckRepoError, even for repos well within their timeout.
    if not repo_stuck:
        return

    # handling
    if repository.forked_from == repository.url:
        logger.warning(
            f"Repo {repository.url} is stuck due to force-push or dangling commit. Will be re-onboarded"
        )
        raise ReOnboardingRequiredError()

    raise StuckRepoError()
| 121 | + |
81 | 122 | async def _process_repositories(self): |
82 | 123 | """ |
83 | 124 | Process repositories by priority - check acquire_repo_for_processing() |
@@ -153,6 +194,10 @@ async def _validate_and_get_parent_repo(self, repository: Repository) -> Reposit |
153 | 194 | if not repository.forked_from: |
154 | 195 | return None |
155 | 196 |
|
| 197 | + if repository.forked_from == repository.url: |
| 198 | + # EDGE CASE: not a fork, but the repo gets re-onboarded a lot, so we treat it as a "fork" to avoid producing tons of duplicate activities |
| 199 | + return repository.forked_from |
| 200 | + |
156 | 201 | logger.info( |
157 | 202 | f"Repository {repository.url} is forked from {repository.forked_from}, validating parent repo..." |
158 | 203 | ) |
@@ -200,9 +245,24 @@ async def _process_single_repository(self, repository: Repository): |
200 | 245 | commit_hash=batch_info.latest_commit_in_repo, |
201 | 246 | branch=await get_default_branch(batch_info.repo_path), |
202 | 247 | ) |
| 248 | + else: |
| 249 | + await self._ensure_repo_not_stuck(repository) |
203 | 250 |
|
204 | 251 | logger.info("Incremental processing completed successfully") |
205 | 252 | processing_state = RepositoryState.COMPLETED |
| 253 | + except StuckRepoError: |
| 254 | + logger.error( |
| 255 | + f"Repo {repository.url} is stuck for unkown reason, marking it as stuck until manually resolved!" |
| 256 | + ) |
| 257 | + processing_state = RepositoryState.STUCK |
| 258 | + except ReOnboardingRequiredError: |
| 259 | + logger.info(f"Resetting and queueing {repository.url} for re-onboarding") |
| 260 | + await update_last_processed_commit( |
| 261 | + repo_id=repository.id, |
| 262 | + commit_hash=None, |
| 263 | + branch=None, |
| 264 | + ) |
| 265 | + processing_state = RepositoryState.PENDING |
206 | 266 | except ParentRepoInvalidError as e: |
207 | 267 | logger.error(f"Parent repo validation failed: {repr(e)}") |
208 | 268 | processing_state = RepositoryState.REQUIRES_PARENT |
|
0 commit comments