Skip to content

Commit 0671c12

Browse files
committed
Add a benchmark for concurrent map updates.
Update a series of maps representing teams which are computed from cumulative updates to a series of individual player scores. Add and remove members of the team, while concurrent score changes are occurring.
1 parent f48365f commit 0671c12

2 files changed

Lines changed: 170 additions & 0 deletions

File tree

examples/teams-crdt-map.config

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{mode,{rate,max}}.
2+
{duration,10}.
3+
{concurrent,150}.
4+
{rng_seed,now}.
5+
6+
%% This bucket type must be created and set to be datatype, maps.
7+
{riakc_pb_bucket,{<<"maps">>,<<"testbucket">>}}.
8+
9+
{key_generator, {uniform_int, 100}}.
10+
{value_generator, {uniform_int, 1000}}.
11+
12+
{operations,[{{game,completed},10},
13+
{{team,player,addition},3},
14+
{{team,player,removal},3},
15+
{{team,read},100},
16+
{{team,write},1}]}.
17+
18+
{riakc_pb_ips,[{"riak101.aws",10017},
19+
{"riak102.aws",10017},
20+
{"riak103.aws",10017},
21+
{"riak104.aws",10017},
22+
{"riak105.aws",10017}]}.
23+
24+
{riakc_pb_replies,default}.
25+
26+
{driver,basho_bench_driver_riakc_pb}.

src/basho_bench_driver_riakc_pb.erl

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,150 @@ warn_bucket_mr_correctness(_) ->
146146
%% preload is specified, so no warning necessary
147147
ok.
148148

149+
%% Write information about the team.
150+
run({team, write}, KeyGen, _ValueGen, State) ->
151+
Key = integer_to_list(KeyGen()),
152+
Result = riakc_pb_socket:modify_type(State#state.pid,
153+
fun(M) ->
154+
riakc_map:update(
155+
{<<"name">>, register},
156+
fun(R) ->
157+
riakc_register:set(
158+
list_to_binary("Team " ++ Key), R)
159+
end, M)
160+
end,
161+
State#state.bucket, Key, [create]),
162+
case Result of
163+
ok ->
164+
lager:info("Team write succeeded."),
165+
{ok, State};
166+
{ok, _} ->
167+
lager:info("Team write succeeded."),
168+
{ok, State};
169+
{error, Reason} ->
170+
lager:info("Team write failed, error: ~p", [Reason]),
171+
{error, Reason, State}
172+
end;
173+
174+
%% Read information about the team.
175+
run({team, read}, KeyGen, ValueGen, State) ->
176+
Key = integer_to_list(KeyGen()),
177+
Options = [{r,2}, {notfound_ok, true}, {timeout, 5000}],
178+
Result = riakc_pb_socket:fetch_type(State#state.pid,
179+
State#state.bucket,
180+
Key,
181+
Options),
182+
case Result of
183+
{ok, _} ->
184+
lager:info("Team read succeeded."),
185+
{ok, State};
186+
{error, {notfound, _}} ->
187+
lager:info("Team does not exist yet."),
188+
run({team, write}, KeyGen, ValueGen, State);
189+
{error, Reason} ->
190+
lager:info("Team read failed, error: ~p", [Reason]),
191+
{error, Reason, State}
192+
end;
193+
194+
%% Remove a player from the team.
195+
run({team, player, removal}, KeyGen, ValueGen, State) ->
196+
Key = integer_to_list(KeyGen()),
197+
Options = [{r,2}, {notfound_ok, true}, {timeout, 5000}],
198+
Result = riakc_pb_socket:fetch_type(State#state.pid,
199+
State#state.bucket,
200+
Key,
201+
Options),
202+
case Result of
203+
{ok, M0} ->
204+
M = riakc_map:value(M0),
205+
Members = proplists:get_value({<<"members">>, set}, M, []),
206+
case length(Members) > 0 of
207+
true ->
208+
Value = hd(Members),
209+
lager:info("Team read succeeded"),
210+
Result2 = riakc_pb_socket:modify_type(State#state.pid,
211+
fun(M2) ->
212+
riakc_map:update(
213+
{<<"members">>, set},
214+
fun(R) ->
215+
riakc_set:del_element(
216+
Value, R)
217+
end, M2)
218+
end,
219+
State#state.bucket, Key, [create]),
220+
case Result2 of
221+
ok ->
222+
lager:info("Team player removal succeeded."),
223+
{ok, State};
224+
{ok, _} ->
225+
lager:info("Team player removal succeeded."),
226+
{ok, State};
227+
{error, Reason} ->
228+
lager:info("Team player removal failed, error: ~p", [Reason]),
229+
{error, Reason, State}
230+
end;
231+
false ->
232+
lager:info("Team player removal success, no members."),
233+
{ok, State}
234+
end;
235+
{error, {notfound, _}} ->
236+
lager:info("Team does not exist yet."),
237+
run({team, write}, KeyGen, ValueGen, State);
238+
{error, Reason} ->
239+
lager:info("Team read failed, error: ~p", [Reason]),
240+
{error, Reason, State}
241+
end;
242+
243+
%% Add a player to the team.
244+
run({team, player, addition}, KeyGen, ValueGen, State) ->
245+
Key = integer_to_list(KeyGen()),
246+
Value = "Team member " ++ integer_to_list(ValueGen()),
247+
Result = riakc_pb_socket:modify_type(State#state.pid,
248+
fun(M) ->
249+
riakc_map:update(
250+
{<<"members">>, set},
251+
fun(S) ->
252+
riakc_set:add_element(list_to_binary(Value), S)
253+
end, M)
254+
end,
255+
State#state.bucket, Key, [create]),
256+
case Result of
257+
ok ->
258+
lager:info("Team player addition succeeded."),
259+
{ok, State};
260+
{ok, _} ->
261+
lager:info("Team player addition succeeded."),
262+
{ok, State};
263+
{error, Reason} ->
264+
lager:info("Team player addition failed, error: ~p", [Reason]),
265+
{error, Reason, State}
266+
end;
267+
268+
%% Mark a game as completed.
269+
run({game, completed}, KeyGen, ValueGen, State) ->
270+
Key = integer_to_list(KeyGen()),
271+
Value = ValueGen(),
272+
Result = riakc_pb_socket:modify_type(State#state.pid,
273+
fun(M) ->
274+
riakc_map:update(
275+
{<<"score">>, counter},
276+
fun(C) ->
277+
riakc_counter:increment(Value, C)
278+
end, M)
279+
end,
280+
State#state.bucket, Key, [create]),
281+
case Result of
282+
ok ->
283+
lager:info("Score change succeeded."),
284+
{ok, State};
285+
{ok, _} ->
286+
lager:info("Score change succeeded."),
287+
{ok, State};
288+
{error, Reason} ->
289+
lager:info("Score change failed, error: ~p", [Reason]),
290+
{error, Reason, State}
291+
end;
292+
149293
run(get, KeyGen, _ValueGen, State) ->
150294
Key = KeyGen(),
151295
case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,

0 commit comments

Comments
 (0)