Commit 0da3801

release 0.1.2: batch tasks, progressive hashing, CLI polish, thread safety
Features:
- Add client.submit_many() with TaskRef for batch submission and automatic topological ordering of dependencies.
- Add CASHET_DIR environment variable fallback for store_dir.
- Export TaskRef from the cashet package.
- Add Commit.__repr__ for nicer debugging.

Performance & correctness:
- Replace the string-building _stable_repr with a progressive _stable_hash to avoid building multi-megabyte intermediate strings for large args (lists/dicts with 100k+ items).
- Add length-prefixed hashing tags to prevent collision attacks.

CLI improvements:
- cashet get now pretty-prints strings, dicts, and lists.
- Add cashet clear as an alias for gc --older-than 0.
- cashet stats now reports disk_bytes with human-readable sizes.

Robustness:
- Add input validation to submit_many for clear error messages on bad task shapes.
- Fix the LocalExecutor check-then-act race with a threading.Lock so concurrent submits for the same uncached task execute only once.

Tests:
- Add coverage for submit_many, TaskRef wiring, progressive-hash collision resistance, disk_bytes stats, the CASHET_DIR env var, and concurrent deduplication.
1 parent 8f7d7f7 commit 0da3801
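The "length-prefixed hashing tags" mentioned above can be illustrated with a short sketch. This is a hypothetical illustration, not cashet's actual `_stable_hash`: each value is fed to the hash incrementally (no giant intermediate string), and every chunk carries a type tag plus a length prefix, so structurally different inputs such as `["ab", "c"]` and `["a", "bc"]` can never produce the same byte stream.

```python
import hashlib


def stable_hash(value, h=None):
    """Progressive, length-prefixed hashing (illustrative sketch only)."""
    h = h or hashlib.sha256()
    if isinstance(value, (list, tuple)):
        h.update(b"L%d:" % len(value))      # container tag + element count
        for item in value:
            stable_hash(item, h)            # hash elements in place, no big repr
    elif isinstance(value, dict):
        h.update(b"D%d:" % len(value))      # dict tag + entry count
        for k in sorted(value):             # sort keys for a stable order
            stable_hash(k, h)
            stable_hash(value[k], h)
    else:
        data = repr(value).encode()
        h.update(b"S%d:" % len(data))       # scalar tag + byte length
        h.update(data)
    return h.hexdigest()
```

Without the length prefixes, concatenating `'ab' + 'c'` and `'a' + 'bc'` would feed identical bytes into the hash; the prefixes make the boundaries part of the digest.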

14 files changed: 707 additions & 72 deletions

README.md

Lines changed: 41 additions & 12 deletions
````diff
@@ -111,16 +111,16 @@ You already have caches (`functools.lru_cache`, `joblib.Memory`). Here's what's
 
 | | lru_cache | joblib.Memory | **cashet** |
 |---|---|---|---|
-| Persists across restarts | No | Yes | Yes |
-| Content-addressable storage | No | No | Yes (like git blobs) |
 | AST-normalized hashing | No | No | Yes (comments/formatting don't break cache) |
 | DAG resolution (chain outputs) | No | No | Yes |
+| Content-addressable storage | No | No | Yes (like git blobs) |
 | CLI to inspect history | No | No | Yes |
 | Diff two runs | No | No | Yes |
 | Garbage collection / eviction | No | No | Yes |
 | Pluggable serialization | No | No | Yes |
 | Explicit cache opt-out | No | Partial | Yes |
 | Pluggable store / executor | No | No | Yes |
+| Persists across restarts | No | Yes | Yes |
 
 The core idea: **hash the function's AST-normalized source + arguments = unique cache key**. Comments, docstrings, and formatting changes don't invalidate the cache — only semantic changes do. Same function + same args = same result, stored immutably on disk. The result is a git-like blob you can inspect, diff, and chain.
 
@@ -131,7 +131,7 @@ The core idea: **hash the function's AST-normalized source + arguments = unique
 You run 200 hyperparameter sweeps overnight. Half crash. You fix a bug and re-run. Without cashet, you re-process the dataset 200 times. With cashet:
 
 ```python
-from cashet import Client
+from cashet import Client, TaskRef
 
 client = Client()
 
@@ -142,11 +142,17 @@ def preprocess(dataset_path, image_size):
 def train(data, learning_rate, dropout):
     ...
 
-data = client.submit(preprocess, "s3://my-bucket/images", 224)
-
-for lr in [0.01, 0.001, 0.0001]:
-    for dropout in [0.2, 0.5]:
-        client.submit(train, data, lr, dropout)
+# Batch submit with topological ordering
+# TaskRef(0) refers to the first task's output
+results = client.submit_many([
+    (preprocess, ("s3://my-bucket/images", 224)),
+    (train, (TaskRef(0), 0.01, 0.2)),
+    (train, (TaskRef(0), 0.01, 0.5)),
+    (train, (TaskRef(0), 0.001, 0.2)),
+    (train, (TaskRef(0), 0.001, 0.5)),
+    (train, (TaskRef(0), 0.0001, 0.2)),
+    (train, (TaskRef(0), 0.0001, 0.5)),
+])
 ```
 
 `preprocess` runs **once** — all 6 training jobs reuse its cached output. Re-run the script tomorrow and even the training results come from cache (same function + same args = instant).
@@ -229,7 +235,10 @@ cashet log --tag env=prod --tag experiment=run-1
 # Show full commit details (source code, args, error)
 cashet show <hash>
 
-# Retrieve a result to file
+# Retrieve a result (pretty-prints strings/dicts/lists)
+cashet get <hash>
+
+# Write a result to file
 cashet get <hash> -o output.bin
 
 # Compare two commits
@@ -244,7 +253,10 @@ cashet rm <hash>
 # Evict old cache entries and orphaned blobs
 cashet gc --older-than 30
 
-# Storage statistics
+# Clear everything (alias for gc --older-than 0)
+cashet clear
+
+# Storage statistics (includes disk size)
 cashet stats
 ```
 
@@ -256,7 +268,8 @@ cashet stats
 from cashet import Client
 
 client = Client(
-    store_dir=".cashet",   # where to store blobs + metadata (SQLiteStore)
+    store_dir=".cashet",   # where to store blobs + metadata (SQLiteStore)
+                           # falls back to $CASHET_DIR env var if set
     store=None,            # or inject any Store implementation
     executor=None,         # or inject any Executor implementation
     serializer=None,       # defaults to PickleSerializer
@@ -334,6 +347,22 @@ ref.load() # deserialize and return the result
 
 If the same function + same arguments have been submitted before, returns the cached result **without re-executing**.
 
+### `client.submit_many(tasks) -> list[ResultRef]`
+
+Submit a batch of tasks with automatic topological ordering. Use `TaskRef(index)` to wire outputs between tasks in the batch.
+
+```python
+from cashet import TaskRef
+
+refs = client.submit_many([
+    step1_func,
+    (step2_func, (TaskRef(0),)),
+    (step3_func, (TaskRef(1), "extra_arg")),
+])
+```
+
+This enables parallel fan-out and ensures each task only runs after its dependencies.
+
 **Opt out of caching:**
 
 ```python
@@ -413,7 +442,7 @@ evicted = client.gc(older_than=timedelta(days=7))
 
 # Storage stats
 stats = client.stats()
-# {'total_commits': 42, 'completed_commits': 40, 'stored_objects': 38}
+# {'total_commits': 42, 'completed_commits': 40, 'stored_objects': 38, 'disk_bytes': 10485760}
 ```
 
 ### `ResultRef`
````
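The `TaskRef` wiring added in this README can be made concrete with a small sketch. This is a hypothetical illustration of the resolution idea only (the real `submit_many` goes through the client, caches results, and returns `ResultRef`s): arguments are scanned for `TaskRef(i)` placeholders, each placeholder pulls in task `i` as a dependency, and dependencies are computed before the task that consumes them.

```python
class TaskRef:
    """Placeholder for the output of the task at the given batch index."""
    def __init__(self, index):
        self.index = index


def run_batch(tasks):
    """Run a batch where TaskRef args are replaced by earlier results.

    Each task is either a bare callable or a (callable, args) tuple,
    mirroring the shapes accepted in the README examples.
    """
    results = {}

    def run(i):
        if i in results:                      # each task executes at most once
            return results[i]
        func, args = tasks[i] if isinstance(tasks[i], tuple) else (tasks[i], ())
        resolved = tuple(                     # recurse into dependencies first
            run(a.index) if isinstance(a, TaskRef) else a for a in args
        )
        results[i] = func(*resolved)
        return results[i]

    return [run(i) for i in range(len(tasks))]
```

Resolving dependencies recursively like this is equivalent to visiting the task graph in topological order: a task's body only runs after every `TaskRef` it mentions has been computed.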

pyproject.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,6 +1,6 @@
 [project]
 name = "cashet"
-version = "0.1.1"
+version = "0.1.2"
 description = "Content-addressable compute cache with git semantics"
 readme = "README.md"
 license = "MIT"
```

src/cashet/__init__.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -1,5 +1,5 @@
 from cashet.client import Client
-from cashet.dag import ResultRef
+from cashet.dag import ResultRef, TaskRef
 from cashet.executor import LocalExecutor
 from cashet.hashing import (
     ClosureWarning,
@@ -27,5 +27,6 @@
     "Serializer",
     "Store",
     "TaskDef",
+    "TaskRef",
     "TaskStatus",
 ]
```

src/cashet/cli.py

Lines changed: 36 additions & 2 deletions
```diff
@@ -149,7 +149,17 @@ def get_cmd(hash: str, output: str | None) -> None:
         pathlib.Path(output).write_bytes(ref)
         console.print(f"Written {len(ref)} bytes to {output}")
     else:
-        console.print(f"[dim]{len(ref)} bytes — use -o to write to file[/dim]")
+        try:
+            value = client.serializer.loads(ref)
+        except Exception:
+            console.print(f"[dim]{len(ref)} bytes — use -o to write to file[/dim]")
+            return
+        if isinstance(value, str):
+            console.print(value)
+        elif isinstance(value, bytes):
+            console.print(f"[dim]{len(ref)} bytes — use -o to write to file[/dim]")
+        else:
+            console.print(json.dumps(value, indent=2, default=str))
 
 
 @main.command("diff")
@@ -201,6 +211,15 @@ def history_cmd(hash: str) -> None:
     )
 
 
+def _fmt_bytes(n: int) -> str:
+    val = float(n)
+    for unit in ("B", "KB", "MB", "GB", "TB"):
+        if val < 1024:
+            return f"{val:.2f} {unit}" if unit != "B" else f"{int(val)} B"
+        val /= 1024
+    return f"{val:.2f} PB"
+
+
 @main.command("stats")
 def stats_cmd() -> None:
     """Show storage statistics"""
@@ -210,7 +229,12 @@ def stats_cmd() -> None:
     table.add_column("Metric", style="cyan")
     table.add_column("Value", style="green", justify="right")
     for k, v in s.items():
-        table.add_row(k, str(v))
+        label = k
+        val = str(v)
+        if k == "disk_bytes":
+            label = "disk_size"
+            val = _fmt_bytes(v)
+        table.add_row(label, val)
     console.print(table)
 
 
@@ -242,5 +266,15 @@ def gc_cmd(older_than: int) -> None:
     console.print(f"[green]Evicted {deleted} commit(s) older than {older_than} day(s).[/green]")
 
 
+@main.command("clear")
+def clear_cmd() -> None:
+    """Remove all cache entries and orphaned blobs (alias for gc --older-than 0)"""
+    from datetime import timedelta
+
+    client = _client()
+    deleted = client.gc(timedelta(days=0))
+    console.print(f"[green]Cleared {deleted} commit(s).[/green]")
+
+
 if __name__ == "__main__":
     main()
```
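The `LocalExecutor` change itself is not shown in this diff, but the check-then-act race it fixes is easy to sketch. In this hypothetical illustration (names and structure are assumptions, not cashet's actual executor), the cache is re-checked while holding a `threading.Lock`, so two concurrent submits of the same uncached task cannot both observe a miss and execute twice.

```python
import threading


class DedupExecutor:
    """Illustrative sketch of lock-protected cache check-then-act."""

    def __init__(self):
        self._cache = {}
        self._lock = threading.Lock()
        self.executions = 0          # counts actual function runs, for testing

    def submit(self, key, func, *args):
        # Without the lock, two threads could both see a miss here and
        # execute func twice. Checking and executing under the lock
        # guarantees exactly one execution per key.
        with self._lock:
            if key in self._cache:
                return self._cache[key]
            self.executions += 1
            result = func(*args)
            self._cache[key] = result
            return result
```

Holding the lock across execution is the simplest correct fix; a finer-grained variant would use a per-key lock or event so unrelated tasks can still run in parallel.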
