summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-09-17 14:15:11 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-09-17 14:15:11 +0100
commitddf7763729950ad71d782818b3bb105f95e458e9 (patch)
tree120e2ceac0bd8d1912ca12aa06e5dec03d93ac83
parent27ef3f0d08dd6d1246b16a1886b1218879917de0 (diff)
downloadrabbitmq-server-bug23896.tar.gz
nuke 'immediate'bug23896
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl42
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_basic.erl32
-rw-r--r--src/rabbit_channel.erl21
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl51
-rw-r--r--src/rabbit_tests.erl5
-rw-r--r--src/rabbit_types.erl1
9 files changed, 66 insertions, 118 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d6fac46d..fff92205 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -73,7 +73,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d566ac87..f80559ba 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -60,7 +60,7 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(routing_result() :: 'routed' | 'unroutable').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -645,18 +645,17 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
-deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+deliver([], #delivery{mandatory = false}, _Flow) ->
%% /dev/null optimisation
{routed, []};
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
+deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
+ %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver
+ %% will deliver the message to the queue process asynchronously,
+ %% and return true, which means all the QPids will always be
+ %% returned. It is therefore safe to use a fire-and-forget cast
+ %% here and return the QPids - the semantics is preserved. This
+ %% scales much better than the case below.
QPids = qpids(Qs),
case Flow of
flow -> [credit_flow:send(QPid) || QPid <- QPids];
@@ -668,21 +667,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
end),
{routed, QPids};
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
- _Flow) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
+deliver(Qs, Delivery, _Flow) ->
+ case delegate:invoke(
+ qpids(Qs), fun (QPid) ->
+ ok = gen_server2:call(QPid, {deliver, Delivery},
+ infinity)
+ end) of
+ {[], _} -> {unroutable, []};
+ {R , _} -> {routed, [QPid || {QPid, ok} <- R]}
end.
qpids(Qs) -> lists:append([[QPid | SPids] ||
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 20ba4574..d6a5523a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -770,7 +770,7 @@ dead_letter_fun(Reason, _State) ->
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
DLMsg = make_dead_letter_msg(Reason, Msg, State),
- Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
@@ -1032,27 +1032,9 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
- %% FIXME: Is this correct semantics?
- %%
- %% I'm worried in particular about the case where an exchange has
- %% two queues against a particular routing key, and a message is
- %% sent in immediate mode through the binding. In non-immediate
- %% mode, both queues get the message, saving it for later if
- %% there's noone ready to receive it just now. In immediate mode,
- %% should both queues still get the message, somehow, or should
- %% just all ready-to-consume queues get the message, with unready
- %% queues discarding the message?
- %%
- Confirm = should_confirm_message(Delivery, State),
- {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
- reply(Delivered, case Delivered of
- true -> maybe_record_confirm_message(Confirm, State1);
- false -> discard_delivery(Delivery, State1)
- end);
-
-handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
- gen_server2:reply(From, true),
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, State));
handle_call({notify_down, ChPid}, From, State) ->
@@ -1198,7 +1180,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
State = #q{senders = Senders}) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ %% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
pmon:monitor(Sender, Senders);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 734456d3..db2b7e95 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/4, publish/6, publish/1,
+-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/3, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -40,13 +40,13 @@
-spec(publish/4 ::
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+-spec(publish/5 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(),
properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/4 ::
- (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/3 ::
+ (boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -80,18 +80,16 @@
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+ publish(Exchange, RoutingKeyBin, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- publish(delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined)).
+publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(X, delivery(Mandatory, Message, undefined));
+publish(XName, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(delivery(Mandatory, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
@@ -105,8 +103,8 @@ publish(X, Delivery) ->
{RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
-delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
+delivery(Mandatory, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e50e823c..e8f3aab3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -598,10 +598,12 @@ handle_method(_Method, _, #ch{tx_status = TxStatus})
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
+handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "immediate=true", []);
+
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
+ mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx_status = TxStatus,
confirm_enabled = ConfirmEnabled,
@@ -623,8 +625,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_trace_in(Message, TraceState),
- Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
- MsgSeqNo),
+ Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
{noreply,
case TxStatus of
@@ -1342,20 +1343,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QPid <- DeliveredQPids]], publish, State2),
State2.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
-process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, _, _, undefined, _, State) ->
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
State#ch.unconfirmed)}.
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index f1672f4e..a9af2d8a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -81,7 +81,7 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
{ok, _RoutingRes, _DeliveredQPids} =
- rabbit_basic:publish(LogExch, RoutingKey, false, false,
+ rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
list_to_binary(io_lib:format(Format, Data))),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3e45f026..1f6567e0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -70,7 +70,7 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
ack_num,
@@ -167,27 +167,10 @@ init_it(Self, Node, QueueName) ->
end
end.
-handle_call({deliver, Delivery = #delivery { immediate = true }},
- From, State) ->
- %% It is safe to reply 'false' here even if a) we've not seen the
- %% msg via gm, or b) the master dies before we receive the msg via
- %% gm. In the case of (a), we will eventually receive the msg via
- %% gm, and it's only the master's result to the channel that is
- %% important. In the case of (b), if the master does die and we do
- %% get promoted then at that point we have no consumers, thus
- %% 'false' is precisely the correct answer. However, we must be
- %% careful to _not_ enqueue the message in this case.
-
- %% Note this is distinct from the case where we receive the msg
- %% via gm first, then we're promoted to master, and only then do
- %% we receive the msg from the channel.
- gen_server2:reply(From, false), %% master may deliver it, not us
- noreply(maybe_enqueue_message(Delivery, false, State));
-
-handle_call({deliver, Delivery = #delivery { mandatory = true }},
- From, State) ->
- gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- noreply(maybe_enqueue_message(Delivery, true, State));
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
@@ -232,12 +215,12 @@ handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ %% Asynchronous, non-"mandatory", deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);
noflow -> ok
end,
- noreply(maybe_enqueue_message(Delivery, true, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -554,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- {Delivery, true} <- queue:to_list(PubQ)],
+ Delivery <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, KS, MTC),
@@ -655,14 +638,13 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
sender = ChPid },
- EnqueueOnPromotion,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, {confirmed, ChPid}} ->
@@ -732,10 +714,9 @@ process_instruction(
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, {published, ChPid}, MS)};
- {{value, {Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
@@ -747,7 +728,7 @@ process_instruction(
ChPid, [MsgSeqNo]),
MS
end};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -784,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{empty, _MQ} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, discarded, MS)};
- {{value, {#delivery { message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, #delivery { message = #basic_message { id = MsgId } }},
+ MQ2} ->
%% We've already seen it from the channel, we're not
%% going to see this again, so don't add it to MS
{MQ2, PendingCh, MS};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3cc0e5db..08535e7d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -656,7 +656,6 @@ test_topic_expect_match(X, List) ->
#'P_basic'{}, <<>>),
Res = rabbit_exchange_type_topic:route(
X, #delivery{mandatory = false,
- immediate = false,
sender = self(),
message = Message}),
ExpectedRes = lists:map(
@@ -2194,8 +2193,8 @@ publish_and_confirm(Q, Payload, Count) ->
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{delivery_mode = 2},
Payload),
- Delivery = #delivery{mandatory = false, immediate = false,
- sender = self(), message = Msg, msg_seq_no = Seq},
+ Delivery = #delivery{mandatory = false, sender = self(),
+ message = Msg, msg_seq_no = Seq},
{routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 8966bcab..f488afb4 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -69,7 +69,6 @@
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
- immediate :: boolean(),
sender :: pid(),
message :: message()}).
-type(message_properties() ::