summaryrefslogtreecommitdiff
path: root/src/priority_queue.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-21 15:50:27 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-21 15:50:27 +0100
commitc153e81488594754505bfab34d1409781cb972e9 (patch)
tree0e2440f385c3b8977bc7109280b5b97cdf7a8416 /src/priority_queue.erl
parentb78f55bda18f00f5c19eb4c7eea0b247e9767c18 (diff)
downloadrabbitmq-server-c153e81488594754505bfab34d1409781cb972e9.tar.gz
Transplant consumer priorities from bug 25553.
Diffstat (limited to 'src/priority_queue.erl')
-rw-r--r--src/priority_queue.erl37
1 files changed, 35 insertions, 2 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 6995c3be..c76c0d33 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -40,8 +40,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
- out/1, join/2]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1,
+ in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]).
%%----------------------------------------------------------------------------
@@ -59,10 +59,16 @@
-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
+-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
+-spec(fold/3 ::
+ (fun ((any(), priority(), A) -> A), A, pqueue()) -> A).
+-spec(highest/1 :: (pqueue()) -> priority() | 'empty').
-endif.
@@ -96,6 +102,9 @@ to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
{0, V} <- to_list(Q)].
+from_list(L) ->
+ lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L).
+
in(Item, Q) ->
in(Item, 0, Q).
@@ -147,6 +156,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
+out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
+
+add_p(R, P) -> case R of
+ {empty, Q} -> {empty, Q};
+ {{value, V}, Q} -> {{value, V, P}, Q}
+ end.
+
join(A, {queue, [], [], 0}) ->
A;
join({queue, [], [], 0}, B) ->
@@ -185,6 +202,22 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
+filter(Pred, Q) -> fold(fun(V, P, Acc) ->
+ case Pred(V) of
+ true -> in(V, P, Acc);
+ false -> Acc
+ end
+ end, new(), Q).
+
+fold(Fun, Init, Q) -> case out_p(Q) of
+ {empty, _Q} -> Init;
+ {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
+ end.
+
+highest({queue, [], [], 0}) -> empty;
+highest({queue, _, _, _}) -> 0;
+highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P).
+
r2f([], 0) -> {queue, [], [], 0};
r2f([_] = R, 1) -> {queue, [], R, 1};
r2f([X,Y], 2) -> {queue, [X], [Y], 2};