diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-21 14:22:51 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-21 14:22:51 +0100 |
commit | 1d2e94174aaf0339315b2a1fe8d701d877776a95 (patch) | |
tree | 4a790343a8f9335d7030e8b3cfff5b8f32cba884 | |
parent | 42c8d70bf7ae3a1e348b160238da9970b57215de (diff) | |
download | rabbitmq-server-1d2e94174aaf0339315b2a1fe8d701d877776a95.tar.gz |
Stop persistent msgs in non-durable queues from reaching disk
-rw-r--r-- | src/rabbit_invariable_queue.erl | 73 |
1 files changed, 44 insertions, 29 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index b4fd9156..6e1d33f9 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -43,7 +43,7 @@ -include("rabbit.hrl"). --record(iv_state, { queue, qname, len, pending_ack }). +-record(iv_state, { queue, qname, len, pending_ack, durable }). -record(tx, { pending_messages, pending_acks, is_persistent }). -ifdef(use_specs). @@ -67,17 +67,19 @@ init(QName, IsDurable, Recover) -> false -> [] end), #iv_state { queue = Q, qname = QName, len = queue:len(Q), - pending_ack = dict:new() }. + pending_ack = dict:new(), durable = IsDurable }. terminate(State) -> State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. -delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, dict:fetch_keys(PA), PA), +delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA, + durable = IsDurable }) -> + ok = persist_acks(IsDurable, none, QName, dict:fetch_keys(PA), PA), {_PLen, State1} = purge(State), terminate(State1). -purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> +purge(State = #iv_state { len = Len, queue = Q, qname = QName, + durable = IsDurable }) -> %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( @@ -85,57 +87,62 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> Acc; ({Msg = #basic_message { guid = Guid }, IsDelivered}, {AckTagsN, PAN}) -> - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(IsDurable, QName, Msg, IsDelivered), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} end, {[], dict:new()}, Q), - ok = persist_acks(none, QName, AckTags, PA), + ok = persist_acks(IsDurable, none, QName, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) -> - ok = persist_message(none, QName, Msg), +publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len, + durable = IsDurable }) -> + ok = persist_message(IsDurable, none, QName, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - State = #iv_state { qname = QName, len = 0, - pending_ack = PA }) -> - ok = persist_message(none, QName, Msg), - ok = persist_delivery(QName, Msg, false), + State = #iv_state { qname = QName, pending_ack = PA, + len = 0, durable = IsDurable }) -> + ok = persist_message(IsDurable, none, QName, Msg), + ok = persist_delivery(IsDurable, QName, Msg, false), {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len, - pending_ack = PA }) -> +fetch(AckRequired, State = #iv_state { queue = Q, len = Len, pending_ack = PA, + qname = QName, durable = IsDurable }) -> {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = queue:out(Q), Len1 = Len - 1, - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(IsDurable, QName, Msg, IsDelivered), PA1 = dict:store(Guid, Msg, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; - false -> ok = persist_acks(none, QName, [Guid], PA1), + false -> ok = persist_acks(IsDurable, none, QName, + [Guid], PA1), {blank_ack, PA} end, {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. -ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, AckTags, PA), +ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA, + durable = IsDurable }) -> + ok = persist_acks(IsDurable, none, QName, AckTags, PA), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName }) -> +tx_publish(Txn, Msg, State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(Txn, QName, Msg), + ok = persist_message(IsDurable, Txn, QName, Msg), State. -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA, + durable = IsDurable }) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(Txn, QName, AckTags, PA), + ok = persist_acks(IsDurable, Txn, QName, AckTags, PA), State. tx_rollback(Txn, State = #iv_state { qname = QName }) -> @@ -228,9 +235,12 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- -persist_message(_Txn, _QName, #basic_message { is_persistent = false }) -> +persist_message(false, _Txn, _QName, _Msg) -> ok; -persist_message(Txn, QName, Msg) -> +persist_message(_IsDurable, _Txn, _QName, + #basic_message { is_persistent = false }) -> + ok; +persist_message(_IsDurable, Txn, QName, Msg) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties, %% rebuild from properties_bin on restore @@ -239,15 +249,20 @@ persist_message(Txn, QName, Msg) -> persist_work(Txn, QName, [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]). -persist_delivery(_QName, #basic_message { is_persistent = false }, +persist_delivery(false, _QName, _Msg, _IsDelivered) -> + ok; +persist_delivery(_IsDurable, _QName, #basic_message { is_persistent = false }, _IsDelivered) -> ok; -persist_delivery(_QName, _Message, true) -> +persist_delivery(_IsDurable, _QName, _Message, true) -> ok; -persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) -> +persist_delivery(_IsDurable, QName, #basic_message { guid = Guid }, + _IsDelivered) -> persist_work(none, QName, [{deliver, {QName, Guid}}]). -persist_acks(Txn, QName, AckTags, PA) -> +persist_acks(false, _Txn, _QName, _AckTags, _PA) -> + ok; +persist_acks(_IsDurable, Txn, QName, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin |