
Commit 85de0dd

Merge pull request #196 from Context-Engine-AI/chunk
Add tree cache to ASTAnalyzer and optimize vector projection
2 parents 34db888 + 97a8a87 commit 85de0dd

6 files changed

Lines changed: 1974 additions & 32 deletions


scripts/ast_analyzer.py

Lines changed: 72 additions & 9 deletions
@@ -209,18 +209,29 @@ class ASTAnalyzer:
     - Dependency tracking
     - Semantic chunking (preserve boundaries)
     - Cross-reference analysis
+    - Tree cache for parsed ASTs (avoids re-parsing unchanged files)
     """

-    def __init__(self, use_tree_sitter: bool = True):
+    def __init__(self, use_tree_sitter: bool = True, use_tree_cache: bool = True):
         """
         Initialize AST analyzer.

         Args:
             use_tree_sitter: Use tree-sitter when available (fallback to ast module)
+            use_tree_cache: Cache parsed trees for unchanged files (mtime-based invalidation)
         """
         self.use_tree_sitter = use_tree_sitter and _TS_AVAILABLE
         self._parsers: Dict[str, Any] = {}

+        # Tree cache for avoiding re-parsing unchanged files
+        self._tree_cache = None
+        if use_tree_cache:
+            try:
+                from scripts.ingest.tree_cache import get_default_cache
+                self._tree_cache = get_default_cache()
+            except ImportError:
+                logger.debug("TreeCache not available, parsing will not be cached")
+
         # Language support matrix
         self.supported_languages = {
             "python": {"ast": True, "tree_sitter": True},
@@ -234,7 +245,49 @@ def __init__(self, use_tree_sitter: bool = True):
             "ruby": {"ast": False, "tree_sitter": True},
         }

-        logger.info(f"ASTAnalyzer initialized: tree_sitter={self.use_tree_sitter}")
+        logger.info(f"ASTAnalyzer initialized: tree_sitter={self.use_tree_sitter}, tree_cache={'enabled' if self._tree_cache else 'disabled'}")
+
+    def _parse_with_cache(self, parser: Any, content: str, file_path: str, language: str, content_provided: bool = False) -> Optional[Any]:
+        """Parse content with tree-sitter, using cache when available.
+
+        Args:
+            parser: Tree-sitter parser instance
+            content: Source code content
+            file_path: Path to the file (used as cache key)
+            language: Programming language
+            content_provided: If True, content was explicitly provided (not read from disk),
+                so skip cache to avoid returning stale tree
+
+        Returns:
+            Parsed tree or None on failure
+        """
+        path = Path(file_path) if file_path else None
+
+        # Try to get cached tree (only for real files when content was NOT explicitly provided)
+        # If content_provided=True, the caller passed in-memory content that may differ from disk
+        if self._tree_cache and path and path.exists() and not content_provided:
+            cached_tree = self._tree_cache.get(path)
+            if cached_tree is not None:
+                return cached_tree
+
+        # Parse the content
+        try:
+            tree = parser.parse(content.encode("utf-8"))
+        except Exception as e:
+            logger.debug(f"Tree-sitter parse failed for {language}: {e}")
+            return None
+
+        # Cache the result for real files
+        if self._tree_cache and path and path.exists() and tree is not None:
+            self._tree_cache.put(path, tree)
+
+        return tree
+
+    def get_tree_cache_stats(self) -> Dict[str, Any]:
+        """Get tree cache statistics for monitoring."""
+        if self._tree_cache:
+            return self._tree_cache.get_stats()
+        return {"enabled": False}

     def analyze_file(
         self, file_path: str, language: str, content: Optional[str] = None
@@ -250,6 +303,10 @@ def analyze_file(
         Returns:
             Dict with symbols, imports, calls, and dependencies
         """
+        # Track if content was explicitly provided (vs read from disk)
+        # This affects caching - explicit content may differ from on-disk state
+        content_provided = content is not None
+
         if content is None:
             try:
                 content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
@@ -259,7 +316,7 @@ def analyze_file(

         # Use language mappings (32 languages, declarative queries)
         if _LANGUAGE_MAPPINGS_AVAILABLE and self.use_tree_sitter:
-            result = self._analyze_with_mapping(content, file_path, language)
+            result = self._analyze_with_mapping(content, file_path, language, content_provided)
             if result and (result.get("symbols") or result.get("imports") or result.get("calls")):
                 return result

@@ -438,11 +495,17 @@ def extract_dependencies(

     # ---- Language Mappings Analysis (unified, concept-based) ----

-    def _analyze_with_mapping(self, content: str, file_path: str, language: str) -> Dict[str, Any]:
+    def _analyze_with_mapping(self, content: str, file_path: str, language: str, content_provided: bool = False) -> Dict[str, Any]:
         """Analyze code using language mappings (concept-based extraction).

         This uses the declarative tree-sitter queries from language_mappings
         to extract symbols, imports, and calls. Supports 34 languages.
+
+        Args:
+            content: Source code content
+            file_path: Path to the file
+            language: Programming language
+            content_provided: If True, content was explicitly provided (not read from disk)
         """
         if not _LANGUAGE_MAPPINGS_AVAILABLE:
             return self._empty_analysis()
461524
if not parser:
462525
return self._empty_analysis()
463526

464-
try:
465-
tree = parser.parse(content.encode("utf-8"))
466-
root = tree.root_node
467-
except Exception as e:
468-
logger.debug(f"Tree-sitter parse failed for {language}: {e}")
527+
# Parse with caching (avoids re-parsing unchanged files)
528+
# Skip cache if content was explicitly provided to avoid stale results
529+
tree = self._parse_with_cache(parser, content, file_path, language, content_provided)
530+
if tree is None:
469531
return self._empty_analysis()
532+
root = tree.root_node
470533

471534
content_bytes = content.encode("utf-8")
472535
symbols: List[CodeSymbol] = []

scripts/ingest/qdrant.py

Lines changed: 56 additions & 4 deletions
@@ -855,10 +855,19 @@ def delete_points_by_path(client: QdrantClient, collection: str, file_path: str)


 def upsert_points(
-    client: QdrantClient, collection: str, points: List[models.PointStruct]
+    client: QdrantClient, collection: str, points: List[models.PointStruct],
+    *, wait: bool | None = None
 ):
     """Upsert points with retry and batching.

+    Args:
+        client: Qdrant client instance
+        collection: Collection name
+        points: List of points to upsert
+        wait: Whether to wait for upsert to complete. Default is controlled by
+            the INDEX_UPSERT_ASYNC env var (0=sync/wait, 1=async/no-wait).
+            Async mode is faster but may cause read-after-write issues.
+
     Raises:
         ValueError: If collection is None or empty.
     """
@@ -878,19 +887,24 @@ def upsert_points(
         backoff = float(os.environ.get("INDEX_UPSERT_BACKOFF", "0.5") or 0.5)
     except Exception:
         backoff = 0.5
+
+    # Determine wait mode: explicit param > env var > default (sync)
+    if wait is None:
+        async_mode = os.environ.get("INDEX_UPSERT_ASYNC", "0").strip().lower() in {"1", "true", "yes", "on"}
+        wait = not async_mode

     failed_count = 0
     for i in range(0, len(points), max(1, bsz)):
         batch = points[i : i + max(1, bsz)]
         attempt = 0
         while True:
             try:
-                client.upsert(collection_name=collection, points=batch, wait=True)
+                client.upsert(collection_name=collection, points=batch, wait=wait)
                 break
             except Exception as e:
                 attempt += 1
                 if attempt >= retries:
-                    # Final fallback: try smaller sub-batches
+                    # Final fallback: try smaller sub-batches (always sync for reliability)
                     sub_size = max(1, bsz // 4)
                     sub_failed = 0
                     for j in range(0, len(batch), sub_size):
@@ -901,7 +915,6 @@
                         )
                     except Exception as sub_e:
                         sub_failed += len(sub)
-                        # Log individual sub-batch failures for debugging
                         print(f"[UPSERT_WARNING] Sub-batch upsert failed ({len(sub)} points): {sub_e}", flush=True)
                     if sub_failed > 0:
                         failed_count += sub_failed
@@ -917,6 +930,45 @@
         print(f"[UPSERT_SUMMARY] Total {failed_count}/{len(points)} points failed to upsert", flush=True)


+def flush_upserts(client: QdrantClient, collection: str) -> None:
+    """Best-effort sync for pending async upserts.
+
+    Call this after a batch of async upserts (INDEX_UPSERT_ASYNC=1) to improve
+    the likelihood that data is visible for subsequent reads.
+
+    IMPORTANT: Qdrant's wait=False semantics mean upserts are "confirmed received"
+    but not necessarily "applied". This function performs operations that encourage
+    the server to process pending writes, but cannot guarantee immediate consistency.
+
+    For strict consistency requirements:
+    - Use wait=True (INDEX_UPSERT_ASYNC=0) during upserts, or
+    - Add application-level retry logic for read-after-write scenarios
+
+    For remote deployments, network latency may increase the window between
+    upsert confirmation and data visibility.
+
+    Args:
+        client: Qdrant client instance
+        collection: Collection name
+    """
+    if not collection:
+        return
+    try:
+        # 1. Get collection info (lightweight metadata read)
+        client.get_collection(collection)
+
+        # 2. Perform a minimal scroll to encourage segment processing
+        # This touches actual data, which helps flush pending writes
+        client.scroll(
+            collection_name=collection,
+            limit=1,
+            with_payload=False,
+            with_vectors=False,
+        )
+    except Exception as e:
+        logger.debug(f"flush_upserts: {e}")
+
+
 def hash_id(text: str, path: str, start: int, end: int) -> int:
     """Generate a stable hash ID for a chunk."""
     h = hashlib.sha1(
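
Taken together, the new wait parameter and flush_upserts suggest an indexing flow like the sketch below. This is a usage sketch, not code from the commit: the Qdrant URL, collection name, vector size, and points are placeholder assumptions, the collection is presumed to already exist, and the import presumes scripts.ingest.qdrant is importable from the repo root.

# Usage sketch (assumptions noted above): async upserts for throughput,
# then a best-effort flush before reading back.
import os

from qdrant_client import QdrantClient, models

from scripts.ingest.qdrant import flush_upserts, upsert_points

os.environ["INDEX_UPSERT_ASYNC"] = "1"  # upsert_points now defaults to wait=False

client = QdrantClient(url="http://localhost:6333")  # placeholder local instance
points = [
    models.PointStruct(id=i, vector=[0.0] * 384, payload={"path": f"f{i}.py"})
    for i in range(1000)
]

upsert_points(client, "code_chunks", points)  # confirmed received, not necessarily applied
flush_upserts(client, "code_chunks")          # encourage the server to apply pending writes

# When read-after-write consistency matters more than throughput,
# override the env default per call:
upsert_points(client, "code_chunks", points, wait=True)

As the flush_upserts docstring itself warns, the flush is best-effort; wait=True remains the only strict guarantee.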

scripts/ingest/vectors.py

Lines changed: 54 additions & 19 deletions
@@ -18,16 +18,28 @@
     _STOP,
 )

+# Try to use numpy for faster vector operations (10-50x speedup)
+try:
+    import numpy as np
+    _NUMPY_AVAILABLE = True
+except ImportError:
+    np = None  # type: ignore
+    _NUMPY_AVAILABLE = False
+
 # ---------------------------------------------------------------------------
 # Mini vector projection cache
 # ---------------------------------------------------------------------------
-_MINI_PROJ_CACHE: dict[tuple[int, int, int], list[list[float]]] = {}
+# Cache stores numpy arrays when numpy is available, else nested lists
+_MINI_PROJ_CACHE: dict[tuple[int, int, int], Any] = {}


 def _get_mini_proj(
     in_dim: int, out_dim: int, seed: int | None = None
-) -> list[list[float]]:
-    """Get or create a random projection matrix for mini vectors."""
+) -> Any:
+    """Get or create a random projection matrix for mini vectors.
+
+    Returns numpy array if numpy is available, else nested list.
+    """
     import math
     import random

@@ -38,31 +50,54 @@ def _get_mini_proj(
     rnd = random.Random(s)
     scale = 1.0 / math.sqrt(out_dim)
     # Dense Rademacher matrix (+/-1) scaled; good enough for fast gating
-    M = [
-        [scale * (1.0 if rnd.random() < 0.5 else -1.0) for _ in range(out_dim)]
-        for _ in range(in_dim)
-    ]
+    if _NUMPY_AVAILABLE:
+        # Use numpy for faster matrix operations
+        # Generate same values as pure Python for reproducibility
+        M_list = [
+            [scale * (1.0 if rnd.random() < 0.5 else -1.0) for _ in range(out_dim)]
+            for _ in range(in_dim)
+        ]
+        M = np.array(M_list, dtype=np.float32)
+    else:
+        M = [
+            [scale * (1.0 if rnd.random() < 0.5 else -1.0) for _ in range(out_dim)]
+            for _ in range(in_dim)
+        ]
     _MINI_PROJ_CACHE[key] = M
     return M


 def project_mini(vec: list[float], out_dim: int | None = None) -> list[float]:
-    """Project a dense vector to a compact mini vector using random projection."""
+    """Project a dense vector to a compact mini vector using random projection.
+
+    Uses numpy when available for 10-50x speedup, falls back to pure Python.
+    """
     if not vec:
         return [0.0] * (int(out_dim or MINI_VEC_DIM))
     od = int(out_dim or MINI_VEC_DIM)
     M = _get_mini_proj(len(vec), od)
-    out = [0.0] * od
-    # y = x @ M
-    for i, val in enumerate(vec):
-        if val == 0.0:
-            continue
-        row = M[i]
-        for j in range(od):
-            out[j] += val * row[j]
-    # L2 normalize to keep scale consistent
-    norm = (sum(x * x for x in out) or 0.0) ** 0.5 or 1.0
-    return [x / norm for x in out]
+
+    if _NUMPY_AVAILABLE:
+        # Fast path: numpy matrix multiply + normalize
+        x = np.array(vec, dtype=np.float32)
+        out = x @ M  # (in_dim,) @ (in_dim, out_dim) -> (out_dim,)
+        norm = np.linalg.norm(out)
+        if norm > 0:
+            out = out / norm
+        return out.tolist()
+    else:
+        # Fallback: pure Python implementation
+        out = [0.0] * od
+        # y = x @ M
+        for i, val in enumerate(vec):
+            if val == 0.0:
+                continue
+            row = M[i]
+            for j in range(od):
+                out[j] += val * row[j]
+        # L2 normalize to keep scale consistent
+        norm = (sum(x * x for x in out) or 0.0) ** 0.5 or 1.0
+        return [x / norm for x in out]


 def _split_ident_lex(s: str) -> List[str]:
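
Because the numpy path fills its float32 matrix from the same seeded random.Random stream as the pure-Python fallback, the two paths should agree up to float32 rounding. A quick sanity-check sketch, assuming the module is importable as scripts.ingest.vectors; it passes with or without numpy installed:

# Sanity-check sketch (assumes scripts.ingest.vectors is importable).
import math
import random

from scripts.ingest.vectors import project_mini

rnd = random.Random(0)
vec = [rnd.random() for _ in range(384)]  # stand-in for a dense embedding

out = project_mini(vec, out_dim=64)
assert len(out) == 64

# The result is L2-normalized (within float32 tolerance on the numpy path)
norm = math.sqrt(sum(x * x for x in out))
assert abs(norm - 1.0) < 1e-3

# The projection matrix is cached per (in_dim, out_dim, seed), so a second
# call reuses it and returns an identical vector
assert project_mini(vec, out_dim=64) == out

The Rademacher construction preserves inner products in expectation, in the Johnson-Lindenstrauss style, which is why the source comment calls it "good enough for fast gating".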
