Skip to content

Commit 5b82b47

Browse files
committed
adds bugfix, hydration , pools , connections, tests, and more
1 parent e6a453f commit 5b82b47

43 files changed

Lines changed: 3780 additions & 1021 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.environement.md

Lines changed: 0 additions & 626 deletions
This file was deleted.

ccbt/config/config.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,9 @@ def _get_env_config(self) -> dict[str, Any]:
694694
"CCBT_XET_MULTICAST_ADDRESS": "network.xet_multicast_address",
695695
"CCBT_XET_MULTICAST_PORT": "network.xet_multicast_port",
696696
"CCBT_PIPELINE_DEPTH": "network.pipeline_depth",
697+
"CCBT_SPARSE_PIPELINE_STALE_PAYLOAD_CANCEL_S": (
698+
"network.sparse_pipeline_stale_payload_cancel_s"
699+
),
697700
"CCBT_BLOCK_SIZE_KIB": "network.block_size_kib",
698701
"CCBT_CONNECTION_TIMEOUT": "network.connection_timeout",
699702
"CCBT_HANDSHAKE_TIMEOUT": "network.handshake_timeout",
@@ -704,6 +707,10 @@ def _get_env_config(self) -> dict[str, Any]:
704707
"CCBT_HANDSHAKE_TIMEOUT_HEALTHY_MIN": "network.handshake_timeout_healthy_min",
705708
"CCBT_HANDSHAKE_TIMEOUT_HEALTHY_MAX": "network.handshake_timeout_healthy_max",
706709
"CCBT_HANDSHAKE_ADAPTIVE_TIMEOUT_ENABLED": "network.handshake_adaptive_timeout_enabled",
710+
# false = deprecated legacy (always max timeout in desperation band)
711+
"CCBT_HANDSHAKE_TIMEOUT_DESPERATION_INTERPOLATE": (
712+
"network.handshake_timeout_desperation_interpolate"
713+
),
707714
"CCBT_ADAPTIVE_TIMEOUT_HEALTH_PEER_SOURCE": (
708715
"network.adaptive_timeout_health_peer_source"
709716
),
@@ -716,12 +723,19 @@ def _get_env_config(self) -> dict[str, Any]:
716723
"CCBT_METADATA_EXCHANGE_TIMEOUT": "network.metadata_exchange_timeout",
717724
"CCBT_PEER_QUALITY_PROBATION_TIMEOUT": "network.peer_quality_probation_timeout",
718725
"CCBT_METADATA_PIECE_TIMEOUT": "network.metadata_piece_timeout",
726+
"CCBT_BITFIELD_HAVE_WAIT_TIMEOUT_S": "network.bitfield_have_wait_timeout_s",
727+
"CCBT_BITFIELD_HAVE_WAIT_METADATA_INCOMPLETE_MULTIPLIER": (
728+
"network.bitfield_have_wait_metadata_incomplete_multiplier"
729+
),
719730
"CCBT_CONNECTION_HEALTH_CHECK_INTERVAL": "network.connection_health_check_interval",
720731
"CCBT_CONNECTION_VALIDATION_ENABLED": "network.connection_validation_enabled",
721732
"CCBT_PEER_VALIDATION_ENABLED": "network.peer_validation_enabled",
722733
"CCBT_SEND_BITFIELD_AFTER_METADATA": "network.send_bitfield_after_metadata",
723734
"CCBT_SEND_INTERESTED_AFTER_METADATA": "network.send_interested_after_metadata",
724735
"CCBT_MAX_CONCURRENT_CONNECTION_ATTEMPTS": "network.max_concurrent_connection_attempts",
736+
"CCBT_CONNECT_TO_PEERS_PARALLEL_BATCHES": (
737+
"network.connect_to_peers_parallel_batches"
738+
),
725739
"CCBT_MSE_INITIATOR_TIMEOUT_SCALE_ZERO_ACTIVE": (
726740
"network.mse_initiator_timeout_scale_zero_active"
727741
),
@@ -981,6 +995,28 @@ def _get_env_config(self) -> dict[str, Any]:
981995
"CCBT_TRACKER_PEER_COUNT_WEIGHT": "discovery.tracker_peer_count_weight",
982996
"CCBT_TRACKER_PERFORMANCE_WEIGHT": "discovery.tracker_performance_weight",
983997
"CCBT_DEFAULT_TRACKERS": "discovery.default_trackers",
998+
"CCBT_TRACKER_UDP_PENDING_SOFT_CAP_PER_HOST": (
999+
"discovery.tracker_udp_pending_soft_cap_per_host"
1000+
),
1001+
"CCBT_TRACKER_UDP_MAX_PENDING_REQUESTS": (
1002+
"discovery.tracker_udp_max_pending_requests"
1003+
),
1004+
"CCBT_TRACKER_UDP_WAIT_PACING_LOAD_RATIO": (
1005+
"discovery.tracker_udp_wait_pacing_load_ratio"
1006+
),
1007+
"CCBT_TRACKER_INGRESS_HOLD_PENDING_QUEUE_THRESHOLD": (
1008+
"discovery.tracker_ingress_hold_pending_queue_threshold"
1009+
),
1010+
# Deprecated to set false: legacy peer ordering; default true is supported path.
1011+
"CCBT_STRICT_TRACKER_SOURCE_CONNECT_PRIORITY": (
1012+
"discovery.strict_tracker_source_connect_priority"
1013+
),
1014+
"CCBT_STRICT_TRACKER_PENDING_DHT_PEX_BOOST": (
1015+
"discovery.strict_tracker_pending_dht_pex_boost"
1016+
),
1017+
"CCBT_STRICT_TRACKER_PENDING_TRACKER_PREFIX": (
1018+
"discovery.strict_tracker_pending_tracker_prefix"
1019+
),
9841020
"CCBT_PEX_INTERVAL": "discovery.pex_interval",
9851021
"CCBT_STRICT_PRIVATE_MODE": "discovery.strict_private_mode",
9861022
# BEP 32: IPv6 Extension for DHT
@@ -1074,6 +1110,13 @@ def _get_env_config(self) -> dict[str, Any]:
10741110
"CCBT_TRACKER_IMMEDIATE_PER_SOURCE_CAP_MODE": (
10751111
"discovery.tracker_immediate_per_source_cap_mode"
10761112
),
1113+
"CCBT_TRACKER_IMMEDIATE_PER_TRACKER_COOLDOWN_ENABLED": (
1114+
"discovery.tracker_immediate_per_tracker_cooldown_enabled"
1115+
),
1116+
"CCBT_MAX_TRACKER_URLS_PER_TORRENT": "discovery.max_tracker_urls_per_torrent",
1117+
"CCBT_ANNOUNCE_MAX_TRACKERS_PER_ROUND": (
1118+
"discovery.announce_max_trackers_per_round"
1119+
),
10771120
# XET chunk discovery
10781121
"CCBT_XET_CHUNK_QUERY_BATCH_SIZE": "discovery.xet_chunk_query_batch_size",
10791122
"CCBT_XET_CHUNK_QUERY_MAX_CONCURRENT": "discovery.xet_chunk_query_max_concurrent",
@@ -1378,6 +1421,42 @@ def export(self, fmt: str = "toml", encrypt_passwords: bool = True) -> str:
13781421
msg = f"Unsupported export format: {fmt}" # pragma: no cover
13791422
raise ConfigurationError(msg) # pragma: no cover
13801423

