Skip to content

Commit e6fc7dc

Browse files
committed
Update to use references during message passing
1 parent 854c94c commit e6fc7dc

5 files changed

Lines changed: 145 additions & 67 deletions

File tree

lib/data_buffer/flusher.ex

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,20 @@ defmodule DataBuffer.Flusher do
4242
end
4343

4444
@impl GenServer
45-
def handle_info({:"ETS-TRANSFER", table, from, {partition, size}}, {buffer, opts} = state) do
45+
def handle_info(
46+
{:"ETS-TRANSFER", table, from, {partition, flush_ref, size}},
47+
{buffer, opts} = state
48+
) do
4649
Telemetry.span(:flush, %{buffer: buffer, partition: partition, size: size}, fn ->
4750
flush(table, buffer, opts)
4851
{:ok, %{buffer: buffer, partition: partition}}
4952
end)
5053

51-
send(from, :flush_complete)
54+
send(from, {:flush_complete, flush_ref})
5255
{:stop, :normal, state}
5356
catch
5457
_kind, reason ->
55-
send(from, :flush_complete)
58+
send(from, {:flush_complete, flush_ref})
5659
reraise(reason, __STACKTRACE__)
5760
end
5861

lib/data_buffer/partition.ex

Lines changed: 128 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,30 @@ defmodule DataBuffer.Partition do
77

88
require Logger
99

10+
defmodule State do
11+
@moduledoc false
12+
13+
defstruct [
14+
:name,
15+
:buffer,
16+
:max_size,
17+
:max_size_jitter,
18+
:flush_size,
19+
:flush_interval,
20+
:flush_jitter,
21+
:flush_timeout,
22+
:flush_opts,
23+
:flush_complete_ref,
24+
:flush_schedule_ref,
25+
:flush_schedule_timer_ref,
26+
:size,
27+
:flusher,
28+
:flush_timeout_ref,
29+
:flush_timeout_timer_ref,
30+
:table
31+
]
32+
end
33+
1034
@type partition :: atom()
1135
@type table :: :ets.tid()
1236

@@ -78,64 +102,64 @@ defmodule DataBuffer.Partition do
78102
@impl GenServer
79103
def init(opts) do
80104
Process.flag(:trap_exit, true)
81-
82-
state =
83-
opts
84-
|> init_state()
85-
|> do_prepare_flush
86-
105+
state = init_state(opts)
87106
{:ok, state}
88107
end
89108

90109
@impl GenServer
91-
def handle_call({:insert, data}, _from, state) do
110+
def handle_call({:insert, data}, _from, %State{} = state) do
92111
state = do_insert(state, data)
93112
{:reply, :ok, state}
94113
end
95114

96-
def handle_call(:flush, _from, state) do
115+
def handle_call(:flush, _from, %State{} = state) do
97116
state = do_flush(state)
98117
{:reply, :ok, state}
99118
end
100119

101-
def handle_call(:sync_flush, _from, state) do
120+
def handle_call(:sync_flush, _from, %State{} = state) do
102121
{data, state} = do_sync_flush(state)
103122
{:reply, data, state}
104123
end
105124

106-
def handle_call(:dump, _from, state) do
125+
def handle_call(:dump, _from, %State{} = state) do
107126
data = do_dump_table(state)
108127
{:reply, data, state}
109128
end
110129

111-
def handle_call(:size, _from, state) do
130+
def handle_call(:size, _from, %State{} = state) do
112131
{:reply, state.size, state}
113132
end
114133

115-
def handle_call(:info, _from, state) do
134+
def handle_call(:info, _from, %State{} = state) do
116135
info = do_get_info(state)
117136
{:reply, info, state}
118137
end
119138

120139
@impl GenServer
121-
def handle_info(:flush, state) do
140+
def handle_info(:flush, %State{} = state) do
122141
state = do_flush(state)
123142
{:noreply, state}
124143
end
125144

126-
def handle_info(:flush_timeout, state) do
127-
state = do_timeout_flush(state)
145+
def handle_info({:flush_schedule, flush_schedule_ref}, %State{} = state) do
146+
state = do_scheduled_flush(state, flush_schedule_ref)
128147
{:noreply, state}
129148
end
130149

131-
def handle_info(:flush_complete, state) do
132-
state = do_complete_flush(state)
150+
def handle_info({:flush_timeout, flush_schedule_ref}, %State{} = state) do
151+
state = do_timeout_flush(state, flush_schedule_ref)
152+
{:noreply, state}
153+
end
154+
155+
def handle_info({:flush_complete, flush_complete_ref}, %State{} = state) do
156+
state = do_complete_flush(state, flush_complete_ref)
133157
{:noreply, state}
134158
end
135159

