diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-04-09 17:38:55 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-04-09 17:38:55 +0100 |
commit | 88e76eae4239f2eb76d2362645fa26f771a0c507 (patch) | |
tree | 91dbbe6eeac5221eb15d41019b6e4feabb595b00 | |
parent | 7f8bbe46b393a046bcb08c2751e99d1de0826967 (diff) | |
download | rabbitmq-server-88e76eae4239f2eb76d2362645fa26f771a0c507.tar.gz |
persistent_key => guid + is_persistent
-rw-r--r-- | include/rabbit.hrl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 7 |
4 files changed, 17 insertions, 19 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 9bd1f6ef..f9ab250c 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,7 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -144,7 +144,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a6e0f40d..ba41f550 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -374,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> +persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> ok; persist_message(Txn, QName, Message) -> M = Message#basic_message{ @@ -382,29 +382,28 @@ persist_message(Txn, QName, Message) -> content = rabbit_binary_parser:clear_decoded_content( Message#basic_message.content)}, persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). + [{publish, M, {QName, M#basic_message.guid}}]). persist_delivery(_QName, _Message, true) -> ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, +persist_delivery(_QName, #basic_message{is_persistent = false}, _IsDelivered) -> ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, +persist_delivery(QName, #basic_message{guid = Guid}, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). + persist_work(none, QName, [{deliver, {QName, Guid}}]). persist_acks(Txn, QName, Messages) -> persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). + [{ack, {QName, Guid}} || #basic_message{ + guid = Guid, is_persistent = true} <- Messages]). -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> +persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> +persist_auto_ack(QName, #basic_message{guid = Guid}) -> %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + rabbit_persister:dirty_work([{ack, {QName, Guid}}]). persist_work(_Txn,_QName, []) -> ok; diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9ebb6e72..7595d53b 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -96,7 +96,8 @@ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, content = build_content(Properties, BodyBin), - persistent_key = none}. + guid = rabbit_guid:guid(), + is_persistent = false}. properties(P = #'P_basic'{}) -> P; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index db5b9634..3fe86408 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -383,14 +383,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent)}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, |