This document specifies the DataJoint Codec API for creating custom attribute types. For the complete type system architecture (core types, built-in codecs, storage modes), see the Type System Specification.
Codecs define bidirectional conversion between Python objects and database storage. They enable storing complex data types (graphs, models, custom formats) while maintaining DataJoint's query capabilities.
flowchart LR
A["Python Object<br/>(e.g. Graph)"] -- encode --> B["Storage Type<br/>(e.g. bytes)"]
B -- decode --> A
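Concretely, a codec is just a pair of inverse methods, which DataJoint invokes automatically during INSERT and FETCH (a sketch; `GraphCodec` is the example developed below, and `graph` stands for any NetworkX graph):

codec = GraphCodec()
stored = codec.encode(graph)     # Python object → storage representation
restored = codec.decode(stored)  # storage representation → Python object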
There are two approaches for creating custom codecs:
| Pattern | When to Use | Base Class |
|---|---|---|
| Type Chaining | Transform Python objects, use existing storage | dj.Codec |
| SchemaCodec Subclassing | Custom file formats with schema-addressed paths | dj.SchemaCodec |
Chain to an existing codec for storage. Your codec transforms objects; the chained codec handles storage.
import datajoint as dj
import networkx as nx
class GraphCodec(dj.Codec):
"""Store NetworkX graphs."""
name = "graph" # Use as <graph> in definitions
def get_dtype(self, is_store: bool) -> str:
return "<blob>" # Delegate to blob for serialization
def encode(self, graph, *, key=None, store_name=None):
return {
'nodes': list(graph.nodes(data=True)),
'edges': list(graph.edges(data=True)),
}
def decode(self, stored, *, key=None):
G = nx.Graph()
G.add_nodes_from(stored['nodes'])
G.add_edges_from(stored['edges'])
return G
# Use in table definition
@schema
class Connectivity(dj.Manual):
definition = '''
conn_id : int
---
network : <graph> # in-table storage
network_ext : <graph@> # object store
    '''

Subclass dj.SchemaCodec for custom file formats that need schema-addressed storage paths.
import datajoint as dj
class ParquetCodec(dj.SchemaCodec):
"""Store DataFrames as Parquet files."""
name = "parquet"
# get_dtype inherited: returns "json", requires @
def encode(self, df, *, key=None, store_name=None):
import io
schema, table, field, pk = self._extract_context(key)
path, _ = self._build_path(schema, table, field, pk, ext=".parquet")
backend = self._get_backend(store_name)
buffer = io.BytesIO()
df.to_parquet(buffer)
backend.put_buffer(buffer.getvalue(), path)
return {"path": path, "store": store_name, "shape": list(df.shape)}
    def decode(self, stored, *, key=None):
        # ParquetRef is assumed to be a lazy reference wrapper that loads the file on demand
        return ParquetRef(stored, self._get_backend(stored.get("store")))
# Use in table definition (store only)
@schema
class Results(dj.Manual):
definition = '''
result_id : int
---
data : <parquet@>
    '''

All custom codecs inherit from dj.Codec:
from abc import ABC, abstractmethod
from typing import Any

class Codec(ABC):
"""Base class for codec types."""
name: str | None = None # Required: unique identifier
def get_dtype(self, is_store: bool) -> str:
"""Return the storage dtype."""
raise NotImplementedError
@abstractmethod
def encode(self, value, *, key=None, store_name=None) -> Any:
"""Encode Python value for storage."""
...
@abstractmethod
def decode(self, stored, *, key=None) -> Any:
"""Decode stored value back to Python."""
...
def validate(self, value) -> None:
"""Optional: validate value before encoding."""
        pass

For schema-addressed storage (file formats), inherit from dj.SchemaCodec:
class SchemaCodec(Codec, register=False):
"""Base class for schema-addressed codecs."""
def get_dtype(self, is_store: bool) -> str:
"""Store only, returns 'json'."""
if not is_store:
raise DataJointError(f"<{self.name}> requires @ (store only)")
return "json"
def _extract_context(self, key: dict) -> tuple[str, str, str, dict]:
"""Parse key into (schema, table, field, primary_key)."""
...
def _build_path(self, schema, table, field, pk, ext=None) -> tuple[str, str]:
"""Build schema-addressed path: {schema}/{table}/{pk}/{field}{ext}"""
...
    def _get_backend(self, store_name: str | None = None):
"""Get storage backend by name."""
        ...

The `name` class attribute is a unique identifier used in table definitions with
<name> syntax:
class MyCodec(dj.Codec):
name = "mycodec" # Use as <mycodec> in definitionsNaming conventions:
- Use lowercase with underscores: `spike_train`, `graph_embedding`
- Avoid generic names that might conflict: prefer `lab_model` over `model`
- Names must be unique across all registered codecs
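To check for a conflict before settling on a name, you can query the registry (a sketch using the `is_codec_registered` helper listed under the utility functions below; assuming it takes the bare codec name):

from datajoint.codecs import is_codec_registered

# Verify the name is free before defining the codec
assert not is_codec_registered("lab_model"), "codec name already in use"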
Returns the underlying storage type. The is_store parameter indicates whether
the @ modifier is present in the table definition:
def get_dtype(self, is_store: bool) -> str:
"""
Args:
is_store: True if @ modifier present (e.g., <mycodec@store>)
Returns:
- A core type: "bytes", "json", "varchar(N)", "int32", etc.
- Another codec: "<blob>", "<hash>", etc.
Raises:
DataJointError: If store not supported but @ is present
"""Examples:
# Simple: always store as bytes
def get_dtype(self, is_store: bool) -> str:
return "bytes"
# Different behavior for in-table/store
def get_dtype(self, is_store: bool) -> str:
return "<hash>" if is_store else "bytes"
# Store-only codec
def get_dtype(self, is_store: bool) -> str:
if not is_store:
raise DataJointError("<object> requires @ (store only)")
return "json"Converts Python objects to the format expected by get_dtype():
def encode(self, value: Any, *, key: dict | None = None, store_name: str | None = None) -> Any:
"""
Args:
value: The Python object to store
key: Primary key values (for context-dependent encoding)
store_name: Target store name (for in-store storage)
Returns:
Value in the format expected by get_dtype()
"""Converts stored values back to Python objects:
def decode(self, stored: Any, *, key: dict | None = None) -> Any:
"""
Args:
stored: Data retrieved from storage
key: Primary key values (for context-dependent decoding)
Returns:
The reconstructed Python object
"""Called automatically before encode() during INSERT operations:
def validate(self, value: Any) -> None:
"""
Args:
value: The value to validate
Raises:
TypeError: If the value has an incompatible type
ValueError: If the value fails domain validation
"""
if not isinstance(value, ExpectedType):
raise TypeError(f"Expected ExpectedType, got {type(value).__name__}")Codecs automatically register when their class is defined. No decorator needed:
# This codec is registered automatically when the class is defined
class MyCodec(dj.Codec):
name = "mycodec"
    # ...

For abstract base classes that shouldn't be registered:
class BaseCodec(dj.Codec, register=False):
"""Abstract base - not registered."""
name = None # Or omit entirely
class ConcreteCodec(BaseCodec):
name = "concrete" # This one IS registered
    # ...

Codecs are registered at class definition time. Ensure your codec classes are imported before any table definitions that use them:
# myproject/codecs.py
class GraphCodec(dj.Codec):
name = "graph"
...
# myproject/tables.py
import myproject.codecs # Ensure codecs are registered
@schema
class Networks(dj.Manual):
definition = '''
id : int
---
network : <graph>
    '''

Codecs can delegate to other codecs by returning <codec_name> from get_dtype().
This enables layered functionality:
class CompressedJsonCodec(dj.Codec):
"""Compress JSON data with zlib."""
name = "zjson"
def get_dtype(self, is_store: bool) -> str:
return "<blob>" # Delegate serialization to blob codec
def encode(self, value, *, key=None, store_name=None):
import json, zlib
json_bytes = json.dumps(value).encode('utf-8')
return zlib.compress(json_bytes)
def decode(self, stored, *, key=None):
import json, zlib
json_bytes = zlib.decompress(stored)
        return json.loads(json_bytes.decode('utf-8'))

When DataJoint encounters <zjson>:
- Calls `CompressedJsonCodec.get_dtype(is_store=False)` → returns `"<blob>"`
- Calls `BlobCodec.get_dtype(is_store=False)` → returns `"bytes"`
- Final storage type is `bytes` (LONGBLOB in MySQL)
During INSERT:
- `CompressedJsonCodec.encode()` converts Python dict → compressed bytes
- `BlobCodec.encode()` packs bytes → DJ blob format
- Stored in database
During FETCH:
- Read from database
- `BlobCodec.decode()` unpacks DJ blob → compressed bytes
- `CompressedJsonCodec.decode()` decompresses → Python dict
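The full resolution can be verified from code with the `resolve_dtype` helper (a sketch; the helper and its return values are described under the utility functions and debugging sections below):

from datajoint.codecs import resolve_dtype

final_type, chain, store = resolve_dtype("<zjson>")
print(final_type)               # "bytes" — after <zjson> → <blob> → bytes
print([c.name for c in chain])  # ['zjson', 'blob']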
DataJoint's built-in codecs form these chains:
| Codec | Chain | Final Storage |
|---|---|---|
| `<blob>` | `<blob>` → `bytes` | Inline |
| `<blob@>` | `<blob>` → `<hash>` → `json` | Store (hash-addressed) |
| `<attach>` | `<attach>` → `bytes` | Inline |
| `<attach@>` | `<attach>` → `<hash>` → `json` | Store (hash-addressed) |
| `<hash@>` | `<hash>` → `json` | Store only (hash-addressed) |
| `<object@>` | `<object>` → `json` | Store only (schema-addressed) |
| `<npy@>` | `<npy>` → `json` | Store only (schema-addressed) |
| `<filepath@>` | `<filepath>` → `json` | Store only (external ref) |
When using object storage (@), the store name propagates through the chain:
# Table definition
data : <mycodec@coldstore>
# Resolution:
# 1. MyCodec.get_dtype(is_store=True) → "<blob>"
# 2. BlobCodec.get_dtype(is_store=True) → "<hash>"
# 3. HashCodec.get_dtype(is_store=True) → "json"
# 4. store_name="coldstore" passed to HashCodec.encode()Codecs can be distributed as installable packages using Python entry points.
dj-graph-codecs/
├── pyproject.toml
└── src/
└── dj_graph_codecs/
├── __init__.py
└── codecs.py
[project]
name = "dj-graph-codecs"
version = "1.0.0"
dependencies = ["datajoint>=2.0", "networkx"]
[project.entry-points."datajoint.codecs"]
graph = "dj_graph_codecs.codecs:GraphCodec"
weighted_graph = "dj_graph_codecs.codecs:WeightedGraphCodec"

# src/dj_graph_codecs/codecs.py
import datajoint as dj
import networkx as nx
class GraphCodec(dj.Codec):
name = "graph"
def get_dtype(self, is_store: bool) -> str:
return "<blob>"
def encode(self, graph, *, key=None, store_name=None):
return {
'nodes': list(graph.nodes(data=True)),
'edges': list(graph.edges(data=True)),
}
def decode(self, stored, *, key=None):
G = nx.Graph()
G.add_nodes_from(stored['nodes'])
G.add_edges_from(stored['edges'])
return G
class WeightedGraphCodec(dj.Codec):
name = "weighted_graph"
def get_dtype(self, is_store: bool) -> str:
return "<blob>"
def encode(self, graph, *, key=None, store_name=None):
return {
'nodes': list(graph.nodes(data=True)),
'edges': [(u, v, d) for u, v, d in graph.edges(data=True)],
}
def decode(self, stored, *, key=None):
G = nx.Graph()
G.add_nodes_from(stored['nodes'])
for u, v, d in stored['edges']:
G.add_edge(u, v, **d)
        return G

pip install dj-graph-codecs

# Codecs are automatically discovered and available
@schema
class Networks(dj.Manual):
definition = '''
network_id : int
---
topology : <graph>
weights : <weighted_graph>
    '''

DataJoint loads entry points lazily when a codec is first requested:
1. Check the explicit registry (codecs defined in the current process)
2. Load entry points from the `datajoint.codecs` group
3. Also check the legacy `datajoint.types` group for compatibility
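The entry points scanned in step 2 can be listed directly with the standard library (a sketch using `importlib.metadata` with the Python 3.10+ group selection; the names shown depend on the packages installed):

from importlib.metadata import entry_points

for ep in entry_points(group="datajoint.codecs"):
    print(ep.name, "->", ep.value)  # e.g. graph -> dj_graph_codecs.codecs:GraphCodec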
import datajoint as dj
# List all registered codec names
dj.list_codecs() # Returns: ['blob', 'hash', 'object', 'attach', 'filepath', ...]
# Get a codec instance by name
codec = dj.get_codec("blob")
codec = dj.get_codec("<blob>") # Angle brackets are optional
codec = dj.get_codec("<blob@store>") # Store parameter is strippedfrom datajoint.codecs import (
is_codec_registered, # Check if codec exists
unregister_codec, # Remove codec (testing only)
resolve_dtype, # Resolve codec chain
parse_type_spec, # Parse "<name@store>" syntax
)

DataJoint provides these built-in codecs. See the Type System Specification for detailed behavior and implementation.
| Codec | Inline | Store | Addressing | Description |
|---|---|---|---|---|
| `<blob>` | `bytes` | `<hash@>` | Hash | DataJoint serialization for Python objects |
| `<attach>` | `bytes` | `<hash@>` | Hash | File attachments with filename preserved |
| `<hash@>` | N/A | `json` | Hash | Hash-addressed storage with MD5 deduplication |
| `<object@>` | N/A | `json` | Schema | Schema-addressed storage for files/folders |
| `<npy@>` | N/A | `json` | Schema | Schema-addressed storage for numpy arrays |
| `<filepath@>` | N/A | `json` | Reference | Reference to existing files in store |
Addressing schemes:
- Hash-addressed: Path from content hash. Automatic deduplication.
- Schema-addressed: Path mirrors database structure. One location per entity.
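The difference is easiest to see in the resulting store paths (illustrative layout only; the authoritative path schemes are defined in the Type System Specification):

# Hash-addressed (path derived from content hash; identical content stored once):
#   {store_root}/ab/cd/abcdef01234567890123456789abcdef
# Schema-addressed (mirrors {schema}/{table}/{pk}/{field}, per _build_path above):
#   {store_root}/myschema/results/result_id=1/data.parquet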
import datajoint as dj
import numpy as np
class SpikeTrainCodec(dj.Codec):
"""Efficient storage for sparse spike timing data."""
name = "spike_train"
def get_dtype(self, is_store: bool) -> str:
return "<blob>"
def validate(self, value):
if not isinstance(value, np.ndarray):
raise TypeError("Expected numpy array of spike times")
if value.ndim != 1:
raise ValueError("Spike train must be 1-dimensional")
if len(value) > 1 and not np.all(np.diff(value) >= 0):
raise ValueError("Spike times must be sorted")
def encode(self, spike_times, *, key=None, store_name=None):
# Store as differences (smaller values, better compression)
return np.diff(spike_times, prepend=0).astype(np.float32)
def decode(self, stored, *, key=None):
# Reconstruct original spike times
        return np.cumsum(stored).astype(np.float64)

import datajoint as dj
import pickle
class ModelCodec(dj.Codec):
"""Store ML models with optional in-store storage."""
name = "model"
def get_dtype(self, is_store: bool) -> str:
# Use hash-addressed storage for large models
return "<hash>" if is_store else "<blob>"
def encode(self, model, *, key=None, store_name=None):
return pickle.dumps(model, protocol=pickle.HIGHEST_PROTOCOL)
def decode(self, stored, *, key=None):
return pickle.loads(stored)
def validate(self, value):
# Check that model has required interface
if not hasattr(value, 'predict'):
raise TypeError("Model must have a predict() method")Usage:
@schema
class Models(dj.Manual):
definition = '''
model_id : int
---
small_model : <model> # In-table storage
large_model : <model@> # In-store (default store)
archive_model : <model@cold> # In-store (specific store)
    '''

import datajoint as dj
import jsonschema
class ConfigCodec(dj.Codec):
"""Store validated JSON configuration."""
name = "config"
SCHEMA = {
"type": "object",
"properties": {
"version": {"type": "integer", "minimum": 1},
"settings": {"type": "object"},
},
"required": ["version", "settings"],
}
def get_dtype(self, is_store: bool) -> str:
return "json"
def validate(self, value):
jsonschema.validate(value, self.SCHEMA)
def encode(self, config, *, key=None, store_name=None):
return config # JSON type handles serialization
def decode(self, stored, *, key=None):
        return stored

import datajoint as dj
class VersionedDataCodec(dj.Codec):
"""Handle different encoding versions based on primary key."""
name = "versioned"
def get_dtype(self, is_store: bool) -> str:
return "<blob>"
def encode(self, value, *, key=None, store_name=None):
version = key.get("schema_version", 1) if key else 1
if version >= 2:
return {"v": 2, "data": self._encode_v2(value)}
return {"v": 1, "data": self._encode_v1(value)}
def decode(self, stored, *, key=None):
version = stored.get("v", 1)
if version >= 2:
return self._decode_v2(stored["data"])
return self._decode_v1(stored["data"])
def _encode_v1(self, value):
return value
def _decode_v1(self, data):
return data
def _encode_v2(self, value):
# New encoding format
return {"optimized": True, "payload": value}
def _decode_v2(self, data):
return data["payload"]import datajoint as dj
from pathlib import Path
class ZarrCodec(dj.Codec):
"""Store Zarr arrays in object storage."""
name = "zarr"
def get_dtype(self, is_store: bool) -> str:
if not is_store:
raise dj.DataJointError("<zarr> requires @ (in-store only)")
return "<object>" # Delegate to object storage
def encode(self, value, *, key=None, store_name=None):
import zarr
import tempfile
# If already a path, pass through
if isinstance(value, (str, Path)):
return str(value)
# If zarr array, save to temp and return path
if isinstance(value, zarr.Array):
tmpdir = tempfile.mkdtemp()
path = Path(tmpdir) / "data.zarr"
zarr.save(path, value)
return str(path)
raise TypeError(f"Expected zarr.Array or path, got {type(value)}")
def decode(self, stored, *, key=None):
# ObjectCodec returns ObjectRef, use its fsmap for zarr
import zarr
        return zarr.open(stored.fsmap, mode='r')

| Data Type | Recommended get_dtype() |
|---|---|
| Python objects (dicts, arrays) | "<blob>" |
| Large binary data | "<hash>" (external) |
| Files/folders (Zarr, HDF5) | "<object>" (external) |
| Simple JSON-serializable | "json" |
| Short strings | "varchar(N)" |
| Numeric identifiers | "int32", "int64" |
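For instance, a codec whose encoded form is a short string can target a varchar column directly (an illustrative sketch; `slug` is a hypothetical codec name):

import datajoint as dj

class SlugCodec(dj.Codec):
    """Store short URL-safe identifiers as plain strings."""
    name = "slug"
    def get_dtype(self, is_store: bool) -> str:
        return "varchar(64)"  # short strings map straight to a string column
    def encode(self, value, *, key=None, store_name=None):
        return str(value)
    def decode(self, stored, *, key=None):
        return stored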
Nullable columns may pass None to your codec:
def encode(self, value, *, key=None, store_name=None):
if value is None:
return None # Pass through for nullable columns
return self._actual_encode(value)
def decode(self, stored, *, key=None):
if stored is None:
return None
    return self._actual_decode(stored)

Always verify that decode(encode(x)) == x:
import numpy as np

def test_codec_roundtrip():
    codec = MyCodec()
    test_values = [
        {"key": "value"},
        [1, 2, 3],
        np.array([1.0, 2.0]),
    ]
    for original in test_values:
        encoded = codec.encode(original)
        decoded = codec.decode(encoded)
        # Compare arrays element-wise; `==` on arrays yields an array, not a bool
        if isinstance(original, np.ndarray):
            assert np.array_equal(decoded, original)
        else:
            assert decoded == original

Catch errors early with validate():
def validate(self, value):
if not isinstance(value, ExpectedType):
raise TypeError(f"Expected ExpectedType, got {type(value).__name__}")
if not self._is_valid(value):
raise ValueError("Value fails validation constraints")Include docstrings explaining input/output formats:
class MyCodec(dj.Codec):
"""
Store MyType objects.
Input format (encode):
MyType instance with attributes: x, y, z
Storage format:
Dict with keys: 'x', 'y', 'z'
Output format (decode):
MyType instance reconstructed from storage
"""If your encoding format might change:
def encode(self, value, *, key=None, store_name=None):
return {
"_version": 2,
"_data": self._encode_v2(value),
}
def decode(self, stored, *, key=None):
version = stored.get("_version", 1)
data = stored.get("_data", stored)
if version == 1:
return self._decode_v1(data)
    return self._decode_v2(data)

| Error | Cause | Solution |
|---|---|---|
| `Unknown codec: <name>` | Codec not registered | Import the module defining the codec before the table definition |
| `Codec <name> already registered` | Duplicate name | Use unique names; check for conflicts |
| `<codec> requires @` | In-store-only codec used without `@` | Add `@` or `@store` to the attribute type |
| Circular codec reference | Codec chain forms a loop | Check `get_dtype()` return values |
# Check what codecs are registered
print(dj.list_codecs())
# Inspect a codec
codec = dj.get_codec("mycodec")
print(f"Name: {codec.name}")
print(f"In-table dtype: {codec.get_dtype(is_store=False)}")
print(f"In-store dtype: {codec.get_dtype(is_store=True)}")
# Resolve full chain
from datajoint.codecs import resolve_dtype
final_type, chain, store = resolve_dtype("<mycodec@store>")
print(f"Final storage type: {final_type}")
print(f"Codec chain: {[c.name for c in chain]}")
print(f"Store: {store}")