136160
@doc false
137161
@impl GenServer
138-
def terminate(_reason, state) do
162+
def terminate(_reason, %State{} = state) do
139163
{_data, state} = do_sync_flush(state)
140164
state
141165
end
@@ -149,7 +173,7 @@ defmodule DataBuffer.Partition do
149173
end
150174

151175
defp init_state(opts) do
152-
%{
176+
state = %State{
153177
name: Keyword.get(opts, :name),
154178
buffer: Keyword.get(opts, :buffer),
155179
max_size: Keyword.get(opts, :max_size),
@@ -161,37 +185,45 @@ defmodule DataBuffer.Partition do
161185
flush_opts: [
162186
meta: Keyword.get(opts, :flush_meta)
163187
],
164-
flush_ref: nil,
165-
size: 0,
166-
flusher: nil,
167-
flusher_timeout_ref: nil,
168-
table: nil
188+
size: 0
169189
}
190+
191+
do_prepare_flush(state)
170192
end
171193

172-
defp init_table(state) do
194+
defp do_prepare_flush(state) do
173195
table = :ets.new(:partition, [:private, :ordered_set])
174196
flush_size = state.max_size + Enum.random(0..state.max_size_jitter)
175-
%{state | table: table, size: 0, flush_size: flush_size}
176-
end
177197

178-
defp schedule_flush(state) do
179-
if is_reference(state.flush_ref), do: Process.cancel_timer(state.flush_ref)
198+
if is_reference(state.flush_schedule_timer_ref),
199+
do: Process.cancel_timer(state.flush_schedule_timer_ref)
200+
180201
time = state.flush_interval + Enum.random(0..state.flush_jitter)
181-
flush_ref = Process.send_after(self(), :flush, time)
182-
%{state | flush_ref: flush_ref}
202+
flush_schedule_ref = make_ref()
203+
204+
flush_schedule_timer_ref =
205+
Process.send_after(self(), {:flush_schedule, flush_schedule_ref}, time)
206+
207+
%{
208+
state
209+
| table: table,
210+
size: 0,
211+
flush_size: flush_size,
212+
flush_schedule_ref: flush_schedule_ref,
213+
flush_schedule_timer_ref: flush_schedule_timer_ref
214+
}
183215
end
184216

185217
defguardp is_full(size, flush_size) when size >= flush_size
186218

187-
defp do_insert(%{flusher: flusher, size: size, flush_size: flush_size} = state, data)
219+
defp do_insert(%State{flusher: flusher, size: size, flush_size: flush_size} = state, data)
188220
when is_pid(flusher) and is_full(size, flush_size) do
189221
state
190222
|> do_await_flush()
191223
|> do_insert(data)
192224
end
193225

194-
defp do_insert(%{size: size, flush_size: flush_size} = state, data)
226+
defp do_insert(%State{size: size, flush_size: flush_size} = state, data)
195227
when is_full(size, flush_size) do
196228
state
197229
|> do_flush()
@@ -204,11 +236,32 @@ defmodule DataBuffer.Partition do
204236
%{state | size: size}
205237
end
206238

239+
defp do_scheduled_flush(state, flush_schedule_ref) do
240+
if state.flush_schedule_ref == flush_schedule_ref do
241+
do_flush(state)
242+
else
243+
state
244+
end
245+
end
246+
207247
defp do_flush(state) do
248+
flush_complete_ref = make_ref()
208249
{:ok, flusher} = FlusherPool.start_flusher(state.buffer, state.flush_opts)
209-
:ets.give_away(state.table, flusher, {state.name, state.size})
210-
flusher_timeout_ref = Process.send_after(self(), :flush_timeout, state.flush_timeout)
211-
state = %{state | flusher: flusher, flusher_timeout_ref: flusher_timeout_ref}
250+
:ets.give_away(state.table, flusher, {state.name, flush_complete_ref, state.size})
251+
flush_timeout_ref = make_ref()
252+
253+
flush_timeout_timer_ref =
254+
Process.send_after(self(), {:flush_timeout, flush_timeout_ref}, state.flush_timeout)
255+
256+
state = %{
257+
state
258+
| table: nil,
259+
flush_complete_ref: flush_complete_ref,
260+
flusher: flusher,
261+
flush_timeout_ref: flush_timeout_ref,
262+
flush_timeout_timer_ref: flush_timeout_timer_ref
263+
}
264+
212265
do_prepare_flush(state)
213266
end
214267

@@ -217,44 +270,56 @@ defmodule DataBuffer.Partition do
217270
{data, do_prepare_flush(state)}
218271
end
219272

220-
defp do_prepare_flush(state) do
221-
state
222-
|> init_table()
223-
|> schedule_flush()
224-
end
225-
226273
defp do_await_flush(state) do
227274
receive do
228-
:flush_complete -> do_complete_flush(state)
229-
:flush_timeout -> do_timeout_flush(state)
275+
{:flush_complete, flush_complete_ref} -> do_complete_flush(state, flush_complete_ref)
276+
{:flush_timeout, flush_timeout_ref} -> do_timeout_flush(state, flush_timeout_ref)
230277
end
231278
end
232279

233-
defp do_timeout_flush(state) do
234-
if is_pid(state.flusher), do: Process.exit(state.flusher, :timeout)
235-
buffer = state.buffer |> to_string() |> String.replace_leading("Elixir.", "")
280+
defp do_timeout_flush(state, flush_timeout_ref) do
281+
if state.flush_timeout_ref == flush_timeout_ref do
282+
if is_pid(state.flusher), do: Process.exit(state.flusher, :timeout)
236283

237-
Logger.error("""
238-
DataBuffer: flush timeout error for #{buffer}. This means your \
239-
handle_flush/2 callback failed to return within its timeout. You can \
240-
address this by:
284+
buffer = state.buffer |> to_string() |> String.replace_leading("Elixir.", "")
241285

242-
1. Increasing your buffer flush_timeout.
243-
2. Lowering your buffer max_size.
244-
3. Improving the performance of your handle_flush/2 callback.
286+
Logger.error("""
287+
DataBuffer: flush timeout error for #{buffer}. This means your \
288+
handle_flush/2 callback failed to return within its timeout. You can \
289+
address this by:
245290
246-
See DataBuffer.start_link/2 for more information.
247-
""")
291+
1. Increasing your buffer flush_timeout.
292+
2. Lowering your buffer max_size.
293+
3. Improving the performance of your handle_flush/2 callback.
248294
249-
do_complete_flush(state)
295+
See DataBuffer.start_link/2 for more information.
296+
""")
297+
298+
do_clear_flush(state)
299+
else
300+
state
301+
end
250302
end
251303

252-
defp do_complete_flush(state) do
253-
if is_reference(state.flusher_timeout_ref) do
254-
Process.cancel_timer(state.flusher_timeout_ref)
304+
defp do_complete_flush(state, flush_complete_ref) do
305+
if state.flush_complete_ref == flush_complete_ref do
306+
do_clear_flush(state)
307+
else
308+
state
255309
end
310+
end
311+
312+
defp do_clear_flush(state) do
313+
if is_reference(state.flush_timeout_timer_ref),
314+
do: Process.cancel_timer(state.flush_timeout_timer_ref)
256315

257-
%{state | flusher: nil, flusher_timeout_ref: nil}
316+
%{
317+
state
318+
| flush_complete_ref: nil,
319+
flusher: nil,
320+
flush_timeout_ref: nil,
321+
flush_timeout_timer_ref: nil
322+
}
258323
end
259324

260325
defp do_dump_table(state) do
@@ -268,7 +333,8 @@ defmodule DataBuffer.Partition do
268333
flush_size: state.flush_size,
269334
flush_interval: state.flush_interval,
270335
flush_jitter: state.flush_jitter,
271-
flush_timeout: state.flush_timeout
336+
flush_timeout: state.flush_timeout,
337+
pid: self()
272338
}
273339
end
274340
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule DataBuffer.MixProject do
22
use Mix.Project
33

