Skip to content
This repository has been archived by the owner on Sep 19, 2019. It is now read-only.

Commit

Permalink
Add unit tests for mem3_shards
Browse files Browse the repository at this point in the history
COUCHDB-3376
  • Loading branch information
nickva authored and jaydoane committed Apr 22, 2017
1 parent 9c9670b commit 82de842
Showing 1 changed file with 212 additions and 8 deletions.
220 changes: 212 additions & 8 deletions src/mem3_shards.erl
Original file line number Diff line number Diff line change
Expand Up @@ -440,39 +440,243 @@ cache_clear(St) ->
true = ets:delete_all_objects(?ATIMES),
St#st{cur_size=0}.

maybe_spawn_shard_writer(DbName, Shards) ->
maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) ->
case ets:member(?OPENERS, DbName) of
true ->
ignore;
false ->
spawn_shard_writer(DbName, Shards)
spawn_shard_writer(DbName, Shards, IdleTimeout)
end.

spawn_shard_writer(DbName, Shards) ->
erlang:spawn(fun() -> shard_writer(DbName, Shards) end).
spawn_shard_writer(DbName, Shards, IdleTimeout) ->
erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end).

shard_writer(DbName, Shards) ->
shard_writer(DbName, Shards, IdleTimeout) ->
try
receive
write ->
true = ets:insert(?SHARDS, Shards);
cancel ->
ok
after ?WRITE_IDLE_TIMEOUT ->
after IdleTimeout ->
ok
end
after
true = ets:delete_object(?OPENERS, {DbName, self()})
end.

flush_write(DbName, Writer) ->
flush_write(DbName, Writer, WriteTimeout) ->
Ref = erlang:monitor(process, Writer),
Writer ! write,
receive
{'DOWN', Ref, _, _, normal} ->
ok;
{'DOWN', Ref, _, _, Error} ->
erlang:exit({mem3_shards_bad_write, Error})
after ?WRITE_TIMEOUT ->
after WriteTimeout ->
erlang:exit({mem3_shards_write_timeout, DbName})
end.


-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

-define(DB, <<"eunit_db_name">>).
-define(INFINITY, 99999999).


mem3_shards_test_() ->
{
foreach,
fun setup/0,
fun teardown/1,
[
t_maybe_spawn_shard_writer_already_exists(),
t_maybe_spawn_shard_writer_new(),
t_flush_writer_exists_normal(),
t_flush_writer_times_out(),
t_flush_writer_crashes(),
t_writer_deletes_itself_when_done(),
t_writer_does_not_delete_other_writers_for_same_shard(),
t_spawn_writer_in_load_shards_from_db(),
t_cache_insert_takes_new_update(),
t_cache_insert_ignores_stale_update_and_kills_worker()
]
}.


setup() ->
ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]),
ets:new(?OPENERS, [bag, public, named_table]),
ets:new(?DBS, [set, public, named_table]),
ets:new(?ATIMES, [ordered_set, public, named_table]),
meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
ok.


teardown(_) ->
meck:unload(),
ets:delete(?ATIMES),
ets:delete(?DBS),
ets:delete(?OPENERS),
ets:delete(?SHARDS).


t_maybe_spawn_shard_writer_already_exists() ->
?_test(begin
ets:insert(?OPENERS, {?DB, self()}),
Shards = mock_shards(),
WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY),
?assertEqual(ignore, WRes)
end).


t_maybe_spawn_shard_writer_new() ->
?_test(begin
Shards = mock_shards(),
WPid = maybe_spawn_shard_writer(?DB, Shards, 1000),
WRef = erlang:monitor(process, WPid),
?assert(is_pid(WPid)),
?assert(is_process_alive(WPid)),
WPid ! write,
?assertEqual(normal, wait_writer_result(WRef)),
?assertEqual(Shards, ets:tab2list(?SHARDS))
end).


