Skip to content

Commit 76141a8

Browse files
authored
Add support for bulk inserts and include size in flush opts (#4)
1 parent 851522c commit 76141a8

13 files changed

Lines changed: 240 additions & 24 deletions

File tree

.github/workflows/ci.yml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Continuous-integration workflow: runs the project's `mix ci` task on pushes
# to master and on pull requests that target master.
name: CI

on:
  push:
    branches:
      - master

  pull_request:
    branches:
      - master

jobs:
  test:
    name: Run tests
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        # Kept on the same major version as the checkout step in release.yml.
        uses: actions/checkout@v4

      - name: Setup Elixir and Erlang versions
        uses: erlef/setup-beam@v1
        # The id lets the cache step below read this step's version outputs.
        id: setup-elixir
        with:
          version-type: strict
          version-file: .tool-versions

      - name: Restore the cache
        uses: actions/cache@v3
        with:
          path: |
            deps
            _build
            dialyzer
          # Key = runner OS + Elixir version + OTP version + hash of mix.lock.
          key: |
            ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
          # Fallback prefix: the same key without the mix.lock hash.
          restore-keys: |
            ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-

      - name: Run CI
        run: |
          mix ci

.github/workflows/release.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Release workflow: publishes the package with `mix hex.publish` whenever a
# GitHub release is published.
name: Release

on:
  release:
    types:
      - published

jobs:
  release:
    name: Release package
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Elixir and Erlang versions
        uses: erlef/setup-beam@v1
        # The id lets the cache step below read this step's version outputs.
        id: setup-elixir
        with:
          version-type: strict
          version-file: .tool-versions

      - name: Restore the cache
        uses: actions/cache@v3
        with:
          path: |
            deps
            _build
            dialyzer
          # Key = runner OS + Elixir version + OTP version + hash of mix.lock.
          key: |
            ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
          # Fallback prefix: the same key without the mix.lock hash.
          restore-keys: |
            ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-

      - name: Setup project
        run: mix setup

      - name: Publish package
        run: mix hex.publish --organization movableink --replace --yes
        env:
          # The Hex API key is supplied from the repository secrets.
          HEX_API_KEY: ${{ secrets.HEX_API_KEY }}

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@ data_buffer-*.tar
2525
# Ignore Elixir Language Server files
2626
/.elixir_ls
2727

28-
/bench
28+
# Dialyzer generated PLT files
29+
/dialyzer
30+
31+
/bench

.tool-versions

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
erlang 26.2.1
2+
elixir 1.16.0-otp-26

lib/data_buffer.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ defmodule DataBuffer do
3535
end)
3636
end
3737

38+
@doc """
Inserts every element of `data` into `buffer` with a single partition call.

The whole batch is handed to the one partition returned by
`PartitionPool.next/1`. The call runs inside the `:insert` telemetry span and
`timeout` is passed through to `Partition.insert_batch/3`.
"""
@spec insert_batch(buffer :: DataBuffer.t(), data :: Enumerable.t(), timeout()) :: :ok
def insert_batch(buffer, data, timeout \\ 5_000) do
  span_meta = %{buffer: buffer}

  Telemetry.span(:insert, span_meta, fn ->
    target = PartitionPool.next(buffer)
    outcome = Partition.insert_batch(target, data, timeout)

    # The stop metadata also carries the partition that received the batch.
    {outcome, Map.put(span_meta, :partition, target)}
  end)
end
49+
3850
@doc """
3951
Performs a flush operation on the provided `buffer`.
4052
"""

lib/data_buffer/flusher.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ defmodule DataBuffer.Flusher do
2727

2828
@spec flush(DataBuffer.Partition.table(), atom(), keyword()) :: {:ok, any()} | {:error, any()}
2929
def flush(table, buffer, opts \\ []) do
30-
meta = Keyword.get(opts, :meta)
30+
opts = Keyword.take(opts, [:meta, :size])
3131
data = handle_data(table)
32-
buffer.handle_flush(data, meta)
32+
buffer.handle_flush(data, opts)
3333
end
3434

