forked from zarr-developers/zarr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_sync_codec_pipeline.py
More file actions
145 lines (118 loc) · 5.16 KB
/
test_sync_codec_pipeline.py
File metadata and controls
145 lines (118 loc) · 5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from __future__ import annotations
from typing import Any
import numpy as np
import pytest
from zarr.abc.codec import ArrayBytesCodec, Codec
from zarr.codecs.bytes import BytesCodec
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.codecs.gzip import GzipCodec
from zarr.codecs.transpose import TransposeCodec
from zarr.codecs.zstd import ZstdCodec
from zarr.core.array_spec import ArrayConfig, ArraySpec
from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype
from zarr.core.codec_pipeline import ChunkTransform
from zarr.core.dtype import get_data_type_from_native_dtype
class AsyncOnlyCodec(ArrayBytesCodec):
    """Array-to-bytes codec stub that defines only the async single-chunk hooks.

    No synchronous encode/decode methods are declared here, so the tests can
    use this class to check that non-sync codecs are rejected.
    """

    is_fixed_size = True

    def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
        # Reports the input length unchanged.
        return input_byte_length  # pragma: no cover

    async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer | None:
        # Deliberately unimplemented: this stub carries no real encoding logic.
        raise NotImplementedError  # pragma: no cover

    async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
        # Deliberately unimplemented: this stub carries no real decoding logic.
        raise NotImplementedError  # pragma: no cover
def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[np.generic]) -> ArraySpec:
    """Build an ``ArraySpec`` for a chunk of the given shape and native dtype.

    The native dtype is converted with ``get_data_type_from_native_dtype``; the
    fill value is ``0`` cast through that data type, the config uses C order
    with ``write_empty_chunks=True``, and the default buffer prototype is used.
    """
    zarr_dtype = get_data_type_from_native_dtype(dtype)
    zero_fill = zarr_dtype.cast_scalar(0)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    buffer_prototype = default_buffer_prototype()
    return ArraySpec(
        shape=shape,
        dtype=zarr_dtype,
        fill_value=zero_fill,
        config=array_config,
        prototype=buffer_prototype,
    )
def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer:
    """Wrap a NumPy array using the default buffer prototype's ``nd_buffer`` type."""
    prototype = default_buffer_prototype()
    nd_buffer_cls = prototype.nd_buffer
    return nd_buffer_cls.from_numpy_array(arr)
@pytest.mark.parametrize(
    ("shape", "codecs"),
    [
        pytest.param((100,), (BytesCodec(),), id="bytes-only"),
        pytest.param((100,), (BytesCodec(), GzipCodec()), id="with-compression"),
        pytest.param(
            (3, 4),
            (TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()),
            id="full-chain",
        ),
    ],
)
def test_construction(shape: tuple[int, ...], codecs: tuple[Codec, ...]) -> None:
    """Building a ``ChunkTransform`` works when every codec implements SupportsSyncCodec.

    The test passes as long as the constructor does not raise; the resulting
    object is intentionally discarded.
    """
    float64_spec = _make_array_spec(shape, np.dtype("float64"))
    ChunkTransform(codecs=codecs, array_spec=float64_spec)
@pytest.mark.parametrize(
    ("shape", "codecs"),
    [
        pytest.param((100,), (AsyncOnlyCodec(),), id="async-only"),
        pytest.param(
            (3, 4),
            (TransposeCodec(order=(1, 0)), AsyncOnlyCodec()),
            id="mixed-sync-and-async",
        ),
    ],
)
def test_construction_rejects_non_sync(shape: tuple[int, ...], codecs: tuple[Codec, ...]) -> None:
    """A ``TypeError`` is raised when any codec in the chain lacks SupportsSyncCodec.

    The error message is expected to mention the offending codec class name.
    """
    float64_spec = _make_array_spec(shape, np.dtype("float64"))
    # Only the constructor call sits inside the context manager so that the
    # TypeError can only originate from ChunkTransform itself.
    with pytest.raises(TypeError, match="AsyncOnlyCodec"):
        ChunkTransform(codecs=codecs, array_spec=float64_spec)
@pytest.mark.parametrize(
    ("arr", "codecs"),
    [
        pytest.param(
            np.arange(100, dtype="float64"),
            (BytesCodec(),),
            id="bytes-only",
        ),
        pytest.param(
            np.arange(100, dtype="float64"),
            (BytesCodec(), GzipCodec(level=1)),
            id="gzip",
        ),
        pytest.param(
            np.arange(12, dtype="float64").reshape(3, 4),
            (TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)),
            id="transpose+zstd",
        ),
        pytest.param(
            np.arange(100, dtype="float64"),
            (BytesCodec(), Crc32cCodec()),
            id="crc32c",
        ),
        pytest.param(
            np.arange(50, dtype="int32"),
            (BytesCodec(), ZstdCodec(level=1)),
            id="int32",
        ),
    ],
)
def test_encode_decode_roundtrip(
    arr: np.ndarray[Any, np.dtype[Any]], codecs: tuple[Codec, ...]
) -> None:
    """Encoding an array and decoding the result reproduces the original data."""
    pipeline = ChunkTransform(
        codecs=codecs,
        array_spec=_make_array_spec(arr.shape, arr.dtype),
    )
    payload = pipeline.encode(_make_nd_buffer(arr))
    # encode may return None; that would make the round trip impossible.
    assert payload is not None
    restored = pipeline.decode(payload)
    np.testing.assert_array_equal(arr, restored.as_numpy_array())
@pytest.mark.parametrize(
    ("shape", "codecs", "input_size", "expected_size"),
    [
        pytest.param((100,), (BytesCodec(),), 800, 800, id="bytes-only"),
        pytest.param((100,), (BytesCodec(), Crc32cCodec()), 800, 804, id="crc32c"),
        pytest.param(
            (3, 4),
            (TransposeCodec(order=(1, 0)), BytesCodec()),
            96,
            96,
            id="transpose",
        ),
    ],
)
def test_compute_encoded_size(
    shape: tuple[int, ...],
    codecs: tuple[Codec, ...],
    input_size: int,
    expected_size: int,
) -> None:
    """``compute_encoded_size`` reports the expected byte length for each codec chain."""
    float64_spec = _make_array_spec(shape, np.dtype("float64"))
    pipeline = ChunkTransform(codecs=codecs, array_spec=float64_spec)
    actual_size = pipeline.compute_encoded_size(input_size, float64_spec)
    assert actual_size == expected_size
def test_encode_returns_none_propagation() -> None:
    """A ``None`` returned by an array-to-array codec makes ``encode`` return ``None``."""

    class NoneReturningAACodec(TransposeCodec):
        """``TransposeCodec`` subclass whose ``_encode_sync`` always returns ``None``."""

        def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer | None:
            return None

    float64_spec = _make_array_spec((3, 4), np.dtype("float64"))
    pipeline = ChunkTransform(
        codecs=(NoneReturningAACodec(order=(1, 0)), BytesCodec()),
        array_spec=float64_spec,
    )
    source = np.arange(12, dtype="float64").reshape(3, 4)
    outcome = pipeline.encode(_make_nd_buffer(source))
    assert outcome is None