1424+
def get_runtime_env_diagnostics(self) -> dict[str, Any]:
1425+
"""Return runtime env + dotenv provenance diagnostics for support reports."""
1426+
import os
1427+
1428+
return {
1429+
"dotenv_loader_requested": str(os.getenv("CCBT_LOAD_DOTENV", "")).strip(),
1430+
"dotenv_loaded": str(os.getenv("CCBT_DOTENV_LOADED", "0")).strip(),
1431+
"dotenv_path_effective": str(
1432+
os.getenv("CCBT_DOTENV_PATH_EFFECTIVE", "")
1433+
).strip(),
1434+
"dotenv_keys_loaded": str(
1435+
os.getenv("CCBT_DOTENV_KEYS_LOADED", "0")
1436+
).strip(),
1437+
"max_peers_per_torrent_effective": int(
1438+
getattr(self.config.network, "max_peers_per_torrent", 0) or 0
1439+
),
1440+
"tracker_immediate_connect_burst_total": int(
1441+
getattr(
1442+
self.config.discovery,
1443+
"tracker_immediate_connect_burst_total",
1444+
0,
1445+
)
1446+
or 0
1447+
),
1448+
"tracker_immediate_per_tracker_cooldown_enabled": bool(
1449+
getattr(
1450+
self.config.discovery,
1451+
"tracker_immediate_per_tracker_cooldown_enabled",
1452+
True,
1453+
)
1454+
),
1455+
"target_requestable_peers": int(
1456+
getattr(self.config.discovery, "target_requestable_peers", 0) or 0
1457+
),
1458+
}
1459+
13811460
def save_config(self) -> None:
13821461
"""Save current configuration to file.
13831462

ccbt/config/env_bootstrap.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def maybe_load_dotenv_from_env() -> None:
123123
"""If ``CCBT_LOAD_DOTENV`` is truthy, merge ``.env`` into the process environment."""
124124
raw_flag = os.getenv(_LOAD_DOTENV_FLAG)
125125
if not _truthy_env(raw_flag):
126+
os.environ.setdefault("CCBT_DOTENV_LOADED", "0")
126127
logger.debug(
127128
"Skipping dotenv load: %s is not truthy (value=%r)",
128129
_LOAD_DOTENV_FLAG,
@@ -133,6 +134,9 @@ def maybe_load_dotenv_from_env() -> None:
133134
raw_path = os.getenv(_DOTENV_PATH_VAR)
134135
path = Path(raw_path).expanduser() if raw_path else Path.cwd() / ".env"
135136
n = load_dotenv_file(path)
137+
os.environ["CCBT_DOTENV_LOADED"] = "1"
138+
os.environ["CCBT_DOTENV_PATH_EFFECTIVE"] = str(path)
139+
os.environ["CCBT_DOTENV_KEYS_LOADED"] = str(int(n))
136140
logger.debug(
137141
"Loaded %d variable(s) from dotenv (path=%s, CCBT_LOAD_DOTENV set)",
138142
n,

ccbt/discovery/tracker_dedupe.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""Deduplicate tracker announce URLs that target the same host:port endpoint.
2+
3+
Multiple schemes (https/http/udp) to the same endpoint create redundant announces
4+
and multiply load on the shared UDP tracker client. We keep the highest-priority
5+
scheme per endpoint while preserving first-seen ordering of endpoints.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
from typing import Optional, Tuple
11+
from urllib.parse import urlparse
12+
13+
# Prefer TLS-capable HTTP when the same host:port is reachable as both.
14+
_SCHEME_PRIORITY: dict[str, int] = {
15+
"https": 3,
16+
"http": 2,
17+
"udp": 1,
18+
}
19+
20+
21+
def _default_port_for_scheme(scheme: str) -> Optional[int]:
22+
s = scheme.lower()
23+
if s == "http":
24+
return 80
25+
if s == "https":
26+
return 443
27+
return None
28+
29+
30+
def tracker_endpoint_key(url: str) -> Optional[Tuple[str, int]]:
31+
"""Return (host_lower, port) for deduplication, or None if not dedupeable."""
32+
try:
33+
parsed = urlparse(url.strip())
34+
host = (parsed.hostname or "").lower()
35+
if not host:
36+
return None
37+
port = parsed.port
38+
if port is None:
39+
port = _default_port_for_scheme(parsed.scheme or "")
40+
if port is None:
41+
return None
42+
return (host, int(port))
43+
except (TypeError, ValueError):
44+
return None
45+
46+
47+
def dedupe_tracker_urls_by_host_port(urls: list[str]) -> list[str]:
48+
"""Collapse URLs that share the same host:port, preferring https > http > udp.
49+
50+
Order: first occurrence of each endpoint in ``urls`` defines output position.
51+
Unparseable URLs are appended in original order (string-deduped).
52+
"""
53+
if not urls:
54+
return []
55+
56+
best_by_endpoint: dict[tuple[str, int], tuple[int, str]] = {}
57+
order: list[tuple[str, int]] = []
58+
seen_ep: set[tuple[str, int]] = set()
59+
unparsed: list[str] = []
60+
unparsed_seen: set[str] = set()
61+
62+
for raw in urls:
63+
if not isinstance(raw, str):
64+
continue
65+
u = raw.strip()
66+
if not u:
67+
continue
68+
key = tracker_endpoint_key(u)
69+
if key is None:
70+
if u not in unparsed_seen:
71+
unparsed_seen.add(u)
72+
unparsed.append(u)
73+
continue
74+
scheme = (urlparse(u).scheme or "").lower()
75+
pri = _SCHEME_PRIORITY.get(scheme, 0)
76+
if key not in seen_ep:
77+
seen_ep.add(key)
78+
order.append(key)
79+
best_by_endpoint[key] = (pri, u)
80+
else:
81+
old_pri, _old_url = best_by_endpoint[key]
82+
if pri > old_pri:
83+
best_by_endpoint[key] = (pri, u)
84+
85+
out = [best_by_endpoint[k][1] for k in order]
86+
out.extend(unparsed)
87+
return out

ccbt/discovery/tracker_udp_client.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from urllib.parse import urlparse
3131

3232
from ccbt.config.config import get_config
33+
from ccbt.session.peer_discovery_telemetry import observe_udp_tracker_pending_window
3334

3435
if TYPE_CHECKING:
3536
from ccbt.models import PeerInfo
@@ -166,6 +167,11 @@ def __init__(self, peer_id: Optional[bytes] = None, test_mode: bool = False):
166167
self._timeout_warning_host_state: dict[str, tuple[float, int]] = {}
167168
self._tracker_response_timeout_alpha: float = 0.25
168169
self._tracker_response_timeout_ema: dict[str, float] = {}
170+
self._tracker_timeout_floor_scale: dict[str, float] = {}
171+
self._pending_request_host_by_tid: dict[int, str] = {}
172+
self._pending_request_soft_cap_per_host: int = 24
173+
self._udp_wait_pacing_load_ratio: float = 0.5
174+
self._last_udp_pending_gauge_monotonic: float = 0.0
169175

170176
# Background tasks
171177
self._cleanup_task: Optional[asyncio.Task] = None
@@ -195,6 +201,7 @@ def __init__(self, peer_id: Optional[bytes] = None, test_mode: bool = False):
195201
self._xet_chunk_registry: dict[tuple[bytes, Optional[str]], list[PeerInfo]] = {}
196202

197203
self.logger = logging.getLogger(__name__)
204+
self._refresh_udp_pending_settings_from_config()
198205
if not test_mode and not _udp_singleton_construct_in_progress():
199206
msg = (
200207
"AsyncUDPTrackerClient must be obtained via get_udp_tracker_client() "
@@ -203,6 +210,29 @@ def __init__(self, peer_id: Optional[bytes] = None, test_mode: bool = False):
203210
)
204211
raise RuntimeError(msg)
205212

213+
def _refresh_udp_pending_settings_from_config(self) -> None:
214+
"""Apply discovery.* limits for the process-wide UDP tracker singleton."""
215+
disc = getattr(self.config, "discovery", None)
216+
if disc is None:
217+
return
218+
with contextlib.suppress(Exception):
219+
self._pending_request_soft_cap_per_host = int(
220+
getattr(disc, "tracker_udp_pending_soft_cap_per_host", 24)
221+
)
222+
self._max_pending_requests = int(
223+
getattr(disc, "tracker_udp_max_pending_requests", 128)
224+
)
225+
self._udp_wait_pacing_load_ratio = float(
226+
getattr(disc, "tracker_udp_wait_pacing_load_ratio", 0.5)
227+
)
228+
229+
def _maybe_emit_udp_pending_gauge(self) -> None:
230+
now = time.monotonic()
231+
if now - self._last_udp_pending_gauge_monotonic < 0.25:
232+
return
233+
self._last_udp_pending_gauge_monotonic = now
234+
observe_udp_tracker_pending_window(len(self.pending_requests))
235+
206236
@property
207237
def socket_ready(self) -> bool:
208238
"""Check if socket is ready.
@@ -380,10 +410,16 @@ def _get_adaptive_wait_timeout(
380410
queue_pressure = pending_count / effective_cap if effective_cap > 0 else 0.0
381411
queue_pressure = max(0.0, min(1.0, queue_pressure))
382412

383-
# When there is queue pressure, reduce per-request timeout to avoid long stalls.
384-
queue_scale = 1.0 - (0.45 * queue_pressure)
385-
413+
# Queue pressure scaling with congestion floor + hysteresis.
414+
# Slightly steeper than legacy 0.45 to shorten waits under multiplex load.
415+
queue_scale = 1.0 - (0.55 * queue_pressure)
386416
host_key = self._get_tracker_host(tracker_host)
417+
previous_floor = float(self._tracker_timeout_floor_scale.get(host_key, 0.65))
418+
target_floor = 0.65 if queue_pressure < 0.7 else 0.8
419+
floor_scale = (0.85 * previous_floor) + (0.15 * target_floor)
420+
self._tracker_timeout_floor_scale[host_key] = floor_scale
421+
queue_scale = max(floor_scale, queue_scale)
422+
387423
host_ema = self._tracker_response_timeout_ema.get(host_key)
388424
host_scale = 1.0
389425
if host_ema is not None and host_ema > 0:
@@ -636,6 +672,7 @@ async def start(self) -> None:
636672
CRITICAL: Socket must be initialized during daemon startup via start_udp_tracker_client().
637673
Socket recreation is not supported as it breaks session logic.
638674
"""
675+
self._refresh_udp_pending_settings_from_config()
639676
# Note: Assert socket should never be recreated during runtime
640677
# If socket is already initialized and healthy, return immediately
641678
# Socket recreation breaks session logic and causes WinError 10022 on Windows
@@ -2446,6 +2483,7 @@ def _prune_stale_pending_requests(
24462483
future = self.pending_requests.pop(transaction_id, None)
24472484
self._pending_request_timestamps.pop(transaction_id, None)
24482485
self.pending_immediate_callbacks.pop(transaction_id, None)
2486+
self._pending_request_host_by_tid.pop(transaction_id, None)
24492487
if future is not None and not future.done():
24502488
future.cancel()
24512489
self._mark_stale_transaction(transaction_id, now=now)
@@ -2468,6 +2506,7 @@ def _prune_stale_pending_requests(
24682506
future = self.pending_requests.pop(transaction_id, None)
24692507
self._pending_request_timestamps.pop(transaction_id, None)
24702508
self.pending_immediate_callbacks.pop(transaction_id, None)
2509+
self._pending_request_host_by_tid.pop(transaction_id, None)
24712510
if future is not None and not future.done():
24722511
future.cancel()
24732512
self._mark_stale_transaction(transaction_id, now=now)
@@ -2493,6 +2532,29 @@ async def _wait_for_response(
24932532
future = asyncio.Future()
24942533
now = time.time()
24952534
start_wait = now
2535+
host_pending = sum(
2536+
1 for h in self._pending_request_host_by_tid.values() if h == host
2537+
)
2538+
if host_pending >= self._pending_request_soft_cap_per_host:
2539+
self.logger.debug(
2540+
"Tracker host pending soft cap reached: host=%s pending=%d cap=%d",
2541+
host,
2542+
host_pending,
2543+
self._pending_request_soft_cap_per_host,
2544+
)
2545+
return None
2546+
effective_cap_pre = self._get_effective_pending_request_cap()
2547+
pending_pre = len(self.pending_requests)
2548+
pace_threshold = self._udp_wait_pacing_load_ratio * effective_cap_pre
2549+
if effective_cap_pre > 0 and pending_pre > int(pace_threshold):
2550+
# Pace new waits when the shared UDP client is heavily loaded so responses
2551+
# can drain before adding more in-flight transactions.
2552+
half_span = max(1.0, pace_threshold)
2553+
pressure = min(
2554+
1.0,
2555+
(pending_pre - pace_threshold) / half_span,
2556+
)
2557+
await asyncio.sleep(0.04 + 0.12 * pressure)
24962558
pruned_count = self._prune_stale_pending_requests(
24972559
now=now, timeout=timeout, additional_new=1
24982560
)
@@ -2523,11 +2585,14 @@ async def _wait_for_response(
25232585
)
25242586
self.pending_requests[transaction_id] = future
25252587
self._pending_request_timestamps[transaction_id] = now
2588+
self._pending_request_host_by_tid[transaction_id] = host
25262589
if immediate_peers_callback is not None:
25272590
self.pending_immediate_callbacks[transaction_id] = immediate_peers_callback
25282591
else:
25292592
self.pending_immediate_callbacks.pop(transaction_id, None)
25302593

2594+
self._maybe_emit_udp_pending_gauge()
2595+
25312596
try:
25322597
response = await asyncio.wait_for(future, timeout=adaptive_timeout)
25332598
elapsed = time.time() - start_wait
@@ -2566,6 +2631,7 @@ async def _wait_for_response(
25662631
self.pending_requests.pop(transaction_id, None)
25672632
self._pending_request_timestamps.pop(transaction_id, None)
25682633
self.pending_immediate_callbacks.pop(transaction_id, None)
2634+
self._pending_request_host_by_tid.pop(transaction_id, None)
25692635
self._cleanup_stale_response_transaction_ids(now=time.time())
25702636

25712637
@staticmethod

0 commit comments

Comments
 (0)