summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-09 17:38:55 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-09 17:38:55 +0100
commit88e76eae4239f2eb76d2362645fa26f771a0c507 (patch)
tree91dbbe6eeac5221eb15d41019b6e4feabb595b00
parent7f8bbe46b393a046bcb08c2751e99d1de0826967 (diff)
downloadrabbitmq-server-88e76eae4239f2eb76d2362645fa26f771a0c507.tar.gz
persistent_key => guid + is_persistent
-rw-r--r--include/rabbit.hrl5
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_basic.erl3
-rw-r--r--src/rabbit_channel.erl7
4 files changed, 17 insertions, 19 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 9bd1f6ef..f9ab250c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,7 @@
-record(listener, {node, protocol, host, port}).
--record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
@@ -144,7 +144,8 @@
#basic_message{exchange_name :: exchange_name(),
routing_key :: routing_key(),
content :: content(),
- persistent_key :: maybe(pkey())}).
+ guid :: guid(),
+ is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a6e0f40d..ba41f550 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -374,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{persistent_key = none}) ->
+persist_message(_Txn, _QName, #basic_message{is_persistent = false}) ->
ok;
persist_message(Txn, QName, Message) ->
M = Message#basic_message{
@@ -382,29 +382,28 @@ persist_message(Txn, QName, Message) ->
content = rabbit_binary_parser:clear_decoded_content(
Message#basic_message.content)},
persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.persistent_key}}]).
+ [{publish, M, {QName, M#basic_message.guid}}]).
persist_delivery(_QName, _Message,
true) ->
ok;
-persist_delivery(_QName, #basic_message{persistent_key = none},
+persist_delivery(_QName, #basic_message{is_persistent = false},
_IsDelivered) ->
ok;
-persist_delivery(QName, #basic_message{persistent_key = PKey},
+persist_delivery(QName, #basic_message{guid = Guid},
_IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, PKey}}]).
+ persist_work(none, QName, [{deliver, {QName, Guid}}]).
persist_acks(Txn, QName, Messages) ->
persist_work(Txn, QName,
- [{ack, {QName, PKey}} ||
- #basic_message{persistent_key = PKey} <- Messages,
- PKey =/= none]).
+ [{ack, {QName, Guid}} || #basic_message{
+ guid = Guid, is_persistent = true} <- Messages]).
-persist_auto_ack(_QName, #basic_message{persistent_key = none}) ->
+persist_auto_ack(_QName, #basic_message{is_persistent = false}) ->
ok;
-persist_auto_ack(QName, #basic_message{persistent_key = PKey}) ->
+persist_auto_ack(QName, #basic_message{guid = Guid}) ->
%% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, PKey}}]).
+ rabbit_persister:dirty_work([{ack, {QName, Guid}}]).
persist_work(_Txn,_QName, []) ->
ok;
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9ebb6e72..7595d53b 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -96,7 +96,8 @@ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
content = build_content(Properties, BodyBin),
- persistent_key = none}.
+ guid = rabbit_guid:guid(),
+ is_persistent = false}.
properties(P = #'P_basic'{}) ->
P;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index db5b9634..3fe86408 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -383,14 +383,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_guid:guid();
- false -> none
- end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
- persistent_key = PersistentKey},
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent)},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,