Skip to content

Commit 5e4b787

Browse files
authored
add streaming API (#2)
1 parent ca7d2bd commit 5e4b787

12 files changed

Lines changed: 719 additions & 248 deletions

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Added a new Streaming API that processes data in chunks, reducing peak memory
13+
usage when handling large datasets or network streams
14+
- Introduced `Msgpack.encode_stream/2` to lazily encode a stream of Elixir
15+
terms one by one
16+
- Introduced `Msgpack.decode_stream/2` to lazily decode a stream of
17+
MessagePack objects, capable of handling data that arrives in multiple
18+
chunks
1219
- Added CI workflow to run tests against supported Elixir versions
1320

1421
### Changed

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ types.
2020
limits to mitigate resource exhaustion from malformed or malicious payloads.
2121
- **Telemetry Integration:** Emits standard `:telemetry` events for integration
2222
with monitoring tools.
23+
- **Streaming API:** Process large collections or continuous data streams with
24+
low memory overhead using `Msgpack.encode_stream/2` and
25+
`Msgpack.decode_stream/2`.
2326

2427
## Installation
2528

@@ -50,6 +53,27 @@ iex> Msgpack.decode!(<<0xC1>>)
5053
** (Msgpack.DecodeError) Unknown type prefix: 193. The byte `0xC1` is not a valid MessagePack type marker.
5154
```
5255

56+
### Streaming Large Collections
57+
58+
For datasets that may not fit in memory, you can use the streaming API, which
59+
processes one term at a time.
60+
61+
```elixir
62+
# Create a lazy stream of terms to be encoded.
63+
iex> terms = Stream.cycle([1, "elixir", true])
64+
65+
# The output is a lazy stream of encoded binaries.
66+
iex> encoded_stream = Msgpack.encode_stream(terms)
67+
68+
# The stream is only consumed when you enumerate it.
69+
iex> encoded_stream |> Stream.take(3) |> Enum.to_list()
70+
[
71+
{:ok, <<1>>},
72+
{:ok, <<166, 101, 108, 105, 120, 105, 114>>},
73+
{:ok, <<195>>}
74+
]
75+
```
76+
5377
## Full Documentation
5478

5579
For detailed information on all features, options, and functions, see the [full

lib/msgpack.ex

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ defmodule Msgpack do
4949

5050
alias Msgpack.Encoder
5151
alias Msgpack.Decoder
52+
alias Msgpack.StreamEncoder
53+
alias Msgpack.StreamDecoder
5254
alias Msgpack.EncodeError
5355
alias Msgpack.DecodeError
5456

@@ -314,4 +316,63 @@ defmodule Msgpack do
314316
raise DecodeError, reason: reason
315317
end
316318
end
319+
320+
@doc """
321+
Encodes a stream of Elixir terms into a stream of MessagePack binaries.
322+
323+
Each term in the input enumerable is encoded individually. The output stream
324+
will contain `{:ok, binary}` tuples for successful encodings or `{:error,
325+
reason}` tuples for failures.
326+
327+
This function delegates to `Msgpack.StreamEncoder.encode/2`.
328+
329+
## Options
330+
331+
Accepts the same options as `Msgpack.encode/2`.
332+
333+
## Examples
334+
335+
```elixir
336+
iex> terms = [1, "elixir", :world]
337+
iex> Msgpack.encode_stream(terms, atoms: :string) |> Enum.to_list()
338+
[
339+
{:ok, <<1>>},
340+
{:ok, <<166, 101, 108, 105, 120, 105, 114>>},
341+
{:ok, <<165, 119, 111, 114, 108, 100>>}
342+
]
343+
```
344+
"""
345+
@spec encode_stream(Enumerable.t(), StreamEncoder.opts_t()) :: StreamEncoder.t()
346+
def encode_stream(enumerable, opts \\ []) do
347+
StreamEncoder.encode(enumerable, opts)
348+
end
349+
350+
@doc """
351+
Decodes a stream of MessagePack binaries into a stream of Elixir terms.
352+
353+
This function provides a streaming, lazy interface for decoding, making it
354+
suitable for handling large datasets that do not fit into memory.
355+
356+
It delegates to `Msgpack.StreamDecoder.decode/2`.
357+
358+
For more detailed information on behavior, see the `Msgpack.StreamDecoder`
359+
module documentation.
360+
361+
## Options
362+
363+
Accepts the same options as `Msgpack.decode/2`.
364+
365+
## Examples
366+
367+
```elixir
368+
iex> objects = [1, "elixir", true]
369+
iex> stream = Enum.map(objects, &Msgpack.encode!/1)
370+
iex> Msgpack.decode_stream(stream) |> Enum.to_list()
371+
[1, "elixir", true]
372+
```
373+
"""
374+
@spec decode_stream(Enumerable.t(binary()), StreamDecoder.opts_t()) :: StreamDecoder.t()
375+
def decode_stream(enumerable, opts \\ []) do
376+
StreamDecoder.decode(enumerable, opts)
377+
end
317378
end

lib/msgpack/decoder.ex

Lines changed: 11 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,14 @@ defmodule Msgpack.Decoder do
33
Handles the logic of decoding a MessagePack binary into an Elixir term.
44
"""
55

6-
@default_max_depth 100
7-
@default_max_byte_size 10_000_000 # 10MB
8-
9-
# The number of gregorian seconds from year 0 to the Unix epoch. This is a constant.
10-
@epoch_offset :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}})
6+
alias Msgpack.Decoder.Internal
117

128
@spec decode(binary(), keyword()) :: {:ok, term()} | {:error, term()}
139
def decode(binary, opts \\ []) do
14-
merged_opts =
15-
opts
16-
|> Keyword.put_new(:max_depth, @default_max_depth)
17-
|> Keyword.put_new(:max_byte_size, @default_max_byte_size)
10+
merged_opts = Keyword.merge(default_opts(), opts)
1811

1912
try do
20-
case do_decode(binary, merged_opts) do
13+
case Internal.decode(binary, merged_opts) do
2114
{:ok, {term, <<>>}} ->
2215
{:ok, term}
2316

@@ -33,241 +26,13 @@ defmodule Msgpack.Decoder do
3326
end
3427
end
3528

36-
# ==== Nil ====
37-
defp do_decode(<<0xC0, rest::binary>>, _opts), do: {:ok, {nil, rest}}
38-
39-
# ==== Boolean ====
40-
defp do_decode(<<0xC3, rest::binary>>, _opts), do: {:ok, {true, rest}}
41-
defp do_decode(<<0xC2, rest::binary>>, _opts), do: {:ok, {false, rest}}
42-
43-
# ==== Integers ====
44-
# ==== Positive Fixint ====
45-
defp do_decode(<<int::8, rest::binary>>, _opts) when int < 128 do
46-
{:ok, {int, rest}}
47-
end
48-
49-
# ==== Negative Fixint ====
50-
defp do_decode(<<int::signed-8, rest::binary>>, _opts) when int >= -32 and int < 0 do
51-
{:ok, {int, rest}}
52-
end
53-
54-
# ==== Unsigned Integers ====
55-
defp do_decode(<<0xCC, int::8, rest::binary>>, _opts), do: {:ok, {int, rest}}
56-
defp do_decode(<<0xCD, int::16, rest::binary>>, _opts), do: {:ok, {int, rest}}
57-
defp do_decode(<<0xCE, int::32, rest::binary>>, _opts), do: {:ok, {int, rest}}
58-
defp do_decode(<<0xCF, int::64, rest::binary>>, _opts), do: {:ok, {int, rest}}
59-
60-
# ==== Signed Integers ====
61-
defp do_decode(<<0xD0, int::signed-8, rest::binary>>, _opts), do: {:ok, {int, rest}}
62-
defp do_decode(<<0xD1, int::signed-16, rest::binary>>, _opts), do: {:ok, {int, rest}}
63-
defp do_decode(<<0xD2, int::signed-32, rest::binary>>, _opts), do: {:ok, {int, rest}}
64-
defp do_decode(<<0xD3, int::signed-64, rest::binary>>, _opts), do: {:ok, {int, rest}}
65-
66-
# ==== Floats ====
67-
defp do_decode(<<0xCA, float::float-32, rest::binary>>, _opts), do: {:ok, {float, rest}}
68-
defp do_decode(<<0xCB, float::float-64, rest::binary>>, _opts), do: {:ok, {float, rest}}
69-
70-
# ==== Strings ====
71-
defp do_decode(<<prefix, rest::binary>>, opts) when prefix >= 0xA0 and prefix <= 0xBF do
72-
size = prefix - 0xA0
73-
decode_string(rest, size, opts)
74-
end
75-
76-
defp do_decode(<<0xD9, size::8, rest::binary>>, opts), do: decode_string(rest, size, opts)
77-
defp do_decode(<<0xDA, size::16, rest::binary>>, opts), do: decode_string(rest, size, opts)
78-
defp do_decode(<<0xDB, size::32, rest::binary>>, opts), do: decode_string(rest, size, opts)
79-
80-
# ==== Raw Binary ====
81-
defp do_decode(<<0xC4, size::8, rest::binary>>, opts), do: decode_binary(rest, size, opts)
82-
defp do_decode(<<0xC5, size::16, rest::binary>>, opts), do: decode_binary(rest, size, opts)
83-
defp do_decode(<<0xC6, size::32, rest::binary>>, opts), do: decode_binary(rest, size, opts)
84-
85-
# ==== Arrays ====
86-
defp do_decode(<<prefix, rest::binary>>, opts) when prefix >= 0x90 and prefix <= 0x9F do
87-
size = prefix - 0x90
88-
decode_array(rest, size, opts)
89-
end
90-
91-
defp do_decode(<<0xDC, size::16, rest::binary>>, opts), do: decode_array(rest, size, opts)
92-
defp do_decode(<<0xDD, size::32, rest::binary>>, opts), do: decode_array(rest, size, opts)
93-
94-
# ==== Maps ====
95-
defp do_decode(<<prefix, rest::binary>>, opts) when prefix >= 0x80 and prefix <= 0x8F do
96-
size = prefix - 0x80
97-
decode_map(rest, size, opts)
98-
end
99-
100-
defp do_decode(<<0xDE, size::16, rest::binary>>, opts), do: decode_map(rest, size, opts)
101-
defp do_decode(<<0xDF, size::32, rest::binary>>, opts), do: decode_map(rest, size, opts)
102-
103-
# ==== Extensions & Timestamps ====
104-
# ==== Fixext ====
105-
defp do_decode(<<0xD4, type::signed-8, data::binary-size(1), rest::binary>>, opts),
106-
do: decode_ext(type, data, rest, opts)
107-
108-
defp do_decode(<<0xD5, type::signed-8, data::binary-size(2), rest::binary>>, opts),
109-
do: decode_ext(type, data, rest, opts)
110-
111-
defp do_decode(<<0xD6, type::signed-8, data::binary-size(4), rest::binary>>, opts),
112-
do: decode_ext(type, data, rest, opts)
113-
114-
defp do_decode(<<0xD7, type::signed-8, data::binary-size(8), rest::binary>>, opts),
115-
do: decode_ext(type, data, rest, opts)
116-
117-
defp do_decode(<<0xD8, type::signed-8, data::binary-size(16), rest::binary>>, opts),
118-
do: decode_ext(type, data, rest, opts)
119-
120-
# ==== Ext ====
121-
defp do_decode(<<0xC7, len::8, type::signed-8, data::binary-size(len), rest::binary>>, opts),
122-
do: decode_ext(type, data, rest, opts)
123-
124-
defp do_decode(<<0xC8, len::16, type::signed-8, data::binary-size(len), rest::binary>>, opts),
125-
do: decode_ext(type, data, rest, opts)
126-
127-
defp do_decode(<<0xC9, len::32, type::signed-8, data::binary-size(len), rest::binary>>, opts),
128-
do: decode_ext(type, data, rest, opts)
129-
130-
# ==== Unknown types ====
131-
defp do_decode(<<prefix, _rest::binary>>, _opts) do
132-
{:error, {:unknown_prefix, prefix}}
133-
end
134-
135-
defp do_decode(<<>>, _opts) do
136-
{:error, :unexpected_eof}
137-
end
138-
139-
# ==== Helpers ====
140-
defp decode_string(binary, size, opts) do
141-
if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size)
142-
143-
case binary do
144-
<<string::binary-size(size), rest::binary>> ->
145-
{:ok, {string, rest}}
146-
147-
_ ->
148-
{:error, :unexpected_eof}
149-
end
150-
end
151-
152-
defp decode_binary(binary, size, opts) do
153-
if max_size = opts[:max_byte_size], do: check_byte_size(size, max_size)
154-
155-
case binary do
156-
<<bin::binary-size(size), rest::binary>> ->
157-
{:ok, {bin, rest}}
158-
159-
_ ->
160-
{:error, :unexpected_eof}
161-
end
162-
end
163-
164-
defp decode_array(binary, size, opts) do
165-
depth = opts[:depth] || 0
166-
167-
check_depth(depth, opts[:max_depth])
168-
check_byte_size(size, opts[:max_byte_size])
169-
170-
new_opts = Keyword.put(opts, :depth, depth + 1)
171-
172-
decode_many(binary, size, [], new_opts)
173-
end
174-
175-
defp decode_map(binary, size, opts) do
176-
depth = opts[:depth] || 0
177-
178-
check_depth(depth, opts[:max_depth])
179-
check_byte_size(size * 2, opts[:max_byte_size])
180-
181-
new_opts = Keyword.put(opts, :depth, depth + 1)
182-
183-
with {:ok, {kv_pairs, rest}} <- decode_many(binary, size * 2, [], new_opts) do
184-
map =
185-
Enum.chunk_every(kv_pairs, 2)
186-
|> Enum.map(&List.to_tuple/1)
187-
|> Enum.into(%{})
188-
189-
{:ok, {map, rest}}
190-
end
191-
end
192-
193-
# Recursively decodes `count` terms from the binary
194-
defp decode_many(binary, 0, acc, _opts) do
195-
{:ok, {Enum.reverse(acc), binary}}
196-
end
197-
198-
defp decode_many(binary, count, acc, opts) do
199-
case do_decode(binary, opts) do
200-
{:ok, {term, rest}} ->
201-
decode_many(rest, count - 1, [term | acc], opts)
202-
203-
{:error, reason} ->
204-
{:error, reason}
205-
end
206-
end
207-
208-
defp decode_ext(-1, data, rest, _opts) do
209-
{:ok, {decode_timestamp(data), rest}}
210-
end
211-
212-
defp decode_ext(type, data, rest, _opts) do
213-
{:ok, {%Msgpack.Ext{type: type, data: data}, rest}}
214-
end
215-
216-
# timestamp 32: 4 bytes (32-bit unsigned integer seconds)
217-
defp decode_timestamp(<<unix_seconds::unsigned-32>>) do
218-
gregorian_seconds = unix_seconds + @epoch_offset
219-
erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds)
220-
NaiveDateTime.from_erl!(erlang_datetime)
221-
end
222-
223-
# timestamp 64: 8 bytes (30-bit nanoseconds + 34-bit seconds)
224-
defp decode_timestamp(<<data::unsigned-64>>) do
225-
nanoseconds = :erlang.bsr(data, 34)
226-
227-
if nanoseconds > 999_999_999 do
228-
throw({:error, :invalid_timestamp})
229-
else
230-
unix_seconds = :erlang.band(data, 0x00000003_FFFFFFFF)
231-
gregorian_seconds = unix_seconds + @epoch_offset
232-
erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds)
233-
base_datetime = NaiveDateTime.from_erl!(erlang_datetime)
234-
235-
if nanoseconds > 0 do
236-
microseconds = div(nanoseconds, 1000)
237-
%{base_datetime | microsecond: {microseconds, 6}}
238-
else
239-
base_datetime
240-
end
241-
end
242-
end
243-
244-
# timestamp 96: 12 bytes (32-bit nanoseconds + 64-bit seconds)
245-
defp decode_timestamp(<<nanoseconds::unsigned-32, unix_seconds::signed-64>>) do
246-
if nanoseconds > 999_999_999 do
247-
throw({:error, :invalid_timestamp})
248-
else
249-
gregorian_seconds = unix_seconds + @epoch_offset
250-
erlang_datetime = :calendar.gregorian_seconds_to_datetime(gregorian_seconds)
251-
base_datetime = NaiveDateTime.from_erl!(erlang_datetime)
252-
253-
if nanoseconds > 0 do
254-
microseconds = div(nanoseconds, 1000)
255-
%{base_datetime | microsecond: {microseconds, 6}}
256-
else
257-
base_datetime
258-
end
259-
end
260-
end
261-
262-
defp check_byte_size(size, max_size) when size > max_size do
263-
throw({:error, {:max_byte_size_exceeded, max_size}})
264-
end
265-
266-
defp check_byte_size(_size, _max_size), do: :ok
267-
268-
defp check_depth(depth, max_depth) when depth >= max_depth do
269-
throw({:error, {:max_depth_reached, max_depth}})
29+
@doc """
30+
Returns a keyword list of the default options for the decoder.
31+
"""
32+
def default_opts() do
33+
[
34+
max_depth: 100,
35+
max_byte_size: 10_000_000 # 10MB
36+
]
27037
end
271-
272-
defp check_depth(_depth, _max_depth), do: :ok
27338
end

0 commit comments

Comments
 (0)