summaryrefslogtreecommitdiff
path: root/src/couch_index
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2014-02-04 17:39:38 -0600
committerPaul J. Davis <paul.joseph.davis@gmail.com>2014-02-04 17:39:41 -0600
commitde4ff66d4eb56ae6eeba4508a220e55a6fdf92c0 (patch)
treeefbf9fcc7b44a4cee05ea47a6a0316f249d4feb4 /src/couch_index
parented98610c5f27e5ea7e7528c081e1e7b54330e221 (diff)
downloadcouchdb-de4ff66d4eb56ae6eeba4508a220e55a6fdf92c0.tar.gz
Remove src/couch_index
Diffstat (limited to 'src/couch_index')
-rw-r--r--src/couch_index/src/couch_index.app.src22
-rw-r--r--src/couch_index/src/couch_index.erl364
-rw-r--r--src/couch_index/src/couch_index_api.erl54
-rw-r--r--src/couch_index/src/couch_index_compactor.erl114
-rw-r--r--src/couch_index/src/couch_index_server.erl266
-rw-r--r--src/couch_index/src/couch_index_updater.erl211
-rw-r--r--src/couch_index/src/couch_index_util.erl81
7 files changed, 0 insertions, 1112 deletions
diff --git a/src/couch_index/src/couch_index.app.src b/src/couch_index/src/couch_index.app.src
deleted file mode 100644
index 594589d5e..000000000
--- a/src/couch_index/src/couch_index.app.src
+++ /dev/null
@@ -1,22 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-{application, couch_index, [
- {description, "CouchDB Secondary Index Manager"},
- {vsn, git},
- {modules, [
- couch_index,
- couch_index_server
- ]},
- {registered, [couch_index_server]},
- {applications, [kernel, stdlib]}
-]}.
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
deleted file mode 100644
index 3253a32b2..000000000
--- a/src/couch_index/src/couch_index.erl
+++ /dev/null
@@ -1,364 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index).
--behaviour(gen_server).
--behaviour(config_listener).
-
-%% API
--export([start_link/1, stop/1, get_state/2, get_info/1]).
--export([trigger_update/2]).
--export([compact/1, compact/2]).
-
-%% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
-% config_listener api
--export([handle_config_change/5]).
-
-
--include_lib("couch/include/couch_db.hrl").
-
-
--record(st, {
- mod,
- idx_state,
- updater,
- compactor,
- waiters=[],
- commit_delay,
- committed=true,
- shutdown=false
-}).
-
-
-start_link({Module, IdxState}) ->
- proc_lib:start_link(?MODULE, init, [{Module, IdxState}]).
-
-
-stop(Pid) ->
- gen_server:cast(Pid, stop).
-
-
-get_state(Pid, RequestSeq) ->
- gen_server:call(Pid, {get_state, RequestSeq}, infinity).
-
-
-get_info(Pid) ->
- gen_server:call(Pid, get_info).
-
-
-trigger_update(Pid, UpdateSeq) ->
- gen_server:cast(Pid, {trigger_update, UpdateSeq}).
-
-
-compact(Pid) ->
- compact(Pid, []).
-
-
-compact(Pid, Options) ->
- {ok, CPid} = gen_server:call(Pid, compact),
- case lists:member(monitor, Options) of
- true -> {ok, erlang:monitor(process, CPid)};
- false -> ok
- end.
-
-
-init({Mod, IdxState}) ->
- ok = config:listen_for_changes(?MODULE, nil),
- DbName = Mod:get(db_name, IdxState),
- Resp = couch_util:with_db(DbName, fun(Db) ->
- case Mod:open(Db, IdxState) of
- {ok, IdxSt} ->
- couch_db:monitor(Db),
- {ok, IdxSt};
- Error ->
- Error
- end
- end),
- case Resp of
- {ok, NewIdxState} ->
- {ok, UPid} = couch_index_updater:start_link(self(), Mod),
- {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
- Delay = config:get("query_server_config", "commit_freq", "5"),
- MsDelay = 1000 * list_to_integer(Delay),
- State = #st{
- mod=Mod,
- idx_state=NewIdxState,
- updater=UPid,
- compactor=CPid,
- commit_delay=MsDelay
- },
- Args = [
- Mod:get(db_name, IdxState),
- Mod:get(idx_name, IdxState),
- couch_index_util:hexsig(Mod:get(signature, IdxState))
- ],
- ?LOG_INFO("Opening index for db: ~s idx: ~s sig: ~p", Args),
- proc_lib:init_ack({ok, self()}),
- gen_server:enter_loop(?MODULE, [], State);
- Other ->
- proc_lib:init_ack(Other)
- end.
-
-
-terminate(Reason, State) ->
- #st{mod=Mod, idx_state=IdxState}=State,
- Mod:close(IdxState),
- send_all(State#st.waiters, Reason),
- couch_util:shutdown_sync(State#st.updater),
- couch_util:shutdown_sync(State#st.compactor),
- Args = [
- Mod:get(db_name, IdxState),
- Mod:get(idx_name, IdxState),
- couch_index_util:hexsig(Mod:get(signature, IdxState)),
- Reason
- ],
- ?LOG_INFO("Closing index for db: ~s idx: ~s sig: ~p~nreason: ~p", Args),
- ok.
-
-
-handle_call({get_state, ReqSeq}, From, State) ->
- #st{
- mod=Mod,
- idx_state=IdxState,
- waiters=Waiters
- } = State,
- IdxSeq = Mod:get(update_seq, IdxState),
- case ReqSeq =< IdxSeq of
- true ->
- {reply, {ok, IdxState}, State};
- _ -> % View update required
- couch_index_updater:run(State#st.updater, IdxState),
- Waiters2 = [{From, ReqSeq} | Waiters],
- {noreply, State#st{waiters=Waiters2}, infinity}
- end;
-handle_call(get_info, _From, State) ->
- #st{mod=Mod} = State,
- {ok, Info0} = Mod:get(info, State#st.idx_state),
- IsUpdating = couch_index_updater:is_running(State#st.updater),
- IsCompacting = couch_index_compactor:is_running(State#st.compactor),
- Info = Info0 ++ [
- {updater_running, IsUpdating},
- {compact_running, IsCompacting},
- {waiting_commit, State#st.committed == false},
- {waiting_clients, length(State#st.waiters)}
- ],
- {reply, {ok, Info}, State};
-handle_call(reset, _From, State) ->
- #st{
- mod=Mod,
- idx_state=IdxState
- } = State,
- {ok, NewIdxState} = Mod:reset(IdxState),
- {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}};
-handle_call(compact, _From, State) ->
- Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state),
- {reply, Resp, State};
-handle_call(get_compactor_pid, _From, State) ->
- {reply, {ok, State#st.compactor}, State};
-handle_call({compacted, NewIdxState}, _From, State) ->
- #st{
- mod=Mod,
- idx_state=OldIdxState,
- updater=Updater,
- commit_delay=Delay
- } = State,
- assert_signature_match(Mod, OldIdxState, NewIdxState),
- NewSeq = Mod:get(update_seq, NewIdxState),
- OldSeq = Mod:get(update_seq, OldIdxState),
- % For indices that require swapping files, we have to make sure we're
- % up to date with the current index. Otherwise indexes could roll back
- % (perhaps considerably) to previous points in history.
- case NewSeq >= OldSeq of
- true ->
- {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState),
- % Restart the indexer if it's running.
- case couch_index_updater:is_running(Updater) of
- true -> ok = couch_index_updater:restart(Updater, NewIdxState1);
- false -> ok
- end,
- case State#st.committed of
- true -> erlang:send_after(Delay, self(), commit);
- false -> ok
- end,
- {reply, ok, State#st{
- idx_state=NewIdxState1,
- committed=false
- }};
- _ ->
- {reply, recompact, State}
- end.
-
-
-handle_cast({config_change, NewDelay}, State) ->
- MsDelay = 1000 * list_to_integer(NewDelay),
- {noreply, State#st{commit_delay=MsDelay}};
-handle_cast({trigger_update, UpdateSeq}, State) ->
- #st{
- mod=Mod,
- idx_state=IdxState
- } = State,
- case UpdateSeq =< Mod:get(update_seq, IdxState) of
- true ->
- {noreply, State};
- false ->
- couch_index_updater:run(State#st.updater, IdxState),
- {noreply, State}
- end;
-handle_cast({updated, NewIdxState}, State) ->
- {noreply, NewState} = handle_cast({new_state, NewIdxState}, State),
- case NewState#st.shutdown andalso (NewState#st.waiters =:= []) of
- true ->
- {stop, normal, NewState};
- false ->
- maybe_restart_updater(NewState),
- {noreply, NewState}
- end;
-handle_cast({new_state, NewIdxState}, State) ->
- #st{
- mod=Mod,
- idx_state=OldIdxState,
- commit_delay=Delay
- } = State,
- assert_signature_match(Mod, OldIdxState, NewIdxState),
- CurrSeq = Mod:get(update_seq, NewIdxState),
- Args = [
- Mod:get(db_name, NewIdxState),
- Mod:get(idx_name, NewIdxState),
- CurrSeq
- ],
- ?LOG_DEBUG("Updated index for db: ~s idx: ~s seq: ~B", Args),
- Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
- case State#st.committed of
- true -> erlang:send_after(Delay, self(), commit);
- false -> ok
- end,
- {noreply, State#st{
- idx_state=NewIdxState,
- waiters=Rest,
- committed=false
- }};
-handle_cast({update_error, Error}, State) ->
- send_all(State#st.waiters, Error),
- {noreply, State#st{waiters=[]}};
-handle_cast(stop, State) ->
- {stop, normal, State};
-handle_cast(delete, State) ->
- #st{mod=Mod, idx_state=IdxState} = State,
- ok = Mod:delete(IdxState),
- {stop, normal, State};
-handle_cast(ddoc_updated, State) ->
- #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
- DbName = Mod:get(db_name, IdxState),
- DDocId = Mod:get(idx_name, IdxState),
- Shutdown = couch_util:with_db(DbName, fun(Db) ->
- case couch_db:open_doc(Db, DDocId, [ejson_body]) of
- {not_found, deleted} ->
- true;
- {ok, DDoc} ->
- {ok, NewIdxState} = Mod:init(Db, DDoc),
- Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
- end
- end),
- case Shutdown of
- true ->
- case Waiters of
- [] ->
- {stop, normal, State};
- _ ->
- {noreply, State#st{shutdown = true}}
- end;
- false ->
- {noreply, State#st{shutdown = false}}
- end;
-handle_cast(_Mesg, State) ->
- {stop, unhandled_cast, State}.
-
-
-handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
- erlang:send_after(5000, self(), restart_config_listener),
- {noreply, State};
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
- {noreply, State};
-handle_info(commit, #st{committed=true}=State) ->
- {noreply, State};
-handle_info(commit, State) ->
- #st{mod=Mod, idx_state=IdxState, commit_delay=Delay} = State,
- DbName = Mod:get(db_name, IdxState),
- GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
- CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
- case CommittedSeq >= Mod:get(update_seq, IdxState) of
- true ->
- % Commit the updates
- ok = Mod:commit(IdxState),
- {noreply, State#st{committed=true}};
- _ ->
- % We can't commit the header because the database seq that's
- % fully committed to disk is still behind us. If we committed
- % now and the database lost those changes our view could be
- % forever out of sync with the database. But a crash before we
- % commit these changes, no big deal, we only lose incremental
- % changes since last committal.
- erlang:send_after(Delay, self(), commit),
- {noreply, State}
- end;
-handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
- Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
- ?LOG_INFO("Index shutdown by monitor notice for db: ~s idx: ~s", Args),
- catch send_all(State#st.waiters, shutdown),
- {stop, normal, State#st{waiters=[]}}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-handle_config_change("query_server_config", "commit_freq", Val, _, _) ->
- {ok, gen_server:cast(?MODULE, {config_update, Val})};
-handle_config_change(_, _, _, _, _) ->
- {ok, nil}.
-
-
-maybe_restart_updater(#st{waiters=[]}) ->
- ok;
-maybe_restart_updater(#st{mod=Mod, idx_state=IdxState}=State) ->
- couch_util:with_db(Mod:get(db_name, IdxState), fun(Db) ->
- UpdateSeq = couch_db:get_update_seq(Db),
- CommittedSeq = couch_db:get_committed_update_seq(Db),
- CanUpdate = UpdateSeq > CommittedSeq,
- UOpts = Mod:get(update_options, IdxState),
- case CanUpdate and lists:member(committed_only, UOpts) of
- true -> couch_db:ensure_full_commit(Db);
- false -> ok
- end
- end),
- couch_index_updater:run(State#st.updater, IdxState).
-
-
-send_all(Waiters, Reply) ->
- [gen_server:reply(From, Reply) || {From, _} <- Waiters].
-
-
-send_replies(Waiters, UpdateSeq, IdxState) ->
- Pred = fun({_, S}) -> S =< UpdateSeq end,
- {ToSend, Remaining} = lists:partition(Pred, Waiters),
- [gen_server:reply(From, {ok, IdxState}) || {From, _} <- ToSend],
- Remaining.
-
-assert_signature_match(Mod, OldIdxState, NewIdxState) ->
- case {Mod:get(signature, OldIdxState), Mod:get(signature, NewIdxState)} of
- {Sig, Sig} -> ok;
- _ -> erlang:error(signature_mismatch)
- end.
diff --git a/src/couch_index/src/couch_index_api.erl b/src/couch_index/src/couch_index_api.erl
deleted file mode 100644
index 9d3a67ca1..000000000
--- a/src/couch_index/src/couch_index_api.erl
+++ /dev/null
@@ -1,54 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index_api).
-
-get(Field, State) ->
- ok.
-
-init(Db, Ddoc) ->
- ok.
-
-open(Db, State) ->
- ok.
-
-close(State) ->
- ok.
-
-delete(State) ->
- ok.
-
-reset(State) ->
- ok.
-
-
-start_update(State, PurgedState, NumChanges) ->
- {ok, State}.
-
-purge(Db, PurgeSeq, PurgedIdRevs, State) ->
- ok.
-
-process_doc(Doc, Seq, State) ->
- ok.
-
-finish_update(State) ->
- {ok, State}.
-
-commit(State) ->
- ok.
-
-
-compact(Parent, State, Opts) ->
- ok.
-
-swap_compacted(OldState, NewState) ->
- ok.
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl
deleted file mode 100644
index 10c3e149a..000000000
--- a/src/couch_index/src/couch_index_compactor.erl
+++ /dev/null
@@ -1,114 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index_compactor).
--behaviour(gen_server).
-
-
-%% API
--export([start_link/2, run/2, cancel/1, is_running/1]).
-
-%% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
-
--include_lib("couch/include/couch_db.hrl").
-
-
--record(st, {
- idx,
- mod,
- pid
-}).
-
-
-start_link(Index, Module) ->
- gen_server:start_link(?MODULE, {Index, Module}, []).
-
-
-run(Pid, IdxState) ->
- gen_server:call(Pid, {compact, IdxState}).
-
-
-cancel(Pid) ->
- gen_server:call(Pid, cancel).
-
-
-is_running(Pid) ->
- gen_server:call(Pid, is_running).
-
-
-init({Index, Module}) ->
- process_flag(trap_exit, true),
- {ok, #st{idx=Index, mod=Module}}.
-
-
-terminate(_Reason, State) ->
- couch_util:shutdown_sync(State#st.pid),
- ok.
-
-
-handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
- {reply, {ok, Pid}, State};
-handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) ->
- Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end),
- {reply, {ok, Pid}, State#st{pid=Pid}};
-handle_call(cancel, _From, #st{pid=undefined}=State) ->
- {reply, ok, State};
-handle_call(cancel, _From, #st{pid=Pid}=State) ->
- unlink(Pid),
- exit(Pid, kill),
- {reply, ok, State#st{pid=undefined}};
-handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
- {reply, true, State};
-handle_call(is_running, _From, State) ->
- {reply, false, State}.
-
-
-handle_cast(_Mesg, State) ->
- {stop, unknown_cast, State}.
-
-
-handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
- {noreply, State#st{pid=undefined}};
-handle_info({'EXIT', _Pid, normal}, State) ->
- {noreply, State};
-handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
- {stop, normal, State};
-handle_info(_Mesg, State) ->
- {stop, unknown_info, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-compact(Parent, Mod, IdxState) ->
- compact(Parent, Mod, IdxState, []).
-
-compact(Idx, Mod, IdxState, Opts) ->
- DbName = Mod:get(db_name, IdxState),
- Args = [DbName, Mod:get(idx_name, IdxState)],
- ?LOG_INFO("Compaction started for db: ~s idx: ~s", Args),
- {ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) ->
- Mod:compact(Db, IdxState, Opts)
- end),
- ok = Mod:commit(NewIdxState),
- case gen_server:call(Idx, {compacted, NewIdxState}) of
- recompact ->
- ?LOG_INFO("Compaction restarting for db: ~s idx: ~s", Args),
- compact(Idx, Mod, NewIdxState, [recompact]);
- _ ->
- ?LOG_INFO("Compaction finished for db: ~s idx: ~s", Args),
- ok
- end.
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
deleted file mode 100644
index 3d8a797f0..000000000
--- a/src/couch_index/src/couch_index_server.erl
+++ /dev/null
@@ -1,266 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index_server).
--behaviour(gen_server).
--behaviour(config_listener).
-
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
--export([update_notify/1]).
-
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
-% config_listener api
--export([handle_config_change/5]).
-
--include_lib("couch/include/couch_db.hrl").
-
--define(BY_SIG, couchdb_indexes_by_sig).
--define(BY_PID, couchdb_indexes_by_pid).
--define(BY_DB, couchdb_indexes_by_db).
-
-
--record(st, {root_dir}).
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-validate(DbName, DDoc) ->
- LoadModFun = fun
- ({ModNameList, "true"}) ->
- try
- [list_to_existing_atom(ModNameList)]
- catch error:badarg ->
- []
- end;
- ({_ModNameList, _Enabled}) ->
- []
- end,
- ValidateFun = fun
- (ModName, ok) ->
- try
- ModName:validate(DbName, DDoc)
- catch Type:Reason ->
- {Type, Reason}
- end;
- (_ModName, Error) ->
- Error
- end,
- EnabledIndexers = lists:flatmap(LoadModFun, config:get("indexers")),
- lists:foldl(ValidateFun, ok, EnabledIndexers).
-
-
-get_index(Module, <<"shards/", _/binary>>=DbName, DDoc) ->
- {Pid, Ref} = spawn_monitor(fun() ->
- exit(fabric:open_doc(mem3:dbname(DbName), DDoc, []))
- end),
- receive {'DOWN', Ref, process, Pid, {ok, Doc}} ->
- get_index(Module, DbName, Doc, nil);
- {'DOWN', Ref, process, Pid, Error} ->
- Error
- after 61000 ->
- erlang:demonitor(Ref, [flush]),
- {error, timeout}
- end;
-
-get_index(Module, DbName, DDoc) ->
- get_index(Module, DbName, DDoc, nil).
-
-
-get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) ->
- couch_util:with_db(DbName, fun(Db) ->
- get_index(Module, Db, DDoc, Fun)
- end);
-get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
- case couch_db:open_doc(Db, DDoc, [ejson_body]) of
- {ok, Doc} -> get_index(Module, Db, Doc, Fun);
- Error -> Error
- end;
-get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
- {ok, InitState} = Module:init(Db, DDoc),
- {ok, FunResp} = Fun(InitState),
- {ok, Pid} = get_index(Module, InitState),
- {ok, Pid, FunResp};
-get_index(Module, Db, DDoc, _Fun) ->
- {ok, InitState} = Module:init(Db, DDoc),
- get_index(Module, InitState).
-
-
-get_index(Module, IdxState) ->
- DbName = Module:get(db_name, IdxState),
- Sig = Module:get(signature, IdxState),
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [{_, Pid}] when is_pid(Pid) ->
- {ok, Pid};
- _ ->
- Args = {Module, IdxState, DbName, Sig},
- gen_server:call(?MODULE, {get_index, Args}, infinity)
- end.
-
-
-init([]) ->
- process_flag(trap_exit, true),
- ok = config:listen_for_changes(?MODULE, nil),
- ets:new(?BY_SIG, [protected, set, named_table]),
- ets:new(?BY_PID, [private, set, named_table]),
- ets:new(?BY_DB, [protected, bag, named_table]),
- couch_db_update_notifier:start_link(fun ?MODULE:update_notify/1),
- RootDir = couch_index_util:root_dir(),
- % Deprecation warning if it wasn't index_dir
- case config:get("couchdb", "index_dir") of
- undefined ->
- Msg = "Deprecation warning: 'view_index_dir' is now 'index_dir'",
- ?LOG_ERROR(Msg, []);
- _ -> ok
- end,
- couch_file:init_delete_dir(RootDir),
- {ok, #st{root_dir=RootDir}}.
-
-
-terminate(_Reason, _State) ->
- Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
- lists:map(fun couch_util:shutdown_sync/1, Pids),
- ok.
-
-
-handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [] ->
- spawn_link(fun() -> new_index(Args) end),
- ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
- {noreply, State};
- [{_, Waiters}] when is_list(Waiters) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
- {noreply, State};
- [{_, Pid}] when is_pid(Pid) ->
- {reply, {ok, Pid}, State}
- end;
-handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
- [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
- link(Pid),
- add_to_ets(DbName, Sig, DDocId, Pid),
- {reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
- [gen_server:reply(From, Error) || From <- Waiters],
- ets:delete(?BY_SIG, {DbName, Sig}),
- {reply, ok, State};
-handle_call({reset_indexes, DbName}, _From, State) ->
- reset_indexes(DbName, State#st.root_dir),
- {reply, ok, State}.
-
-
-handle_cast({reset_indexes, DbName}, State) ->
- reset_indexes(DbName, State#st.root_dir),
- {noreply, State}.
-
-
-handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
- erlang:send_after(5000, self(), restart_config_listener),
- {noreply, State};
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, State#st.root_dir),
- {noreply, State};
-handle_info({'EXIT', Pid, Reason}, Server) ->
- case ets:lookup(?BY_PID, Pid) of
- [{Pid, {DbName, Sig}}] ->
- [{DbName, {DDocId, Sig}}] =
- ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
- rem_from_ets(DbName, Sig, DDocId, Pid);
- [] when Reason /= normal ->
- exit(Reason);
- _Else ->
- ok
- end,
- {noreply, Server};
-handle_info(Msg, State) ->
- twig:log(warn, "~p did not expect ~p", [?MODULE, Msg]),
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
- {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
- {ok, RootDir};
-handle_config_change("couchdb", "index_dir", _, _, _) ->
- exit(whereis(couch_index_server), config_change),
- remove_handler;
-handle_config_change("couchdb", "view_index_dir", _, _, _) ->
- exit(whereis(couch_index_server), config_change),
- remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
- {ok, RootDir}.
-
-new_index({Mod, IdxState, DbName, Sig}) ->
- DDocId = Mod:get(idx_name, IdxState),
- case couch_index:start_link({Mod, IdxState}) of
- {ok, Pid} ->
- ok = gen_server:call(
- ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}),
- unlink(Pid);
- Error ->
- ok = gen_server:call(
- ?MODULE, {async_error, {DbName, DDocId, Sig}, Error})
- end.
-
-
-reset_indexes(DbName, Root) ->
- % shutdown all the updaters and clear the files, the db got changed
- Fun = fun({_, {DDocId, Sig}}) ->
- [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
- MRef = erlang:monitor(process, Pid),
- gen_server:cast(Pid, delete),
- receive {'DOWN', MRef, _, _, _} -> ok end,
- rem_from_ets(DbName, Sig, DDocId, Pid)
- end,
- lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
- Path = couch_index_util:index_dir("", DbName),
- couch_file:nuke_dir(Root, Path).
-
-
-add_to_ets(DbName, Sig, DDocId, Pid) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
- ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
- ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
-
-
-rem_from_ets(DbName, Sig, DDocId, Pid) ->
- ets:delete(?BY_SIG, {DbName, Sig}),
- ets:delete(?BY_PID, Pid),
- ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
-
-
-update_notify({deleted, DbName}) ->
- gen_server:cast(?MODULE, {reset_indexes, DbName});
-update_notify({created, DbName}) ->
- gen_server:cast(?MODULE, {reset_indexes, DbName});
-update_notify({ddoc_updated, {DbName, DDocId}}) ->
- lists:foreach(
- fun({_DbName, {_DDocId, Sig}}) ->
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [{_, IndexPid}] ->
- (catch gen_server:cast(IndexPid, ddoc_updated));
- [] ->
- ok
- end
- end,
- ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}}));
-update_notify(_) ->
- ok.
-
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
deleted file mode 100644
index ab68dc5c2..000000000
--- a/src/couch_index/src/couch_index_updater.erl
+++ /dev/null
@@ -1,211 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index_updater).
--behaviour(gen_server).
-
-
-%% API
--export([start_link/2, run/2, is_running/1, update/2, restart/2]).
-
-%% for upgrades
--export([update/3]).
-
-%% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
--record(st, {
- idx,
- mod,
- pid=nil
-}).
-
-
-start_link(Index, Module) ->
- gen_server:start_link(?MODULE, {Index, Module}, []).
-
-
-run(Pid, IdxState) ->
- gen_server:call(Pid, {update, IdxState}).
-
-
-is_running(Pid) ->
- gen_server:call(Pid, is_running).
-
-
-update(Mod, State) ->
- update(nil, Mod, State).
-
-
-restart(Pid, IdxState) ->
- gen_server:call(Pid, {restart, IdxState}).
-
-
-init({Index, Module}) ->
- process_flag(trap_exit, true),
- {ok, #st{idx=Index, mod=Module}}.
-
-
-terminate(_Reason, State) ->
- couch_util:shutdown_sync(State#st.pid),
- ok.
-
-
-handle_call({update, _IdxState}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
- {reply, ok, State};
-handle_call({update, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) ->
- Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
- ?LOG_INFO("Starting index update for db: ~s idx: ~s", Args),
- Pid = spawn_link(?MODULE, update, [Idx, Mod, IdxState]),
- {reply, ok, State#st{pid=Pid}};
-handle_call({restart, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) ->
- Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
- ?LOG_INFO("Restarting index update for db: ~s idx: ~s", Args),
- case is_pid(State#st.pid) of
- true -> couch_util:shutdown_sync(State#st.pid);
- _ -> ok
- end,
- Pid = spawn_link(?MODULE, update, [Idx, State#st.mod, IdxState]),
- {reply, ok, State#st{pid=Pid}};
-handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
- {reply, true, State};
-handle_call(is_running, _From, State) ->
- {reply, false, State}.
-
-
-handle_cast(_Mesg, State) ->
- {stop, unknown_cast, State}.
-
-
-handle_info({'EXIT', _, {updated, Pid, IdxState}}, #st{pid=Pid}=State) ->
- Mod = State#st.mod,
- Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
- ?LOG_INFO("Index update finished for db: ~s idx: ~s", Args),
- ok = gen_server:cast(State#st.idx, {updated, IdxState}),
- {noreply, State#st{pid=undefined}};
-handle_info({'EXIT', _, {reset, Pid}}, #st{idx=Idx, pid=Pid}=State) ->
- {ok, NewIdxState} = gen_server:call(State#st.idx, reset),
- Pid2 = spawn_link(?MODULE, update, [Idx, State#st.mod, NewIdxState]),
- {noreply, State#st{pid=Pid2}};
-handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
- {noreply, State#st{pid=undefined}};
-handle_info({'EXIT', Pid, {{nocatch, Error}, _Trace}}, State) ->
- handle_info({'EXIT', Pid, Error}, State);
-handle_info({'EXIT', Pid, Error}, #st{pid=Pid}=State) ->
- ok = gen_server:cast(State#st.idx, {update_error, Error}),
- {noreply, State#st{pid=undefined}};
-handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
- {stop, normal, State};
-handle_info({'EXIT', _Pid, normal}, State) ->
- {noreply, State};
-handle_info(_Mesg, State) ->
- {stop, unknown_info, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-update(Idx, Mod, IdxState) ->
- DbName = Mod:get(db_name, IdxState),
- CurrSeq = Mod:get(update_seq, IdxState),
- UpdateOpts = Mod:get(update_options, IdxState),
- CommittedOnly = lists:member(committed_only, UpdateOpts),
- IncludeDesign = lists:member(include_design, UpdateOpts),
- DocOpts = case lists:member(local_seq, UpdateOpts) of
- true -> [conflicts, deleted_conflicts, local_seq];
- _ -> [conflicts, deleted_conflicts]
- end,
-
- couch_util:with_db(DbName, fun(Db) ->
- DbUpdateSeq = couch_db:get_update_seq(Db),
- DbCommittedSeq = couch_db:get_committed_update_seq(Db),
-
- PurgedIdxState = case purge_index(Db, Mod, IdxState) of
- {ok, IdxState0} -> IdxState0;
- reset -> exit({reset, self()})
- end,
-
- NumChanges = couch_db:count_changes_since(Db, CurrSeq),
-
- GetSeq = fun
- (#full_doc_info{update_seq=Seq}) -> Seq;
- (#doc_info{high_seq=Seq}) -> Seq
- end,
-
- GetInfo = fun
- (#full_doc_info{id=Id, update_seq=Seq, deleted=Del}=FDI) ->
- {Id, Seq, Del, couch_doc:to_doc_info(FDI)};
- (#doc_info{id=Id, high_seq=Seq, revs=[RI|_]}=DI) ->
- {Id, Seq, RI#rev_info.deleted, DI}
- end,
-
- LoadDoc = fun(DI) ->
- {DocId, Seq, Deleted, DocInfo} = GetInfo(DI),
-
- case {IncludeDesign, DocId} of
- {false, <<"_design/", _/binary>>} ->
- {nil, Seq};
- _ when Deleted ->
- {#doc{id=DocId, deleted=true}, Seq};
- _ ->
- {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts),
- {Doc, Seq}
- end
- end,
-
- Proc = fun(DocInfo, _, {IdxStateAcc, _}) ->
- case CommittedOnly and (GetSeq(DocInfo) > DbCommittedSeq) of
- true ->
- {stop, {IdxStateAcc, false}};
- false ->
- {Doc, Seq} = LoadDoc(DocInfo),
- {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc),
- garbage_collect(),
- {ok, {NewSt, true}}
- end
- end,
-
- {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
- Acc0 = {InitIdxState, true},
- {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []),
- {ProcIdxSt, SendLast} = Acc,
-
- % If we didn't bail due to hitting the last committed seq we need
- % to send our last update_seq through.
- {ok, LastIdxSt} = case SendLast of
- true ->
- Mod:process_doc(nil, DbUpdateSeq, ProcIdxSt);
- _ ->
- {ok, ProcIdxSt}
- end,
-
- {ok, FinalIdxState} = Mod:finish_update(LastIdxSt),
- exit({updated, self(), FinalIdxState})
- end).
-
-
-purge_index(Db, Mod, IdxState) ->
- DbPurgeSeq = couch_db:get_purge_seq(Db),
- IdxPurgeSeq = Mod:get(purge_seq, IdxState),
- if
- DbPurgeSeq == IdxPurgeSeq ->
- {ok, IdxState};
- DbPurgeSeq == IdxPurgeSeq + 1 ->
- {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
- Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
- true ->
- reset
- end.
diff --git a/src/couch_index/src/couch_index_util.erl b/src/couch_index/src/couch_index_util.erl
deleted file mode 100644
index cf1ff7561..000000000
--- a/src/couch_index/src/couch_index_util.erl
+++ /dev/null
@@ -1,81 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index_util).
-
--export([root_dir/0, index_dir/2, index_file/3]).
--export([load_doc/3, sort_lib/1, hexsig/1]).
-
--include_lib("couch/include/couch_db.hrl").
-
-
-root_dir() ->
- case config:get("couchdb", "index_dir") of
- undefined -> config:get("couchdb", "view_index_dir");
- Value -> Value
- end.
-
-
-index_dir(Module, DbName) when is_binary(DbName) ->
- DbDir = "." ++ binary_to_list(DbName) ++ "_design",
- filename:join([root_dir(), DbDir, Module]);
-index_dir(Module, #db{}=Db) ->
- index_dir(Module, couch_db:name(Db)).
-
-
-index_file(Module, DbName, FileName) ->
- filename:join(index_dir(Module, DbName), FileName).
-
-
-load_doc(Db, #doc_info{}=DI, Opts) ->
- Deleted = lists:member(deleted, Opts),
- case (catch couch_db:open_doc(Db, DI, Opts)) of
- {ok, #doc{deleted=false}=Doc} -> Doc;
- {ok, #doc{deleted=true}=Doc} when Deleted -> Doc;
- _Else -> null
- end;
-load_doc(Db, {DocId, Rev}, Opts) ->
- case (catch load_doc(Db, DocId, Rev, Opts)) of
- #doc{deleted=false} = Doc -> Doc;
- _ -> null
- end.
-
-
-load_doc(Db, DocId, Rev, Options) ->
- case Rev of
- nil -> % open most recent rev
- case (catch couch_db:open_doc(Db, DocId, Options)) of
- {ok, Doc} -> Doc;
- _Error -> null
- end;
- _ -> % open a specific rev (deletions come back as stubs)
- case (catch couch_db:open_doc_revs(Db, DocId, [Rev], Options)) of
- {ok, [{ok, Doc}]} -> Doc;
- {ok, [{{not_found, missing}, Rev}]} -> null;
- {ok, [_Else]} -> null
- end
- end.
-
-
-sort_lib({Lib}) ->
- sort_lib(Lib, []).
-sort_lib([], LAcc) ->
- lists:keysort(1, LAcc);
-sort_lib([{LName, {LObj}}|Rest], LAcc) ->
- LSorted = sort_lib(LObj, []), % descend into nested object
- sort_lib(Rest, [{LName, LSorted}|LAcc]);
-sort_lib([{LName, LCode}|Rest], LAcc) ->
- sort_lib(Rest, [{LName, LCode}|LAcc]).
-
-
-hexsig(Sig) ->
- couch_util:to_hex(binary_to_list(Sig)).