summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2017-04-25 16:06:40 -0500
committerGitHub <noreply@github.com>2017-04-25 16:06:40 -0500
commita3ec4fef1a68628ea7fba161399cb46aea7280f6 (patch)
tree4e999f926ca2f5247a1587299cb7ffc5a5f8801b /src
parente5550fbd9ad3ee97cd90b01bd8162e38bd3f9299 (diff)
parentc1c6891aafac548314c8eb610b8e63f1997b107c (diff)
downloadcouchdb-a3ec4fef1a68628ea7fba161399cb46aea7280f6.tar.gz
Merge pull request #476 from apache/COUCHDB-3376-fix-mem3-shards
Couchdb 3376 fix mem3 shards
Diffstat (limited to 'src')
-rw-r--r--src/mem3/src/mem3_shards.erl340
1 files changed, 325 insertions, 15 deletions
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index c7f33c61f..3c2001b1b 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -27,7 +27,9 @@
-record(st, {
max_size = 25000,
cur_size = 0,
- changes_pid
+ changes_pid,
+ update_seq,
+ write_timeout
}).
-include_lib("mem3/include/mem3.hrl").
@@ -36,6 +38,7 @@
-define(DBS, mem3_dbs).
-define(SHARDS, mem3_shards).
-define(ATIMES, mem3_atimes).
+-define(OPENERS, mem3_openers).
-define(RELISTEN_DELAY, 5000).
start_link() ->
@@ -171,6 +174,13 @@ handle_config_change("mem3", "shard_cache_size", SizeList, _, _) ->
{ok, gen_server:call(?MODULE, {set_max_size, Size}, infinity)};
handle_config_change("mem3", "shards_db", _DbName, _, _) ->
{ok, gen_server:call(?MODULE, shard_db_changed, infinity)};
+handle_config_change("mem3", "shard_write_timeout", Timeout, _, _) ->
+    % `Timeout` is already bound by the clause head to the raw config value,
+    % so the parsed integer must be bound to a *different* variable.
+    % Re-using `Timeout` on the left-hand side would turn the assignment
+    % into a match of an integer against the raw value, which can never
+    % succeed and would crash this handler with a badmatch on every change.
+    %
+    % Any value that is not a valid integer string (including a non-list
+    % value) falls back to the 1000 ms default used in init/1.
+    WriteTimeout = try
+        list_to_integer(Timeout)
+    catch _:_ ->
+        1000
+    end,
+    {ok, gen_server:call(?MODULE, {set_write_timeout, WriteTimeout})};
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
@@ -182,20 +192,24 @@ handle_config_terminate(_Server, _Reason, _State) ->
init([]) ->
ets:new(?SHARDS, [
bag,
- protected,
+ public,
named_table,
{keypos,#shard.dbname},
{read_concurrency, true}
]),
ets:new(?DBS, [set, protected, named_table]),
ets:new(?ATIMES, [ordered_set, protected, named_table]),
+ ets:new(?OPENERS, [bag, public, named_table]),
ok = config:listen_for_changes(?MODULE, nil),
SizeList = config:get("mem3", "shard_cache_size", "25000"),
- {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end),
+ WriteTimeout = config:get_integer("mem3", "shard_write_timeout", 1000),
+ UpdateSeq = get_update_seq(),
{ok, #st{
max_size = list_to_integer(SizeList),
cur_size = 0,
- changes_pid = Pid
+ changes_pid = start_changes_listener(UpdateSeq),
+ update_seq = UpdateSeq,
+ write_timeout = WriteTimeout
}}.
handle_call({set_max_size, Size}, _From, St) ->
@@ -203,6 +217,8 @@ handle_call({set_max_size, Size}, _From, St) ->
handle_call(shard_db_changed, _From, St) ->
exit(St#st.changes_pid, shard_db_changed),
{reply, ok, St};
+handle_call({set_write_timeout, Timeout}, _From, St) ->
+    % Store the new writer flush timeout in the server state; it is read
+    % again the next time a cache_insert cast is handled.
+    NewSt = St#st{write_timeout = Timeout},
+    {reply, ok, NewSt};
handle_call(_Call, _From, St) ->
{noreply, St}.
@@ -210,12 +226,30 @@ handle_cast({cache_hit, DbName}, St) ->
couch_stats:increment_counter([mem3, shard_cache, hit]),
cache_hit(DbName),
{noreply, St};
-handle_cast({cache_insert, DbName, Shards}, St) ->
- couch_stats:increment_counter([mem3, shard_cache, miss]),
- {noreply, cache_free(cache_insert(St, DbName, Shards))};
+handle_cast({cache_insert, DbName, Writer, UpdateSeq}, St) ->
+    % An insert is stale only when it was read at a sequence strictly
+    % older than the one this server has already seen. The comparison is
+    % deliberately `<` rather than `=<`: if the shards db never changes,
+    % every insert carries the same sequence as the server state, and
+    % `=<` would reject all of them, leaving the cache permanently empty.
+    CurrentSeq = St#st.update_seq,
+    IsStale = UpdateSeq < CurrentSeq,
+    NewSt = if
+        IsStale ->
+            % Tell the pending writer to give up without touching the cache.
+            Writer ! cancel,
+            St;
+        true ->
+            Inserted = cache_insert(St, DbName, Writer, St#st.write_timeout),
+            cache_free(Inserted)
+    end,
+    {noreply, NewSt};
handle_cast({cache_remove, DbName}, St) ->
couch_stats:increment_counter([mem3, shard_cache, eviction]),
{noreply, cache_remove(St, DbName)};
+handle_cast({cache_insert_change, DbName, Writer, UpdateSeq}, St) ->
+    % Same as a plain cache_insert, but afterwards the server also records
+    % the sequence carried by the message as its current update_seq.
+    InsertMsg = {cache_insert, DbName, Writer, UpdateSeq},
+    {noreply, AfterInsert} = handle_cast(InsertMsg, St),
+    Advanced = AfterInsert#st{update_seq = UpdateSeq},
+    {noreply, Advanced};
+handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
+    % Same as a plain cache_remove, followed by recording the sequence
+    % carried by the message as the current update_seq.
+    RemoveMsg = {cache_remove, DbName},
+    {noreply, AfterRemove} = handle_cast(RemoveMsg, St),
+    Advanced = AfterRemove#st{update_seq = UpdateSeq},
+    {noreply, Advanced};
handle_cast(_Msg, St) ->
{noreply, St}.
@@ -232,8 +266,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
erlang:send_after(5000, self(), {start_listener, Seq}),
{noreply, NewSt#st{changes_pid=undefined}};
handle_info({start_listener, Seq}, St) ->
- {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
- {noreply, St#st{changes_pid=NewPid}};
+ {noreply, St#st{
+ changes_pid = start_changes_listener(Seq)
+ }};
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State};
@@ -249,6 +284,21 @@ code_change(_OldVsn, #st{}=St, _Extra) ->
%% internal functions
+start_changes_listener(SinceSeq) ->
+    % Spawns the changes listener as a monitored process and returns its pid.
+    %
+    % The listener first spawns a linked watchdog. The watchdog monitors the
+    % process that called this function and, once that process goes down,
+    % exits with reason `shutdown`; because the two are linked, that exit
+    % signal is delivered to the listener as well.
+    Owner = self(),
+    Watchdog = fun() ->
+        OwnerRef = erlang:monitor(process, Owner),
+        receive
+            {'DOWN', OwnerRef, _, _, _} ->
+                ok
+        end,
+        exit(shutdown)
+    end,
+    Listener = fun() ->
+        erlang:spawn_link(Watchdog),
+        listen_for_changes(SinceSeq)
+    end,
+    {ListenerPid, _Mon} = erlang:spawn_monitor(Listener),
+    ListenerPid.
+
fold_fun(#full_doc_info{}=FDI, _, Acc) ->
DI = couch_doc:to_doc_info(FDI),
fold_fun(DI, nil, Acc);
@@ -287,10 +337,11 @@ changes_callback({stop, EndSeq}, _) ->
exit({seq, EndSeq});
changes_callback({change, {Change}, _}, _) ->
DbName = couch_util:get_value(<<"id">>, Change),
+ Seq = couch_util:get_value(<<"seq">>, Change),
case DbName of <<"_design/", _/binary>> -> ok; _Else ->
case mem3_util:is_deleted(Change) of
true ->
- gen_server:cast(?MODULE, {cache_remove, DbName});
+ gen_server:cast(?MODULE, {cache_remove_change, DbName, Seq});
false ->
case couch_util:get_value(doc, Change) of
{error, Reason} ->
@@ -298,17 +349,23 @@ changes_callback({change, {Change}, _}, _) ->
[DbName, Reason]);
{Doc} ->
Shards = mem3_util:build_ordered_shards(DbName, Doc),
- gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+ IdleTimeout = config:get_integer(
+ "mem3", "writer_idle_timeout", 30000),
+ Writer = spawn_shard_writer(DbName, Shards, IdleTimeout),
+ ets:insert(?OPENERS, {DbName, Writer}),
+ Msg = {cache_insert_change, DbName, Writer, Seq},
+ gen_server:cast(?MODULE, Msg),
[create_if_missing(mem3:name(S)) || S
<- Shards, mem3:node(S) =:= node()]
end
end
end,
- {ok, couch_util:get_value(<<"seq">>, Change)};
+ {ok, Seq};
changes_callback(timeout, _) ->
ok.
load_shards_from_disk(DbName) when is_binary(DbName) ->
+ couch_stats:increment_counter([mem3, shard_cache, miss]),
X = ?l2b(config:get("mem3", "shards_db", "_dbs")),
{ok, Db} = mem3_util:ensure_exists(X),
try
@@ -320,8 +377,21 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
load_shards_from_db(#db{} = ShardDb, DbName) ->
case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
{ok, #doc{body = {Props}}} ->
+ Seq = couch_db:get_update_seq(ShardDb),
Shards = mem3_util:build_ordered_shards(DbName, Props),
- gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+ IdleTimeout = config:get_integer("mem3", "writer_idle_timeout", 30000),
+ case maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) of
+ Writer when is_pid(Writer) ->
+ case ets:insert_new(?OPENERS, {DbName, Writer}) of
+ true ->
+ Msg = {cache_insert, DbName, Writer, Seq},
+ gen_server:cast(?MODULE, Msg);
+ false ->
+ Writer ! cancel
+ end;
+ ignore ->
+ ok
+ end,
Shards;
{not_found, _} ->
erlang:error(database_does_not_exist, ?b2l(DbName))
@@ -352,10 +422,10 @@ create_if_missing(Name) ->
end
end.
-cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
+cache_insert(#st{cur_size=Cur}=St, DbName, Writer, Timeout) ->
NewATime = now(),
true = ets:delete(?SHARDS, DbName),
- true = ets:insert(?SHARDS, Shards),
+ flush_write(DbName, Writer, Timeout),
case ets:lookup(?DBS, DbName) of
[{DbName, ATime}] ->
true = ets:delete(?ATIMES, ATime),
@@ -406,6 +476,43 @@ cache_clear(St) ->
true = ets:delete_all_objects(?ATIMES),
St#st{cur_size=0}.
+maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) ->
+    % Returns `ignore` when the openers table already has an entry for
+    % DbName; otherwise spawns a shard writer and returns its pid.
+    AlreadyOpening = ets:member(?OPENERS, DbName),
+    if
+        AlreadyOpening ->
+            ignore;
+        true ->
+            spawn_shard_writer(DbName, Shards, IdleTimeout)
+    end.
+
+spawn_shard_writer(DbName, Shards, IdleTimeout) ->
+    % Starts an unlinked, unmonitored process running shard_writer/3 and
+    % returns its pid.
+    WriterLoop = fun() ->
+        shard_writer(DbName, Shards, IdleTimeout)
+    end,
+    erlang:spawn(WriterLoop).
+
+shard_writer(DbName, Shards, IdleTimeout) ->
+    % Body of a writer process. Waits for a single instruction:
+    %   write  - insert Shards into the shards table
+    %   cancel - do nothing
+    % and gives up after IdleTimeout milliseconds if neither arrives.
+    % In every case (including a crash inside the try body) the writer's own
+    % {DbName, Pid} entry is removed from the openers table on the way out;
+    % entries for other pids under the same DbName are left alone.
+    OwnEntry = {DbName, self()},
+    try
+        receive
+            write ->
+                true = ets:insert(?SHARDS, Shards);
+            cancel ->
+                ok
+        after IdleTimeout ->
+            ok
+        end
+    after
+        true = ets:delete_object(?OPENERS, OwnEntry)
+    end.
+
+flush_write(DbName, Writer, WriteTimeout) ->
+    % Tells Writer to perform its write and blocks until it terminates.
+    %
+    % Returns `ok` when the writer exits normally.
+    % Exits with {mem3_shards_bad_write, Reason} when it terminates with
+    % any other reason, and with {mem3_shards_write_timeout, DbName} when
+    % no 'DOWN' message arrives within WriteTimeout milliseconds.
+    MonRef = erlang:monitor(process, Writer),
+    Writer ! write,
+    receive
+        {'DOWN', MonRef, _, _, Reason} ->
+            case Reason of
+                normal ->
+                    ok;
+                _ ->
+                    erlang:exit({mem3_shards_bad_write, Reason})
+            end
+    after WriteTimeout ->
+        erlang:exit({mem3_shards_write_timeout, DbName})
+    end.
+
filter_shards_by_name(Name, Shards) ->
filter_shards_by_name(Name, [], Shards).
@@ -417,3 +524,206 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
filter_shards_by_name(Name, [S|Matches], Ss);
filter_shards_by_name(Name, Matches, [_|Ss]) ->
filter_shards_by_name(Name, Matches, Ss).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DB, <<"eunit_db_name">>).
+-define(INFINITY, 99999999).
+
+
+mem3_shards_test_() ->
+    % EUnit generator: every test below gets its own setup/teardown cycle.
+    Tests = [
+        t_maybe_spawn_shard_writer_already_exists(),
+        t_maybe_spawn_shard_writer_new(),
+        t_flush_writer_exists_normal(),
+        t_flush_writer_times_out(),
+        t_flush_writer_crashes(),
+        t_writer_deletes_itself_when_done(),
+        t_writer_does_not_delete_other_writers_for_same_shard(),
+        t_spawn_writer_in_load_shards_from_db(),
+        t_cache_insert_takes_new_update(),
+        t_cache_insert_ignores_stale_update_and_kills_worker()
+    ],
+    {foreach, fun setup/0, fun teardown/1, Tests}.
+
+
+setup() ->
+    % Create the four named ETS tables the module works with (all public
+    % here so test processes can write to them) and stub the shards_db
+    % config lookup.
+    Tables = [
+        {?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]},
+        {?OPENERS, [bag, public, named_table]},
+        {?DBS, [set, public, named_table]},
+        {?ATIMES, [ordered_set, public, named_table]}
+    ],
+    lists:foreach(fun({Name, Opts}) -> ets:new(Name, Opts) end, Tables),
+    meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+    ok.
+
+
+teardown(_) ->
+    % Drop all mocks, then delete the tables created by setup/0.
+    meck:unload(),
+    lists:foreach(fun ets:delete/1, [?ATIMES, ?DBS, ?OPENERS]),
+    ets:delete(?SHARDS).
+
+
+t_maybe_spawn_shard_writer_already_exists() ->
+    % With an opener already recorded for ?DB, no new writer is spawned.
+    ?_test(begin
+        true = ets:insert(?OPENERS, {?DB, self()}),
+        Result = maybe_spawn_shard_writer(?DB, mock_shards(), ?INFINITY),
+        ?assertEqual(ignore, Result)
+    end).
+
+
+t_maybe_spawn_shard_writer_new() ->
+    % With no opener recorded, a live writer is spawned; sending it `write`
+    % makes it store the shards and exit normally.
+    ?_test(begin
+        Expected = mock_shards(),
+        Writer = maybe_spawn_shard_writer(?DB, Expected, 1000),
+        MonRef = erlang:monitor(process, Writer),
+        ?assert(is_pid(Writer)),
+        ?assert(is_process_alive(Writer)),
+        Writer ! write,
+        ?assertEqual(normal, wait_writer_result(MonRef)),
+        ?assertEqual(Expected, ets:tab2list(?SHARDS))
+    end).
+
+
+t_flush_writer_exists_normal() ->
+    % flush_write/3 returns ok once the writer has stored the shards and
+    % exited normally.
+    ?_test(begin
+        Expected = mock_shards(),
+        Writer = spawn_link_mock_writer(?DB, Expected, ?INFINITY),
+        FlushResult = flush_write(?DB, Writer, ?INFINITY),
+        ?assertEqual(ok, FlushResult),
+        ?assertEqual(Expected, ets:tab2list(?SHARDS))
+    end).
+
+
+t_flush_writer_times_out() ->
+    % A writer that never reacts to `write` makes flush_write/3 exit with
+    % a write-timeout reason naming the database.
+    ?_test(begin
+        Stuck = spawn(fun() ->
+            receive
+                will_never_receive_this -> ok
+            end
+        end),
+        ?assertExit(
+            {mem3_shards_write_timeout, ?DB},
+            flush_write(?DB, Stuck, 100)
+        ),
+        % The stuck process would otherwise outlive the test.
+        exit(Stuck, kill)
+    end).
+
+
+t_flush_writer_crashes() ->
+    % A writer that dies abnormally on `write` makes flush_write/3 exit
+    % with a bad-write reason wrapping the writer's exit reason.
+    ?_test(begin
+        Crasher = spawn(fun() ->
+            receive
+                write -> exit('kapow!')
+            end
+        end),
+        ?assertExit(
+            {mem3_shards_bad_write, 'kapow!'},
+            flush_write(?DB, Crasher, 1000)
+        )
+    end).
+
+
+t_writer_deletes_itself_when_done() ->
+    % After a successful write the writer removes its own openers entry.
+    ?_test(begin
+        Expected = mock_shards(),
+        Writer = spawn_link_mock_writer(?DB, Expected, ?INFINITY),
+        MonRef = erlang:monitor(process, Writer),
+        ets:insert(?OPENERS, {?DB, Writer}),
+        Writer ! write,
+        ?assertEqual(normal, wait_writer_result(MonRef)),
+        ?assertEqual(Expected, ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+
+t_writer_does_not_delete_other_writers_for_same_shard() ->
+    % A finishing writer removes only its own {Db, Pid} entry; an entry
+    % registered under the same db by another pid must survive.
+    ?_test(begin
+        Expected = mock_shards(),
+        Writer = spawn_link_mock_writer(?DB, Expected, ?INFINITY),
+        MonRef = erlang:monitor(process, Writer),
+        OtherEntry = {?DB, self()},
+        ets:insert(?OPENERS, {?DB, Writer}),
+        ets:insert(?OPENERS, OtherEntry), % should not be deleted
+        Writer ! write,
+        ?assertEqual(normal, wait_writer_result(MonRef)),
+        ?assertEqual(Expected, ets:tab2list(?SHARDS)),
+        ?assertEqual(1, ets:info(?OPENERS, size)),
+        ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS))
+    end).
+
+
+t_spawn_writer_in_load_shards_from_db() ->
+    % load_shards_from_db/2 must spawn a writer, record it in the openers
+    % table and cast a cache_insert (carrying the db's update seq) to the
+    % process registered under the module name.
+    ?_test(begin
+        meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}),
+        meck:expect(couch_db, get_update_seq, 1, 1),
+        meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()),
+        % Pose as the server so the cache_insert cast lands in our mailbox.
+        erlang:register(?MODULE, self()),
+        load_shards_from_db(#db{name = <<"testdb">>}, ?DB),
+        meck:validate(couch_db),
+        meck:validate(mem3_util),
+        Received = receive
+            {'$gen_cast', CastMsg} -> CastMsg
+        after 1000 ->
+            timeout
+        end,
+        ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Received),
+        {cache_insert, _, Writer, _} = Received,
+        % A kill bypasses the writer's cleanup, so its openers entry stays.
+        exit(Writer, kill),
+        ?assertEqual([{?DB, Writer}], ets:tab2list(?OPENERS))
+    end).
+
+
+t_cache_insert_takes_new_update() ->
+    % An insert at a sequence newer than the server's (2 vs 1) is applied:
+    % the shards are written and the cache size grows to 1.
+    ?_test(begin
+        Expected = mock_shards(),
+        Writer = spawn_link_mock_writer(?DB, Expected, ?INFINITY),
+        State0 = mock_state(1),
+        {noreply, State1} = handle_cast({cache_insert, ?DB, Writer, 2}, State0),
+        ?assertMatch(#st{cur_size = 1}, State1),
+        ?assertEqual(Expected, ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+
+t_cache_insert_ignores_stale_update_and_kills_worker() ->
+    % An insert at a sequence older than the server's (1 vs 2) is dropped:
+    % the writer is cancelled and exits normally, nothing is cached.
+    ?_test(begin
+        Unwanted = mock_shards(),
+        Writer = spawn_link_mock_writer(?DB, Unwanted, ?INFINITY),
+        MonRef = erlang:monitor(process, Writer),
+        State0 = mock_state(2),
+        {noreply, State1} = handle_cast({cache_insert, ?DB, Writer, 1}, State0),
+        ?assertEqual(normal, wait_writer_result(MonRef)),
+        ?assertMatch(#st{cur_size = 0}, State1),
+        ?assertEqual([], ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+
+mock_state(UpdateSeq) ->
+    % Minimal server state for driving handle_cast/2 directly in tests:
+    % the calling process stands in for the changes listener.
+    Self = self(),
+    #st{
+        changes_pid = Self,
+        write_timeout = 1000,
+        update_seq = UpdateSeq
+    }.
+
+
+mock_shards() ->
+    % A single-element shard list for ?DB, located on the local node.
+    Shard = #ordered_shard{
+        name = <<"testshardname">>,
+        node = node(),
+        dbname = ?DB,
+        range = [0,1],
+        order = 1
+    },
+    [Shard].
+
+
+wait_writer_result(WRef) ->
+    % Waits up to one second for the 'DOWN' message belonging to monitor
+    % WRef and returns the exit reason, or `timeout` if none arrives.
+    receive
+        {'DOWN', WRef, _Type, _Pid, ExitReason} ->
+            ExitReason
+    after 1000 ->
+        timeout
+    end.
+
+
+spawn_link_mock_writer(Db, Shards, Timeout) ->
+    % Runs the real shard_writer/3 in a process linked to the test.
+    WriterLoop = fun() ->
+        shard_writer(Db, Shards, Timeout)
+    end,
+    erlang:spawn_link(WriterLoop).
+
+-endif.