summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-06-10 23:50:40 +0100
committerMatthias Radestock <matthias@lshift.net>2009-06-10 23:50:40 +0100
commit1c8662ab33fa801960f5729507cb91e63aedaed4 (patch)
treee41ff53d509c738dca24c55b7ad7c01c524c3e3d
parente4d97ce97e14dff60ea855aa725e2bc3399264a2 (diff)
downloadrabbitmq-server-bug20943.tar.gz
refactoring: rename "round robin" to "active consumers"bug20943
-rw-r--r--src/rabbit_amqqueue_process.erl58
1 files changed, 31 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 23df4f12..cf0ef44f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,7 +53,7 @@
has_had_consumers,
next_msg_id,
message_buffer,
- round_robin,
+ active_consumers,
blocked_consumers}).
-record(consumer, {tag, ack_required}).
@@ -100,7 +100,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new(),
+ active_consumers = queue:new(),
blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
@@ -167,14 +167,14 @@ record_current_channel_tx(ChPid, Txn) ->
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
- round_robin = RoundRobin,
+ active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- case queue:out(RoundRobin) of
+ case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
- RoundRobinTail} ->
+ ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
@@ -190,29 +190,31 @@ deliver_immediately(Message, Delivered,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
- {NewRoundRobin, NewBlockedConsumers} =
+ {NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
- ok -> {queue:in(QEntry, RoundRobinTail),
+ ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
block ->
- {RoundRobin1, BlockedConsumers1} =
+ {ActiveConsumers1, BlockedConsumers1} =
move_consumers(ChPid,
- RoundRobinTail,
+ ActiveConsumersTail,
BlockedConsumers),
- {RoundRobin1,
+ {ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
{offered, AckRequired,
- State#q{round_robin = NewRoundRobin,
+ State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers,
next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
- {NewRoundRobin, NewBlockedConsumers} =
- move_consumers(ChPid, RoundRobin, BlockedConsumers),
+ {NewActiveConsumers, NewBlockedConsumers} =
+ move_consumers(ChPid,
+ ActiveConsumers,
+ BlockedConsumers),
deliver_immediately(
Message, Delivered,
- State#q{round_robin = NewRoundRobin,
+ State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers})
end;
{empty, _} ->
@@ -277,12 +279,12 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> {NewBlockedeConsumers, NewRoundRobin} =
+ unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
move_consumers(ChPid,
State#q.blocked_consumers,
- State#q.round_robin),
+ State#q.active_consumers),
run_poke_burst(
- State#q{round_robin = NewRoundRobin,
+ State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedeConsumers})
end
end.
@@ -312,7 +314,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
{ChPid, _} -> none;
Other -> Other
end,
- round_robin = remove_consumers(ChPid, State#q.round_robin),
+ active_consumers = remove_consumers(
+ ChPid, State#q.active_consumers),
blocked_consumers = remove_consumers(
ChPid, State#q.blocked_consumers)}),
case should_auto_delete(NewState) of
@@ -360,7 +363,7 @@ run_poke_burst(MessageBuffer, State) ->
State#q{message_buffer = MessageBuffer}
end.
-is_unused(State) -> queue:is_empty(State#q.round_robin) andalso
+is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
queue:is_empty(State#q.blocked_consumers).
maybe_send_reply(_ChPid, undefined) -> ok;
@@ -521,7 +524,7 @@ i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
i(consumers, State) ->
- queue:len(State#q.round_robin) + queue:len(State#q.blocked_consumers);
+ queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
i(transactions, _) ->
length(all_tx_record());
i(memory, _) ->
@@ -639,10 +642,10 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
State1#q.blocked_consumers)};
false -> run_poke_burst(
State1#q{
- round_robin =
+ active_consumers =
add_consumer(
ChPid, Consumer,
- State1#q.round_robin)})
+ State1#q.active_consumers)})
end,
reply(ok, State2)
end
@@ -666,9 +669,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State#q{exclusive_consumer = cancel_holder(ChPid,
ConsumerTag,
Holder),
- round_robin = remove_consumer(
- ChPid, ConsumerTag,
- State#q.round_robin),
+ active_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.active_consumers),
blocked_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.blocked_consumers)},
@@ -680,8 +683,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
- round_robin = RoundRobin}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
+ active_consumers = ActiveConsumers}) ->
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
+ State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->