Skip to content

Commit b3a3692

Browse files
committed
adds changes
1 parent 6c12b9c commit b3a3692

13 files changed

Lines changed: 1050 additions & 593 deletions

ccbt.toml

Lines changed: 6 additions & 420 deletions
Large diffs are not rendered by default.

ccbt/discovery/dht.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@ def __init__(
528528
# Background tasks
529529
self._refresh_task: Optional[asyncio.Task] = None
530530
self._cleanup_task: Optional[asyncio.Task] = None
531+
self._bootstrap_task: Optional[asyncio.Task] = None
531532

532533
# Callbacks with info_hash filtering
533534
# Maps info_hash -> list of callbacks, or None for global callbacks
@@ -635,8 +636,8 @@ async def start(self) -> None:
635636
self._refresh_task = asyncio.create_task(self._refresh_loop())
636637
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
637638

638-
# Bootstrap
639-
await self._bootstrap()
639+
# Bootstrap in background so daemon startup is not blocked (up to 30s when nodes unreachable)
640+
self._bootstrap_task = asyncio.create_task(self._bootstrap())
640641

641642
self.logger.info("DHT client started on %s:%s", self.bind_ip, self.bind_port)
642643

@@ -660,6 +661,12 @@ async def stop(self) -> None:
660661
with contextlib.suppress(asyncio.CancelledError):
661662
await self._cleanup_task
662663

664+
if self._bootstrap_task:
665+
self._bootstrap_task.cancel()
666+
with contextlib.suppress(asyncio.CancelledError):
667+
await self._bootstrap_task
668+
self._bootstrap_task = None
669+
663670
# Proper cleanup order: close transport first, then handle socket
664671
if self.transport:
665672
self.transport.close()

ccbt/discovery/tracker.py

Lines changed: 143 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import asyncio
1111
import contextlib
12+
import ipaddress
1213
import logging
1314
import re
1415
import time
@@ -591,11 +592,77 @@ def get_session_metrics(self) -> dict[str, dict[str, Any]]:
591592
metrics.get("connection_reuse_count", 0) / request_count * 100
592593
),
593594
"error_rate": (metrics.get("error_count", 0) / request_count * 100),
595+
"resolution_anomaly_count": metrics.get(
596+
"resolution_anomaly_count", 0
597+
),
598+
"udp_timeout_count": metrics.get("udp_timeout_count", 0),
599+
"udp_connect_failure_count": metrics.get(
600+
"udp_connect_failure_count", 0
601+
),
602+
"http_fallback_attempt_count": metrics.get(
603+
"http_fallback_attempt_count", 0
604+
),
605+
"http_fallback_failure_count": metrics.get(
606+
"http_fallback_failure_count", 0
607+
),
594608
}
595609
else: # pragma: no cover - Zero request count path, tested via stats with requests
596610
stats[host] = metrics
597611
return stats
598612

