summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric_ring.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric_ring.erl')
-rw-r--r--src/fabric/src/fabric_ring.erl153
1 files changed, 73 insertions, 80 deletions
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index bad0f42d1..2bb7d717f 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -12,7 +12,6 @@
-module(fabric_ring).
-
-export([
is_progress_possible/1,
is_progress_possible/2,
@@ -25,21 +24,17 @@
handle_response/5
]).
-
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-
-type fabric_dict() :: [{#shard{}, any()}].
-type ring_opts() :: [atom() | tuple()].
-
%% @doc looks for a fully covered keyrange in the list of counters
-spec is_progress_possible(fabric_dict()) -> boolean().
is_progress_possible(Counters) ->
is_progress_possible(Counters, []).
-
%% @doc looks for a fully covered keyrange in the list of counters
%% This version takes ring options to configure how progress will
%% be checked. The default, [], checks that the full ring is covered.
@@ -47,41 +42,39 @@ is_progress_possible(Counters) ->
is_progress_possible(Counters, RingOpts) ->
is_progress_possible(Counters, [], 0, ?RING_END, RingOpts).
-
-spec get_shard_replacements(binary(), [#shard{}]) -> [#shard{}].
get_shard_replacements(DbName, UsedShards0) ->
% We only want to generate a replacements list from shards
% that aren't already used.
AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
- UsedShards = [S#shard{ref=undefined} || S <- UsedShards0],
+ UsedShards = [S#shard{ref = undefined} || S <- UsedShards0],
get_shard_replacements_int(AllLiveShards -- UsedShards, UsedShards).
-
-spec node_down(node(), fabric_dict(), fabric_dict()) ->
{ok, fabric_dict()} | error.
node_down(Node, Workers, Responses) ->
node_down(Node, Workers, Responses, []).
-
-spec node_down(node(), fabric_dict(), fabric_dict(), ring_opts()) ->
{ok, fabric_dict()} | error.
node_down(Node, Workers, Responses, RingOpts) ->
{B, E} = range_bounds(Workers, Responses),
- Workers1 = fabric_dict:filter(fun(#shard{node = N}, _) ->
- N =/= Node
- end, Workers),
+ Workers1 = fabric_dict:filter(
+ fun(#shard{node = N}, _) ->
+ N =/= Node
+ end,
+ Workers
+ ),
case is_progress_possible(Workers1, Responses, B, E, RingOpts) of
true -> {ok, Workers1};
false -> error
end.
-
-spec handle_error(#shard{}, fabric_dict(), fabric_dict()) ->
{ok, fabric_dict()} | error.
handle_error(Shard, Workers, Responses) ->
handle_error(Shard, Workers, Responses, []).
-
-spec handle_error(#shard{}, fabric_dict(), fabric_dict(), ring_opts()) ->
{ok, fabric_dict()} | error.
handle_error(Shard, Workers, Responses, RingOpts) ->
@@ -92,20 +85,28 @@ handle_error(Shard, Workers, Responses, RingOpts) ->
false -> error
end.
-
-spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict()) ->
{ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
handle_response(Shard, Response, Workers, Responses) ->
handle_response(Shard, Response, Workers, Responses, []).
-
--spec handle_response(#shard{}, any(), fabric_dict(), fabric_dict(),
- ring_opts()) ->
+-spec handle_response(
+ #shard{},
+ any(),
+ fabric_dict(),
+ fabric_dict(),
+ ring_opts()
+) ->
{ok, {fabric_dict(), fabric_dict()}} | {stop, fabric_dict()}.
handle_response(Shard, Response, Workers, Responses, RingOpts) ->
- handle_response(Shard, Response, Workers, Responses, RingOpts,
- fun stop_workers/1).
-
+ handle_response(
+ Shard,
+ Response,
+ Workers,
+ Responses,
+ RingOpts,
+ fun stop_workers/1
+ ).
% Worker response handler. Gets responses from shards and puts them in the list
% until they complete a full ring. Then kill unused responses and remaining
@@ -139,7 +140,6 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
handle_response_all(Workers1, Responses1)
end.
-
handle_response_ring(Workers, Responses, CleanupCb) ->
{MinB, MaxE} = range_bounds(Workers, Responses),
Ranges = lists:map(fun({R, _, _}) -> R end, Responses),
@@ -159,7 +159,6 @@ handle_response_ring(Workers, Responses, CleanupCb) ->
{stop, fabric_dict:from_list(UsedResponses)}
end.
-
handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
case lists:member(Shard#shard{ref = undefined}, Any) of
true ->
@@ -169,7 +168,6 @@ handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
{ok, {Workers, []}}
end.
-
handle_response_all(Workers, Responses) ->
case fabric_dict:size(Workers) =:= 0 of
true ->
@@ -178,7 +176,6 @@ handle_response_all(Workers, Responses) ->
{ok, {Workers, Responses}}
end.
-
% Check if the workers still waiting and the already received responses could
% still form a continuous range. The range won't always be the full ring, and
% the bounds are computed based on the minimum and maximum interval beginning
@@ -191,19 +188,21 @@ handle_response_all(Workers, Responses) ->
% might look like: 00-ff, 00-ff, 07-ff. Even if both 00-ff workers exit,
% progress can still be made with the remaining 07-ff copy.
%
--spec is_progress_possible(fabric_dict(), [{any(), #shard{}, any()}],
- non_neg_integer(), non_neg_integer(), ring_opts()) -> boolean().
+-spec is_progress_possible(
+ fabric_dict(),
+ [{any(), #shard{}, any()}],
+ non_neg_integer(),
+ non_neg_integer(),
+ ring_opts()
+) -> boolean().
is_progress_possible([], [], _, _, _) ->
false;
-
is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
ResponseRanges = lists:map(fun({{B, E}, _, _}) -> {B, E} end, Responses),
Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges,
mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];
-
is_progress_possible(Counters, _Responses, _, _, [all]) ->
fabric_dict:size(Counters) > 0;
-
is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
@@ -216,30 +215,37 @@ is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
true
end.
-
get_shard_replacements_int(UnusedShards, UsedShards) ->
% If we have more than one copy of a range then we don't
% want to try and add a replacement to any copy.
- RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
- dict:update_counter(R, 1, Acc)
- end, dict:new(), UsedShards),
+ RangeCounts = lists:foldl(
+ fun(#shard{range = R}, Acc) ->
+ dict:update_counter(R, 1, Acc)
+ end,
+ dict:new(),
+ UsedShards
+ ),
% For each seq shard range with a count of 1, find any
% possible replacements from the unused shards. The
% replacement list is keyed by range.
- lists:foldl(fun(#shard{range = [B, E] = Range}, Acc) ->
- case dict:find(Range, RangeCounts) of
- {ok, 1} ->
- Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
- % Only keep non-empty lists of replacements
- if Repls == [] -> Acc; true ->
- [{Range, Repls} | Acc]
- end;
- _ ->
- Acc
- end
- end, [], UsedShards).
-
+ lists:foldl(
+ fun(#shard{range = [B, E] = Range}, Acc) ->
+ case dict:find(Range, RangeCounts) of
+ {ok, 1} ->
+ Repls = mem3_util:non_overlapping_shards(UnusedShards, B, E),
+ % Only keep non-empty lists of replacements
+ if
+ Repls == [] -> Acc;
+ true -> [{Range, Repls} | Acc]
+ end;
+ _ ->
+ Acc
+ end
+ end,
+ [],
+ UsedShards
+ ).
range_bounds(Workers, Responses) ->
RespRanges = lists:map(fun({R, _, _}) -> R end, Responses),
@@ -247,31 +253,24 @@ range_bounds(Workers, Responses) ->
{Bs, Es} = lists:unzip(Ranges),
{lists:min(Bs), lists:max(Es)}.
-
get_responses([], _) ->
[];
-
get_responses([Range | Ranges], [{Range, Shard, Value} | Resps]) ->
[{Shard, Value} | get_responses(Ranges, Resps)];
-
get_responses(Ranges, [_DupeRangeResp | Resps]) ->
get_responses(Ranges, Resps).
-
stop_unused_workers(_, _, _, undefined) ->
ok;
-
stop_unused_workers(Workers, AllResponses, UsedResponses, CleanupCb) ->
WorkerShards = [S || {S, _} <- Workers],
- Used = [S || {S, _} <- UsedResponses],
+ Used = [S || {S, _} <- UsedResponses],
Unused = [S || {_, S, _} <- AllResponses, not lists:member(S, Used)],
CleanupCb(WorkerShards ++ Unused).
-
stop_workers(Shards) when is_list(Shards) ->
rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Shards]).
-
% Unit tests
is_progress_possible_full_range_test() ->
@@ -297,7 +296,6 @@ is_progress_possible_full_range_test() ->
T7 = [[0, 10], [13, 20], [21, ?RING_END], [9, 12]],
?assertEqual(false, is_progress_possible(mk_cnts(T7))).
-
is_progress_possible_with_responses_test() ->
C1 = mk_cnts([[0, ?RING_END]]),
?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [])),
@@ -322,7 +320,6 @@ is_progress_possible_with_responses_test() ->
?assertEqual(false, is_progress_possible([], RS1, 5, 8, [])),
?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])).
-
is_progress_possible_with_ring_opts_any_test() ->
Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}],
C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
@@ -340,23 +337,31 @@ is_progress_possible_with_ring_opts_any_test() ->
C2 = [{mk_shard("n1", [0, 5]), nil}],
?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)).
-
is_progress_possible_with_ring_opts_all_test() ->
C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [all])),
?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, [all])).
-
get_shard_replacements_test() ->
- Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
- {"n1", 11, 20}, {"n1", 21, ?RING_END},
- {"n2", 0, 4}, {"n2", 5, 10}, {"n2", 11, 20},
- {"n3", 0, 21, ?RING_END}
- ]],
- Used = [mk_shard(N, [B, E]) || {N, B, E} <- [
- {"n2", 21, ?RING_END},
- {"n3", 0, 10}, {"n3", 11, 20}
- ]],
+ Unused = [
+ mk_shard(N, [B, E])
+ || {N, B, E} <- [
+ {"n1", 11, 20},
+ {"n1", 21, ?RING_END},
+ {"n2", 0, 4},
+ {"n2", 5, 10},
+ {"n2", 11, 20},
+ {"n3", 0, 21, ?RING_END}
+ ]
+ ],
+ Used = [
+ mk_shard(N, [B, E])
+ || {N, B, E} <- [
+ {"n2", 21, ?RING_END},
+ {"n3", 0, 10},
+ {"n3", 11, 20}
+ ]
+ ],
Res = lists:sort(get_shard_replacements_int(Unused, Used)),
% Notice that the [0, 10] range can be replaced by spawning the
% [0, 4] and [5, 10] workers on n2
@@ -367,7 +372,6 @@ get_shard_replacements_test() ->
],
?assertEqual(Expect, Res).
-
handle_response_basic_test() ->
Shard1 = mk_shard("n1", [0, 1]),
Shard2 = mk_shard("n1", [2, ?RING_END]),
@@ -383,7 +387,6 @@ handle_response_basic_test() ->
Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
-
handle_response_incomplete_ring_test() ->
Shard1 = mk_shard("n1", [0, 1]),
Shard2 = mk_shard("n1", [2, 10]),
@@ -399,7 +402,6 @@ handle_response_incomplete_ring_test() ->
Result2 = handle_response(Shard2, 43, Workers2, Responses1, [], undefined),
?assertEqual({stop, [{Shard1, 42}, {Shard2, 43}]}, Result2).
-
handle_response_multiple_copies_test() ->
Shard1 = mk_shard("n1", [0, 1]),
Shard2 = mk_shard("n2", [0, 1]),
@@ -421,7 +423,6 @@ handle_response_multiple_copies_test() ->
% that responded first is included in the ring.
?assertEqual({stop, [{Shard1, 42}, {Shard3, 44}]}, Result3).
-
handle_response_backtracking_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n1", [10, ?RING_END]),
@@ -445,7 +446,6 @@ handle_response_backtracking_test() ->
Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined),
?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
-
handle_response_ring_opts_any_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n2", [0, 1]),
@@ -469,7 +469,6 @@ handle_response_ring_opts_any_test() ->
Result3 = handle_response(Shard3, 44, Workers3, [], Opts, undefined),
?assertEqual({stop, [{Shard3, 44}]}, Result3).
-
handle_response_ring_opts_all_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n2", [0, 1]),
@@ -493,7 +492,6 @@ handle_response_ring_opts_all_test() ->
Result3 = handle_response(W3, 44, Workers3, [], [all], undefined),
?assertMatch({stop, [_ | _]}, Result3).
-
handle_error_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n1", [10, ?RING_END]),
@@ -516,7 +514,6 @@ handle_error_test() ->
{ok, {Workers4, Responses3}} = Result3,
?assertEqual(error, handle_error(Shard4, Workers4, Responses3)).
-
node_down_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n1", [10, ?RING_END]),
@@ -547,20 +544,16 @@ node_down_test() ->
?assertEqual(error, node_down(n3, Workers5, Responses3)).
-
mk_cnts(Ranges) ->
Shards = lists:map(fun mk_shard/1, Ranges),
fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil).
-
mk_resps(RangeNameVals) ->
[{{B, E}, mk_shard(Name, [B, E]), V} || {Name, B, E, V} <- RangeNameVals].
-
mk_shard([B, E]) when is_integer(B), is_integer(E) ->
#shard{range = [B, E]}.
-
mk_shard(Name, Range) ->
Node = list_to_atom(Name),
BName = list_to_binary(Name),