t_flush_writer_exists_normal() ->
?_test(begin
Shards = mock_shards(),
WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)),
?assertEqual(Shards, ets:tab2list(?SHARDS))
end).


t_flush_writer_times_out() ->
?_test(begin
WPid = spawn(fun() -> receive will_never_receive_this -> ok end end),
Error = {mem3_shards_write_timeout, ?DB},
?assertExit(Error, flush_write(?DB, WPid, 100)),
exit(WPid, kill)
end).


t_flush_writer_crashes() ->
?_test(begin
WPid = spawn(fun() -> receive write -> exit('kapow!') end end),
Error = {mem3_shards_bad_write, 'kapow!'},
?assertExit(Error, flush_write(?DB, WPid, 1000))
end).


t_writer_deletes_itself_when_done() ->
?_test(begin
Shards = mock_shards(),
WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
WRef = erlang:monitor(process, WPid),
ets:insert(?OPENERS, {?DB, WPid}),
WPid ! write,
?assertEqual(normal, wait_writer_result(WRef)),
?assertEqual(Shards, ets:tab2list(?SHARDS)),
?assertEqual([], ets:tab2list(?OPENERS))
end).


t_writer_does_not_delete_other_writers_for_same_shard() ->
?_test(begin
Shards = mock_shards(),
WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
WRef = erlang:monitor(process, WPid),
ets:insert(?OPENERS, {?DB, WPid}),
ets:insert(?OPENERS, {?DB, self()}), % should not be deleted
WPid ! write,
?assertEqual(normal, wait_writer_result(WRef)),
?assertEqual(Shards, ets:tab2list(?SHARDS)),
?assertEqual(1, ets:info(?OPENERS, size)),
?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS))
end).


t_spawn_writer_in_load_shards_from_db() ->
?_test(begin
meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}),
meck:expect(couch_db, get_update_seq, 1, 1),
meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()),
erlang:register(?MODULE, self()), % register to get cache_insert cast
load_shards_from_db(#db{name = <<"testdb">>}, ?DB),
meck:validate(couch_db),
meck:validate(mem3_util),
Cast = receive
{'$gen_cast', Msg} -> Msg
after 1000 ->
timeout
end,
?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast),
{cache_insert, _, WPid, _} = Cast,
exit(WPid, kill),
?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS))
end).


t_cache_insert_takes_new_update() ->
?_test(begin
Shards = mock_shards(),
WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
Msg = {cache_insert, ?DB, WPid, 2},
{noreply, NewState} = handle_cast(Msg, mock_state(1)),
?assertMatch(#st{cur_size = 1}, NewState),
?assertEqual(Shards, ets:tab2list(?SHARDS)),
?assertEqual([], ets:tab2list(?OPENERS))
end).


t_cache_insert_ignores_stale_update_and_kills_worker() ->
?_test(begin
Shards = mock_shards(),
WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
WRef = erlang:monitor(process, WPid),
Msg = {cache_insert, ?DB, WPid, 1},
{noreply, NewState} = handle_cast(Msg, mock_state(2)),
?assertEqual(normal, wait_writer_result(WRef)),
?assertMatch(#st{cur_size = 0}, NewState),
?assertEqual([], ets:tab2list(?SHARDS)),
?assertEqual([], ets:tab2list(?OPENERS))
end).


mock_state(UpdateSeq) ->
#st{
update_seq = UpdateSeq,
changes_pid = self(),
write_timeout = 1000
}.


mock_shards() ->
[
#ordered_shard{
name = <<"testshardname">>,
node = node(),
dbname = ?DB,
range = [0,1],
order = 1
}
].


wait_writer_result(WRef) ->
receive
{'DOWN', WRef, _, _, Result} ->
Result
after 1000 ->
timeout
end.


spawn_link_mock_writer(Db, Shards, Timeout) ->
erlang:spawn_link(fun() -> shard_writer(Db, Shards, Timeout) end).


-endif.

0 comments on commit 82de842

Please sign in to comment.