613+
def _ensure_session_metric_bucket(self, tracker_host: str) -> dict[str, Any]:
614+
"""Return the metric bucket for a tracker host, creating it if needed."""
615+
if tracker_host not in self._session_metrics:
616+
self._session_metrics[tracker_host] = {
617+
"request_count": 0,
618+
"total_request_time": 0.0,
619+
"total_dns_time": 0.0,
620+
"connection_reuse_count": 0,
621+
"error_count": 0,
622+
"resolution_anomaly_count": 0,
623+
"udp_timeout_count": 0,
624+
"udp_connect_failure_count": 0,
625+
"http_fallback_attempt_count": 0,
626+
"http_fallback_failure_count": 0,
627+
}
628+
return self._session_metrics[tracker_host]
629+
630+
def _increment_session_metric(
631+
self, tracker_host: str, metric_name: str, amount: int = 1
632+
) -> None:
633+
"""Increment a tracker session metric."""
634+
metrics = self._ensure_session_metric_bucket(tracker_host)
635+
metrics[metric_name] = int(metrics.get(metric_name, 0) or 0) + amount
636+
637+
def _record_tracker_resolution_anomaly(
638+
self,
639+
tracker_host: str,
640+
scheme: str,
641+
error: Exception,
642+
) -> None:
643+
"""Record when a public tracker resolves to a loopback/private address."""
644+
error_text = str(error)
645+
resolved_matches = re.findall(
646+
r"\('([^']+)',\s*\d+\)",
647+
error_text,
648+
)
649+
if not resolved_matches:
650+
return
651+
resolved_host = resolved_matches[-1]
652+
try:
653+
parsed_ip = ipaddress.ip_address(resolved_host)
654+
except ValueError:
655+
return
656+
if not (parsed_ip.is_loopback or parsed_ip.is_private):
657+
return
658+
self._increment_session_metric(tracker_host, "resolution_anomaly_count")
659+
self.logger.warning(
660+
"TRACKER_RESOLUTION_ANOMALY: %s tracker %s resolved to %s during connect/fallback. This usually indicates hosts-file overrides, DNS filtering, proxy interception, or endpoint security software.",
661+
scheme.upper(),
662+
tracker_host,
663+
resolved_host,
664+
)
665+
599666
def rank_trackers(self, tracker_urls: list[str]) -> list[str]:
600667
"""Rank trackers by performance metrics.
601668
@@ -1092,6 +1159,8 @@ async def announce(
10921159
)
10931160

10941161
is_udp = normalized_url.startswith("udp://")
1162+
fallback_url: Optional[str] = None
1163+
tracker_host = urllib.parse.urlparse(normalized_url).hostname or ""
10951164

10961165
# BEP 15 (UDP) uses 20-byte info_hash; BEP 41 extends UDP with URLData only. Skip UDP for 32-byte (XET).
10971166
if is_udp and len(info_hash) == 32:
@@ -1210,17 +1279,19 @@ async def announce(
12101279
)
12111280

12121281
if udp_result is None:
1213-
# CRITICAL FIX: When UDP tracker fails, try HTTP fallback
1214-
# Convert udp:// to http:// and try HTTP tracker
1215-
http_url = normalized_url.replace("udp://", "http://", 1)
1282+
self._increment_session_metric(
1283+
tracker_host, "udp_timeout_count"
1284+
)
1285+
fallback_url = announce_url.replace("udp://", "http://", 1)
1286+
self._increment_session_metric(
1287+
tracker_host, "http_fallback_attempt_count"
1288+
)
12161289
self.logger.info(
12171290
"UDP tracker announce failed for %s, trying HTTP fallback: %s",
12181291
normalized_url,
1219-
http_url,
1292+
fallback_url,
12201293
)
1221-
# Fall through to HTTP tracker logic below
1222-
# Update normalized_url to HTTP version for HTTP tracker processing
1223-
normalized_url = http_url
1294+
normalized_url = fallback_url
12241295
is_udp = False
12251296
else:
12261297
# UDP announce succeeded - return result
@@ -1234,17 +1305,21 @@ async def announce(
12341305
incomplete=leechers, # Use 'incomplete' instead of 'leechers'
12351306
)
12361307
except Exception as udp_error:
1237-
# CRITICAL FIX: When UDP tracker fails with exception, try HTTP fallback
1308+
self._increment_session_metric(
1309+
tracker_host, "udp_connect_failure_count"
1310+
)
1311+
fallback_url = announce_url.replace("udp://", "http://", 1)
1312+
self._increment_session_metric(
1313+
tracker_host, "http_fallback_attempt_count"
1314+
)
12381315
self.logger.debug(
1239-
"UDP tracker announce failed for %s: %s, trying HTTP fallback",
1316+
"UDP tracker announce failed for %s: %s, trying HTTP fallback %s",
12401317
normalized_url,
12411318
udp_error,
1319+
fallback_url,
12421320
)
1243-
# Convert udp:// to http:// and try HTTP tracker
1244-
http_url = normalized_url.replace("udp://", "http://", 1)
1245-
normalized_url = http_url
1321+
normalized_url = fallback_url
12461322
is_udp = False
1247-
# Continue with HTTP tracker logic below
12481323

12491324
if not is_udp:
12501325
# HTTP tracker announce (including fallback from UDP)
@@ -1663,6 +1738,10 @@ async def announce_to_multiple(
16631738
successful_responses = []
16641739
failed_trackers = []
16651740
total_peers = 0
1741+
timeout_count = 0
1742+
connection_error_count = 0
1743+
invalid_payload_count = 0
1744+
skipped_count = 0
16661745

16671746
for task, result in zip(tasks, results):
16681747
url = url_to_task.get(task, "unknown")
@@ -1691,6 +1770,7 @@ async def announce_to_multiple(
16911770
elif result is None:
16921771
# CRITICAL FIX: Handle None result (UDP tracker skipped due to missing client)
16931772
tracker_type = "UDP" if url.startswith("udp://") else "HTTP/HTTPS"
1773+
skipped_count += 1
16941774
self.logger.debug(
16951775
"%s tracker %s skipped (UDP tracker client unavailable)",
16961776
tracker_type,
@@ -1706,6 +1786,7 @@ async def announce_to_multiple(
17061786

17071787
# Enhanced error messages for common failure types
17081788
if "timeout" in error_msg.lower() or "TimeoutError" in error_type:
1789+
timeout_count += 1
17091790
self.logger.warning(
17101791
"%s tracker %s timed out: %s (tracker may be slow or unreachable)",
17111792
tracker_type,
@@ -1715,12 +1796,24 @@ async def announce_to_multiple(
17151796
elif (
17161797
"connection" in error_msg.lower() or "ConnectionError" in error_type
17171798
):
1799+
connection_error_count += 1
17181800
self.logger.warning(
17191801
"%s tracker %s connection failed: %s (network issue or tracker down)",
17201802
tracker_type,
17211803
url[:80] + "..." if len(url) > 80 else url,
17221804
error_msg,
17231805
)
1806+
elif (
1807+
"parse tracker response" in error_msg.lower()
1808+
or "invalid bencode" in error_msg.lower()
1809+
):
1810+
invalid_payload_count += 1
1811+
self.logger.warning(
1812+
"%s tracker %s returned invalid payload: %s",
1813+
tracker_type,
1814+
url[:80] + "..." if len(url) > 80 else url,
1815+
error_msg,
1816+
)
17241817
else:
17251818
self.logger.warning(
17261819
"%s tracker %s failed: %s (%s)",
@@ -1731,11 +1824,15 @@ async def announce_to_multiple(
17311824
)
17321825

17331826
self.logger.info(
1734-
"✅ ANNOUNCE_TO_MULTIPLE: Multi-tracker announce completed: %d/%d successful, %d total peer(s) discovered (returning %d response(s))",
1827+
"✅ ANNOUNCE_TO_MULTIPLE: Multi-tracker announce completed: %d/%d successful, %d total peer(s) discovered (returning %d response(s), timeouts=%d, connection_errors=%d, invalid_payloads=%d, skipped=%d)",
17351828
len(successful_responses),
17361829
len(tracker_urls),
17371830
total_peers,
17381831
len(successful_responses),
1832+
timeout_count,
1833+
connection_error_count,
1834+
invalid_payload_count,
1835+
skipped_count,
17391836
)
17401837

17411838
# CRITICAL FIX: Log each successful response's peer count for diagnostics
@@ -1821,13 +1918,34 @@ async def _announce_to_tracker(
18211918
normalized_url = self._normalize_tracker_url(announce_url)
18221919
is_udp = normalized_url.startswith("udp://")
18231920
tracker_type = "UDP" if is_udp else "HTTP/HTTPS"
1921+
tracker_host = urllib.parse.urlparse(normalized_url).hostname or ""
1922+
error_text = str(e)
1923+
if is_udp and (
1924+
"HTTP tracker" in error_text or "HTTP fallback" in error_text
1925+
):
1926+
self._increment_session_metric(
1927+
tracker_host, "http_fallback_failure_count"
1928+
)
18241929

1825-
self.logger.warning(
1826-
"%s tracker announce failed for %s: %s",
1827-
tracker_type,
1828-
normalized_url[:100] if len(normalized_url) > 100 else normalized_url,
1829-
str(e),
1830-
)
1930+
if is_udp and (
1931+
"HTTP tracker" in error_text or "HTTP fallback" in error_text
1932+
):
1933+
self.logger.warning(
1934+
"UDP tracker announce failed for %s after HTTP fallback attempt: %s",
1935+
normalized_url[:100]
1936+
if len(normalized_url) > 100
1937+
else normalized_url,
1938+
error_text,
1939+
)
1940+
else:
1941+
self.logger.warning(
1942+
"%s tracker announce failed for %s: %s",
1943+
tracker_type,
1944+
normalized_url[:100]
1945+
if len(normalized_url) > 100
1946+
else normalized_url,
1947+
error_text,
1948+
)
18311949
raise
18321950
except Exception as e:
18331951
# Generic exception - add tracker type context
@@ -2183,16 +2301,7 @@ async def _make_request_async(self, url: str) -> bytes:
21832301
connection_reused = getattr(response, "_connection", None) is not None
21842302

21852303
# Update metrics
2186-
if tracker_host not in self._session_metrics:
2187-
self._session_metrics[tracker_host] = {
2188-
"request_count": 0,
2189-
"total_request_time": 0.0,
2190-
"total_dns_time": 0.0,
2191-
"connection_reuse_count": 0,
2192-
"error_count": 0,
2193-
}
2194-
2195-
metrics = self._session_metrics[tracker_host]
2304+
metrics = self._ensure_session_metric_bucket(tracker_host)
21962305
metrics["request_count"] += 1
21972306
metrics["total_request_time"] += request_time
21982307
metrics["total_dns_time"] += dns_time
@@ -2214,23 +2323,18 @@ async def _make_request_async(self, url: str) -> bytes:
22142323
return await response.read()
22152324

22162325
except aiohttp.ClientSSLError as e: # pragma: no cover - SSL error path tested via exception injection in test_make_request_ssl_error_updates_metrics, but coverage tool may not track exception handler execution perfectly
2217-
if tracker_host in self._session_metrics:
2218-
self._session_metrics[tracker_host]["error_count"] += (
2219-
1 # pragma: no cover - Same context
2220-
)
2326+
self._increment_session_metric(tracker_host, "error_count")
22212327
self.logger.exception("SSL error connecting to tracker %s", url)
22222328
msg = f"SSL handshake failed: {e}"
22232329
raise TrackerError(msg) from e
22242330
except aiohttp.ClientError as e: # pragma: no cover - ClientError path tested via exception injection, but coverage tool may not track exception handler execution perfectly
2225-
if tracker_host in self._session_metrics:
2226-
self._session_metrics[tracker_host]["error_count"] += (
2227-
1 # pragma: no cover - Same context
2228-
)
2331+
self._increment_session_metric(tracker_host, "error_count")
22292332
# CRITICAL FIX: Provide specific error messages instead of generic "Network error"
22302333
# Enhanced error messages to distinguish HTTP vs UDP tracker failures
22312334
error_type = type(e).__name__
22322335
parsed_url = urllib.parse.urlparse(url)
22332336
scheme = parsed_url.scheme
2337+
self._record_tracker_resolution_anomaly(tracker_host, scheme, e)
22342338

22352339
if isinstance(e, aiohttp.ClientConnectorError):
22362340
msg = f"HTTP tracker connection failed ({scheme}://{tracker_host}): {e}"
@@ -2247,8 +2351,7 @@ async def _make_request_async(self, url: str) -> bytes:
22472351
msg = f"HTTP tracker client error ({scheme}://{tracker_host}, {error_type}): {e}"
22482352
raise TrackerError(msg) from e
22492353
except Exception as e:
2250-
if tracker_host in self._session_metrics:
2251-
self._session_metrics[tracker_host]["error_count"] += 1
2354+
self._increment_session_metric(tracker_host, "error_count")
22522355
msg = f"Request failed: {e}"
22532356
raise TrackerError(msg) from e
22542357

0 commit comments

Comments
 (0)