diff options
author | Garren Smith <garren.smith@gmail.com> | 2018-08-08 09:56:10 +0200 |
---|---|---|
committer | garren smith <garren.smith@gmail.com> | 2018-08-08 19:24:38 +0200 |
commit | a6bc72e76c56a1befa8675b06016ecda46ef3a2d (patch) | |
tree | f8227e28677a5f9044bed4209f91728955a0174d /src/mango | |
parent | a7f2aa5175c8fad8f946c3d2ff79558b74b8ee18 (diff) | |
download | couchdb-a6bc72e76c56a1befa8675b06016ecda46ef3a2d.tar.gz |
Move mango selector matching to the shard level
This moves the Mango selector matching down to the shard level.
this would mean that the document is retrieved from the index and
matched against the selector before being sent to the coordinator node.
This reduces the network traffic for a mango query
Co-authored-by: Paul J. Davis <paul.joseph.davis@gmail.com>
Co-authored-by: Garren Smith <garren.smith@gmail.com>
Diffstat (limited to 'src/mango')
-rw-r--r-- | src/mango/src/mango_cursor_view.erl | 112 | ||||
-rw-r--r-- | src/mango/src/mango_execution_stats.erl | 7 | ||||
-rw-r--r-- | src/mango/test/20-no-timeout-test.py | 38 |
3 files changed, 141 insertions, 16 deletions
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index dbea36e77..51ec68c45 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -19,6 +19,7 @@ ]). -export([ + view_cb/2, handle_message/2, handle_all_docs_message/2, composite_indexes/2, @@ -28,9 +29,13 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). +-include_lib("fabric/include/fabric.hrl"). + -include("mango_cursor.hrl"). -include("mango_idx_view.hrl"). +-define(HEARTBEAT_INTERVAL_IN_USEC, 4000000). + create(Db, Indexes, Selector, Opts) -> FieldRanges = mango_idx_view:field_ranges(Selector), Composited = composite_indexes(Indexes, FieldRanges), @@ -93,13 +98,14 @@ maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) -> maybe_replace_max_json(EndKey) -> EndKey. -base_args(#cursor{index = Idx} = Cursor) -> +base_args(#cursor{index = Idx, selector = Selector} = Cursor) -> #mrargs{ view_type = map, reduce = false, start_key = mango_idx:start_key(Idx, Cursor#cursor.ranges), end_key = mango_idx:end_key(Idx, Cursor#cursor.ranges), - include_docs = true + include_docs = true, + extra = [{callback, {?MODULE, view_cb}}, {selector, Selector}] }. @@ -210,22 +216,84 @@ choose_best_index(_DbName, IndexRanges) -> {SelectedIndex, SelectedIndexRanges}. +view_cb({meta, Meta}, Acc) -> + % Map function starting + put(mango_docs_examined, 0), + set_mango_msg_timestamp(), + ok = rexi:stream2({meta, Meta}), + {ok, Acc}; +view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> + ViewRow = #view_row{ + id = couch_util:get_value(id, Row), + key = couch_util:get_value(key, Row), + doc = couch_util:get_value(doc, Row) + }, + case ViewRow#view_row.doc of + undefined -> + ViewRow2 = ViewRow#view_row{ + value = couch_util:get_value(value, Row) + }, + ok = rexi:stream2(ViewRow2), + put(mango_docs_examined, 0), + set_mango_msg_timestamp(); + Doc -> + Selector = couch_util:get_value(selector, Options), + case mango_selector:match(Selector, Doc) of + true -> + ViewRow2 = ViewRow#view_row{ + value = get(mango_docs_examined) + 1 + }, + ok = rexi:stream2(ViewRow2), + put(mango_docs_examined, 0), + set_mango_msg_timestamp(); + false -> + put(mango_docs_examined, get(mango_docs_examined) + 1), + maybe_send_mango_ping() + end + end, + {ok, Acc}; +view_cb(complete, Acc) -> + % Finish view output + ok = rexi:stream_last(complete), + {ok, Acc}; +view_cb(ok, ddoc_updated) -> + rexi:reply({ok, ddoc_updated}). + + +maybe_send_mango_ping() -> + Current = os:timestamp(), + LastPing = get(mango_last_msg_timestamp), + % Fabric will timeout if it has not heard a response from a worker node + % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't happen. + case timer:now_diff(Current, LastPing) > ?HEARTBEAT_INTERVAL_IN_USEC of + false -> + ok; + true -> + rexi:ping(), + set_mango_msg_timestamp() + end. + + +set_mango_msg_timestamp() -> + put(mango_last_msg_timestamp, os:timestamp()). + + handle_message({meta, _}, Cursor) -> {ok, Cursor}; handle_message({row, Props}, Cursor) -> - case doc_member(Cursor#cursor.db, Props, Cursor#cursor.opts, Cursor#cursor.execution_stats) of + case doc_member(Cursor, Props) of {ok, Doc, {execution_stats, ExecutionStats1}} -> Cursor1 = Cursor#cursor { execution_stats = ExecutionStats1 }, - case mango_selector:match(Cursor1#cursor.selector, Doc) of - true -> - Cursor2 = update_bookmark_keys(Cursor1, Props), - FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields), - handle_doc(Cursor2, FinalDoc); - false -> - {ok, Cursor1} - end; + Cursor2 = update_bookmark_keys(Cursor1, Props), + FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields), + handle_doc(Cursor2, FinalDoc); + {no_match, _, {execution_stats, ExecutionStats1}} -> + Cursor1 = Cursor#cursor { + execution_stats = ExecutionStats1 + }, + {ok, Cursor1}; Error -> couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]), {ok, Cursor} @@ -332,17 +400,31 @@ apply_opts([{_, _} | Rest], Args) -> apply_opts(Rest, Args). -doc_member(Db, RowProps, Opts, ExecutionStats) -> +doc_member(Cursor, RowProps) -> + Db = Cursor#cursor.db, + Opts = Cursor#cursor.opts, + ExecutionStats = Cursor#cursor.execution_stats, + Selector = Cursor#cursor.selector, + Incr = case couch_util:get_value(value, RowProps) of + N when is_integer(N) -> N; + _ -> 1 + end, case couch_util:get_value(doc, RowProps) of {DocProps} -> - ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats), + ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats, Incr), {ok, {DocProps}, {execution_stats, ExecutionStats1}}; undefined -> ExecutionStats1 = mango_execution_stats:incr_quorum_docs_examined(ExecutionStats), Id = couch_util:get_value(id, RowProps), case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of - {ok, #doc{}=Doc} -> - {ok, couch_doc:to_json_obj(Doc, []), {execution_stats, ExecutionStats1}}; + {ok, #doc{}=DocProps} -> + Doc = couch_doc:to_json_obj(DocProps, []), + case mango_selector:match(Selector, Doc) of + true -> + {ok, Doc, {execution_stats, ExecutionStats1}}; + false -> + {no_match, Doc, {execution_stats, ExecutionStats1}} + end; Else -> Else end diff --git a/src/mango/src/mango_execution_stats.erl b/src/mango/src/mango_execution_stats.erl index afdb417b7..7e8afd782 100644 --- a/src/mango/src/mango_execution_stats.erl +++ b/src/mango/src/mango_execution_stats.erl @@ -17,6 +17,7 @@ to_json/1, incr_keys_examined/1, incr_docs_examined/1, + incr_docs_examined/2, incr_quorum_docs_examined/1, incr_results_returned/1, log_start/1, @@ -45,8 +46,12 @@ incr_keys_examined(Stats) -> incr_docs_examined(Stats) -> + incr_docs_examined(Stats, 1). + + +incr_docs_examined(Stats, N) -> Stats#execution_stats { - totalDocsExamined = Stats#execution_stats.totalDocsExamined + 1 + totalDocsExamined = Stats#execution_stats.totalDocsExamined + N }. diff --git a/src/mango/test/20-no-timeout-test.py b/src/mango/test/20-no-timeout-test.py new file mode 100644 index 000000000..93dc146a3 --- /dev/null +++ b/src/mango/test/20-no-timeout-test.py @@ -0,0 +1,38 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import mango +import copy +import unittest + +class LongRunningMangoTest(mango.DbPerClass): + + def setUp(self): + self.db.recreate() + docs = [] + for i in range(100000): + docs.append({ + "_id": str(i), + "another": "field" + }) + if i % 20000 == 0: + self.db.save_docs(docs) + docs = [] + + # This test should run to completion and not timeout + def test_query_does_not_time_out(self): + selector = { + "_id": {"$gt": 0}, + "another": "wrong" + } + docs = self.db.find(selector) + self.assertEqual(len(docs), 0) |