
Commit fec5b3e

release 0.1.2: batch DAGs, notebook support, thread safety, and cache observability
Features:
- Add client.submit_many() with TaskRef for batch submission and automatic topological ordering of dependencies.
- Expose ResultRef.commit_hash so users can inspect lineage, diff, and manage commits directly from a result reference.
- Add Client.clear() as a convenience wrapper for gc(timedelta(days=0)).
- Add CASHET_DIR environment variable fallback for store_dir.
- CLI improvements: cashet get pretty-prints strings/dicts/lists, cashet clear alias, and human-readable disk sizes in stats.

Notebook & dynamic source support:
- Add dill dependency for source extraction in notebooks/REPLs.
- Tiered fallback in get_func_source(): inspect.getsource -> dill -> stable bytecode representation (co_code + co_consts).
- Functions defined in Jupyter, IPython, exec(), and lambdas now hash correctly and invalidate cache on semantic changes.

Thread safety & correctness:
- Fix LocalExecutor check-then-act race with a shared lock registry keyed by store path. Concurrent submits deduplicate across threads and even across separate Client instances sharing the same store.
- Fix client.get() to populate commit_hash on the temporary ResultRef.

Robustness:
- Replace string-building _stable_repr with progressive _stable_hash to avoid multi-megabyte intermediate strings for large args.
- Add length-prefixed hashing tags to prevent collision attacks.
- Add cycle detection in _stable_repr_to and _stable_hash for recursive data structures (lists, dicts, sets, objects).
- Add input validation to submit_many for clear TypeError messages on bad task shapes.

Tests:
- Add coverage for submit_many, TaskRef wiring, progressive hash collision resistance, disk_bytes stats, CASHET_DIR env var, recursive structures, dynamic/bytecode hashing, and concurrent deduplication both within one Client and across multiple Clients.
- Verify all 8 pipeline scenarios execute correctly inside an actual Jupyter kernel via nbconvert.

Docs:
- Document commit_hash, Client.clear(), Jupyter/REPL support, and thread safety guarantees in README.md.
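The length-prefixed hashing item above guards against a classic pitfall: hashing a plain concatenation of fields lets different inputs produce identical digests. A minimal sketch of the idea — the function names here are illustrative, not cashet's actual `_stable_hash` implementation:

```python
import hashlib

def naive_hash(parts: list[bytes]) -> str:
    # BAD: plain concatenation -- [b"ab", b"c"] and [b"a", b"bc"]
    # both feed b"abc" to the hasher and collide.
    h = hashlib.sha256()
    for p in parts:
        h.update(p)
    return h.hexdigest()

def prefixed_hash(parts: list[bytes]) -> str:
    # Length-prefix each field so boundaries are unambiguous, and feed
    # the hasher incrementally (no giant intermediate string).
    h = hashlib.sha256()
    for p in parts:
        h.update(len(p).to_bytes(8, "big"))
        h.update(p)
    return h.hexdigest()

assert naive_hash([b"ab", b"c"]) == naive_hash([b"a", b"bc"])      # collision
assert prefixed_hash([b"ab", b"c"]) != prefixed_hash([b"a", b"bc"])  # fixed
```

The incremental `h.update()` calls are also what "progressive" hashing means: digest state grows in constant memory regardless of argument size.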
1 parent 0da3801 commit fec5b3e

11 files changed

Lines changed: 370 additions & 30 deletions


README.md

Lines changed: 54 additions & 4 deletions
@@ -176,7 +176,7 @@ Fix the `join_crm` function and re-run the script. Steps 1-2 return instantly fr
 
 ### 3. Reproducible Notebook Results
 
-Share a result with a colleague and they can verify exactly how it was produced:
+`cashet` is designed to work in Jupyter notebooks and IPython sessions. Share a result with a colleague and they can verify exactly how it was produced:
 
 ```python
 # your notebook
@@ -340,13 +340,22 @@ Submit a function for execution. Returns a `ResultRef` — a lazy handle to the
 
 ```python
 ref = client.submit(my_func, arg1, arg2, key="value")
-ref.hash # content hash of the result
-ref.size # size in bytes
-ref.load() # deserialize and return the result
+ref.hash # content hash of the result blob
+ref.commit_hash # commit hash (use this for show/history/rm/get)
+ref.size # size in bytes
+ref.load() # deserialize and return the result
 ```
 
 If the same function + same arguments have been submitted before, returns the cached result **without re-executing**.
 
+### `client.clear()`
+
+Remove all cache entries and orphaned blobs. Equivalent to `client.gc(timedelta(days=0))`.
+
+```python
+client.clear()
+```
+
 ### `client.submit_many(tasks) -> list[ResultRef]`
 
 Submit a batch of tasks with automatic topological ordering. Use `TaskRef(index)` to wire outputs between tasks in the batch.
@@ -445,6 +454,47 @@ stats = client.stats()
 # {'total_commits': 42, 'completed_commits': 40, 'stored_objects': 38, 'disk_bytes': 10485760}
 ```
 
+### Jupyter & Notebook Support
+
+`cashet` works seamlessly in Jupyter notebooks, IPython, and the Python REPL. It uses a tiered source-resolution strategy:
+
+1. **`inspect.getsource()`** — for normal `.py` files
+2. **`dill.source.getsource()`** — for interactive sessions with live history
+3. **`dis.Bytecode` fallback** — for any live function, even after a kernel restart
+
+This means you can define functions in a notebook cell, rerun the cell with changes, and `cashet` will correctly invalidate the cache based on the new code.
+
+```python
+# In a notebook cell
+client = Client()
+
+def preprocess(data):
+    return [x * 2 for x in data]
+
+ref = client.submit(preprocess, [1, 2, 3])
+```
+
+Change the cell body and rerun — the cache invalidates automatically.
+
+### Thread Safety
+
+`cashet` is safe to use from multiple threads (and processes sharing the same store directory). Concurrent submissions of the same uncached task are deduplicated: the function executes **exactly once** and all callers receive the same cached result.
+
+```python
+import threading
+
+def worker():
+    c = Client()  # separate Client instance, same store
+    c.submit(expensive_func, arg)
+
+threads = [threading.Thread(target=worker) for _ in range(10)]
+for t in threads:
+    t.start()
+for t in threads:
+    t.join()
+# expensive_func ran only once
+```
+
 ### `ResultRef`
 
 A lazy reference to a stored result. Pass it as an argument to chain tasks:
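The `submit_many`/`TaskRef` API documented above gets only a one-line description in this diff. The dependency-wiring idea can be sketched in plain Python — `run_batch` is a toy stand-in with no caching or store, and the `(func, *args)` tuple shape is an assumption inferred from the commit message's mention of "bad task shapes", not confirmed by the diff:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TaskRef:
    index: int  # refers to another task's output within the same batch

def run_batch(tasks):
    """Resolve TaskRef placeholders and run tasks in dependency order."""
    results: dict[int, object] = {}

    def run(i: int):
        if i in results:          # each task executes at most once
            return results[i]
        func, *args = tasks[i]
        # Replace TaskRef placeholders with the referenced task's result.
        resolved = [run(a.index) if isinstance(a, TaskRef) else a for a in args]
        results[i] = func(*resolved)
        return results[i]

    return [run(i) for i in range(len(tasks))]

batch = [
    (list, range(3)),       # task 0 -> [0, 1, 2]
    (sorted, TaskRef(0)),   # task 1 consumes task 0's output
]
print(run_batch(batch))  # [[0, 1, 2], [0, 1, 2]]
```

The recursive `run` is effectively a depth-first topological sort; the real `submit_many` additionally validates task shapes and deduplicates against the store.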

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ classifiers = [
 requires-python = ">=3.11"
 dependencies = [
     "click>=8.1",
+    "dill>=0.3",
     "rich>=13.0",
 ]

src/cashet/client.py

Lines changed: 8 additions & 3 deletions
@@ -92,7 +92,7 @@ def submit(
         )
         if commit.output_ref is None:
             raise RuntimeError(f"Task {commit.task_def.func_name} failed: {commit.error}")
-        ref = ResultRef(commit.output_ref, self.store, self.serializer)
+        ref = ResultRef(commit.output_ref, self.store, self.serializer, commit_hash=commit.hash)
         return ref
 
     @overload
@@ -168,7 +168,9 @@ def get(self, hash: str) -> Any:
             raise KeyError(f"No commit found with hash {hash}")
         if commit.output_ref is None:
             raise ValueError(f"Commit {hash[:12]} has no output")
-        ref = ResultRef(commit.output_ref, self.store, self.serializer)
+        ref = ResultRef(
+            commit.output_ref, self.store, self.serializer, commit_hash=commit.hash
+        )
         return ref.load()
 
     def diff(self, hash_a: str, hash_b: str) -> dict[str, Any]:
@@ -189,6 +191,9 @@ def gc(self, older_than: timedelta | None = None) -> int:
         cutoff = datetime.now(UTC) - ttl
         return self.store.evict(cutoff)
 
+    def clear(self) -> int:
+        return self.gc(timedelta(days=0))
+
     def close(self) -> None:
         self.store.close()
 
@@ -342,7 +347,7 @@ def _execute_batch(
         )
         if commit.output_ref is None:
             raise RuntimeError(f"Task {commit.task_def.func_name} failed: {commit.error}")
-        ref = ResultRef(commit.output_ref, store, serializer)
+        ref = ResultRef(commit.output_ref, store, serializer, commit_hash=commit.hash)
         results[key] = ref
     return results

src/cashet/dag.py

Lines changed: 16 additions & 4 deletions
@@ -26,12 +26,19 @@ def resolve_input_refs(args: tuple[Any, ...], kwargs: dict[str, Any]) -> list[Ob
 
 
 class ResultRef:
-    __slots__ = ("_loaded", "_ref", "_serializer", "_store", "_value")
-
-    def __init__(self, ref: ObjectRef, store: Store, serializer: Serializer) -> None:
+    __slots__ = ("_commit_hash", "_loaded", "_ref", "_serializer", "_store", "_value")
+
+    def __init__(
+        self,
+        ref: ObjectRef,
+        store: Store,
+        serializer: Serializer,
+        commit_hash: str = "",
+    ) -> None:
         self._ref = ref
         self._store = store
         self._serializer = serializer
+        self._commit_hash = commit_hash
         self._value: Any = None
         self._loaded = False
 
@@ -42,6 +49,10 @@ def __cashet_ref__(self) -> ObjectRef:
     def hash(self) -> str:
         return self._ref.hash
 
+    @property
+    def commit_hash(self) -> str:
+        return self._commit_hash
+
     @property
     def short_hash(self) -> str:
         return self._ref.short()
@@ -58,7 +69,8 @@ def load(self) -> Any:
         return self._value
 
     def __repr__(self) -> str:
-        return f"ResultRef(hash={self.short_hash}, size={self.size}, loaded={self._loaded})"
+        ch = self._commit_hash[:12] if self._commit_hash else "?"
+        return f"ResultRef(commit={ch}, blob={self.short_hash}, size={self.size})"
 
 
 def compute_commit_hash(
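The diff above keeps ResultRef's lazy-load design intact: a dedicated `_loaded` flag alongside `_value`, rather than treating `_value is None` as "not loaded", because `None` is itself a legitimate cached result. A toy illustration of that design choice — `LazyRef` is hypothetical, not cashet code:

```python
class LazyRef:
    """Minimal lazy handle: defer work until load(), then memoize."""
    __slots__ = ("_thunk", "_value", "_loaded")

    def __init__(self, thunk):
        self._thunk = thunk   # stands in for deserializing from the store
        self._value = None
        self._loaded = False

    def load(self):
        # Checking a flag (not `_value is None`) means a None result
        # is cached correctly and the thunk never re-runs.
        if not self._loaded:
            self._value = self._thunk()
            self._loaded = True
        return self._value

calls = []
ref = LazyRef(lambda: calls.append(1) or None)  # the "result" is None
ref.load()
ref.load()
print(len(calls))  # 1 -- the thunk ran once even though it returned None
```

`__slots__` keeps per-reference overhead small, which matters when `submit_many` fans out hundreds of refs.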

src/cashet/executor.py

Lines changed: 20 additions & 4 deletions
@@ -2,6 +2,7 @@
 
 import threading
 import traceback
+from pathlib import Path
 from typing import Any
 
 from cashet.dag import (
@@ -14,11 +15,26 @@
 from cashet.models import Commit, ObjectRef, TaskDef, TaskStatus
 from cashet.protocols import Store
 
+_STORE_LOCKS: dict[str, threading.Lock] = {}
+_STORE_LOCKS_GUARD = threading.Lock()
+_FALLBACK_LOCK = threading.Lock()
 
-class LocalExecutor:
-    def __init__(self) -> None:
-        self._lock = threading.Lock()
 
+def _get_store_lock(store: Store) -> threading.Lock:
+    root = getattr(store, "root", None)
+    if root is not None:
+        key = str(Path(root).resolve())
+        with _STORE_LOCKS_GUARD:
+            if key not in _STORE_LOCKS:
+                _STORE_LOCKS[key] = threading.Lock()
+            return _STORE_LOCKS[key]
+    store_lock = getattr(store, "_lock", None)
+    if store_lock is not None:
+        return store_lock
+    return _FALLBACK_LOCK
+
+
+class LocalExecutor:
     def submit(
         self,
         func: Any,
@@ -28,7 +44,7 @@ def submit(
         store: Store,
         serializer: Serializer,
     ) -> tuple[Commit, bool]:
-        with self._lock:
+        with _get_store_lock(store):
             input_refs = resolve_input_refs(args, kwargs)
             existing = find_existing_commit(store, task_def)
             if existing is not None:
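The `_get_store_lock` change above fixes the check-then-act race: previously each LocalExecutor held its own lock, so two Client instances pointing at the same store could both miss the cache and run the task twice. The pattern reduces to this self-contained sketch — toy cache and names, not cashet's real store:

```python
import threading
from pathlib import Path

# Process-wide registry: one lock per resolved store path, so any number
# of client-like objects sharing a directory serialize on the same lock.
_LOCKS: dict[str, threading.Lock] = {}
_GUARD = threading.Lock()

def _lock_for(store_dir: str) -> threading.Lock:
    key = str(Path(store_dir).resolve())
    with _GUARD:  # guard the registry itself against concurrent mutation
        return _LOCKS.setdefault(key, threading.Lock())

cache: dict[str, int] = {}
calls = 0

def submit(store_dir: str, key: str) -> int:
    global calls
    with _lock_for(store_dir):   # check-then-act is atomic under the lock
        if key in cache:         # plays the role of find_existing_commit
            return cache[key]
        calls += 1               # "expensive work" runs exactly once
        cache[key] = 42
        return cache[key]

threads = [threading.Thread(target=submit, args=(".", "task")) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(calls)  # 1
```

Note this deduplicates threads within one process; cross-process safety additionally relies on the store's own on-disk atomicity.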