3535
################################
@@ -47,6 +47,7 @@ defmodule DataBuffer.Flusher do
4747
{buffer, opts} = state
4848
) do
4949
Telemetry.span(:flush, %{buffer: buffer, partition: partition, size: size}, fn ->
50+
opts = Keyword.put(opts, :size, size)
5051
flush(table, buffer, opts)
5152
{:ok, %{buffer: buffer, partition: partition}}
5253
end)

lib/data_buffer/partition.ex

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ defmodule DataBuffer.Partition do
8585
GenServer.call(partition, {:insert, data}, timeout)
8686
end
8787

88+
# Synchronously asks `partition` to store every element of `data`.
# `timeout` bounds the underlying `GenServer.call/3`.
@spec insert_batch(partition(), Enumerable.t(), timeout()) :: :ok
def insert_batch(partition, data, timeout \\ 5_000) do
  request = {:insert_batch, data}
  GenServer.call(partition, request, timeout)
end
92+
8893
@spec size(partition(), timeout()) :: integer()
8994
def size(partition, timeout \\ 5_000) do
9095
GenServer.call(partition, :size, timeout)
@@ -112,6 +117,11 @@ defmodule DataBuffer.Partition do
112117
{:reply, :ok, state}
113118
end
114119

120+
# Stores the whole batch via do_insert_batch/2 and replies `:ok` to the caller.
def handle_call({:insert_batch, batch}, _from, %State{} = state) do
  {:reply, :ok, do_insert_batch(state, batch)}
end
124+
115125
def handle_call(:flush, _from, %State{} = state) do
116126
state = do_flush(state)
117127
{:reply, :ok, state}
@@ -236,6 +246,30 @@ defmodule DataBuffer.Partition do
236246
%{state | size: size}
237247
end
238248

249+
# Full (per `is_full/2`) while `flusher` holds a pid: await that flush, then
# retry the insert against the resulting state.
defp do_insert_batch(%State{flusher: flusher, size: size, flush_size: flush_size} = state, data)
     when is_pid(flusher) and is_full(size, flush_size) do
  do_insert_batch(do_await_flush(state), data)
end

# Full with no flusher pid set: flush first, then retry the insert.
defp do_insert_batch(%State{size: size, flush_size: flush_size} = state, data)
     when is_full(size, flush_size) do
  do_insert_batch(do_flush(state), data)
end

# Not full: write the whole batch in one `:ets.insert/2` call. Rows are keyed
# `size + 1`, `size + 2`, ... in enumeration order. Fullness is only checked
# before the write, so one batch may push `size` past `flush_size`.
defp do_insert_batch(%State{size: size, table: table} = state, data) do
  rows =
    data
    |> Enum.with_index(size + 1)
    |> Enum.map(fn {item, key} -> {key, item} end)

  :ets.insert(table, rows)
  %{state | size: size + length(rows)}
end
272+
239273
defp do_scheduled_flush(state, flush_schedule_ref) do
240274
if state.flush_schedule_ref == flush_schedule_ref do
241275
do_flush(state)
@@ -266,7 +300,8 @@ defmodule DataBuffer.Partition do
266300
end
267301

268302
defp do_sync_flush(state) do
269-
data = Flusher.flush(state.table, state.buffer, state.flush_opts)
303+
opts = Keyword.put(state.flush_opts, :size, state.size)
304+
data = Flusher.flush(state.table, state.buffer, opts)
270305
{data, do_prepare_flush(state)}
271306
end
272307

