diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-04-25 16:06:40 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-25 16:06:40 -0500 |
commit | a3ec4fef1a68628ea7fba161399cb46aea7280f6 (patch) | |
tree | 4e999f926ca2f5247a1587299cb7ffc5a5f8801b /src | |
parent | e5550fbd9ad3ee97cd90b01bd8162e38bd3f9299 (diff) | |
parent | c1c6891aafac548314c8eb610b8e63f1997b107c (diff) | |
download | couchdb-a3ec4fef1a68628ea7fba161399cb46aea7280f6.tar.gz |
Merge pull request #476 from apache/COUCHDB-3376-fix-mem3-shards
Couchdb 3376 fix mem3 shards
Diffstat (limited to 'src')
-rw-r--r-- | src/mem3/src/mem3_shards.erl | 340 |
1 files changed, 325 insertions, 15 deletions
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index c7f33c61f..3c2001b1b 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -27,7 +27,9 @@ -record(st, { max_size = 25000, cur_size = 0, - changes_pid + changes_pid, + update_seq, + write_timeout }). -include_lib("mem3/include/mem3.hrl"). @@ -36,6 +38,7 @@ -define(DBS, mem3_dbs). -define(SHARDS, mem3_shards). -define(ATIMES, mem3_atimes). +-define(OPENERS, mem3_openers). -define(RELISTEN_DELAY, 5000). start_link() -> @@ -171,6 +174,13 @@ handle_config_change("mem3", "shard_cache_size", SizeList, _, _) -> {ok, gen_server:call(?MODULE, {set_max_size, Size}, infinity)}; handle_config_change("mem3", "shards_db", _DbName, _, _) -> {ok, gen_server:call(?MODULE, shard_db_changed, infinity)}; +handle_config_change("mem3", "shard_write_timeout", Timeout, _, _) -> + Timeout = try + list_to_integer(Timeout) + catch _:_ -> + 1000 + end, + {ok, gen_server:call(?MODULE, {set_write_timeout, Timeout})}; handle_config_change(_, _, _, _, _) -> {ok, nil}. @@ -182,20 +192,24 @@ handle_config_terminate(_Server, _Reason, _State) -> init([]) -> ets:new(?SHARDS, [ bag, - protected, + public, named_table, {keypos,#shard.dbname}, {read_concurrency, true} ]), ets:new(?DBS, [set, protected, named_table]), ets:new(?ATIMES, [ordered_set, protected, named_table]), + ets:new(?OPENERS, [bag, public, named_table]), ok = config:listen_for_changes(?MODULE, nil), SizeList = config:get("mem3", "shard_cache_size", "25000"), - {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end), + WriteTimeout = config:get_integer("mem3", "shard_write_timeout", 1000), + UpdateSeq = get_update_seq(), {ok, #st{ max_size = list_to_integer(SizeList), cur_size = 0, - changes_pid = Pid + changes_pid = start_changes_listener(UpdateSeq), + update_seq = UpdateSeq, + write_timeout = WriteTimeout }}. handle_call({set_max_size, Size}, _From, St) -> @@ -203,6 +217,8 @@ handle_call({set_max_size, Size}, _From, St) -> handle_call(shard_db_changed, _From, St) -> exit(St#st.changes_pid, shard_db_changed), {reply, ok, St}; +handle_call({set_write_timeout, Timeout}, _From, St) -> + {reply, ok, St#st{write_timeout = Timeout}}; handle_call(_Call, _From, St) -> {noreply, St}. @@ -210,12 +226,30 @@ handle_cast({cache_hit, DbName}, St) -> couch_stats:increment_counter([mem3, shard_cache, hit]), cache_hit(DbName), {noreply, St}; -handle_cast({cache_insert, DbName, Shards}, St) -> - couch_stats:increment_counter([mem3, shard_cache, miss]), - {noreply, cache_free(cache_insert(St, DbName, Shards))}; +handle_cast({cache_insert, DbName, Writer, UpdateSeq}, St) -> + % This comparison correctly uses the `<` operator + % and not `=<`. The easiest way to understand why is + % to think of when a _dbs db doesn't change. If it used + % `=<` it would be impossible to insert anything into + % the cache. + NewSt = case UpdateSeq < St#st.update_seq of + true -> + Writer ! cancel, + St; + false -> + cache_free(cache_insert(St, DbName, Writer, St#st.write_timeout)) + end, + {noreply, NewSt}; handle_cast({cache_remove, DbName}, St) -> couch_stats:increment_counter([mem3, shard_cache, eviction]), {noreply, cache_remove(St, DbName)}; +handle_cast({cache_insert_change, DbName, Writer, UpdateSeq}, St) -> + Msg = {cache_insert, DbName, Writer, UpdateSeq}, + {noreply, NewSt} = handle_cast(Msg, St), + {noreply, NewSt#st{update_seq = UpdateSeq}}; +handle_cast({cache_remove_change, DbName, UpdateSeq}, St) -> + {noreply, NewSt} = handle_cast({cache_remove, DbName}, St), + {noreply, NewSt#st{update_seq = UpdateSeq}}; handle_cast(_Msg, St) -> {noreply, St}. @@ -232,8 +266,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) -> erlang:send_after(5000, self(), {start_listener, Seq}), {noreply, NewSt#st{changes_pid=undefined}}; handle_info({start_listener, Seq}, St) -> - {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), - {noreply, St#st{changes_pid=NewPid}}; + {noreply, St#st{ + changes_pid = start_changes_listener(Seq) + }}; handle_info(restart_config_listener, State) -> ok = config:listen_for_changes(?MODULE, nil), {noreply, State}; @@ -249,6 +284,21 @@ code_change(_OldVsn, #st{}=St, _Extra) -> %% internal functions +start_changes_listener(SinceSeq) -> + Self = self(), + {Pid, _} = erlang:spawn_monitor(fun() -> + erlang:spawn_link(fun() -> + Ref = erlang:monitor(process, Self), + receive + {'DOWN', Ref, _, _, _} -> + ok + end, + exit(shutdown) + end), + listen_for_changes(SinceSeq) + end), + Pid. + fold_fun(#full_doc_info{}=FDI, _, Acc) -> DI = couch_doc:to_doc_info(FDI), fold_fun(DI, nil, Acc); @@ -287,10 +337,11 @@ changes_callback({stop, EndSeq}, _) -> exit({seq, EndSeq}); changes_callback({change, {Change}, _}, _) -> DbName = couch_util:get_value(<<"id">>, Change), + Seq = couch_util:get_value(<<"seq">>, Change), case DbName of <<"_design/", _/binary>> -> ok; _Else -> case mem3_util:is_deleted(Change) of true -> - gen_server:cast(?MODULE, {cache_remove, DbName}); + gen_server:cast(?MODULE, {cache_remove_change, DbName, Seq}); false -> case couch_util:get_value(doc, Change) of {error, Reason} -> @@ -298,17 +349,23 @@ changes_callback({change, {Change}, _}, _) -> [DbName, Reason]); {Doc} -> Shards = mem3_util:build_ordered_shards(DbName, Doc), - gen_server:cast(?MODULE, {cache_insert, DbName, Shards}), + IdleTimeout = config:get_integer( + "mem3", "writer_idle_timeout", 30000), + Writer = spawn_shard_writer(DbName, Shards, IdleTimeout), + ets:insert(?OPENERS, {DbName, Writer}), + Msg = {cache_insert_change, DbName, Writer, Seq}, + gen_server:cast(?MODULE, Msg), [create_if_missing(mem3:name(S)) || S <- Shards, mem3:node(S) =:= node()] end end end, - {ok, couch_util:get_value(<<"seq">>, Change)}; + {ok, Seq}; changes_callback(timeout, _) -> ok. load_shards_from_disk(DbName) when is_binary(DbName) -> + couch_stats:increment_counter([mem3, shard_cache, miss]), X = ?l2b(config:get("mem3", "shards_db", "_dbs")), {ok, Db} = mem3_util:ensure_exists(X), try @@ -320,8 +377,21 @@ load_shards_from_disk(DbName) when is_binary(DbName) -> load_shards_from_db(#db{} = ShardDb, DbName) -> case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of {ok, #doc{body = {Props}}} -> + Seq = couch_db:get_update_seq(ShardDb), Shards = mem3_util:build_ordered_shards(DbName, Props), - gen_server:cast(?MODULE, {cache_insert, DbName, Shards}), + IdleTimeout = config:get_integer("mem3", "writer_idle_timeout", 30000), + case maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) of + Writer when is_pid(Writer) -> + case ets:insert_new(?OPENERS, {DbName, Writer}) of + true -> + Msg = {cache_insert, DbName, Writer, Seq}, + gen_server:cast(?MODULE, Msg); + false -> + Writer ! cancel + end; + ignore -> + ok + end, Shards; {not_found, _} -> erlang:error(database_does_not_exist, ?b2l(DbName)) @@ -352,10 +422,10 @@ create_if_missing(Name) -> end end. -cache_insert(#st{cur_size=Cur}=St, DbName, Shards) -> +cache_insert(#st{cur_size=Cur}=St, DbName, Writer, Timeout) -> NewATime = now(), true = ets:delete(?SHARDS, DbName), - true = ets:insert(?SHARDS, Shards), + flush_write(DbName, Writer, Timeout), case ets:lookup(?DBS, DbName) of [{DbName, ATime}] -> true = ets:delete(?ATIMES, ATime), @@ -406,6 +476,43 @@ cache_clear(St) -> true = ets:delete_all_objects(?ATIMES), St#st{cur_size=0}. +maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) -> + case ets:member(?OPENERS, DbName) of + true -> + ignore; + false -> + spawn_shard_writer(DbName, Shards, IdleTimeout) + end. + +spawn_shard_writer(DbName, Shards, IdleTimeout) -> + erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end). + +shard_writer(DbName, Shards, IdleTimeout) -> + try + receive + write -> + true = ets:insert(?SHARDS, Shards); + cancel -> + ok + after IdleTimeout -> + ok + end + after + true = ets:delete_object(?OPENERS, {DbName, self()}) + end. + +flush_write(DbName, Writer, WriteTimeout) -> + Ref = erlang:monitor(process, Writer), + Writer ! write, + receive + {'DOWN', Ref, _, _, normal} -> + ok; + {'DOWN', Ref, _, _, Error} -> + erlang:exit({mem3_shards_bad_write, Error}) + after WriteTimeout -> + erlang:exit({mem3_shards_write_timeout, DbName}) + end. + filter_shards_by_name(Name, Shards) -> filter_shards_by_name(Name, [], Shards). @@ -417,3 +524,206 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) -> filter_shards_by_name(Name, [S|Matches], Ss); filter_shards_by_name(Name, Matches, [_|Ss]) -> filter_shards_by_name(Name, Matches, Ss). + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +-define(DB, <<"eunit_db_name">>). +-define(INFINITY, 99999999). + + +mem3_shards_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + t_maybe_spawn_shard_writer_already_exists(), + t_maybe_spawn_shard_writer_new(), + t_flush_writer_exists_normal(), + t_flush_writer_times_out(), + t_flush_writer_crashes(), + t_writer_deletes_itself_when_done(), + t_writer_does_not_delete_other_writers_for_same_shard(), + t_spawn_writer_in_load_shards_from_db(), + t_cache_insert_takes_new_update(), + t_cache_insert_ignores_stale_update_and_kills_worker() + ] + }. + + +setup() -> + ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]), + ets:new(?OPENERS, [bag, public, named_table]), + ets:new(?DBS, [set, public, named_table]), + ets:new(?ATIMES, [ordered_set, public, named_table]), + meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"), + ok. + + +teardown(_) -> + meck:unload(), + ets:delete(?ATIMES), + ets:delete(?DBS), + ets:delete(?OPENERS), + ets:delete(?SHARDS). + + +t_maybe_spawn_shard_writer_already_exists() -> + ?_test(begin + ets:insert(?OPENERS, {?DB, self()}), + Shards = mock_shards(), + WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY), + ?assertEqual(ignore, WRes) + end). + + +t_maybe_spawn_shard_writer_new() -> + ?_test(begin + Shards = mock_shards(), + WPid = maybe_spawn_shard_writer(?DB, Shards, 1000), + WRef = erlang:monitor(process, WPid), + ?assert(is_pid(WPid)), + ?assert(is_process_alive(WPid)), + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)) + end). + + +t_flush_writer_exists_normal() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)) + end). + + +t_flush_writer_times_out() -> + ?_test(begin + WPid = spawn(fun() -> receive will_never_receive_this -> ok end end), + Error = {mem3_shards_write_timeout, ?DB}, + ?assertExit(Error, flush_write(?DB, WPid, 100)), + exit(WPid, kill) + end). + + +t_flush_writer_crashes() -> + ?_test(begin + WPid = spawn(fun() -> receive write -> exit('kapow!') end end), + Error = {mem3_shards_bad_write, 'kapow!'}, + ?assertExit(Error, flush_write(?DB, WPid, 1000)) + end). + + +t_writer_deletes_itself_when_done() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + ets:insert(?OPENERS, {?DB, WPid}), + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPENERS)) + end). + + +t_writer_does_not_delete_other_writers_for_same_shard() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + ets:insert(?OPENERS, {?DB, WPid}), + ets:insert(?OPENERS, {?DB, self()}), % should not be deleted + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual(1, ets:info(?OPENERS, size)), + ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS)) + end). + + +t_spawn_writer_in_load_shards_from_db() -> + ?_test(begin + meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}), + meck:expect(couch_db, get_update_seq, 1, 1), + meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()), + erlang:register(?MODULE, self()), % register to get cache_insert cast + load_shards_from_db(#db{name = <<"testdb">>}, ?DB), + meck:validate(couch_db), + meck:validate(mem3_util), + Cast = receive + {'$gen_cast', Msg} -> Msg + after 1000 -> + timeout + end, + ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast), + {cache_insert, _, WPid, _} = Cast, + exit(WPid, kill), + ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS)) + end). + + +t_cache_insert_takes_new_update() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + Msg = {cache_insert, ?DB, WPid, 2}, + {noreply, NewState} = handle_cast(Msg, mock_state(1)), + ?assertMatch(#st{cur_size = 1}, NewState), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPENERS)) + end). + + +t_cache_insert_ignores_stale_update_and_kills_worker() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + Msg = {cache_insert, ?DB, WPid, 1}, + {noreply, NewState} = handle_cast(Msg, mock_state(2)), + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertMatch(#st{cur_size = 0}, NewState), + ?assertEqual([], ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPENERS)) + end). + + +mock_state(UpdateSeq) -> + #st{ + update_seq = UpdateSeq, + changes_pid = self(), + write_timeout = 1000 + }. + + +mock_shards() -> + [ + #ordered_shard{ + name = <<"testshardname">>, + node = node(), + dbname = ?DB, + range = [0,1], + order = 1 + } + ]. + + +wait_writer_result(WRef) -> + receive + {'DOWN', WRef, _, _, Result} -> + Result + after 1000 -> + timeout + end. + + +spawn_link_mock_writer(Db, Shards, Timeout) -> + erlang:spawn_link(fun() -> shard_writer(Db, Shards, Timeout) end). + +-endif. |