diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-06-10 23:50:40 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-06-10 23:50:40 +0100 |
commit | 1c8662ab33fa801960f5729507cb91e63aedaed4 (patch) | |
tree | e41ff53d509c738dca24c55b7ad7c01c524c3e3d | |
parent | e4d97ce97e14dff60ea855aa725e2bc3399264a2 (diff) | |
download | rabbitmq-server-bug20943.tar.gz |
refactoring: rename "round robin" to "active consumers"bug20943
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 23df4f12..cf0ef44f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,7 +53,7 @@ has_had_consumers, next_msg_id, message_buffer, - round_robin, + active_consumers, blocked_consumers}). -record(consumer, {tag, ack_required}). @@ -100,7 +100,7 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new(), + active_consumers = queue:new(), blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> @@ -167,14 +167,14 @@ record_current_channel_tx(ChPid, Txn) -> deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, - round_robin = RoundRobin, + active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), - case queue:out(RoundRobin) of + case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, - RoundRobinTail} -> + ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), @@ -190,29 +190,31 @@ deliver_immediately(Message, Delivered, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), - {NewRoundRobin, NewBlockedConsumers} = + {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of - ok -> {queue:in(QEntry, RoundRobinTail), + ok -> {queue:in(QEntry, ActiveConsumersTail), BlockedConsumers}; block -> - {RoundRobin1, BlockedConsumers1} = + {ActiveConsumers1, BlockedConsumers1} = move_consumers(ChPid, - RoundRobinTail, + ActiveConsumersTail, BlockedConsumers), - {RoundRobin1, + {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, {offered, AckRequired, - State#q{round_robin = NewRoundRobin, + State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers, next_msg_id = NextId + 1}}; false -> store_ch_record(C#cr{is_limit_active = true}), - {NewRoundRobin, NewBlockedConsumers} = - move_consumers(ChPid, RoundRobin, BlockedConsumers), + {NewActiveConsumers, NewBlockedConsumers} = + move_consumers(ChPid, + ActiveConsumers, + BlockedConsumers), deliver_immediately( Message, Delivered, - State#q{round_robin = NewRoundRobin, + State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}) end; {empty, _} -> @@ -277,12 +279,12 @@ possibly_unblock(State, ChPid, Update) -> store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; - unblock -> {NewBlockedeConsumers, NewRoundRobin} = + unblock -> {NewBlockedeConsumers, NewActiveConsumers} = move_consumers(ChPid, State#q.blocked_consumers, - State#q.round_robin), + State#q.active_consumers), run_poke_burst( - State#q{round_robin = NewRoundRobin, + State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedeConsumers}) end end. @@ -312,7 +314,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> {ChPid, _} -> none; Other -> Other end, - round_robin = remove_consumers(ChPid, State#q.round_robin), + active_consumers = remove_consumers( + ChPid, State#q.active_consumers), blocked_consumers = remove_consumers( ChPid, State#q.blocked_consumers)}), case should_auto_delete(NewState) of @@ -360,7 +363,7 @@ run_poke_burst(MessageBuffer, State) -> State#q{message_buffer = MessageBuffer} end. -is_unused(State) -> queue:is_empty(State#q.round_robin) andalso +is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso queue:is_empty(State#q.blocked_consumers). maybe_send_reply(_ChPid, undefined) -> ok; @@ -521,7 +524,7 @@ i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); i(consumers, State) -> - queue:len(State#q.round_robin) + queue:len(State#q.blocked_consumers); + queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); i(transactions, _) -> length(all_tx_record()); i(memory, _) -> @@ -639,10 +642,10 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, State1#q.blocked_consumers)}; false -> run_poke_burst( State1#q{ - round_robin = + active_consumers = add_consumer( ChPid, Consumer, - State1#q.round_robin)}) + State1#q.active_consumers)}) end, reply(ok, State2) end @@ -666,9 +669,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State#q{exclusive_consumer = cancel_holder(ChPid, ConsumerTag, Holder), - round_robin = remove_consumer( - ChPid, ConsumerTag, - State#q.round_robin), + active_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.active_consumers), blocked_consumers = remove_consumer( ChPid, ConsumerTag, State#q.blocked_consumers)}, @@ -680,8 +683,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, - round_robin = RoundRobin}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); + active_consumers = ActiveConsumers}) -> + reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, + State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> |