summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-24 17:53:15 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-24 17:53:15 +0100
commitea7ca9c0d32ed23df7296db66a1f8e6da7f99c3d (patch)
treec6cc478ebe728fd0e0164cc9f851a8179ed3b3a0
parent6865582704b6dd15d95184e2d552a6848d1fc714 (diff)
downloadrabbitmq-server-ea7ca9c0d32ed23df7296db66a1f8e6da7f99c3d.tar.gz
Glad to see I'm not the only one who's made this mistake: release and remove are two different things
-rw-r--r--src/rabbit_variable_queue.erl12
1 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 958a2903..e4a81311 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -509,7 +509,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
end.
ack(AckTags, State) ->
- a(ack(fun (_AckEntry, State1) -> State1 end, AckTags, State)).
+ a(ack(remove, fun (_AckEntry, State1) -> State1 end, AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
State = #vqstate { durable = IsDurable,
@@ -561,7 +561,8 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
requeue(AckTags, State) ->
a(reduce_memory_use(
- ack(fun (#msg_status { msg = Msg }, State1) ->
+ ack(release,
+ fun (#msg_status { msg = Msg }, State1) ->
{_SeqId, State2} = publish(Msg, true, false, State1),
State2;
({IsPersistent, Guid}, State1) ->
@@ -847,9 +848,10 @@ beta_fold(Fun, Init, Q) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-ack(_Fun, [], State) ->
+ack(_ReleaseOrRemove, _Fun, [], State) ->
State;
-ack(Fun, AckTags, State) ->
+ack(ReleaseOrRemove, Fun, AckTags, State) when ReleaseOrRemove =:= remove orelse
+ ReleaseOrRemove =:= release ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
@@ -867,7 +869,7 @@ ack(Fun, AckTags, State) ->
end, {{[], dict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:release(MsgStore, Guids)
+ rabbit_msg_store:ReleaseOrRemove(MsgStore, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;