summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-29 22:51:37 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-29 22:51:37 +0000
commitff172f664ccb1271fbd934bd1c7748ef007f0490 (patch)
tree8df50f40a9c6fa6395831ff3cef759f1fc637a73 /src/rabbit_mirror_queue_master.erl
parent15ff8a0d88feefae879faa2d392660152b34d5c6 (diff)
downloadrabbitmq-server-ff172f664ccb1271fbd934bd1c7748ef007f0490.tar.gz
return to a simpler, better BQ:dropwhile, and introduce 'fetchwhile'
...to cover the remaining required functionality, including the ability to process messages along the way, pass around an accumulator, and get hold of the IsDelivered flag (not needed in our use case but included for similarity with 'fetch').
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl46
1 files changed, 27 insertions, 19 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c8a361b1..0ae10d89 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -20,7 +20,7 @@
purge/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
@@ -216,19 +216,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- {Next, Msgs, State #state { backing_queue_state = BQS1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -268,7 +266,7 @@ fetch(AckRequired, State = #state { backing_queue = BQ,
empty ->
{Result, State1};
{#basic_message{id = MsgId}, _IsDelivered, AckTag} ->
- {Result, drop(MsgId, AckTag, State1)}
+ {Result, drop_one(MsgId, AckTag, State1)}
end.
drop(AckRequired, State = #state { backing_queue = BQ,
@@ -277,7 +275,7 @@ drop(AckRequired, State = #state { backing_queue = BQ,
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
empty -> State1;
- {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ {MsgId, AckTag} -> drop_one(MsgId, AckTag, State1)
end}.
ack(AckTags, State = #state { gm = GM,
@@ -440,13 +438,23 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { ack_msg_id = AM,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+drop_one(MsgId, AckTag, State = #state { ack_msg_id = AM,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
+
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).