summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-21 14:22:51 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-21 14:22:51 +0100
commit1d2e94174aaf0339315b2a1fe8d701d877776a95 (patch)
tree4a790343a8f9335d7030e8b3cfff5b8f32cba884
parent42c8d70bf7ae3a1e348b160238da9970b57215de (diff)
downloadrabbitmq-server-1d2e94174aaf0339315b2a1fe8d701d877776a95.tar.gz
Stop persistent msgs in non-durable queues from reaching disk
-rw-r--r--src/rabbit_invariable_queue.erl73
1 files changed, 44 insertions, 29 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index b4fd9156..6e1d33f9 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -43,7 +43,7 @@
-include("rabbit.hrl").
--record(iv_state, { queue, qname, len, pending_ack }).
+-record(iv_state, { queue, qname, len, pending_ack, durable }).
-record(tx, { pending_messages, pending_acks, is_persistent }).
-ifdef(use_specs).
@@ -67,17 +67,19 @@ init(QName, IsDurable, Recover) ->
false -> []
end),
#iv_state { queue = Q, qname = QName, len = queue:len(Q),
- pending_ack = dict:new() }.
+ pending_ack = dict:new(), durable = IsDurable }.
terminate(State) ->
State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA,
+ durable = IsDurable }) ->
+ ok = persist_acks(IsDurable, none, QName, dict:fetch_keys(PA), PA),
{_PLen, State1} = purge(State),
terminate(State1).
-purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+purge(State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable }) ->
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
@@ -85,57 +87,62 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
Acc;
({Msg = #basic_message { guid = Guid }, IsDelivered},
{AckTagsN, PAN}) ->
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(IsDurable, QName, Msg, IsDelivered),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
end, {[], dict:new()}, Q),
- ok = persist_acks(none, QName, AckTags, PA),
+ ok = persist_acks(IsDurable, none, QName, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
- ok = persist_message(none, QName, Msg),
+publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len,
+ durable = IsDurable }) ->
+ ok = persist_message(IsDurable, none, QName, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- State = #iv_state { qname = QName, len = 0,
- pending_ack = PA }) ->
- ok = persist_message(none, QName, Msg),
- ok = persist_delivery(QName, Msg, false),
+ State = #iv_state { qname = QName, pending_ack = PA,
+ len = 0, durable = IsDurable }) ->
+ ok = persist_message(IsDurable, none, QName, Msg),
+ ok = persist_delivery(IsDurable, QName, Msg, false),
{Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
- pending_ack = PA }) ->
+fetch(AckRequired, State = #iv_state { queue = Q, len = Len, pending_ack = PA,
+ qname = QName, durable = IsDurable }) ->
{{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
queue:out(Q),
Len1 = Len - 1,
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(IsDurable, QName, Msg, IsDelivered),
PA1 = dict:store(Guid, Msg, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
- false -> ok = persist_acks(none, QName, [Guid], PA1),
+ false -> ok = persist_acks(IsDurable, none, QName,
+ [Guid], PA1),
{blank_ack, PA}
end,
{{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, AckTags, PA),
+ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA,
+ durable = IsDurable }) ->
+ ok = persist_acks(IsDurable, none, QName, AckTags, PA),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(Txn, QName, Msg),
+ ok = persist_message(IsDurable, Txn, QName, Msg),
State.
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA,
+ durable = IsDurable }) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(Txn, QName, AckTags, PA),
+ ok = persist_acks(IsDurable, Txn, QName, AckTags, PA),
State.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
@@ -228,9 +235,12 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
-persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
+persist_message(false, _Txn, _QName, _Msg) ->
ok;
-persist_message(Txn, QName, Msg) ->
+persist_message(_IsDurable, _Txn, _QName,
+ #basic_message { is_persistent = false }) ->
+ ok;
+persist_message(_IsDurable, Txn, QName, Msg) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties,
%% rebuild from properties_bin on restore
@@ -239,15 +249,20 @@ persist_message(Txn, QName, Msg) ->
persist_work(Txn, QName,
[{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
-persist_delivery(_QName, #basic_message { is_persistent = false },
+persist_delivery(false, _QName, _Msg, _IsDelivered) ->
+ ok;
+persist_delivery(_IsDurable, _QName, #basic_message { is_persistent = false },
_IsDelivered) ->
ok;
-persist_delivery(_QName, _Message, true) ->
+persist_delivery(_IsDurable, _QName, _Message, true) ->
ok;
-persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
+persist_delivery(_IsDurable, QName, #basic_message { guid = Guid },
+ _IsDelivered) ->
persist_work(none, QName, [{deliver, {QName, Guid}}]).
-persist_acks(Txn, QName, AckTags, PA) ->
+persist_acks(false, _Txn, _QName, _AckTags, _PA) ->
+ ok;
+persist_acks(_IsDurable, Txn, QName, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin