11# DataBuffer
22
3- ** TODO: Add description**
3+ [ ![ Hex.pm] ( https://img.shields.io/hexpm/v/data_buffer.svg )] ( https://hex.pm/packages/data_buffer )
4+ [ ![ Documentation] ( https://img.shields.io/badge/documentation-gray )] ( https://hexdocs.pm/data_buffer )
5+
6+ DataBuffer is a high-performance Elixir library for buffering and batch processing data. It provides automatic flushing based on size or time thresholds, making it ideal for scenarios where you need to aggregate data before processing it in bulk.
7+
8+ ## Features
9+
10+ - 🚀 ** High Performance** - Efficient in-memory buffering with ETS-backed storage
11+ - ⚡ ** Automatic Flushing** - Configurable size and time-based triggers
12+ - 🔄 ** Multiple Partitions** - Distribute load across multiple buffer partitions
13+ - 📊 ** Telemetry Integration** - Built-in observability and monitoring
14+ - 🛡️ ** Fault Tolerant** - Graceful shutdown with automatic flush on termination
15+ - ⏱️ ** Backpressure Handling** - Configurable timeouts and overflow protection
16+ - 🎲 ** Jitter Support** - Prevent thundering herd with configurable jitter
417
518## Installation
619
7- If [ available in Hex] ( https://hex.pm/docs/publish ) , the package can be installed
8- by adding ` data_buffer ` to your list of dependencies in ` mix.exs ` :
20+ Add ` data_buffer ` to your list of dependencies in ` mix.exs ` :
921
1022``` elixir
1123def deps do
@@ -15,7 +27,249 @@ def deps do
1527end
1628```
1729
18- Documentation can be generated with [ ExDoc] ( https://github.com/elixir-lang/ex_doc )
19- and published on [ HexDocs] ( https://hexdocs.pm ) . Once published, the docs can
20- be found at [ https://hexdocs.pm/data_buffer ] ( https://hexdocs.pm/data_buffer ) .
30+ ## Quick Start
31+
32+ ### 1. Define Your Buffer Module
33+
34+ Create a module that implements the ` DataBuffer ` behaviour:
35+
36+ ``` elixir
37+ defmodule MyApp .EventBuffer do
38+ use DataBuffer
39+
40+ def start_link (opts) do
41+ DataBuffer .start_link (__MODULE__ , opts)
42+ end
43+
44+ @impl DataBuffer
45+ def handle_flush (data_stream, opts) do
46+ # Process your buffered data here
47+ events = Enum .to_list (data_stream)
48+
49+ # Example: Bulk insert to database
50+ MyApp .Repo .insert_all (" events" , events)
51+
52+ # Or send to external service
53+ MyApp .Analytics .track_batch (events)
54+
55+ :ok
56+ end
57+ end
58+ ```
59+
60+ ### 2. Add to Your Supervision Tree
61+
62+ ``` elixir
63+ defmodule MyApp .Application do
64+ use Application
65+
66+ def start (_type , _args ) do
67+ children = [
68+ # Start your buffer with configuration
69+ {MyApp .EventBuffer ,
70+ name: MyApp .EventBuffer ,
71+ partitions: 4 ,
72+ max_size: 1000 ,
73+ flush_interval: 5000 }
74+ ]
75+
76+ opts = [strategy: :one_for_one , name: MyApp .Supervisor ]
77+ Supervisor .start_link (children, opts)
78+ end
79+ end
80+ ```
81+
82+ ### 3. Use Your Buffer
83+
84+ ``` elixir
85+ # Insert single items
86+ DataBuffer .insert (MyApp .EventBuffer , %{type: " click" , user_id: 123 })
87+
88+ # Insert batches for better performance
89+ events = [
90+ %{type: " view" , user_id: 123 },
91+ %{type: " click" , user_id: 456 },
92+ %{type: " purchase" , user_id: 789 }
93+ ]
94+ DataBuffer .insert_batch (MyApp .EventBuffer , events)
95+
96+ # Manually trigger flush if needed
97+ DataBuffer .flush (MyApp .EventBuffer )
98+
99+ # Check buffer status
100+ size = DataBuffer .size (MyApp .EventBuffer )
101+ info = DataBuffer .info (MyApp .EventBuffer )
102+ ```
103+
104+ ## Configuration Options
105+
106+ | Option | Type | Default | Description |
107+ | --------| ------| ---------| -------------|
108+ | ` :name ` | atom | required | Process name for the buffer |
109+ | ` :partitions ` | integer | 1 | Number of partition processes |
110+ | ` :max_size ` | integer | 5000 | Maximum items before automatic flush |
111+ | ` :max_size_jitter ` | integer | 0 | Random jitter (0 to n) added to max_size |
112+ | ` :flush_interval ` | integer | 10000 | Time in ms between automatic flushes |
113+ | ` :flush_jitter ` | integer | 2000 | Random jitter (0 to n) added to flush_interval |
114+ | ` :flush_timeout ` | integer | 60000 | Timeout in ms for flush operations |
115+ | ` :flush_meta ` | any | nil | Metadata passed to handle_flush callback |
116+
117+ ## Advanced Usage
118+
119+ ### Custom Flush Metadata
120+
121+ Pass metadata to your flush handler for context:
122+
123+ ``` elixir
124+ {MyApp .EventBuffer ,
125+ name: MyApp .EventBuffer ,
126+ flush_meta: %{destination: " analytics_db" , priority: :high }}
127+
128+ # In your handle_flush callback:
129+ def handle_flush (data_stream, opts) do
130+ meta = Keyword .get (opts, :meta )
131+ size = Keyword .get (opts, :size )
132+
133+ case meta.destination do
134+ " analytics_db" -> process_analytics (data_stream)
135+ " metrics_db" -> process_metrics (data_stream)
136+ end
137+ end
138+ ```
139+
140+ ### Multiple Buffers
141+
142+ You can run multiple buffers for different data types:
143+
144+ ``` elixir
145+ children = [
146+ {MyApp .EventBuffer , name: MyApp .EventBuffer , max_size: 1000 },
147+ {MyApp .MetricsBuffer , name: MyApp .MetricsBuffer , max_size: 5000 },
148+ {MyApp .LogBuffer , name: MyApp .LogBuffer , flush_interval: 1000 }
149+ ]
150+ ```
151+
152+ ### Synchronous Operations
153+
154+ For testing or specific use cases, use synchronous operations:
155+
156+ ``` elixir
157+ # Wait for flush to complete and get results
158+ results = DataBuffer .sync_flush (MyApp .EventBuffer )
159+
160+ # Dump buffer contents without flushing
161+ data = DataBuffer .dump (MyApp .EventBuffer )
162+ ```
163+
164+ ### Monitoring with Telemetry
165+
166+ DataBuffer emits telemetry events that you can hook into:
167+
168+ ``` elixir
169+ :telemetry .attach (
170+ " buffer-metrics" ,
171+ [:data_buffer , :flush , :stop ],
172+ fn _event_name , measurements, metadata, _config ->
173+ Logger .info (" Flushed #{ metadata.size } items from #{ metadata.buffer } " )
174+ end ,
175+ nil
176+ )
177+ ```
178+
179+ Available events:
180+ - ` [:data_buffer, :insert, :start] ` / ` [:data_buffer, :insert, :stop] `
181+ - ` [:data_buffer, :flush, :start] ` / ` [:data_buffer, :flush, :stop] `
182+
183+ ## Implementation Details
184+
185+ ### Architecture
186+
187+ DataBuffer uses a multi-process architecture for reliability and performance:
188+
189+ 1 . ** Supervisor Process** - Manages the buffer's child processes
190+ 2 . ** Partition Processes** - Handle data storage and triggering flushes
191+ 3 . ** Flusher Processes** - Execute flush operations in separate processes
192+
193+ ### Partitioning Strategy
194+
195+ Data is distributed across partitions using round-robin selection. Each partition:
196+ - Maintains its own ETS table for data storage
197+ - Has independent size and time triggers
198+ - Flushes asynchronously without blocking other partitions
199+
200+ ### Flush Triggers
201+
202+ Flushes are triggered when:
203+ 1 . ** Size threshold** is reached (` max_size ` + random jitter)
204+ 2 . ** Time interval** expires (` flush_interval ` + random jitter)
205+ 3 . ** Manual flush** is called via ` DataBuffer.flush/2 `
206+ 4 . ** Process termination** occurs (graceful shutdown)
207+
208+ ### Backpressure Handling
209+
210+ When a partition is full and flushing:
211+ - New inserts wait for the flush to complete
212+ - Configurable timeout prevents indefinite blocking
213+ - Multiple partitions help distribute load
214+
215+ ### Fault Tolerance
216+
217+ - Flushes run in separate processes to isolate failures
218+ - Timeouts prevent stuck flush operations
219+ - Graceful shutdown ensures data is flushed on termination
220+ - Supervisor restarts failed components
221+
222+ ## Use Cases
223+
224+ DataBuffer is ideal for:
225+
226+ - ** Database Write Batching** - Accumulate records for bulk inserts
227+ - ** Event Aggregation** - Collect events before sending to analytics services
228+ - ** Log Processing** - Buffer log entries for batch processing
229+ - ** Metrics Collection** - Aggregate metrics before reporting
230+ - ** API Rate Limiting** - Batch API calls to respect rate limits
231+ - ** Stream Processing** - Buffer streaming data for chunk processing
232+
233+ ## Performance Considerations
234+
235+ - ** Partition Count** : More partitions = better concurrency but more memory overhead
236+ - ** Buffer Size** : Larger buffers = fewer flushes but more memory usage
237+ - ** Flush Timeout** : Balance between reliability and throughput
238+ - ** Jitter** : Prevents synchronized flushes across multiple buffers/nodes
239+
240+ ## Testing
241+
242+ For testing, you can use smaller thresholds and synchronous operations:
243+
244+ ``` elixir
245+ defmodule MyApp .EventBufferTest do
246+ use ExUnit .Case
247+
248+ setup do
249+ {:ok , _pid } = MyApp .EventBuffer .start_link (
250+ name: TestBuffer ,
251+ max_size: 10 ,
252+ flush_interval: 100
253+ )
254+
255+ {:ok , buffer: TestBuffer }
256+ end
257+
258+ test " buffers and flushes data" , %{buffer: buffer} do
259+ DataBuffer .insert (buffer, %{id: 1 })
260+ DataBuffer .insert (buffer, %{id: 2 })
261+
262+ results = DataBuffer .sync_flush (buffer)
263+ assert length (results) == 1
264+ end
265+ end
266+ ```
267+
268+ ## Contributing
269+
270+ Contributions are welcome! Please feel free to submit a Pull Request.
271+
272+ ## License
273+
274+ MIT License - see LICENSE file for details
21275
0 commit comments