Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit ed90de4

Browse files
committed
unit tests
1 parent 3bda0f6 commit ed90de4

1 file changed

Lines changed: 137 additions & 1 deletion

File tree

packages/jumpstarter/jumpstarter/streams/encoding_test.py

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
1+
import bz2
2+
import gzip
3+
import lzma
4+
import os
5+
import sys
16
from io import BytesIO
27

38
import pytest
49
from anyio import EndOfStream, create_memory_object_stream
510
from anyio.streams.stapled import StapledObjectStream
611

7-
from .encoding import compress_stream
12+
if sys.version_info >= (3, 14):
13+
from compression import zstd
14+
else:
15+
from backports import zstd
16+
17+
from .encoding import (
18+
COMPRESSION_SIGNATURES,
19+
AutoDecompressIterator,
20+
Compression,
21+
compress_stream,
22+
detect_compression_from_signature,
23+
)
824

925
# Module-wide marker: applies the ``anyio`` mark to every test in this file,
# which is how the coroutine (``async def``) tests below get run.
pytestmark = pytest.mark.anyio
1026

@@ -28,3 +44,123 @@ async def test_compress_stream(compression):
2844
except EndOfStream:
2945
break
3046
assert result.getvalue() == b"hello"
47+
48+
49+
def _get_signature(compression: Compression) -> bytes:
    """Look up the signature bytes registered for ``compression``.

    Scans ``COMPRESSION_SIGNATURES`` in order and returns the ``signature``
    of the first entry whose ``compression`` equals the argument.

    Raises:
        ValueError: if no entry matches ``compression``.
    """
    # A private sentinel distinguishes "nothing matched" from any real value.
    not_found = object()
    matching = (entry.signature for entry in COMPRESSION_SIGNATURES if entry.compression == compression)
    result = next(matching, not_found)
    if result is not_found:
        raise ValueError(f"No signature found for {compression}")
    return result
55+
56+
57+
class TestDetectCompressionFromSignature:
    """Checks for ``detect_compression_from_signature``."""

    @pytest.mark.parametrize(
        "compression",
        [Compression.GZIP, Compression.XZ, Compression.BZ2, Compression.ZSTD],
    )
    def test_detect_from_signature(self, compression):
        """A signature followed by arbitrary bytes maps back to its format."""
        magic = _get_signature(compression)
        # Four random trailing bytes stand in for the rest of a real file.
        payload = magic + os.urandom(4)
        detected = detect_compression_from_signature(payload)
        assert detected == compression

    def test_detect_uncompressed(self):
        """Plain text that matches no compression format yields ``None``."""
        plain = b"hello world"
        assert detect_compression_from_signature(plain) is None

    def test_detect_empty(self):
        """An empty buffer yields ``None``."""
        nothing = b""
        assert detect_compression_from_signature(nothing) is None

    def test_detect_too_short(self):
        """Truncated signatures must not be reported as a match."""
        partial_gzip = b"\x1f"
        partial_xz = b"\xfd\x37\x7a"
        assert detect_compression_from_signature(partial_gzip) is None
        assert detect_compression_from_signature(partial_xz) is None

    def test_detect_from_real_gzip_data(self):
        """Output of ``gzip.compress`` is detected as GZIP."""
        assert detect_compression_from_signature(gzip.compress(b"test data")) == Compression.GZIP

    def test_detect_from_real_xz_data(self):
        """Output of ``lzma.compress`` in XZ format is detected as XZ."""
        blob = lzma.compress(b"test data", format=lzma.FORMAT_XZ)
        assert detect_compression_from_signature(blob) == Compression.XZ

    def test_detect_from_real_bz2_data(self):
        """Output of ``bz2.compress`` is detected as BZ2."""
        assert detect_compression_from_signature(bz2.compress(b"test data")) == Compression.BZ2

    def test_detect_from_real_zstd_data(self):
        """Output of ``zstd.compress`` is detected as ZSTD."""
        assert detect_compression_from_signature(zstd.compress(b"test data")) == Compression.ZSTD
98+
99+
100+
class TestAutoDecompressIterator:
    """Checks for ``AutoDecompressIterator`` fed from chunked async sources."""

    async def _async_iter_from_bytes(self, data: bytes, chunk_size: int):
        """Yield ``data`` as consecutive slices of at most ``chunk_size`` bytes."""
        for start in range(0, len(data), chunk_size):
            stop = start + chunk_size
            yield data[start:stop]

    async def _decompress_and_check(self, compressed: bytes, expected: bytes, chunk_size: int = 16):
        """Run ``compressed`` through the iterator and assert the joined output.

        The input is split into ``chunk_size``-byte pieces before being handed
        to ``AutoDecompressIterator``; everything it yields is concatenated and
        compared with ``expected``.
        """
        source = self._async_iter_from_bytes(compressed, chunk_size)
        pieces = [piece async for piece in AutoDecompressIterator(source=source)]
        assert b"".join(pieces) == expected

    async def test_passthrough_uncompressed(self):
        """Data with no compression comes out exactly as it went in."""
        plain = b"hello world, this is uncompressed data"
        await self._decompress_and_check(plain, plain)

    async def test_decompress_gzip(self):
        """A gzip payload is decoded back to the original bytes."""
        plain = b"hello world, this is gzip compressed data"
        await self._decompress_and_check(gzip.compress(plain), plain)

    async def test_decompress_xz(self):
        """An XZ payload is decoded back to the original bytes."""
        plain = b"hello world, this is xz compressed data"
        packed = lzma.compress(plain, format=lzma.FORMAT_XZ)
        await self._decompress_and_check(packed, plain)

    async def test_decompress_bz2(self):
        """A bz2 payload is decoded back to the original bytes."""
        plain = b"hello world, this is bz2 compressed data"
        await self._decompress_and_check(bz2.compress(plain), plain)

    async def test_decompress_zstd(self):
        """A zstd payload is decoded back to the original bytes."""
        plain = b"hello world, this is zstd compressed data"
        await self._decompress_and_check(zstd.compress(plain), plain)

    async def test_small_chunks(self):
        """Feeding the gzip stream one byte at a time still decodes it."""
        plain = b"hello world"
        await self._decompress_and_check(gzip.compress(plain), plain, chunk_size=1)

    async def test_empty_input(self):
        """A source that yields nothing produces no output chunks."""

        async def empty_iter():
            # The unreachable ``yield`` is what makes this an async generator;
            # returning first means it finishes without producing any item.
            return
            yield

        collected = [piece async for piece in AutoDecompressIterator(source=empty_iter())]
        assert collected == []

    async def test_large_data(self):
        """A 1 MiB gzip round trip works when fed in 64 KiB chunks."""
        plain = b"x" * (1024 * 1024)
        packed = gzip.compress(plain)
        await self._decompress_and_check(packed, plain, chunk_size=65536)

0 commit comments

Comments
 (0)