Diffstat (limited to 'src/mem3')
-rw-r--r--  src/mem3/src/mem3_reshard_dbdoc.erl      275
-rw-r--r--  src/mem3/src/mem3_reshard_index.erl      164
-rw-r--r--  src/mem3/src/mem3_reshard_job.erl        722
-rw-r--r--  src/mem3/src/mem3_reshard_store.erl      288
-rw-r--r--  src/mem3/src/mem3_reshard_validate.erl   126
5 files changed, 1575 insertions(+), 0 deletions(-)
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
new file mode 100644
index 000000000..7eb3e9f13
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -0,0 +1,275 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_dbdoc).
+
+-behaviour(gen_server).
+
+-export([
+ update_shard_map/1,
+
+ start_link/0,
+
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
+-spec update_shard_map(#job{}) -> no_return() | ok.
+update_shard_map(#job{source = Source, target = Target} = Job) ->
+ Node = hd(mem3_util:live_nodes()),
+ JobStr = mem3_reshard_job:jobfmt(Job),
+ LogMsg1 = "~p : ~p calling update_shard_map node:~p",
+ couch_log:notice(LogMsg1, [?MODULE, JobStr, Node]),
+ ServerRef = {?MODULE, Node},
+ CallArg = {update_shard_map, Source, Target},
+ TimeoutMSec = shard_update_timeout_msec(),
+ try
+ case gen_server:call(ServerRef, CallArg, TimeoutMSec) of
+ {ok, _} -> ok;
+ {error, CallError} -> throw({error, CallError})
+ end
+ catch
+ _:Err ->
+ exit(Err)
+ end,
+ LogMsg2 = "~p : ~p update_shard_map on node:~p returned",
+ couch_log:notice(LogMsg2, [?MODULE, JobStr, Node]),
+ UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000),
+ case wait_source_removed(Source, 5, UntilSec) of
+ true -> ok;
+ false -> exit(shard_update_did_not_propagate)
+ end.
+
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ couch_log:notice("~p start init()", [?MODULE]),
+ {ok, nil}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+handle_call({update_shard_map, Source, Target}, _From, State) ->
+ Res = try
+ update_shard_map(Source, Target)
+ catch
+ throw:{error, Error} ->
+ {error, Error}
+ end,
+ {reply, Res, State};
+
+handle_call(Call, From, State) ->
+ couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
+ {noreply, State}.
+
+
+handle_cast(Cast, State) ->
+ couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
+ {noreply, State}.
+
+
+handle_info(Info, State) ->
+ couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+% Private
+
+update_shard_map(Source, Target) ->
+ ok = validate_coordinator(),
+ ok = replicate_from_all_nodes(shard_update_timeout_msec()),
+ DocId = mem3:dbname(Source#shard.name),
+ OldDoc = case mem3_util:open_db_doc(DocId) of
+ {ok, #doc{deleted = true}} ->
+ throw({error, missing_source});
+ {ok, #doc{} = Doc} ->
+ Doc;
+ {not_found, deleted} ->
+ throw({error, missing_source});
+ OpenErr ->
+ throw({error, {shard_doc_open_error, OpenErr}})
+ end,
+ #doc{body = OldBody} = OldDoc,
+ NewBody = update_shard_props(OldBody, Source, Target),
+ {ok, _} = write_shard_doc(OldDoc, NewBody),
+ ok = replicate_to_all_nodes(shard_update_timeout_msec()),
+ {ok, NewBody}.
+
+
+validate_coordinator() ->
+ case hd(mem3_util:live_nodes()) =:= node() of
+ true -> ok;
+ false -> throw({error, coordinator_changed})
+ end.
+
+
+replicate_from_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> throw({error, Error})
+ end.
+
+
+replicate_to_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> throw({error, Error})
+ end.
+
+
+write_shard_doc(#doc{id = Id} = Doc, Body) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ UpdatedDoc = Doc#doc{body = Body},
+ couch_util:with_db(DbName, fun(Db) ->
+ try
+ {ok, _} = couch_db:update_doc(Db, UpdatedDoc, [])
+ catch
+ conflict ->
+ throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}})
+ end
+ end).
+
+
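+% Rewrite the shard map document body for a completed split: in "by_node" the
+% source range is replaced by the target ranges for this node, in "by_range"
+% this node is moved from the source range entry to the target range entries,
+% and a "split" entry is appended to the changelog.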
+update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) ->
+ {ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}),
+ ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}},
+ Props1 = lists:keyreplace(<<"by_node">>, 1, Props0, ByNodeKV),
+
+ {ByRange0} = couch_util:get_value(<<"by_range">>, Props1, {[]}),
+ ByRangeKV = {<<"by_range">>, {update_by_range(ByRange0, Source, Targets)}},
+ Props2 = lists:keyreplace(<<"by_range">>, 1, Props1, ByRangeKV),
+
+ Changelog = couch_util:get_value(<<"changelog">>, Props2, []),
+ {Node, Range} = {node_key(Source), range_key(Source)},
+ TRanges = [range_key(T) || T <- Targets],
+ ChangelogEntry = [[<<"split">>, Range, TRanges, Node]],
+ ChangelogKV = {<<"changelog">>, Changelog ++ ChangelogEntry},
+ Props3 = lists:keyreplace(<<"changelog">>, 1, Props2, ChangelogKV),
+
+ {Props3}.
+
+
+update_by_node(ByNode, #shard{} = Source, [#shard{} | _] = Targets) ->
+ {NodeKey, SKey} = {node_key(Source), range_key(Source)},
+ {_, Ranges} = lists:keyfind(NodeKey, 1, ByNode),
+ Ranges1 = Ranges -- [SKey],
+ Ranges2 = Ranges1 ++ [range_key(T) || T <- Targets],
+ lists:keyreplace(NodeKey, 1, ByNode, {NodeKey, lists:sort(Ranges2)}).
+
+
+update_by_range(ByRange, Source, Targets) ->
+ ByRange1 = remove_node_from_source(ByRange, Source),
+ lists:foldl(fun add_node_to_target_foldl/2, ByRange1, Targets).
+
+
+remove_node_from_source(ByRange, Source) ->
+ {NodeKey, SKey} = {node_key(Source), range_key(Source)},
+ {_, SourceNodes} = lists:keyfind(SKey, 1, ByRange),
+ % Double check that source had node to begin with
+ case lists:member(NodeKey, SourceNodes) of
+ true ->
+ ok;
+ false ->
+ throw({source_shard_missing_node, NodeKey, SourceNodes})
+ end,
+ SourceNodes1 = SourceNodes -- [NodeKey],
+ case SourceNodes1 of
+ [] ->
+ % If last node deleted, remove entry
+ lists:keydelete(SKey, 1, ByRange);
+ _ ->
+ lists:keyreplace(SKey, 1, ByRange, {SKey, SourceNodes1})
+ end.
+
+
+add_node_to_target_foldl(#shard{} = Target, ByRange) ->
+ {NodeKey, TKey} = {node_key(Target), range_key(Target)},
+ case lists:keyfind(TKey, 1, ByRange) of
+ {_, Nodes} ->
+ % Double check that target does not have node already
+ case lists:member(NodeKey, Nodes) of
+ false ->
+ ok;
+ true ->
+ throw({target_shard_already_has_node, NodeKey, Nodes})
+ end,
+ Nodes1 = lists:sort([NodeKey | Nodes]),
+ lists:keyreplace(TKey, 1, ByRange, {TKey, Nodes1});
+ false ->
+ % fabric_db_create:make_document/3 says they should be sorted
+ lists:sort([{TKey, [NodeKey]} | ByRange])
+ end.
+
+
+node_key(#shard{node = Node}) ->
+ couch_util:to_binary(Node).
+
+
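+% Turn a [Begin, End] range into the "xxxxxxxx-yyyyyyyy" hex string used in
+% the by_node and by_range sections of the shard map doc, e.g.
+% [16#80000000, 16#ffffffff] becomes <<"80000000-ffffffff">>.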
+range_key(#shard{range = [B, E]}) ->
+ BHex = couch_util:to_hex(<<B:32/integer>>),
+ EHex = couch_util:to_hex(<<E:32/integer>>),
+ list_to_binary([BHex, "-", EHex]).
+
+
+shard_update_timeout_msec() ->
+ config:get_integer("reshard", "shard_update_timeout_msec", 300000).
+
+
+wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) ->
+ case check_source_removed(Source) of
+ true ->
+ true;
+ false ->
+ case mem3_reshard:now_sec() < UntilSec of
+ true ->
+ LogMsg = "~p : Waiting for shard ~p removal confirmation",
+ couch_log:notice(LogMsg, [?MODULE, Name]),
+ timer:sleep(SleepSec * 1000),
+ wait_source_removed(Source, SleepSec, UntilSec);
+ false ->
+ false
+ end
+ end.
+
+
+check_source_removed(#shard{name = Name}) ->
+ DbName = mem3:dbname(Name),
+ Live = mem3_util:live_nodes(),
+ ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
+ Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]),
+ {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]),
+ Shards = lists:usort(lists:flatten(Responses)),
+ SourcePresent = [S || #shard{name = SName, node = N} = S <- Shards,
+ SName =:= Name, N =:= node()],
+ case SourcePresent of
+ [] -> true;
+ [_ | _] -> false
+ end.
diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl
new file mode 100644
index 000000000..d4cb7caa1
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -0,0 +1,164 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_index).
+
+
+-export([
+ design_docs/1,
+ target_indices/2,
+ spawn_builders/1
+]).
+
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+%% Public API
+
+design_docs(DbName) ->
+ try
+ case fabric_design_docs(mem3:dbname(DbName)) of
+ {error, {maintenance_mode, _, _Node}} ->
+ {ok, []};
+ {ok, DDocs} ->
+ JsonDocs = [couch_doc:from_json_obj(DDoc) || DDoc <- DDocs],
+ {ok, JsonDocs};
+ Else ->
+ Else
+ end
+ catch error:database_does_not_exist ->
+ {ok, []}
+ end.
+
+
+target_indices(Docs, Targets) ->
+ Indices = [[indices(N, D) || D <- Docs] || #shard{name = N} <- Targets],
+ lists:flatten(Indices).
+
+
+spawn_builders(Indices) ->
+ Results = [build_index(Index) || Index <- Indices],
+ Oks = [{ok, Pid} || {ok, Pid} <- Results, is_pid(Pid)],
+ case Results -- Oks of
+ [] ->
+ {ok, [Pid || {ok, Pid} <- Results]};
+ Error ->
+ % Do an all-or-nothing pattern: if some indices could not be
+ % spawned, kill the spawned ones and return the error.
+ ErrMsg = "~p failed to spawn index builders: ~p ~p",
+ couch_log:error(ErrMsg, [?MODULE, Error, Indices]),
+ lists:foreach(fun({ok, Pid}) ->
+ catch unlink(Pid),
+ catch exit(Pid, kill)
+ end, Oks),
+ {error, Error}
+ end.
+
+
+%% Private API
+
+fabric_design_docs(DbName) ->
+ case couch_util:with_proc(fabric, design_docs, [DbName], infinity) of
+ {ok, Resp} -> Resp;
+ {error, Error} -> Error
+ end.
+
+
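+% The "|| has_app(App)" comprehensions below have no generators: each yields a
+% single-element list when the optional application (dreyfus or hastings) is
+% present in the release, and [] otherwise, so those index types are only
+% collected when their application is actually available.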
+indices(DbName, Doc) ->
+ mrview_indices(DbName, Doc)
+ ++ [dreyfus_indices(DbName, Doc) || has_app(dreyfus)]
+ ++ [hastings_indices(DbName, Doc) || has_app(hastings)].
+
+
+mrview_indices(DbName, Doc) ->
+ try
+ {ok, MRSt} = couch_mrview_util:ddoc_to_mrst(DbName, Doc),
+ Views = couch_mrview_index:get(views, MRSt),
+ case Views =/= [] of
+ true ->
+ [{mrview, DbName, MRSt}];
+ false ->
+ []
+ end
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get mrview index ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+dreyfus_indices(DbName, Doc) ->
+ try
+ Indices = dreyfus_index:design_doc_to_indexes(Doc),
+ [{dreyfus, DbName, Index} || Index <- Indices]
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get dreyfus indices ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+hastings_indices(DbName, Doc) ->
+ try
+ Indices = hastings_index:design_doc_to_indexes(Doc),
+ [{hastings, DbName, Index} || Index <- Indices]
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get hasting indices ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+build_index({mrview, DbName, MRSt}) ->
+ case couch_index_server:get_index(couch_mrview_index, MRSt) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(couch_index, get_state, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end;
+
+build_index({dreyfus, DbName, Index}) ->
+ case dreyfus_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(dreyfus_index, await, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end;
+
+build_index({hastings, DbName, Index}) ->
+ case hastings_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(hastings_index, await, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end.
+
+
+has_app(App) ->
+ code:lib_dir(App) /= {error, bad_name}.
+
+
+get_update_seq(DbName) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ couch_db:get_update_seq(Db)
+ end).
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
new file mode 100644
index 000000000..d3a33d3f6
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -0,0 +1,722 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_job).
+
+
+-export([
+ start_link/1,
+
+ checkpoint_done/1,
+ jobfmt/1,
+ pickfun/3
+]).
+
+-export([
+ init/1,
+
+ initial_copy/1,
+ initial_copy_impl/1,
+
+ topoff/1,
+ topoff_impl/1,
+
+ build_indices/1,
+
+ copy_local_docs/1,
+ copy_local_docs_impl/1,
+
+ update_shardmap/1,
+
+ wait_source_close/1,
+ wait_source_close_impl/1,
+
+ source_delete/1,
+ source_delete_impl/1,
+
+ completed/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
+% Batch size for internal replication topoffs
+-define(INTERNAL_REP_BATCH_SIZE, 2000).
+
+% The list of possible job states. The order of this
+% list is important as a job will progress linearly
+% through it. However, when starting a job we may
+% have to resume from an earlier state as listed
+% below in STATE_RESTART.
+-define(SPLIT_STATES, [
+ new,
+ initial_copy,
+ topoff1,
+ build_indices,
+ topoff2,
+ copy_local_docs,
+ update_shardmap,
+ wait_source_close,
+ topoff3,
+ source_delete,
+ completed
+]).
+
+
+% When a job starts it may be resuming from a partially
+% completed state. These state pairs list the state
+% we have to restart from for each possible state.
+-define(STATE_RESTART, #{
+ new => initial_copy,
+ initial_copy => initial_copy,
+ topoff1 => topoff1,
+ build_indices => topoff1,
+ topoff2 => topoff1,
+ copy_local_docs => topoff1,
+ update_shardmap => update_shardmap,
+ wait_source_close => wait_source_close,
+ topoff3 => wait_source_close,
+ source_delete => wait_source_close,
+ completed => completed
+}).
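+% For example, a job interrupted anywhere between topoff1 and copy_local_docs
+% resumes from topoff1, so that any changes the source received while the job
+% was not running get replicated to the targets again.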
+
+
+% If a worker fails during any of these
+% states we need to clean up the targets
+-define(CLEAN_TARGET_STATES, [
+ initial_copy,
+ topoff1,
+ build_indices,
+ topoff2,
+ copy_local_docs
+]).
+
+
+start_link(#job{} = Job) ->
+ proc_lib:start_link(?MODULE, init, [Job]).
+
+
+% This is called by the main process after it has checkpointed the progress
+% of the job. After the new state is checkpointed, we signal the job to start
+% executing that state.
+checkpoint_done(#job{pid = Pid} = Job) ->
+ couch_log:notice(" ~p : checkpoint done for ~p", [?MODULE, jobfmt(Job)]),
+ Pid ! checkpoint_done,
+ ok.
+
+
+% Formatting function, used for logging mostly
+jobfmt(#job{} = Job) ->
+ #job{
+ id = Id,
+ source = #shard{name = Source},
+ target = Target,
+ split_state = State,
+ job_state = JobState,
+ pid = Pid
+ } = Job,
+ TargetCount = length(Target),
+ Msg = "#job{~s ~s /~B job_state:~s split_state:~s pid:~p}",
+ Fmt = io_lib:format(Msg, [Id, Source, TargetCount, JobState, State, Pid]),
+ lists:flatten(Fmt).
+
+
+% This is the function which picks between various targets. It is used here as
+% well as in the mem3_rep internal replicator and the couch_db_split bulk copy
+% logic. Given a document id, a list of ranges and a hash function, it picks
+% the range the document hashes into or returns the not_in_range atom.
+pickfun(DocId, [[B, E] | _] = Ranges, {_M, _F, _A} = HashFun) when
+ is_integer(B), is_integer(E), B =< E ->
+ HashKey = mem3_hash:calculate(HashFun, DocId),
+ Pred = fun([Begin, End]) ->
+ Begin =< HashKey andalso HashKey =< End
+ end,
+ case lists:filter(Pred, Ranges) of
+ [] -> not_in_range;
+ [Key] -> Key
+ end.
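+% Illustration with hypothetical values (mem3 hashing is crc32 based): with
+% two target ranges covering the full hash space, a document id always lands
+% in exactly one of them:
+%
+%   Ranges = [[16#00000000, 16#7fffffff], [16#80000000, 16#ffffffff]],
+%   pickfun(<<"some-doc-id">>, Ranges, {mem3_hash, crc32, []}).
+%   % returns one of the two ranges, e.g. [16#80000000, 16#ffffffff]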
+
+
+init(#job{} = Job0) ->
+ process_flag(trap_exit, true),
+ Job1 = set_start_state(Job0#job{
+ pid = self(),
+ start_time = mem3_reshard:now_sec(),
+ workers = [],
+ retries = 0
+ }),
+ Job2 = update_split_history(Job1),
+ proc_lib:init_ack({ok, self()}),
+ couch_log:notice("~p starting job ~s", [?MODULE, jobfmt(Job2)]),
+ ok = checkpoint(Job2),
+ run(Job2).
+
+
+run(#job{split_state = CurrState} = Job) ->
+ StateFun = case CurrState of
+ topoff1 -> topoff;
+ topoff2 -> topoff;
+ topoff3 -> topoff;
+ _ -> CurrState
+ end,
+ NewJob = try
+ Job1 = ?MODULE:StateFun(Job),
+ Job2 = wait_for_workers(Job1),
+ Job3 = switch_to_next_state(Job2),
+ ok = checkpoint(Job3),
+ Job3
+ catch
+ throw:{retry, RetryJob} ->
+ RetryJob
+ end,
+ run(NewJob).
+
+
+set_start_state(#job{split_state = State} = Job) ->
+ case {State, maps:get(State, ?STATE_RESTART, undefined)} of
+ {_, undefined} ->
+ Fmt1 = "~p recover : unknown state ~s",
+ couch_log:error(Fmt1, [?MODULE, jobfmt(Job)]),
+ erlang:error({invalid_split_job_recover_state, Job});
+ {initial_copy, initial_copy} ->
+ % Since we recover from initial_copy to initial_copy, we need
+ % to reset the target state as initial_copy expects to
+ % create a new target
+ Fmt2 = "~p recover : resetting target ~s",
+ couch_log:notice(Fmt2, [?MODULE, jobfmt(Job)]),
+ reset_target(Job);
+ {_, StartState} ->
+ Job#job{split_state = StartState}
+ end.
+
+
+get_next_state(#job{split_state = State}) ->
+ get_next_state(State, ?SPLIT_STATES).
+
+
+get_next_state(completed, _) ->
+ completed;
+
+get_next_state(CurrState, [CurrState, NextState | _]) ->
+ NextState;
+
+get_next_state(CurrState, [_ | Rest]) ->
+ get_next_state(CurrState, Rest).
+
+
+switch_to_next_state(#job{} = Job0) ->
+ Info0 = Job0#job.state_info,
+ Info1 = info_delete(error, Info0),
+ Info2 = info_delete(reason, Info1),
+ Job1 = Job0#job{
+ split_state = get_next_state(Job0),
+ update_time = mem3_reshard:now_sec(),
+ retries = 0,
+ state_info = Info2,
+ workers = []
+ },
+ Job2 = update_split_history(Job1),
+ check_state(Job2).
+
+
+checkpoint(Job) ->
+ % Ask the main process to checkpoint. When it has finished it will notify us
+ % by calling checkpoint_done/1. The reason not to call the main process
+ % via a gen_server:call is because the main process could be in the middle
+ % of terminating the job and then it would deadlock (after sending us a
+ % shutdown message) and it would end up using the whole supervisor
+ % termination timeout before finally killing the job process.
+ ok = mem3_reshard:checkpoint(Job#job.manager, Job),
+ Parent = parent(),
+ receive
+ {'EXIT', Parent, Reason} ->
+ handle_exit(Job, Reason);
+ checkpoint_done ->
+ ok;
+ Other ->
+ handle_unknown_msg(Job, "checkpoint", Other)
+ end.
+
+
+wait_for_workers(#job{workers = []} = Job) ->
+ Job;
+
+wait_for_workers(#job{workers = Workers} = Job) ->
+ Parent = parent(),
+ receive
+ {'EXIT', Parent, Reason} ->
+ handle_exit(Job, Reason);
+ {'EXIT', Pid, Reason} ->
+ case lists:member(Pid, Workers) of
+ true ->
+ NewJob = handle_worker_exit(Job, Pid, Reason),
+ wait_for_workers(NewJob);
+ false ->
+ handle_unknown_msg(Job, "wait_for_workers", {Pid, Reason})
+ end;
+ Other ->
+ handle_unknown_msg(Job, "wait_for_workers", Other)
+ end.
+
+
+handle_worker_exit(#job{workers = Workers} = Job, Pid, normal) ->
+ Job#job{workers = Workers -- [Pid]};
+
+handle_worker_exit(#job{} = Job, _Pid, {error, missing_source}) ->
+ Msg1 = "~p stopping worker due to source missing ~p",
+ couch_log:error(Msg1, [?MODULE, jobfmt(Job)]),
+ kill_workers(Job),
+ case lists:member(Job#job.split_state, ?CLEAN_TARGET_STATES) of
+ true ->
+ Msg2 = "~p cleaning target after db was deleted ~p",
+ couch_log:error(Msg2, [?MODULE, jobfmt(Job)]),
+ reset_target(Job),
+ exit({error, missing_source});
+ false ->
+ exit({error, missing_source})
+ end;
+
+handle_worker_exit(#job{} = Job, _Pid, {error, missing_target}) ->
+ Msg = "~p stopping worker due to target db missing ~p",
+ couch_log:error(Msg, [?MODULE, jobfmt(Job)]),
+ kill_workers(Job),
+ exit({error, missing_target});
+
+handle_worker_exit(#job{} = Job0, _Pid, Reason) ->
+ couch_log:error("~p worker error ~p ~p", [?MODULE, jobfmt(Job0), Reason]),
+ kill_workers(Job0),
+ Job1 = Job0#job{workers = []},
+ case Job1#job.retries =< max_retries() of
+ true ->
+ retry_state(Job1, Reason);
+ false ->
+ exit(Reason)
+ end.
+
+
+% Cleanup and exit when we receive an 'EXIT' message from our parent. In case
+% the shard map is being updated, try to wait some time for it to finish.
+handle_exit(#job{split_state = update_shardmap, workers = [WPid]} = Job,
+ Reason) ->
+ Timeout = update_shard_map_timeout_sec(),
+ Msg1 = "~p job exit ~s ~p while shard map is updating, waiting ~p sec",
+ couch_log:warning(Msg1, [?MODULE, jobfmt(Job), Reason, Timeout]),
+ receive
+ {'EXIT', WPid, normal} ->
+ Msg2 = "~p ~s shard map finished updating successfully, exiting",
+ couch_log:notice(Msg2, [?MODULE, jobfmt(Job)]),
+ exit(Reason);
+ {'EXIT', WPid, Error} ->
+ Msg3 = "~p ~s shard map update failed with error ~p",
+ couch_log:error(Msg3, [?MODULE, jobfmt(Job), Error]),
+ exit(Reason)
+ after Timeout * 1000 ->
+ Msg4 = "~p ~s shard map update timeout exceeded ~p sec",
+ couch_log:error(Msg4, [?MODULE, jobfmt(Job), Timeout]),
+ kill_workers(Job),
+ exit(Reason)
+ end;
+
+handle_exit(#job{} = Job, Reason) ->
+ kill_workers(Job),
+ exit(Reason).
+
+
+retry_state(#job{retries = Retries, state_info = Info} = Job0, Error) ->
+ Job1 = Job0#job{
+ retries = Retries + 1,
+ state_info = info_update(error, Error, Info)
+ },
+ couch_log:notice("~p retrying ~p ~p", [?MODULE, jobfmt(Job1), Retries]),
+ Job2 = report(Job1),
+ Timeout = retry_interval_sec(),
+ Parent = parent(),
+ receive
+ {'EXIT', Parent, Reason} ->
+ handle_exit(Job2, Reason);
+ Other ->
+ handle_unknown_msg(Job2, "retry_state", Other)
+ after Timeout * 1000 ->
+ ok
+ end,
+ throw({retry, Job2}).
+
+
+report(#job{manager = ManagerPid} = Job) ->
+ Job1 = Job#job{update_time = mem3_reshard:now_sec()},
+ ok = mem3_reshard:report(ManagerPid, Job1),
+ Job1.
+
+
+kill_workers(#job{workers = Workers}) ->
+ lists:foreach(fun(Worker) ->
+ unlink(Worker),
+ exit(Worker, kill)
+ end, Workers),
+ flush_worker_messages().
+
+
+flush_worker_messages() ->
+ Parent = parent(),
+ receive
+ {'EXIT', Pid, _} when Pid =/= Parent ->
+ flush_worker_messages()
+ after 0 ->
+ ok
+ end.
+
+
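+% The parent is the process that spawned this job via proc_lib; proc_lib
+% records it as the first entry of the '$ancestors' process dictionary value,
+% either as a pid or as a registered name.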
+parent() ->
+ case get('$ancestors') of
+ [Pid | _] when is_pid(Pid) -> Pid;
+ [Name | _] when is_atom(Name) -> whereis(Name);
+ _ -> undefined
+ end.
+
+
+handle_unknown_msg(Job, When, RMsg) ->
+ LogMsg = "~p ~s received an unknown message ~p when in ~s",
+ couch_log:error(LogMsg, [?MODULE, jobfmt(Job), RMsg, When]),
+ erlang:error({invalid_split_job_message, Job#job.id, When, RMsg}).
+
+
+initial_copy(#job{} = Job) ->
+ Pid = spawn_link(?MODULE, initial_copy_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+initial_copy_impl(#job{source = Source, target = Targets0} = Job) ->
+ #shard{name = SourceName} = Source,
+ Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
+ TMap = maps:from_list(Targets),
+ LogMsg1 = "~p initial_copy started ~s",
+ LogArgs1 = [?MODULE, shardsstr(Source, Targets0)],
+ couch_log:notice(LogMsg1, LogArgs1),
+ case couch_db_split:split(SourceName, TMap, fun pickfun/3) of
+ {ok, Seq} ->
+ LogMsg2 = "~p initial_copy of ~s finished @ seq:~p",
+ LogArgs2 = [?MODULE, shardsstr(Source, Targets0), Seq],
+ couch_log:notice(LogMsg2, LogArgs2),
+ create_artificial_mem3_rep_checkpoints(Job, Seq);
+ {error, Error} ->
+ LogMsg3 = "~p initial_copy of ~s failed ~p",
+ LogArgs3 = [?MODULE, shardsstr(Source, Targets0), Error],
+ couch_log:notice(LogMsg3, LogArgs3),
+ exit({error, Error})
+ end.
+
+
+topoff(#job{} = Job) ->
+ Pid = spawn_link(?MODULE, topoff_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+topoff_impl(#job{source = #shard{} = Source, target = Targets}) ->
+ couch_log:notice("~p topoff ~p", [?MODULE, shardsstr(Source, Targets)]),
+ check_source_exists(Source, topoff),
+ check_targets_exist(Targets, topoff),
+ TMap = maps:from_list([{R, T} || #shard{range = R} = T <- Targets]),
+ Opts = [{batch_size, ?INTERNAL_REP_BATCH_SIZE}, {batch_count, all}],
+ case mem3_rep:go(Source, TMap, Opts) of
+ {ok, Count} ->
+ Args = [?MODULE, shardsstr(Source, Targets), Count],
+ couch_log:notice("~p topoff done ~s, count: ~p", Args),
+ ok;
+ {error, Error} ->
+ Args = [?MODULE, shardsstr(Source, Targets), Error],
+ couch_log:error("~p topoff failed ~s, error: ~p", Args),
+ exit({error, Error})
+ end.
+
+
+build_indices(#job{} = Job) ->
+ #job{
+ source = #shard{name = SourceName} = Source,
+ target = Targets,
+ retries = Retries,
+ state_info = Info
+ } = Job,
+ check_source_exists(Source, build_indices),
+ {ok, DDocs} = mem3_reshard_index:design_docs(SourceName),
+ Indices = mem3_reshard_index:target_indices(DDocs, Targets),
+ case mem3_reshard_index:spawn_builders(Indices) of
+ {ok, []} ->
+ % Skip the log spam if this is a no-op
+ Job#job{workers = []};
+ {ok, Pids} ->
+ report(Job#job{workers = Pids});
+ {error, Error} ->
+ case Job#job.retries =< max_retries() of
+ true ->
+ build_indices(Job#job{
+ retries = Retries + 1,
+ state_info = info_update(error, Error, Info)
+ });
+ false ->
+ exit(Error)
+ end
+ end.
+
+
+copy_local_docs(#job{split_state = copy_local_docs} = Job) ->
+ Pid = spawn_link(?MODULE, copy_local_docs_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+copy_local_docs_impl(#job{source = Source, target = Targets0}) ->
+ #shard{name = SourceName} = Source,
+ Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
+ TMap = maps:from_list(Targets),
+ LogArg1 = [?MODULE, shardsstr(Source, Targets)],
+ couch_log:notice("~p copy local docs start ~s", LogArg1),
+ case couch_db_split:copy_local_docs(SourceName, TMap, fun pickfun/3) of
+ ok ->
+ couch_log:notice("~p copy local docs finished for ~s", LogArg1),
+ ok;
+ {error, Error} ->
+ LogArg2 = [?MODULE, shardsstr(Source, Targets), Error],
+ couch_log:error("~p copy local docs failed for ~s ~p", LogArg2),
+ exit({error, Error})
+ end.
+
+
+update_shardmap(#job{} = Job) ->
+ Pid = spawn_link(mem3_reshard_dbdoc, update_shard_map, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+wait_source_close(#job{source = #shard{name = Name}} = Job) ->
+ couch_event:notify(Name, deleted),
+ Pid = spawn_link(?MODULE, wait_source_close_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+wait_source_close_impl(#job{source = #shard{name = Name}, target = Targets}) ->
+ Timeout = config:get_integer("reshard", "source_close_timeout_sec", 600),
+ check_targets_exist(Targets, wait_source_close),
+ case couch_db:open_int(Name, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ Now = mem3_reshard:now_sec(),
+ case wait_source_close(Db, 1, Now + Timeout) of
+ true ->
+ ok;
+ false ->
+ exit({error, {source_db_close_timeout, Name, Timeout}})
+ end;
+ {not_found, _} ->
+ couch_log:warning("~p source already deleted ~p", [?MODULE, Name]),
+ ok
+ end.
+
+
+wait_source_close(Db, SleepSec, UntilSec) ->
+ case couch_db:monitored_by(Db) -- [self()] of
+ [] ->
+ true;
+ [_ | _] ->
+ Now = mem3_reshard:now_sec(),
+ case Now < UntilSec of
+ true ->
+ LogMsg = "~p : Waiting for source shard ~p to be closed",
+ couch_log:notice(LogMsg, [?MODULE, couch_db:name(Db)]),
+ timer:sleep(SleepSec * 1000),
+ wait_source_close(Db, SleepSec, UntilSec);
+ false ->
+ false
+ end
+ end.
+
+
+source_delete(#job{} = Job) ->
+ Pid = spawn_link(?MODULE, source_delete_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+source_delete_impl(#job{source = #shard{name = Name}, target = Targets}) ->
+ check_targets_exist(Targets, source_delete),
+ case config:get_boolean("mem3_reshard", "delete_source", true) of
+ true ->
+ case couch_server:delete(Name, [?ADMIN_CTX]) of
+ ok ->
+ couch_log:notice("~p : deleted source shard ~p",
+ [?MODULE, Name]);
+ not_found ->
+ couch_log:warning("~p : source was already deleted ~p",
+ [?MODULE, Name])
+ end;
+ false ->
+ % Emit the deleted event even when not actually deleting the files.
+ % This is the second one emitted; the first was sent before
+ % wait_source_close. They should be idempotent. This one simply
+ % matches the event couch_server would have emitted had the config
+ % option not been set.
+ couch_event:notify(Name, deleted),
+ LogMsg = "~p : according to configuration not deleting source ~p",
+ couch_log:warning(LogMsg, [?MODULE, Name])
+ end,
+ TNames = [TName || #shard{name = TName} <- Targets],
+ lists:foreach(fun(TName) -> couch_event:notify(TName, updated) end, TNames).
+
+
+completed(#job{} = Job) ->
+ couch_log:notice("~p : ~p completed, exit normal", [?MODULE, jobfmt(Job)]),
+ exit(normal).
+
+
+% This is for belt and suspenders really. Call periodically to validate the
+% state is one of the expected states.
+-spec check_state(#job{}) -> #job{} | no_return().
+check_state(#job{split_state = State} = Job) ->
+ case lists:member(State, ?SPLIT_STATES) of
+ true ->
+ Job;
+ false ->
+ erlang:error({invalid_shard_split_state, State, Job})
+ end.
+
+
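+% After the initial bulk copy, write matching mem3_rep checkpoint docs into
+% the source and each target at the sequence the copy stopped at, so that
+% mem3_rep (used by the topoff states) can resume from that sequence instead
+% of starting from 0.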
+create_artificial_mem3_rep_checkpoints(#job{} = Job, Seq) ->
+ #job{source = Source = #shard{name = SourceName}, target = Targets} = Job,
+ check_source_exists(Source, initial_copy),
+ TNames = [TN || #shard{name = TN} <- Targets],
+ Timestamp = list_to_binary(mem3_util:iso8601_timestamp()),
+ couch_util:with_db(SourceName, fun(SDb) ->
+ [couch_util:with_db(TName, fun(TDb) ->
+ Doc = mem3_rep_checkpoint_doc(SDb, TDb, Timestamp, Seq),
+ {ok, _} = couch_db:update_doc(SDb, Doc, []),
+ {ok, _} = couch_db:update_doc(TDb, Doc, []),
+ ok
+ end) || TName <- TNames]
+ end),
+ ok.
+
+
+mem3_rep_checkpoint_doc(SourceDb, TargetDb, Timestamp, Seq) ->
+ Node = atom_to_binary(node(), utf8),
+ SourceUUID = couch_db:get_uuid(SourceDb),
+ TargetUUID = couch_db:get_uuid(TargetDb),
+ History = {[
+ {<<"source_node">>, Node},
+ {<<"source_uuid">>, SourceUUID},
+ {<<"source_seq">>, Seq},
+ {<<"timestamp">>, Timestamp},
+ {<<"target_node">>, Node},
+ {<<"target_uuid">>, TargetUUID},
+ {<<"target_seq">>, Seq}
+ ]},
+ Body = {[
+ {<<"seq">>, Seq},
+ {<<"target_uuid">>, TargetUUID},
+ {<<"history">>, {[{Node, [History]}]}}
+ ]},
+ Id = mem3_rep:make_local_id(SourceUUID, TargetUUID),
+ #doc{id = Id, body = Body}.
+
+
+check_source_exists(#shard{name = Name}, StateName) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ ErrMsg = "~p source ~p is unexpectedly missing in ~p",
+ couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
+ exit({error, missing_source})
+ end.
+
+
+check_targets_exist(Targets, StateName) ->
+ lists:foreach(fun(#shard{name = Name}) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ ErrMsg = "~p target ~p is unexpectedly missing in ~p",
+ couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
+ exit({error, missing_target})
+ end
+ end, Targets).
+
+
+-spec max_retries() -> integer().
+max_retries() ->
+ config:get_integer("reshard", "max_retries", 1).
+
+
+-spec retry_interval_sec() -> integer().
+retry_interval_sec() ->
+ config:get_integer("reshard", "retry_interval_sec", 10).
+
+
+-spec update_shard_map_timeout_sec() -> integer().
+update_shard_map_timeout_sec() ->
+ config:get_integer("reshard", "update_shardmap_timeout_sec", 60).
+
+
+-spec info_update(atom(), any(), [tuple()]) -> [tuple()].
+info_update(Key, Val, StateInfo) ->
+ lists:keystore(Key, 1, StateInfo, {Key, Val}).
+
+
+-spec info_delete(atom(), [tuple()]) -> [tuple()].
+info_delete(Key, StateInfo) ->
+ lists:keydelete(Key, 1, StateInfo).
+
+
+-spec shardsstr(#shard{}, #shard{} | [#shard{}]) -> string().
+shardsstr(#shard{name = SourceName}, #shard{name = TargetName}) ->
+ lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetName]));
+
+shardsstr(#shard{name = SourceName}, Targets) ->
+ TNames = [TN || #shard{name = TN} <- Targets],
+ TargetsStr = string:join([binary_to_list(T) || T <- TNames], ","),
+ lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetsStr])).
+
+
+-spec reset_target(#job{}) -> #job{}.
+reset_target(#job{source = Source, target = Targets} = Job) ->
+ ShardNames = try
+ [N || #shard{name = N} <- mem3:local_shards(mem3:dbname(Source))]
+ catch
+ error:database_does_not_exist ->
+ []
+ end,
+ lists:map(fun(#shard{name = Name}) ->
+ case {couch_server:exists(Name), lists:member(Name, ShardNames)} of
+ {_, true} ->
+ % Should never get here, but if we do, crash and don't continue
+ LogMsg = "~p : ~p target unexpectedly found in shard map ~p",
+ couch_log:error(LogMsg, [?MODULE, jobfmt(Job), Name]),
+ erlang:error({target_present_in_shard_map, Name});
+ {true, false} ->
+ LogMsg = "~p : ~p resetting ~p target",
+ couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), Name]),
+ couch_db_split:cleanup_target(Source#shard.name, Name);
+ {false, false} ->
+ ok
+ end
+ end, Targets),
+ Job.
+
+
+-spec update_split_history(#job{}) -> #job{}.
+update_split_history(#job{split_state = St, update_time = Ts} = Job) ->
+ Hist = Job#job.history,
+ JobSt = case St of
+ completed -> completed;
+ failed -> failed;
+ new -> new;
+ stopped -> stopped;
+ _ -> running
+ end,
+ Job#job{history = mem3_reshard:update_history(JobSt, St, Ts, Hist)}.
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
new file mode 100644
index 000000000..a1e00544a
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -0,0 +1,288 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_store).
+
+
+-export([
+ init/3,
+
+ store_job/2,
+ load_job/2,
+ delete_job/2,
+ get_jobs/1,
+
+ store_state/1,
+ load_state/2,
+ delete_state/1, % for debugging
+
+ job_to_ejson_props/2,
+ state_to_ejson_props/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
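+% Set up the _local document id prefixes under which individual job docs and
+% the overall state doc are stored in the shards db.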
+-spec init(#state{}, binary(), binary()) -> #state{}.
+init(#state{} = State, JobPrefix, StateDocId) ->
+ State#state{
+ job_prefix = <<?LOCAL_DOC_PREFIX, JobPrefix/binary>>,
+ state_id = <<?LOCAL_DOC_PREFIX, StateDocId/binary>>
+ }.
+
+
+-spec store_job(#state{}, #job{}) -> ok.
+store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) ->
+ with_shards_db(fun(Db) ->
+ DocId = <<Prefix/binary, Id/binary>>,
+ ok = update_doc(Db, DocId, job_to_ejson_props(Job))
+ end).
+
+
+-spec load_job(#state{}, binary()) -> {ok, {[_]}} | not_found.
+load_job(#state{job_prefix = Prefix}, Id) ->
+ with_shards_db(fun(Db) ->
+ case load_doc(Db, <<Prefix/binary, Id/binary>>) of
+ {ok, DocBody} ->
+ {ok, job_from_ejson(DocBody)};
+ not_found ->
+ not_found
+ end
+ end).
+
+
+-spec delete_job(#state{}, binary()) -> ok.
+delete_job(#state{job_prefix = Prefix}, Id) ->
+ with_shards_db(fun(Db) ->
+ DocId = <<Prefix/binary, Id/binary>>,
+ ok = delete_doc(Db, DocId)
+ end).
+
+
+-spec get_jobs(#state{}) -> [#job{}].
+get_jobs(#state{job_prefix = Prefix}) ->
+ with_shards_db(fun(Db) ->
+ PrefixLen = byte_size(Prefix),
+ FoldFun = fun(#doc{id = Id, body = Body}, Acc) ->
+ case Id of
+ <<Prefix:PrefixLen/binary, _/binary>> ->
+ {ok, [job_from_ejson(Body) | Acc]};
+ _ ->
+ {stop, Acc}
+ end
+ end,
+ Opts = [{start_key, Prefix}],
+ {ok, Jobs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
+ lists:reverse(Jobs)
+ end).
+
+
+-spec store_state(#state{}) -> ok.
+store_state(#state{state_id = DocId} = State) ->
+ with_shards_db(fun(Db) ->
+ ok = update_doc(Db, DocId, state_to_ejson_props(State))
+ end).
+
+
+-spec load_state(#state{}, atom()) -> #state{}.
+load_state(#state{state_id = DocId} = State, Default) ->
+ with_shards_db(fun(Db) ->
+ case load_doc(Db, DocId) of
+ {ok, DocBody} ->
+ state_from_ejson(State, DocBody);
+ not_found ->
+ State#state{state = Default}
+ end
+ end).
+
+
+-spec delete_state(#state{}) -> ok.
+delete_state(#state{state_id = DocId}) ->
+ with_shards_db(fun(Db) ->
+ ok = delete_doc(Db, DocId)
+ end).
+
+
+job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) ->
+ Iso8601 = proplists:get_value(iso8601, Opts),
+ History = history_to_ejson(Job#job.history, Iso8601),
+ StartTime = case Iso8601 of
+ true -> iso8601(Job#job.start_time);
+ _ -> Job#job.start_time
+ end,
+ UpdateTime = case Iso8601 of
+ true -> iso8601(Job#job.update_time);
+ _ -> Job#job.update_time
+ end,
+ [
+ {id, Job#job.id},
+ {type, Job#job.type},
+ {source, Source#shard.name},
+ {target, [T#shard.name || T <- Targets]},
+ {job_state, Job#job.job_state},
+ {split_state, Job#job.split_state},
+ {state_info, state_info_to_ejson(Job#job.state_info)},
+ {node, atom_to_binary(Job#job.node, utf8)},
+ {start_time, StartTime},
+ {update_time, UpdateTime},
+ {history, History}
+ ].
+
+
+state_to_ejson_props(#state{} = State) ->
+ [
+ {state, atom_to_binary(State#state.state, utf8)},
+ {state_info, state_info_to_ejson(State#state.state_info)},
+ {update_time, State#state.update_time},
+ {node, atom_to_binary(State#state.node, utf8)}
+ ].
+
+
+% Private API
+
+with_shards_db(Fun) ->
+ DbName = config:get("mem3", "shards_db", "_dbs"),
+ case mem3_util:ensure_exists(DbName) of
+ {ok, Db} ->
+ try
+ Fun(Db)
+ after
+ catch couch_db:close(Db)
+ end;
+ Else ->
+ throw(Else)
+ end.
+
+
+delete_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = {_, Revs}}} ->
+ {ok, _} = couch_db:delete_doc(Db, DocId, Revs),
+ {ok, _} = couch_db:ensure_full_commit(Db),
+ ok;
+ {not_found, _} ->
+ ok
+ end.
+
+
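+% Create or overwrite a job/state doc. Any existing revision is carried over
+% so the write does not conflict, and the write is skipped entirely when the
+% "store_state" option in the "reshard" config section is false.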
+update_doc(Db, DocId, Body) ->
+ DocProps = [{<<"_id">>, DocId}] ++ Body,
+ Body1 = ?JSON_DECODE(?JSON_ENCODE({DocProps})),
+ BaseDoc = couch_doc:from_json_obj(Body1),
+ Doc = case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = Revs}} ->
+ BaseDoc#doc{revs = Revs};
+ {not_found, _} ->
+ BaseDoc
+ end,
+ case store_state() of
+ true ->
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ couch_log:debug("~p updated doc ~p ~p", [?MODULE, DocId, Body]),
+ {ok, _} = couch_db:ensure_full_commit(Db),
+ ok;
+ false ->
+ couch_log:debug("~p not storing state in ~p", [?MODULE, DocId]),
+ ok
+ end.
+
+
+load_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, [ejson_body]) of
+ {ok, #doc{body = Body}} ->
+ couch_log:debug("~p loaded doc ~p ~p", [?MODULE, DocId, Body]),
+ {ok, Body};
+ {not_found, _} ->
+ not_found
+ end.
+
+
+job_to_ejson_props(#job{} = Job) ->
+ job_to_ejson_props(Job, []).
+
+
+job_from_ejson({Props}) ->
+ Id = couch_util:get_value(<<"id">>, Props),
+ Type = couch_util:get_value(<<"type">>, Props),
+ Source = couch_util:get_value(<<"source">>, Props),
+ Target = couch_util:get_value(<<"target">>, Props),
+ JobState = couch_util:get_value(<<"job_state">>, Props),
+ SplitState = couch_util:get_value(<<"split_state">>, Props),
+ StateInfo = couch_util:get_value(<<"state_info">>, Props),
+ TStarted = couch_util:get_value(<<"start_time">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
+ History = couch_util:get_value(<<"history">>, Props),
+ #job{
+ id = Id,
+ type = binary_to_atom(Type, utf8),
+ job_state = binary_to_atom(JobState, utf8),
+ split_state = binary_to_atom(SplitState, utf8),
+ state_info = state_info_from_ejson(StateInfo),
+ node = node(),
+ start_time = TStarted,
+ update_time = TUpdated,
+ source = mem3_reshard:shard_from_name(Source),
+ target = [mem3_reshard:shard_from_name(T) || T <- Target],
+ history = history_from_ejson(History)
+ }.
+
+
+state_from_ejson(#state{} = State, {Props}) ->
+ StateVal = couch_util:get_value(<<"state">>, Props),
+ StateInfo = couch_util:get_value(<<"state_info">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
+ State#state{
+ state = binary_to_atom(StateVal, utf8),
+ state_info = state_info_from_ejson(StateInfo),
+ node = node(),
+ update_time = TUpdated
+ }.
+
+
+state_info_from_ejson({Props}) ->
+ Props1 = [{binary_to_atom(K, utf8), couch_util:to_binary(V)}
+ || {K, V} <- Props],
+ lists:sort(Props1).
+
+
+history_to_ejson(Hist, true) when is_list(Hist) ->
+ [{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {T, S, D} <- Hist];
+
+history_to_ejson(Hist, _) when is_list(Hist) ->
+ [{[{timestamp, T}, {type, S}, {detail, D}]} || {T, S, D} <- Hist].
+
+
+history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
+ lists:map(fun({EventProps}) ->
+ Timestamp = couch_util:get_value(<<"timestamp">>, EventProps),
+ State = couch_util:get_value(<<"type">>, EventProps),
+ Detail = couch_util:get_value(<<"detail">>, EventProps),
+ {Timestamp, binary_to_atom(State, utf8), Detail}
+ end, HistoryEJson).
+
+
+state_info_to_ejson(Props) ->
+ {lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}.
+
+
+store_state() ->
+ config:get_boolean("reshard", "store_state", true).
+
+
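+% Render a Unix timestamp (in seconds) as an ISO 8601 UTC string,
+% e.g. iso8601(0) -> <<"1970-01-01T00:00:00Z">>.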
+iso8601(UnixSec) ->
+ Mega = UnixSec div 1000000,
+ Sec = UnixSec rem 1000000,
+ {{Y, M, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}),
+ Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
+ iolist_to_binary(io_lib:format(Format, [Y, M, D, H, Min, S])).
diff --git a/src/mem3/src/mem3_reshard_validate.erl b/src/mem3/src/mem3_reshard_validate.erl
new file mode 100644
index 000000000..aa8df3e16
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_validate.erl
@@ -0,0 +1,126 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_validate).
+
+-export([
+ start_args/2,
+ source/1,
+ targets/2
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+-spec start_args(#shard{}, any()) -> ok | {error, term()}.
+start_args(Source, Split) ->
+ first_error([
+ check_split(Split),
+ check_range(Source, Split),
+ check_node(Source),
+ source(Source),
+ check_shard_map(Source)
+ ]).
+
+
+-spec source(#shard{}) -> ok | {error, term()}.
+source(#shard{name = Name}) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ {error, {source_shard_not_found, Name}}
+ end.
+
+
+-spec check_shard_map(#shard{}) -> ok | {error, term()}.
+check_shard_map(#shard{name = Name}) ->
+ DbName = mem3:dbname(Name),
+ AllShards = mem3:shards(DbName),
+ case mem3_util:calculate_max_n(AllShards) of
+ N when is_integer(N), N >= 1 ->
+ ok;
+ N when is_integer(N), N < 1 ->
+ {error, {not_enough_shard_copies, DbName}}
+ end.
+
+
+-spec targets(#shard{}, [#shard{}]) -> ok | {error, term()}.
+targets(#shard{} = Source, Targets) ->
+ first_error([
+ target_ranges(Source, Targets)
+ ]).
+
+
+-spec check_split(any()) -> ok | {error, term()}.
+check_split(Split) when is_integer(Split), Split > 1 ->
+ ok;
+check_split(Split) ->
+ {error, {invalid_split_parameter, Split}}.
+
+
+-spec check_range(#shard{}, any()) -> ok | {error, term()}.
+check_range(#shard{range = Range = [B, E]}, Split) ->
+ case (E + 1 - B) >= Split of
+ true ->
+ ok;
+ false ->
+ {error, {shard_range_cannot_be_split, Range, Split}}
+ end.
+
+
+-spec check_node(#shard{}) -> ok | {error, term()}.
+check_node(#shard{node = undefined}) ->
+ ok;
+
+check_node(#shard{node = Node}) when Node =:= node() ->
+ ok;
+
+check_node(#shard{node = Node}) ->
+ {error, {source_shard_node_is_not_current_node, Node}}.
+
+
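+% Check that the sorted target ranges are contiguous (each begins right after
+% the previous one ends) and that together they cover exactly the source
+% [Begin, End] range.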
+-spec target_ranges(#shard{}, [#shard{}]) -> ok | {error, any()}.
+target_ranges(#shard{range = [Begin, End]}, Targets) ->
+ Ranges = [R || #shard{range = R} <- Targets],
+ SortFun = fun([B1, _], [B2, _]) -> B1 =< B2 end,
+ [First | RestRanges] = lists:sort(SortFun, Ranges),
+ try
+ TotalRange = lists:foldl(fun([B2, E2], [B1, E1]) ->
+ case B2 =:= E1 + 1 of
+ true ->
+ ok;
+ false ->
+ throw({range_error, {B2, E1}})
+ end,
+ [B1, E2]
+ end, First, RestRanges),
+ case [Begin, End] =:= TotalRange of
+ true ->
+ ok;
+ false ->
+ throw({range_error, {[Begin, End], TotalRange}})
+ end
+ catch
+ throw:{range_error, Error} ->
+ {error, {shard_range_error, Error}}
+ end.
+
+
+-spec first_error([ok | {error, term()}]) -> ok | {error, term()}.
+first_error(Results) ->
+ case [Res || Res <- Results, Res =/= ok] of
+ [] ->
+ ok;
+ [FirstError | _] ->
+ FirstError
+ end.