Manage computation errors and recover failed jobs.
Continue processing despite individual failures:
# Stop on first error (default)
SessionAnalysis.populate()
# Log errors but continue
SessionAnalysis.populate(suppress_errors=True)

Check the jobs table for errors:
# All error jobs
SessionAnalysis.jobs.errors
# View error details
for job in SessionAnalysis.jobs.errors.to_dicts():
    print(f"Key: {job}")
    print(f"Message: {job['error_message']}")

Error stack traces are stored in the jobs table:
job = (SessionAnalysis.jobs.errors & key).fetch1()
print(job['error_stack'])

Clear error status and rerun:
# Delete error records to retry
SessionAnalysis.jobs.errors.delete()
# Reprocess
SessionAnalysis.populate(reserve_jobs=True)

Target specific failed jobs:
# Clear one error
(SessionAnalysis.jobs & key & "status='error'").delete()
# Retry just that key
SessionAnalysis.populate(key, reserve_jobs=True)

Mark jobs to skip permanently:
# Mark job as ignored
SessionAnalysis.jobs.ignore(key)
# View ignored jobs
SessionAnalysis.jobs.ignored

Handle expected errors gracefully:
@schema
class SessionAnalysis(dj.Computed):
    definition = """
    -> Session
    ---
    result : float64
    """

    def make(self, key):
        try:
            data = (Session & key).fetch1('data')
            result = risky_computation(data)
        except ValueError as e:
            # Log and skip this key
            logger.warning(f"Skipping {key}: {e}")
            return  # Don't insert, job remains pending
        self.insert1({**key, 'result': result})

Failed make() calls automatically roll back:
def make(self, key):
    # These inserts are in a transaction
    self.insert1({**key, 'result': value1})
    PartTable.insert(parts)
    # If this raises, all inserts are rolled back
    validate_result(key)

Get exception objects for programmatic handling:
result = SessionAnalysis.populate(
    suppress_errors=True,
    return_exception_objects=True,
)

for key, exception in result['error_list']:
    if isinstance(exception, TimeoutError):
        # Handle timeout differently
        schedule_for_later(key)

Track errors over time:
progress = SessionAnalysis.jobs.progress()
print(f"Pending: {progress.get('pending', 0)}")
print(f"Errors: {progress.get('error', 0)}")
print(f"Success: {progress.get('success', 0)}")
error_rate = progress.get('error', 0) / sum(progress.values())
print(f"Error rate: {error_rate:.1%}")

Validate inputs and fail fast on bad data:

def make(self, key):
    data = (RawData & key).fetch1('data')
    if not validate_data(data):
        raise DataJointError(f"Invalid data for {key}")
    # Process valid data
    self.insert1({**key, 'result': process(data)})

Recover from transient memory pressure by retrying once:

def make(self, key):
    try:
        result = memory_intensive_computation(key)
    except MemoryError:
        # Clear caches and retry once
        gc.collect()
        result = memory_intensive_computation(key)
    self.insert1({**key, 'result': result})

Retry transient network failures with exponential backoff:

def make(self, key):
    for attempt in range(3):
        try:
            data = fetch_from_external_api(key)
            break
        except ConnectionError:
            if attempt == 2:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
    self.insert1({**key, 'result': process(data)})

See also:

- Run Computations — Basic populate usage
- Distributed Computing — Multi-worker error handling
- Monitor Progress — Tracking job status