lib/data_buffer/telemetry.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ defmodule DataBuffer.Telemetry do
6161
"""
6262

6363
@doc false
64-
@spec span(atom(), map(), (() -> {any, map})) :: any()
64+
@spec span(atom(), map(), (-> {any, map})) :: any()
6565
def span(name, meta, fun) do
6666
:telemetry.span([:data_buffer, name], meta, fun)
6767
end

mix.exs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ defmodule DataBuffer.MixProject do
77
[
88
app: :data_buffer,
99
version: @version,
10-
elixir: "~> 1.9",
10+
elixir: "~> 1.16",
1111
elixirc_paths: elixirc_paths(Mix.env()),
12+
dialyzer: dialyzer(),
1213
start_permanent: Mix.env() == :prod,
14+
aliases: aliases(),
15+
preferred_cli_env: preferred_cli_env(),
1316
deps: deps(),
1417
description: description(),
1518
package: package(),
@@ -28,6 +31,13 @@ defmodule DataBuffer.MixProject do
2831
defp elixirc_paths(:test), do: ["lib", "test/support"]
2932
defp elixirc_paths(_), do: ["lib"]
3033

34+
# Dialyzer options for the project: the PLT file lives under `dialyzer/`, and
# `:ex_unit` and `:mix` are added to the PLT.
defp dialyzer do
  plt_path = "dialyzer/dialyzer.plt"

  [
    plt_file: {:no_warn, plt_path},
    plt_add_apps: [:ex_unit, :mix]
  ]
end
40+
3141
defp description do
3242
"""
3343
DataBuffer provides an efficient way to buffer persistable data.
@@ -51,13 +61,43 @@ defmodule DataBuffer.MixProject do
5161
]
5262
end
5363

64+
# Project-specific task aliases.
#
#   * `setup` - installs Hex and rebar if missing, then fetches dependencies.
#   * `ci`    - runs `setup`, then the compile, format, credo, test, dialyzer
#               and sobelow checks in that order.
defp aliases do
  setup_steps = [
    "local.hex --if-missing --force",
    "local.rebar --if-missing --force",
    "deps.get"
  ]

  ci_steps = [
    "setup",
    "compile --warnings-as-errors",
    "format --check-formatted",
    "credo --strict",
    "test",
    "dialyzer --format github",
    "sobelow --config"
  ]

  [setup: setup_steps, ci: ci_steps]
end
83+
84+
# Preferred Mix environment per task: `mix ci` runs under `:test`.
# NOTE(review): recent Mix releases appear to favour `cli/0` with
# `:preferred_envs` over the `:preferred_cli_env` project key - confirm
# against the Mix docs before migrating.
defp preferred_cli_env, do: [ci: :test]
90+
5491
# Run "mix help deps" to learn about dependencies.
5592
defp deps do
5693
[
5794
{:keyword_validator, "~> 2.0"},
5895
{:telemetry, "~> 0.4"},
5996
{:benchee, "~> 1.0", only: :dev},
60-
{:ex_doc, "~> 0.22", only: :dev, runtime: false}
97+
{:ex_doc, "~> 0.22", only: :dev, runtime: false},
98+
{:credo, "~> 1.7.0", only: [:dev, :test], runtime: false},
99+
{:dialyxir, "~> 1.4.1", only: [:dev, :test], runtime: false},
100+
{:sobelow, "~> 0.13.0", only: [:dev, :test], runtime: false}
61101
]
62102
end
63103
end

mix.lock

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
%{
22
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
3+
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
4+
"credo": {:hex, :credo, "1.7.5", "643213503b1c766ec0496d828c90c424471ea54da77c8a168c725686377b9545", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "f799e9b5cd1891577d8c773d245668aa74a2fcd15eb277f51a0131690ebfb3fd"},
35
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
6+
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
47
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"},
8+
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
59
"ex_doc": {:hex, :ex_doc, "0.22.2", "03a2a58bdd2ba0d83d004507c4ee113b9c521956938298eba16e55cc4aba4a6c", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "cf60e1b3e2efe317095b6bb79651f83a2c1b3edcb4d319c421d7fcda8b3aff26"},
10+
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
11+
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
612
"keyword_validator": {:hex, :keyword_validator, "2.0.1", "92ab90dc93ea9e049530eb0a79c8f074833942dfb93d25e6a946b71b70086b49", [:mix], [], "hexpm", "09715a32d458c6318d39c0a484e958ce20fff64d646188b5801c334179be9fc2"},
713
"makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"},
814
"makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"},
915
"nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"},
16+
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
1017
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
1118
}

0 commit comments

Comments
 (0)