summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-26 09:09:25 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-26 09:09:25 +0000
commit6d7792714815e494cb63aec8ee86894e12f5e4d7 (patch)
treeef180be664ddbb62661584d8e0c35d636294a1ec
parent0d6e4923d6d5f88ba6742983065b39ef296b6c8b (diff)
downloadrabbitmq-server-6d7792714815e494cb63aec8ee86894e12f5e4d7.tar.gz
tidying up, refactoring and some cosmetic changes
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl73
-rw-r--r--src/rabbit_channel.erl15
-rw-r--r--src/rabbit_limiter.erl6
4 files changed, 38 insertions, 61 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a345f5ab..3c8bd99e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -38,8 +38,7 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2]).
--export([unblock/2]).
+-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -97,7 +96,7 @@
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c49b06e5..5199fb87 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -162,59 +162,42 @@ deliver_immediately(Message, Delivered,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(RoundRobin) of
- {{value, QEntry = {ChPid,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired = true}}},
+ {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}},
RoundRobinTail} ->
- % Use Qos Limits if an ack is required
- % Query the limiter to find out if a limit has been breached
- C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self()) of
+ C = #cr{limiter_pid = LimiterPid,
+ unsent_message_count = Count,
+ unacked_messages = UAM} = ch_record(ChPid),
+ case not(AckRequired) orelse rabbit_limiter:can_send(
+ LimiterPid, self()) of
true ->
- really_deliver(AckRequired, ChPid, ConsumerTag,
- Delivered, Message, NextId, QName,
- QEntry, RoundRobinTail, State);
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
false ->
- % Have another go by cycling through the consumer
- % queue
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
deliver_immediately(Message, Delivered,
State#q{round_robin = NewConsumers})
end;
- {{value, QEntry = {ChPid,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired = false}}},
- RoundRobinTail} ->
- really_deliver(AckRequired, ChPid, ConsumerTag,
- Delivered, Message, NextId, QName,
- QEntry, RoundRobinTail, State);
{empty, _} ->
{not_offered, State}
end.
-% TODO The arity of this function seems a bit large :-(
-really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId,
- QName, QEntry, RoundRobinTail, State) ->
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewC = C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM},
- store_ch_record(NewC),
- NewConsumers =
- case ch_record_state_transition(C, NewC) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}}.
-
attempt_delivery(none, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
@@ -631,9 +614,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers],
- limiter_pid = LimiterPid},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid}),
if Consumers == [] ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
@@ -662,8 +644,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
- C1 = C#cr{consumers = NewConsumers},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = NewConsumers}),
if NewConsumers == [] ->
ok = rabbit_limiter:unregister(LimiterPid, self());
true ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a4bfacbb..304275c4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -427,15 +427,12 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{global = Flag = true}, _, _State) ->
- rabbit_misc:protocol_error(not_implemented,
- "Global flag (~s) for basic.qos not implementented", [Flag]);
+handle_method(#'basic.qos'{global = true}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "global=true", []);
-handle_method(#'basic.qos'{prefetch_size = Size},
- _, _State) when Size /= 0 ->
+handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
- "Pre-fetch size (~s) for basic.qos not implementented",
- [Size]);
+ "prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
_, State = #ch{ limiter_pid = LimiterPid,
@@ -859,8 +856,8 @@ consumer_queues(Consumers) ->
end].
%% tell the limiter about the number of acks that have been received
-%% for messages delivered to subscribed consumers, rather than those
-%% for messages sent in a response to a basic.get
+%% for messages delivered to subscribed consumers, but not acks for
+%% messages sent in a response to a basic.get.
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 62c6c73c..3776edd0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -179,9 +179,9 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
case length(QList) of
0 -> ok;
L ->
- %% We randomly vary the position in which each queue
- %% appears in the list, thus ensuring that each queue has
- %% an equal chance of being notified first.
+ %% We randomly vary the position of queues in the list,
+ %% thus ensuring that each queue has an equal chance of
+ %% being notified first.
{L1, L2} = lists:split(random:uniform(L), QList),
[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1],
ok