summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-02 15:13:35 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-02 15:13:35 +0100
commit19cb6e7f8c890fd3c9b1fa563a54a80129843bd4 (patch)
treef2239944bd09a4f921c2c2674d3bd3ab4b8b3e41
parent364b3898e14f9c801397639ce3df9d5f92588cb0 (diff)
downloadrabbitmq-server-19cb6e7f8c890fd3c9b1fa563a54a80129843bd4.tar.gz
Drop unneeded queue impls, tidy lqueue, add specs etc
-rw-r--r--src/bpqueue.erl273
-rw-r--r--src/finger_tree.erl264
-rw-r--r--src/fqueue.erl141
-rw-r--r--src/lqueue.erl24
-rw-r--r--src/rabbit_tests.erl138
-rw-r--r--src/rabbit_variable_queue.erl8
6 files changed, 28 insertions, 820 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
deleted file mode 100644
index c7d89998..00000000
--- a/src/bpqueue.erl
+++ /dev/null
@@ -1,273 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-%%
-
--module(bpqueue).
-
-%% Block-prefixed queue. From the perspective of the queue interface
-%% the datastructure acts like a regular queue where each value is
-%% paired with the prefix.
-%%
-%% This is implemented as a queue of queues, which is more space and
-%% time efficient, whilst supporting the normal queue interface. Each
-%% inner queue has a prefix, which does not need to be unique, and it
-%% is guaranteed that no two consecutive blocks have the same
-%% prefix. len/1 returns the flattened length of the queue and is
-%% O(1).
-
--export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1, map_fold_filter_l/4,
- map_fold_filter_r/4]).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--export_type([bpqueue/0]).
-
--type(bpqueue() :: {non_neg_integer(), queue()}).
--type(prefix() :: any()).
--type(value() :: any()).
--type(result() :: ({'empty', bpqueue()} |
- {{'value', prefix(), value()}, bpqueue()})).
-
--spec(new/0 :: () -> bpqueue()).
--spec(is_empty/1 :: (bpqueue()) -> boolean()).
--spec(len/1 :: (bpqueue()) -> non_neg_integer()).
--spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
--spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
--spec(out/1 :: (bpqueue()) -> result()).
--spec(out_r/1 :: (bpqueue()) -> result()).
--spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()).
--spec(foldl/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
--spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
--spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()).
--spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
--spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())),
- (fun ((value(), B) ->
- ({prefix(), value(), B} | 'stop'))),
- B,
- bpqueue()) ->
- {bpqueue(), B}).
--spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())),
- (fun ((value(), B) ->
- ({prefix(), value(), B} | 'stop'))),
- B,
- bpqueue()) ->
- {bpqueue(), B}).
-
--endif.
-
--define(QUEUE, finger_tree).
-
-%%----------------------------------------------------------------------------
-
-new() -> {0, ?QUEUE:new()}.
-
-is_empty({0, _Q}) -> true;
-is_empty(_BPQ) -> false.
-
-len({N, _Q}) -> N.
-
-in(Prefix, Value, {0, Q}) ->
- {1, ?QUEUE:in({Prefix, ?QUEUE:from_list([Value])}, Q)};
-in(Prefix, Value, BPQ) ->
- in1({fun ?QUEUE:in/2, fun ?QUEUE:out_r/1}, Prefix, Value, BPQ).
-
-in_r(Prefix, Value, BPQ = {0, _Q}) ->
- in(Prefix, Value, BPQ);
-in_r(Prefix, Value, BPQ) ->
- in1({fun ?QUEUE:in_r/2, fun ?QUEUE:out/1}, Prefix, Value, BPQ).
-
-in1({In, Out}, Prefix, Value, {N, Q}) ->
- {N+1, case Out(Q) of
- {{value, {Prefix, InnerQ}}, Q1} ->
- In({Prefix, In(Value, InnerQ)}, Q1);
- {{value, {_Prefix, _InnerQ}}, _Q1} ->
- In({Prefix, ?QUEUE:in(Value, ?QUEUE:new())}, Q)
- end}.
-
-in_q(Prefix, Queue, BPQ = {0, Q}) ->
- case ?QUEUE:len(Queue) of
- 0 -> BPQ;
- N -> {N, ?QUEUE:in({Prefix, Queue}, Q)}
- end;
-in_q(Prefix, Queue, BPQ) ->
- in_q1({fun ?QUEUE:in/2, fun ?QUEUE:out_r/1,
- fun ?QUEUE:join/2},
- Prefix, Queue, BPQ).
-
-in_q_r(Prefix, Queue, BPQ = {0, _Q}) ->
- in_q(Prefix, Queue, BPQ);
-in_q_r(Prefix, Queue, BPQ) ->
- in_q1({fun ?QUEUE:in_r/2, fun ?QUEUE:out/1,
- fun (T, H) -> ?QUEUE:join(H, T) end},
- Prefix, Queue, BPQ).
-
-in_q1({In, Out, Join}, Prefix, Queue, BPQ = {N, Q}) ->
- case ?QUEUE:len(Queue) of
- 0 -> BPQ;
- M -> {N + M, case Out(Q) of
- {{value, {Prefix, InnerQ}}, Q1} ->
- In({Prefix, Join(InnerQ, Queue)}, Q1);
- {{value, {_Prefix, _InnerQ}}, _Q1} ->
- In({Prefix, Queue}, Q)
- end}
- end.
-
-out({0, _Q} = BPQ) -> {empty, BPQ};
-out(BPQ) -> out1({fun ?QUEUE:in_r/2, fun ?QUEUE:out/1}, BPQ).
-
-out_r({0, _Q} = BPQ) -> {empty, BPQ};
-out_r(BPQ) -> out1({fun ?QUEUE:in/2, fun ?QUEUE:out_r/1}, BPQ).
-
-out1({In, Out}, {N, Q}) ->
- {{value, {Prefix, InnerQ}}, Q1} = Out(Q),
- {{value, Value}, InnerQ1} = Out(InnerQ),
- Q2 = case ?QUEUE:is_empty(InnerQ1) of
- true -> Q1;
- false -> In({Prefix, InnerQ1}, Q1)
- end,
- {{value, Prefix, Value}, {N-1, Q2}}.
-
-join({0, _Q}, BPQ) ->
- BPQ;
-join(BPQ, {0, _Q}) ->
- BPQ;
-join({NHead, QHead}, {NTail, QTail}) ->
- {{value, {Prefix, InnerQHead}}, QHead1} = ?QUEUE:out_r(QHead),
- {NHead + NTail,
- case ?QUEUE:out(QTail) of
- {{value, {Prefix, InnerQTail}}, QTail1} ->
- ?QUEUE:join(
- ?QUEUE:in({Prefix, ?QUEUE:join(InnerQHead, InnerQTail)}, QHead1),
- QTail1);
- {{value, {_Prefix, _InnerQTail}}, _QTail1} ->
- ?QUEUE:join(QHead, QTail)
- end}.
-
-foldl(_Fun, Init, {0, _Q}) -> Init;
-foldl( Fun, Init, {_N, Q}) -> fold1(fun ?QUEUE:out/1, Fun, Init, Q).
-
-foldr(_Fun, Init, {0, _Q}) -> Init;
-foldr( Fun, Init, {_N, Q}) -> fold1(fun ?QUEUE:out_r/1, Fun, Init, Q).
-
-fold1(Out, Fun, Init, Q) ->
- case Out(Q) of
- {empty, _Q} ->
- Init;
- {{value, {Prefix, InnerQ}}, Q1} ->
- fold1(Out, Fun, fold1(Out, Fun, Prefix, Init, InnerQ), Q1)
- end.
-
-fold1(Out, Fun, Prefix, Init, InnerQ) ->
- case Out(InnerQ) of
- {empty, _Q} ->
- Init;
- {{value, Value}, InnerQ1} ->
- fold1(Out, Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1)
- end.
-
-from_list(List) ->
- {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} =
- lists:foldl(
- fun ({_Prefix, []}, Acc) ->
- Acc;
- ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
- {Prefix, ?QUEUE:join(InnerQ, ?QUEUE:from_list(InnerList)),
- ListOfPQs, LenAcc + length(InnerList)};
- ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
- {Prefix1, ?QUEUE:from_list(InnerList),
- [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)}
- end, {undefined, ?QUEUE:new(), [], 0}, List),
- ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1],
- [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2),
- {Len, ?QUEUE:from_list(case ?QUEUE:is_empty(InnerQ1) of
- true -> Rest;
- false -> All
- end)}.
-
-to_list({0, _Q}) -> [];
-to_list({_N, Q}) -> [{Prefix, ?QUEUE:to_list(InnerQ)} ||
- {Prefix, InnerQ} <- ?QUEUE:to_list(Q)].
-
-%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
-%% where FilterFun(Prefix) -> boolean()
-%% Fun(Value, Init) -> {Prefix, Value, Init} | stop
-%%
-%% The filter fun allows you to skip very quickly over blocks that
-%% you're not interested in. Such blocks appear in the resulting bpq
-%% without modification. The Fun is then used both to map the value,
-%% which also allows you to change the prefix (and thus block) of the
-%% value, and also to modify the Init/Acc (just like a fold). If the
-%% Fun returns 'stop' then it is not applied to any further items.
-map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
- {BPQ, Init};
-map_fold_filter_l(PFilter, Fun, Init, {N, Q}) ->
- map_fold_filter1({fun ?QUEUE:out/1, fun ?QUEUE:in/2,
- fun in_q/3, fun join/2},
- N, PFilter, Fun, Init, Q, new()).
-
-map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
- {BPQ, Init};
-map_fold_filter_r(PFilter, Fun, Init, {N, Q}) ->
- map_fold_filter1({fun ?QUEUE:out_r/1, fun ?QUEUE:in_r/2,
- fun in_q_r/3, fun (T, H) -> join(H, T) end},
- N, PFilter, Fun, Init, Q, new()).
-
-map_fold_filter1(Funs = {Out, _In, InQ, Join}, Len, PFilter, Fun,
- Init, Q, QNew) ->
- case Out(Q) of
- {empty, _Q} ->
- {QNew, Init};
- {{value, {Prefix, InnerQ}}, Q1} ->
- case PFilter(Prefix) of
- true ->
- {Init1, QNew1, Cont} =
- map_fold_filter2(Funs, Fun, Prefix, Prefix,
- Init, InnerQ, QNew, ?QUEUE:new()),
- case Cont of
- false -> {Join(QNew1, {Len - len(QNew1), Q1}), Init1};
- true -> map_fold_filter1(Funs, Len, PFilter, Fun,
- Init1, Q1, QNew1)
- end;
- false ->
- map_fold_filter1(Funs, Len, PFilter, Fun,
- Init, Q1, InQ(Prefix, InnerQ, QNew))
- end
- end.
-
-map_fold_filter2(Funs = {Out, In, InQ, _Join}, Fun, OrigPrefix, Prefix,
- Init, InnerQ, QNew, InnerQNew) ->
- case Out(InnerQ) of
- {empty, _Q} ->
- {Init, InQ(OrigPrefix, InnerQ,
- InQ(Prefix, InnerQNew, QNew)), true};
- {{value, Value}, InnerQ1} ->
- case Fun(Value, Init) of
- stop ->
- {Init, InQ(OrigPrefix, InnerQ,
- InQ(Prefix, InnerQNew, QNew)), false};
- {Prefix1, Value1, Init1} ->
- {Prefix2, QNew1, InnerQNew1} =
- case Prefix1 =:= Prefix of
- true -> {Prefix, QNew, In(Value1, InnerQNew)};
- false -> {Prefix1, InQ(Prefix, InnerQNew, QNew),
- In(Value1, ?QUEUE:new())}
- end,
- map_fold_filter2(Funs, Fun, OrigPrefix, Prefix2,
- Init1, InnerQ1, QNew1, InnerQNew1)
- end
- end.
diff --git a/src/finger_tree.erl b/src/finger_tree.erl
deleted file mode 100644
index a48a5fc7..00000000
--- a/src/finger_tree.erl
+++ /dev/null
@@ -1,264 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
-%%
-
--module(finger_tree).
-
-%% This is an Erlang implementation of 2-3 finger trees, as defined by
-%% Ralf Hinze and Ross Paterson in their "Finger Trees: A Simple
-%% General-purpose Data Structure" paper[0].
-%%
-%% As usual with a queue-like thing, adding and removing from either
-%% end is O(1) amortized.
-%%
-%% On the whole, nearly everything else is O(log_2(N)), including
-%% join. Whilst there are instances of list append (++) in the code,
-%% it's only lists that are bounded in length to four items.
-%%
-%% Because Erlang lacks type classes, it's currently forced to cache
-%% sizes. This allows len to be O(1), and permits split_at based on
-%% index, which still remains O(log_2(N)) which demonstrates we're
-%% able to take advantage of the tree structure. However, it's not
-%% difficult to imagine a callback mechanism to allow the monoid
-%% implementation to vary. See the paper[0] for examples of other
-%% things that could be implemented.
-%%
-%% [0]: http://www.soi.city.ac.uk/~ross/papers/FingerTree.html
-
-
--compile([export_all]).
-
--record(finger_tree_single, {elem}).
--record(finger_tree_deep, {measured, prefix, middle, suffix}).
--record(node, {measured, tuple}).
--record(digit, {list}).
-
-%% queue interface
-new() -> finger_tree_empty.
-
-in(A, FT) -> cons_l(A, FT).
-in_r(A, FT) -> cons_r(A, FT).
-
-out(FT) -> uncons_r(FT).
-out_r(FT) -> uncons_l(FT).
-
-len(FT) -> measure(FT).
-
-%% To/From list
-cons(A, L) -> [A | L].
-to_list(FT) -> reduce_r(fun cons/2, FT, []).
-from_list(L) -> reduce_r_in(L, finger_tree_empty).
-
-is_empty(finger_tree_empty) -> true;
-is_empty(#finger_tree_single {}) -> false;
-is_empty(#finger_tree_deep {}) -> false.
-
-%% Smart constructors
-digit(A) -> #digit { list = A }.
-
-node2(A, B) ->
- #node { tuple = {A, B}, measured = add(measure(A), measure(B)) }.
-node3(A, B, C) ->
- #node { tuple = {A, B, C},
- measured = add(add(measure(A), measure(B)), measure(C)) }.
-
-deep(Prefix, Middle, Suffix) ->
- #finger_tree_deep { measured = add(add(measure(Prefix),
- measure(Middle)),
- measure(Suffix)),
- prefix = Prefix, middle = Middle, suffix = Suffix }.
-
-deep_r(#digit { list = [] }, M, S) ->
- case uncons_r(M) of
- {empty, _ } -> from_list(S);
- {{value, A}, M1} -> deep(digit(to_list(A)), M1, S)
- end;
-deep_r(P, M, S) ->
- deep(P, M, S).
-
-deep_l(P, M, #digit { list = [] }) ->
- case uncons_l(M) of
- {empty, _} -> from_list(P);
- {{value, A}, M1} -> deep(P, M1, digit(to_list(A)))
- end;
-deep_l(P, M, S) ->
- deep(P, M, S).
-
-%% Monoid
-measure(finger_tree_empty) -> null();
-measure(#finger_tree_single { elem = E }) -> measure(E);
-measure(#finger_tree_deep{ measured = V }) -> V;
-
-measure(#node { measured = V }) -> V;
-
-measure(#digit { list = Xs }) ->
- reduce_l(fun (A, I) -> add(I, measure(A)) end, Xs, null());
-
-%% Size implementation of monoid
-measure(_) -> 1.
-
-null() -> 0.
-add(A, B) -> A + B.
-
-
-%% Reduce primitives
-reduce_r(_Fun, finger_tree_empty, Z) -> Z;
-reduce_r(Fun, #finger_tree_single { elem = E }, Z) -> Fun(E, Z);
-reduce_r(Fun, #finger_tree_deep { prefix = P, middle = M, suffix = S }, Z) ->
- R = reduce_r(fun (A, B) -> reduce_r(Fun, A, B) end, M, reduce_r(Fun, S, Z)),
- reduce_r(Fun, P, R);
-
-reduce_r(Fun, #node { tuple = {A, B} }, Z) -> Fun(A, Fun(B, Z));
-reduce_r(Fun, #node { tuple = {A, B, C} }, Z) -> Fun(A, Fun(B, Fun(C, Z)));
-
-reduce_r(Fun, #digit { list = List }, Z) -> lists:foldr(Fun, Z, List);
-
-reduce_r(Fun, X, Z) when is_list(X) -> lists:foldr(Fun, Z, X).
-
-reduce_l(_Fun, finger_tree_empty, Z) -> Z;
-reduce_l(Fun, #finger_tree_single { elem = E }, Z) -> Fun(Z, E);
-reduce_l(Fun, #finger_tree_deep { prefix = P, middle = M, suffix = S }, Z) ->
- L = reduce_l(fun (A, B) -> reduce_l(Fun, A, B) end, reduce_l(Fun, Z, P), M),
- reduce_l(Fun, L, S);
-
-reduce_l(Fun, #node { tuple = {A, B} }, Z) -> Fun(Fun(Z, B), A);
-reduce_l(Fun, #node { tuple = {A, B, C} }, Z) -> Fun(Fun(Fun(Z, C), B), A);
-
-reduce_l(Fun, #digit { list = List }, Z) -> lists:foldl(Fun, Z, List);
-
-reduce_l(Fun, X, Z) when is_list(X) -> lists:foldl(Fun, Z, X).
-
-reduce_r_in(X, FT) -> reduce_r(fun cons_r/2, X, FT).
-reduce_l_in(X, FT) -> reduce_l(fun cons_l/2, X, FT).
-
-
-%% Consing
-cons_r(A, finger_tree_empty) ->
- #finger_tree_single { elem = A };
-cons_r(A, #finger_tree_single { elem = B }) ->
- deep(digit([A]), finger_tree_empty, digit([B]));
-cons_r(A, #finger_tree_deep { prefix = #digit { list = [B, C, D, E] }, middle = M, suffix = S }) ->
- deep(digit([A, B]), cons_r(node3(C, D, E), M), S);
-cons_r(A, #finger_tree_deep { prefix = #digit { list = P }, middle = M, suffix = S }) ->
- deep(digit([A | P]), M, S).
-
-cons_l(A, finger_tree_empty) ->
- #finger_tree_single { elem = A };
-cons_l(A, #finger_tree_single { elem = B }) ->
- deep(digit([B]), finger_tree_empty, digit([A]));
-cons_l(A, #finger_tree_deep { prefix = P, middle = M, suffix = #digit { list = [E, D, C, B] } }) ->
- deep(P, cons_l(node3(E, D, C), M), digit([B, A]));
-cons_l(A, #finger_tree_deep { prefix = P, middle = M, suffix = #digit { list = S } }) ->
- deep(P, M, digit(S ++ [A])).
-
-%% Unconsing
-uncons_r(finger_tree_empty = FT) ->
- {empty, FT};
-uncons_r(#finger_tree_single { elem = E }) ->
- {{value, E}, finger_tree_empty};
-uncons_r(#finger_tree_deep { prefix = #digit { list = [A | P] }, middle = M, suffix = S }) ->
- {{value, A}, deep_r(digit(P), M, S)}.
-
-uncons_l(finger_tree_empty = FT) ->
- {empty, FT};
-uncons_l(#finger_tree_single { elem = E }) ->
- {{value, E}, finger_tree_empty};
-uncons_l(#finger_tree_deep { prefix = P, middle = M, suffix = #digit { list = S } }) ->
- case S of
- [A] -> {{value, A}, deep_l(P, M, digit([]))};
- _ -> [A | S1] = lists:reverse(S),
- {{value, A}, deep_l(P, M, digit(lists:reverse(S1)))}
- end.
-
-%% Joining
-ft_nodes([A, B]) -> [node2(A, B)];
-ft_nodes([A, B, C]) -> [node3(A, B, C)];
-ft_nodes([A, B, C, D]) -> [node2(A, B), node2(C, D)];
-ft_nodes([A, B, C | Xs]) -> [node3(A, B, C) | ft_nodes(Xs)].
-
-app3(finger_tree_empty, Ts, Xs) ->
- reduce_r_in(Ts, Xs);
-app3(Xs, Ts, finger_tree_empty) ->
- reduce_l_in(Ts, Xs);
-app3(#finger_tree_single { elem = E }, Ts, Xs) ->
- cons_r(E, reduce_r_in(Ts, Xs));
-app3(Xs, Ts, #finger_tree_single { elem = E }) ->
- cons_l(E, reduce_l_in(Ts, Xs));
-app3(#finger_tree_deep { prefix = P1, middle = M1, suffix = #digit { list = S1 } }, #digit { list = Ts },
- #finger_tree_deep { prefix = #digit { list = P2 }, middle = M2, suffix = S2 }) ->
- deep(P1, app3(M1, digit(ft_nodes(S1 ++ (Ts ++ P2))), M2), S2).
-
-join(FT1, FT2) -> app3(FT1, digit([]), FT2).
-
-%% Splitting
-split_digit(_Pred, _Init, #digit { list = [A] }) ->
- {split, digit([]), A, digit([])};
-split_digit(Pred, Init, #digit { list = [A | List] }) ->
- Init1 = add(Init, measure(A)),
- case Pred(Init1) of
- true -> {split, digit([]), A, digit(List)};
- false -> {split, #digit { list = L }, X, R} = split_digit(Pred, Init1, digit(List)),
- {split, digit([A | L]), X, R}
- end.
-
-split_node(Pred, Init, #node { tuple = {A, B} }) ->
- Init1 = add(Init, measure(A)),
- case Pred(Init1) of
- true -> {split, digit([]), A, digit([B])};
- false -> {split, digit([A]), B, digit([])}
- end;
-split_node(Pred, Init, #node { tuple = {A, B, C} }) ->
- Init1 = add(Init, measure(A)),
- case Pred(Init1) of
- true -> {split, digit([]), A, digit([B, C])};
- false -> Init2 = add(Init1, measure(B)),
- case Pred(Init2) of
- true -> {split, digit([A]), B, digit([C])};
- false -> {split, digit([A, B]), C, digit([])}
- end
- end.
-
-split_tree(_Pred, _Init, #finger_tree_single { elem = E }) ->
- {split, finger_tree_empty, E, finger_tree_empty};
-split_tree(Pred, Init, #finger_tree_deep { prefix = P, middle = M, suffix = S }) ->
- VP = add(Init, measure(P)),
- case Pred(VP) of
- true ->
- {split, #digit { list = L }, X, R} = split_digit(Pred, Init, P),
- {split, from_list(L), X, deep_r(R, M, S)};
- false ->
- VM = add(VP, measure(M)),
- case VM /= VP andalso Pred(VM) of
- true ->
- {split, ML, Xs, MR} = split_tree(Pred, VP, M),
- Init1 = add(VP, measure(ML)),
- {split, L, X, R} = split_node(Pred, Init1, Xs),
- {split, deep_l(P, ML, L), X, deep_r(R, MR, S)};
- false ->
- {split, L, X, #digit { list = R }} = split_digit(Pred, VM, S),
- {split, deep_l(P, M, L), X, from_list(R)}
- end
- end.
-
-split(_Pred, finger_tree_empty) ->
- {finger_tree_empty, finger_tree_empty};
-split(Pred, FT) ->
- {split, L, X, R} = split_tree(Pred, null(), FT),
- case Pred(measure(FT)) of
- true -> {L, cons_r(X, R)};
- false -> {FT, finger_tree_empty}
- end.
-
-split_at(N, FT) -> split(fun (E) -> N < E end, FT).
diff --git a/src/fqueue.erl b/src/fqueue.erl
deleted file mode 100644
index 6337dea9..00000000
--- a/src/fqueue.erl
+++ /dev/null
@@ -1,141 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
-%%
-
--module(fqueue).
-
-%% This is a queue module, but optimised so that join is O(1). This
-%% forces the queue to go deep and then there are some neat pivotting
-%% tricks on out and out_r such that we ensure that we don't
-%% constantly have to dive deep into the structure. However, in a deep
-%% queue case, repeated out and out_r applications will be more
-%% expensive. This is the pathological case.
-
-%% Creation, inspection and conversion
--export([new/0,is_queue/1,is_empty/1,len/1,to_list/1,from_list/1,member/2]).
-%% Original style API
--export([in/2,in_r/2,out/1,out_r/1]).
-%% Less garbage style API
-%%-export([get/1,get_r/1,peek/1,peek_r/1,drop/1,drop_r/1]).
-
-%% Higher level API
-%%-export([reverse/1,join/2,split/2,filter/2]).
--export([join/2]).
-
--record(fq, {len, head, tail}).
-
-new() -> #fq { len = 0, head = [], tail = [] }.
-
-is_queue(#fq { len = L, head = H, tail = T })
- when is_integer(L) andalso is_list(H) andalso is_list(T) ->
- true;
-is_queue(_) ->
- false.
-
-is_empty(#fq { len = 0 }) ->
- true;
-is_empty(_) ->
- false.
-
-len(FQ) -> FQ#fq.len.
-
-to_list(FQ) ->
- to_list(false, FQ).
-
-to_list(true, List) when is_list(List) ->
- lists:foldl(fun (#fq{} = Q, Acc) -> to_list(false, Q) ++ Acc;
- (V, Acc) -> [unescape(V) | Acc]
- end, [], List);
-to_list(false, List) when is_list(List) ->
- lists:foldr(fun (#fq{} = Q, Acc) -> to_list(true, Q) ++ Acc;
- (V, Acc) -> [unescape(V) | Acc]
- end, [], List);
-to_list(Reverse, #fq { head = H, tail = T }) ->
- to_list(Reverse, H) ++ to_list(not Reverse, T).
-
-from_list(L) -> #fq { len = length(L), head = [escape(V) || V <- L], tail = [] }.
-
-member(X, #fq { head = H, tail = T }) ->
- member(X, H) orelse member(X, T);
-member(X, List) when is_list(List) ->
- lists:any(fun (E) -> member(X, unescape(E)) end, List);
-member(X, X) -> true.
-
-in(X, #fq { len = 0 } = Q) ->
- Q #fq { len = 1, head = [escape(X)] };
-in(X, #fq { len = L, tail = T } = Q) ->
- Q #fq { len = L+1, tail = [escape(X) | T] }.
-
-in_r(X, #fq { len = 0 } = Q) ->
- Q #fq { len = 1, tail = [escape(X)] };
-in_r(X, #fq { len = L, head = H } = Q) ->
- Q #fq { len = L+1, head = [escape(X) | H] }.
-
-out(#fq { len = 0 } = Q) ->
- {empty, Q};
-out(#fq { head = [#fq{} = IQ], tail = [] }) ->
- out(IQ);
-out(#fq { tail = [#fq{} = IQ], head = [] }) ->
- out(IQ);
-out(#fq { len = L, head = [#fq{ len = L1, tail = T1 } = IQ | H], tail = T }) ->
- %% Essentially we pivot so that the IQ becomes the outer, and we
- %% stuff ourselves at the end
- out(IQ #fq { len = L, tail = [#fq { len = L - L1, head = H, tail = T } | T1] });
-out(#fq { len = L, head = [V], tail = T }) ->
- {{value, unescape(V)}, #fq { len = L-1, head = lists:reverse(T), tail = [] }};
-out(#fq { len = L, head = [V | H] } = Q) ->
- {{value, unescape(V)}, Q #fq { len = L-1, head = H }};
-out(#fq { head = [], tail = T } = Q) ->
- out(Q #fq { head = lists:reverse(T), tail = [] }).
-
-out_r(#fq { len = 0 } = Q) ->
- {empty, Q};
-out_r(#fq { tail = [#fq{} = IQ], head = [] }) ->
- out_r(IQ);
-out_r(#fq { head = [#fq{} = IQ], tail = [] }) ->
- out_r(IQ);
-out_r(#fq { len = L, tail = [#fq{ len = L1, head = H1 } = IQ | T], head = H }) ->
- %% Essentially we pivot so that the IQ becomes the outer, and we
- %% stuff ourselves at the start
- out_r(IQ #fq { len = L, head = [#fq { len = L - L1, tail = T, head = H } | H1] });
-out_r(#fq { len = L, tail = [V], head = H }) ->
- {{value, unescape(V)}, #fq { len = L-1, tail = lists:reverse(H), head = [] }};
-out_r(#fq { len = L, tail = [V | T] } = Q) ->
- {{value, unescape(V)}, Q #fq { len = L-1, tail = T }};
-out_r(#fq { tail = [], head = H } = Q) ->
- out_r(Q #fq { tail = lists:reverse(H), head = [] }).
-
-join(Q, #fq { len = 0 }) ->
- Q;
-join(#fq { len = 0 }, Q) ->
- Q;
-join(Q, #fq { len = 1 } = Q1) ->
- {{value, V}, _} = out(Q1),
- in(V, Q);
-join(#fq { len = 1 } = Q, Q1) ->
- {{value, V}, _} = out(Q),
- in_r(V, Q1);
-join(#fq { len = L, tail = T } = Q, #fq { len = L1 } = Q1) ->
- Q #fq { len = L+L1, tail = [Q1 | T] }.
-
-
--compile({inline, [{escape,1},{unescape,1}]}).
-
-escape(#fq{} = V) -> {escaped, V};
-escape({escaped, _} = V) -> {escaped, V};
-escape(V) -> V.
-
-unescape({escaped, V}) -> V;
-unescape(V) -> V.
diff --git a/src/lqueue.erl b/src/lqueue.erl
index ea54ffd4..3ab8c9f0 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -21,6 +21,30 @@
-define(QUEUE, queue).
+-ifdef(use_specs).
+
+-export_type([?MODULE/0]).
+
+-type(?MODULE() :: {non_neg_integer(), ?MODULE()}).
+-type(value() :: any()).
+-type(result() :: ({'empty', ?MODULE()} |
+ {{'value', value()}, ?MODULE()})).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+-spec(len/1 :: (?MODULE()) -> non_neg_integer()).
+-spec(in/2 :: (value(), ?MODULE()) -> ?MODULE()).
+-spec(in_r/2 :: (value(), ?MODULE()) -> ?MODULE()).
+-spec(out/1 :: (?MODULE()) -> result()).
+-spec(out_r/1 :: (?MODULE()) -> result()).
+-spec(join/2 :: (?MODULE(), ?MODULE()) -> ?MODULE()).
+-spec(foldl/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
+-spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B).
+-spec(from_list/1 :: ([value()]) -> ?MODULE()).
+-spec(to_list/1 :: (?MODULE()) -> [value()]).
+
+-endif.
+
new() -> {0, ?QUEUE:new()}.
is_empty({0, _Q}) -> true;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e5d6a737..a0d07493 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -44,7 +44,6 @@ all_tests() ->
passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
- passed = test_bpqueue(),
passed = test_pg_local(),
passed = test_unfold(),
passed = test_supervisor_delayed_restart(),
@@ -262,143 +261,6 @@ test_priority_queue(Q) ->
priority_queue:to_list(Q),
priority_queue_out_all(Q)}.
-test_bpqueue() ->
- Q = bpqueue:new(),
- true = bpqueue:is_empty(Q),
- 0 = bpqueue:len(Q),
- [] = bpqueue:to_list(Q),
-
- Q1 = bpqueue_test(fun bpqueue:in/3, fun bpqueue:out/1,
- fun bpqueue:to_list/1,
- fun bpqueue:foldl/3, fun bpqueue:map_fold_filter_l/4),
- Q2 = bpqueue_test(fun bpqueue:in_r/3, fun bpqueue:out_r/1,
- fun (QR) -> lists:reverse(
- [{P, lists:reverse(L)} ||
- {P, L} <- bpqueue:to_list(QR)])
- end,
- fun bpqueue:foldr/3, fun bpqueue:map_fold_filter_r/4),
-
- [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)),
- [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)),
- [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] =
- bpqueue:to_list(bpqueue:join(Q1, Q2)),
-
- [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] =
- bpqueue:to_list(bpqueue:join(Q1, Q1)),
-
- [{foo, [1, 2]}, {bar, [3]}] =
- bpqueue:to_list(
- bpqueue:from_list(
- [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])),
-
- [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])),
-
- {4, [a,b,c,d]} =
- bpqueue:foldl(
- fun (Prefix, Value, {Prefix, Acc}) ->
- {Prefix + 1, [Value | Acc]}
- end,
- {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])),
-
- [{bar,3}, {foo,2}, {foo,1}] =
- bpqueue:foldr(fun (P, V, I) -> [{P,V} | I] end, [], Q2),
-
- BPQL = [{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}],
- BPQ = bpqueue:from_list(BPQL),
-
- %% no effect
- {BPQL, 0} = bpqueue_mffl([none], {none, []}, BPQ),
- {BPQL, 0} = bpqueue_mffl([foo,bar], {none, [1]}, BPQ),
- {BPQL, 0} = bpqueue_mffl([bar], {none, [3]}, BPQ),
- {BPQL, 0} = bpqueue_mffr([bar], {foo, [5]}, BPQ),
-
- %% process 1 item
- {[{foo,[-1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffl([foo,bar], {foo, [2]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[-3,4,5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffl([bar], {bar, [4]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,-7]}], 1} =
- bpqueue_mffr([foo,bar], {foo, [6]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4]}, {baz,[-5]}, {foo,[5,6,7]}], 1} =
- bpqueue_mffr([bar], {baz, [4]}, BPQ),
-
- %% change prefix
- {[{bar,[-1,-2,-2,-3,-4,-5,-5,-6,-7]}], 9} =
- bpqueue_mffl([foo,bar], {bar, []}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5]}, {foo,[5,6,7]}], 3} =
- bpqueue_mffl([foo], {bar, [5]}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5,-5,-6]}, {foo,[7]}], 5} =
- bpqueue_mffl([foo], {bar, [7]}, BPQ),
- {[{foo,[1,2,2,-3,-4]}, {bar,[5]}, {foo,[5,6,7]}], 2} =
- bpqueue_mffl([bar], {foo, [5]}, BPQ),
- {[{bar,[-1,-2,-2,3,4,5,-5,-6,-7]}], 6} =
- bpqueue_mffl([foo], {bar, []}, BPQ),
- {[{foo,[1,2,2,-3,-4,-5,5,6,7]}], 3} =
- bpqueue_mffl([bar], {foo, []}, BPQ),
-
- %% edge cases
- {[{foo,[-1,-2,-2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 3} =
- bpqueue_mffl([foo], {foo, [5]}, BPQ),
- {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[-5,-6,-7]}], 3} =
- bpqueue_mffr([foo], {foo, [2]}, BPQ),
-
- passed.
-
-bpqueue_test(In, Out, List, Fold, MapFoldFilter) ->
- Q = bpqueue:new(),
- {empty, _Q} = Out(Q),
-
- ok = Fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, ok, Q),
- {Q1M, 0} = MapFoldFilter(fun(_P) -> throw(explosion) end,
- fun(_V, _N) -> throw(explosion) end, 0, Q),
- [] = bpqueue:to_list(Q1M),
-
- Q1 = In(bar, 3, In(foo, 2, In(foo, 1, Q))),
- false = bpqueue:is_empty(Q1),
- 3 = bpqueue:len(Q1),
- [{foo, [1, 2]}, {bar, [3]}] = List(Q1),
-
- {{value, foo, 1}, Q3} = Out(Q1),
- {{value, foo, 2}, Q4} = Out(Q3),
- {{value, bar, 3}, _Q5} = Out(Q4),
-
- F = fun (QN) ->
- MapFoldFilter(fun (foo) -> true;
- (_) -> false
- end,
- fun (2, _Num) -> stop;
- (V, Num) -> {bar, -V, V - Num} end,
- 0, QN)
- end,
- {Q6, 0} = F(Q),
- [] = bpqueue:to_list(Q6),
- {Q7, 1} = F(Q1),
- [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = List(Q7),
-
- Q1.
-
-bpqueue_mffl(FF1A, FF2A, BPQ) ->
- bpqueue_mff(fun bpqueue:map_fold_filter_l/4, FF1A, FF2A, BPQ).
-
-bpqueue_mffr(FF1A, FF2A, BPQ) ->
- bpqueue_mff(fun bpqueue:map_fold_filter_r/4, FF1A, FF2A, BPQ).
-
-bpqueue_mff(Fold, FF1A, FF2A, BPQ) ->
- FF1 = fun (Prefixes) ->
- fun (P) -> lists:member(P, Prefixes) end
- end,
- FF2 = fun ({Prefix, Stoppers}) ->
- fun (Val, Num) ->
- case lists:member(Val, Stoppers) of
- true -> stop;
- false -> {Prefix, -Val, 1 + Num}
- end
- end
- end,
- Queue_to_list = fun ({LHS, RHS}) -> {bpqueue:to_list(LHS), RHS} end,
-
- Queue_to_list(Fold(FF1(FF1A), FF2(FF2A), 0, BPQ)).
-
test_simple_n_element_queue(N) ->
Items = lists:seq(1, N),
Q = priority_queue_in_all(priority_queue:new(), Items),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 310d9357..4712a620 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -316,11 +316,11 @@
end_seq_id :: non_neg_integer() }).
-type(state() :: #vqstate {
- q1 :: queue(),
- q2 :: queue(),
+ q1 :: ?QUEUE:?QUEUE(),
+ q2 :: ?QUEUE:?QUEUE(),
delta :: delta(),
- q3 :: queue(),
- q4 :: queue(),
+ q3 :: ?QUEUE:?QUEUE(),
+ q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
ram_ack_index :: gb_tree(),