Skip to content

Commit d025274

Browse files
dtfiedler authored and claude committed
feat(bundle): add stream signing support for large files
Add streaming hash and signing functions that process data incrementally
without loading the entire payload into memory. This enables signing large
files with constant memory usage and progress callbacks.

New functions:
- deep_hash_blob_stream: Stream-hash a blob
- get_signature_data_stream: Compute signature hash with streaming data
- sign_stream: Sign streaming data end-to-end

Features:
- 256 KiB default chunk size (matches Arweave)
- Progress callbacks: on_progress(processed_bytes, total_bytes)
- Produces identical signatures to in-memory sign()

PE-8860

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 198c9ec commit d025274

3 files changed

Lines changed: 515 additions & 1 deletion

File tree

tests/test_stream_signing.py

Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
"""
2+
Unit tests for stream signing functionality.
3+
4+
Tests verify that stream signing produces identical results to in-memory signing
5+
and that progress callbacks work correctly.
6+
"""
7+
8+
import io
9+
import pytest
10+
from turbo_sdk.bundle import create_data, sign, encode_tags
11+
from turbo_sdk.bundle.sign import (
12+
deep_hash,
13+
deep_hash_blob_stream,
14+
get_signature_data,
15+
get_signature_data_stream,
16+
sign_stream,
17+
DEFAULT_STREAM_CHUNK_SIZE,
18+
)
19+
from turbo_sdk.signers import EthereumSigner
20+
21+
22+
# Test private key (not a real key, just for testing)
23+
TEST_PRIVATE_KEY = "0x" + "ab" * 32
24+
25+
26+
class TestDeepHashBlobStream:
27+
"""Tests for deep_hash_blob_stream function."""
28+
29+
def test_matches_in_memory_deep_hash(self):
30+
"""Stream hash should match in-memory hash for same data."""
31+
test_data = b"Hello, this is test data for streaming hash verification!" * 100
32+
stream = io.BytesIO(test_data)
33+
34+
in_memory_hash = deep_hash(test_data)
35+
stream_hash = deep_hash_blob_stream(stream, len(test_data))
36+
37+
assert in_memory_hash == stream_hash
38+
39+
def test_empty_data(self):
40+
"""Should handle empty data correctly."""
41+
test_data = b""
42+
stream = io.BytesIO(test_data)
43+
44+
in_memory_hash = deep_hash(test_data)
45+
stream_hash = deep_hash_blob_stream(stream, len(test_data))
46+
47+
assert in_memory_hash == stream_hash
48+
49+
def test_single_byte(self):
50+
"""Should handle single byte data."""
51+
test_data = b"x"
52+
stream = io.BytesIO(test_data)
53+
54+
in_memory_hash = deep_hash(test_data)
55+
stream_hash = deep_hash_blob_stream(stream, len(test_data))
56+
57+
assert in_memory_hash == stream_hash
58+
59+
def test_large_data(self):
60+
"""Should handle data larger than chunk size."""
61+
# Create data larger than default chunk size (256 KiB)
62+
test_data = b"x" * (DEFAULT_STREAM_CHUNK_SIZE * 3 + 1000)
63+
stream = io.BytesIO(test_data)
64+
65+
in_memory_hash = deep_hash(test_data)
66+
stream_hash = deep_hash_blob_stream(stream, len(test_data))
67+
68+
assert in_memory_hash == stream_hash
69+
70+
def test_custom_chunk_size(self):
71+
"""Should work with custom chunk sizes."""
72+
test_data = b"Test data for custom chunk size" * 100
73+
stream = io.BytesIO(test_data)
74+
75+
in_memory_hash = deep_hash(test_data)
76+
stream_hash = deep_hash_blob_stream(stream, len(test_data), chunk_size=64)
77+
78+
assert in_memory_hash == stream_hash
79+
80+
def test_progress_callback_called(self):
81+
"""Progress callback should be called during hashing."""
82+
test_data = b"x" * 5000
83+
stream = io.BytesIO(test_data)
84+
progress_calls = []
85+
86+
def on_progress(processed, total):
87+
progress_calls.append((processed, total))
88+
89+
deep_hash_blob_stream(stream, len(test_data), chunk_size=1024, on_progress=on_progress)
90+
91+
assert len(progress_calls) > 0
92+
assert progress_calls[-1] == (len(test_data), len(test_data))
93+
94+
def test_progress_callback_increments(self):
95+
"""Progress should increment correctly."""
96+
test_data = b"x" * 3000
97+
stream = io.BytesIO(test_data)
98+
progress_calls = []
99+
100+
def on_progress(processed, total):
101+
progress_calls.append((processed, total))
102+
103+
deep_hash_blob_stream(stream, len(test_data), chunk_size=1000, on_progress=on_progress)
104+
105+
# Should have 3 calls for 3000 bytes with 1000 byte chunks
106+
assert len(progress_calls) == 3
107+
assert progress_calls[0] == (1000, 3000)
108+
assert progress_calls[1] == (2000, 3000)
109+
assert progress_calls[2] == (3000, 3000)
110+
111+
def test_raises_on_premature_stream_end(self):
112+
"""Should raise error if stream ends before expected size."""
113+
test_data = b"short"
114+
stream = io.BytesIO(test_data)
115+
116+
with pytest.raises(ValueError, match="Stream ended prematurely"):
117+
deep_hash_blob_stream(stream, len(test_data) + 100)
118+
119+
120+
class TestGetSignatureDataStream:
121+
"""Tests for get_signature_data_stream function."""
122+
123+
def test_matches_in_memory_signature_data(self):
124+
"""Stream signature data should match in-memory version."""
125+
sig_type = 3 # Ethereum
126+
raw_owner = b"x" * 65
127+
raw_target = b""
128+
raw_anchor = b"a" * 32
129+
raw_tags = b""
130+
data = b"Test payload data" * 500
131+
132+
# Create mock dataitem for in-memory comparison
133+
class MockDataItem:
134+
signature_type = sig_type
135+
raw_owner = b"x" * 65
136+
raw_target = b""
137+
raw_anchor = b"a" * 32
138+
raw_tags = b""
139+
raw_data = data
140+
141+
mock_item = MockDataItem()
142+
in_memory_hash = get_signature_data(mock_item)
143+
144+
data_stream = io.BytesIO(data)
145+
stream_hash = get_signature_data_stream(
146+
signature_type=sig_type,
147+
raw_owner=raw_owner,
148+
raw_target=raw_target,
149+
raw_anchor=raw_anchor,
150+
raw_tags=raw_tags,
151+
data_stream=data_stream,
152+
data_size=len(data),
153+
)
154+
155+
assert in_memory_hash == stream_hash
156+
157+
def test_with_tags(self):
158+
"""Should work correctly with encoded tags."""
159+
sig_type = 3
160+
raw_owner = b"x" * 65
161+
raw_target = b""
162+
raw_anchor = b"a" * 32
163+
raw_tags = encode_tags([{"name": "Content-Type", "value": "text/plain"}])
164+
data = b"Data with tags"
165+
166+
class MockDataItem:
167+
signature_type = sig_type
168+
raw_owner = b"x" * 65
169+
raw_target = b""
170+
raw_anchor = b"a" * 32
171+
raw_tags = encode_tags([{"name": "Content-Type", "value": "text/plain"}])
172+
raw_data = data
173+
174+
mock_item = MockDataItem()
175+
in_memory_hash = get_signature_data(mock_item)
176+
177+
data_stream = io.BytesIO(data)
178+
stream_hash = get_signature_data_stream(
179+
signature_type=sig_type,
180+
raw_owner=raw_owner,
181+
raw_target=raw_target,
182+
raw_anchor=raw_anchor,
183+
raw_tags=raw_tags,
184+
data_stream=data_stream,
185+
data_size=len(data),
186+
)
187+
188+
assert in_memory_hash == stream_hash
189+
190+
def test_with_target(self):
191+
"""Should work correctly with target address."""
192+
sig_type = 3
193+
raw_owner = b"x" * 65
194+
raw_target = b"t" * 32 # 32-byte target
195+
raw_anchor = b"a" * 32
196+
raw_tags = b""
197+
data = b"Data with target"
198+
199+
class MockDataItem:
200+
signature_type = sig_type
201+
raw_owner = b"x" * 65
202+
raw_target = b"t" * 32
203+
raw_anchor = b"a" * 32
204+
raw_tags = b""
205+
raw_data = data
206+
207+
mock_item = MockDataItem()
208+
in_memory_hash = get_signature_data(mock_item)
209+
210+
data_stream = io.BytesIO(data)
211+
stream_hash = get_signature_data_stream(
212+
signature_type=sig_type,
213+
raw_owner=raw_owner,
214+
raw_target=raw_target,
215+
raw_anchor=raw_anchor,
216+
raw_tags=raw_tags,
217+
data_stream=data_stream,
218+
data_size=len(data),
219+
)
220+
221+
assert in_memory_hash == stream_hash
222+
223+
224+
class TestSignStream:
225+
"""Tests for sign_stream function."""
226+
227+
@pytest.fixture
228+
def signer(self):
229+
"""Create a real Ethereum signer for testing."""
230+
return EthereumSigner(TEST_PRIVATE_KEY)
231+
232+
def test_matches_in_memory_sign(self, signer):
233+
"""Stream signing should produce identical signature to in-memory signing."""
234+
test_data = b"Hello Arweave! " * 1000
235+
tags = [{"name": "Content-Type", "value": "text/plain"}]
236+
237+
# In-memory signing via DataItem
238+
data_item = create_data(bytearray(test_data), signer, tags)
239+
sign(data_item, signer)
240+
in_memory_signature = data_item.raw_signature
241+
242+
# Stream signing
243+
encoded_tags = encode_tags(tags)
244+
stream = io.BytesIO(test_data)
245+
246+
stream_signature = sign_stream(
247+
signature_type=signer.signature_type,
248+
raw_owner=signer.public_key,
249+
raw_target=b"",
250+
raw_anchor=data_item.raw_anchor, # Use same anchor
251+
raw_tags=encoded_tags,
252+
data_stream=stream,
253+
data_size=len(test_data),
254+
signer=signer,
255+
)
256+
257+
assert in_memory_signature == stream_signature
258+
259+
def test_with_progress_callback(self, signer):
260+
"""Progress callback should work during stream signing."""
261+
test_data = b"x" * 10000
262+
progress_calls = []
263+
264+
def on_progress(processed, total):
265+
progress_calls.append((processed, total))
266+
267+
stream = io.BytesIO(test_data)
268+
269+
sign_stream(
270+
signature_type=signer.signature_type,
271+
raw_owner=signer.public_key,
272+
raw_target=b"",
273+
raw_anchor=b"a" * 32,
274+
raw_tags=b"",
275+
data_stream=stream,
276+
data_size=len(test_data),
277+
signer=signer,
278+
chunk_size=1000,
279+
on_progress=on_progress,
280+
)
281+
282+
assert len(progress_calls) == 10
283+
assert progress_calls[-1] == (10000, 10000)
284+
285+
def test_different_data_produces_different_signature(self, signer):
286+
"""Different data should produce different signatures."""
287+
data1 = b"First data payload"
288+
data2 = b"Second data payload"
289+
anchor = b"a" * 32
290+
291+
stream1 = io.BytesIO(data1)
292+
sig1 = sign_stream(
293+
signature_type=signer.signature_type,
294+
raw_owner=signer.public_key,
295+
raw_target=b"",
296+
raw_anchor=anchor,
297+
raw_tags=b"",
298+
data_stream=stream1,
299+
data_size=len(data1),
300+
signer=signer,
301+
)
302+
303+
stream2 = io.BytesIO(data2)
304+
sig2 = sign_stream(
305+
signature_type=signer.signature_type,
306+
raw_owner=signer.public_key,
307+
raw_target=b"",
308+
raw_anchor=anchor,
309+
raw_tags=b"",
310+
data_stream=stream2,
311+
data_size=len(data2),
312+
signer=signer,
313+
)
314+
315+
assert sig1 != sig2
316+
317+
318+
class TestDefaultChunkSize:
319+
"""Tests for default chunk size constant."""
320+
321+
def test_default_chunk_size_is_256_kib(self):
322+
"""Default chunk size should be 256 KiB to match Arweave."""
323+
assert DEFAULT_STREAM_CHUNK_SIZE == 256 * 1024

turbo_sdk/bundle/__init__.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
from .constants import SIG_CONFIG, MAX_TAG_BYTES, MIN_BINARY_SIZE
22
from .dataitem import DataItem
33
from .create import create_data
4-
from .sign import sign, deep_hash, get_signature_data
4+
from .sign import (
5+
sign,
6+
deep_hash,
7+
get_signature_data,
8+
sign_stream,
9+
get_signature_data_stream,
10+
deep_hash_blob_stream,
11+
DEFAULT_STREAM_CHUNK_SIZE,
12+
)
513
from .tags import encode_tags, decode_tags
614
from .utils import set_bytes, byte_array_to_long
715

@@ -14,6 +22,10 @@
1422
"sign",
1523
"deep_hash",
1624
"get_signature_data",
25+
"sign_stream",
26+
"get_signature_data_stream",
27+
"deep_hash_blob_stream",
28+
"DEFAULT_STREAM_CHUNK_SIZE",
1729
"encode_tags",
1830
"decode_tags",
1931
"set_bytes",

0 commit comments

Comments
 (0)