diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-07-16 12:57:13 -0400 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2019-07-31 18:59:02 -0400 |
commit | bab8763c57ff6ca962bbb5f06e1dba39f452dcb5 (patch) | |
tree | 180a6484ec5d77674884aa3d296bc8b3745564d3 | |
parent | 9ff8fa1fad512b3ecfa867db6a9c82e16446f8b0 (diff) | |
download | couchdb-bab8763c57ff6ca962bbb5f06e1dba39f452dcb5.tar.gz |
FDB Replicator WIP
21 files changed, 990 insertions, 1326 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 4d32c03c5..7326bca63 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -403,20 +403,6 @@ maybe_log(_HttpReq, #httpd_resp{should_log = false}) -> ok. -%% HACK: replication currently handles two forms of input, #db{} style -%% and #http_db style. We need a third that makes use of fabric. #db{} -%% works fine for replicating the dbs and nodes database because they -%% aren't sharded. So for now when a local db is specified as the source or -%% the target, it's hacked to make it a full url and treated as a remote. -possibly_hack(#httpd{path_parts=[<<"_replicate">>]}=Req) -> - {Props0} = chttpd:json_body_obj(Req), - Props1 = fix_uri(Req, Props0, <<"source">>), - Props2 = fix_uri(Req, Props1, <<"target">>), - put(post_body, {Props2}), - Req; -possibly_hack(Req) -> - Req. - check_request_uri_length(Uri) -> check_request_uri_length(Uri, config:get("httpd", "max_uri_length")). @@ -439,53 +425,6 @@ check_url_encoding([$% | _]) -> check_url_encoding([_ | Rest]) -> check_url_encoding(Rest). -fix_uri(Req, Props, Type) -> - case replication_uri(Type, Props) of - undefined -> - Props; - Uri0 -> - case is_http(Uri0) of - true -> - Props; - false -> - Uri = make_uri(Req, quote(Uri0)), - [{Type,Uri}|proplists:delete(Type,Props)] - end - end. - -replication_uri(Type, PostProps) -> - case couch_util:get_value(Type, PostProps) of - {Props} -> - couch_util:get_value(<<"url">>, Props); - Else -> - Else - end. - -is_http(<<"http://", _/binary>>) -> - true; -is_http(<<"https://", _/binary>>) -> - true; -is_http(_) -> - false. - -make_uri(Req, Raw) -> - Port = integer_to_list(mochiweb_socket_server:get(chttpd, port)), - Url = list_to_binary(["http://", config:get("httpd", "bind_address"), - ":", Port, "/", Raw]), - Headers = [ - {<<"authorization">>, ?l2b(header_value(Req,"authorization",""))}, - {<<"cookie">>, ?l2b(extract_cookie(Req))} - ], - {[{<<"url">>,Url}, {<<"headers">>,{Headers}}]}. - -extract_cookie(#httpd{mochi_req = MochiReq}) -> - case MochiReq:get_cookie_value("AuthSession") of - undefined -> - ""; - AuthSession -> - "AuthSession=" ++ AuthSession - end. -%%% end hack set_auth_handlers() -> AuthenticationDefault = "{chttpd_auth, cookie_authentication_handler}, diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 11d2c5b72..d832ade8c 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -203,8 +203,8 @@ handle_task_status_req(Req) -> handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) -> chttpd:validate_ctype(Req, "application/json"), %% see HACK in chttpd.erl about replication - PostBody = get(post_body), - case replicate(PostBody, Ctx) of + PostBody = chttpd:json_body_obj(Req), + case couch_replicator:replicate(PostBody, Ctx) of {ok, {continuous, RepId}} -> send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]}); {ok, {cancelled, RepId}} -> @@ -223,50 +223,6 @@ handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) -> handle_replicate_req(Req) -> send_method_not_allowed(Req, "POST"). -replicate({Props} = PostBody, Ctx) -> - case couch_util:get_value(<<"cancel">>, Props) of - true -> - cancel_replication(PostBody, Ctx); - _ -> - Node = choose_node([ - couch_util:get_value(<<"source">>, Props), - couch_util:get_value(<<"target">>, Props) - ]), - case rpc:call(Node, couch_replicator, replicate, [PostBody, Ctx]) of - {badrpc, Reason} -> - erlang:error(Reason); - Res -> - Res - end - end. - -cancel_replication(PostBody, Ctx) -> - {Res, _Bad} = rpc:multicall(couch_replicator, replicate, [PostBody, Ctx]), - case [X || {ok, {cancelled, _}} = X <- Res] of - [Success|_] -> - % Report success if at least one node canceled the replication - Success; - [] -> - case lists:usort(Res) of - [UniqueReply] -> - % Report a universally agreed-upon reply - UniqueReply; - [] -> - {error, badrpc}; - Else -> - % Unclear what to do here -- pick the first error? - % Except try ignoring any {error, not_found} responses - % because we'll always get two of those - hd(Else -- [{error, not_found}]) - end - end. - -choose_node(Key) when is_binary(Key) -> - Checksum = erlang:crc32(Key), - Nodes = lists:sort([node()|erlang:nodes()]), - lists:nth(1 + Checksum rem length(Nodes), Nodes); -choose_node(Key) -> - choose_node(term_to_binary(Key)). handle_reload_query_servers_req(#httpd{method='POST'}=Req) -> chttpd:validate_ctype(Req, "application/json"), diff --git a/src/chttpd/test/eunit/chttpd_handlers_tests.erl b/src/chttpd/test/eunit/chttpd_handlers_tests.erl index f3e8f5dcd..5ae80d02b 100644 --- a/src/chttpd/test/eunit/chttpd_handlers_tests.erl +++ b/src/chttpd/test/eunit/chttpd_handlers_tests.erl @@ -70,7 +70,8 @@ request_replicate(Url, Body) -> Headers = [{"Content-Type", "application/json"}], Handler = {chttpd_misc, handle_replicate_req}, request(post, Url, Headers, Body, Handler, fun(Req) -> - chttpd:send_json(Req, 200, get(post_body)) + PostBody = chttpd:json_body_obj(Req), + chttpd:send_json(Req, 200, PostBody) end). request(Method, Url, Headers, Body, {M, F}, MockFun) -> diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl index d469ed41a..393f2563f 100644 --- a/src/couch_jobs/src/couch_jobs.erl +++ b/src/couch_jobs/src/couch_jobs.erl @@ -19,6 +19,8 @@ remove/3, get_job_data/3, get_job_state/3, + get_jobs/2, + get_jobs/3, % Job processing accept/1, @@ -103,6 +105,21 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) -> end). +-spec get_jobs(jtx(), job_type()) -> #{}. +get_jobs(Tx, Type) when is_binary(JobId) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_job_fdb:get_jobs(JTx, Type) + end). + + +-spec get_jobs(jtx(), job_type()) -> #{}. +get_jobs(Tx, Type, Filter) when is_binary(JobId), is_function(Filter, 1) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_job_fdb:get_jobs(JTx, Type, Filter) + end). + + + %% Job processor API -spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}. diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index e4fa31cee..225a258b0 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -52,30 +52,27 @@ {ok, {cancelled, binary()}} | {error, any()} | no_return(). -replicate(PostBody, Ctx) -> - {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx), - Rep = Rep0#rep{start_time = os:timestamp()}, - #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep, - case get_value(cancel, Options, false) of - true -> - CancelRepId = case get_value(id, Options, nil) of - nil -> - RepId; - RepId2 -> - RepId2 - end, - case check_authorization(CancelRepId, UserCtx) of - ok -> - cancel_replication(CancelRepId); - not_found -> - {error, not_found} - end; - false -> - check_authorization(RepId, UserCtx), - {ok, Listener} = rep_result_listener(RepId), - Result = do_replication_loop(Rep), - couch_replicator_notifier:stop(Listener), - Result +replicate(PostBody, UserCtx) -> + {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, UserCtx), + Rep = Rep0#{<<"start_time">> => erlang:system_time()}, + #{<<"id">> := RepId, <<"options">> := Options} = Rep, + case maps:get(<<"cancel">>, Options, false) of + true -> + CancelRepId = case maps:get(<<"id">>, Options, nil) of + nil -> RepId; + RepId2 -> RepId2 + end, + case check_authorization(CancelRepId, UserCtx) of + ok -> cancel_replication(CancelRepId); + not_found -> {error, not_found} + end; + false -> + check_authorization(RepId, UserCtx), + ok = couch_replicator_scheduler:add_job(Rep), + case maps:get(<<"continuous">>, Options, false) of + true -> {ok, {continuous, Id}}; + false -> wait_for_result(Id) + end end. @@ -83,57 +80,50 @@ replicate(PostBody, Ctx) -> % it returns `ignore`. -spec ensure_rep_db_exists() -> ignore. ensure_rep_db_exists() -> - {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(), + ok = couch_replicator_docs:ensure_rep_db_exists(), + couch_jobs:set_type_timeout(?REP_DOCS, ?REP_DOCS_TIMEOUT_MSEC), + couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_MSEC), ignore. --spec do_replication_loop(#rep{}) -> - {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}. -do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) -> - ok = couch_replicator_scheduler:add_job(Rep), - case get_value(continuous, Options, false) of - true -> - {ok, {continuous, ?l2b(BaseId ++ Ext)}}; - false -> - wait_for_result(Id) - end. - - --spec rep_result_listener(rep_id()) -> {ok, pid()}. -rep_result_listener(RepId) -> - ReplyTo = self(), - {ok, _Listener} = couch_replicator_notifier:start_link( - fun({_, RepId2, _} = Ev) when RepId2 =:= RepId -> - ReplyTo ! Ev; - (_) -> - ok - end). - - -spec wait_for_result(rep_id()) -> {ok, {[_]}} | {error, any()}. wait_for_result(RepId) -> - receive - {finished, RepId, RepResult} -> - {ok, RepResult}; - {error, RepId, Reason} -> - {error, Reason} + FinishRes = case couch_jobs:subscribe(?REP_JOBS, RepId) of + {ok, finished, JobData} -> + {ok, JobData}; + {ok, SubId, _, _} -> + case couch_jobs:wait(SubId, finished, infinity) of + {?REP_JOBS, RepId, finished, JobData} -> {ok, JobData}; + timeout -> timeout + end; + {error, Error} -> + {error, Error} + end, + case FinishRes of + {ok, #{<<"finished_result">> := CheckpointHistory}} -> + {ok, CheckpointHistory}; + timeout -> + {error, timeout}; + {error, Error} -> + {error, Error} end. -spec cancel_replication(rep_id()) -> {ok, {cancelled, binary()}} | {error, not_found}. -cancel_replication({BasedId, Extension} = RepId) -> - FullRepId = BasedId ++ Extension, - couch_log:notice("Canceling replication '~s' ...", [FullRepId]), - case couch_replicator_scheduler:rep_state(RepId) of - #rep{} -> - ok = couch_replicator_scheduler:remove_job(RepId), - couch_log:notice("Replication '~s' cancelled", [FullRepId]), - {ok, {cancelled, ?l2b(FullRepId)}}; - nil -> - couch_log:notice("Replication '~s' not found", [FullRepId]), - {error, not_found} +cancel_replication(RepId) when is_binary(RepId) -> + couch_log:notice("Canceling replication '~s' ...", [RepId]), + case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of + {error_not, found} -> + {error, not_found}; + #{<<"rep">> := #{<<"db_name">> := null}} -> + couch_jobs:remove(undefined, ?REP_JOBS, RepId) + {ok, {cancelled, ?l2b(FullRepId)}}; + #{<<"rep">> := #{}} -> + % Job was started from a replicator doc canceling via _replicate + % doesn't quite make sense, instead replicator should be deleted. + {error, not_found} end. @@ -142,10 +132,10 @@ replication_states() -> ?REPLICATION_STATES. --spec strip_url_creds(binary() | {[_]}) -> binary(). +-spec strip_url_creds(binary() | #{}) -> binary(). strip_url_creds(Endpoint) -> - case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of - #httpdb{url=Url} -> + case couch_replicator_docs:parse_rep_db(Endpoint, #{}, #{}) of + #{<<"url">> := Url} -> iolist_to_binary(couch_util:url_strip_password(Url)); LocalDb when is_binary(LocalDb) -> LocalDb @@ -286,13 +276,13 @@ state_atom(State) when is_atom(State) -> -spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found. check_authorization(RepId, #user_ctx{name = Name} = Ctx) -> - case couch_replicator_scheduler:rep_state(RepId) of - #rep{user_ctx = #user_ctx{name = Name}} -> - ok; - #rep{} -> - couch_httpd:verify_is_server_admin(Ctx); - nil -> - not_found + case couch_jobs:get_job_data(undefined, ?REP_JOBS, RePid) of + {error_not, found} -> + not_found; + #{<<"rep">> := {<<"user">> := Name}} -> + ok; + #{} -> + couch_httpd:verify_is_server_admin(Ctx) end. @@ -342,13 +332,6 @@ t_replication_not_found() -> end). -expect_rep_user_ctx(Name, Role) -> - meck:expect(couch_replicator_scheduler, rep_state, - fun(_Id) -> - UserCtx = #user_ctx{name = Name, roles = [Role]}, - #rep{user_ctx = UserCtx} - end). - strip_url_creds_test_() -> { diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl index 2a5b7c8c8..6584078fc 100644 --- a/src/couch_replicator/src/couch_replicator.hrl +++ b/src/couch_replicator/src/couch_replicator.hrl @@ -11,6 +11,10 @@ % the License. -define(REP_ID_VERSION, 4). +-define(REP_DOCS, <<"repdocs">>). +-define(REP_JOBS, <<"repjobs">>). +-define(REP_DOCS_TIMEOUT_MSEC, 17000). +-define(REP_JOBS_TIMEOUT_MSEC, 33000). -record(rep, { id :: rep_id() | '_' | 'undefined', @@ -22,11 +26,12 @@ view = nil :: any() | '_', doc_id :: any() | '_', db_name = null :: null | binary() | '_', - start_time = {0, 0, 0} :: erlang:timestamp() | '_', + start_time = 0 :: integer() | '_', stats = couch_replicator_stats:new() :: orddict:orddict() | '_' }). --type rep_id() :: {string(), string()}. +-type rep_id() :: binary(). +-type user_name() :: binary() | null. -type db_doc_id() :: {binary(), binary() | '_'}. -type seconds() :: non_neg_integer(). -type rep_start_result() :: diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index ab1de7df9..5c03632f0 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -38,8 +38,8 @@ open_doc/3, open_doc_revs/6, changes_since/5, - db_uri/1, - normalize_db/1 + db_uri/1 + db_from_map/1, ]). -import(couch_replicator_httpc, [ @@ -57,21 +57,19 @@ -define(MAX_URL_LEN, 7000). -define(MIN_URL_LEN, 200). -db_uri(#httpdb{url = Url}) -> +db_uri(#{<<"url">> := Url}) -> couch_util:url_strip_password(Url); -db_uri(DbName) when is_binary(DbName) -> - ?b2l(DbName); +db_uri(#httpdb{url = Url}) -> + couch_util:url_strip_password(Url). -db_uri(Db) -> - db_uri(couch_db:name(Db)). +db_open(#{} = Db) -> + db_open(Db, false, []); -db_open(Db) -> - db_open(Db, false, []). -db_open(#httpdb{} = Db1, Create, CreateParams) -> - {ok, Db} = couch_replicator_httpc:setup(Db1), +db_open(#{} = Db0, Create, CreateParams) -> + {ok, Db} = couch_replicator_httpc:setup(db_from_json(Db0)), try case Create of false -> @@ -149,6 +147,7 @@ get_pending_count(#httpdb{} = Db, Seq) -> {ok, couch_util:get_value(<<"pending">>, Props, null)} end). + get_view_info(#httpdb{} = Db, DDocId, ViewName) -> Path = io_lib:format("~s/_view/~s/_info", [DDocId, ViewName]), send_req(Db, [{path, Path}], @@ -285,7 +284,6 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc) end. - error_reason({http_request_failed, "GET", _Url, {error, timeout}}) -> timeout; error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) -> @@ -356,7 +354,6 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) -> end end). - update_docs(Db, DocList, Options) -> update_docs(Db, DocList, Options, interactive_edit). @@ -889,23 +886,6 @@ header_value(Key, Headers, Default) -> end. -% Normalize an #httpdb{} or #db{} record such that it can be used for -% comparisons. This means remove things like pids and also sort options / props. -normalize_db(#httpdb{} = HttpDb) -> - #httpdb{ - url = HttpDb#httpdb.url, - auth_props = lists:sort(HttpDb#httpdb.auth_props), - headers = lists:keysort(1, HttpDb#httpdb.headers), - timeout = HttpDb#httpdb.timeout, - ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options), - retries = HttpDb#httpdb.retries, - http_connections = HttpDb#httpdb.http_connections - }; - -normalize_db(<<DbName/binary>>) -> - DbName. - - maybe_append_create_query_params(Db, []) -> Db; @@ -914,27 +894,72 @@ maybe_append_create_query_params(Db, CreateParams) -> Db#httpdb{url = NewUrl}. --ifdef(TEST). +db_from_json(#{} = DbMap) -> + #{ + <<"url">> := Url, + <<"auth">> := Auth, + <<"headers">> := Headers0, + <<"ibrowse_options">> := IBrowseOptions0, + <<"timeout">> := Timeout, + <<"http_connections">> := HttpConnections, + <<"retries">> := Retries, + <<"proxy_url">> := ProxyURL0 + } = DbMap, + Headers = maps:fold(fun(K, V, Acc) -> + [{binary_to_list(K), binary_to_list(V)} | Acc] + end, [], Headers0), + IBrowseOptions0 = maps:fold(fun + (<<"proxy_protocol">>, V, Acc) -> + [{binary_to_atom(K), binary_to_existing_atom(V)} | Acc]; + (<<"socket_options">>, #{} = SockOpts, Acc) -> + SockOptsKVs = maps:fold(fun sock_opts_fold/3, [], SockOpts), + [{socket_options, SockOptsKVs} | Acc]; + (<<"ssl_options">>, #{} = SslOpts, Acc) -> + SslOptsKVs = maps:fold(fun ssl_opts_fold/3, [], SslOpts), + [{ssl_options, SslOptsKVs} | Acc]; + (K, V, Acc) when is_binary(V) -> + [{binary_to_atom(K), binary_to_list(V)} | Acc]; + (K, V, Acc) -> + [{binary_to_atom(K), V} | Acc] + end, [], IBrowseOptions0), + ProxyUrl = case ProxyUrl0 of + null -> undefined, + V when is_binary(V) -> binary_to_list(V) + end, + #httpdb{ + url = binary_to_list(Url), + auth_props = maps:to_list(Auth), + headers = Headers, + ibrowse_options = IBrowseOptions, + timeout = Timeout, + http_connections = HttpConnections, + retries = Retries, + proxy_url = ProxyURL + }. + + --include_lib("eunit/include/eunit.hrl"). +% See couch_replicator_docs:ssl_params/1 for ssl parsed options +% and http://erlang.org/doc/man/ssl.html#type-server_option +% all latest SSL server options +% +ssl_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) -> + [{binary_to_atom(K), V} | Acc]; + +ssl_opts_fold(K, null, Acc) -> + [{binary_to_atom(K), undefined} | Acc]; +ssl_opts_fold(<<"verify">>, V, Acc) -> + [{binary_to_atom(K), binary_to_atom(V)}; -normalize_http_db_test() -> - HttpDb = #httpdb{ - url = "http://host/db", - auth_props = [{"key", "val"}], - headers = [{"k2","v2"}, {"k1","v1"}], - timeout = 30000, - ibrowse_options = [{k2, v2}, {k1, v1}], - retries = 10, - http_connections = 20 - }, - Expected = HttpDb#httpdb{ - headers = [{"k1","v1"}, {"k2","v2"}], - ibrowse_options = [{k1, v1}, {k2, v2}] - }, - ?assertEqual(Expected, normalize_db(HttpDb)), - ?assertEqual(<<"local">>, normalize_db(<<"local">>)). +ssl_opts_fold(K, V, Acc) when is_list(V) -> + [{binary_to_atom(K), binary_to_list(V)} | Acc]. --endif. +% See ?VALID_SOCK_OPTS in couch_replicator_docs for accepted socket options +% +sock_opts_fold(K, V, Acc) when is_list(V) -> + [{binary_to_atom(K), binary_to_atom(V)} | Acc]; + +sock_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) -> + [{binary_to_atom(K), V} | Acc]. diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl deleted file mode 100644 index a7f7573b6..000000000 --- a/src/couch_replicator/src/couch_replicator_clustering.erl +++ /dev/null @@ -1,248 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - - -% Maintain cluster membership and stability notifications for replications. -% On changes to cluster membership, broadcast events to `replication` gen_event. -% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events. -% -% Cluster stability is defined as "there have been no nodes added or removed in -% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a -% speedier startup, during initialization there is a shorter StartupPeriod -% in effect (also configurable). -% -% This module is also in charge of calculating ownership of replications based -% on where their _replicator db documents shards live. - - --module(couch_replicator_clustering). - --behaviour(gen_server). --behaviour(config_listener). --behaviour(mem3_cluster). - --export([ - start_link/0 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 -]). - --export([ - owner/2, - is_stable/0, - link_cluster_event_listener/3 -]). - -% config_listener callbacks --export([ - handle_config_change/5, - handle_config_terminate/3 -]). - -% mem3_cluster callbacks --export([ - cluster_stable/1, - cluster_unstable/1 -]). - --include_lib("couch/include/couch_db.hrl"). --include_lib("mem3/include/mem3.hrl"). - --define(DEFAULT_QUIET_PERIOD, 60). % seconds --define(DEFAULT_START_PERIOD, 5). % seconds --define(RELISTEN_DELAY, 5000). - --record(state, { - mem3_cluster_pid :: pid(), - cluster_stable :: boolean() -}). - - --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - -% owner/2 function computes ownership for a {DbName, DocId} tuple -% `unstable` if cluster is considered to be unstable i.e. it has changed -% recently, or returns node() which of the owner. -% --spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable. -owner(<<"shards/", _/binary>> = DbName, DocId) -> - case is_stable() of - false -> - unstable; - true -> - owner_int(DbName, DocId) - end; -owner(_DbName, _DocId) -> - node(). - - --spec is_stable() -> true | false. -is_stable() -> - gen_server:call(?MODULE, is_stable). - - --spec link_cluster_event_listener(atom(), atom(), list()) -> pid(). -link_cluster_event_listener(Mod, Fun, Args) - when is_atom(Mod), is_atom(Fun), is_list(Args) -> - CallbackFun = - fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]); - (_) -> ok - end, - {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun), - Pid. - - -% Mem3 cluster callbacks - -cluster_unstable(Server) -> - ok = gen_server:call(Server, set_unstable), - couch_replicator_notifier:notify({cluster, unstable}), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), - couch_log:notice("~s : cluster unstable", [?MODULE]), - Server. - -cluster_stable(Server) -> - ok = gen_server:call(Server, set_stable), - couch_replicator_notifier:notify({cluster, stable}), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1), - couch_log:notice("~s : cluster stable", [?MODULE]), - Server. - - -% gen_server callbacks - -init([]) -> - ok = config:listen_for_changes(?MODULE, nil), - Period = abs(config:get_integer("replicator", "cluster_quiet_period", - ?DEFAULT_QUIET_PERIOD)), - StartPeriod = abs(config:get_integer("replicator", "cluster_start_period", - ?DEFAULT_START_PERIOD)), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), - {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod, - Period), - {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}. - - -terminate(_Reason, _State) -> - ok. - - -handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) -> - {reply, IsStable, State}; - -handle_call(set_stable, _From, State) -> - {reply, ok, State#state{cluster_stable = true}}; - -handle_call(set_unstable, _From, State) -> - {reply, ok, State#state{cluster_stable = false}}. - - -handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) -> - ok = mem3_cluster:set_period(Pid, Period), - {noreply, State}. - - -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), - {noreply, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -%% Internal functions - - -handle_config_change("replicator", "cluster_quiet_period", V, _, S) -> - ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}), - {ok, S}; -handle_config_change(_, _, _, _, S) -> - {ok, S}. - - -handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_S, _R, _St) -> - Pid = whereis(?MODULE), - erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). - - --spec owner_int(binary(), binary()) -> node(). -owner_int(ShardName, DocId) -> - DbName = mem3:dbname(ShardName), - Live = [node() | nodes()], - Shards = mem3:shards(DbName, DocId), - Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)], - mem3:owner(DbName, DocId, Nodes). - - - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). - - -replicator_clustering_test_() -> - { - foreach, - fun setup/0, - fun teardown/1, - [ - t_stable_callback(), - t_unstable_callback() - ] - }. - - -t_stable_callback() -> - ?_test(begin - ?assertEqual(false, is_stable()), - cluster_stable(whereis(?MODULE)), - ?assertEqual(true, is_stable()) - end). - - -t_unstable_callback() -> - ?_test(begin - cluster_stable(whereis(?MODULE)), - ?assertEqual(true, is_stable()), - cluster_unstable(whereis(?MODULE)), - ?assertEqual(false, is_stable()) - end). - - -setup() -> - meck:expect(couch_log, notice, 2, ok), - meck:expect(config, get, fun(_, _, Default) -> Default end), - meck:expect(config, listen_for_changes, 2, ok), - meck:expect(couch_stats, update_gauge, 2, ok), - meck:expect(couch_replicator_notifier, notify, 1, ok), - {ok, Pid} = start_link(), - Pid. - - -teardown(Pid) -> - unlink(Pid), - exit(Pid, kill), - meck:unload(). - --endif. diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl deleted file mode 100644 index 92b0222c4..000000000 --- a/src/couch_replicator/src/couch_replicator_db_changes.erl +++ /dev/null @@ -1,108 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_db_changes). - --behaviour(gen_server). - --export([ - start_link/0 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 -]). - --export([ - notify_cluster_event/2 -]). - --record(state, { - event_listener :: pid(), - mdb_changes :: pid() | nil -}). - - --spec notify_cluster_event(pid(), {cluster, any()}) -> ok. -notify_cluster_event(Server, {cluster, _} = Event) -> - gen_server:cast(Server, Event). - - --spec start_link() -> - {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link(?MODULE, [], []). - - -init([]) -> - EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE, - notify_cluster_event, [self()]), - State = #state{event_listener = EvtPid, mdb_changes = nil}, - case couch_replicator_clustering:is_stable() of - true -> - {ok, restart_mdb_changes(State)}; - false -> - {ok, State} - end. - - -terminate(_Reason, _State) -> - ok. - - -handle_call(_Msg, _From, State) -> - {reply, {error, invalid_call}, State}. - - -handle_cast({cluster, unstable}, State) -> - {noreply, stop_mdb_changes(State)}; - -handle_cast({cluster, stable}, State) -> - {noreply, restart_mdb_changes(State)}. - - -handle_info(_Msg, State) -> - {noreply, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - --spec restart_mdb_changes(#state{}) -> #state{}. -restart_mdb_changes(#state{mdb_changes = nil} = State) -> - Suffix = <<"_replicator">>, - CallbackMod = couch_replicator_doc_processor, - Options = [skip_ddocs], - {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil, - Options), - couch_stats:increment_counter([couch_replicator, db_scans]), - couch_log:notice("Started replicator db changes listener ~p", [Pid]), - State#state{mdb_changes = Pid}; - -restart_mdb_changes(#state{mdb_changes = _Pid} = State) -> - restart_mdb_changes(stop_mdb_changes(State)). - - --spec stop_mdb_changes(#state{}) -> #state{}. -stop_mdb_changes(#state{mdb_changes = nil} = State) -> - State; -stop_mdb_changes(#state{mdb_changes = Pid} = State) -> - couch_log:notice("Stopping replicator db changes listener ~p", [Pid]), - unlink(Pid), - exit(Pid, kill), - State#state{mdb_changes = nil}. diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index 772037d8d..6f4481491 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -13,7 +13,6 @@ -module(couch_replicator_doc_processor). -behaviour(gen_server). --behaviour(couch_multidb_changes). -export([ start_link/0 @@ -29,10 +28,9 @@ ]). -export([ - db_created/2, - db_deleted/2, - db_found/2, - db_change/3 + during_doc_update/3, + after_db_create/1, + after_db_delete/1 ]). -export([ @@ -40,8 +38,7 @@ doc/2, doc_lookup/3, update_docs/0, - get_worker_ref/1, - notify_cluster_event/2 + get_worker_ref/1 ]). -include_lib("couch/include/couch_db.hrl"). @@ -76,88 +73,103 @@ }). -% couch_multidb_changes API callbacks +during_doc_update(#doc{} = Doc, Db, _UpdateType) -> + couch_stats:increment_counter([couch_replicator, docs, db_changes]), + ok = process_change(Db, Doc). -db_created(DbName, Server) -> +after_db_create(#{name := DbName}) -> couch_stats:increment_counter([couch_replicator, docs, dbs_created]), - couch_replicator_docs:ensure_rep_ddoc_exists(DbName), - Server. + couch_replicator_docs:ensure_rep_ddoc_exists(DbName). -db_deleted(DbName, Server) -> +after_db_delete(#{name := DbName}) -> couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]), - ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity), - Server. - + remove_replications_by_dbname(DbName). -db_found(DbName, Server) -> - couch_stats:increment_counter([couch_replicator, docs, dbs_found]), - couch_replicator_docs:ensure_rep_ddoc_exists(DbName), - Server. +process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) -> + ok; -db_change(DbName, {ChangeProps} = Change, Server) -> - couch_stats:increment_counter([couch_replicator, docs, db_changes]), - try - ok = process_change(DbName, Change) +process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) -> + Id = docs_job_id(DbName, Doc#doc.id), + ok = remove_replication_by_doc_job_id(Db, Id); + +process_change(#{name := DbName} = Db, #doc{} = Doc) -> + #doc{id = DocId, body = {Props} = Body} = Doc, + {Rep, RepError} = try + Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body), + Rep1 = Rep0#{ + <<"db_name">> => DbName, + <<"start_time">> => erlang:system_time() + }, + {Rep1, null} catch - _Tag:Error -> - {RepProps} = get_json_value(doc, ChangeProps), - DocId = get_json_value(<<"_id">>, RepProps), - couch_replicator_docs:update_failed(DbName, DocId, Error) + throw:{bad_rep_doc, Reason} -> + {null, couch_replicator_utils:rep_error_to_binary(Reason)} end, - Server. - - --spec get_worker_ref(db_doc_id()) -> reference() | nil. -get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> - case ets:lookup(?MODULE, {DbName, DocId}) of - [#rdoc{worker = WRef}] when is_reference(WRef) -> - WRef; - [#rdoc{worker = nil}] -> - nil; - [] -> - nil + % We keep track of the doc's state in order to clear it if update_docs + % is toggled from true to false + DocState = get_json_value(<<"_replication_state">>, Props, null), + case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of + {error, not_found} -> + update_replication_job(Db, DbName, DocId, Rep, RepError, DocState); + {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}} + when Rep =:= null -> + % Same error as before occurred, don't bother updating the job + ok; + {ok, #{<<"rep">> := null}} when Rep =:= null -> + % Error occured but it's a different error. Update the job so user + % sees the new error + update_replication_job(Db, DbName, DocId, Rep, RepError, DocState); + {ok, #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError}} -> + NormOldRep = couch_replicator_util:normalize_rep(OldRep), + NormRep = couch_replicator_util:normalize_rep(Rep), + case NormOldRep == NormRep of + true -> + % Document was changed but none of the parameters relevent + % for the replication job have changed, so make it a no-op + ok; + false -> + update_replication_job(Db, DbName, DocId, Rep, RepError, + DocState) + end end. -% Cluster membership change notification callback --spec notify_cluster_event(pid(), {cluster, any()}) -> ok. -notify_cluster_event(Server, {cluster, _} = Event) -> - gen_server:cast(Server, Event). - - -process_change(DbName, {Change}) -> - {RepProps} = JsonRepDoc = get_json_value(doc, Change), - DocId = get_json_value(<<"_id">>, RepProps), - Owner = couch_replicator_clustering:owner(DbName, DocId), - Id = {DbName, DocId}, - case {Owner, get_json_value(deleted, Change, false)} of - {_, true} -> - ok = gen_server:call(?MODULE, {removed, Id}, infinity); - {unstable, false} -> - couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]); - {ThisNode, false} when ThisNode =:= node() -> - case get_json_value(<<"_replication_state">>, RepProps) of - undefined -> - ok = process_updated(Id, JsonRepDoc); - <<"triggered">> -> - maybe_remove_state_fields(DbName, DocId), - ok = process_updated(Id, JsonRepDoc); - <<"completed">> -> - ok = gen_server:call(?MODULE, {completed, Id}, infinity); - <<"error">> -> - % Handle replications started from older versions of replicator - % which wrote transient errors to replication docs - maybe_remove_state_fields(DbName, DocId), - ok = process_updated(Id, JsonRepDoc); - <<"failed">> -> - ok - end; - {Owner, false} -> - ok +rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) -> + #{ + <<"rep_parse_error">> := Error, + <<"db_name">> := DbName, + <<"doc_id">> := DocId, + } = JobData, + JobData1 = JobData#{ + <<"finished_state">> := <<"failed">>, + <<"finished_result">> := Error + } + case couch_jobs:finish(undefined, Job, JobData1) of + ok -> + couch_replicator_docs:update_failed(DbName, DocId, Error), + ok; + {error, JobError} -> + Msg = "Replication ~s job could not finish. JobError:~p", + couch_log:error(Msg, [RepId, JobError]), + {error, JobError} + end; + +rep_docs_job_execute(#{} = Job, #{} = JobData) -> + #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData, + case lists:member(DocState, [<<"triggered">>, <<"error">>]) of + true -> maybe_remove_state_fields(DbName, DocId), + false -> ok end, - ok. + % completed jobs should finish right away + + % otherwise start computing the replication id + + Rep1 = update_replication_id(Rep), + + % when done add or update the replicaton job + % if jobs has a filter keep checking if filter changes maybe_remove_state_fields(DbName, DocId) -> @@ -203,8 +215,6 @@ start_link() -> init([]) -> ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id}, {read_concurrency, true}, {write_concurrency, true}]), - couch_replicator_clustering:link_cluster_event_listener(?MODULE, - notify_cluster_event, [self()]), {ok, nil}. @@ -228,15 +238,6 @@ handle_call({clean_up_replications, DbName}, _From, State) -> ok = removed_db(DbName), {reply, ok, State}. -handle_cast({cluster, unstable}, State) -> - % Ignoring unstable state transition - {noreply, State}; - -handle_cast({cluster, stable}, State) -> - % Membership changed recheck all the replication document ownership - nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE), - {noreply, State}; - handle_cast(Msg, State) -> {stop, {error, unexpected_message, Msg}, State}. @@ -591,21 +592,57 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) -> lists:member(State, States). --spec cluster_membership_foldl(#rdoc{}, nil) -> nil. -cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) -> - case couch_replicator_clustering:owner(DbName, DocId) of - unstable -> - nil; - ThisNode when ThisNode =:= node() -> - nil; - OtherNode -> - Msg = "Replication doc ~p:~p with id ~p usurped by node ~p", - couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]), - removed_doc(Id), - nil +-spec update_replication(any(), binary(), binary(), #{} | null, + binary() | null, binary() | null) -> ok. +update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) -> + JobId = docs_job_id(DbName, DocId), + ok = remove_replication_by_doc_job_id(Db, JobId), + RepDocsJob = #{ + <<"rep_id">> := null, + <<"db_name">> := DbName, + <<"doc_id">> := DocId, + <<"rep">> := Rep, + <<"rep_parse_error">> := RepParseError, + <<"doc_state">> := DocState + }, + ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocsJob). + + +docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) -> + <<DbName/binary, "|", Id/binary>>. + + +-spec remove_replication_by_doc_job_id(Tx, Id) -> ok. +remove_replication_by_doc_job_id(Tx, Id) -> + case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of + {error, not_found} -> + ok; + {ok, #{<<"rep_id">> := null}} -> + couch_jobs:remove(Tx, ?REP_DOCS, Id), + ok; + {ok, #{<<"rep_id">> := RepId}} -> + couch_jobs:remove(Tx, ?REP_JOBS, RepId), + couch_jobs:remove(Tx, ?REP_DOCS, Id), + ok end. +-spec remove_replications_by_dbname(DbName) -> ok. +remove_replications_by_dbname(DbName) -> + DbNameSize = byte_size(DbName), + Filter = fun + (<<DbName:DbNameSize/binary, "|", _, _/binary>>) -> true; + (_) -> false + end, + JobsMap = couch_job:get_jobs(undefined, ?REP_DOCS, Filter), + % Batch these into smaller transactions eventually... + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + maps:map(fun(Id, _) -> + remove_replication_by_doc_job_id(JTx, Id) + end, JobsMap) + end). + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -634,8 +671,7 @@ doc_processor_test_() -> t_failed_change(), t_change_for_different_node(), t_change_when_cluster_unstable(), - t_ejson_docs(), - t_cluster_membership_foldl() + t_ejson_docs() ] }. @@ -787,21 +823,6 @@ t_ejson_docs() -> end). -% Check that when cluster membership changes records from doc processor and job -% scheduler get removed -t_cluster_membership_foldl() -> - ?_test(begin - mock_existing_jobs_lookup([test_rep(?R1)]), - ?assertEqual(ok, process_change(?DB, change())), - meck:expect(couch_replicator_clustering, owner, 2, different_node), - ?assert(ets:member(?MODULE, {?DB, ?DOC1})), - gen_server:cast(?MODULE, {cluster, stable}), - meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000), - ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})), - ?assert(removed_job(?R1)) - end). - - get_worker_ref_test_() -> { setup, diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index c07caa1aa..cdc6a106b 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -22,12 +22,11 @@ after_doc_read/2, ensure_rep_db_exists/0, ensure_rep_ddoc_exists/1, - ensure_cluster_rep_ddoc_exists/1, remove_state_fields/2, update_doc_completed/3, update_failed/3, update_rep_id/1, - update_triggered/2, + update_triggered/3, update_error/2 ]). @@ -57,6 +56,23 @@ -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}). -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})). +-define(DEFAULT_SOCK_OPTS, "[{keepalive, true}, {nodelay, false}]"). +-define(VALID_SOCK_OPTS, [buffer, delay_send, exit_on_close, ipv6_v6only, + keepalive, nodelay, recbuf, send_timeout, send_timout_close, sndbuf, + priority, tos, tclass +]). + +-define(CONFIG_DEFAULTS, [ + {"worker_processes", "4", fun list_to_integer/1}, + {"worker_batch_size", "500", fun list_to_integer/1}, + {"http_connections", "20", fun list_to_integer/1}, + {"connection_timeout", "30000", fun list_to_integer/1}, + {"retries_per_request", "5", fun list_to_integer/1}, + {"use_checkpoints", "true", fun list_to_existing_atom/1}, + {"checkpoint_interval", "30000", fun list_to_integer/1}, + {"socket_options", ?DEFAULT_SOCK_OPTS, fun parse_sock_opts/1} +]). + remove_state_fields(DbName, DocId) -> update_rep_doc(DbName, DocId, [ @@ -90,28 +106,27 @@ update_failed(DbName, DocId, Error) -> failed_state_updates]). --spec update_triggered(#rep{}, rep_id()) -> ok. -update_triggered(Rep, {Base, Ext}) -> - #rep{ - db_name = DbName, - doc_id = DocId - } = Rep, +-spec update_triggered(binary(), binary(), binary()) -> ok. +update_triggered(Id, DocId, DbName) -> update_rep_doc(DbName, DocId, [ {<<"_replication_state">>, <<"triggered">>}, {<<"_replication_state_reason">>, undefined}, - {<<"_replication_id">>, iolist_to_binary([Base, Ext])}, + {<<"_replication_id">>, Id}, {<<"_replication_stats">>, undefined}]), ok. --spec update_error(#rep{}, any()) -> ok. -update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> +-spec update_error(#{}, any()) -> ok. +update_error(#rep{} = Rep, Error) -> + #{ + <<"id">> := RepId0, + <<"db_name">> := DbName, + <<"doc_id">> := DocId, + } = Rep, Reason = error_reason(Error), - BinRepId = case RepId of - {Base, Ext} -> - iolist_to_binary([Base, Ext]); - _Other -> - null + RepId = case RepId0 of + Id when is_binary(Id) -> Id; + _Other -> null end, update_rep_doc(DbName, DocId, [ {<<"_replication_state">>, <<"error">>}, @@ -121,34 +136,22 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> ok. --spec ensure_rep_db_exists() -> {ok, Db::any()}. +-spec ensure_rep_db_exists() -> ok. ensure_rep_db_exists() -> - Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, - nologifmissing]) of - {ok, Db0} -> - Db0; - _Error -> - {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]), - Db0 - end, - ok = ensure_rep_ddoc_exists(?REP_DB_NAME), - {ok, Db}. - - --spec ensure_rep_ddoc_exists(binary()) -> ok. -ensure_rep_ddoc_exists(RepDb) -> - case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of - true -> - ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC); - false -> + Opts = [?CTX, sys_db, nologifmissing], + case fabric2_db:create(?REP_DB_NAME, Opts) of + {error, file_exists} -> + ok; + {ok, _Db} -> ok end. --spec ensure_rep_ddoc_exists(binary(), binary()) -> ok. -ensure_rep_ddoc_exists(RepDb, DDocId) -> +-spec ensure_rep_ddoc_exists(binary()) -> ok. +ensure_rep_ddoc_exists(RepDb) -> + DDocId = ?REP_DESIGN_DOC, case open_rep_doc(RepDb, DDocId) of - {not_found, no_db_file} -> + {not_found, database_does_not_exist} -> %% database was deleted. ok; {not_found, _Reason} -> @@ -179,13 +182,6 @@ ensure_rep_ddoc_exists(RepDb, DDocId) -> ok. --spec ensure_cluster_rep_ddoc_exists(binary()) -> ok. -ensure_cluster_rep_ddoc_exists(RepDb) -> - DDocId = ?REP_DESIGN_DOC, - [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId), - ensure_rep_ddoc_exists(DbShard, DDocId). - - -spec compare_ejson({[_]}, {[_]}) -> boolean(). compare_ejson(EJson1, EJson2) -> EjsonSorted1 = couch_replicator_filters:ejsort(EJson1), @@ -202,31 +198,10 @@ replication_design_doc_props(DDocId) -> ]. -% Note: parse_rep_doc can handle filtered replications. During parsing of the -% replication doc it will make possibly remote http requests to the source -% database. If failure or parsing of filter docs fails, parse_doc throws a -% {filter_fetch_error, Error} excation. This exception should be considered -% transient in respect to the contents of the document itself, since it depends -% on netowrk availability of the source db and other factors. --spec parse_rep_doc({[_]}) -> #rep{}. -parse_rep_doc(RepDoc) -> - {ok, Rep} = try - parse_rep_doc(RepDoc, rep_user_ctx(RepDoc)) - catch - throw:{error, Reason} -> - throw({bad_rep_doc, Reason}); - throw:{filter_fetch_error, Reason} -> - throw({filter_fetch_error, Reason}); - Tag:Err -> - throw({bad_rep_doc, to_binary({Tag, Err})}) - end, - Rep. - - --spec parse_rep_doc_without_id({[_]}) -> #rep{}. +-spec parse_rep_doc_without_id({[_]}) -> #{}. parse_rep_doc_without_id(RepDoc) -> {ok, Rep} = try - parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc)) + parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc)) catch throw:{error, Reason} -> throw({bad_rep_doc, Reason}); @@ -236,11 +211,12 @@ parse_rep_doc_without_id(RepDoc) -> Rep. --spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}. -parse_rep_doc(Doc, UserCtx) -> - {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx), - Cancel = get_value(cancel, Rep#rep.options, false), - Id = get_value(id, Rep#rep.options, nil), +-spec parse_rep_doc({[_]}, user_name()) -> {ok, #{}}. +parse_rep_doc({[_]} = Doc, UserName) -> + {ok, Rep} = parse_rep_doc_without_id(Doc, UserName), + #{<<"options">> := Options} = Rep, + Cancel = maps:get(<<"cancel">>, Options, false), + Id = maps:get(<<"id">>, Options, nil), case {Cancel, Id} of {true, nil} -> % Cancel request with no id, must parse id out of body contents @@ -254,38 +230,43 @@ parse_rep_doc(Doc, UserCtx) -> end. --spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}. -parse_rep_doc_without_id({Props}, UserCtx) -> - Proxy = get_value(<<"proxy">>, Props, <<>>), - Opts = make_options(Props), - case get_value(cancel, Opts, false) andalso - (get_value(id, Opts, nil) =/= nil) of +-spec parse_rep_doc_without_id({[_]} | #{}, user_name()) -> {ok, #{}}. +parse_rep_doc_without_id({[_]} = EJson, UserName) -> + % Normalize all field names to be binaries and turn into a map + Map = ?JSON_DECODE(?JSON_ENCODE(EJson)), + parse_rep_doc_without_id(Map, UserName); + +parse_rep_doc_without_id(#{} = Doc, UserName) -> + Proxy = parse_proxy_params(maps:get(<<"proxy">>, Doc, <<>>)), + Opts = make_options(Doc), + Cancel = maps:get(<<"cancel">>, Opts, false), + Id = maps:get(<<"id">>, Opts, nil), + case Cancel andalso Id =/= nil of true -> - {ok, #rep{options = Opts, user_ctx = UserCtx}}; + {ok, #{<<"options">> => Opts, <<"user">> => UserName}}; false -> - Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts), - Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts), + #{<<"source">> := Source0, <<"target">> := Target0} = Doc, + Source = parse_rep_db(Source0, Proxy, Opts), + Target = parse_rep_db(Target0, Proxy, Opts), {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of - {error, Error} -> - throw({bad_request, Error}); - Result -> - Result + {error, Error} -> throw({bad_request, Error}); + Result -> Result end, - Rep = #rep{ - source = Source, - target = Target, - options = Opts, - user_ctx = UserCtx, - type = Type, - view = View, - doc_id = get_value(<<"_id">>, Props, null) + Rep = #{ + <<"id">> => null, + <<"base_id">> => null, + <<"source">> => Source, + <<"target">> => Target, + <<"options">> => Opts, + <<"user">> => UserName, + <<"type">> => Type, + <<"view">> => View, + <<"doc_id">> => maps:get(<<"_id">>, Doc, null) }, % Check if can parse filter code, if not throw exception case couch_replicator_filters:parse(Opts) of - {error, FilterError} -> - throw({error, FilterError}); - {ok, _Filter} -> - ok + {error, FilterError} -> throw({error, FilterError}); + {ok, _Filter} -> ok end, {ok, Rep} end. @@ -295,9 +276,10 @@ parse_rep_doc_without_id({Props}, UserCtx) -> % fetching a filter from the source db, and so it could fail intermetently. % In case of a failure to fetch the filter this function will throw a % `{filter_fetch_error, Reason} exception. -update_rep_id(Rep) -> - RepId = couch_replicator_ids:replication_id(Rep), - Rep#rep{id = RepId}. +update_rep_id(#{} = Rep) -> + {BaseId, ExtId} = couch_replicator_ids:replication_id(Rep), + RepId = erlang:iolist_to_binary([BaseId, ExtId]), + Rep#{<<"id">> => RepId, <<"base_id">> = BaseId}. update_rep_doc(RepDbName, RepDocId, KVs) -> @@ -350,22 +332,21 @@ update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) -> open_rep_doc(DbName, DocId) -> - case couch_db:open_int(DbName, [?CTX, sys_db]) of - {ok, Db} -> - try - couch_db:open_doc(Db, DocId, [ejson_body]) - after - couch_db:close(Db) - end; - Else -> - Else + try + case fabric2_db:open(DbName, [?CTX, sys_db]) of + {ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]); + Else -> Else + end + catch + error:database_does_not_exist -> + {not_found, database_does_not_exist} end. save_rep_doc(DbName, Doc) -> - {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]), + {ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db]), try - couch_db:update_doc(Db, Doc, []) + fabric2_db:update_doc(Db, Doc, []) catch % User can accidently write a VDU which prevents _replicator from % updating replication documents. Avoid crashing replicator and thus @@ -374,54 +355,56 @@ save_rep_doc(DbName, Doc) -> Msg = "~p VDU function preventing doc update to ~s ~s ~p", couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]), {ok, forbidden} - after - couch_db:close(Db) end. --spec rep_user_ctx({[_]}) -> #user_ctx{}. -rep_user_ctx({RepDoc}) -> +-spec rep_user_name({[_]}) -> binary() | null. +rep_user_name({RepDoc}) -> case get_json_value(<<"user_ctx">>, RepDoc) of - undefined -> - #user_ctx{}; - {UserCtx} -> - #user_ctx{ - name = get_json_value(<<"name">>, UserCtx, null), - roles = get_json_value(<<"roles">>, UserCtx, []) - } + undefined -> null; + {UserCtx} -> get_json_value(<<"name">>, UserCtx, null) end. --spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary(). -parse_rep_db({Props}, Proxy, Options) -> - ProxyParams = parse_proxy_params(Proxy), +-spec parse_rep_db(#{}, #{}, #{}) -> #{}. +parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) -> ProxyURL = case ProxyParams of - [] -> undefined; - _ -> binary_to_list(Proxy) + #{<<"proxy_url">> := PUrl} -> PUrl; + _ -> null end, - Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)), - {AuthProps} = get_value(<<"auth">>, Props, {[]}), - {BinHeaders} = get_value(<<"headers">>, Props, {[]}), - Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]), - DefaultHeaders = (#httpdb{})#httpdb.headers, - #httpdb{ - url = Url, - auth_props = AuthProps, - headers = lists:ukeymerge(1, Headers, DefaultHeaders), - ibrowse_options = lists:keysort(1, - [{socket_options, get_value(socket_options, Options)} | - ProxyParams ++ ssl_params(Url)]), - timeout = get_value(connection_timeout, Options), - http_connections = get_value(http_connections, Options), - retries = get_value(retries, Options), - proxy_url = ProxyURL - }; + + Url0 = maps:get(<<"url">>, Endpoint), + Url = maybe_add_trailing_slash(Url0), + + AuthProps = maps:get(<<"auth">>, Endpoint, #{}), + + Headers0 = maps:get(<<"headers">>, Endpoint, #{}), + DefaultHeaders = couch_replicator_utils:get_default_headers(), + % For same keys values in second map override those in the first + Headers = maps:merge(DefaultHeaders, Headers0), + + SockOpts = maps:get(<<"socket_options">>, Options, #{}), + SockAndProxy = maps:merge(SockOpts, ProxyParams), + + SslParams = ssl_params(Url), + + #{ + <<"url">> => Url, + <<"auth_props">> => AuthProps, + <<"headers">> => Headers, + <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy), + <<"timeout">> => maps:get(<<"timeout">>, Options), + <<"http_connections">> => maps:get(<<"http_connections">>, Options), + <<"retries">> => maps:get(<<"retries">>, Options) + <<"proxy_url">> => ProxyUrl + }. + parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); + parse_rep_db(#{<<"url">> => Url}, Proxy, Options); parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); + parse_rep_db(#{<<"url">> => Url}, Proxy, Options); parse_rep_db(<<_/binary>>, _Proxy, _Options) -> throw({error, <<"Local endpoints not supported since CouchDB 3.x">>}); @@ -430,118 +413,99 @@ parse_rep_db(undefined, _Proxy, _Options) -> throw({error, <<"Missing replicator database">>}). --spec maybe_add_trailing_slash(binary() | list()) -> list(). +-spec maybe_add_trailing_slash(binary()) -> binary(). +maybe_add_trailing_slash(<<>>) -> + <<>>; + maybe_add_trailing_slash(Url) when is_binary(Url) -> - maybe_add_trailing_slash(?b2l(Url)); -maybe_add_trailing_slash(Url) -> - case lists:member($?, Url) of - true -> - Url; % skip if there are query params - false -> - case lists:last(Url) of - $/ -> - Url; - _ -> - Url ++ "/" - end + case binary:match(Url, <<"?">>) of + nomatch -> + case binary:last(Url) of + $/ -> Url; + _ -> <<Url/binary, "/">>; + _ -> + Url % skip if there are query params end. --spec make_options([_]) -> [_]. -make_options(Props) -> - Options0 = lists:ukeysort(1, convert_options(Props)), +-spec make_options(#{}) -> #{}. +make_options(#{} = RepDoc) -> + Options0 = maps:fold(fun convert_options/3, #{}, RepDoc) Options = check_options(Options0), - DefWorkers = config:get("replicator", "worker_processes", "4"), - DefBatchSize = config:get("replicator", "worker_batch_size", "500"), - DefConns = config:get("replicator", "http_connections", "20"), - DefTimeout = config:get("replicator", "connection_timeout", "30000"), - DefRetries = config:get("replicator", "retries_per_request", "5"), - UseCheckpoints = config:get("replicator", "use_checkpoints", "true"), - DefCheckpointInterval = config:get("replicator", "checkpoint_interval", - "30000"), - {ok, DefSocketOptions} = couch_util:parse_term( - config:get("replicator", "socket_options", - "[{keepalive, true}, {nodelay, false}]")), - lists:ukeymerge(1, Options, lists:keysort(1, [ - {connection_timeout, list_to_integer(DefTimeout)}, - {retries, list_to_integer(DefRetries)}, - {http_connections, list_to_integer(DefConns)}, - {socket_options, DefSocketOptions}, - {worker_batch_size, list_to_integer(DefBatchSize)}, - {worker_processes, list_to_integer(DefWorkers)}, - {use_checkpoints, list_to_existing_atom(UseCheckpoints)}, - {checkpoint_interval, list_to_integer(DefCheckpointInterval)} - ])). - - --spec convert_options([_]) -> [_]. -convert_options([])-> - []; -convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)-> + ConfigOptions = lists:foldl(fun({K, Default, ConversionFun}, Acc) -> + V = ConversionFun(config:get("replicator", K, Default)), + Acc#{list_to_binary(K) => V} + end, #{}, ?CONFIG_DEFAULTS), + maps:merge(ConfigOptions, Options). + + +-spec convert_options(binary(), any(), #{}) -> #{}. +convert_options(<<"cancel">>, V, _Acc) when not is_boolean(V)-> throw({bad_request, <<"parameter `cancel` must be a boolean">>}); -convert_options([{<<"cancel">>, V} | R]) -> - [{cancel, V} | convert_options(R)]; -convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>; +convert_options(<<"cancel">>, V, Acc) -> + Acc#{<<"cancel">> => V}; +convert_options(IdOpt, V, Acc) when IdOpt =:= <<"_local_id">>; IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> -> - [{id, couch_replicator_ids:convert(V)} | convert_options(R)]; -convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)-> + Acc#{<<"id">> => couch_replicator_ids:convert(V)}; +convert_options(<<"create_target">>, V, _Acc) when not is_boolean(V)-> throw({bad_request, <<"parameter `create_target` must be a boolean">>}); -convert_options([{<<"create_target">>, V} | R]) -> - [{create_target, V} | convert_options(R)]; -convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) -> +convert_options(<<"create_target">>, V, Acc) -> + Acc#{<<"create_target">> => V}; +convert_options(<<"create_target_params">>, V, _Acc) when not is_tuple(V) -> throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>}); -convert_options([{<<"create_target_params">>, V} | R]) -> - [{create_target_params, V} | convert_options(R)]; -convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)-> +convert_options(<<"create_target_params">>, V, Acc) -> + Acc#{<<"create_target_params">> => V}; +convert_options(<<"continuous">>, V, Acc) when not is_boolean(V)-> throw({bad_request, <<"parameter `continuous` must be a boolean">>}); -convert_options([{<<"continuous">>, V} | R]) -> - [{continuous, V} | convert_options(R)]; -convert_options([{<<"filter">>, V} | R]) -> - [{filter, V} | convert_options(R)]; -convert_options([{<<"query_params">>, V} | R]) -> - [{query_params, V} | convert_options(R)]; -convert_options([{<<"doc_ids">>, null} | R]) -> - convert_options(R); -convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) -> +convert_options(<<"continuous">>, V, Acc) -> + Acc#{<<"continuous">> => V}; +convert_options(<<"filter">>, V, Acc) -> + Acc#{<<"filter">> => V}; +convert_options(<<"query_params">>, V, Acc) -> + Acc#{<<"query_params">> => V}; +convert_options(<<"doc_ids">>, null, Acc) -> + Acc; +convert_options(<<"doc_ids">>, V, _Acc) when not is_list(V) -> throw({bad_request, <<"parameter `doc_ids` must be an array">>}); -convert_options([{<<"doc_ids">>, V} | R]) -> +convert_options(<<"doc_ids">>, V, Acc) -> % Ensure same behaviour as old replicator: accept a list of percent % encoded doc IDs. DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]), - [{doc_ids, DocIds} | convert_options(R)]; -convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) -> + Acc#{<<"doc_ids">> => DocIds}; +convert_options(<<"selector">>, V, _Acc) when not is_tuple(V) -> throw({bad_request, <<"parameter `selector` must be a JSON object">>}); -convert_options([{<<"selector">>, V} | R]) -> - [{selector, V} | convert_options(R)]; -convert_options([{<<"worker_processes">>, V} | R]) -> - [{worker_processes, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"worker_batch_size">>, V} | R]) -> - [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"http_connections">>, V} | R]) -> - [{http_connections, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"connection_timeout">>, V} | R]) -> - [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"retries_per_request">>, V} | R]) -> - [{retries, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"socket_options">>, V} | R]) -> - {ok, SocketOptions} = couch_util:parse_term(V), - [{socket_options, SocketOptions} | convert_options(R)]; -convert_options([{<<"since_seq">>, V} | R]) -> - [{since_seq, V} | convert_options(R)]; -convert_options([{<<"use_checkpoints">>, V} | R]) -> - [{use_checkpoints, V} | convert_options(R)]; -convert_options([{<<"checkpoint_interval">>, V} | R]) -> - [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([_ | R]) -> % skip unknown option - convert_options(R). - - --spec check_options([_]) -> [_]. +convert_options(<<"selector">>, V, Acc) -> + Acc#{<<"selector">> => V}; +convert_options(<<"worker_processes">>, V, Acc) -> + Acc#{<<"worker_processes">> => couch_util:to_integer(V)}; +convert_options(<<"worker_batch_size">>, V, Acc) -> + Acc#{<<"worker_batch_size">> => couch_util:to_integer(V)}; +convert_options(<<"http_connections">>, V, Acc) -> + Acc#{<<"http_connections">> => couch_util:to_integer(V)}; +convert_options(<<"connection_timeout">>, V, Acc) -> + Acc#{<<"connection_timeout">> => couch_util:to_integer(V)}; +convert_options(<<"retries_per_request">>, V, Acc) -> + Acc#{<<"retries">> => couch_util:to_integer(V)}; +convert_options(<<"socket_options">>, V, Acc) -> + Acc#{<<"socket_options">> => parse_sock_opts(V)}; +convert_options(<<"since_seq">>, V, Acc) -> + Acc#{<<"since_seq">> => V}; +convert_options(<<"use_checkpoints">>, V, Acc) when not is_boolean(V)-> + throw({bad_request, <<"parameter `use_checkpoints` must be a boolean">>}); +convert_options(<<"use_checkpoints">>, V, Acc) -> + Acc#{<<"use_checkpoints">> => V}; +convert_options(<<"checkpoint_interval">>, V, Acc) -> + Acc#{<<"checkpoint_interval">>, couch_util:to_integer(V)}; +convert_options(_K, _V, Acc) -> % skip unknown option + Acc. + + +-spec check_options(#{}) -> #{}. check_options(Options) -> - DocIds = lists:keyfind(doc_ids, 1, Options), - Filter = lists:keyfind(filter, 1, Options), - Selector = lists:keyfind(selector, 1, Options), + DocIds = maps:is_key(<<"doc_ids">>, Options), + Filter = maps:is_key(<<"filter">>, Options), + Selector = maps:is_key(<<"selector">>, Options), case {DocIds, Filter, Selector} of {false, false, false} -> Options; {false, false, _} -> Options; @@ -553,66 +517,113 @@ check_options(Options) -> end. --spec parse_proxy_params(binary() | [_]) -> [_]. -parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> - parse_proxy_params(?b2l(ProxyUrl)); -parse_proxy_params([]) -> - []; -parse_proxy_params(ProxyUrl) -> +parse_sock_opts(V) -> + {ok, SocketOptions} = couch_util:parse_term(V), + lists:foldl(fun + ({K, V}, Acc) when is_atom(K) -> + case lists:member(K, ?VALID_SOCKET_OPTIONS) of + true -> Acc#{atom_to_binary(K) => V}; + false -> Acc + end; + (_, Acc) -> + Acc + end, #{}, SocketOptions). + + +-spec parse_proxy_params(binary() | #{}) -> #{}. +parse_proxy_params(<<>>) -> + #{}; +parse_proxy_params(ProxyUrl0) when is_binary(ProxyUrl0)-> + ProxyUrl = binary_to_list(ProxyUrl0), #url{ host = Host, port = Port, username = User, password = Passwd, - protocol = Protocol + protocol = Protocol0 } = ibrowse_lib:parse_url(ProxyUrl), - [ - {proxy_protocol, Protocol}, - {proxy_host, Host}, - {proxy_port, Port} - ] ++ case is_list(User) andalso is_list(Passwd) of + Protocol = atom_to_binary(Protocol, utf8), + case lists:member(Protocol, [<<"http">>, <<"https">>, <<"socks5">>]) of + true -> + atom_to_binary(Protocol, utf8); false -> - []; + Error = <<"Unsupported proxy protocol", Protocol/binary>>, + throw({bad_request, Error}) + end, + ProxyParams = #{ + <<"proxy_url">> => ProxyUrl, + <<"proxy_protocol">> => Protocol, + <<"proxy_host">> => list_to_binary(Host), + <<"proxy_port">> => Port + #}, + case is_list(User) andalso is_list(Passwd) of true -> - [{proxy_user, User}, {proxy_password, Passwd}] - end. + ProxyParams#{ + <<"proxy_user">> => list_to_binary(User), + <<"proxy_password">> => list_to_binary(Passwd) + }; + false -> + ProxyParams + end. --spec ssl_params([_]) -> [_]. +-spec ssl_params(binary()) -> #{}. ssl_params(Url) -> - case ibrowse_lib:parse_url(Url) of + case ibrowse_lib:parse_url(binary_to_list(Url)) of #url{protocol = https} -> Depth = list_to_integer( config:get("replicator", "ssl_certificate_max_depth", "3") ), VerifyCerts = config:get("replicator", "verify_ssl_certificates"), - CertFile = config:get("replicator", "cert_file", undefined), - KeyFile = config:get("replicator", "key_file", undefined), - Password = config:get("replicator", "password", undefined), - SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")], - SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of + CertFile = config:get("replicator", "cert_file", null), + KeyFile = config:get("replicator", "key_file", null), + Password = config:get("replicator", "password", null), + VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"), + SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}), + SslOpts1 = case CertFile /= null andalso KeyFile /= null of true -> - case Password of - undefined -> - [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts; + CertFileOpts = case Password of + null -> + #{ + <<"certfile">> => list_to_binary(CertFile), + <<"keyfile">> => list_to_binary(KeyFile) + }; _ -> - [{certfile, CertFile}, {keyfile, KeyFile}, - {password, Password}] ++ SslOpts - end; - false -> SslOpts + #{ + <<"certfile">> => list_to_binary(CertFile), + <<"keyfile">> => list_to_binary(KeyFile), + <<"password">> => list_to_binary(Password) + } + end, + maps:merge(SslOpts, CertFileOpts) + false -> + SslOpts end, - [{is_ssl, true}, {ssl_options, SslOpts1}]; + #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1}; #url{protocol = http} -> - [] + #{} end. -spec ssl_verify_options(true | false) -> [_]. ssl_verify_options(true) -> - CAFile = config:get("replicator", "ssl_trusted_certificates_file"), - [{verify, verify_peer}, {cacertfile, CAFile}]; + case config:get("replicator", "ssl_trusted_certificates_file", undefined) of + undefined -> + #{ + <<"verify">> => <<"verify_peer">>, + <<"cacertfile">> => null + }; + CAFile when is_list(CAFile) -> + #{ + <<"verify">> => <<"verify_peer">>, + <<"cacertfile">> => list_to_binary(CAFile) + } + end; + ssl_verify_options(false) -> - [{verify, verify_none}]. + #{ + <<"verify">> => <<"verify_none">> + }. -spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}. @@ -622,7 +633,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) -> #user_ctx{ roles = Roles, name = Name - } = couch_db:get_user_ctx(Db), + } = fabric2_db:get_user_ctx(Db), case lists:member(<<"_replicator">>, Roles) of true -> Doc; @@ -633,7 +644,7 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) -> Name -> Doc; Other -> - case (catch couch_db:check_is_admin(Db)) of + case (catch fabric2_db:check_is_admin(Db)) of ok when Other =:= null -> Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; ok -> @@ -650,8 +661,8 @@ before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) -> after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) -> Doc; after_doc_read(#doc{body = {Body}} = Doc, Db) -> - #user_ctx{name = Name} = couch_db:get_user_ctx(Db), - case (catch couch_db:check_is_admin(Db)) of + #user_ctx{name = Name} = fabric2_db:get_user_ctx(Db), + case (catch fabric2_db:check_is_admin(Db)) of ok -> Doc; _ -> @@ -659,16 +670,15 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) -> Name -> Doc; _Other -> - Source = strip_credentials(couch_util:get_value(<<"source">>, -Body)), - Target = strip_credentials(couch_util:get_value(<<"target">>, -Body)), + Source0 = couch_util:get_value(<<"source">>, Body), + Target0 = couch_util:get_value(<<"target">>, Body), + Source = strip_credentials(Source0), + Target = strip_credentials(Target0), NewBody0 = ?replace(Body, <<"source">>, Source), NewBody = ?replace(NewBody0, <<"target">>, Target), #doc{revs = {Pos, [_ | Revs]}} = Doc, NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}}, - NewRevId = couch_db:new_revid(NewDoc), - NewDoc#doc{revs = {Pos, [NewRevId | Revs]}} + fabric2_db:new_revid(NewDoc) end end. @@ -779,27 +789,24 @@ check_strip_credentials_test() -> setup() -> DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - create_vdu(DbName), + {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]), + create_vdu(Db), DbName. teardown(DbName) when is_binary(DbName) -> - couch_server:delete(DbName, [?ADMIN_CTX]), + fabric2_db:delete(DbName, [?ADMIN_CTX]), ok. -create_vdu(DbName) -> - couch_util:with_db(DbName, fun(Db) -> - VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>, - Doc = #doc{ - id = <<"_design/vdu">>, - body = {[{<<"validate_doc_update">>, VduFun}]} - }, - {ok, _} = couch_db:update_docs(Db, [Doc]), - couch_db:ensure_full_commit(Db) - end). +create_vdu(Db) -> + VduFun = <<"function(newdoc, olddoc, userctx) {throw({'forbidden':'fail'})}">>, + Doc = #doc{ + id = <<"_design/vdu">>, + body = {[{<<"validate_doc_update">>, VduFun}]} + }, + {ok, _} = fabric2_db:update_doc(Db, [Doc]), + ok. update_replicator_doc_with_bad_vdu_test_() -> diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl index c8980001a..b14ea3475 100644 --- a/src/couch_replicator/src/couch_replicator_filters.erl +++ b/src/couch_replicator/src/couch_replicator_filters.erl @@ -88,22 +88,24 @@ fetch(DDocName, FilterName, Source) -> % Get replication type and view (if any) from replication document props --spec view_type([_], [_]) -> - {view, {binary(), binary()}} | {db, nil} | {error, binary()}. -view_type(Props, Options) -> - case couch_util:get_value(<<"filter">>, Props) of - <<"_view">> -> - {QP} = couch_util:get_value(query_params, Options, {[]}), - ViewParam = couch_util:get_value(<<"view">>, QP), - case re:split(ViewParam, <<"/">>) of - [DName, ViewName] -> - {view, {<< "_design/", DName/binary >>, ViewName}}; - _ -> - {error, <<"Invalid `view` parameter.">>} - end; +-spec view_type(#{}, [_]) -> + {binary(), #{}} | {error, binary()}. +view_type(#{<<"filter">> := <<"_view">>}, Options) -> + {QP} = couch_util:get_value(query_params, Options, {[]}), + ViewParam = couch_util:get_value(<<"view">>, QP), + case re:split(ViewParam, <<"/">>) of + [DName, ViewName] -> + DDocMap = #{ + <<"ddoc">> => <<"_design/",DName/binary>>, + <<"view">> => ViewName + }, + {<<"view">>, DDocMap}; _ -> - {db, nil} - end. + {error, <<"Invalid `view` parameter.">>} + end; + +view_type(#{}, [_] = Options) -> + {<<"db">>, #{}}. % Private functions diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl index 04e71c3ef..a3f622046 100644 --- a/src/couch_replicator/src/couch_replicator_ids.erl +++ b/src/couch_replicator/src/couch_replicator_ids.erl @@ -30,28 +30,29 @@ % {filter_fetch_error, Error} exception. % -replication_id(#rep{options = Options} = Rep) -> +replication_id(#{<<"options">> := Options} = Rep) -> BaseId = replication_id(Rep, ?REP_ID_VERSION), - {BaseId, maybe_append_options([continuous, create_target], Options)}. + UseOpts = [<<"continuous">>, <<"create_target">>] + {BaseId, maybe_append_options(UseOpts, Options)}. % Versioned clauses for generating replication IDs. % If a change is made to how replications are identified, % please add a new clause and increase ?REP_ID_VERSION. -replication_id(#rep{} = Rep, 4) -> +replication_id(#{<<"source">> := Src, <<"target">> := Tgt} = Rep, 4) -> UUID = couch_server:get_uuid(), - SrcInfo = get_v4_endpoint(Rep#rep.source), - TgtInfo = get_v4_endpoint(Rep#rep.target), + SrcInfo = get_v4_endpoint(Src), + TgtInfo = get_v4_endpoint(Tgt), maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep); -replication_id(#rep{} = Rep, 3) -> +replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 3) -> UUID = couch_server:get_uuid(), - Src = get_rep_endpoint(Rep#rep.source), - Tgt = get_rep_endpoint(Rep#rep.target), + Src = get_rep_endpoint(Src0), + Tgt = get_rep_endpoint(Tgt0), maybe_append_filters([UUID, Src, Tgt], Rep); -replication_id(#rep{} = Rep, 2) -> +replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 2) -> {ok, HostName} = inet:gethostname(), Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of P when is_number(P) -> @@ -64,14 +65,14 @@ replication_id(#rep{} = Rep, 2) -> % ... mochiweb_socket_server:get(https, port) list_to_integer(config:get("httpd", "port", "5984")) end, - Src = get_rep_endpoint(Rep#rep.source), - Tgt = get_rep_endpoint(Rep#rep.target), + Src = get_rep_endpoint(Src0), + Tgt = get_rep_endpoint(Tgt0), maybe_append_filters([HostName, Port, Src, Tgt], Rep); -replication_id(#rep{} = Rep, 1) -> +replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 1) -> {ok, HostName} = inet:gethostname(), - Src = get_rep_endpoint(Rep#rep.source), - Tgt = get_rep_endpoint(Rep#rep.target), + Src = get_rep_endpoint(Src0), + Tgt = get_rep_endpoint(Tgt0), maybe_append_filters([HostName, Src, Tgt], Rep). @@ -83,15 +84,23 @@ convert(Id0) when is_binary(Id0) -> % the URL path. So undo the incorrect parsing here to avoid forcing % users to url encode + characters. Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]), - lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id)); -convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) -> + case binary:split(Id, <<"+">>) of + [BaseId, Ext] -> {BaseId, Ext}; + [BaseId] -> {BaseId, <<>>} + end +convert({BaseId, Ext}) when is_list(BaseId), is_list(Ext) -> + {list_to_binary(BaseId), list_to_binary(Ext)}; +convert({BaseId, Ext} = Id) when is_binary(BaseId), is_binary(Ext) -> Id. % Private functions -maybe_append_filters(Base, - #rep{source = Source, options = Options}) -> +maybe_append_filters(Base, #{} = Rep) -> + #{ + <<"source">> := Source, + <<"options">> := Options + } = Rep, Base2 = Base ++ case couch_replicator_filters:parse(Options) of {ok, nil} -> @@ -112,7 +121,8 @@ maybe_append_filters(Base, {error, FilterParseError} -> throw({error, FilterParseError}) end, - couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))). + Res = couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))), + list_to_binary(Res). maybe_append_options(Options, RepOptions) -> @@ -127,12 +137,19 @@ maybe_append_options(Options, RepOptions) -> end, [], Options). -get_rep_endpoint(#httpdb{url=Url, headers=Headers}) -> +get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) -> + Url = binary_to_list(Url0), + % We turn headers into a proplist of string() KVs to calculate + % the same replication ID as CouchDB 2.x + Headers1 = maps:fold(fun(K, V, Acc) -> + [{binary_to_list(K), binary_to_list(V)} | Acc] + end, [], Header0), + Headers2 = lists:keysort(1, Headers1), DefaultHeaders = (#httpdb{})#httpdb.headers, - {remote, Url, Headers -- DefaultHeaders}. + {remote, Url, Headers2 -- DefaultHeaders}. -get_v4_endpoint(#httpdb{} = HttpDb) -> +get_v4_endpoint(#{} = HttpDb) -> {remote, Url, Headers} = get_rep_endpoint(HttpDb), {{UserFromHeaders, _}, HeadersWithoutBasicAuth} = couch_replicator_utils:remove_basic_auth_from_headers(Headers), @@ -141,7 +158,6 @@ get_v4_endpoint(#httpdb{} = HttpDb) -> OAuth = undefined, % Keep this to ensure checkpoints don't change {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}. - pick_defined_value(Values) -> case [V || V <- Values, V /= undefined] of [] -> diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl deleted file mode 100644 index f7640a349..000000000 --- a/src/couch_replicator/src/couch_replicator_notifier.erl +++ /dev/null @@ -1,58 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_notifier). - --behaviour(gen_event). --vsn(1). - -% public API --export([start_link/1, stop/1, notify/1]). - -% gen_event callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_event/2, handle_call/2, handle_info/2]). - --include_lib("couch/include/couch_db.hrl"). - -start_link(FunAcc) -> - couch_event_sup:start_link(couch_replication, - {couch_replicator_notifier, make_ref()}, FunAcc). - -notify(Event) -> - gen_event:notify(couch_replication, Event). - -stop(Pid) -> - couch_event_sup:stop(Pid). - - -init(FunAcc) -> - {ok, FunAcc}. - -terminate(_Reason, _State) -> - ok. - -handle_event(Event, Fun) when is_function(Fun, 1) -> - Fun(Event), - {ok, Fun}; -handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) -> - Acc2 = Fun(Event, Acc), - {ok, {Fun, Acc2}}. - -handle_call(_Msg, State) -> - {ok, ok, State}. - -handle_info(_Msg, State) -> - {ok, State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 565a2bd97..e8ddc8443 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -15,7 +15,7 @@ -behaviour(gen_server). -export([ - start_link/1 + start_link/3 ]). -export([ @@ -39,17 +39,16 @@ to_binary/1 ]). --import(couch_replicator_utils, [ - pp_rep_id/1 -]). - -define(LOWEST_SEQ, 0). -define(DEFAULT_CHECKPOINT_INTERVAL, 30000). -define(STARTUP_JITTER_DEFAULT, 5000). -record(rep_state, { - rep_details, + job, + job_data, + id, + base_id, source_name, target_name, source, @@ -73,37 +72,36 @@ workers, stats = couch_replicator_stats:new(), session_id, - source_monitor = nil, - target_monitor = nil, source_seq = nil, use_checkpoints = true, checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, type = db, - view = nil + view = nil, + user = null, + options = #{} }). -start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> - RepChildId = BaseId ++ Ext, - Source = couch_replicator_api_wrap:db_uri(Src), - Target = couch_replicator_api_wrap:db_uri(Tgt), - ServerName = {global, {?MODULE, Rep#rep.id}}, - - case gen_server:start_link(ServerName, ?MODULE, Rep, []) of +start_link(#{] = Job, #{} = JobData) -> + case gen_server:start_link(?MODULE, {Job, JobData}, []) of {ok, Pid} -> {ok, Pid}; {error, Reason} -> - couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)", - [RepChildId, Source, Target]), + #{<<"rep">> := Rep} = JobData, + {<<"id">> := Id, <<"source">> := Src, <<"target">> := Ttg} = Rep, + Source = couch_replicator_api_wrap:db_uri(Src), + Target = couch_replicator_api_wrap:db_uri(Tgt), + ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)", + couch_log:warning(ErrMsg, [RepId, Source, Target]), {error, Reason} end. -init(InitArgs) -> - {ok, InitArgs, 0}. +init({#{} = Job, #{} = JobData}) -> + {ok, {Job, JobData}, 0}. -do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> +do_init(#{} = Job, #{} = JobData) -> process_flag(trap_exit, true), timer:sleep(startup_jitter()), @@ -115,8 +113,12 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> target_name = TargetName, start_seq = {_Ts, StartSeq}, highest_seq_done = {_, HighestSeq}, - checkpoint_interval = CheckpointInterval - } = State = init_state(Rep), + checkpoint_interval = CheckpointInterval, + user = User, + options = Options, + doc_id = DocId, + db_name = DbName + } = State = init_state(Job, JobData), NumWorkers = get_value(worker_processes, Options), BatchSize = get_value(worker_batch_size, Options), @@ -147,10 +149,10 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> couch_task_status:add_task([ {type, replication}, - {user, UserCtx#user_ctx.name}, - {replication_id, ?l2b(BaseId ++ Ext)}, - {database, Rep#rep.db_name}, - {doc_id, Rep#rep.doc_id}, + {user, User}, + {replication_id, State#rep_state.id}, + {database, DbName}, + {doc_id, DocId}, {source, ?l2b(SourceName)}, {target, ?l2b(TargetName)}, {continuous, get_value(continuous, Options, false)}, @@ -159,16 +161,6 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> ] ++ rep_stats(State)), couch_task_status:set_update_frequency(1000), - % Until OTP R14B03: - % - % Restarting a temporary supervised child implies that the original arguments - % (#rep{} record) specified in the MFA component of the supervisor - % child spec will always be used whenever the child is restarted. - % This implies the same replication performance tunning parameters will - % always be used. The solution is to delete the child spec (see - % cancel_replication/1) and then start the replication again, but this is - % unfortunately not immune to race conditions. - log_replication_start(State), couch_log:debug("Worker pids are: ~p", [Workers]), @@ -222,7 +214,6 @@ handle_call({report_seq_done, Seq, StatsInc}, From, update_task(NewState), {noreply, NewState}. - handle_cast(checkpoint, State) -> case do_checkpoint(State) of {ok, NewState} -> @@ -242,14 +233,6 @@ handle_cast({report_seq, Seq}, handle_info(shutdown, St) -> {stop, shutdown, St}; -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) -> - couch_log:error("Source database is down. Reason: ~p", [Why]), - {stop, source_db_down, St}; - -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) -> - couch_log:error("Target database is down. Reason: ~p", [Why]), - {stop, target_db_down, St}; - handle_info({'EXIT', Pid, max_backoff}, State) -> couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; @@ -308,9 +291,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> {stop, {worker_died, Pid, Reason}, State2} end; -handle_info(timeout, InitArgs) -> - try do_init(InitArgs) of {ok, State} -> - {noreply, State} +handle_info(timeout, {#{} = Job, #{} = JobData} = InitArgs) -> + try do_init(Job, JobData) of + {ok, State} -> + {noreply, State} catch exit:{http_request_failed, _, _, max_backoff} -> {stop, {shutdown, max_backoff}, {error, InitArgs}}; @@ -325,13 +309,12 @@ handle_info(timeout, InitArgs) -> end. -terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep, - checkpoint_history = CheckpointHistory} = State) -> - terminate_cleanup(State), - couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}), - doc_update_completed(Rep, rep_stats(State)); +terminate(normal, #rep_state{} = State) -> + % Note: when terminating `normal`, the job was already marked as finished. + % if that fails then we'd end up in the error terminate clause + terminate_cleanup(State). -terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) -> +terminate(shutdown, #rep_state{id = RepId} = State) -> % Replication stopped via _scheduler_sup:terminate_child/1, which can be % occur during regular scheduler operation or when job is removed from % the scheduler. @@ -343,53 +326,57 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) -> couch_log:error(LogMsg, [?MODULE, RepId, Error]), State end, - couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}), + finish_couch_job(State1, <<"stopped">>, null), terminate_cleanup(State1); -terminate({shutdown, max_backoff}, {error, InitArgs}) -> - #rep{id = {BaseId, Ext} = RepId} = InitArgs, +terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) -> + % Here we handle the case when replication fails during initialization. + % That is before the #rep_state{} is even built. + #{<<"rep">> := #{<<"id">> := RepId}} = JobData, couch_stats:increment_counter([couch_replicator, failed_starts]), - couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]), - couch_replicator_notifier:notify({error, RepId, max_backoff}); - -terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) -> - #rep{ - id = {BaseId, Ext} = RepId, - source = Source0, - target = Target0, - doc_id = DocId, - db_name = DbName - } = InitArgs, + couch_log:warning("Replication `~s` reached max backoff ", [RepId]), + finish_couch_job(Job, JobData, <<"error">>, max_backoff); + +terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) -> + % Here we handle the case when replication fails during initialization. + #{<<"rep">> := Rep} = JobData, + #{ + <<"id">> := Id, + <<"source">> := Source0, + <<"target">> := Target0, + <<"doc_id">> := DocId, + <<"db_name">> := DbName + } = Rep, Source = couch_replicator_api_wrap:db_uri(Source0), Target = couch_replicator_api_wrap:db_uri(Target0), - RepIdStr = BaseId ++ Ext, Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p", - couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName, + couch_log:error(Msg, [Class, Error, RepId, Source, Target, DbName, DocId, Stack]), couch_stats:increment_counter([couch_replicator, failed_starts]), - couch_replicator_notifier:notify({error, RepId, Error}); + finish_couch_job(Job, JobData, <<"error">>, Error); -terminate({shutdown, max_backoff}, State) -> +terminate({shutdown, max_backoff}, #rep_state{} = State) -> #rep_state{ + id = RepId, source_name = Source, target_name = Target, - rep_details = #rep{id = {BaseId, Ext} = RepId} } = State, couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff", - [BaseId ++ Ext, Source, Target]), + [RepId, Source, Target]), terminate_cleanup(State), - couch_replicator_notifier:notify({error, RepId, max_backoff}); + finish_couch_job(State, <<"error">>, max_backoff); terminate(Reason, State) -> -#rep_state{ + #rep_state{ + id = RepId, source_name = Source, target_name = Target, - rep_details = #rep{id = {BaseId, Ext} = RepId} } = State, couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s", - [BaseId ++ Ext, Source, Target, to_binary(Reason)]), + [RepId, Source, Target, to_binary(Reason)]), terminate_cleanup(State), - couch_replicator_notifier:notify({error, RepId, Reason}). + finish_couch_job(State, <<"error">>, Reason). + terminate_cleanup(State) -> update_task(State), @@ -403,22 +390,19 @@ code_change(_OldVsn, #rep_state{}=State, _Extra) -> format_status(_Opt, [_PDict, State]) -> #rep_state{ + id = Id, source = Source, target = Target, - rep_details = RepDetails, start_seq = StartSeq, source_seq = SourceSeq, committed_seq = CommitedSeq, current_through_seq = ThroughSeq, highest_seq_done = HighestSeqDone, - session_id = SessionId - } = state_strip_creds(State), - #rep{ - id = RepId, - options = Options, + session_id = SessionId, doc_id = DocId, - db_name = DbName - } = RepDetails, + db_name = DbName, + options = Options + } = state_strip_creds(State), [ {rep_id, RepId}, {source, couch_replicator_api_wrap:db_uri(Source)}, @@ -462,73 +446,108 @@ httpdb_strip_creds(LocalDb) -> LocalDb. -rep_strip_creds(#rep{source = Source, target = Target} = Rep) -> - Rep#rep{ - source = httpdb_strip_creds(Source), - target = httpdb_strip_creds(Target) - }. - - -state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) -> - % #rep_state contains the source and target at the top level and also - % in the nested #rep_details record +state_strip_creds(#rep_state{source = Source, target = Target} = State) -> State#rep_state{ - rep_details = rep_strip_creds(Rep), source = httpdb_strip_creds(Source), target = httpdb_strip_creds(Target) }. -adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) -> +adjust_maxconn(Src = #{<<"http_connections">> : = 1}, RepId) -> Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p", couch_log:notice(Msg, [RepId]), - Src#httpdb{http_connections = 2}; + Src#{<<"http_connections">> := 2}; adjust_maxconn(Src, _RepId) -> Src. --spec doc_update_triggered(#rep{}) -> ok. -doc_update_triggered(#rep{db_name = null}) -> +-spec doc_update_triggered(#rep_state{}) -> ok. +doc_update_triggered(#rep_state{db_name = null}) -> ok; -doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) -> +doc_update_triggered(#rep_state{} = State) -> + #rep_state{id = Id, doc_id = DocId, db_name = DbName} = State, case couch_replicator_doc_processor:update_docs() of true -> - couch_replicator_docs:update_triggered(Rep, RepId); + couch_replicator_docs:update_triggered(Id, DocId, DbName); false -> ok end, - couch_log:notice("Document `~s` triggered replication `~s`", - [DocId, pp_rep_id(RepId)]), + couch_log:notice("Document `~s` triggered replication `~s`", [DocId, Id]), ok. --spec doc_update_completed(#rep{}, list()) -> ok. -doc_update_completed(#rep{db_name = null}, _Stats) -> +-spec doc_update_completed(#rep_state{}) -> ok. +doc_update_completed(#rep_state{db_name = null}) -> ok; -doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName, - start_time = StartTime}, Stats0) -> - Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}], +doc_update_completed(#rep_state{} = State) -> + #rep_state{ + id = Id, + doc_id = DocId, + db_name = DbName, + start_time = Start, + stats = Stats0 + } = State, + Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(Start)}], couch_replicator_docs:update_doc_completed(DbName, DocId, Stats), - couch_log:notice("Replication `~s` completed (triggered by `~s`)", - [pp_rep_id(RepId), DocId]), + couch_log:notice("Replication `~s` completed (triggered by `~s:~s`)", + [Id, DbName, DocId]), ok. do_last_checkpoint(#rep_state{seqs_in_progress = [], highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) -> - {stop, normal, cancel_timer(State)}; + History = State#rep_state.checkoint_history, + Result = case finish_couch_job(State, <<"completed">>, History) of + ok -> normal; + {error, _} = Error -> Error + end, + {stop, Result, cancel_timer(State)}; do_last_checkpoint(#rep_state{seqs_in_progress = [], highest_seq_done = Seq} = State) -> case do_checkpoint(State#rep_state{current_through_seq = Seq}) of {ok, NewState} -> couch_stats:increment_counter([couch_replicator, checkpoints, success]), - {stop, normal, cancel_timer(NewState)}; + History = NewState#rep_state.checkpoint_history, + Result = case finish_couch_job(NewState, <<"completed">>, History) of + ok -> normal; + {error, _} = Error -> Error + end, + {stop, Result, cancel_timer(NewState)}; Error -> couch_stats:increment_counter([couch_replicator, checkpoints, failure]), {stop, Error, State} end. +finish_couch_job(#rep_state{} = State, FinishedState, Result) -> + #rep_state{job = Job, job_data = Jobdata} = State, + finish_couch_job(Job, JobData, FinishedState, Result). + + +finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) -> + #{<<"rep">> := #{<<"id">> := RepId}} = JobData, + case Result of + null -> null; + #{} -> Result0; + <<_/binary>> -> Result0; + Atom when is_atom(Atom) -> atom_to_binary(Atom, utf8) + Other -> couch_replicator_utils:rep_error_to_binary(Result0) + end, + JobData= JobData0#{ + <<"finished_state">> => FinishState, + <<"finished_result">> => Result + }, + case couch_jobs:finish(undefined, Job, JobData) of + ok -> + doc_update_completed(State), + ok; + {error, Error} -> + Msg = "Replication ~s job could not finish. Error:~p", + couch_log:error(Msg, [RepId, Error]), + {error, Error} + end. + + start_timer(State) -> After = State#rep_state.checkpoint_interval, case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of @@ -547,21 +566,36 @@ cancel_timer(#rep_state{timer = Timer} = State) -> State#rep_state{timer = nil}. -init_state(Rep) -> - #rep{ - id = {BaseId, _Ext}, - source = Src0, target = Tgt, - options = Options, - type = Type, view = View, - start_time = StartTime, - stats = Stats +init_state(#{} = Job, #{<<"rep">> =: Rep}} = JobData) -> + #{ + <<"id">> := Id, + <<"base_id">> := BaseId, + <<"source">> := Src0, + <<"target">> := Tgt, + <<"type">> := Type, + <<"view">> := View, + <<"start_time">> := StartTime, + <<"stats">> := Stats, + <<"options">> := OptionsMap, + <<"user_ctx">> := UserCtx, + <<"db_name">> := DbName, + <<"doc_id">> := DocId, } = Rep, + + Options = maps:fold(fun(K, V, Acc) -> + [{binary_to_atom(K, utf8), V} | Acc] + end, [], OptionsMap), + % Adjust minimum number of http source connections to 2 to avoid deadlock Src = adjust_maxconn(Src0, BaseId), {ok, Source} = couch_replicator_api_wrap:db_open(Src), {CreateTargetParams} = get_value(create_target_params, Options, {[]}), {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, - get_value(create_target, Options, false), CreateTargetParams), + + CreateTgt = get_value(create_target, Options, false), + CreateParams = maps:to_list(get_value(create_target_params, Options, #{}), + {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, UserCtx, CreateTgt, + CreateParams), {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source), {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target), @@ -576,7 +610,10 @@ init_state(Rep) -> #doc{body={CheckpointHistory}} = SourceLog, State = #rep_state{ - rep_details = Rep, + job = Job, + job_data = JobData, + id = Id, + base_id = BaseId, source_name = couch_replicator_api_wrap:db_uri(Source), target_name = couch_replicator_api_wrap:db_uri(Target), source = Source, @@ -592,28 +629,27 @@ init_state(Rep) -> src_starttime = get_value(<<"instance_start_time">>, SourceInfo), tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), session_id = couch_uuids:random(), - source_monitor = db_monitor(Source), - target_monitor = db_monitor(Target), source_seq = SourceSeq, - use_checkpoints = get_value(use_checkpoints, Options, true), - checkpoint_interval = get_value(checkpoint_interval, Options, - ?DEFAULT_CHECKPOINT_INTERVAL), + use_checkpoints = get_value(use_checkpoints, Options), + checkpoint_interval = get_value(checkpoint_interval, Options), type = Type, view = View, stats = Stats + doc_id = DocId, + db_name = DbName }, State#rep_state{timer = start_timer(State)}. -find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) -> +find_and_migrate_logs(DbList, #{<<"base_id">> := BaseId} = Rep) -> LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId), - fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []). + fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, State, []). fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) -> lists:reverse(Acc); -fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) -> +fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) -> case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of {error, <<"not_found">>} when Vsn > 1 -> OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1), @@ -633,8 +669,8 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) -> end. -maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) -> - case get_value(use_checkpoints, Rep#rep.options, true) of +maybe_save_migrated_log(#{<<"options">> = Options}, Db, #doc{} = Doc, OldId) -> + case maps:get(<<"use_checkpoints">>, Options) of true -> update_checkpoint(Db, Doc), Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p", @@ -697,7 +733,7 @@ do_checkpoint(State) -> src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, stats = Stats, - rep_details = #rep{options = Options}, + options = Options, session_id = SessionId } = State, case commit_to_both(Source, Target) of @@ -906,14 +942,13 @@ has_session_id(SessionId, [{Props} | Rest]) -> db_monitor(#httpdb{}) -> - nil; + nil; db_monitor(Db) -> - couch_db:monitor(Db). + couch_db:monitor(Db). -get_pending_count(St) -> - Rep = St#rep_state.rep_details, - Timeout = get_value(connection_timeout, Rep#rep.options), +get_pending_count(#rep_state{options = Options} = St) -> + Timeout = get_value(connection_timeout, Options), TimeoutMicro = Timeout * 1000, case get(pending_count_state) of {LastUpdate, PendingCount} -> @@ -960,8 +995,7 @@ update_task(State) -> ]). -update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) -> - JobId = Rep#rep.id, +update_scheduler_job_stats(#rep_state{id = JobId, stats = Stats}) -> couch_replicator_scheduler:update_job_stats(JobId, Stats). @@ -998,24 +1032,21 @@ replication_start_error(Error) -> Error. -log_replication_start(#rep_state{rep_details = Rep} = RepState) -> - #rep{ - id = {BaseId, Ext}, - doc_id = DocId, - db_name = DbName, - options = Options - } = Rep, - Id = BaseId ++ Ext, - Workers = get_value(worker_processes, Options), - BatchSize = get_value(worker_batch_size, Options), +log_replication_start(#rep_state{} = RepState) -> #rep_state{ - source_name = Source, % credentials already stripped - target_name = Target, % credentials already stripped - session_id = Sid + id = Id, + doc_id = DocId, + db_name = DbName, + options = Options, + source_name = Source, + target_name = Target, + session_id = Sid, } = RepState, + Workers = get_value(worker_processes, Options), + BatchSize = get_value(worker_batch_size, Options), From = case DbName of - ShardName when is_binary(ShardName) -> - io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]); + Name when is_binary(Name) -> + io_lib:format("from doc ~s:~s", [Name, DocId]); _ -> "from _replicate endpoint" end, @@ -1048,14 +1079,13 @@ scheduler_job_format_status_test() -> Target = <<"http://u:p@h2/d2">>, Rep = #rep{ id = {"base", "+ext"}, - source = couch_replicator_docs:parse_rep_db(Source, [], []), - target = couch_replicator_docs:parse_rep_db(Target, [], []), + source = couch_replicator_docs:parse_rep_db(Source, #{}, #{}), + target = couch_replicator_docs:parse_rep_db(Target, #{}, #{}), options = [{create_target, true}], doc_id = <<"mydoc">>, db_name = <<"mydb">> }, State = #rep_state{ - rep_details = Rep, source = Rep#rep.source, target = Rep#rep.target, session_id = <<"a">>, diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl index 8ab55f838..3ea9dff4e 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl @@ -17,7 +17,7 @@ %% public api -export([ start_link/0, - start_child/1, + start_child/2, terminate_child/1 ]). @@ -37,8 +37,8 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -start_child(#rep{} = Rep) -> - supervisor:start_child(?MODULE, [Rep]). +start_child(#{} = Job, #{} = Rep) -> + supervisor:start_child(?MODULE, [Job, Rep]). terminate_child(Pid) -> diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl index 5475e8f37..b86529f26 100644 --- a/src/couch_replicator/src/couch_replicator_sup.erl +++ b/src/couch_replicator/src/couch_replicator_sup.erl @@ -20,18 +20,6 @@ start_link() -> init(_Args) -> Children = [ - {couch_replication_event, - {gen_event, start_link, [{local, couch_replication}]}, - permanent, - brutal_kill, - worker, - dynamic}, - {couch_replicator_clustering, - {couch_replicator_clustering, start_link, []}, - permanent, - brutal_kill, - worker, - [couch_replicator_clustering]}, {couch_replicator_connection, {couch_replicator_connection, start_link, []}, permanent, @@ -70,12 +58,6 @@ init(_Args) -> transient, brutal_kill, worker, - [couch_replicator]}, - {couch_replicator_db_changes, - {couch_replicator_db_changes, start_link, []}, - permanent, - brutal_kill, - worker, - [couch_multidb_changes]} + [couch_replicator]} ], {ok, {{rest_for_one,10,1}, Children}}. diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index ccf241324..b71ffeb46 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -20,11 +20,11 @@ rep_error_to_binary/1, get_json_value/2, get_json_value/3, - pp_rep_id/1, iso8601/1, filter_state/3, remove_basic_auth_from_headers/1, - normalize_rep/1 + normalize_rep/1, + default_headers_map/0 ]). @@ -74,14 +74,6 @@ get_json_value(Key, Props, Default) when is_binary(Key) -> end. -% pretty-print replication id --spec pp_rep_id(#rep{} | rep_id()) -> string(). -pp_rep_id(#rep{id = RepId}) -> - pp_rep_id(RepId); -pp_rep_id({Base, Extension}) -> - Base ++ Extension. - - % NV: TODO: this function is not used outside api wrap module % consider moving it there during final cleanup is_deleted(Change) -> @@ -102,8 +94,13 @@ parse_rep_doc(Props, UserCtx) -> couch_replicator_docs:parse_rep_doc(Props, UserCtx). --spec iso8601(erlang:timestamp()) -> binary(). -iso8601({_Mega, _Sec, _Micro} = Timestamp) -> +-spec iso8601(integer()) -> binary(). +iso8601(Native) when is_integer(Native) -> + ErlangSystemTime = erlang:convert_time_unit(Native, native, microsecond), + MegaSecs = ErlangSystemTime div 1000000000000, + Secs = ErlangSystemTime div 1000000 - MegaSecs * 1000000, + MicroSecs = ErlangSystemTime rem 1000000, + {MegaSecs, Secs, MicroSecs}. {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp), Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ", iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])). @@ -157,25 +154,39 @@ decode_basic_creds(Base64) -> end. -% Normalize a #rep{} record such that it doesn't contain time dependent fields +% Normalize a rep map such that it doesn't contain time dependent fields % pids (like httpc pools), and options / props are sorted. This function would % used during comparisons. --spec normalize_rep(#rep{} | nil) -> #rep{} | nil. -normalize_rep(nil) -> - nil; - -normalize_rep(#rep{} = Rep)-> - #rep{ - source = couch_replicator_api_wrap:normalize_db(Rep#rep.source), - target = couch_replicator_api_wrap:normalize_db(Rep#rep.target), - options = Rep#rep.options, % already sorted in make_options/1 - type = Rep#rep.type, - view = Rep#rep.view, - doc_id = Rep#rep.doc_id, - db_name = Rep#rep.db_name +-spec normalize_rep(#{} | null) -> #{} | null. +normalize_rep(null) -> + null; + +normalize_rep(#{} = Rep)-> + Ks = [<<"options">>, <<"type">>, <<"view">>, <<"doc_id">>, <<"db_name">>], + Rep1 = maps:with(Ks, Rep), + #{<<"source">> := Source, <<"target">> := Target} = Rep, + Rep1#{ + <<"source">> => normalize_endpoint(Source), + <<"target">> => normalize_endpoint(Target) }. +normalize_endpoint(<<DbName/binary>>) -> + DbName; + +normalize_endpoint(#{} = Endpoint) -> + Ks = [<<"url">>, <<"auth_props">>, <<"headers">>, <<"timeout">>, + <<"ibrowse_options">>, <<"retries">>, <<"http_connections">> + ], + maps:with(Ks, Endpoint). + + +get_default_headers() -> + lists:foldl(fun({K, V}, Acc) -> + Acc#{list_to_binary(K) => list_to_binary(V)} + end, #{}, (#httpdb{})#httpdb.headers). + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -254,4 +265,23 @@ normalize_rep_test_() -> end) }. + +normalize_endpoint() -> + HttpDb = #httpdb{ + url = "http://host/db", + auth_props = [{"key", "val"}], + headers = [{"k2","v2"}, {"k1","v1"}], + timeout = 30000, + ibrowse_options = [{k2, v2}, {k1, v1}], + retries = 10, + http_connections = 20 + }, + Expected = HttpDb#httpdb{ + headers = [{"k1","v1"}, {"k2","v2"}], + ibrowse_options = [{k1, v1}, {k2, v2}] + }, + ?assertEqual(Expected, normalize_db(HttpDb)), + ?assertEqual(<<"local">>, normalize_db(<<"local">>)). + + -endif. diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl index 4f545bcb5..5fb922a3e 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl @@ -49,7 +49,7 @@ parse_rep_doc_without_proxy(_) -> {<<"source">>, <<"http://unproxied.com">>}, {<<"target">>, <<"http://otherunproxied.com">>} ]}, - Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc), + Rep = couch_replicator_docs:parse_rep_doc_without_id(NoProxyDoc), ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined), ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined) end). @@ -63,7 +63,7 @@ parse_rep_doc_with_proxy(_) -> {<<"target">>, <<"http://otherunproxied.com">>}, {<<"proxy">>, ProxyURL} ]}, - Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc), + Rep = couch_replicator_docs:parse_rep_doc_without_id(ProxyDoc), ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)), ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)) end). diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index c926da9e0..9ec9f2bcf 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -29,6 +29,9 @@ name/1, get_after_doc_read_fun/1, get_before_doc_update_fun/1, + get_during_doc_update_fun/1, + get_after_db_create_fun/1, + get_after_db_delete_fun/1, get_committed_update_seq/1, get_compacted_seq/1, get_compactor_pid/1, @@ -155,7 +158,9 @@ create(DbName, Options) -> #{} = Db0 -> Db1 = maybe_add_sys_db_callbacks(Db0), ok = fabric2_server:store(Db1), - {ok, Db1#{tx := undefined}}; + Db2 = Db1#{tx := undefined}, + ok = apply_after_db_create(Db2), + {ok, Db2}; Error -> Error end. @@ -188,6 +193,7 @@ delete(DbName, Options) -> fabric2_fdb:delete(TxDb) end), if Resp /= ok -> Resp; true -> + ok = apply_after_db_delete(Db#{tx := undefined}), fabric2_server:remove(DbName) end. @@ -264,6 +270,19 @@ get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) -> get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) -> BeforeDocUpdate. + +get_during_doc_update_fun(#{during_doc_update := DuringDocUpdate}) -> + DuringDocUpdate. + + +get_after_db_create_fun(#{after_db_create := AfterDbCreate}) -> + AfterDbCreate. + + +get_after_db_delete_fun(#{after_db_delete := AfterDbDelete}) -> + AfterDbDelete. + + get_committed_update_seq(#{} = Db) -> get_update_seq(Db). @@ -762,24 +781,33 @@ maybe_add_sys_db_callbacks(Db) -> IsReplicatorDb = is_replicator_db(Db), IsUsersDb = is_users_db(Db), - {BDU, ADR} = if + {BDU, DDU, ADR, ADC, ADD} = if IsReplicatorDb -> { fun couch_replicator_docs:before_doc_update/3, - fun couch_replicator_docs:after_doc_read/2 + fun couch_replicator_doc_processor:during_doc_update/3, + fun couch_replicator_docs:after_doc_read/2, + fun couch_replicator_doc_processor:after_db_create/1, + fun couch_replicator_doc_processor:after_db_delete/1 }; IsUsersDb -> { fun fabric2_users_db:before_doc_update/3, - fun fabric2_users_db:after_doc_read/2 + undefined, + fun fabric2_users_db:after_doc_read/2, + undefined, + undefined }; true -> - {undefined, undefined} + {undefined, undefined, undefined, undefined, undefined} end, Db#{ before_doc_update := BDU, - after_doc_read := ADR + during_doc_update := DDU, + after_doc_read := ADR, + after_db_create := ADC, + after_db_delete := ADD }. @@ -1042,6 +1070,33 @@ apply_before_doc_update(Db, Docs, Options) -> end. +apply_during_doc_update(#{during_doc_update := DDU} = Db, Doc, UpdateType) + when is_function(DDU, 3) -> + DDU(Doc, Db, UpdateType), + ok; + +apply_during_doc_update(#{during_doc_update := undefined}, _, _) -> + ok. + + +apply_after_db_create(#{after_db_create := ADC} = Db) + when is_function(ADC, 1) -> + ADC(Db), + ok; + +apply_after_db_create(#{after_db_create := undefined}) -> + ok. + + +apply_after_db_delete(#{after_db_delete := ADD} = Db) + when is_function(ADD, 1) -> + ADD(Db), + ok; + +apply_after_db_delete(#{after_db_delete := undefined}) -> + ok. + + update_doc_int(#{} = Db, #doc{} = Doc, Options) -> IsLocal = case Doc#doc.id of <<?LOCAL_DOC_PREFIX, _/binary>> -> true; @@ -1218,6 +1273,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) -> ToRemove ), + ok = apply_during_doc_update(Db, Doc3, interactive_edit), + {ok, {NewRevPos, NewRev}}. @@ -1301,6 +1358,8 @@ update_doc_replicated(Db, Doc0, _Options) -> ToRemove ), + ok = apply_during_doc_update(Db, Doc3, replicated_changes), + {ok, []}. diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 71cb68f21..6387afd1e 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -162,7 +162,10 @@ create(#{} = Db0, Options) -> validate_doc_update_funs => [], before_doc_update => undefined, + during_doc_update => undefined, after_doc_read => undefined, + after_db_create => undefined, + after_db_delete => undefined, % All other db things as we add features, db_options => Options @@ -199,8 +202,10 @@ open(#{} = Db0, Options) -> % bits. validate_doc_update_funs => [], before_doc_update => undefined, + during_doc_update => undefined, after_doc_read => undefined, - + after_db_create => undefined, + after_db_delete => undefined, db_options => Options }, |