diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-29 22:51:37 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-29 22:51:37 +0000 |
commit | ff172f664ccb1271fbd934bd1c7748ef007f0490 (patch) | |
tree | 8df50f40a9c6fa6395831ff3cef759f1fc637a73 /src/rabbit_mirror_queue_master.erl | |
parent | 15ff8a0d88feefae879faa2d392660152b34d5c6 (diff) | |
download | rabbitmq-server-ff172f664ccb1271fbd934bd1c7748ef007f0490.tar.gz |
return to a simpler, better BQ:dropwhile, and introduce 'fetchwhile'
...to cover the remaining required functionality, including the
ability to process messages along the way, pass around an accumulator,
and get hold of the IsDelivered flag (not needed in our use case but
included for similarity with 'fetch').
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c8a361b1..0ae10d89 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -20,7 +20,7 @@ purge/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, - dropwhile/3, set_ram_duration_target/2, ram_duration/1, + dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, foreach_ack/3]). @@ -216,19 +216,17 @@ discard(MsgId, ChPid, State = #state { gm = GM, State end. -dropwhile(Pred, AckRequired, - State = #state{gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> +dropwhile(Pred, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), - Len1 = BQ:len(BQS1), - Dropped = Len - Len1, - case Dropped of - 0 -> ok; - _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}) - end, - {Next, Msgs, State #state { backing_queue_state = BQS1 } }. + {Next, BQS1} = BQ:dropwhile(Pred, BQS), + {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}. + +fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS), + {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -268,7 +266,7 @@ fetch(AckRequired, State = #state { backing_queue = BQ, empty -> {Result, State1}; {#basic_message{id = MsgId}, _IsDelivered, AckTag} -> - {Result, drop(MsgId, AckTag, State1)} + {Result, drop_one(MsgId, AckTag, State1)} end. drop(AckRequired, State = #state { backing_queue = BQ, @@ -277,7 +275,7 @@ drop(AckRequired, State = #state { backing_queue = BQ, State1 = State #state { backing_queue_state = BQS1 }, {Result, case Result of empty -> State1; - {MsgId, AckTag} -> drop(MsgId, AckTag, State1) + {MsgId, AckTag} -> drop_one(MsgId, AckTag, State1) end}. ack(AckTags, State = #state { gm = GM, @@ -440,13 +438,23 @@ depth_fun() -> %% Helpers %% --------------------------------------------------------------------------- -drop(MsgId, AckTag, State = #state { ack_msg_id = AM, - gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> +drop_one(MsgId, AckTag, State = #state { ack_msg_id = AM, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. +drop(PrevLen, AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + case PrevLen - Len of + 0 -> State; + Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}), + State + end. + maybe_store_acktag(undefined, _MsgId, AM) -> AM; maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). |