From 6d7792714815e494cb63aec8ee86894e12f5e4d7 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 26 Dec 2008 09:09:25 +0000 Subject: tidying up, refactoring and some cosmetic changes --- src/rabbit_amqqueue.erl | 5 ++- src/rabbit_amqqueue_process.erl | 73 +++++++++++++++-------------------------- src/rabbit_channel.erl | 15 ++++----- src/rabbit_limiter.erl | 6 ++-- 4 files changed, 38 insertions(+), 61 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a345f5ab..3c8bd99e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -38,8 +38,7 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2]). --export([unblock/2]). +-export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -97,7 +96,7 @@ -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c49b06e5..5199fb87 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -162,59 +162,42 @@ deliver_immediately(Message, Delivered, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, - #consumer{tag = ConsumerTag, - ack_required = AckRequired = true}}}, + {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}}, RoundRobinTail} -> - % Use Qos Limits if an ack is required - % Query the limiter to find out if a limit has been breached - C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self()) of + C = #cr{limiter_pid = LimiterPid, + unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + case not(AckRequired) orelse rabbit_limiter:can_send( + LimiterPid, self()) of true -> - really_deliver(AckRequired, ChPid, ConsumerTag, - Delivered, Message, NextId, QName, - QEntry, RoundRobinTail, State); + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; false -> - % Have another go by cycling through the consumer - % queue store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), deliver_immediately(Message, Delivered, State#q{round_robin = NewConsumers}) end; - {{value, QEntry = {ChPid, - #consumer{tag = ConsumerTag, - ack_required = AckRequired = false}}}, - RoundRobinTail} -> - really_deliver(AckRequired, ChPid, ConsumerTag, - Delivered, Message, NextId, QName, - QEntry, RoundRobinTail, State); {empty, _} -> {not_offered, State} end. -% TODO The arity of this function seems a bit large :-( -really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, - QName, QEntry, RoundRobinTail, State) -> - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, - store_ch_record(NewC), - NewConsumers = - case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}. - attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -631,9 +614,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers], - limiter_pid = LimiterPid}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}), if Consumers == [] -> ok = rabbit_limiter:register(LimiterPid, self()); true -> @@ -662,8 +644,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - C1 = C#cr{consumers = NewConsumers}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = NewConsumers}), if NewConsumers == [] -> ok = rabbit_limiter:unregister(LimiterPid, self()); true -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4bfacbb..304275c4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -427,15 +427,12 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{global = Flag = true}, _, _State) -> - rabbit_misc:protocol_error(not_implemented, - "Global flag (~s) for basic.qos not implementented", [Flag]); +handle_method(#'basic.qos'{global = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "global=true", []); -handle_method(#'basic.qos'{prefetch_size = Size}, - _, _State) when Size /= 0 -> +handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, - "Pre-fetch size (~s) for basic.qos not implementented", - [Size]); + "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{ limiter_pid = LimiterPid, @@ -859,8 +856,8 @@ consumer_queues(Consumers) -> end]. %% tell the limiter about the number of acks that have been received -%% for messages delivered to subscribed consumers, rather than those -%% for messages sent in a response to a basic.get +%% for messages delivered to subscribed consumers, but not acks for +%% messages sent in a response to a basic.get. notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 62c6c73c..3776edd0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -179,9 +179,9 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> case length(QList) of 0 -> ok; L -> - %% We randomly vary the position in which each queue - %% appears in the list, thus ensuring that each queue has - %% an equal chance of being notified first. + %% We randomly vary the position of queues in the list, + %% thus ensuring that each queue has an equal chance of + %% being notified first. {L1, L2} = lists:split(random:uniform(L), QList), [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], ok -- cgit v1.2.1