4-
@version "0.4.1"
4+
@version "0.5.0"
55

66
def project do
77
[

test/data_buffer_test.exs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ defmodule DataBufferTest do
132132
assert_receive {:data, ["foo"]}, 150
133133
end
134134

135+
test "will not perform a scheduled flush from a different schedule reference" do
136+
start_buffer(partitions: 1, flush_interval: 300)
137+
DataBuffer.insert(TestBuffer, "foo")
138+
[partition] = DataBuffer.info(TestBuffer)
139+
send(partition.name, {:flush_schedule, make_ref()})
140+
refute_receive {:data, ["foo"]}, 150
141+
end
142+
135143
test "handles flush attempts that raise an exception or exit" do
136144
assert capture_log(fn ->
137145
start_buffer(

test/support/helpers.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ defmodule DataBuffer.Helpers do
88
def start_buffer(opts \\ []) do
99
{buffer, opts} = Keyword.pop(opts, :buffer, TestBuffer)
1010
{flush_meta, opts} = Keyword.pop(opts, :flush_meta, %{})
11-
default_opts = [flush_meta: Map.merge(flush_meta, %{pid: self()}), partitions: @partitions]
11+
partitions = Keyword.get(opts, :partitions, @partitions)
12+
default_opts = [flush_meta: Map.merge(flush_meta, %{pid: self()}), partitions: partitions]
1213
opts = Keyword.merge(default_opts, opts)
1314
start_supervised({buffer, opts})
1415
end

0 commit comments

Comments
 (0)