diff --git a/2.0/problems/vector_db_ann_disk/config.yaml b/2.0/problems/vector_db_ann_disk/config.yaml new file mode 100644 index 00000000..f6d638ca --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/config.yaml @@ -0,0 +1,41 @@ +tag: systems +runtime: + language: rust + timeout_seconds: 10800 + environment: "Rust project; hidden disk ANN benchmark; Python/NumPy judge" + apt_packages: + - build-essential + - cargo + - git + - rustc + judge_apt_packages: + - build-essential + - cargo + - rustc + - python3-pip + - python3-numpy + judge_pip_packages: + - faiss-cpu + docker: + image: ubuntu:24.04 +environment: + # If these resource limits change, also update the resource budget text in + # readme and harbor/app/README.md so agents can design parallel algorithms + # for the actual CPU and memory budget. + cpus: 8 + memory_mb: 8192 + storage_mb: 8192 + build_timeout_seconds: 3600 +evaluation: + # The judge drives the search service with this many concurrent workers. + # Keep this aligned with the CPU budget unless the task is intentionally + # changed into a higher-concurrency service benchmark. + query_concurrency: 8 + queries_per_worker: 64 +submission: + kind: directory + path: /app + exclude: + - target + - .git + - .frontier-cs diff --git a/2.0/problems/vector_db_ann_disk/evaluate.sh b/2.0/problems/vector_db_ann_disk/evaluate.sh new file mode 100755 index 00000000..76c2d2f9 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/evaluate.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) + +if [[ $# -gt 0 ]]; then + exec python3 "$SCRIPT_DIR/evaluator.py" "$@" +fi + +SOLUTION="/work/execution_env/solution_env/solution.rs" +if [[ ! -f "$SOLUTION" ]]; then + echo "Error: Missing $SOLUTION" >&2 + exit 1 +fi + +if ! command -v cargo >/dev/null 2>&1 || ! python3 -c 'import numpy, faiss' >/dev/null 2>&1; then + export DEBIAN_FRONTEND=noninteractive + apt-get update -qq + apt-get install -y -qq --no-install-recommends \ + build-essential cargo rustc python3-pip python3-numpy >/dev/null + python3 -c 'import faiss' >/dev/null 2>&1 || \ + pip3 install --break-system-packages -q faiss-cpu +fi + +WORKDIR=$(mktemp -d) +trap 'rm -rf "$WORKDIR"' EXIT +cp -R "$SCRIPT_DIR/harbor/app/." "$WORKDIR/" +cp "$SOLUTION" "$WORKDIR/src/db.rs" + +# The repository validator checks that the evaluator path works; the full +# 100M-vector disk benchmark is exercised through Harbor. +export FRONTIER_VECTOR_DB_N="${FRONTIER_VECTOR_DB_N:-5000}" +export FRONTIER_VECTOR_DB_Q="${FRONTIER_VECTOR_DB_Q:-16}" +export FRONTIER_VECTOR_DB_WARMUP="${FRONTIER_VECTOR_DB_WARMUP:-4}" +export FRONTIER_VECTOR_DB_CACHE="${FRONTIER_VECTOR_DB_CACHE:-/tmp/frontier_vector_db_ann_disk_ci}" + +python3 "$SCRIPT_DIR/evaluator.py" "$WORKDIR" diff --git a/2.0/problems/vector_db_ann_disk/evaluator.py b/2.0/problems/vector_db_ann_disk/evaluator.py new file mode 100644 index 00000000..fea2a19e --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/evaluator.py @@ -0,0 +1,625 @@ +#!/usr/bin/env python3 +"""Evaluator for the Frontier-CS 2.0 Vector DB ANN Disk task.""" + +from __future__ import annotations + +import json +import math +import os +import shutil +import socket +import subprocess +import tempfile +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from urllib import request + +import numpy as np + + +def _read_evaluation_config() -> dict[str, int]: + config_path = Path(__file__).with_name("config.yaml") + if not config_path.exists(): + return {} + + values: dict[str, int] = {} + in_evaluation = False + for raw_line in config_path.read_text(encoding="utf-8").splitlines(): + line = raw_line.split("#", 1)[0].rstrip() + if not line: + continue + if not raw_line.startswith((" ", "\t")): + in_evaluation = line == "evaluation:" + continue + if not in_evaluation: + continue + stripped = line.strip() + if ":" not in stripped: + continue + key, value = stripped.split(":", 1) + key = key.strip() + value = value.strip() + if key in {"query_concurrency", "queries_per_worker"} and value: + values[key] = int(value) + return values + + +def _config_int(name: str, default: int) -> int: + return int(os.environ.get(name, str(default))) + + +_EVALUATION_CONFIG = _read_evaluation_config() +CONFIG_CONCURRENCY = int(_EVALUATION_CONFIG.get("query_concurrency", 8)) +CONFIG_QUERIES_PER_WORKER = int(_EVALUATION_CONFIG.get("queries_per_worker", 64)) + + +DIM = 128 +N_BASE = _config_int("FRONTIER_VECTOR_DB_N", 100_000_000) +CONCURRENCY = _config_int("FRONTIER_VECTOR_DB_CONCURRENCY", CONFIG_CONCURRENCY) +QUERIES_PER_WORKER = _config_int( + "FRONTIER_VECTOR_DB_QUERIES_PER_WORKER", CONFIG_QUERIES_PER_WORKER +) +N_QUERIES = _config_int( + "FRONTIER_VECTOR_DB_Q", CONCURRENCY * QUERIES_PER_WORKER +) +TOP_K = _config_int("FRONTIER_VECTOR_DB_TOP_K", 10) +SEED = _config_int("FRONTIER_VECTOR_DB_SEED", 20260528) +GRAPH_DEGREE = _config_int("FRONTIER_VECTOR_DB_GRAPH_DEGREE", 64) +TARGET_RECALL = float(os.environ.get("FRONTIER_VECTOR_DB_TARGET_RECALL", "0.95")) +QUERY_NOISE = float(os.environ.get("FRONTIER_VECTOR_DB_QUERY_NOISE", "0.02")) +BUILD_TIMEOUT_SECONDS = _config_int("FRONTIER_VECTOR_DB_BUILD_TIMEOUT", 3600) +LOAD_TIMEOUT_SECONDS = _config_int("FRONTIER_VECTOR_DB_LOAD_TIMEOUT", 600) +WARMUP = _config_int("FRONTIER_VECTOR_DB_WARMUP", 32) +REFERENCE_BATCH_SIZE = _config_int("FRONTIER_VECTOR_DB_REFERENCE_BATCH_SIZE", 50_000) +LOCAL_GENERATION_LIMIT = _config_int( + "FRONTIER_VECTOR_DB_LOCAL_GENERATION_LIMIT", 2_000_000 +) +CACHE_DIR = Path( + os.environ.get("FRONTIER_VECTOR_DB_CACHE", "/tmp/frontier_vector_db_ann_disk") +) + +_BENCHMARK: "Benchmark | None" = None + + +@dataclass +class Benchmark: + graph_path: Path + vector_path: Path + queries_path: Path + truth: np.ndarray + baseline_qps: float + baseline_seconds: float + baseline_load_seconds: float + + +def prepare() -> dict: + print( + f"[vector-db-ann-disk] preparing benchmark n_base={N_BASE} " + f"n_queries={N_QUERIES} top_k={TOP_K} graph_degree={GRAPH_DEGREE}", + flush=True, + ) + benchmark = _ensure_benchmark() + print( + f"[vector-db-ann-disk] benchmark ready baseline_qps=" + f"{benchmark.baseline_qps:.6f} baseline_seconds=" + f"{benchmark.baseline_seconds:.6f} baseline_load_seconds=" + f"{benchmark.baseline_load_seconds:.6f}", + flush=True, + ) + return { + "n_base": N_BASE, + "n_queries": N_QUERIES, + "top_k": TOP_K, + "graph_degree": GRAPH_DEGREE, + "baseline_qps": benchmark.baseline_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + } + + +def _invalid(message: str, metrics: dict | None = None): + return 0.0, 0.0, message, metrics or {} + + +def _copy_project(src: Path, dst: Path) -> None: + ignore = shutil.ignore_patterns("target", ".git", ".frontier-cs") + shutil.copytree(src, dst, ignore=ignore) + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return int(sock.getsockname()[1]) + + +def _post_json(url: str, payload: dict, timeout: float = 60.0) -> dict: + body = json.dumps(payload).encode("utf-8") + req = request.Request( + url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with request.urlopen(req, timeout=timeout) as response: + return json.loads(response.read().decode("utf-8")) + + +def _wait_for_server( + port: int, deadline: float, process: subprocess.Popen | None = None +) -> None: + last_error: Exception | None = None + while time.time() < deadline: + if process is not None and process.poll() is not None: + stderr = b"" + if process.stderr is not None: + stderr = process.stderr.read()[-800:] + detail = stderr.decode("utf-8", errors="replace") + suffix = f": {detail}" if detail else "" + raise RuntimeError(f"server exited before becoming ready{suffix}") + try: + with socket.create_connection(("127.0.0.1", port), timeout=1.0): + pass + return + except Exception as exc: + last_error = exc + time.sleep(0.25) + raise RuntimeError(f"server did not become ready: {last_error}") + + +def _write_vectors(path: Path, values: np.ndarray) -> None: + values.astype(np.float32, copy=False).tofile(path) + + +def _load_vectors(path: Path, rows: int) -> np.memmap: + return np.memmap(path, dtype=np.float32, mode="r", shape=(rows, DIM)) + + +def _graph_vector_query_paths() -> tuple[Path, Path, Path, Path, Path]: + default_dir = CACHE_DIR / ( + f"n{N_BASE}_d{DIM}_deg{GRAPH_DEGREE}_q{N_QUERIES}_k{TOP_K}_seed{SEED}" + ) + benchmark_dir = Path(os.environ.get("FRONTIER_VECTOR_DB_BENCHMARK_DIR", default_dir)) + graph_path = Path( + os.environ.get("FRONTIER_VECTOR_DB_GRAPH_PATH", benchmark_dir / "graph.bin") + ) + vector_path = Path( + os.environ.get("FRONTIER_VECTOR_DB_VECTOR_PATH", benchmark_dir / "vectors.bin") + ) + queries_path = Path( + os.environ.get("FRONTIER_VECTOR_DB_QUERY_PATH", benchmark_dir / "queries.bin") + ) + truth_path = Path( + os.environ.get("FRONTIER_VECTOR_DB_TRUTH_PATH", benchmark_dir / "truth.u32") + ) + meta_path = Path( + os.environ.get("FRONTIER_VECTOR_DB_BASELINE_PATH", benchmark_dir / "baseline.json") + ) + return graph_path, vector_path, queries_path, truth_path, meta_path + + +def _check_file_size(path: Path, expected_bytes: int, label: str) -> bool: + return path.exists() and path.stat().st_size == expected_bytes + + +def _generate_vectors(vector_path: Path, queries_path: Path) -> None: + rng = np.random.default_rng(SEED) + chunk = 50_000 + base = np.memmap(vector_path, dtype=np.float32, mode="w+", shape=(N_BASE, DIM)) + for start in range(0, N_BASE, chunk): + end = min(start + chunk, N_BASE) + base[start:end] = rng.standard_normal((end - start, DIM), dtype=np.float32) + base.flush() + + ids = rng.integers(0, N_BASE, size=N_QUERIES) + selected = np.asarray(base[ids], dtype=np.float32) + noise = rng.standard_normal((N_QUERIES, DIM), dtype=np.float32) * QUERY_NOISE + _write_vectors(queries_path, selected + noise) + + +def _generate_local_graph(graph_path: Path) -> None: + graph_path.parent.mkdir(parents=True, exist_ok=True) + degree = min(GRAPH_DEGREE, max(0, N_BASE - 1)) + with graph_path.open("wb") as handle: + handle.write(b"FCANNDK1") + np.asarray([N_BASE], dtype=np.uint64).tofile(handle) + np.asarray([DIM, degree], dtype=np.uint32).tofile(handle) + if degree == 0: + return + chunk = 50_000 + offsets = np.arange(1, degree + 1, dtype=np.uint64) + for start in range(0, N_BASE, chunk): + end = min(start + chunk, N_BASE) + ids = np.arange(start, end, dtype=np.uint64).reshape(-1, 1) + neighbors = ((ids + offsets) % N_BASE).astype(np.uint32) + degrees = np.full((end - start, 1), degree, dtype=np.uint32) + np.concatenate([degrees, neighbors], axis=1).tofile(handle) + + +def _ensure_local_generation_allowed() -> None: + if N_BASE <= LOCAL_GENERATION_LIMIT: + return + raise RuntimeError( + "disk benchmark files were not found. Provide graph/vector/query/truth " + "paths with FRONTIER_VECTOR_DB_* environment variables, or set " + "FRONTIER_VECTOR_DB_N to a small value for local smoke testing." + ) + + +def _ensure_data_files( + graph_path: Path, vector_path: Path, queries_path: Path, truth_path: Path, meta_path: Path +) -> None: + expected_vector_bytes = N_BASE * DIM * 4 + expected_query_bytes = N_QUERIES * DIM * 4 + has_vectors = _check_file_size(vector_path, expected_vector_bytes, "vectors") + has_queries = _check_file_size(queries_path, expected_query_bytes, "queries") + + if not has_vectors or not has_queries: + _ensure_local_generation_allowed() + print("[vector-db-ann-disk] generating local vectors", flush=True) + vector_path.parent.mkdir(parents=True, exist_ok=True) + queries_path.parent.mkdir(parents=True, exist_ok=True) + _generate_vectors(vector_path, queries_path) + truth_path.unlink(missing_ok=True) + meta_path.unlink(missing_ok=True) + + if not graph_path.exists(): + _ensure_local_generation_allowed() + print("[vector-db-ann-disk] generating local graph", flush=True) + _generate_local_graph(graph_path) + + +def _run_reference_server(port: int) -> None: + import faiss + + index: faiss.IndexIDMap | None = None + + class ReferenceHandler(BaseHTTPRequestHandler): + server_version = "FrontierVectorDiskReference/1.0" + + def _write_json(self, status: int, payload: dict) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self) -> None: + if self.path == "/health": + self._write_json(200, {"status": "ok"}) + return + self._write_json(404, {"status": "error", "error": "not found"}) + + def do_POST(self) -> None: + nonlocal index + if self.path == "/load": + try: + length = int(self.headers.get("Content-Length", "0")) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + graph_path = Path(payload["graph_path"]) + vector_path = Path(payload["vector_path"]) + if not graph_path.exists(): + raise FileNotFoundError(f"graph_path not found: {graph_path}") + if not vector_path.exists(): + raise FileNotFoundError(f"vector_path not found: {vector_path}") + + base = _load_vectors(vector_path, N_BASE) + new_index = faiss.IndexIDMap(faiss.IndexFlatL2(DIM)) + for start in range(0, N_BASE, REFERENCE_BATCH_SIZE): + end = min(start + REFERENCE_BATCH_SIZE, N_BASE) + vectors = np.asarray(base[start:end], dtype=np.float32) + ids = np.arange(start, end, dtype=np.int64) + new_index.add_with_ids(vectors, ids) + index = new_index + self._write_json(200, {"status": "ok"}) + except Exception as exc: + self._write_json(400, {"status": "error", "error": str(exc)}) + return + + if self.path != "/search": + self._write_json(404, {"status": "error", "error": "not found"}) + return + + try: + if index is None: + raise RuntimeError("reference index has not been loaded") + length = int(self.headers.get("Content-Length", "0")) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + vector = np.asarray(payload["vector"], dtype=np.float32) + top_k = int(payload.get("top_k", TOP_K)) + if vector.shape != (DIM,): + raise ValueError("query vector has wrong dimension") + if top_k != TOP_K: + raise ValueError("unexpected top_k") + distances, ids = index.search(vector.reshape(1, DIM), top_k) + results = [ + {"id": int(id_), "distance": float(distance)} + for id_, distance in zip(ids[0], distances[0]) + ] + self._write_json(200, {"results": results}) + except Exception as exc: + self._write_json(400, {"status": "error", "error": str(exc)}) + + def log_message(self, fmt: str, *args: object) -> None: + return + + ThreadingHTTPServer(("127.0.0.1", port), ReferenceHandler).serve_forever() + + +def _load_service(base_url: str, graph_path: Path, vector_path: Path) -> float: + start = time.perf_counter() + response = _post_json( + f"{base_url}/load", + {"graph_path": str(graph_path), "vector_path": str(vector_path)}, + timeout=LOAD_TIMEOUT_SECONDS, + ) + load_seconds = max(time.perf_counter() - start, 1e-9) + if response.get("status") != "ok": + raise ValueError(f"load response did not report ok: {response}") + if load_seconds > LOAD_TIMEOUT_SECONDS: + raise TimeoutError(f"load timed out after {LOAD_TIMEOUT_SECONDS}s") + return load_seconds + + +def _measure_reference_baseline( + graph_path: Path, vector_path: Path, queries: np.ndarray +) -> tuple[np.ndarray, list[float], float, float]: + port = _free_port() + process = subprocess.Popen( + ["python3", str(Path(__file__).resolve()), "--reference-server", str(port)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + try: + _wait_for_server(port, time.time() + 120, process) + load_seconds = _load_service(f"http://127.0.0.1:{port}", graph_path, vector_path) + results, latencies, baseline_seconds = _run_queries( + f"http://127.0.0.1:{port}", queries + ) + return results, latencies, baseline_seconds, load_seconds + finally: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + +def _ensure_benchmark() -> Benchmark: + global _BENCHMARK + if _BENCHMARK is not None: + return _BENCHMARK + + graph_path, vector_path, queries_path, truth_path, meta_path = ( + _graph_vector_query_paths() + ) + graph_path.parent.mkdir(parents=True, exist_ok=True) + vector_path.parent.mkdir(parents=True, exist_ok=True) + queries_path.parent.mkdir(parents=True, exist_ok=True) + truth_path.parent.mkdir(parents=True, exist_ok=True) + meta_path.parent.mkdir(parents=True, exist_ok=True) + + _ensure_data_files(graph_path, vector_path, queries_path, truth_path, meta_path) + + expected_vector_bytes = N_BASE * DIM * 4 + expected_query_bytes = N_QUERIES * DIM * 4 + if not _check_file_size(vector_path, expected_vector_bytes, "vectors"): + raise RuntimeError(f"vectors file has unexpected size: {vector_path}") + if not _check_file_size(queries_path, expected_query_bytes, "queries"): + raise RuntimeError(f"queries file has unexpected size: {queries_path}") + + queries = _load_vectors(queries_path, N_QUERIES) + + if truth_path.exists() and meta_path.exists(): + truth = np.fromfile(truth_path, dtype=np.uint32).reshape(N_QUERIES, TOP_K) + meta = json.loads(meta_path.read_text(encoding="utf-8")) + baseline_seconds = float(meta["baseline_seconds"]) + baseline_qps = float(meta["baseline_qps"]) + baseline_load_seconds = float(meta["baseline_load_seconds"]) + else: + if N_BASE > LOCAL_GENERATION_LIMIT: + raise RuntimeError( + "truth/baseline files were not found and the local exact reference " + "is disabled for this benchmark size" + ) + print("[vector-db-ann-disk] running Faiss HTTP exact baseline", flush=True) + truth, _, baseline_seconds, baseline_load_seconds = _measure_reference_baseline( + graph_path, vector_path, queries + ) + truth.astype(np.uint32, copy=False).tofile(truth_path) + baseline_qps = N_QUERIES / baseline_seconds + meta_path.write_text( + json.dumps( + { + "baseline_seconds": baseline_seconds, + "baseline_qps": baseline_qps, + "baseline_load_seconds": baseline_load_seconds, + }, + indent=2, + ), + encoding="utf-8", + ) + + _BENCHMARK = Benchmark( + graph_path=graph_path, + vector_path=vector_path, + queries_path=queries_path, + truth=truth, + baseline_qps=baseline_qps, + baseline_seconds=baseline_seconds, + baseline_load_seconds=baseline_load_seconds, + ) + return _BENCHMARK + + +def _search_one(base_url: str, query_index: int, vector: np.ndarray) -> tuple[int, list[int], float]: + start = time.perf_counter() + response = _post_json( + f"{base_url}/search", + {"vector": vector.astype(float).tolist(), "top_k": TOP_K}, + timeout=120.0, + ) + latency_ms = (time.perf_counter() - start) * 1000.0 + ids = [int(item.get("id", -1)) for item in response.get("results", [])[:TOP_K]] + return query_index, ids, latency_ms + + +def _run_queries( + base_url: str, queries: np.ndarray +) -> tuple[np.ndarray, list[float], float]: + for i in range(min(WARMUP, N_QUERIES)): + try: + _search_one(base_url, i, queries[i]) + except Exception: + pass + + results = np.zeros((N_QUERIES, TOP_K), dtype=np.uint32) + latencies: list[float] = [] + start = time.perf_counter() + with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool: + futures = [ + pool.submit(_search_one, base_url, i, queries[i]) + for i in range(N_QUERIES) + ] + for future in as_completed(futures): + query_index, ids, latency_ms = future.result() + if len(ids) != TOP_K: + raise ValueError("search response did not contain top_k results") + if len(set(ids)) != len(ids): + raise ValueError("search response contains duplicate vector ids") + if any(id_ < 0 or id_ >= N_BASE for id_ in ids): + raise ValueError("search response contains an out-of-range vector id") + results[query_index] = np.asarray(ids, dtype=np.uint32) + latencies.append(latency_ms) + duration = max(time.perf_counter() - start, 1e-9) + return results, latencies, duration + + +def _recall_at_k(results: np.ndarray, truth: np.ndarray) -> float: + hits = 0 + for got, expected in zip(results, truth): + hits += len(set(int(x) for x in got) & set(int(x) for x in expected)) + return hits / float(N_QUERIES * TOP_K) + + +def evaluate(solution_path: str): + root = Path(solution_path) + if not root.is_dir(): + return _invalid("submission path must be a Rust project directory") + if not (root / "Cargo.toml").exists(): + return _invalid("Cargo.toml not found in submission directory") + + try: + benchmark = _ensure_benchmark() + except Exception as exc: + return _invalid(f"benchmark preparation failed: {exc}") + + with tempfile.TemporaryDirectory(prefix="frontier_vector_db_ann_disk_") as tmp: + workdir = Path(tmp) / "project" + _copy_project(root, workdir) + try: + subprocess.run( + ["cargo", "build", "--release", "--quiet"], + cwd=workdir, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=BUILD_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired: + return _invalid("cargo build timed out") + except subprocess.CalledProcessError as exc: + stderr = exc.stderr.decode("utf-8", errors="replace") + return _invalid(f"cargo build failed: {stderr[-800:]}") + + queries = _load_vectors(benchmark.queries_path, N_QUERIES) + port = _free_port() + base_url = f"http://127.0.0.1:{port}" + process = subprocess.Popen( + ["cargo", "run", "--release", "--quiet"], + cwd=workdir, + env={**os.environ, "PORT": str(port)}, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + load_seconds = 0.0 + try: + _wait_for_server(port, time.time() + 30, process) + load_seconds = _load_service( + base_url, benchmark.graph_path, benchmark.vector_path + ) + results, latencies, candidate_seconds = _run_queries(base_url, queries) + except Exception as exc: + stderr = b"" + if process.poll() is not None and process.stderr is not None: + stderr = process.stderr.read()[-800:] + metrics = { + "baseline_qps": benchmark.baseline_qps, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "qps": 0.0, + "candidate_seconds": 0.0, + "load_seconds": load_seconds, + "recall_at_10": 0.0, + } + detail = stderr.decode("utf-8", errors="replace") + suffix = f": {detail}" if detail else "" + return _invalid(f"candidate benchmark failed: {exc}{suffix}", metrics) + finally: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + recall = _recall_at_k(results, benchmark.truth) + qps = N_QUERIES / candidate_seconds + if recall < TARGET_RECALL or qps <= benchmark.baseline_qps: + score = 0.0 + else: + score = 100.0 * (1.0 - math.sqrt(benchmark.baseline_qps) / math.sqrt(qps)) + + metrics = { + "qps": qps, + "baseline_qps": benchmark.baseline_qps, + "recall_at_10": recall, + "candidate_seconds": candidate_seconds, + "load_seconds": load_seconds, + "baseline_seconds": benchmark.baseline_seconds, + "baseline_load_seconds": benchmark.baseline_load_seconds, + "avg_latency_ms": float(np.mean(latencies)) if latencies else 0.0, + "p50_latency_ms": float(np.percentile(latencies, 50)) if latencies else 0.0, + "p95_latency_ms": float(np.percentile(latencies, 95)) if latencies else 0.0, + "p99_latency_ms": float(np.percentile(latencies, 99)) if latencies else 0.0, + "concurrency": float(CONCURRENCY), + "n_base": float(N_BASE), + "n_queries": float(N_QUERIES), + "top_k": float(TOP_K), + "graph_degree": float(GRAPH_DEGREE), + } + message = ( + f"N={N_BASE}; Q={N_QUERIES}; top_k={TOP_K}; " + f"recall_at_10={recall:.6f}; qps={qps:.6f}; " + f"baseline_qps={benchmark.baseline_qps:.6f}; " + f"load_seconds={load_seconds:.6f}; score={score:.6f}" + ) + return score, score, message, metrics + + +if __name__ == "__main__": + import sys + + if len(sys.argv) == 3 and sys.argv[1] == "--reference-server": + _run_reference_server(int(sys.argv[2])) + raise SystemExit(0) + + if len(sys.argv) != 2: + print("usage: evaluator.py /path/to/rust/project", file=sys.stderr) + raise SystemExit(2) + bounded, unbounded, detail, metrics = evaluate(sys.argv[1]) + print(detail) + print(json.dumps(metrics, indent=2)) + print(f"{bounded:.12f} {unbounded:.12f}") diff --git a/2.0/problems/vector_db_ann_disk/harbor/app/Cargo.toml b/2.0/problems/vector_db_ann_disk/harbor/app/Cargo.toml new file mode 100644 index 00000000..53c3a883 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/harbor/app/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "vector-db-skeleton" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.7" +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } + +[profile.release] +lto = true +codegen-units = 1 +debug = true diff --git a/2.0/problems/vector_db_ann_disk/harbor/app/LICENSE.KCORES b/2.0/problems/vector_db_ann_disk/harbor/app/LICENSE.KCORES new file mode 100644 index 00000000..c8393e37 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/harbor/app/LICENSE.KCORES @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 karminski + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/2.0/problems/vector_db_ann_disk/harbor/app/README.md b/2.0/problems/vector_db_ann_disk/harbor/app/README.md new file mode 100644 index 00000000..d0389032 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/harbor/app/README.md @@ -0,0 +1,36 @@ +# Vector DB Disk Skeleton + +This is a starter project for the Vector DB ANN Disk task. You may use it, +modify it, or replace it entirely. The judge only requires that `/app` builds +with: + +```bash +cargo build --release +PORT= cargo run --release --quiet +``` + +and serves the required `/load` and `/search` HTTP endpoints. + +The Harbor environment uses the Ubuntu `apt` Rust toolchain: + +```text +rustc 1.75 +cargo 1.75 +``` + +Pin crate versions if newer transitive dependencies require a newer Rust +compiler. + +The Harbor task provides the following resource budget: + +```text +vCPUs: 8 +memory: 8 GiB +query concurrency: 8 +timed queries per worker: 64 +``` + +## Attribution + +This starter skeleton is adapted from KCORES/vector-db-bench, licensed under +the MIT License. See `LICENSE.KCORES` for the upstream notice. diff --git a/2.0/problems/vector_db_ann_disk/harbor/app/src/api.rs b/2.0/problems/vector_db_ann_disk/harbor/app/src/api.rs new file mode 100644 index 00000000..d0e5cab7 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/harbor/app/src/api.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize)] +pub struct LoadRequest { + pub graph_path: String, + pub vector_path: String, +} + +#[derive(Serialize)] +pub struct LoadResponse { + pub status: String, +} + +#[derive(Deserialize)] +pub struct SearchRequest { + pub vector: Vec, + pub top_k: u32, +} + +#[derive(Serialize)] +pub struct SearchResult { + pub id: u64, + pub distance: f64, +} + +#[derive(Serialize)] +pub struct SearchResponse { + pub results: Vec, +} diff --git a/2.0/problems/vector_db_ann_disk/harbor/app/src/db.rs b/2.0/problems/vector_db_ann_disk/harbor/app/src/db.rs new file mode 100644 index 00000000..f6ebbd47 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/harbor/app/src/db.rs @@ -0,0 +1,21 @@ +use crate::api::SearchResult; + +pub struct VectorDB { + // Implement your index here. +} + +impl VectorDB { + pub fn new() -> Self { + todo!("initialize your vector database") + } + + pub fn load(&self, graph_path: &str, vector_path: &str) { + let _ = (graph_path, vector_path); + todo!("load the disk graph and vector data") + } + + pub fn search(&self, vector: &[f32], top_k: u32) -> Vec { + let _ = (vector, top_k); + todo!("return approximate nearest neighbors") + } +} diff --git a/2.0/problems/vector_db_ann_disk/harbor/app/src/distance.rs b/2.0/problems/vector_db_ann_disk/harbor/app/src/distance.rs new file mode 100644 index 00000000..c67b9092 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/harbor/app/src/distance.rs @@ -0,0 +1,9 @@ +pub fn l2_distance(a: &[f32], b: &[f32]) -> f64 { + a.iter() + .zip(b) + .map(|(x, y)| { + let d = (*x as f64) - (*y as f64); + d * d + }) + .sum() +} diff --git a/2.0/problems/vector_db_ann_disk/harbor/app/src/main.rs b/2.0/problems/vector_db_ann_disk/harbor/app/src/main.rs new file mode 100644 index 00000000..5b254e01 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/harbor/app/src/main.rs @@ -0,0 +1,41 @@ +use axum::{extract::State, routing::post, Json, Router}; +use std::sync::Arc; +use tokio::net::TcpListener; + +mod api; +mod db; +mod distance; + +use api::*; +use db::VectorDB; + +#[tokio::main] +async fn main() { + let db = Arc::new(VectorDB::new()); + let app = Router::new() + .route("/load", post(handle_load)) + .route("/search", post(handle_search)) + .with_state(db); + + let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string()); + let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} + +async fn handle_load( + State(db): State>, + Json(req): Json, +) -> Json { + db.load(&req.graph_path, &req.vector_path); + Json(LoadResponse { + status: "ok".to_string(), + }) +} + +async fn handle_search( + State(db): State>, + Json(req): Json, +) -> Json { + let results = db.search(&req.vector, req.top_k); + Json(SearchResponse { results }) +} diff --git a/2.0/problems/vector_db_ann_disk/readme b/2.0/problems/vector_db_ann_disk/readme new file mode 100644 index 00000000..dc0e2016 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/readme @@ -0,0 +1,155 @@ +# Vector DB ANN Disk + +## Problem + +Build a fast approximate nearest-neighbor vector search engine for a +SIFT100M-scale benchmark. + +The hidden benchmark contains exactly `100,000,000` base vectors with dimension +`128`. Queries use the same dimension, distance is squared Euclidean distance, +and each query asks for the top `10` nearest vector ids. + +The benchmark provides a pre-built graph index generated using the [DiskANN](https://github.com/microsoft/diskann) +construction algorithm. The graph has maximum degree `64` and is stored on +disk. Your objective is to maximize serving throughput while preserving search +quality: submissions must reach `recall@10 >= 0.95`, and valid submissions are +ranked by query throughput. + +The Harbor agent container starts with a small Rust skeleton project in +`/app`. You may use it, modify it, or replace it entirely. You may also use any +Rust crates, internal harness, data structures, and build layout you want, as +long as the final project satisfies the judge contract below. + +The judge builds and runs your service with: + +```bash +cargo build --release +PORT= cargo run --release --quiet +``` + +The Harbor environment uses the Ubuntu `apt` Rust toolchain: + +```text +rustc 1.75 +cargo 1.75 +``` + +If you add crates, choose versions compatible with this toolchain or pin +transitive dependencies accordingly. + +The service and judge run with the task resource limits below. Design your +search strategy for this budget: + +```text +vCPUs: 8 +memory: 8 GiB +query concurrency: 8 +timed queries per worker: 64 +``` + +The graph and vector data may be substantially larger than the available +memory. Submissions may load any information they deem useful into memory +during the load phase, subject to the memory limit above. + +The load phase is executed once before queries begin and must complete within: + +```text +600 seconds +``` + +The service must listen on `PORT` and implement these endpoints: + +```text +POST /load +POST /search +``` + +`/load` receives: + +```json +{ + "graph_path":"graph.bin", + "vector_path":"vectors.bin" +} +``` + +and returns: + +```json +{ + "status":"ok" +} +``` + +`/search` receives: + +```json +{"vector":[0.1,0.2,...],"top_k":10} +``` + +and returns: + +```json +{"results":[{"id":0,"distance":0.0}]} +``` + +## Local Harness + +The official evaluator uses hidden data and a black-box judge. You may call: + +```bash +bash /app/submit.sh +``` + +at any time to submit the current `/app` project to the official judge and get +score feedback. + +## Validity + +A submission is valid if: + +1. It builds successfully with `cargo build --release`. +2. `cargo run --release --quiet` starts the service and implements the required HTTP + endpoints. +3. Every returned id is in `[0, 100_000_000)`. +4. Its `recall@10` is at least `0.95` against the hidden exact top-10 ground + truth. +5. The `/load` phase completes within `600` seconds. + +## Scoring + +At trial startup, the Harbor judge sidecar prepares the hidden benchmark and +runs a reference HTTP service through the same `/load` and `/search` +client harnesses to produce ground-truth nearest neighbors and the trial-local +scoring baseline: + +```text +baseline_qps +``` + +Each submission is then timed independently. + +The load phase consists of a single `/load` call. Any preprocessing performed +during `/load` must complete within the timeout above. + +After the load phase completes, the query phase uses 8 concurrent workers, each +issuing 64 queries, and measures only `/search` throughput: + +```text +candidate_qps +``` + +The reported `qps` is the raw query-only QPS. + +If the submission is invalid, if `recall@10 < 0.95`, or if +`candidate_qps <= baseline_qps`, the score is `0`. + +Otherwise: + +```text +score = 100 * (1 - sqrt(baseline_qps) / sqrt(candidate_qps)) +``` + +The bounded and unbounded score fields both report this score. Harbor JSON +results include the measured `qps`, `baseline_qps`, `recall_at_10`, load time, +and runtime metrics under the `metrics` field. \ No newline at end of file diff --git a/2.0/problems/vector_db_ann_disk/reference.py b/2.0/problems/vector_db_ann_disk/reference.py new file mode 100644 index 00000000..47a3ad56 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/reference.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +"""Reference note for Vector DB ANN. + +This is a project-style Rust service task. Agents provide their own harness in +/app; the official Python evaluator is the hidden judge control plane. +""" diff --git a/2.0/problems/vector_db_ann_disk/reference.rs b/2.0/problems/vector_db_ann_disk/reference.rs new file mode 100644 index 00000000..742e7aa7 --- /dev/null +++ b/2.0/problems/vector_db_ann_disk/reference.rs @@ -0,0 +1,49 @@ +use crate::api::SearchResult; +use crate::distance::l2_distance; +use std::sync::RwLock; + +const DIM: usize = 128; + +pub struct VectorDB { + vectors: RwLock)>>, +} + +impl VectorDB { + pub fn new() -> Self { + Self { + vectors: RwLock::new(Vec::new()), + } + } + + pub fn load(&self, _graph_path: &str, vector_path: &str) { + let bytes = std::fs::read(vector_path).expect("read vector file"); + let row_bytes = DIM * std::mem::size_of::(); + let mut vectors = Vec::with_capacity(bytes.len() / row_bytes); + for (id, row) in bytes.chunks_exact(row_bytes).enumerate() { + let mut vector = Vec::with_capacity(DIM); + for value in row.chunks_exact(4) { + vector.push(f32::from_le_bytes([ + value[0], value[1], value[2], value[3], + ])); + } + vectors.push((id as u64, vector)); + } + *self.vectors.write().unwrap() = vectors; + } + + pub fn search(&self, vector: &[f32], top_k: u32) -> Vec { + let mut scored: Vec = self + .vectors + .read() + .unwrap() + .iter() + .map(|(id, candidate)| SearchResult { + id: *id, + distance: l2_distance(vector, candidate), + }) + .collect(); + scored.sort_by(|a, b| a.distance.total_cmp(&b.distance)); + scored.truncate(top_k as usize); + scored + } +}