diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2014-02-04 17:39:38 -0600 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2014-02-04 17:39:41 -0600 |
commit | de4ff66d4eb56ae6eeba4508a220e55a6fdf92c0 (patch) | |
tree | efbf9fcc7b44a4cee05ea47a6a0316f249d4feb4 /src/couch_index | |
parent | ed98610c5f27e5ea7e7528c081e1e7b54330e221 (diff) | |
download | couchdb-de4ff66d4eb56ae6eeba4508a220e55a6fdf92c0.tar.gz |
Remove src/couch_index
Diffstat (limited to 'src/couch_index')
-rw-r--r-- | src/couch_index/src/couch_index.app.src | 22 | ||||
-rw-r--r-- | src/couch_index/src/couch_index.erl | 364 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_api.erl | 54 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_compactor.erl | 114 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_server.erl | 266 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_updater.erl | 211 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_util.erl | 81 |
7 files changed, 0 insertions, 1112 deletions
diff --git a/src/couch_index/src/couch_index.app.src b/src/couch_index/src/couch_index.app.src deleted file mode 100644 index 594589d5e..000000000 --- a/src/couch_index/src/couch_index.app.src +++ /dev/null @@ -1,22 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -{application, couch_index, [ - {description, "CouchDB Secondary Index Manager"}, - {vsn, git}, - {modules, [ - couch_index, - couch_index_server - ]}, - {registered, [couch_index_server]}, - {applications, [kernel, stdlib]} -]}. diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl deleted file mode 100644 index 3253a32b2..000000000 --- a/src/couch_index/src/couch_index.erl +++ /dev/null @@ -1,364 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index). --behaviour(gen_server). --behaviour(config_listener). - -%% API --export([start_link/1, stop/1, get_state/2, get_info/1]). --export([trigger_update/2]). --export([compact/1, compact/2]). - -%% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - -% config_listener api --export([handle_config_change/5]). - - --include_lib("couch/include/couch_db.hrl"). - - --record(st, { - mod, - idx_state, - updater, - compactor, - waiters=[], - commit_delay, - committed=true, - shutdown=false -}). - - -start_link({Module, IdxState}) -> - proc_lib:start_link(?MODULE, init, [{Module, IdxState}]). - - -stop(Pid) -> - gen_server:cast(Pid, stop). - - -get_state(Pid, RequestSeq) -> - gen_server:call(Pid, {get_state, RequestSeq}, infinity). - - -get_info(Pid) -> - gen_server:call(Pid, get_info). - - -trigger_update(Pid, UpdateSeq) -> - gen_server:cast(Pid, {trigger_update, UpdateSeq}). - - -compact(Pid) -> - compact(Pid, []). - - -compact(Pid, Options) -> - {ok, CPid} = gen_server:call(Pid, compact), - case lists:member(monitor, Options) of - true -> {ok, erlang:monitor(process, CPid)}; - false -> ok - end. - - -init({Mod, IdxState}) -> - ok = config:listen_for_changes(?MODULE, nil), - DbName = Mod:get(db_name, IdxState), - Resp = couch_util:with_db(DbName, fun(Db) -> - case Mod:open(Db, IdxState) of - {ok, IdxSt} -> - couch_db:monitor(Db), - {ok, IdxSt}; - Error -> - Error - end - end), - case Resp of - {ok, NewIdxState} -> - {ok, UPid} = couch_index_updater:start_link(self(), Mod), - {ok, CPid} = couch_index_compactor:start_link(self(), Mod), - Delay = config:get("query_server_config", "commit_freq", "5"), - MsDelay = 1000 * list_to_integer(Delay), - State = #st{ - mod=Mod, - idx_state=NewIdxState, - updater=UPid, - compactor=CPid, - commit_delay=MsDelay - }, - Args = [ - Mod:get(db_name, IdxState), - Mod:get(idx_name, IdxState), - couch_index_util:hexsig(Mod:get(signature, IdxState)) - ], - ?LOG_INFO("Opening index for db: ~s idx: ~s sig: ~p", Args), - proc_lib:init_ack({ok, self()}), - gen_server:enter_loop(?MODULE, [], State); - Other -> - proc_lib:init_ack(Other) - end. - - -terminate(Reason, State) -> - #st{mod=Mod, idx_state=IdxState}=State, - Mod:close(IdxState), - send_all(State#st.waiters, Reason), - couch_util:shutdown_sync(State#st.updater), - couch_util:shutdown_sync(State#st.compactor), - Args = [ - Mod:get(db_name, IdxState), - Mod:get(idx_name, IdxState), - couch_index_util:hexsig(Mod:get(signature, IdxState)), - Reason - ], - ?LOG_INFO("Closing index for db: ~s idx: ~s sig: ~p~nreason: ~p", Args), - ok. - - -handle_call({get_state, ReqSeq}, From, State) -> - #st{ - mod=Mod, - idx_state=IdxState, - waiters=Waiters - } = State, - IdxSeq = Mod:get(update_seq, IdxState), - case ReqSeq =< IdxSeq of - true -> - {reply, {ok, IdxState}, State}; - _ -> % View update required - couch_index_updater:run(State#st.updater, IdxState), - Waiters2 = [{From, ReqSeq} | Waiters], - {noreply, State#st{waiters=Waiters2}, infinity} - end; -handle_call(get_info, _From, State) -> - #st{mod=Mod} = State, - {ok, Info0} = Mod:get(info, State#st.idx_state), - IsUpdating = couch_index_updater:is_running(State#st.updater), - IsCompacting = couch_index_compactor:is_running(State#st.compactor), - Info = Info0 ++ [ - {updater_running, IsUpdating}, - {compact_running, IsCompacting}, - {waiting_commit, State#st.committed == false}, - {waiting_clients, length(State#st.waiters)} - ], - {reply, {ok, Info}, State}; -handle_call(reset, _From, State) -> - #st{ - mod=Mod, - idx_state=IdxState - } = State, - {ok, NewIdxState} = Mod:reset(IdxState), - {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}}; -handle_call(compact, _From, State) -> - Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state), - {reply, Resp, State}; -handle_call(get_compactor_pid, _From, State) -> - {reply, {ok, State#st.compactor}, State}; -handle_call({compacted, NewIdxState}, _From, State) -> - #st{ - mod=Mod, - idx_state=OldIdxState, - updater=Updater, - commit_delay=Delay - } = State, - assert_signature_match(Mod, OldIdxState, NewIdxState), - NewSeq = Mod:get(update_seq, NewIdxState), - OldSeq = Mod:get(update_seq, OldIdxState), - % For indices that require swapping files, we have to make sure we're - % up to date with the current index. Otherwise indexes could roll back - % (perhaps considerably) to previous points in history. - case NewSeq >= OldSeq of - true -> - {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState), - % Restart the indexer if it's running. - case couch_index_updater:is_running(Updater) of - true -> ok = couch_index_updater:restart(Updater, NewIdxState1); - false -> ok - end, - case State#st.committed of - true -> erlang:send_after(Delay, self(), commit); - false -> ok - end, - {reply, ok, State#st{ - idx_state=NewIdxState1, - committed=false - }}; - _ -> - {reply, recompact, State} - end. - - -handle_cast({config_change, NewDelay}, State) -> - MsDelay = 1000 * list_to_integer(NewDelay), - {noreply, State#st{commit_delay=MsDelay}}; -handle_cast({trigger_update, UpdateSeq}, State) -> - #st{ - mod=Mod, - idx_state=IdxState - } = State, - case UpdateSeq =< Mod:get(update_seq, IdxState) of - true -> - {noreply, State}; - false -> - couch_index_updater:run(State#st.updater, IdxState), - {noreply, State} - end; -handle_cast({updated, NewIdxState}, State) -> - {noreply, NewState} = handle_cast({new_state, NewIdxState}, State), - case NewState#st.shutdown andalso (NewState#st.waiters =:= []) of - true -> - {stop, normal, NewState}; - false -> - maybe_restart_updater(NewState), - {noreply, NewState} - end; -handle_cast({new_state, NewIdxState}, State) -> - #st{ - mod=Mod, - idx_state=OldIdxState, - commit_delay=Delay - } = State, - assert_signature_match(Mod, OldIdxState, NewIdxState), - CurrSeq = Mod:get(update_seq, NewIdxState), - Args = [ - Mod:get(db_name, NewIdxState), - Mod:get(idx_name, NewIdxState), - CurrSeq - ], - ?LOG_DEBUG("Updated index for db: ~s idx: ~s seq: ~B", Args), - Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState), - case State#st.committed of - true -> erlang:send_after(Delay, self(), commit); - false -> ok - end, - {noreply, State#st{ - idx_state=NewIdxState, - waiters=Rest, - committed=false - }}; -handle_cast({update_error, Error}, State) -> - send_all(State#st.waiters, Error), - {noreply, State#st{waiters=[]}}; -handle_cast(stop, State) -> - {stop, normal, State}; -handle_cast(delete, State) -> - #st{mod=Mod, idx_state=IdxState} = State, - ok = Mod:delete(IdxState), - {stop, normal, State}; -handle_cast(ddoc_updated, State) -> - #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State, - DbName = Mod:get(db_name, IdxState), - DDocId = Mod:get(idx_name, IdxState), - Shutdown = couch_util:with_db(DbName, fun(Db) -> - case couch_db:open_doc(Db, DDocId, [ejson_body]) of - {not_found, deleted} -> - true; - {ok, DDoc} -> - {ok, NewIdxState} = Mod:init(Db, DDoc), - Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState) - end - end), - case Shutdown of - true -> - case Waiters of - [] -> - {stop, normal, State}; - _ -> - {noreply, State#st{shutdown = true}} - end; - false -> - {noreply, State#st{shutdown = false}} - end; -handle_cast(_Mesg, State) -> - {stop, unhandled_cast, State}. - - -handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) -> - erlang:send_after(5000, self(), restart_config_listener), - {noreply, State}; -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), - {noreply, State}; -handle_info(commit, #st{committed=true}=State) -> - {noreply, State}; -handle_info(commit, State) -> - #st{mod=Mod, idx_state=IdxState, commit_delay=Delay} = State, - DbName = Mod:get(db_name, IdxState), - GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end, - CommittedSeq = couch_util:with_db(DbName, GetCommSeq), - case CommittedSeq >= Mod:get(update_seq, IdxState) of - true -> - % Commit the updates - ok = Mod:commit(IdxState), - {noreply, State#st{committed=true}}; - _ -> - % We can't commit the header because the database seq that's - % fully committed to disk is still behind us. If we committed - % now and the database lost those changes our view could be - % forever out of sync with the database. But a crash before we - % commit these changes, no big deal, we only lose incremental - % changes since last committal. - erlang:send_after(Delay, self(), commit), - {noreply, State} - end; -handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) -> - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - ?LOG_INFO("Index shutdown by monitor notice for db: ~s idx: ~s", Args), - catch send_all(State#st.waiters, shutdown), - {stop, normal, State#st{waiters=[]}}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -handle_config_change("query_server_config", "commit_freq", Val, _, _) -> - {ok, gen_server:cast(?MODULE, {config_update, Val})}; -handle_config_change(_, _, _, _, _) -> - {ok, nil}. - - -maybe_restart_updater(#st{waiters=[]}) -> - ok; -maybe_restart_updater(#st{mod=Mod, idx_state=IdxState}=State) -> - couch_util:with_db(Mod:get(db_name, IdxState), fun(Db) -> - UpdateSeq = couch_db:get_update_seq(Db), - CommittedSeq = couch_db:get_committed_update_seq(Db), - CanUpdate = UpdateSeq > CommittedSeq, - UOpts = Mod:get(update_options, IdxState), - case CanUpdate and lists:member(committed_only, UOpts) of - true -> couch_db:ensure_full_commit(Db); - false -> ok - end - end), - couch_index_updater:run(State#st.updater, IdxState). - - -send_all(Waiters, Reply) -> - [gen_server:reply(From, Reply) || {From, _} <- Waiters]. - - -send_replies(Waiters, UpdateSeq, IdxState) -> - Pred = fun({_, S}) -> S =< UpdateSeq end, - {ToSend, Remaining} = lists:partition(Pred, Waiters), - [gen_server:reply(From, {ok, IdxState}) || {From, _} <- ToSend], - Remaining. - -assert_signature_match(Mod, OldIdxState, NewIdxState) -> - case {Mod:get(signature, OldIdxState), Mod:get(signature, NewIdxState)} of - {Sig, Sig} -> ok; - _ -> erlang:error(signature_mismatch) - end. diff --git a/src/couch_index/src/couch_index_api.erl b/src/couch_index/src/couch_index_api.erl deleted file mode 100644 index 9d3a67ca1..000000000 --- a/src/couch_index/src/couch_index_api.erl +++ /dev/null @@ -1,54 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_api). - -get(Field, State) -> - ok. - -init(Db, Ddoc) -> - ok. - -open(Db, State) -> - ok. - -close(State) -> - ok. - -delete(State) -> - ok. - -reset(State) -> - ok. - - -start_update(State, PurgedState, NumChanges) -> - {ok, State}. - -purge(Db, PurgeSeq, PurgedIdRevs, State) -> - ok. - -process_doc(Doc, Seq, State) -> - ok. - -finish_update(State) -> - {ok, State}. - -commit(State) -> - ok. - - -compact(Parent, State, Opts) -> - ok. - -swap_compacted(OldState, NewState) -> - ok. diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl deleted file mode 100644 index 10c3e149a..000000000 --- a/src/couch_index/src/couch_index_compactor.erl +++ /dev/null @@ -1,114 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_compactor). --behaviour(gen_server). - - -%% API --export([start_link/2, run/2, cancel/1, is_running/1]). - -%% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - - --include_lib("couch/include/couch_db.hrl"). - - --record(st, { - idx, - mod, - pid -}). - - -start_link(Index, Module) -> - gen_server:start_link(?MODULE, {Index, Module}, []). - - -run(Pid, IdxState) -> - gen_server:call(Pid, {compact, IdxState}). - - -cancel(Pid) -> - gen_server:call(Pid, cancel). - - -is_running(Pid) -> - gen_server:call(Pid, is_running). - - -init({Index, Module}) -> - process_flag(trap_exit, true), - {ok, #st{idx=Index, mod=Module}}. - - -terminate(_Reason, State) -> - couch_util:shutdown_sync(State#st.pid), - ok. - - -handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, {ok, Pid}, State}; -handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) -> - Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end), - {reply, {ok, Pid}, State#st{pid=Pid}}; -handle_call(cancel, _From, #st{pid=undefined}=State) -> - {reply, ok, State}; -handle_call(cancel, _From, #st{pid=Pid}=State) -> - unlink(Pid), - exit(Pid, kill), - {reply, ok, State#st{pid=undefined}}; -handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, true, State}; -handle_call(is_running, _From, State) -> - {reply, false, State}. - - -handle_cast(_Mesg, State) -> - {stop, unknown_cast, State}. - - -handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) -> - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', _Pid, normal}, State) -> - {noreply, State}; -handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) -> - {stop, normal, State}; -handle_info(_Mesg, State) -> - {stop, unknown_info, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -compact(Parent, Mod, IdxState) -> - compact(Parent, Mod, IdxState, []). - -compact(Idx, Mod, IdxState, Opts) -> - DbName = Mod:get(db_name, IdxState), - Args = [DbName, Mod:get(idx_name, IdxState)], - ?LOG_INFO("Compaction started for db: ~s idx: ~s", Args), - {ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) -> - Mod:compact(Db, IdxState, Opts) - end), - ok = Mod:commit(NewIdxState), - case gen_server:call(Idx, {compacted, NewIdxState}) of - recompact -> - ?LOG_INFO("Compaction restarting for db: ~s idx: ~s", Args), - compact(Idx, Mod, NewIdxState, [recompact]); - _ -> - ?LOG_INFO("Compaction finished for db: ~s idx: ~s", Args), - ok - end. diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl deleted file mode 100644 index 3d8a797f0..000000000 --- a/src/couch_index/src/couch_index_server.erl +++ /dev/null @@ -1,266 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_server). --behaviour(gen_server). --behaviour(config_listener). - --export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]). --export([update_notify/1]). - --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - -% config_listener api --export([handle_config_change/5]). - --include_lib("couch/include/couch_db.hrl"). - --define(BY_SIG, couchdb_indexes_by_sig). --define(BY_PID, couchdb_indexes_by_pid). --define(BY_DB, couchdb_indexes_by_db). - - --record(st, {root_dir}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - -validate(DbName, DDoc) -> - LoadModFun = fun - ({ModNameList, "true"}) -> - try - [list_to_existing_atom(ModNameList)] - catch error:badarg -> - [] - end; - ({_ModNameList, _Enabled}) -> - [] - end, - ValidateFun = fun - (ModName, ok) -> - try - ModName:validate(DbName, DDoc) - catch Type:Reason -> - {Type, Reason} - end; - (_ModName, Error) -> - Error - end, - EnabledIndexers = lists:flatmap(LoadModFun, config:get("indexers")), - lists:foldl(ValidateFun, ok, EnabledIndexers). - - -get_index(Module, <<"shards/", _/binary>>=DbName, DDoc) -> - {Pid, Ref} = spawn_monitor(fun() -> - exit(fabric:open_doc(mem3:dbname(DbName), DDoc, [])) - end), - receive {'DOWN', Ref, process, Pid, {ok, Doc}} -> - get_index(Module, DbName, Doc, nil); - {'DOWN', Ref, process, Pid, Error} -> - Error - after 61000 -> - erlang:demonitor(Ref, [flush]), - {error, timeout} - end; - -get_index(Module, DbName, DDoc) -> - get_index(Module, DbName, DDoc, nil). - - -get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) -> - couch_util:with_db(DbName, fun(Db) -> - get_index(Module, Db, DDoc, Fun) - end); -get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) -> - case couch_db:open_doc(Db, DDoc, [ejson_body]) of - {ok, Doc} -> get_index(Module, Db, Doc, Fun); - Error -> Error - end; -get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) -> - {ok, InitState} = Module:init(Db, DDoc), - {ok, FunResp} = Fun(InitState), - {ok, Pid} = get_index(Module, InitState), - {ok, Pid, FunResp}; -get_index(Module, Db, DDoc, _Fun) -> - {ok, InitState} = Module:init(Db, DDoc), - get_index(Module, InitState). - - -get_index(Module, IdxState) -> - DbName = Module:get(db_name, IdxState), - Sig = Module:get(signature, IdxState), - case ets:lookup(?BY_SIG, {DbName, Sig}) of - [{_, Pid}] when is_pid(Pid) -> - {ok, Pid}; - _ -> - Args = {Module, IdxState, DbName, Sig}, - gen_server:call(?MODULE, {get_index, Args}, infinity) - end. - - -init([]) -> - process_flag(trap_exit, true), - ok = config:listen_for_changes(?MODULE, nil), - ets:new(?BY_SIG, [protected, set, named_table]), - ets:new(?BY_PID, [private, set, named_table]), - ets:new(?BY_DB, [protected, bag, named_table]), - couch_db_update_notifier:start_link(fun ?MODULE:update_notify/1), - RootDir = couch_index_util:root_dir(), - % Deprecation warning if it wasn't index_dir - case config:get("couchdb", "index_dir") of - undefined -> - Msg = "Deprecation warning: 'view_index_dir' is now 'index_dir'", - ?LOG_ERROR(Msg, []); - _ -> ok - end, - couch_file:init_delete_dir(RootDir), - {ok, #st{root_dir=RootDir}}. - - -terminate(_Reason, _State) -> - Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)], - lists:map(fun couch_util:shutdown_sync/1, Pids), - ok. - - -handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) -> - case ets:lookup(?BY_SIG, {DbName, Sig}) of - [] -> - spawn_link(fun() -> new_index(Args) end), - ets:insert(?BY_SIG, {{DbName, Sig}, [From]}), - {noreply, State}; - [{_, Waiters}] when is_list(Waiters) -> - ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}), - {noreply, State}; - [{_, Pid}] when is_pid(Pid) -> - {reply, {ok, Pid}, State} - end; -handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), - [gen_server:reply(From, {ok, Pid}) || From <- Waiters], - link(Pid), - add_to_ets(DbName, Sig, DDocId, Pid), - {reply, ok, State}; -handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), - [gen_server:reply(From, Error) || From <- Waiters], - ets:delete(?BY_SIG, {DbName, Sig}), - {reply, ok, State}; -handle_call({reset_indexes, DbName}, _From, State) -> - reset_indexes(DbName, State#st.root_dir), - {reply, ok, State}. - - -handle_cast({reset_indexes, DbName}, State) -> - reset_indexes(DbName, State#st.root_dir), - {noreply, State}. - - -handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) -> - erlang:send_after(5000, self(), restart_config_listener), - {noreply, State}; -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, State#st.root_dir), - {noreply, State}; -handle_info({'EXIT', Pid, Reason}, Server) -> - case ets:lookup(?BY_PID, Pid) of - [{Pid, {DbName, Sig}}] -> - [{DbName, {DDocId, Sig}}] = - ets:match_object(?BY_DB, {DbName, {'$1', Sig}}), - rem_from_ets(DbName, Sig, DDocId, Pid); - [] when Reason /= normal -> - exit(Reason); - _Else -> - ok - end, - {noreply, Server}; -handle_info(Msg, State) -> - twig:log(warn, "~p did not expect ~p", [?MODULE, Msg]), - {noreply, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) -> - {ok, RootDir}; -handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) -> - {ok, RootDir}; -handle_config_change("couchdb", "index_dir", _, _, _) -> - exit(whereis(couch_index_server), config_change), - remove_handler; -handle_config_change("couchdb", "view_index_dir", _, _, _) -> - exit(whereis(couch_index_server), config_change), - remove_handler; -handle_config_change(_, _, _, _, RootDir) -> - {ok, RootDir}. - -new_index({Mod, IdxState, DbName, Sig}) -> - DDocId = Mod:get(idx_name, IdxState), - case couch_index:start_link({Mod, IdxState}) of - {ok, Pid} -> - ok = gen_server:call( - ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}), - unlink(Pid); - Error -> - ok = gen_server:call( - ?MODULE, {async_error, {DbName, DDocId, Sig}, Error}) - end. - - -reset_indexes(DbName, Root) -> - % shutdown all the updaters and clear the files, the db got changed - Fun = fun({_, {DDocId, Sig}}) -> - [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}), - MRef = erlang:monitor(process, Pid), - gen_server:cast(Pid, delete), - receive {'DOWN', MRef, _, _, _} -> ok end, - rem_from_ets(DbName, Sig, DDocId, Pid) - end, - lists:foreach(Fun, ets:lookup(?BY_DB, DbName)), - Path = couch_index_util:index_dir("", DbName), - couch_file:nuke_dir(Root, Path). - - -add_to_ets(DbName, Sig, DDocId, Pid) -> - ets:insert(?BY_SIG, {{DbName, Sig}, Pid}), - ets:insert(?BY_PID, {Pid, {DbName, Sig}}), - ets:insert(?BY_DB, {DbName, {DDocId, Sig}}). - - -rem_from_ets(DbName, Sig, DDocId, Pid) -> - ets:delete(?BY_SIG, {DbName, Sig}), - ets:delete(?BY_PID, Pid), - ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}). - - -update_notify({deleted, DbName}) -> - gen_server:cast(?MODULE, {reset_indexes, DbName}); -update_notify({created, DbName}) -> - gen_server:cast(?MODULE, {reset_indexes, DbName}); -update_notify({ddoc_updated, {DbName, DDocId}}) -> - lists:foreach( - fun({_DbName, {_DDocId, Sig}}) -> - case ets:lookup(?BY_SIG, {DbName, Sig}) of - [{_, IndexPid}] -> - (catch gen_server:cast(IndexPid, ddoc_updated)); - [] -> - ok - end - end, - ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})); -update_notify(_) -> - ok. - diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl deleted file mode 100644 index ab68dc5c2..000000000 --- a/src/couch_index/src/couch_index_updater.erl +++ /dev/null @@ -1,211 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_updater). --behaviour(gen_server). - - -%% API --export([start_link/2, run/2, is_running/1, update/2, restart/2]). - -%% for upgrades --export([update/3]). - -%% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - --include_lib("couch/include/couch_db.hrl"). - --record(st, { - idx, - mod, - pid=nil -}). - - -start_link(Index, Module) -> - gen_server:start_link(?MODULE, {Index, Module}, []). - - -run(Pid, IdxState) -> - gen_server:call(Pid, {update, IdxState}). - - -is_running(Pid) -> - gen_server:call(Pid, is_running). - - -update(Mod, State) -> - update(nil, Mod, State). - - -restart(Pid, IdxState) -> - gen_server:call(Pid, {restart, IdxState}). - - -init({Index, Module}) -> - process_flag(trap_exit, true), - {ok, #st{idx=Index, mod=Module}}. - - -terminate(_Reason, State) -> - couch_util:shutdown_sync(State#st.pid), - ok. - - -handle_call({update, _IdxState}, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, ok, State}; -handle_call({update, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) -> - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - ?LOG_INFO("Starting index update for db: ~s idx: ~s", Args), - Pid = spawn_link(?MODULE, update, [Idx, Mod, IdxState]), - {reply, ok, State#st{pid=Pid}}; -handle_call({restart, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) -> - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - ?LOG_INFO("Restarting index update for db: ~s idx: ~s", Args), - case is_pid(State#st.pid) of - true -> couch_util:shutdown_sync(State#st.pid); - _ -> ok - end, - Pid = spawn_link(?MODULE, update, [Idx, State#st.mod, IdxState]), - {reply, ok, State#st{pid=Pid}}; -handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, true, State}; -handle_call(is_running, _From, State) -> - {reply, false, State}. - - -handle_cast(_Mesg, State) -> - {stop, unknown_cast, State}. - - -handle_info({'EXIT', _, {updated, Pid, IdxState}}, #st{pid=Pid}=State) -> - Mod = State#st.mod, - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - ?LOG_INFO("Index update finished for db: ~s idx: ~s", Args), - ok = gen_server:cast(State#st.idx, {updated, IdxState}), - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', _, {reset, Pid}}, #st{idx=Idx, pid=Pid}=State) -> - {ok, NewIdxState} = gen_server:call(State#st.idx, reset), - Pid2 = spawn_link(?MODULE, update, [Idx, State#st.mod, NewIdxState]), - {noreply, State#st{pid=Pid2}}; -handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) -> - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', Pid, {{nocatch, Error}, _Trace}}, State) -> - handle_info({'EXIT', Pid, Error}, State); -handle_info({'EXIT', Pid, Error}, #st{pid=Pid}=State) -> - ok = gen_server:cast(State#st.idx, {update_error, Error}), - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) -> - {stop, normal, State}; -handle_info({'EXIT', _Pid, normal}, State) -> - {noreply, State}; -handle_info(_Mesg, State) -> - {stop, unknown_info, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -update(Idx, Mod, IdxState) -> - DbName = Mod:get(db_name, IdxState), - CurrSeq = Mod:get(update_seq, IdxState), - UpdateOpts = Mod:get(update_options, IdxState), - CommittedOnly = lists:member(committed_only, UpdateOpts), - IncludeDesign = lists:member(include_design, UpdateOpts), - DocOpts = case lists:member(local_seq, UpdateOpts) of - true -> [conflicts, deleted_conflicts, local_seq]; - _ -> [conflicts, deleted_conflicts] - end, - - couch_util:with_db(DbName, fun(Db) -> - DbUpdateSeq = couch_db:get_update_seq(Db), - DbCommittedSeq = couch_db:get_committed_update_seq(Db), - - PurgedIdxState = case purge_index(Db, Mod, IdxState) of - {ok, IdxState0} -> IdxState0; - reset -> exit({reset, self()}) - end, - - NumChanges = couch_db:count_changes_since(Db, CurrSeq), - - GetSeq = fun - (#full_doc_info{update_seq=Seq}) -> Seq; - (#doc_info{high_seq=Seq}) -> Seq - end, - - GetInfo = fun - (#full_doc_info{id=Id, update_seq=Seq, deleted=Del}=FDI) -> - {Id, Seq, Del, couch_doc:to_doc_info(FDI)}; - (#doc_info{id=Id, high_seq=Seq, revs=[RI|_]}=DI) -> - {Id, Seq, RI#rev_info.deleted, DI} - end, - - LoadDoc = fun(DI) -> - {DocId, Seq, Deleted, DocInfo} = GetInfo(DI), - - case {IncludeDesign, DocId} of - {false, <<"_design/", _/binary>>} -> - {nil, Seq}; - _ when Deleted -> - {#doc{id=DocId, deleted=true}, Seq}; - _ -> - {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts), - {Doc, Seq} - end - end, - - Proc = fun(DocInfo, _, {IdxStateAcc, _}) -> - case CommittedOnly and (GetSeq(DocInfo) > DbCommittedSeq) of - true -> - {stop, {IdxStateAcc, false}}; - false -> - {Doc, Seq} = LoadDoc(DocInfo), - {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc), - garbage_collect(), - {ok, {NewSt, true}} - end - end, - - {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges), - Acc0 = {InitIdxState, true}, - {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []), - {ProcIdxSt, SendLast} = Acc, - - % If we didn't bail due to hitting the last committed seq we need - % to send our last update_seq through. - {ok, LastIdxSt} = case SendLast of - true -> - Mod:process_doc(nil, DbUpdateSeq, ProcIdxSt); - _ -> - {ok, ProcIdxSt} - end, - - {ok, FinalIdxState} = Mod:finish_update(LastIdxSt), - exit({updated, self(), FinalIdxState}) - end). - - -purge_index(Db, Mod, IdxState) -> - DbPurgeSeq = couch_db:get_purge_seq(Db), - IdxPurgeSeq = Mod:get(purge_seq, IdxState), - if - DbPurgeSeq == IdxPurgeSeq -> - {ok, IdxState}; - DbPurgeSeq == IdxPurgeSeq + 1 -> - {ok, PurgedIdRevs} = couch_db:get_last_purged(Db), - Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState); - true -> - reset - end. diff --git a/src/couch_index/src/couch_index_util.erl b/src/couch_index/src/couch_index_util.erl deleted file mode 100644 index cf1ff7561..000000000 --- a/src/couch_index/src/couch_index_util.erl +++ /dev/null @@ -1,81 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_util). - --export([root_dir/0, index_dir/2, index_file/3]). --export([load_doc/3, sort_lib/1, hexsig/1]). - --include_lib("couch/include/couch_db.hrl"). - - -root_dir() -> - case config:get("couchdb", "index_dir") of - undefined -> config:get("couchdb", "view_index_dir"); - Value -> Value - end. - - -index_dir(Module, DbName) when is_binary(DbName) -> - DbDir = "." ++ binary_to_list(DbName) ++ "_design", - filename:join([root_dir(), DbDir, Module]); -index_dir(Module, #db{}=Db) -> - index_dir(Module, couch_db:name(Db)). - - -index_file(Module, DbName, FileName) -> - filename:join(index_dir(Module, DbName), FileName). - - -load_doc(Db, #doc_info{}=DI, Opts) -> - Deleted = lists:member(deleted, Opts), - case (catch couch_db:open_doc(Db, DI, Opts)) of - {ok, #doc{deleted=false}=Doc} -> Doc; - {ok, #doc{deleted=true}=Doc} when Deleted -> Doc; - _Else -> null - end; -load_doc(Db, {DocId, Rev}, Opts) -> - case (catch load_doc(Db, DocId, Rev, Opts)) of - #doc{deleted=false} = Doc -> Doc; - _ -> null - end. - - -load_doc(Db, DocId, Rev, Options) -> - case Rev of - nil -> % open most recent rev - case (catch couch_db:open_doc(Db, DocId, Options)) of - {ok, Doc} -> Doc; - _Error -> null - end; - _ -> % open a specific rev (deletions come back as stubs) - case (catch couch_db:open_doc_revs(Db, DocId, [Rev], Options)) of - {ok, [{ok, Doc}]} -> Doc; - {ok, [{{not_found, missing}, Rev}]} -> null; - {ok, [_Else]} -> null - end - end. - - -sort_lib({Lib}) -> - sort_lib(Lib, []). -sort_lib([], LAcc) -> - lists:keysort(1, LAcc); -sort_lib([{LName, {LObj}}|Rest], LAcc) -> - LSorted = sort_lib(LObj, []), % descend into nested object - sort_lib(Rest, [{LName, LSorted}|LAcc]); -sort_lib([{LName, LCode}|Rest], LAcc) -> - sort_lib(Rest, [{LName, LCode}|LAcc]). - - -hexsig(Sig) -> - couch_util:to_hex(binary_to_list(Sig)). |