diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2016-05-13 18:12:52 -0400 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2017-04-28 17:35:50 -0400 |
commit | d89f21bff34a21d7ba296d43b3b0c12021416424 (patch) | |
tree | 16be92a662667416bb6884030c6e83449ea7d8fa | |
parent | d3d90976c77998b46622e403e0c05d82990cd59f (diff) | |
download | couchdb-d89f21bff34a21d7ba296d43b3b0c12021416424.tar.gz |
Refactor utils into 3 modules
Over the years utils accumulated a lot of functionality. Clean up a bit by
separating it into specific modules according to semantics:
- couch_replicator_docs : Handle read and writing to replicator dbs.
It includes updating state fields, parsing options from documents, and
making sure replication VDU design document is in sync.
- couch_replicator_filters : Fetch and manipulate replication filters.
- couch_replicator_ids : Calculate replication IDs. Handles versioning and
Pretty formatting of IDs. Filtered replications using user filter functions
incorporate a filter code hash into the calculation, in that case call
couch_replicator_filters module to fetch the filter from the source.
Jira: COUCHDB-3324
-rw-r--r-- | src/couch_replicator/src/couch_replicator_docs.erl | 756 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_filters.erl | 214 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_ids.erl | 127 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_utils.erl | 583 |
4 files changed, 1198 insertions, 482 deletions
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl new file mode 100644 index 000000000..cce4ce23c --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -0,0 +1,756 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_docs). + +-export([ + parse_rep_doc/1, + parse_rep_doc/2, + parse_rep_db/3, + parse_rep_doc_without_id/1, + parse_rep_doc_without_id/2, + before_doc_update/2, + after_doc_read/2, + ensure_rep_db_exists/0, + ensure_rep_ddoc_exists/1, + ensure_cluster_rep_ddoc_exists/1, + remove_state_fields/2, + update_doc_completed/3, + update_failed/3, + update_rep_id/1, + update_triggered/2, + update_error/2 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include("couch_replicator_api_wrap.hrl"). +-include("couch_replicator.hrl"). +-include("couch_replicator_js_functions.hrl"). + +-import(couch_util, [ + get_value/2, + get_value/3, + to_binary/1 +]). + +-import(couch_replicator_utils, [ + get_json_value/2, + get_json_value/3 +]). + + +-define(REP_DB_NAME, <<"_replicator">>). +-define(REP_DESIGN_DOC, <<"_design/_replicator">>). +-define(OWNER, <<"owner">>). +-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}). +-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})). + + +remove_state_fields(DbName, DocId) -> + update_rep_doc(DbName, DocId, [ + {<<"_replication_state">>, undefined}, + {<<"_replication_state_time">>, undefined}, + {<<"_replication_state_reason">>, undefined}, + {<<"_replication_id">>, undefined}, + {<<"_replication_stats">>, undefined}]). + + +-spec update_doc_completed(binary(), binary(), [_]) -> any(). +update_doc_completed(DbName, DocId, Stats) -> + update_rep_doc(DbName, DocId, [ + {<<"_replication_state">>, <<"completed">>}, + {<<"_replication_state_reason">>, undefined}, + {<<"_replication_stats">>, {Stats}}]), + couch_stats:increment_counter([couch_replicator, docs, + completed_state_updates]). + + +-spec update_failed(binary(), binary(), any()) -> any(). +update_failed(DbName, DocId, Error) -> + Reason = error_reason(Error), + couch_log:error("Error processing replication doc `~s` from `~s`: ~s", + [DocId, DbName, Reason]), + update_rep_doc(DbName, DocId, [ + {<<"_replication_state">>, <<"failed">>}, + {<<"_replication_stats">>, undefined}, + {<<"_replication_state_reason">>, Reason}]), + couch_stats:increment_counter([couch_replicator, docs, + failed_state_updates]). + + +-spec update_triggered(#rep{}, rep_id()) -> ok. +update_triggered(Rep, {Base, Ext}) -> + #rep{ + db_name = DbName, + doc_id = DocId + } = Rep, + update_rep_doc(DbName, DocId, [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_state_reason">>, undefined}, + {<<"_replication_id">>, iolist_to_binary([Base, Ext])}, + {<<"_replication_stats">>, undefined}]), + ok. + + +-spec update_error(#rep{}, any()) -> ok. +update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> + Reason = error_reason(Error), + BinRepId = case RepId of + {Base, Ext} -> + iolist_to_binary([Base, Ext]); + _Other -> + null + end, + update_rep_doc(DbName, DocId, [ + {<<"_replication_state">>, <<"error">>}, + {<<"_replication_state_reason">>, Reason}, + {<<"_replication_stats">>, undefined}, + {<<"_replication_id">>, BinRepId}]), + ok. + + +-spec ensure_rep_db_exists() -> {ok, #db{}}. +ensure_rep_db_exists() -> + Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, + nologifmissing]) of + {ok, Db0} -> + Db0; + _Error -> + {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]), + Db0 + end, + ok = ensure_rep_ddoc_exists(?REP_DB_NAME), + {ok, Db}. + + +-spec ensure_rep_ddoc_exists(binary()) -> ok. +ensure_rep_ddoc_exists(RepDb) -> + case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of + true -> + ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC); + false -> + ok + end. + + +-spec ensure_rep_ddoc_exists(binary(), binary()) -> ok. +ensure_rep_ddoc_exists(RepDb, DDocId) -> + case open_rep_doc(RepDb, DDocId) of + {not_found, no_db_file} -> + %% database was deleted. + ok; + {not_found, _Reason} -> + DocProps = replication_design_doc_props(DDocId), + DDoc = couch_doc:from_json_obj({DocProps}), + couch_log:notice("creating replicator ddoc", []), + {ok, _Rev} = save_rep_doc(RepDb, DDoc); + {ok, Doc} -> + Latest = replication_design_doc_props(DDocId), + {Props0} = couch_doc:to_json_obj(Doc, []), + {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0), + case compare_ejson({Props}, {Latest}) of + true -> + ok; + false -> + LatestWithRev = [{<<"_rev">>, Rev} | Latest], + DDoc = couch_doc:from_json_obj({LatestWithRev}), + couch_log:notice("updating replicator ddoc", []), + try + {ok, _} = save_rep_doc(RepDb, DDoc) + catch + throw:conflict -> + %% ignore, we'll retry next time + ok + end + end + end, + ok. + + +-spec ensure_cluster_rep_ddoc_exists(binary()) -> ok. +ensure_cluster_rep_ddoc_exists(RepDb) -> + DDocId = ?REP_DESIGN_DOC, + [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId), + ensure_rep_ddoc_exists(DbShard, DDocId). + + +-spec compare_ejson({[_]}, {[_]}) -> boolean(). +compare_ejson(EJson1, EJson2) -> + EjsonSorted1 = couch_replicator_filters:ejsort(EJson1), + EjsonSorted2 = couch_replicator_filters:ejsort(EJson2), + EjsonSorted1 == EjsonSorted2. + + +-spec replication_design_doc_props(binary()) -> [_]. +replication_design_doc_props(DDocId) -> + [ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]. + + +% Note: parse_rep_doc can handle filtered replications. During parsing of the +% replication doc it will make possibly remote http requests to the source +% database. If failure or parsing of filter docs fails, parse_doc throws a +% {filter_fetch_error, Error} excation. This exception should be considered +% transient in respect to the contents of the document itself, since it depends +% on netowrk availability of the source db and other factors. +-spec parse_rep_doc({[_]}) -> #rep{}. +parse_rep_doc(RepDoc) -> + {ok, Rep} = try + parse_rep_doc(RepDoc, rep_user_ctx(RepDoc)) + catch + throw:{error, Reason} -> + throw({bad_rep_doc, Reason}); + throw:{filter_fetch_error, Reason} -> + throw({filter_fetch_error, Reason}); + Tag:Err -> + throw({bad_rep_doc, to_binary({Tag, Err})}) + end, + Rep. + + +-spec parse_rep_doc_without_id({[_]}) -> #rep{}. +parse_rep_doc_without_id(RepDoc) -> + {ok, Rep} = try + parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc)) + catch + throw:{error, Reason} -> + throw({bad_rep_doc, Reason}); + Tag:Err -> + throw({bad_rep_doc, to_binary({Tag, Err})}) + end, + Rep. + + +-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}. +parse_rep_doc(Doc, UserCtx) -> + {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx), + Cancel = get_value(cancel, Rep#rep.options, false), + Id = get_value(id, Rep#rep.options, nil), + case {Cancel, Id} of + {true, nil} -> + % Cancel request with no id, must parse id out of body contents + {ok, update_rep_id(Rep)}; + {true, Id} -> + % Cancel request with an id specified, so do not parse id from body + {ok, Rep}; + {false, _Id} -> + % Not a cancel request, regular replication doc + {ok, update_rep_id(Rep)} + end. + + +-spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}. +parse_rep_doc_without_id({Props}, UserCtx) -> + Proxy = get_value(<<"proxy">>, Props, <<>>), + Opts = make_options(Props), + case get_value(cancel, Opts, false) andalso + (get_value(id, Opts, nil) =/= nil) of + true -> + {ok, #rep{options = Opts, user_ctx = UserCtx}}; + false -> + Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts), + Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts), + {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of + {error, Error} -> + throw({bad_request, Error}); + Result -> + Result + end, + Rep = #rep{ + source = Source, + target = Target, + options = Opts, + user_ctx = UserCtx, + type = Type, + view = View, + doc_id = get_value(<<"_id">>, Props, null) + }, + % Check if can parse filter code, if not throw exception + case couch_replicator_filters:parse(Opts) of + {error, FilterError} -> + throw({error, FilterError}); + {ok, _Filter} -> + ok + end, + {ok, Rep} + end. + + +% Update a #rep{} record with a replication_id. Calculating the id might involve +% fetching a filter from the source db, and so it could fail intermetently. +% In case of a failure to fetch the filter this function will throw a +% `{filter_fetch_error, Reason} exception. +update_rep_id(Rep) -> + RepId = couch_replicator_ids:replication_id(Rep), + Rep#rep{id = RepId}. + + +update_rep_doc(RepDbName, RepDocId, KVs) -> + update_rep_doc(RepDbName, RepDocId, KVs, 1). + + +update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) -> + try + case open_rep_doc(RepDbName, RepDocId) of + {ok, LastRepDoc} -> + update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2); + _ -> + ok + end + catch + throw:conflict -> + Msg = "Conflict when updating replication doc `~s`. Retrying.", + couch_log:error(Msg, [RepDocId]), + ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100), + update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2) + end; + +update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) -> + NewRepDocBody = lists:foldl( + fun({K, undefined}, Body) -> + lists:keydelete(K, 1, Body); + ({<<"_replication_state">> = K, State} = KV, Body) -> + case get_json_value(K, Body) of + State -> + Body; + _ -> + Body1 = lists:keystore(K, 1, Body, KV), + Timestamp = couch_replicator_utils:iso8601(os:timestamp()), + lists:keystore( + <<"_replication_state_time">>, 1, Body1, + {<<"_replication_state_time">>, Timestamp}) + end; + ({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, KVs), + case NewRepDocBody of + RepDocBody -> + ok; + _ -> + % Might not succeed - when the replication doc is deleted right + % before this update (not an error, ignore). + save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}}) + end. + + +open_rep_doc(DbName, DocId) -> + case couch_db:open_int(DbName, [?CTX, sys_db]) of + {ok, Db} -> + try + couch_db:open_doc(Db, DocId, [ejson_body]) + after + couch_db:close(Db) + end; + Else -> + Else + end. + + +save_rep_doc(DbName, Doc) -> + {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]), + try + couch_db:update_doc(Db, Doc, []) + after + couch_db:close(Db) + end. + + +-spec rep_user_ctx({[_]}) -> #user_ctx{}. +rep_user_ctx({RepDoc}) -> + case get_json_value(<<"user_ctx">>, RepDoc) of + undefined -> + #user_ctx{}; + {UserCtx} -> + #user_ctx{ + name = get_json_value(<<"name">>, UserCtx, null), + roles = get_json_value(<<"roles">>, UserCtx, []) + } + end. + + +-spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary(). +parse_rep_db({Props}, Proxy, Options) -> + ProxyParams = parse_proxy_params(Proxy), + ProxyURL = case ProxyParams of + [] -> undefined; + _ -> binary_to_list(Proxy) + end, + Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)), + {AuthProps} = get_value(<<"auth">>, Props, {[]}), + {BinHeaders} = get_value(<<"headers">>, Props, {[]}), + Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]), + DefaultHeaders = (#httpdb{})#httpdb.headers, + OAuth = case get_value(<<"oauth">>, AuthProps) of + undefined -> + nil; + {OauthProps} -> + #oauth{ + consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)), + token = ?b2l(get_value(<<"token">>, OauthProps)), + token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)), + consumer_secret = ?b2l(get_value(<<"consumer_secret">>, + OauthProps)), + signature_method = + case get_value(<<"signature_method">>, OauthProps) of + undefined -> hmac_sha1; + <<"PLAINTEXT">> -> plaintext; + <<"HMAC-SHA1">> -> hmac_sha1; + <<"RSA-SHA1">> -> rsa_sha1 + end + } + end, + #httpdb{ + url = Url, + oauth = OAuth, + headers = lists:ukeymerge(1, Headers, DefaultHeaders), + ibrowse_options = lists:keysort(1, + [{socket_options, get_value(socket_options, Options)} | + ProxyParams ++ ssl_params(Url)]), + timeout = get_value(connection_timeout, Options), + http_connections = get_value(http_connections, Options), + retries = get_value(retries, Options), + proxy_url = ProxyURL + }; + +parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) -> + parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); + +parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) -> + parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options); + +parse_rep_db(<<DbName/binary>>, _Proxy, _Options) -> + DbName; + +parse_rep_db(undefined, _Proxy, _Options) -> + throw({error, <<"Missing replicator database">>}). + + +-spec maybe_add_trailing_slash(binary() | list()) -> list(). +maybe_add_trailing_slash(Url) when is_binary(Url) -> + maybe_add_trailing_slash(?b2l(Url)); +maybe_add_trailing_slash(Url) -> + case lists:member($?, Url) of + true -> + Url; % skip if there are query params + false -> + case lists:last(Url) of + $/ -> + Url; + _ -> + Url ++ "/" + end + end. + + +-spec make_options([_]) -> [_]. +make_options(Props) -> + Options0 = lists:ukeysort(1, convert_options(Props)), + Options = check_options(Options0), + DefWorkers = config:get("replicator", "worker_processes", "4"), + DefBatchSize = config:get("replicator", "worker_batch_size", "500"), + DefConns = config:get("replicator", "http_connections", "20"), + DefTimeout = config:get("replicator", "connection_timeout", "30000"), + DefRetries = config:get("replicator", "retries_per_request", "10"), + UseCheckpoints = config:get("replicator", "use_checkpoints", "true"), + DefCheckpointInterval = config:get("replicator", "checkpoint_interval", + "30000"), + {ok, DefSocketOptions} = couch_util:parse_term( + config:get("replicator", "socket_options", + "[{keepalive, true}, {nodelay, false}]")), + lists:ukeymerge(1, Options, lists:keysort(1, [ + {connection_timeout, list_to_integer(DefTimeout)}, + {retries, list_to_integer(DefRetries)}, + {http_connections, list_to_integer(DefConns)}, + {socket_options, DefSocketOptions}, + {worker_batch_size, list_to_integer(DefBatchSize)}, + {worker_processes, list_to_integer(DefWorkers)}, + {use_checkpoints, list_to_existing_atom(UseCheckpoints)}, + {checkpoint_interval, list_to_integer(DefCheckpointInterval)} + ])). + + +-spec convert_options([_]) -> [_]. +convert_options([])-> + []; +convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)-> + throw({bad_request, <<"parameter `cancel` must be a boolean">>}); +convert_options([{<<"cancel">>, V} | R]) -> + [{cancel, V} | convert_options(R)]; +convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>; + IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> -> + [{id, couch_replicator_ids:convert(V)} | convert_options(R)]; +convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)-> + throw({bad_request, <<"parameter `create_target` must be a boolean">>}); +convert_options([{<<"create_target">>, V} | R]) -> + [{create_target, V} | convert_options(R)]; +convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)-> + throw({bad_request, <<"parameter `continuous` must be a boolean">>}); +convert_options([{<<"continuous">>, V} | R]) -> + [{continuous, V} | convert_options(R)]; +convert_options([{<<"filter">>, V} | R]) -> + [{filter, V} | convert_options(R)]; +convert_options([{<<"query_params">>, V} | R]) -> + [{query_params, V} | convert_options(R)]; +convert_options([{<<"doc_ids">>, null} | R]) -> + convert_options(R); +convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) -> + throw({bad_request, <<"parameter `doc_ids` must be an array">>}); +convert_options([{<<"doc_ids">>, V} | R]) -> + % Ensure same behaviour as old replicator: accept a list of percent + % encoded doc IDs. + DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]), + [{doc_ids, DocIds} | convert_options(R)]; +convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) -> + throw({bad_request, <<"parameter `selector` must be a JSON object">>}); +convert_options([{<<"selector">>, V} | R]) -> + [{selector, V} | convert_options(R)]; +convert_options([{<<"worker_processes">>, V} | R]) -> + [{worker_processes, couch_util:to_integer(V)} | convert_options(R)]; +convert_options([{<<"worker_batch_size">>, V} | R]) -> + [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)]; +convert_options([{<<"http_connections">>, V} | R]) -> + [{http_connections, couch_util:to_integer(V)} | convert_options(R)]; +convert_options([{<<"connection_timeout">>, V} | R]) -> + [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)]; +convert_options([{<<"retries_per_request">>, V} | R]) -> + [{retries, couch_util:to_integer(V)} | convert_options(R)]; +convert_options([{<<"socket_options">>, V} | R]) -> + {ok, SocketOptions} = couch_util:parse_term(V), + [{socket_options, SocketOptions} | convert_options(R)]; +convert_options([{<<"since_seq">>, V} | R]) -> + [{since_seq, V} | convert_options(R)]; +convert_options([{<<"use_checkpoints">>, V} | R]) -> + [{use_checkpoints, V} | convert_options(R)]; +convert_options([{<<"checkpoint_interval">>, V} | R]) -> + [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)]; +convert_options([_ | R]) -> % skip unknown option + convert_options(R). + + +-spec check_options([_]) -> [_]. +check_options(Options) -> + DocIds = lists:keyfind(doc_ids, 1, Options), + Filter = lists:keyfind(filter, 1, Options), + Selector = lists:keyfind(selector, 1, Options), + case {DocIds, Filter, Selector} of + {false, false, false} -> Options; + {false, false, _} -> Options; + {false, _, false} -> Options; + {_, false, false} -> Options; + _ -> + throw({bad_request, + "`doc_ids`,`filter`,`selector` are mutually exclusive"}) + end. + + +-spec parse_proxy_params(binary() | [_]) -> [_]. +parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> + parse_proxy_params(?b2l(ProxyUrl)); +parse_proxy_params([]) -> + []; +parse_proxy_params(ProxyUrl) -> + #url{ + host = Host, + port = Port, + username = User, + password = Passwd, + protocol = Protocol + } = ibrowse_lib:parse_url(ProxyUrl), + [ + {proxy_protocol, Protocol}, + {proxy_host, Host}, + {proxy_port, Port} + ] ++ case is_list(User) andalso is_list(Passwd) of + false -> + []; + true -> + [{proxy_user, User}, {proxy_password, Passwd}] + end. + + +-spec ssl_params([_]) -> [_]. +ssl_params(Url) -> + case ibrowse_lib:parse_url(Url) of + #url{protocol = https} -> + Depth = list_to_integer( + config:get("replicator", "ssl_certificate_max_depth", "3") + ), + VerifyCerts = config:get("replicator", "verify_ssl_certificates"), + CertFile = config:get("replicator", "cert_file", undefined), + KeyFile = config:get("replicator", "key_file", undefined), + Password = config:get("replicator", "password", undefined), + SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")], + SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of + true -> + case Password of + undefined -> + [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts; + _ -> + [{certfile, CertFile}, {keyfile, KeyFile}, + {password, Password}] ++ SslOpts + end; + false -> SslOpts + end, + [{is_ssl, true}, {ssl_options, SslOpts1}]; + #url{protocol = http} -> + [] + end. + + +-spec ssl_verify_options(true | false) -> [_]. +ssl_verify_options(true) -> + CAFile = config:get("replicator", "ssl_trusted_certificates_file"), + [{verify, verify_peer}, {cacertfile, CAFile}]; +ssl_verify_options(false) -> + [{verify, verify_none}]. + + +-spec before_doc_update(#doc{}, #db{}) -> #doc{}. +before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) -> + Doc; +before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> + #user_ctx{roles = Roles, name = Name} = UserCtx, + case lists:member(<<"_replicator">>, Roles) of + true -> + Doc; + false -> + case couch_util:get_value(?OWNER, Body) of + undefined -> + Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; + Name -> + Doc; + Other -> + case (catch couch_db:check_is_admin(Db)) of + ok when Other =:= null -> + Doc#doc{body = {?replace(Body, ?OWNER, Name)}}; + ok -> + Doc; + _ -> + throw({forbidden, <<"Can't update replication documents", + " from other users.">>}) + end + end + end. + + +-spec after_doc_read(#doc{}, #db{}) -> #doc{}. +after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) -> + Doc; +after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> + #user_ctx{name = Name} = UserCtx, + case (catch couch_db:check_is_admin(Db)) of + ok -> + Doc; + _ -> + case couch_util:get_value(?OWNER, Body) of + Name -> + Doc; + _Other -> + Source = strip_credentials(couch_util:get_value(<<"source">>, +Body)), + Target = strip_credentials(couch_util:get_value(<<"target">>, +Body)), + NewBody0 = ?replace(Body, <<"source">>, Source), + NewBody = ?replace(NewBody0, <<"target">>, Target), + #doc{revs = {Pos, [_ | Revs]}} = Doc, + NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}}, + NewRevId = couch_db:new_revid(NewDoc), + NewDoc#doc{revs = {Pos, [NewRevId | Revs]}} + end + end. + + +-spec strip_credentials(undefined) -> undefined; + (binary()) -> binary(); + ({[_]}) -> {[_]}. +strip_credentials(undefined) -> + undefined; +strip_credentials(Url) when is_binary(Url) -> + re:replace(Url, + "http(s)?://(?:[^:]+):[^@]+@(.*)$", + "http\\1://\\2", + [{return, binary}]); +strip_credentials({Props}) -> + {lists:keydelete(<<"oauth">>, 1, Props)}. + + +error_reason({shutdown, Error}) -> + error_reason(Error); +error_reason({bad_rep_doc, Reason}) -> + to_binary(Reason); +error_reason({error, {Error, Reason}}) + when is_atom(Error), is_binary(Reason) -> + to_binary(io_lib:format("~s: ~s", [Error, Reason])); +error_reason({error, Reason}) -> + to_binary(Reason); +error_reason(Reason) -> + to_binary(Reason). + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +check_options_pass_values_test() -> + ?assertEqual(check_options([]), []), + ?assertEqual(check_options([baz, {other, fiz}]), [baz, {other, fiz}]), + ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]), + ?assertEqual(check_options([{filter, x}]), [{filter, x}]), + ?assertEqual(check_options([{selector, x}]), [{selector, x}]). + + +check_options_fail_values_test() -> + ?assertThrow({bad_request, _}, + check_options([{doc_ids, x}, {filter, y}])), + ?assertThrow({bad_request, _}, + check_options([{doc_ids, x}, {selector, y}])), + ?assertThrow({bad_request, _}, + check_options([{filter, x}, {selector, y}])), + ?assertThrow({bad_request, _}, + check_options([{doc_ids, x}, {selector, y}, {filter, z}])). + + +check_convert_options_pass_test() -> + ?assertEqual([], convert_options([])), + ?assertEqual([], convert_options([{<<"random">>, 42}])), + ?assertEqual([{cancel, true}], + convert_options([{<<"cancel">>, true}])), + ?assertEqual([{create_target, true}], + convert_options([{<<"create_target">>, true}])), + ?assertEqual([{continuous, true}], + convert_options([{<<"continuous">>, true}])), + ?assertEqual([{doc_ids, [<<"id">>]}], + convert_options([{<<"doc_ids">>, [<<"id">>]}])), + ?assertEqual([{selector, {key, value}}], + convert_options([{<<"selector">>, {key, value}}])). + + +check_convert_options_fail_test() -> + ?assertThrow({bad_request, _}, + convert_options([{<<"cancel">>, <<"true">>}])), + ?assertThrow({bad_request, _}, + convert_options([{<<"create_target">>, <<"true">>}])), + ?assertThrow({bad_request, _}, + convert_options([{<<"continuous">>, <<"true">>}])), + ?assertThrow({bad_request, _}, + convert_options([{<<"doc_ids">>, not_a_list}])), + ?assertThrow({bad_request, _}, + convert_options([{<<"selector">>, [{key, value}]}])). + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl new file mode 100644 index 000000000..5668820d1 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_filters.erl @@ -0,0 +1,214 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_filters). + +-export([ + parse/1, + fetch/4, + view_type/2, + ejsort/1 +]). + +-include_lib("couch/include/couch_db.hrl"). + + +% Parse the filter from replication options proplist. +% Return {ok, {FilterType,...}} | {error, ParseError}. +% For `user` filter, i.e. filters specified as user code +% in source database, this code doesn't fetch the filter +% code, but only returns the name of the filter. +-spec parse([_]) -> + {ok, nil} | + {ok, {view, binary(), {[_]}}} | + {ok, {user, {binary(), binary()}, {[_]}}} | + {ok, {docids, [_]}} | + {ok, {mango, {[_]}}} | + {error, binary()}. +parse(Options) -> + Filter = couch_util:get_value(filter, Options), + DocIds = couch_util:get_value(doc_ids, Options), + Selector = couch_util:get_value(selector, Options), + case {Filter, DocIds, Selector} of + {undefined, undefined, undefined} -> + {ok, nil}; + {<<"_", _/binary>>, undefined, undefined} -> + {ok, {view, Filter, query_params(Options)}}; + {_, undefined, undefined} -> + case parse_user_filter(Filter) of + {ok, {Doc, FilterName}} -> + {ok, {user, {Doc, FilterName}, query_params(Options)}}; + {error, Error} -> + {error, Error} + end; + {undefined, _, undefined} -> + {ok, {docids, DocIds}}; + {undefined, undefined, _} -> + {ok, {mango, ejsort(mango_selector:normalize(Selector))}}; + _ -> + Err = "`selector`, `filter` and `doc_ids` are mutually exclusive", + {error, list_to_binary(Err)} + end. + + +% Fetches body of filter function from source database. Guaranteed to either +% return {ok, Body} or an {error, Reason}. Also assume this function might +% block due to network / socket issues for an undeterminted amount of time. +-spec fetch(binary(), binary(), binary(), #user_ctx{}) -> + {ok, {[_]}} | {error, binary()}. +fetch(DDocName, FilterName, Source, UserCtx) -> + {Pid, Ref} = spawn_monitor(fun() -> + try fetch_internal(DDocName, FilterName, Source, UserCtx) of + Resp -> + exit({exit_ok, Resp}) + catch + throw:{fetch_error, Reason} -> + exit({exit_fetch_error, Reason}); + _OtherTag:Reason -> + exit({exit_other_error, Reason}) + end + end), + receive + {'DOWN', Ref, process, Pid, {exit_ok, Resp}} -> + {ok, Resp}; + {'DOWN', Ref, process, Pid, {exit_fetch_error, Reason}} -> + {error, Reason}; + {'DOWN', Ref, process, Pid, {exit_other_error, Reason}} -> + {error, couch_util:to_binary(Reason)} + end. + + +% Get replication type and view (if any) from replication document props +-spec view_type([_], [_]) -> + {view, {binary(), binary()}} | {db, nil} | {error, binary()}. +view_type(Props, Options) -> + case couch_util:get_value(<<"filter">>, Props) of + <<"_view">> -> + {QP} = couch_util:get_value(query_params, Options, {[]}), + ViewParam = couch_util:get_value(<<"view">>, QP), + case re:split(ViewParam, <<"/">>) of + [DName, ViewName] -> + {view, {<< "_design/", DName/binary >>, ViewName}}; + _ -> + {error, <<"Invalid `view` parameter.">>} + end; + _ -> + {db, nil} + end. + + +% Private functions + +fetch_internal(DDocName, FilterName, Source, UserCtx) -> + Db = case (catch couch_replicator_api_wrap:db_open(Source, + [{user_ctx, UserCtx}])) of + {ok, Db0} -> + Db0; + DbError -> + DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s", + [couch_replicator_api_wrap:db_uri(Source), + couch_util:to_binary(DbError)]), + throw({fetch_error, iolist_to_binary(DbErrorMsg)}) + end, + try + Body = case (catch couch_replicator_api_wrap:open_doc( + Db, <<"_design/", DDocName/binary>>, [ejson_body])) of + {ok, #doc{body = Body0}} -> + Body0; + DocError -> + DocErrorMsg = io_lib:format( + "Couldn't open document `_design/~s` from source " + "database `~s`: ~s", [DDocName, + couch_replicator_api_wrap:db_uri(Source), + couch_util:to_binary(DocError)] + ), + throw({fetch_error, iolist_to_binary(DocErrorMsg)}) + end, + try + Code = couch_util:get_nested_json_value( + Body, [<<"filters">>, FilterName]), + re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}]) + catch + _Tag:CodeError -> + CodeErrorMsg = io_lib:format( + "Couldn't parse filter code from document ~s on `~s` " + " Error: ~s", [DDocName, + couch_replicator_api_wrap:db_uri(Source), + couch_util:to_binary(CodeError)] + ), + throw({fetch_error, CodeErrorMsg}) + end + after + couch_replicator_api_wrap:db_close(Db) + end. + + +-spec query_params([_]) -> {[_]}. +query_params(Options)-> + couch_util:get_value(query_params, Options, {[]}). + + +parse_user_filter(Filter) -> + case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of + {match, [DDocName0, FilterName0]} -> + {ok, {DDocName0, FilterName0}}; + _ -> + {error, <<"Invalid filter. Must match `ddocname/filtername`.">>} + end. + + +% Sort an EJSON object's properties to attempt +% to generate a unique representation. This is used +% to reduce the chance of getting different +% replication checkpoints for the same Mango selector +ejsort({V})-> + ejsort_props(V, []); +ejsort(V) when is_list(V) -> + ejsort_array(V, []); +ejsort(V) -> + V. + + +ejsort_props([], Acc)-> + {lists:keysort(1, Acc)}; +ejsort_props([{K, V}| R], Acc) -> + ejsort_props(R, [{K, ejsort(V)} | Acc]). + + +ejsort_array([], Acc)-> + lists:reverse(Acc); +ejsort_array([V | R], Acc) -> + ejsort_array(R, [ejsort(V) | Acc]). + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +ejsort_basic_values_test() -> + ?assertEqual(ejsort(0), 0), + ?assertEqual(ejsort(<<"a">>), <<"a">>), + ?assertEqual(ejsort(true), true), + ?assertEqual(ejsort([]), []), + ?assertEqual(ejsort({[]}), {[]}). + + +ejsort_compound_values_test() -> + ?assertEqual(ejsort([2, 1, 3, <<"a">>]), [2, 1, 3, <<"a">>]), + Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]}, + Ej1s = {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]}, + ?assertEqual(ejsort(Ej1), Ej1s), + Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]}, + ?assertEqual(ejsort(Ej2), + {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}). + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl new file mode 100644 index 000000000..7f26db757 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_ids.erl @@ -0,0 +1,127 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_ids). + +-export([ + replication_id/1, + replication_id/2, + convert/1 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include("couch_replicator_api_wrap.hrl"). +-include("couch_replicator.hrl"). + +% replication_id/1 and replication_id/2 will attempt to fetch +% filter code for filtered replications. If fetching or parsing +% of the remotely fetched filter code fails they throw: +% {filter_fetch_error, Error} exception. +% + +replication_id(#rep{options = Options} = Rep) -> + BaseId = replication_id(Rep, ?REP_ID_VERSION), + {BaseId, maybe_append_options([continuous, create_target], Options)}. + + +% Versioned clauses for generating replication IDs. +% If a change is made to how replications are identified, +% please add a new clause and increase ?REP_ID_VERSION. + +replication_id(#rep{user_ctx = UserCtx} = Rep, 3) -> + UUID = couch_server:get_uuid(), + Src = get_rep_endpoint(UserCtx, Rep#rep.source), + Tgt = get_rep_endpoint(UserCtx, Rep#rep.target), + maybe_append_filters([UUID, Src, Tgt], Rep); + +replication_id(#rep{user_ctx = UserCtx} = Rep, 2) -> + {ok, HostName} = inet:gethostname(), + Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of + P when is_number(P) -> + P; + _ -> + % On restart we might be called before the couch_httpd process is + % started. + % TODO: we might be under an SSL socket server only, or both under + % SSL and a non-SSL socket. + % ... mochiweb_socket_server:get(https, port) + list_to_integer(config:get("httpd", "port", "5984")) + end, + Src = get_rep_endpoint(UserCtx, Rep#rep.source), + Tgt = get_rep_endpoint(UserCtx, Rep#rep.target), + maybe_append_filters([HostName, Port, Src, Tgt], Rep); + +replication_id(#rep{user_ctx = UserCtx} = Rep, 1) -> + {ok, HostName} = inet:gethostname(), + Src = get_rep_endpoint(UserCtx, Rep#rep.source), + Tgt = get_rep_endpoint(UserCtx, Rep#rep.target), + maybe_append_filters([HostName, Src, Tgt], Rep). + + +-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}. +convert(Id) when is_list(Id) -> + convert(?l2b(Id)); +convert(Id) when is_binary(Id) -> + lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id)); +convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) -> + Id. + + +% Private functions + +maybe_append_filters(Base, + #rep{source = Source, user_ctx = UserCtx, options = Options}) -> + Base2 = Base ++ + case couch_replicator_filters:parse(Options) of + {ok, nil} -> + []; + {ok, {view, Filter, QueryParams}} -> + [Filter, QueryParams]; + {ok, {user, {Doc, Filter}, QueryParams}} -> + case couch_replicator_filters:fetch(Doc, Filter, Source, UserCtx) of + {ok, Code} -> + [Code, QueryParams]; + {error, Error} -> + throw({filter_fetch_error, Error}) + end; + {ok, {docids, DocIds}} -> + [DocIds]; + {ok, {mango, Selector}} -> + [Selector]; + {error, FilterParseError} -> + throw({error, FilterParseError}) + end, + couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))). + + +maybe_append_options(Options, RepOptions) -> + lists:foldl(fun(Option, Acc) -> + Acc ++ + case couch_util:get_value(Option, RepOptions, false) of + true -> + "+" ++ atom_to_list(Option); + false -> + "" + end + end, [], Options). + + +get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) -> + DefaultHeaders = (#httpdb{})#httpdb.headers, + case OAuth of + nil -> + {remote, Url, Headers -- DefaultHeaders}; + #oauth{} -> + {remote, Url, Headers -- DefaultHeaders, OAuth} + end; +get_rep_endpoint(UserCtx, <<DbName/binary>>) -> + {local, DbName, UserCtx}. diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index e96d52a41..05836d483 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -12,17 +12,28 @@ -module(couch_replicator_utils). --export([parse_rep_doc/2]). --export([open_db/1, close_db/1]). --export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]). --export([replication_id/2]). --export([sum_stats/2, is_deleted/1]). +-export([ + parse_rep_doc/2, + open_db/1, + close_db/1, + start_db_compaction_notifier/2, + stop_db_compaction_notifier/1, + replication_id/2, + sum_stats/2, + is_deleted/1, + rep_error_to_binary/1, + get_json_value/2, + get_json_value/3, + pp_rep_id/1, + iso8601/1, + filter_state/3 +]). --export([handle_db_event/3]). +-export([ + handle_db_event/3 +]). -include_lib("couch/include/couch_db.hrl"). --include_lib("ibrowse/include/ibrowse.hrl"). --include("couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). -import(couch_util, [ @@ -31,385 +42,6 @@ ]). -parse_rep_doc({Props}, UserCtx) -> - ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)), - Options = make_options(Props), - case get_value(cancel, Options, false) andalso - (get_value(id, Options, nil) =/= nil) of - true -> - {ok, #rep{options = Options, user_ctx = UserCtx}}; - false -> - Source = parse_rep_db(get_value(<<"source">>, Props), - ProxyParams, Options), - Target = parse_rep_db(get_value(<<"target">>, Props), - ProxyParams, Options), - - - {RepType, View} = case get_value(<<"filter">>, Props) of - <<"_view">> -> - {QP} = get_value(query_params, Options, {[]}), - ViewParam = get_value(<<"view">>, QP), - View1 = case re:split(ViewParam, <<"/">>) of - [DName, ViewName] -> - {<< "_design/", DName/binary >>, ViewName}; - _ -> - throw({bad_request, "Invalid `view` parameter."}) - end, - {view, View1}; - _ -> - {db, nil} - end, - - Rep = #rep{ - source = Source, - target = Target, - options = Options, - user_ctx = UserCtx, - type = RepType, - view = View, - doc_id = get_value(<<"_id">>, Props, null) - }, - {ok, Rep#rep{id = replication_id(Rep)}} - end. - - -replication_id(#rep{options = Options} = Rep) -> - BaseId = replication_id(Rep, ?REP_ID_VERSION), - {BaseId, maybe_append_options([continuous, create_target], Options)}. - - -% Versioned clauses for generating replication IDs. -% If a change is made to how replications are identified, -% please add a new clause and increase ?REP_ID_VERSION. - -replication_id(#rep{user_ctx = UserCtx} = Rep, 3) -> - UUID = couch_server:get_uuid(), - Src = get_rep_endpoint(UserCtx, Rep#rep.source), - Tgt = get_rep_endpoint(UserCtx, Rep#rep.target), - maybe_append_filters([UUID, Src, Tgt], Rep); - -replication_id(#rep{user_ctx = UserCtx} = Rep, 2) -> - {ok, HostName} = inet:gethostname(), - Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of - P when is_number(P) -> - P; - _ -> - % On restart we might be called before the couch_httpd process is - % started. - % TODO: we might be under an SSL socket server only, or both under - % SSL and a non-SSL socket. - % ... mochiweb_socket_server:get(https, port) - list_to_integer(config:get("httpd", "port", "5984")) - end, - Src = get_rep_endpoint(UserCtx, Rep#rep.source), - Tgt = get_rep_endpoint(UserCtx, Rep#rep.target), - maybe_append_filters([HostName, Port, Src, Tgt], Rep); - -replication_id(#rep{user_ctx = UserCtx} = Rep, 1) -> - {ok, HostName} = inet:gethostname(), - Src = get_rep_endpoint(UserCtx, Rep#rep.source), - Tgt = get_rep_endpoint(UserCtx, Rep#rep.target), - maybe_append_filters([HostName, Src, Tgt], Rep). - - -maybe_append_filters(Base, - #rep{source = Source, user_ctx = UserCtx, options = Options}) -> - Filter = get_value(filter, Options), - DocIds = get_value(doc_ids, Options), - Selector = get_value(selector, Options), - Base2 = Base ++ - case {Filter, DocIds, Selector} of - {undefined, undefined, undefined} -> - []; - {<<"_", _/binary>>, undefined, undefined} -> - [Filter, get_value(query_params, Options, {[]})]; - {_, undefined, undefined} -> - [filter_code(Filter, Source, UserCtx), - get_value(query_params, Options, {[]})]; - {undefined, _, undefined} -> - [DocIds]; - {undefined, undefined, _} -> - [ejsort(mango_selector:normalize(Selector))]; - _ -> - throw({error, <<"`selector`, `filter` and `doc_ids` fields are mutually exclusive">>}) - end, - couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))). - - -filter_code(Filter, Source, UserCtx) -> - {DDocName, FilterName} = - case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of - {match, [DDocName0, FilterName0]} -> - {DDocName0, FilterName0}; - _ -> - throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>}) - end, - Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of - {ok, Db0} -> - Db0; - DbError -> - DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s", - [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]), - throw({error, iolist_to_binary(DbErrorMsg)}) - end, - try - Body = case (catch couch_replicator_api_wrap:open_doc( - Db, <<"_design/", DDocName/binary>>, [ejson_body])) of - {ok, #doc{body = Body0}} -> - Body0; - DocError -> - DocErrorMsg = io_lib:format( - "Couldn't open document `_design/~s` from source " - "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source), - couch_util:to_binary(DocError)]), - throw({error, iolist_to_binary(DocErrorMsg)}) - end, - Code = couch_util:get_nested_json_value( - Body, [<<"filters">>, FilterName]), - re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}]) - after - couch_replicator_api_wrap:db_close(Db) - end. - - -maybe_append_options(Options, RepOptions) -> - lists:foldl(fun(Option, Acc) -> - Acc ++ - case get_value(Option, RepOptions, false) of - true -> - "+" ++ atom_to_list(Option); - false -> - "" - end - end, [], Options). - - -get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) -> - DefaultHeaders = (#httpdb{})#httpdb.headers, - case OAuth of - nil -> - {remote, Url, Headers -- DefaultHeaders}; - #oauth{} -> - {remote, Url, Headers -- DefaultHeaders, OAuth} - end; -get_rep_endpoint(UserCtx, <<DbName/binary>>) -> - {local, DbName, UserCtx}. - - -parse_rep_db({Props}, ProxyParams, Options) -> - Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)), - {AuthProps} = get_value(<<"auth">>, Props, {[]}), - {BinHeaders} = get_value(<<"headers">>, Props, {[]}), - Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]), - DefaultHeaders = (#httpdb{})#httpdb.headers, - OAuth = case get_value(<<"oauth">>, AuthProps) of - undefined -> - nil; - {OauthProps} -> - #oauth{ - consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)), - token = ?b2l(get_value(<<"token">>, OauthProps)), - token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)), - consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)), - signature_method = - case get_value(<<"signature_method">>, OauthProps) of - undefined -> hmac_sha1; - <<"PLAINTEXT">> -> plaintext; - <<"HMAC-SHA1">> -> hmac_sha1; - <<"RSA-SHA1">> -> rsa_sha1 - end - } - end, - #httpdb{ - url = Url, - oauth = OAuth, - headers = lists:ukeymerge(1, Headers, DefaultHeaders), - ibrowse_options = lists:keysort(1, - [{socket_options, get_value(socket_options, Options)} | - ProxyParams ++ ssl_params(Url)]), - timeout = get_value(connection_timeout, Options), - http_connections = get_value(http_connections, Options), - retries = get_value(retries, Options) - }; -parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options); -parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) -> - parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options); -parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) -> - DbName. - - -maybe_add_trailing_slash(Url) when is_binary(Url) -> - maybe_add_trailing_slash(?b2l(Url)); -maybe_add_trailing_slash(Url) -> - case lists:last(Url) of - $/ -> - Url; - _ -> - Url ++ "/" - end. - - -make_options(Props) -> - Options0 = lists:ukeysort(1, convert_options(Props)), - Options = check_options(Options0), - DefWorkers = config:get("replicator", "worker_processes", "4"), - DefBatchSize = config:get("replicator", "worker_batch_size", "500"), - DefConns = config:get("replicator", "http_connections", "20"), - DefTimeout = config:get("replicator", "connection_timeout", "30000"), - DefRetries = config:get("replicator", "retries_per_request", "10"), - UseCheckpoints = config:get("replicator", "use_checkpoints", "true"), - DefCheckpointInterval = config:get("replicator", "checkpoint_interval", "30000"), - {ok, DefSocketOptions} = couch_util:parse_term( - config:get("replicator", "socket_options", - "[{keepalive, true}, {nodelay, false}]")), - lists:ukeymerge(1, Options, lists:keysort(1, [ - {connection_timeout, list_to_integer(DefTimeout)}, - {retries, list_to_integer(DefRetries)}, - {http_connections, list_to_integer(DefConns)}, - {socket_options, DefSocketOptions}, - {worker_batch_size, list_to_integer(DefBatchSize)}, - {worker_processes, list_to_integer(DefWorkers)}, - {use_checkpoints, list_to_existing_atom(UseCheckpoints)}, - {checkpoint_interval, list_to_integer(DefCheckpointInterval)} - ])). - - -convert_options([])-> - []; -convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)-> - throw({bad_request, <<"parameter `cancel` must be a boolean">>}); -convert_options([{<<"cancel">>, V} | R]) -> - [{cancel, V} | convert_options(R)]; -convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>; - IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> -> - Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)), - [{id, Id} | convert_options(R)]; -convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)-> - throw({bad_request, <<"parameter `create_target` must be a boolean">>}); -convert_options([{<<"create_target">>, V} | R]) -> - [{create_target, V} | convert_options(R)]; -convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)-> - throw({bad_request, <<"parameter `continuous` must be a boolean">>}); -convert_options([{<<"continuous">>, V} | R]) -> - [{continuous, V} | convert_options(R)]; -convert_options([{<<"filter">>, V} | R]) -> - [{filter, V} | convert_options(R)]; -convert_options([{<<"query_params">>, V} | R]) -> - [{query_params, V} | convert_options(R)]; -convert_options([{<<"doc_ids">>, null} | R]) -> - convert_options(R); -convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) -> - throw({bad_request, <<"parameter `doc_ids` must be an array">>}); -convert_options([{<<"doc_ids">>, V} | R]) -> - % Ensure same behaviour as old replicator: accept a list of percent - % encoded doc IDs. - DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V], - [{doc_ids, DocIds} | convert_options(R)]; -convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) -> - throw({bad_request, <<"parameter `selector` must be a JSON object">>}); -convert_options([{<<"selector">>, V} | R]) -> - [{selector, V} | convert_options(R)]; -convert_options([{<<"worker_processes">>, V} | R]) -> - [{worker_processes, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"worker_batch_size">>, V} | R]) -> - [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"http_connections">>, V} | R]) -> - [{http_connections, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"connection_timeout">>, V} | R]) -> - [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"retries_per_request">>, V} | R]) -> - [{retries, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([{<<"socket_options">>, V} | R]) -> - {ok, SocketOptions} = couch_util:parse_term(V), - [{socket_options, SocketOptions} | convert_options(R)]; -convert_options([{<<"since_seq">>, V} | R]) -> - [{since_seq, V} | convert_options(R)]; -convert_options([{<<"use_checkpoints">>, V} | R]) -> - [{use_checkpoints, V} | convert_options(R)]; -convert_options([{<<"checkpoint_interval">>, V} | R]) -> - [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)]; -convert_options([_ | R]) -> % skip unknown option - convert_options(R). - -check_options(Options) -> - DocIds = lists:keyfind(doc_ids, 1, Options), - Filter = lists:keyfind(filter, 1, Options), - Selector = lists:keyfind(selector, 1, Options), - case {DocIds, Filter, Selector} of - {false, false, false} -> Options; - {false, false, _} -> Options; - {false, _, false} -> Options; - {_, false, false} -> Options; - _ -> - throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"}) - end. - - -parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> - parse_proxy_params(?b2l(ProxyUrl)); -parse_proxy_params([]) -> - []; -parse_proxy_params(ProxyUrl) -> - #url{ - host = Host, - port = Port, - username = User, - password = Passwd, - protocol = Protocol - } = ibrowse_lib:parse_url(ProxyUrl), - [{proxy_protocol, Protocol}, {proxy_host, Host}, {proxy_port, Port}] ++ - case is_list(User) andalso is_list(Passwd) of - false -> - []; - true -> - [{proxy_user, User}, {proxy_password, Passwd}] - end. - - -ssl_params(Url) -> - case ibrowse_lib:parse_url(Url) of - #url{protocol = https} -> - Depth = list_to_integer( - config:get("replicator", "ssl_certificate_max_depth", "3") - ), - VerifyCerts = config:get("replicator", "verify_ssl_certificates"), - CertFile = config:get("replicator", "cert_file", undefined), - KeyFile = config:get("replicator", "key_file", undefined), - Password = config:get("replicator", "password", undefined), - SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")], - SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of - true -> - case Password of - undefined -> - [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts; - _ -> - [{certfile, CertFile}, {keyfile, KeyFile}, - {password, Password}] ++ SslOpts - end; - false -> SslOpts - end, - [{is_ssl, true}, {ssl_options, SslOpts1}]; - #url{protocol = http} -> - [] - end. - -ssl_verify_options(Value) -> - ssl_verify_options(Value, erlang:system_info(otp_release)). - -ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" -> - CAFile = config:get("replicator", "ssl_trusted_certificates_file"), - [{verify, verify_peer}, {cacertfile, CAFile}]; -ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" -> - [{verify, verify_none}]; -ssl_verify_options(true, _OTPVersion) -> - CAFile = config:get("replicator", "ssl_trusted_certificates_file"), - [{verify, 2}, {cacertfile, CAFile}]; -ssl_verify_options(false, _OTPVersion) -> - [{verify, 0}]. - - -%% New db record has Options field removed here to enable smoother dbcore migration open_db(#db{name = Name, user_ctx = UserCtx}) -> {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | []]), Db; @@ -444,103 +76,90 @@ handle_db_event(DbName, compacted, Server) -> handle_db_event(_DbName, _Event, Server) -> {ok, Server}. -% Obsolete - remove in next release + +rep_error_to_binary(Error) -> + couch_util:to_binary(error_reason(Error)). + + +error_reason({shutdown, Error}) -> + error_reason(Error); +error_reason({error, {Error, Reason}}) + when is_atom(Error), is_binary(Reason) -> + io_lib:format("~s: ~s", [Error, Reason]); +error_reason({error, Reason}) -> + Reason; +error_reason(Reason) -> + Reason. + + +get_json_value(Key, Props) -> + get_json_value(Key, Props, undefined). + +get_json_value(Key, Props, Default) when is_atom(Key) -> + Ref = make_ref(), + case get_value(Key, Props, Ref) of + Ref -> + get_value(?l2b(atom_to_list(Key)), Props, Default); + Else -> + Else + end; +get_json_value(Key, Props, Default) when is_binary(Key) -> + Ref = make_ref(), + case get_value(Key, Props, Ref) of + Ref -> + get_value(list_to_atom(?b2l(Key)), Props, Default); + Else -> + Else + end. + + +% pretty-print replication id +-spec pp_rep_id(#rep{} | rep_id()) -> string(). +pp_rep_id(#rep{id = RepId}) -> + pp_rep_id(RepId); +pp_rep_id({Base, Extension}) -> + Base ++ Extension. + + +% NV: TODO: this function is not used outside api wrap module +% consider moving it there during final cleanup +is_deleted(Change) -> + get_json_value(<<"deleted">>, Change, false). + + +% NV: TODO: proxy some functions which used to be here, later remove +% these and replace calls to their respective modules +replication_id(Rep, Version) -> + couch_replicator_ids:replication_id(Rep, Version). + + sum_stats(S1, S2) -> couch_replicator_stats:sum_stats(S1, S2). -is_deleted(Change) -> - case couch_util:get_value(<<"deleted">>, Change) of - undefined -> - % keep backwards compatibility for a while - couch_util:get_value(deleted, Change, false); - Else -> - Else - end. +parse_rep_doc(Props, UserCtx) -> + couch_replicator_docs:parse_rep_doc(Props, UserCtx). -% Sort an EJSON object's properties to attempt -% to generate a unique representation. This is used -% to reduce the chance of getting different -% replication checkpoints for the same Mango selector -ejsort({V})-> - ejsort_props(V, []); -ejsort(V) when is_list(V) -> - ejsort_array(V, []); -ejsort(V) -> - V. - -ejsort_props([], Acc)-> - {lists:keysort(1, Acc)}; -ejsort_props([{K, V}| R], Acc) -> - ejsort_props(R, [{K, ejsort(V)} | Acc]). - -ejsort_array([], Acc)-> - lists:reverse(Acc); -ejsort_array([V | R], Acc) -> - ejsort_array(R, [ejsort(V) | Acc]). - - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). - -ejsort_basic_values_test() -> - ?assertEqual(ejsort(0), 0), - ?assertEqual(ejsort(<<"a">>), <<"a">>), - ?assertEqual(ejsort(true), true), - ?assertEqual(ejsort([]), []), - ?assertEqual(ejsort({[]}), {[]}). - -ejsort_compound_values_test() -> - ?assertEqual(ejsort([2, 1, 3 ,<<"a">>]), [2, 1, 3, <<"a">>]), - Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]}, - Ej1s = {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]}, - ?assertEqual(ejsort(Ej1), Ej1s), - Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]}, - ?assertEqual(ejsort(Ej2), - {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}). - -check_options_pass_values_test() -> - ?assertEqual(check_options([]), []), - ?assertEqual(check_options([baz, {other,fiz}]), [baz, {other, fiz}]), - ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]), - ?assertEqual(check_options([{filter, x}]), [{filter, x}]), - ?assertEqual(check_options([{selector, x}]), [{selector, x}]). - -check_options_fail_values_test() -> - ?assertThrow({bad_request, _}, - check_options([{doc_ids, x}, {filter, y}])), - ?assertThrow({bad_request, _}, - check_options([{doc_ids, x}, {selector, y}])), - ?assertThrow({bad_request, _}, - check_options([{filter, x}, {selector, y}])), - ?assertThrow({bad_request, _}, - check_options([{doc_ids, x}, {selector, y}, {filter, z}])). - -check_convert_options_pass_test() -> - ?assertEqual([], convert_options([])), - ?assertEqual([], convert_options([{<<"random">>, 42}])), - ?assertEqual([{cancel, true}], - convert_options([{<<"cancel">>, true}])), - ?assertEqual([{create_target, true}], - convert_options([{<<"create_target">>, true}])), - ?assertEqual([{continuous, true}], - convert_options([{<<"continuous">>, true}])), - ?assertEqual([{doc_ids, [<<"id">>]}], - convert_options([{<<"doc_ids">>, [<<"id">>]}])), - ?assertEqual([{selector, {key, value}}], - convert_options([{<<"selector">>, {key, value}}])). - -check_convert_options_fail_test() -> - ?assertThrow({bad_request, _}, - convert_options([{<<"cancel">>, <<"true">>}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"create_target">>, <<"true">>}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"continuous">>, <<"true">>}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"doc_ids">>, not_a_list}])), - ?assertThrow({bad_request, _}, - convert_options([{<<"selector">>, [{key, value}]}])). - --endif. + +-spec iso8601(erlang:timestamp()) -> binary(). +iso8601({_Mega, _Sec, _Micro} = Timestamp) -> + {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp), + Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ", + iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])). + + +%% Filter replication info ejson by state provided. If it matches return +%% the input value, if it doesn't return 'skip'. This is used from replicator +%% fabric coordinator and worker. +-spec filter_state(atom(), [atom()], {[_ | _]}) -> {[_ | _]} | skip. +filter_state(null = _State, _States, _Info) -> + skip; +filter_state(_ = _State, [] = _States, Info) -> + Info; +filter_state(State, States, Info) -> + case lists:member(State, States) of + true -> + Info; + false -> + skip + end. |