diff options
Diffstat (limited to 'src/fabric/src/fabric_ring.erl')
-rw-r--r-- | src/fabric/src/fabric_ring.erl | 153 |
1 files changed, 73 insertions, 80 deletions
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl index bad0f42d1..2bb7d717f 100644 --- a/src/fabric/src/fabric_ring.erl +++ b/src/fabric/src/fabric_ring.erl @@ -12,7 +12,6 @@ -module(fabric_ring). - -export([ is_progress_possible/1, is_progress_possible/2, @@ -25,21 +24,17 @@ handle_response/5 ]). - -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). - -type fabric_dict() :: [{#shard{}, any()}]. -type ring_opts() :: [atom() | tuple()]. - %% @doc looks for a fully covered keyrange in the list of counters -spec is_progress_possible(fabric_dict()) -> boolean(). is_progress_possible(Counters) -> is_progress_possible(Counters, []). - %% @doc looks for a fully covered keyrange in the list of counters %% This version take ring option to configure how progress will %% be checked. By default, [], checks that the full ring is covered. @@ -47,41 +42,39 @@ is_progress_possible(Counters) -> is_progress_possible(Counters, RingOpts) -> is_progress_possible(Counters, [], 0, ?RING_END, RingOpts). - -spec get_shard_replacements(binary(), [#shard{}]) -> [#shard{}]. get_shard_replacements(DbName, UsedShards0) -> % We only want to generate a replacements list from shards % that aren't already used. AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]), - UsedShards = [S#shard{ref=undefined} || S <- UsedShards0], + UsedShards = [S#shard{ref = undefined} || S <- UsedShards0], get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards). - -spec node_down(node(), fabric_dict(), fabric_dict()) -> {ok, fabric_dict()} | error. node_down(Node, Workers, Responses) -> node_down(Node, Workers, Responses, []). - -spec node_down(node(), fabric_dict(), fabric_dict(), ring_opts()) -> {ok, fabric_dict()} | error. node_down(Node, Workers, Responses, RingOpts) -> {B, E} = range_bounds(Workers, Responses), - Workers1 = fabric_dict:filter(fun(#shard{node = N}, _) -> - N =/= Node - end, Workers), + Workers1 = fabric_dict:filter( + fun(#shard{node = N}, _) -> + N =/= Node + end, + Workers + ), case is_progress_possible(Workers1, Responses, B, E, RingOpts) of true -> {ok, Workers1}; false -> error end. - -spec handle_error(#shard{}, fabric_dict(), fabric_dict()) -> {ok, fabric_dict()} | error. handle_error(Shard, Workers, Responses) -> handle_error(Shard, Workers, Responses, []). - -spec handle_error(#shard{}, fabric_dict(), fabric_dict(), ring_opts()) -> {ok, fabric_dict()} | error. handle_error(Shard, Workers, Responses, RingOpts) -> @@ -92,20 +85,28 @@ handle_error(Shard, Workers, Responses, RingOpts) -> false -> error end. - -spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict()) -> {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}. handle_response(Shard, Response, Workers, Responses) -> handle_response(Shard, Response, Workers, Responses, []). - --spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(), - ring_opts()) -> +-spec handle_response( + #shard{}, + any(), + fabric_dict(), + fabric_dict(), + ring_opts() +) -> {ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}. handle_response(Shard, Response, Workers, Responses, RingOpts) -> - handle_response(Shard, Response, Workers, Responses, RingOpts, - fun stop_workers/1). - + handle_response( + Shard, + Response, + Workers, + Responses, + RingOpts, + fun stop_workers/1 + ). % Worker response handler. Gets reponses from shard and puts them in the list % until they complete a full ring. Then kill unused responses and remaining @@ -139,7 +140,6 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) -> handle_response_all(Workers1, Responses1) end. - handle_response_ring(Workers, Responses, CleanupCb) -> {MinB, MaxE} = range_bounds(Workers, Responses), Ranges = lists:map(fun({R, _, _}) -> R end, Responses), @@ -159,7 +159,6 @@ handle_response_ring(Workers, Responses, CleanupCb) -> {stop, fabric_dict:from_list(UsedResponses)} end. - handle_response_any(Shard, Response, Workers, Any, CleanupCb) -> case lists:member(Shard#shard{ref = undefined}, Any) of true -> @@ -169,7 +168,6 @@ handle_response_any(Shard, Response, Workers, Any, CleanupCb) -> {ok, {Workers, []}} end. - handle_response_all(Workers, Responses) -> case fabric_dict:size(Workers) =:= 0 of true -> @@ -178,7 +176,6 @@ handle_response_all(Workers, Responses) -> {ok, {Workers, Responses}} end. - % Check if workers still waiting and the already received responses could % still form a continous range. The range won't always be the full ring, and % the bounds are computed based on the minimum and maximum interval beginning @@ -191,19 +188,21 @@ handle_response_all(Workers, Responses) -> % might look like: 00-ff, 00-ff, 07-ff. Even if both 00-ff workers exit, % progress can still be made with the remaining 07-ff copy. % --spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}], - non_neg_integer(), non_neg_integer(), ring_opts()) -> boolean(). +-spec is_progress_possible( + fabric_dict(), + [{any(), #shard{}, any()}], + non_neg_integer(), + non_neg_integer(), + ring_opts() +) -> boolean(). is_progress_possible([], [], _, _, _) -> false; - is_progress_possible(Counters, Responses, MinB, MaxE, []) -> ResponseRanges = lists:map(fun({{B, E}, _, _}) -> {B, E} end, Responses), Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges, mem3_util:get_ring(Ranges, MinB, MaxE) =/= []; - is_progress_possible(Counters, _Responses, _, _, [all]) -> fabric_dict:size(Counters) > 0; - is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) -> InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end, case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of @@ -216,30 +215,37 @@ is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) -> true end. - get_shard_replacements_int(UnusedShards, UsedShards) -> % If we have more than one copy of a range then we don't % want to try and add a replacement to any copy. - RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) -> - dict:update_counter(R, 1, Acc) - end, dict:new(), UsedShards), + RangeCounts = lists:foldl( + fun(#shard{range = R}, Acc) -> + dict:update_counter(R, 1, Acc) + end, + dict:new(), + UsedShards + ), % For each seq shard range with a count of 1, find any % possible replacements from the unused shards. The % replacement list is keyed by range. - lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) -> - case dict:find(Range, RangeCounts) of - {ok, 1} -> - Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E), - % Only keep non-empty lists of replacements - if Repls == [] -> Acc; true -> - [{Range, Repls} | Acc] - end; - _ -> - Acc - end - end, [], UsedShards). - + lists:foldl( + fun(#shard{range = [B, E] = Range}, Acc) -> + case dict:find(Range, RangeCounts) of + {ok, 1} -> + Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E), + % Only keep non-empty lists of replacements + if + Repls == [] -> Acc; + true -> [{Range, Repls} | Acc] + end; + _ -> + Acc + end + end, + [], + UsedShards + ). range_bounds(Workers, Responses) -> RespRanges = lists:map(fun({R, _, _}) -> R end, Responses), @@ -247,31 +253,24 @@ range_bounds(Workers, Responses) -> {Bs, Es} = lists:unzip(Ranges), {lists:min(Bs), lists:max(Es)}. - get_responses([], _) -> []; - get_responses([Range | Ranges], [{Range, Shard, Value} | Resps]) -> [{Shard, Value} | get_responses(Ranges, Resps)]; - get_responses(Ranges, [_DupeRangeResp | Resps]) -> get_responses(Ranges, Resps). - stop_unused_workers(_, _, _, undefined) -> ok; - stop_unused_workers(Workers, AllResponses, UsedResponses, CleanupCb) -> WorkerShards = [S || {S, _} <- Workers], - Used = [S || {S, _} <- UsedResponses], + Used = [S || {S, _} <- UsedResponses], Unused = [S || {_, S, _} <- AllResponses, not lists:member(S, Used)], CleanupCb(WorkerShards ++ Unused). - stop_workers(Shards) when is_list(Shards) -> rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Shards]). - % Unit tests is_progress_possible_full_range_test() -> @@ -297,7 +296,6 @@ is_progress_possible_full_range_test() -> T7 = [[0, 10], [13, 20], [21, ?RING_END], [9, 12]], ?assertEqual(false, is_progress_possible(mk_cnts(T7))). - is_progress_possible_with_responses_test() -> C1 = mk_cnts([[0, ?RING_END]]), ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [])), @@ -322,7 +320,6 @@ is_progress_possible_with_responses_test() -> ?assertEqual(false, is_progress_possible([], RS1, 5, 8, [])), ?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])). - is_progress_possible_with_ring_opts_any_test() -> Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}], C1 = [{mk_shard("n1", [0, ?RING_END]), nil}], @@ -340,23 +337,31 @@ is_progress_possible_with_ring_opts_any_test() -> C2 = [{mk_shard("n1", [0, 5]), nil}], ?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)). - is_progress_possible_with_ring_opts_all_test() -> C1 = [{mk_shard("n1", [0, ?RING_END]), nil}], ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [all])), ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, [all])). - get_shard_replacements_test() -> - Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [ - {"n1", 11, 20}, {"n1", 21, ?RING_END}, - {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20}, - {"n3", 0, 21, ?RING_END} - ]], - Used = [mk_shard(N, [B, E]) || {N, B, E} <- [ - {"n2", 21, ?RING_END}, - {"n3", 0, 10}, {"n3", 11, 20} - ]], + Unused = [ + mk_shard(N, [B, E]) + || {N, B, E} <- [ + {"n1", 11, 20}, + {"n1", 21, ?RING_END}, + {"n2", 0, 4}, + {"n2", 5, 10}, + {"n2", 11, 20}, + {"n3", 0, 21, ?RING_END} + ] + ], + Used = [ + mk_shard(N, [B, E]) + || {N, B, E} <- [ + {"n2", 21, ?RING_END}, + {"n3", 0, 10}, + {"n3", 11, 20} + ] + ], Res = lists:sort(get_shard_replacements_int(Unused, Used)), % Notice that [0, 10] range can be replaces by spawning the % [0, 4] and [5, 10] workers on n1 @@ -367,7 +372,6 @@ get_shard_replacements_test() -> ], ?assertEqual(Expect, Res). - handle_response_basic_test() -> Shard1 = mk_shard("n1", [0, 1]), Shard2 = mk_shard("n1", [2, ?RING_END]), @@ -383,7 +387,6 @@ handle_response_basic_test() -> Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined), ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2). - handle_response_incomplete_ring_test() -> Shard1 = mk_shard("n1", [0, 1]), Shard2 = mk_shard("n1", [2, 10]), @@ -399,7 +402,6 @@ handle_response_incomplete_ring_test() -> Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined), ?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2). - handle_response_multiple_copies_test() -> Shard1 = mk_shard("n1", [0, 1]), Shard2 = mk_shard("n2", [0, 1]), @@ -421,7 +423,6 @@ handle_response_multiple_copies_test() -> % that responded first is included in the ring. ?assertEqual({stop, [{Shard1, 42}, {Shard3, 44}]}, Result3). - handle_response_backtracking_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n1", [10, ?RING_END]), @@ -445,7 +446,6 @@ handle_response_backtracking_test() -> Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined), ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4). - handle_response_ring_opts_any_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n2", [0, 1]), @@ -469,7 +469,6 @@ handle_response_ring_opts_any_test() -> Result3 = handle_response(Shard3, 44, Workers3, [], Opts, undefined), ?assertEqual({stop, [{Shard3, 44}]}, Result3). - handle_response_ring_opts_all_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n2", [0, 1]), @@ -493,7 +492,6 @@ handle_response_ring_opts_all_test() -> Result3 = handle_response(W3, 44, Workers3, [], [all], undefined), ?assertMatch({stop, [_ | _]}, Result3). - handle_error_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n1", [10, ?RING_END]), @@ -516,7 +514,6 @@ handle_error_test() -> {ok, {Workers4, Responses3}} = Result3, ?assertEqual(error, handle_error(Shard4, Workers4, Responses3)). - node_down_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n1", [10, ?RING_END]), @@ -547,20 +544,16 @@ node_down_test() -> ?assertEqual(error, node_down(n3, Workers5, Responses3)). - mk_cnts(Ranges) -> Shards = lists:map(fun mk_shard/1, Ranges), fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil). - mk_resps(RangeNameVals) -> [{{B, E}, mk_shard(Name, [B, E]), V} || {Name, B, E, V} <- RangeNameVals]. - mk_shard([B, E]) when is_integer(B), is_integer(E) -> #shard{range = [B, E]}. - mk_shard(Name, Range) -> Node = list_to_atom(Name), BName = list_to_binary(Name), |