Diffstat (limited to 'src/mem3')
-rw-r--r-- | src/mem3/src/mem3_reshard_dbdoc.erl    | 275 |
-rw-r--r-- | src/mem3/src/mem3_reshard_index.erl    | 164 |
-rw-r--r-- | src/mem3/src/mem3_reshard_job.erl      | 722 |
-rw-r--r-- | src/mem3/src/mem3_reshard_store.erl    | 288 |
-rw-r--r-- | src/mem3/src/mem3_reshard_validate.erl | 126 |
5 files changed, 1575 insertions, 0 deletions
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl new file mode 100644 index 000000000..7eb3e9f13 --- /dev/null +++ b/src/mem3/src/mem3_reshard_dbdoc.erl @@ -0,0 +1,275 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_dbdoc). + +-behaviour(gen_server). + +-export([ + update_shard_map/1, + + start_link/0, + + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("mem3_reshard.hrl"). + + +-spec update_shard_map(#job{}) -> no_return | ok. +update_shard_map(#job{source = Source, target = Target} = Job) -> + Node = hd(mem3_util:live_nodes()), + JobStr = mem3_reshard_job:jobfmt(Job), + LogMsg1 = "~p : ~p calling update_shard_map node:~p", + couch_log:notice(LogMsg1, [?MODULE, JobStr, Node]), + ServerRef = {?MODULE, Node}, + CallArg = {update_shard_map, Source, Target}, + TimeoutMSec = shard_update_timeout_msec(), + try + case gen_server:call(ServerRef, CallArg, TimeoutMSec) of + {ok, _} -> ok; + {error, CallError} -> throw({error, CallError}) + end + catch + _:Err -> + exit(Err) + end, + LogMsg2 = "~p : ~p update_shard_map on node:~p returned", + couch_log:notice(LogMsg2, [?MODULE, JobStr, Node]), + UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000), + case wait_source_removed(Source, 5, UntilSec) of + true -> ok; + false -> exit(shard_update_did_not_propagate) + end. + + +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init(_) -> + couch_log:notice("~p start init()", [?MODULE]), + {ok, nil}. + + +terminate(_Reason, _State) -> + ok. + + +handle_call({update_shard_map, Source, Target}, _From, State) -> + Res = try + update_shard_map(Source, Target) + catch + throw:{error, Error} -> + {error, Error} + end, + {reply, Res, State}; + +handle_call(Call, From, State) -> + couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]), + {noreply, State}. + + +handle_cast(Cast, State) -> + couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]), + {noreply, State}. + + +handle_info(Info, State) -> + couch_log:error("~p unexpected info ~p", [?MODULE, Info]), + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +% Private + +update_shard_map(Source, Target) -> + ok = validate_coordinator(), + ok = replicate_from_all_nodes(shard_update_timeout_msec()), + DocId = mem3:dbname(Source#shard.name), + OldDoc = case mem3_util:open_db_doc(DocId) of + {ok, #doc{deleted = true}} -> + throw({error, missing_source}); + {ok, #doc{} = Doc} -> + Doc; + {not_found, deleted} -> + throw({error, missing_source}); + OpenErr -> + throw({error, {shard_doc_open_error, OpenErr}}) + end, + #doc{body = OldBody} = OldDoc, + NewBody = update_shard_props(OldBody, Source, Target), + {ok, _} = write_shard_doc(OldDoc, NewBody), + ok = replicate_to_all_nodes(shard_update_timeout_msec()), + {ok, NewBody}. 
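For orientation, the shard-map document that update_shard_map/2 rewrites lives in the shards db ("_dbs" by default) and carries by_node, by_range and changelog fields. A simplified, hypothetical example of such a body as EJSON, showing the [<<"split">>, Range, TargetRanges, Node] changelog entry appended by update_shard_props/3 below (database, node and range values are made up; other fields such as shard_suffix are omitted):

    {[
        {<<"_id">>, <<"db1">>},
        {<<"changelog">>, [
            [<<"add">>, <<"00000000-7fffffff">>, <<"node1@127.0.0.1">>],
            [<<"add">>, <<"80000000-ffffffff">>, <<"node1@127.0.0.1">>],
            [<<"split">>, <<"00000000-7fffffff">>,
                [<<"00000000-3fffffff">>, <<"40000000-7fffffff">>],
                <<"node1@127.0.0.1">>]
        ]},
        {<<"by_node">>, {[
            {<<"node1@127.0.0.1">>,
                [<<"00000000-3fffffff">>, <<"40000000-7fffffff">>,
                 <<"80000000-ffffffff">>]}
        ]}},
        {<<"by_range">>, {[
            {<<"00000000-3fffffff">>, [<<"node1@127.0.0.1">>]},
            {<<"40000000-7fffffff">>, [<<"node1@127.0.0.1">>]},
            {<<"80000000-ffffffff">>, [<<"node1@127.0.0.1">>]}
        ]}}
    ]}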
+ + +validate_coordinator() -> + case hd(mem3_util:live_nodes()) =:= node() of + true -> ok; + false -> throw({error, coordinator_changed}) + end. + + +replicate_from_all_nodes(TimeoutMSec) -> + case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of + ok -> ok; + Error -> throw({error, Error}) + end. + + +replicate_to_all_nodes(TimeoutMSec) -> + case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of + ok -> ok; + Error -> throw({error, Error}) + end. + + +write_shard_doc(#doc{id = Id} = Doc, Body) -> + DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")), + UpdatedDoc = Doc#doc{body = Body}, + couch_util:with_db(DbName, fun(Db) -> + try + {ok, _} = couch_db:update_doc(Db, UpdatedDoc, []) + catch + conflict -> + throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}}) + end + end). + + +update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) -> + {ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}), + ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}}, + Props1 = lists:keyreplace(<<"by_node">>, 1, Props0, ByNodeKV), + + {ByRange0} = couch_util:get_value(<<"by_range">>, Props1, {[]}), + ByRangeKV = {<<"by_range">>, {update_by_range(ByRange0, Source, Targets)}}, + Props2 = lists:keyreplace(<<"by_range">>, 1, Props1, ByRangeKV), + + Changelog = couch_util:get_value(<<"changelog">>, Props2, []), + {Node, Range} = {node_key(Source), range_key(Source)}, + TRanges = [range_key(T) || T <- Targets], + ChangelogEntry = [[<<"split">>, Range, TRanges, Node]], + ChangelogKV = {<<"changelog">>, Changelog ++ ChangelogEntry}, + Props3 = lists:keyreplace(<<"changelog">>, 1, Props2, ChangelogKV), + + {Props3}. + + +update_by_node(ByNode, #shard{} = Source, [#shard{} | _] = Targets) -> + {NodeKey, SKey} = {node_key(Source), range_key(Source)}, + {_, Ranges} = lists:keyfind(NodeKey, 1, ByNode), + Ranges1 = Ranges -- [SKey], + Ranges2 = Ranges1 ++ [range_key(T) || T <- Targets], + lists:keyreplace(NodeKey, 1, ByNode, {NodeKey, lists:sort(Ranges2)}). + + +update_by_range(ByRange, Source, Targets) -> + ByRange1 = remove_node_from_source(ByRange, Source), + lists:foldl(fun add_node_to_target_foldl/2, ByRange1, Targets). + + +remove_node_from_source(ByRange, Source) -> + {NodeKey, SKey} = {node_key(Source), range_key(Source)}, + {_, SourceNodes} = lists:keyfind(SKey, 1, ByRange), + % Double check that source had node to begin with + case lists:member(NodeKey, SourceNodes) of + true -> + ok; + false -> + throw({source_shard_missing_node, NodeKey, SourceNodes}) + end, + SourceNodes1 = SourceNodes -- [NodeKey], + case SourceNodes1 of + [] -> + % If last node deleted, remove entry + lists:keydelete(SKey, 1, ByRange); + _ -> + lists:keyreplace(SKey, 1, ByRange, {SKey, SourceNodes1}) + end. + + +add_node_to_target_foldl(#shard{} = Target, ByRange) -> + {NodeKey, TKey} = {node_key(Target), range_key(Target)}, + case lists:keyfind(TKey, 1, ByRange) of + {_, Nodes} -> + % Double check that target does not have node already + case lists:member(NodeKey, Nodes) of + false -> + ok; + true -> + throw({target_shard_already_has_node, NodeKey, Nodes}) + end, + Nodes1 = lists:sort([NodeKey | Nodes]), + lists:keyreplace(TKey, 1, ByRange, {TKey, Nodes1}); + false -> + % fabric_db_create:make_document/3 says they should be sorted + lists:sort([{TKey, [NodeKey]} | ByRange]) + end. + + +node_key(#shard{node = Node}) -> + couch_util:to_binary(Node). 
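The by_node/by_range bookkeeping above reduces to lists:keyfind/3, lists:keyreplace/4 and lists:keydelete/3 over proplists. A stdlib-only sketch of the by_range step for a single-node split, using plain binaries instead of #shard{} records (the function name and the short keys are illustrative, not part of the patch):

    %% Move Node from the source range entry to each target range entry.
    split_by_range(ByRange, Node, SourceKey, TargetKeys) ->
        {_, SrcNodes} = lists:keyfind(SourceKey, 1, ByRange),
        ByRange1 = case SrcNodes -- [Node] of
            [] -> lists:keydelete(SourceKey, 1, ByRange);
            Rest -> lists:keyreplace(SourceKey, 1, ByRange, {SourceKey, Rest})
        end,
        lists:foldl(fun(TKey, Acc) ->
            case lists:keyfind(TKey, 1, Acc) of
                {_, Nodes} ->
                    lists:keyreplace(TKey, 1, Acc, {TKey, lists:sort([Node | Nodes])});
                false ->
                    lists:sort([{TKey, [Node]} | Acc])
            end
        end, ByRange1, TargetKeys).

    %% split_by_range([{<<"00-7f">>, [<<"n1">>]}], <<"n1">>, <<"00-7f">>,
    %%     [<<"00-3f">>, <<"40-7f">>]) returns
    %% [{<<"00-3f">>,[<<"n1">>]},{<<"40-7f">>,[<<"n1">>]}]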
+ + +range_key(#shard{range = [B, E]}) -> + BHex = couch_util:to_hex(<<B:32/integer>>), + EHex = couch_util:to_hex(<<E:32/integer>>), + list_to_binary([BHex, "-", EHex]). + + +shard_update_timeout_msec() -> + config:get_integer("reshard", "shard_upate_timeout_msec", 300000). + + +wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) -> + case check_source_removed(Source) of + true -> + true; + false -> + case mem3_reshard:now_sec() < UntilSec of + true -> + LogMsg = "~p : Waiting for shard ~p removal confirmation", + couch_log:notice(LogMsg, [?MODULE, Name]), + timer:sleep(SleepSec * 1000), + wait_source_removed(Source, SleepSec, UntilSec); + false -> + false + end + end. + + +check_source_removed(#shard{name = Name}) -> + DbName = mem3:dbname(Name), + Live = mem3_util:live_nodes(), + ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)], + Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]), + {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]), + Shards = lists:usort(lists:flatten(Responses)), + SourcePresent = [S || S = #shard{name = S, node = N} <- Shards, S =:= Name, + N =:= node()], + case SourcePresent of + [] -> true; + [_ | _] -> false + end. diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl new file mode 100644 index 000000000..d4cb7caa1 --- /dev/null +++ b/src/mem3/src/mem3_reshard_index.erl @@ -0,0 +1,164 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_index). + + +-export([ + design_docs/1, + target_indices/2, + spawn_builders/1 +]). + + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +%% Public API + +design_docs(DbName) -> + try + case fabric_design_docs(mem3:dbname(DbName)) of + {error, {maintenance_mode, _, _Node}} -> + {ok, []}; + {ok, DDocs} -> + JsonDocs = [couch_doc:from_json_obj(DDoc) || DDoc <- DDocs], + {ok, JsonDocs}; + Else -> + Else + end + catch error:database_does_not_exist -> + {ok, []} + end. + + +target_indices(Docs, Targets) -> + Indices = [[indices(N, D) || D <- Docs] || #shard{name = N} <- Targets], + lists:flatten(Indices). + + +spawn_builders(Indices) -> + Results = [build_index(Index) || Index <- Indices], + Oks = [{ok, Pid} || {ok, Pid} <- Results, is_pid(Pid)], + case Results -- Oks of + [] -> + {ok, [Pid || {ok, Pid} <- Results]}; + Error -> + % Do a all or nothing pattern, if some indices could not be + % spawned, kill the spawned ones and and return the error. + ErrMsg = "~p failed to spawn index builders: ~p ~p", + couch_log:error(ErrMsg, [?MODULE, Error, Indices]), + lists:foreach(fun({ok, Pid}) -> + catch unlink(Pid), + catch exit(Pid, kill) + end, Oks), + {error, Error} + end. + + +%% Private API + +fabric_design_docs(DbName) -> + case couch_util:with_proc(fabric, design_docs, [DbName], infinity) of + {ok, Resp} -> Resp; + {error, Error} -> Error + end. 
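range_key/1 above renders a [Begin, End] pair as the usual 8-digit lowercase hex shard suffix. A stdlib-only equivalent, with io_lib:format standing in for couch_util:to_hex (a sketch, not the module's code):

    range_key(B, E) when is_integer(B), is_integer(E) ->
        iolist_to_binary(io_lib:format("~8.16.0b-~8.16.0b", [B, E])).

    %% range_key(0, 16#7fffffff) returns <<"00000000-7fffffff">>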
+ + +indices(DbName, Doc) -> + mrview_indices(DbName, Doc) + ++ [dreyfus_indices(DbName, Doc) || has_app(dreyfus)] + ++ [hastings_indices(DbName, Doc) || has_app(hastings)]. + + +mrview_indices(DbName, Doc) -> + try + {ok, MRSt} = couch_mrview_util:ddoc_to_mrst(DbName, Doc), + Views = couch_mrview_index:get(views, MRSt), + case Views =/= [] of + true -> + [{mrview, DbName, MRSt}]; + false -> + [] + end + catch + Tag:Err -> + Msg = "~p couldn't get mrview index ~p ~p ~p:~p", + couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]), + [] + end. + + +dreyfus_indices(DbName, Doc) -> + try + Indices = dreyfus_index:design_doc_to_indexes(Doc), + [{dreyfus, DbName, Index} || Index <- Indices] + catch + Tag:Err -> + Msg = "~p couldn't get dreyfus indices ~p ~p ~p:~p", + couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]), + [] + end. + + +hastings_indices(DbName, Doc) -> + try + Indices = hastings_index:design_doc_to_indexes(Doc), + [{hastings, DbName, Index} || Index <- Indices] + catch + Tag:Err -> + Msg = "~p couldn't get hasting indices ~p ~p ~p:~p", + couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]), + [] + end. + + +build_index({mrview, DbName, MRSt}) -> + case couch_index_server:get_index(couch_mrview_index, MRSt) of + {ok, Pid} -> + Args = [Pid, get_update_seq(DbName)], + WPid = spawn_link(couch_index, get_state, Args), + {ok, WPid}; + Error -> + Error + end; + +build_index({dreyfus, DbName, Index})-> + case dreyfus_index_manager:get_index(DbName, Index) of + {ok, Pid} -> + Args = [Pid, get_update_seq(DbName)], + WPid = spawn_link(dreyfus_index, await, Args), + {ok, WPid}; + Error -> + Error + end; + +build_index({hastings, DbName, Index}) -> + case hastings_index_manager:get_index(DbName, Index) of + {ok, Pid} -> + Args = [Pid, get_update_seq(DbName)], + WPid = spawn_link(hastings_index, await, Args), + {ok, WPid}; + Error -> + Error + end. + + +has_app(App) -> + code:lib_dir(App) /= {error, bad_name}. + + +get_update_seq(DbName) -> + couch_util:with_db(DbName, fun(Db) -> + couch_db:get_update_seq(Db) + end). diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl new file mode 100644 index 000000000..d3a33d3f6 --- /dev/null +++ b/src/mem3/src/mem3_reshard_job.erl @@ -0,0 +1,722 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_job). + + +-export([ + start_link/1, + + checkpoint_done/1, + jobfmt/1, + pickfun/3 +]). + +-export([ + init/1, + + initial_copy/1, + initial_copy_impl/1, + + topoff/1, + topoff_impl/1, + + build_indices/1, + + copy_local_docs/1, + copy_local_docs_impl/1, + + update_shardmap/1, + + wait_source_close/1, + wait_source_close_impl/1, + + source_delete/1, + source_delete_impl/1, + + completed/1 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("mem3_reshard.hrl"). + + +% Batch size for internal replication topoffs +-define(INTERNAL_REP_BATCH_SIZE, 2000). + +% The list of possible job states. The order of this +% list is important as a job will progress linearly +% through it. 
However, when starting a job we may +% have to resume from an earlier state as listed +% below in STATE_RESTART. +-define(SPLIT_STATES, [ + new, + initial_copy, + topoff1, + build_indices, + topoff2, + copy_local_docs, + update_shardmap, + wait_source_close, + topoff3, + source_delete, + completed +]). + + +% When a job starts it may be resuming from a partially +% completed state. These state pairs list the state +% we have to restart from for each possible state. +-define(STATE_RESTART, #{ + new => initial_copy, + initial_copy => initial_copy, + topoff1 => topoff1, + build_indices => topoff1, + topoff2 => topoff1, + copy_local_docs => topoff1, + update_shardmap => update_shardmap, + wait_source_close => wait_source_close, + topoff3 => wait_source_close, + source_delete => wait_source_close, + completed => completed +}). + + +% If we have a worker failing during any of these +% states we need to clean up the targets +-define(CLEAN_TARGET_STATES, [ + initial_copy, + topoff1, + build_indices, + topoff2, + copy_local_docs +]). + + +start_link(#job{} = Job) -> + proc_lib:start_link(?MODULE, init, [Job]). + + +% This is called by the main proces after it has checkpointed the progress +% of the job. After the new state is checkpointed, we signal the job to start +% executing that state. +checkpoint_done(#job{pid = Pid} = Job) -> + couch_log:notice(" ~p : checkpoint done for ~p", [?MODULE, jobfmt(Job)]), + Pid ! checkpoint_done, + ok. + + +% Formatting function, used for logging mostly +jobfmt(#job{} = Job) -> + #job{ + id = Id, + source = #shard{name = Source}, + target = Target, + split_state = State, + job_state = JobState, + pid = Pid + } = Job, + TargetCount = length(Target), + Msg = "#job{~s ~s /~B job_state:~s split_state:~s pid:~p}", + Fmt = io_lib:format(Msg, [Id, Source, TargetCount, JobState, State, Pid]), + lists:flatten(Fmt). + + +% This is the function which picks between various targets. It is used here as +% well as in mem3_rep internal replicator and couch_db_split bulk copy logic. +% Given a document id and list of ranges, and a hash function, it will pick one +% of the range or return not_in_range atom. +pickfun(DocId, [[B, E] | _] = Ranges, {_M, _F, _A} = HashFun) when + is_integer(B), is_integer(E), B =< E -> + HashKey = mem3_hash:calculate(HashFun, DocId), + Pred = fun([Begin, End]) -> + Begin =< HashKey andalso HashKey =< End + end, + case lists:filter(Pred, Ranges) of + [] -> not_in_range; + [Key] -> Key + end. + + +init(#job{} = Job0) -> + process_flag(trap_exit, true), + Job1 = set_start_state(Job0#job{ + pid = self(), + start_time = mem3_reshard:now_sec(), + workers = [], + retries = 0 + }), + Job2 = update_split_history(Job1), + proc_lib:init_ack({ok, self()}), + couch_log:notice("~p starting job ~s", [?MODULE, jobfmt(Job2)]), + ok = checkpoint(Job2), + run(Job2). + + +run(#job{split_state = CurrState} = Job) -> + StateFun = case CurrState of + topoff1 -> topoff; + topoff2 -> topoff; + topoff3 -> topoff; + _ -> CurrState + end, + NewJob = try + Job1 = ?MODULE:StateFun(Job), + Job2 = wait_for_workers(Job1), + Job3 = switch_to_next_state(Job2), + ok = checkpoint(Job3), + Job3 + catch + throw:{retry, RetryJob} -> + RetryJob + end, + run(NewJob). 
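pickfun/3 above routes a document id to the target range whose [Begin, End] interval contains its hash key. A self-contained sketch of the same idea, with erlang:phash2/2 standing in for mem3_hash:calculate/2 (the real hash function differs; the function name and ranges here are illustrative):

    pick_range(DocId, Ranges) ->
        HashKey = erlang:phash2(DocId, 16#100000000),   % stand-in 32-bit hash
        case [R || [B, E] = R <- Ranges, B =< HashKey, HashKey =< E] of
            [Range] -> Range;
            [] -> not_in_range
        end.

    %% pick_range(<<"doc-1">>, [[0, 16#7fffffff], [16#80000000, 16#ffffffff]])
    %% returns one of the two ranges, depending on the hash of the id.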
+ + +set_start_state(#job{split_state = State} = Job) -> + case {State, maps:get(State, ?STATE_RESTART, undefined)} of + {_, undefined} -> + Fmt1 = "~p recover : unknown state ~s", + couch_log:error(Fmt1, [?MODULE, jobfmt(Job)]), + erlang:error({invalid_split_job_recover_state, Job}); + {initial_copy, initial_copy} -> + % Since we recover from initial_copy to initial_copy, we need + % to reset the target state as initial_copy expects to + % create a new target + Fmt2 = "~p recover : resetting target ~s", + couch_log:notice(Fmt2, [?MODULE, jobfmt(Job)]), + reset_target(Job); + {_, StartState} -> + Job#job{split_state = StartState} + end. + + +get_next_state(#job{split_state = State}) -> + get_next_state(State, ?SPLIT_STATES). + + +get_next_state(completed, _) -> + completed; + +get_next_state(CurrState, [CurrState, NextState | _]) -> + NextState; + +get_next_state(CurrState, [_ | Rest]) -> + get_next_state(CurrState, Rest). + + +switch_to_next_state(#job{} = Job0) -> + Info0 = Job0#job.state_info, + Info1 = info_delete(error, Info0), + Info2 = info_delete(reason, Info1), + Job1 = Job0#job{ + split_state = get_next_state(Job0), + update_time = mem3_reshard:now_sec(), + retries = 0, + state_info = Info2, + workers = [] + }, + Job2 = update_split_history(Job1), + check_state(Job2). + + +checkpoint(Job) -> + % Ask main process to checkpoint. When it has finished it will notify us + % by calling by checkpoint_done/1. The reason not to call the main process + % via a gen_server:call is because the main process could be in the middle + % of terminating the job and then it would deadlock (after sending us a + % shutdown message) and it would end up using the whole supervisor + % termination timeout before finally. + ok = mem3_reshard:checkpoint(Job#job.manager, Job), + Parent = parent(), + receive + {'EXIT', Parent, Reason} -> + handle_exit(Job, Reason); + checkpoint_done -> + ok; + Other -> + handle_unknown_msg(Job, "checkpoint", Other) + end. + + +wait_for_workers(#job{workers = []} = Job) -> + Job; + +wait_for_workers(#job{workers = Workers} = Job) -> + Parent = parent(), + receive + {'EXIT', Parent, Reason} -> + handle_exit(Job, Reason); + {'EXIT', Pid, Reason} -> + case lists:member(Pid, Workers) of + true -> + NewJob = handle_worker_exit(Job, Pid, Reason), + wait_for_workers(NewJob); + false -> + handle_unknown_msg(Job, "wait_for_workers", {Pid, Reason}) + end; + Other -> + handle_unknown_msg(Job, "wait_for_workers", Other) + end. 
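get_next_state/2 above walks a linear state list, and ?STATE_RESTART maps each state to the state a resumed job must restart from. A compact stdlib sketch of both lookups (state names mirror the ones above; function names are illustrative):

    split_states() ->
        [new, initial_copy, topoff1, build_indices, topoff2, copy_local_docs,
         update_shardmap, wait_source_close, topoff3, source_delete, completed].

    next_state(completed, _) -> completed;
    next_state(Curr, [Curr, Next | _]) -> Next;
    next_state(Curr, [_ | Rest]) -> next_state(Curr, Rest).

    restart_state(State, Restarts) when is_map(Restarts) ->
        maps:get(State, Restarts, undefined).

    %% next_state(topoff2, split_states()) =:= copy_local_docs
    %% restart_state(copy_local_docs, #{copy_local_docs => topoff1}) =:= topoff1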
+ + +handle_worker_exit(#job{workers = Workers} = Job, Pid, normal) -> + Job#job{workers = Workers -- [Pid]}; + +handle_worker_exit(#job{} = Job, _Pid, {error, missing_source}) -> + Msg1 = "~p stopping worker due to source missing ~p", + couch_log:error(Msg1, [?MODULE, jobfmt(Job)]), + kill_workers(Job), + case lists:member(Job#job.split_state, ?CLEAN_TARGET_STATES) of + true -> + Msg2 = "~p cleaning target after db was deleted ~p", + couch_log:error(Msg2, [?MODULE, jobfmt(Job)]), + reset_target(Job), + exit({error, missing_source}); + false -> + exit({error, missing_source}) + end; + +handle_worker_exit(#job{} = Job, _Pid, {error, missing_target}) -> + Msg = "~p stopping worker due to target db missing ~p", + couch_log:error(Msg, [?MODULE, jobfmt(Job)]), + kill_workers(Job), + exit({error, missing_target}); + +handle_worker_exit(#job{} = Job0, _Pid, Reason) -> + couch_log:error("~p worker error ~p ~p", [?MODULE, jobfmt(Job0), Reason]), + kill_workers(Job0), + Job1 = Job0#job{workers = []}, + case Job1#job.retries =< max_retries() of + true -> + retry_state(Job1, Reason); + false -> + exit(Reason) + end. + + +% Cleanup and exit when we receive an 'EXIT' message from our parent. In case +% the shard map is being updated, try to wait some time for it to finish. +handle_exit(#job{split_state = update_shardmap, workers = [WPid]} = Job, + Reason) -> + Timeout = update_shard_map_timeout_sec(), + Msg1 = "~p job exit ~s ~p while shard map is updating, waiting ~p sec", + couch_log:warning(Msg1, [?MODULE, jobfmt(Job), Reason, Timeout]), + receive + {'EXIT', WPid, normal} -> + Msg2 = "~p ~s shard map finished updating successfully, exiting", + couch_log:notice(Msg2, [?MODULE, jobfmt(Job)]), + exit(Reason); + {'EXIT', WPid, Error} -> + Msg3 = "~p ~s shard map update failed with error ~p", + couch_log:error(Msg3, [?MODULE, jobfmt(Job), Error]), + exit(Reason) + after Timeout * 1000-> + Msg4 = "~p ~s shard map update timeout exceeded ~p sec", + couch_log:error(Msg4, [?MODULE, jobfmt(Job), Timeout]), + kill_workers(Job), + exit(Reason) + end; + +handle_exit(#job{} = Job, Reason) -> + kill_workers(Job), + exit(Reason). + + +retry_state(#job{retries = Retries, state_info = Info} = Job0, Error) -> + Job1 = Job0#job{ + retries = Retries + 1, + state_info = info_update(error, Error, Info) + }, + couch_log:notice("~p retrying ~p ~p", [?MODULE, jobfmt(Job1), Retries]), + Job2 = report(Job1), + Timeout = retry_interval_sec(), + Parent = parent(), + receive + {'EXIT', Parent, Reason} -> + handle_exit(Job2, Reason); + Other -> + handle_unknown_msg(Job2, "retry_state", Other) + after Timeout * 1000 -> + ok + end, + throw({retry, Job2}). + + +report(#job{manager = ManagerPid} = Job) -> + Job1 = Job#job{update_time = mem3_reshard:now_sec()}, + ok = mem3_reshard:report(ManagerPid, Job1), + Job1. + + +kill_workers(#job{workers = Workers}) -> + lists:foreach(fun(Worker) -> + unlink(Worker), + exit(Worker, kill) + end, Workers), + flush_worker_messages(). + + +flush_worker_messages() -> + Parent = parent(), + receive + {'EXIT', Pid, _} when Pid =/= Parent -> + flush_worker_messages() + after 0 -> + ok + end. + + +parent() -> + case get('$ancestors') of + [Pid | _] when is_pid(Pid) -> Pid; + [Name | _] when is_atom(Name) -> whereis(Name); + _ -> undefined + end. + + +handle_unknown_msg(Job, When, RMsg) -> + LogMsg = "~p ~s received an unknown message ~p when in ~s", + couch_log:error(LogMsg, [?MODULE, jobfmt(Job), RMsg, When]), + erlang:error({invalid_split_job_message, Job#job.id, When, RMsg}). 
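kill_workers/1 and flush_worker_messages/0 above use the common unlink, exit(kill), then drain 'EXIT' messages idiom for linked workers under trap_exit. A minimal generic version (the module above additionally takes care never to drain the 'EXIT' message of its own parent):

    kill_and_flush(Workers) ->
        lists:foreach(fun(Pid) ->
            unlink(Pid),
            exit(Pid, kill)
        end, Workers),
        flush_exits().

    flush_exits() ->
        receive
            {'EXIT', _Pid, _Reason} -> flush_exits()
        after 0 ->
            ok
        end.

    %% Assumes the caller runs with process_flag(trap_exit, true) and started
    %% the workers with spawn_link, as the job process above does.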
+ + +initial_copy(#job{} = Job) -> + Pid = spawn_link(?MODULE, initial_copy_impl, [Job]), + report(Job#job{workers = [Pid]}). + + +initial_copy_impl(#job{source = Source, target = Targets0} = Job) -> + #shard{name = SourceName} = Source, + Targets = [{R, N} || #shard{range = R, name = N} <- Targets0], + TMap = maps:from_list(Targets), + LogMsg1 = "~p initial_copy started ~s", + LogArgs1 = [?MODULE, shardsstr(Source, Targets0)], + couch_log:notice(LogMsg1, LogArgs1), + case couch_db_split:split(SourceName, TMap, fun pickfun/3) of + {ok, Seq} -> + LogMsg2 = "~p initial_copy of ~s finished @ seq:~p", + LogArgs2 = [?MODULE, shardsstr(Source, Targets0), Seq], + couch_log:notice(LogMsg2, LogArgs2), + create_artificial_mem3_rep_checkpoints(Job, Seq); + {error, Error} -> + LogMsg3 = "~p initial_copy of ~p finished @ ~p", + LogArgs3 = [?MODULE, shardsstr(Source, Targets0), Error], + couch_log:notice(LogMsg3, LogArgs3), + exit({error, Error}) + end. + + +topoff(#job{} = Job) -> + Pid = spawn_link(?MODULE, topoff_impl, [Job]), + report(Job#job{workers = [Pid]}). + + +topoff_impl(#job{source = #shard{} = Source, target = Targets}) -> + couch_log:notice("~p topoff ~p", [?MODULE, shardsstr(Source, Targets)]), + check_source_exists(Source, topoff), + check_targets_exist(Targets, topoff), + TMap = maps:from_list([{R, T} || #shard{range = R} = T <- Targets]), + Opts = [{batch_size, ?INTERNAL_REP_BATCH_SIZE}, {batch_count, all}], + case mem3_rep:go(Source, TMap, Opts) of + {ok, Count} -> + Args = [?MODULE, shardsstr(Source, Targets), Count], + couch_log:notice("~p topoff done ~s, count: ~p", Args), + ok; + {error, Error} -> + Args = [?MODULE, shardsstr(Source, Targets), Error], + couch_log:error("~p topoff failed ~s, error: ~p", Args), + exit({error, Error}) + end. + + +build_indices(#job{} = Job) -> + #job{ + source = #shard{name = SourceName} = Source, + target = Targets, + retries = Retries, + state_info = Info + } = Job, + check_source_exists(Source, build_indices), + {ok, DDocs} = mem3_reshard_index:design_docs(SourceName), + Indices = mem3_reshard_index:target_indices(DDocs, Targets), + case mem3_reshard_index:spawn_builders(Indices) of + {ok, []} -> + % Skip the log spam if this is a no-op + Job#job{workers = []}; + {ok, Pids} -> + report(Job#job{workers = Pids}); + {error, Error} -> + case Job#job.retries =< max_retries() of + true -> + build_indices(Job#job{ + retries = Retries + 1, + state_info = info_update(error, Error, Info) + }); + false -> + exit(Error) + end + end. + + +copy_local_docs(#job{split_state = copy_local_docs} = Job) -> + Pid = spawn_link(?MODULE, copy_local_docs_impl, [Job]), + report(Job#job{workers = [Pid]}). + + +copy_local_docs_impl(#job{source = Source, target = Targets0}) -> + #shard{name = SourceName} = Source, + Targets = [{R, N} || #shard{range = R, name = N} <- Targets0], + TMap = maps:from_list(Targets), + LogArg1 = [?MODULE, shardsstr(Source, Targets)], + couch_log:notice("~p copy local docs start ~s", LogArg1), + case couch_db_split:copy_local_docs(SourceName, TMap, fun pickfun/3) of + ok -> + couch_log:notice("~p copy local docs finished for ~s", LogArg1), + ok; + {error, Error} -> + LogArg2 = [?MODULE, shardsstr(Source, Targets), Error], + couch_log:error("~p copy local docs failed for ~s ~p", LogArg2), + exit({error, Error}) + end. + + +update_shardmap(#job{} = Job) -> + Pid = spawn_link(mem3_reshard_dbdoc, update_shard_map, [Job]), + report(Job#job{workers = [Pid]}). 
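initial_copy_impl/1 and copy_local_docs_impl/1 above hand couch_db_split a map of target range to target shard name plus the pickfun/3 callback. A hypothetical sketch of how a single document id ends up routed to a target name with such a map (stand-in hash again; this is not the actual couch_db_split internals):

    route_doc(DocId, TMap) when is_map(TMap) ->
        HashKey = erlang:phash2(DocId, 16#100000000),   % stand-in hash
        [Range] = [R || [B, E] = R <- maps:keys(TMap), B =< HashKey, HashKey =< E],
        maps:get(Range, TMap).

    %% TMap = #{[0, 16#7fffffff]           => <<"shards/00000000-7fffffff/db1.0001">>,
    %%          [16#80000000, 16#ffffffff] => <<"shards/80000000-ffffffff/db1.0001">>},
    %% route_doc(<<"doc-1">>, TMap) returns one of the two shard names.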
+ + +wait_source_close(#job{source = #shard{name = Name}} = Job) -> + couch_event:notify(Name, deleted), + Pid = spawn_link(?MODULE, wait_source_close_impl, [Job]), + report(Job#job{workers = [Pid]}). + + +wait_source_close_impl(#job{source = #shard{name = Name}, target = Targets}) -> + Timeout = config:get_integer("reshard", "source_close_timeout_sec", 600), + check_targets_exist(Targets, wait_source_close), + case couch_db:open_int(Name, [?ADMIN_CTX]) of + {ok, Db} -> + Now = mem3_reshard:now_sec(), + case wait_source_close(Db, 1, Now + Timeout) of + true -> + ok; + false -> + exit({error, source_db_close_timeout, Name, Timeout}) + end; + {not_found, _} -> + couch_log:warning("~p source already deleted ~p", [?MODULE, Name]), + ok + end. + + +wait_source_close(Db, SleepSec, UntilSec) -> + case couch_db:monitored_by(Db) -- [self()] of + [] -> + true; + [_ | _] -> + Now = mem3_reshard:now_sec(), + case Now < UntilSec of + true -> + LogMsg = "~p : Waiting for source shard ~p to be closed", + couch_log:notice(LogMsg, [?MODULE, couch_db:name(Db)]), + timer:sleep(SleepSec * 1000), + wait_source_close(Db, SleepSec, UntilSec); + false -> + false + end + end. + + +source_delete(#job{} = Job) -> + Pid = spawn_link(?MODULE, source_delete_impl, [Job]), + report(Job#job{workers = [Pid]}). + + +source_delete_impl(#job{source = #shard{name = Name}, target = Targets}) -> + check_targets_exist(Targets, source_delete), + case config:get_boolean("mem3_reshard", "delete_source", true) of + true -> + case couch_server:delete(Name, [?ADMIN_CTX]) of + ok -> + couch_log:notice("~p : deleted source shard ~p", + [?MODULE, Name]); + not_found -> + couch_log:warning("~p : source was already deleted ~p", + [?MODULE, Name]) + end; + false -> + % Emit deleted event even when not actually deleting the files this + % is the second one emitted, the other one was before + % wait_source_close. They should be idempotent. This one is just to + % match the one that couch_server would emit had the config not + % been set + couch_event:notify(Name, deleted), + LogMsg = "~p : according to configuration not deleting source ~p", + couch_log:warning(LogMsg, [?MODULE, Name]) + end, + TNames = [TName || #shard{name = TName} <- Targets], + lists:foreach(fun(TName) -> couch_event:notify(TName, updated) end, TNames). + + +completed(#job{} = Job) -> + couch_log:notice("~p : ~p completed, exit normal", [?MODULE, jobfmt(Job)]), + exit(normal). + + +% This is for belt and suspenders really. Call periodically to validate the +% state is one of the expected states. +-spec check_state(#job{}) -> #job{} | no_return(). +check_state(#job{split_state = State} = Job) -> + case lists:member(State, ?SPLIT_STATES) of + true -> + Job; + false -> + erlang:error({invalid_shard_split_state, State, Job}) + end. + + +create_artificial_mem3_rep_checkpoints(#job{} = Job, Seq) -> + #job{source = Source = #shard{name = SourceName}, target = Targets} = Job, + check_source_exists(Source, initial_copy), + TNames = [TN || #shard{name = TN} <- Targets], + Timestamp = list_to_binary(mem3_util:iso8601_timestamp()), + couch_util:with_db(SourceName, fun(SDb) -> + [couch_util:with_db(TName, fun(TDb) -> + Doc = mem3_rep_checkpoint_doc(SDb, TDb, Timestamp, Seq), + {ok, _} = couch_db:update_doc(SDb, Doc, []), + {ok, _} = couch_db:update_doc(TDb, Doc, []), + ok + end) || TName <- TNames] + end), + ok. 
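wait_source_close/3 above, like wait_source_removed/3 in mem3_reshard_dbdoc, is a sleep-and-recheck loop bounded by a deadline in seconds. A generic stdlib-only version of that pattern (illustrative name and predicate):

    wait_until(Pred, SleepSec, UntilSec) when is_function(Pred, 0) ->
        case Pred() of
            true ->
                true;
            false ->
                case erlang:system_time(second) < UntilSec of
                    true ->
                        timer:sleep(SleepSec * 1000),
                        wait_until(Pred, SleepSec, UntilSec);
                    false ->
                        false
                end
        end.

    %% wait_until(fun() -> filelib:is_regular("/tmp/ready") end, 1,
    %%     erlang:system_time(second) + 30).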
+ + +mem3_rep_checkpoint_doc(SourceDb, TargetDb, Timestamp, Seq) -> + Node = atom_to_binary(node(), utf8), + SourceUUID = couch_db:get_uuid(SourceDb), + TargetUUID = couch_db:get_uuid(TargetDb), + History = {[ + {<<"source_node">>, Node}, + {<<"source_uuid">>, SourceUUID}, + {<<"source_seq">>, Seq}, + {<<"timestamp">>, Timestamp}, + {<<"target_node">>, Node}, + {<<"target_uuid">>, TargetUUID}, + {<<"target_seq">>, Seq} + ]}, + Body = {[ + {<<"seq">>, Seq}, + {<<"target_uuid">>, TargetUUID}, + {<<"history">>, {[{Node, [History]}]}} + ]}, + Id = mem3_rep:make_local_id(SourceUUID, TargetUUID), + #doc{id = Id, body = Body}. + + +check_source_exists(#shard{name = Name}, StateName) -> + case couch_server:exists(Name) of + true -> + ok; + false -> + ErrMsg = "~p source ~p is unexpectedly missing in ~p", + couch_log:error(ErrMsg, [?MODULE, Name, StateName]), + exit({error, missing_source}) + end. + + +check_targets_exist(Targets, StateName) -> + lists:foreach(fun(#shard{name = Name}) -> + case couch_server:exists(Name) of + true -> + ok; + false -> + ErrMsg = "~p target ~p is unexpectedly missing in ~p", + couch_log:error(ErrMsg, [?MODULE, Name, StateName]), + exit({error, missing_target}) + end + end, Targets). + + +-spec max_retries() -> integer(). +max_retries() -> + config:get_integer("reshard", "max_retries", 1). + + +-spec retry_interval_sec() -> integer(). +retry_interval_sec() -> + config:get_integer("reshard", "retry_interval_sec", 10). + + +-spec update_shard_map_timeout_sec() -> integer(). +update_shard_map_timeout_sec() -> + config:get_integer("reshard", "update_shardmap_timeout_sec", 60). + + +-spec info_update(atom(), any(), [tuple()]) -> [tuple()]. +info_update(Key, Val, StateInfo) -> + lists:keystore(Key, 1, StateInfo, {Key, Val}). + + +-spec info_delete(atom(), [tuple()]) -> [tuple()]. +info_delete(Key, StateInfo) -> + lists:keydelete(Key, 1, StateInfo). + + +-spec shardsstr(#shard{}, #shard{} | [#shard{}]) -> string(). +shardsstr(#shard{name = SourceName}, #shard{name = TargetName}) -> + lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetName])); + +shardsstr(#shard{name = SourceName}, Targets) -> + TNames = [TN || #shard{name = TN} <- Targets], + TargetsStr = string:join([binary_to_list(T) || T <- TNames], ","), + lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetsStr])). + + +-spec reset_target(#job{}) -> #job{}. +reset_target(#job{source = Source, target = Targets} = Job) -> + ShardNames = try + [N || #shard{name = N} <- mem3:local_shards(mem3:dbname(Source))] + catch + error:database_does_not_exist -> + [] + end, + lists:map(fun(#shard{name = Name}) -> + case {couch_server:exists(Name), lists:member(Name, ShardNames)} of + {_, true} -> + % Should never get here but if we do crash and don't continue + LogMsg = "~p : ~p target unexpectedly found in shard map ~p", + couch_log:error(LogMsg, [?MODULE, jobfmt(Job), Name]), + erlang:error({target_present_in_shard_map, Name}); + {true, false} -> + LogMsg = "~p : ~p resetting ~p target", + couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), Name]), + couch_db_split:cleanup_target(Source#shard.name, Name); + {false, false} -> + ok + end + end, Targets), + Job. + + +-spec update_split_history(#job{}) -> #job{}. +update_split_history(#job{split_state = St, update_time = Ts} = Job) -> + Hist = Job#job.history, + JobSt = case St of + completed -> completed; + failed -> failed; + new -> new; + stopped -> stopped; + _ -> running + end, + Job#job{history = mem3_reshard:update_history(JobSt, St, Ts, Hist)}. 
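info_update/3 and info_delete/2 above are thin wrappers over lists:keystore/4 and lists:keydelete/3, keeping at most one entry per key in state_info. For example:

    %% lists:keystore(error, 1, [], {error, timeout})
    %%     -> [{error,timeout}]
    %% lists:keystore(error, 1, [{error,timeout}], {error, conflict})
    %%     -> [{error,conflict}]
    %% lists:keydelete(error, 1, [{error,conflict}])
    %%     -> []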
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl new file mode 100644 index 000000000..a1e00544a --- /dev/null +++ b/src/mem3/src/mem3_reshard_store.erl @@ -0,0 +1,288 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_store). + + +-export([ + init/3, + + store_job/2, + load_job/2, + delete_job/2, + get_jobs/1, + + store_state/1, + load_state/2, + delete_state/1, % for debugging + + job_to_ejson_props/2, + state_to_ejson_props/1 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("mem3_reshard.hrl"). + + +-spec init(#state{}, binary(), binary()) -> #state{}. +init(#state{} = State, JobPrefix, StateDocId) -> + State#state{ + job_prefix = <<?LOCAL_DOC_PREFIX, JobPrefix/binary>>, + state_id = <<?LOCAL_DOC_PREFIX, StateDocId/binary>> + }. + + +-spec store_job(#state{}, #job{}) -> ok. +store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) -> + with_shards_db(fun(Db) -> + DocId = <<Prefix/binary, Id/binary>>, + ok = update_doc(Db, DocId, job_to_ejson_props(Job)) + end). + + +-spec load_job(#state{}, binary()) -> {ok, {[_]}} | not_found. +load_job(#state{job_prefix = Prefix}, Id) -> + with_shards_db(fun(Db) -> + case load_doc(Db, <<Prefix/binary, Id/binary>>) of + {ok, DocBody} -> + {ok, job_from_ejson(DocBody)}; + not_found -> + not_found + end + end). + + +-spec delete_job(#state{}, binary()) -> ok. +delete_job(#state{job_prefix = Prefix}, Id) -> + with_shards_db(fun(Db) -> + DocId = <<Prefix/binary, Id/binary>>, + ok = delete_doc(Db, DocId) + end). + + +-spec get_jobs(#state{}) -> [#job{}]. +get_jobs(#state{job_prefix = Prefix}) -> + with_shards_db(fun(Db) -> + PrefixLen = byte_size(Prefix), + FoldFun = fun(#doc{id = Id, body = Body}, Acc) -> + case Id of + <<Prefix:PrefixLen/binary, _/binary>> -> + {ok, [job_from_ejson(Body) | Acc]}; + _ -> + {stop, Acc} + end + end, + Opts = [{start_key, Prefix}], + {ok, Jobs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts), + lists:reverse(Jobs) + end). + + +-spec store_state(#state{}) -> ok. +store_state(#state{state_id = DocId} = State) -> + with_shards_db(fun(Db) -> + ok = update_doc(Db, DocId, state_to_ejson_props(State)) + end). + + +-spec load_state(#state{}, atom()) -> #state{}. +load_state(#state{state_id = DocId} = State, Default) -> + with_shards_db(fun(Db) -> + case load_doc(Db, DocId) of + {ok, DocBody} -> + state_from_ejson(State, DocBody); + not_found -> + State#state{state = Default} + end + end). + + +-spec delete_state(#state{}) -> ok. +delete_state(#state{state_id = DocId}) -> + with_shards_db(fun(Db) -> + ok = delete_doc(Db, DocId) + end). 
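The store above keys job and state documents as _local docs under a configurable prefix, and get_jobs/1 folds local docs starting at that prefix and stops at the first id that no longer matches it. A sketch of that id construction and prefix test (the "shard-splits-" prefix value is made up for illustration; the "_local/" value matches the usual couch_db.hrl definition):

    -define(LOCAL_DOC_PREFIX, "_local/").

    job_doc_id(JobPrefix, JobId) ->
        <<?LOCAL_DOC_PREFIX, JobPrefix/binary, JobId/binary>>.

    has_prefix(DocId, Prefix) ->
        PrefixLen = byte_size(Prefix),
        case DocId of
            <<Prefix:PrefixLen/binary, _/binary>> -> true;
            _ -> false
        end.

    %% Id = job_doc_id(<<"shard-splits-">>, <<"001-abc">>)
    %%     -> <<"_local/shard-splits-001-abc">>
    %% has_prefix(Id, <<"_local/shard-splits-">>) -> true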
+ + +job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) -> + Iso8601 = proplists:get_value(iso8601, Opts), + History = history_to_ejson(Job#job.history, Iso8601), + StartTime = case Iso8601 of + true -> iso8601(Job#job.start_time); + _ -> Job#job.start_time + end, + UpdateTime = case Iso8601 of + true -> iso8601(Job#job.update_time); + _ -> Job#job.update_time + end, + [ + {id, Job#job.id}, + {type, Job#job.type}, + {source, Source#shard.name}, + {target, [T#shard.name || T <- Targets]}, + {job_state, Job#job.job_state}, + {split_state, Job#job.split_state}, + {state_info, state_info_to_ejson(Job#job.state_info)}, + {node, atom_to_binary(Job#job.node, utf8)}, + {start_time, StartTime}, + {update_time, UpdateTime}, + {history, History} + ]. + + +state_to_ejson_props(#state{} = State) -> + [ + {state, atom_to_binary(State#state.state, utf8)}, + {state_info, state_info_to_ejson(State#state.state_info)}, + {update_time, State#state.update_time}, + {node, atom_to_binary(State#state.node, utf8)} + ]. + + +% Private API + +with_shards_db(Fun) -> + DbName = config:get("mem3", "shards_db", "_dbs"), + case mem3_util:ensure_exists(DbName) of + {ok, Db} -> + try + Fun(Db) + after + catch couch_db:close(Db) + end; + Else -> + throw(Else) + end. + + +delete_doc(Db, DocId) -> + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = {_, Revs}}} -> + {ok, _} = couch_db:delete_doc(Db, DocId, Revs), + {ok, _} = couch_db:ensure_full_commit(Db), + ok; + {not_found, _} -> + ok + end. + + +update_doc(Db, DocId, Body) -> + DocProps = [{<<"_id">>, DocId}] ++ Body, + Body1 = ?JSON_DECODE(?JSON_ENCODE({DocProps})), + BaseDoc = couch_doc:from_json_obj(Body1), + Doc = case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = Revs}} -> + BaseDoc#doc{revs = Revs}; + {not_found, _} -> + BaseDoc + end, + case store_state() of + true -> + {ok, _} = couch_db:update_doc(Db, Doc, []), + couch_log:debug("~p updated doc ~p ~p", [?MODULE, DocId, Body]), + {ok, _} = couch_db:ensure_full_commit(Db), + ok; + false -> + couch_log:debug("~p not storing state in ~p", [?MODULE, DocId]), + ok + end. + + +load_doc(Db, DocId) -> + case couch_db:open_doc(Db, DocId, [ejson_body]) of + {ok, #doc{body = Body}} -> + couch_log:debug("~p loaded doc ~p ~p", [?MODULE, DocId, Body]), + {ok, Body}; + {not_found, _} -> + not_found + end. + + +job_to_ejson_props(#job{} = Job) -> + job_to_ejson_props(Job, []). + + +job_from_ejson({Props}) -> + Id = couch_util:get_value(<<"id">>, Props), + Type = couch_util:get_value(<<"type">>, Props), + Source = couch_util:get_value(<<"source">>, Props), + Target = couch_util:get_value(<<"target">>, Props), + JobState = couch_util:get_value(<<"job_state">>, Props), + SplitState = couch_util:get_value(<<"split_state">>, Props), + StateInfo = couch_util:get_value(<<"state_info">>, Props), + TStarted = couch_util:get_value(<<"start_time">>, Props), + TUpdated = couch_util:get_value(<<"update_time">>, Props), + History = couch_util:get_value(<<"history">>, Props), + #job{ + id = Id, + type = binary_to_atom(Type, utf8), + job_state = binary_to_atom(JobState, utf8), + split_state = binary_to_atom(SplitState, utf8), + state_info = state_info_from_ejson(StateInfo), + node = node(), + start_time = TStarted, + update_time = TUpdated, + source = mem3_reshard:shard_from_name(Source), + target = [mem3_reshard:shard_from_name(T) || T <- Target], + history = history_from_ejson(History) + }. 
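Because update_doc/3 above round-trips the properties through ?JSON_ENCODE/?JSON_DECODE, atom keys and values written by job_to_ejson_props/2 come back as binaries, and job_from_ejson/1 maps them back onto the record with binary_to_atom/2. A minimal stdlib sketch of that decode step for two of the fields (illustrative helper name):

    decode_states(Props) ->
        #{job_state   => binary_to_atom(proplists:get_value(<<"job_state">>, Props), utf8),
          split_state => binary_to_atom(proplists:get_value(<<"split_state">>, Props), utf8)}.

    %% decode_states([{<<"job_state">>, <<"running">>},
    %%                {<<"split_state">>, <<"topoff1">>}])
    %%     -> #{job_state => running, split_state => topoff1}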
+ + +state_from_ejson(#state{} = State, {Props}) -> + StateVal = couch_util:get_value(<<"state">>, Props), + StateInfo = couch_util:get_value(<<"state_info">>, Props), + TUpdated = couch_util:get_value(<<"update_time">>, Props), + State#state{ + state = binary_to_atom(StateVal, utf8), + state_info = state_info_from_ejson(StateInfo), + node = node(), + update_time = TUpdated + }. + + +state_info_from_ejson({Props}) -> + Props1 = [{binary_to_atom(K, utf8), couch_util:to_binary(V)} + || {K, V} <- Props], + lists:sort(Props1). + + +history_to_ejson(Hist, true) when is_list(Hist) -> + [{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {T, S, D} <- Hist]; + +history_to_ejson(Hist, _) when is_list(Hist) -> + [{[{timestamp, T}, {type, S}, {detail, D}]} || {T, S, D} <- Hist]. + + +history_from_ejson(HistoryEJson) when is_list(HistoryEJson) -> + lists:map(fun({EventProps}) -> + Timestamp = couch_util:get_value(<<"timestamp">>, EventProps), + State = couch_util:get_value(<<"type">>, EventProps), + Detail = couch_util:get_value(<<"detail">>, EventProps), + {Timestamp, binary_to_atom(State, utf8), Detail} + end, HistoryEJson). + + +state_info_to_ejson(Props) -> + {lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}. + + +store_state() -> + config:get_boolean("reshard", "store_state", true). + + +iso8601(UnixSec) -> + Mega = UnixSec div 1000000, + Sec = UnixSec rem 1000000, + {{Y, M, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}), + Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ", + iolist_to_binary(io_lib:format(Format, [Y, M, D, H, Min, S])). diff --git a/src/mem3/src/mem3_reshard_validate.erl b/src/mem3/src/mem3_reshard_validate.erl new file mode 100644 index 000000000..aa8df3e16 --- /dev/null +++ b/src/mem3/src/mem3_reshard_validate.erl @@ -0,0 +1,126 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_validate). + +-export([ + start_args/2, + source/1, + targets/2 +]). + +-include_lib("mem3/include/mem3.hrl"). + + +-spec start_args(#shard{}, any()) -> ok | {error, term()}. +start_args(Source, Split) -> + first_error([ + check_split(Split), + check_range(Source, Split), + check_node(Source), + source(Source), + check_shard_map(Source) + ]). + + +-spec source(#shard{}) -> ok | {error, term()}. +source(#shard{name = Name}) -> + case couch_server:exists(Name) of + true -> + ok; + false -> + {error, {source_shard_not_found, Name}} + end. + + +-spec check_shard_map(#shard{}) -> ok | {error, term()}. +check_shard_map(#shard{name = Name}) -> + DbName = mem3:dbname(Name), + AllShards = mem3:shards(DbName), + case mem3_util:calculate_max_n(AllShards) of + N when is_integer(N), N >= 1 -> + ok; + N when is_integer(N), N < 1 -> + {error, {not_enough_shard_copies, DbName}} + end. + + +-spec targets(#shard{}, [#shard{}]) -> ok | {error, term()}. +targets(#shard{} = Source, Targets) -> + first_error([ + target_ranges(Source, Targets) + ]). + + +-spec check_split(any()) -> ok | {error, term()}. 
+check_split(Split) when is_integer(Split), Split > 1 -> + ok; +check_split(Split) -> + {error, {invalid_split_parameter, Split}}. + + +-spec check_range(#shard{}, any()) -> ok | {error, term()}. +check_range(#shard{range = Range = [B, E]}, Split) -> + case (E + 1 - B) >= Split of + true -> + ok; + false -> + {error, {shard_range_cannot_be_split, Range, Split}} + end. + + +-spec check_node(#shard{}) -> ok | {error, term()}. +check_node(#shard{node = undefined}) -> + ok; + +check_node(#shard{node = Node}) when Node =:= node() -> + ok; + +check_node(#shard{node = Node}) -> + {error, {source_shard_node_is_not_current_node, Node}}. + + +-spec target_ranges(#shard{}, [#shard{}]) -> ok | {error, any()}. +target_ranges(#shard{range = [Begin, End]}, Targets) -> + Ranges = [R || #shard{range = R} <- Targets], + SortFun = fun([B1, _], [B2, _]) -> B1 =< B2 end, + [First | RestRanges] = lists:sort(SortFun, Ranges), + try + TotalRange = lists:foldl(fun([B2, E2], [B1, E1]) -> + case B2 =:= E1 + 1 of + true -> + ok; + false -> + throw({range_error, {B2, E1}}) + end, + [B1, E2] + end, First, RestRanges), + case [Begin, End] =:= TotalRange of + true -> + ok; + false -> + throw({range_error, {[Begin, End], TotalRange}}) + end + catch + throw:{range_error, Error} -> + {error, {shard_range_error, Error}} + end. + + +-spec first_error([ok | {error, term()}]) -> ok | {error, term()}. +first_error(Results) -> + case [Res || Res <- Results, Res =/= ok] of + [] -> + ok; + [FirstError | _] -> + FirstError + end. |
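target_ranges/2 above verifies that the sorted target ranges exactly tile the source [Begin, End] range with no gaps or overlaps. A standalone stdlib version of that fold (illustrative name):

    ranges_tile([Begin, End], Ranges) ->
        SortFun = fun([B1, _], [B2, _]) -> B1 =< B2 end,
        [First | Rest] = lists:sort(SortFun, Ranges),
        try
            Total = lists:foldl(fun([B2, E2], [B1, E1]) ->
                case B2 =:= E1 + 1 of
                    true -> [B1, E2];
                    false -> throw(gap_or_overlap)
                end
            end, First, Rest),
            Total =:= [Begin, End]
        catch
            throw:gap_or_overlap -> false
        end.

    %% ranges_tile([0, 16#ffffffff],
    %%     [[0, 16#7fffffff], [16#80000000, 16#ffffffff]]) -> true
    %% ranges_tile([0, 16#ffffffff],
    %%     [[0, 16#7ffffffe], [16#80000000, 16#ffffffff]]) -> false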