summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 16:57:22 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 16:57:22 +0000
commitaf8b178595f89dcabae5b3074906848c3258c371 (patch)
tree307dd9404982ea758c2954e63c038fca4737e0d1
parent286d98d746ce7d7b39c2f612d71a4142df1b6277 (diff)
parent02a1457f8bef8adce0d16de0b0e976f065daceb2 (diff)
downloadrabbitmq-server-af8b178595f89dcabae5b3074906848c3258c371.tar.gz
Merge in default
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xscripts/rabbitmq-service.bat12
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl562
-rw-r--r--src/rabbit_binary_generator.erl63
-rw-r--r--src/rabbit_binary_parser.erl28
-rw-r--r--src/rabbit_limiter.erl10
-rw-r--r--src/rabbit_node_monitor.erl6
-rw-r--r--src/rabbit_nodes.erl11
-rw-r--r--src/rabbit_queue_consumers.erl426
-rw-r--r--src/rabbit_reader.erl324
-rw-r--r--src/vm_memory_monitor.erl23
13 files changed, 840 insertions, 637 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 05140e3c..f53eea95 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Dec 10 2013 emile@rabbitmq.com 3.2.2-1
+- New Upstream Release
+
* Wed Oct 23 2013 emile@rabbitmq.com 3.2.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 6c63478f..f1e1c66b 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.2.2-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Emile Joubert <emile@rabbitmq.com> Tue, 10 Dec 2013 16:08:08 +0000
+
rabbitmq-server (3.2.0-1) unstable; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index d36b130c..8c350f7a 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -22,9 +22,17 @@ rem enabling delayed expansion
set TN0=%~n0
set TDP0=%~dp0
set P1=%1
-set STAR=%*
setlocal enabledelayedexpansion
+set STARVAR=
+shift
+:loop1
+if "%1"=="" goto after_loop
+ set STARVAR=%STARVAR% %1
+ shift
+goto loop1
+:after_loop
+
if "!RABBITMQ_SERVICENAME!"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
@@ -200,7 +208,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
-!STAR!
+!STARVAR!
set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:\=\\!
set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"!
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8306f134..6b1e00b7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -115,7 +115,8 @@
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
- -> [{pid(), rabbit_types:ctag(), boolean()}]).
+ -> [{pid(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 39863807..40389e8a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,7 +20,6 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -38,8 +37,7 @@
has_had_consumers,
backing_queue,
backing_queue_state,
- active_consumers,
- consumer_use,
+ consumers,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -57,21 +55,6 @@
status
}).
--record(consumer, {tag, ack_required, args}).
-
-%% These are held in our process dictionary
--record(cr, {ch_pid,
- monitor_ref,
- acktags,
- consumer_count,
- %% Queue of {ChPid, #consumer{}} for consumers which have
- %% been blocked for any reason
- blocked_consumers,
- %% The limiter itself
- limiter,
- %% Internal flow control for queue -> writer
- unsent_message_count}).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -151,8 +134,7 @@ init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- active_consumers = priority_queue:new(),
- consumer_use = {inactive, now_micros(), 0, 0.0},
+ consumers = rabbit_queue_consumers:new(),
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -236,13 +218,14 @@ notify_decorators(Event, Props, State) when Event =:= startup;
Event =:= shutdown ->
decorator_callback(qname(State), Event, Props);
-notify_decorators(Event, Props, State = #q{active_consumers = ACs,
+notify_decorators(Event, Props, State = #q{consumers = Consumers,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- decorator_callback(
- qname(State), notify,
- [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)},
- {is_empty, BQ:is_empty(BQS)} | Props]]).
+ P = rabbit_queue_consumers:max_active_priority(Consumers),
+ decorator_callback(qname(State), notify,
+ [Event, [{max_active_consumer_priority, P},
+ {is_empty, BQ:is_empty(BQS)} |
+ Props]]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -316,7 +299,7 @@ init_max_length(MaxLen, State) ->
State1.
terminate_shutdown(Fun, State) ->
- State1 = #q{backing_queue_state = BQS} =
+ State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1,
fun stop_rate_timer/1,
@@ -327,8 +310,9 @@ terminate_shutdown(Fun, State) ->
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
notify_decorators(shutdown, [], State),
- [emit_consumer_deleted(Ch, CTag, QName)
- || {Ch, CTag, _} <- consumers(State1)],
+ [emit_consumer_deleted(Ch, CTag, QName) ||
+ {Ch, CTag, _, _} <-
+ rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -411,156 +395,27 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(State = #q{active_consumers = AC}) ->
- true = (priority_queue:is_empty(AC) orelse is_empty(State)).
+assert_invariant(State = #q{consumers = Consumers}) ->
+ true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
-lookup_ch(ChPid) ->
- case get({ch, ChPid}) of
- undefined -> not_found;
- C -> C
- end.
-
-ch_record(ChPid, LimiterPid) ->
- Key = {ch, ChPid},
- case get(Key) of
- undefined -> MonitorRef = erlang:monitor(process, ChPid),
- Limiter = rabbit_limiter:client(LimiterPid),
- C = #cr{ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = queue:new(),
- consumer_count = 0,
- blocked_consumers = priority_queue:new(),
- limiter = Limiter,
- unsent_message_count = 0},
- put(Key, C),
- C;
- C = #cr{} -> C
- end.
-
-update_ch_record(C = #cr{consumer_count = ConsumerCount,
- acktags = ChAckTags,
- unsent_message_count = UnsentMessageCount}) ->
- case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {true, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
- end,
- C.
-
-store_ch_record(C = #cr{ch_pid = ChPid}) ->
- put({ch, ChPid}, C),
- ok.
-
-erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
- erlang:demonitor(MonitorRef),
- erase({ch, ChPid}),
- ok.
-
-all_ch_record() -> [C || {{ch, _}, C} <- get()].
-
-block_consumer(C = #cr{blocked_consumers = Blocked},
- {_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
- update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}),
- notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
-
-is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
- Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
-
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
true -> notify_decorators(queue_empty, [], State),
- [send_drained(C) || C <- all_ch_record()];
+ rabbit_queue_consumers:send_drained();
false -> ok
end,
State.
-send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
- case rabbit_limiter:drained(Limiter) of
- {[], Limiter} -> ok;
- {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
- ChPid, CTagCredit),
- update_ch_record(C#cr{limiter = Limiter2})
- end.
-
-deliver_msgs_to_consumers(_DeliverFun, true, State) ->
- {true, State};
-deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers,
- consumer_use = CUInfo}) ->
- case priority_queue:out_p(ActiveConsumers) of
- {empty, _} ->
- {false,
- State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
- {{value, QEntry, Priority}, Tail} ->
- {Stop, State1} = deliver_msg_to_consumer(
- DeliverFun, QEntry, Priority,
- State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(DeliverFun, Stop, State1)
- end.
-
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) ->
- C = lookup_ch(ChPid),
- case is_ch_blocked(C) of
- true -> block_consumer(C, E, State),
- {false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter,
- Consumer#consumer.ack_required,
- Consumer#consumer.tag) of
- {suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E, State),
- {false, State};
- {continue, Limiter} ->
- AC1 = priority_queue:in(E, Priority,
- State#q.active_consumers),
- deliver_msg_to_consumer0(
- DeliverFun, Consumer, C#cr{limiter = Limiter},
- State#q{active_consumers = AC1})
- end
- end.
-
-deliver_msg_to_consumer0(DeliverFun,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired},
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- unsent_message_count = Count},
- State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, Stop, State1} =
- DeliverFun(AckRequired, State),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
- {QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 = case AckRequired of
- true -> queue:in(AckTag, ChAckTags);
- false -> ChAckTags
- end,
- update_ch_record(C#cr{acktags = ChAckTags1,
- unsent_message_count = Count + 1}),
- {Stop, State1}.
-
-deliver_from_queue_deliver(AckRequired, State) ->
- {Result, State1} = fetch(AckRequired, State),
- {Result, is_empty(State1), State1}.
-
-update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
- CUInfo;
-update_consumer_use({active, _, _} = CUInfo, active) ->
- CUInfo;
-update_consumer_use({active, Since, Avg}, inactive) ->
- Now = now_micros(),
- {inactive, Now, Now - Since, Avg};
-update_consumer_use({inactive, Since, Active, Avg}, active) ->
- Now = now_micros(),
- {active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
-
-consumer_use_avg(Active, Inactive, Avg) ->
- Time = Inactive + Active,
- Ratio = Active / Time,
- Weight = erlang:min(1, Time / 1000000),
- case Avg of
- undefined -> Ratio;
- _ -> Ratio * Weight + Avg * (1 - Weight)
- end.
+deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
+ {Active, Blocked, State1, Consumers1} =
+ rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State,
+ Consumers),
+ State2 = State1#q{consumers = Consumers1},
+ [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) ||
+ {_ChPid, CTag} <- Blocked],
+ {Active, State2}.
confirm_messages([], State) ->
State;
@@ -608,36 +463,31 @@ discard(#delivery{sender = SenderPid,
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
- {_IsEmpty1, State1} = deliver_msgs_to_consumers(
- fun deliver_from_queue_deliver/2,
- is_empty(State), State),
- State1.
-
-add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
- Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
- {_, P} -> P;
- _ -> 0
- end,
- priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers).
+ {_Active, State3} = deliver_msgs_to_consumers(
+ fun(AckRequired, State1) ->
+ {Result, State2} = fetch(AckRequired, State1),
+ {Result, is_empty(State2), State2}
+ end, is_empty(State), State),
+ State3.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- case BQ:is_duplicate(Message, BQS) of
- {false, BQS1} ->
- deliver_msgs_to_consumers(
- fun (true, State1 = #q{backing_queue_state = BQS2}) ->
- true = BQ:is_empty(BQS2),
- {AckTag, BQS3} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS2),
- {{Message, Delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}};
- (false, State1) ->
- {{Message, Delivered, undefined},
- true, discard(Delivery, State1)}
- end, false, State#q{backing_queue_state = BQS1});
- {true, BQS1} ->
- {true, State#q{backing_queue_state = BQS1}}
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ false -> deliver_msgs_to_consumers(
+ fun (true, State2 = #q{backing_queue_state = BQS2}) ->
+ true = BQ:is_empty(BQS2),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State2#q{backing_queue_state = BQS3}};
+ (false, State2) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State2)}
+ end, false, State1);
+ true -> {true, State1}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
@@ -653,7 +503,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
- maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
@@ -715,85 +565,47 @@ requeue(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
- end, Queue).
-
-remove_consumers(ChPid, Queue, QName) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag, QName),
- false;
- (_) ->
- true
- end, Queue).
-
-possibly_unblock(State, ChPid, Update) ->
- case lookup_ch(ChPid) of
- not_found -> State;
- C -> C1 = Update(C),
- case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
- false -> update_ch_record(C1),
- State;
- true -> unblock(State, C1)
- end
- end.
-
-unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
- case lists:partition(
- fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
- rabbit_limiter:is_consumer_blocked(Limiter, CTag)
- end, priority_queue:to_list(C#cr.blocked_consumers)) of
- {_, []} ->
- update_ch_record(C),
+possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
+ case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
+ unchanged ->
State;
- {Blocked, Unblocked} ->
- BlockedQ = priority_queue:from_list(Blocked),
- UnblockedQ = priority_queue:from_list(Unblocked),
- update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- State1 = State#q{consumer_use =
- update_consumer_use(CUInfo, active)},
- AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
- State2 = State1#q{active_consumers = AC1},
- [notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State2) ||
- {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State2)
+ {unblocked, UnblockedCTags, Consumers1} ->
+ State1 = State#q{consumers = Consumers1},
+ [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
+ State1) || CTag <- UnblockedCTags],
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+handle_ch_down(DownPid, State = #q{consumers = Consumers,
+ exclusive_consumer = Holder,
senders = Senders}) ->
- Senders1 = case pmon:is_monitored(DownPid, Senders) of
- false -> Senders;
- true -> credit_flow:peer_down(DownPid),
- pmon:demonitor(DownPid, Senders)
- end,
- case lookup_ch(DownPid) of
+ State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end},
+ case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
not_found ->
- {ok, State#q{senders = Senders1}};
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- blocked_consumers = Blocked} ->
- QName = qname(State),
- _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
- ok = erase_ch_record(C),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- active_consumers = remove_consumers(
- ChPid, State#q.active_consumers,
- QName),
- senders = Senders1},
- case should_auto_delete(State1) of
- true -> {stop, State1};
- false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
- ensure_expiry_timer(State1))}
+ {ok, State1};
+ {ChAckTags, ChCTags, Consumers1} ->
+ QName = qname(State1),
+ [emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags],
+ Holder1 = case Holder of
+ {DownPid, _} -> none;
+ Other -> Other
+ end,
+ State2 = State1#q{consumers = Consumers1,
+ exclusive_consumer = Holder1},
+ [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
+ CTag <- ChCTags],
+ case should_auto_delete(State2) of
+ true -> {stop, State2};
+ false -> {ok, requeue_and_run(ChAckTags,
+ ensure_expiry_timer(State2))}
end
end.
@@ -807,10 +619,7 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() ->
- lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
-
-is_unused(_State) -> consumer_count() == 0.
+is_unused(_State) -> rabbit_queue_consumers:count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -822,23 +631,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
State#q{backing_queue_state = BQ:timeout(BQS)}.
subtract_acks(ChPid, AckTags, State, Fun) ->
- case lookup_ch(ChPid) of
- not_found ->
- State;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
- Fun(State)
- end.
-
-subtract_acks([], [], AckQ) ->
- AckQ;
-subtract_acks([], Prefix, AckQ) ->
- queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
-subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
- case queue:out(AckQ) of
- {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
- {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
+ not_found -> State;
+ ok -> Fun(State)
end.
message_properties(Message, Confirm, #q{ttl = TTL}) ->
@@ -1059,21 +854,16 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
+ rabbit_queue_consumers:unacknowledged_message_count();
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
i(consumers, _) ->
- consumer_count();
-i(consumer_utilisation, #q{consumer_use = ConsumerUse}) ->
- case consumer_count() of
+ rabbit_queue_consumers:count();
+i(consumer_utilisation, #q{consumers = Consumers}) ->
+ case rabbit_queue_consumers:count() of
0 -> '';
- _ -> case ConsumerUse of
- {active, Since, Avg} ->
- consumer_use_avg(now_micros() - Since, 0, Avg);
- {inactive, Since, Active, Avg} ->
- consumer_use_avg(Active, now_micros() - Since, Avg)
- end
+ _ -> rabbit_queue_consumers:utilisation(Consumers)
end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
@@ -1099,17 +889,6 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-consumers(#q{active_consumers = ActiveConsumers}) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
- consumers(ActiveConsumers, []), all_ch_record()).
-
-consumers(Consumers, Acc) ->
- priority_queue:fold(
- fun ({ChPid, Consumer}, _P, Acc1) ->
- #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
- [{ChPid, CTag, Ack, Args} | Acc1]
- end, Acc, Consumers).
-
emit_stats(State) ->
emit_stats(State, []).
@@ -1198,8 +977,8 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From, State) ->
- reply(consumers(State), State);
+handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
+ reply(rabbit_queue_consumers:all(Consumers), State);
handle_call({deliver, Delivery, Delivered}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
@@ -1224,89 +1003,58 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag}, State2} ->
- State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} =
- ch_record(ChPid, LimiterPid),
- ChAckTags1 = queue:in(AckTag, ChAckTags),
- update_ch_record(C#cr{acktags = ChAckTags1}),
- State2;
- false -> State2
- end,
+ {{Message, IsDelivered, AckTag},
+ #q{backing_queue = BQ, backing_queue_state = BQS} = State2} ->
+ case AckRequired of
+ true -> ok = rabbit_queue_consumers:record_ack(
+ ChPid, LimiterPid, AckTag);
+ false -> ok
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, BQ:len(BQS), Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
- _From, State = #q{exclusive_consumer = Holder}) ->
+ _From, State = #q{consumers = Consumers,
+ exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = Count,
- limiter = Limiter} = ch_record(ChPid, LimiterPid),
- Limiter1 = case LimiterActive of
- true -> rabbit_limiter:activate(Limiter);
- false -> Limiter
- end,
- Limiter2 = case CreditArgs of
- none -> Limiter1;
- {Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, Drain)
- end,
- C1 = update_ch_record(C#cr{consumer_count = Count + 1,
- limiter = Limiter2}),
- case is_empty(State) of
- true -> send_drained(C1);
- false -> ok
- end,
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck,
- args = OtherArgs},
- ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> Holder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), OtherArgs),
- AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
- State2 = State1#q{active_consumers = AC1},
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State2),
- reply(ok, run_message_queue(State2))
+ in_use -> reply({error, exclusive_consume_unavailable}, State);
+ ok -> Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ CreditArgs, OtherArgs,
+ is_empty(State), Consumers),
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> Holder
+ end,
+ State1 = State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, qname(State1), OtherArgs),
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder}) ->
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
- case lookup_ch(ChPid) of
+ case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
- C = #cr{consumer_count = Count,
- limiter = Limiter,
- blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- Limiter1 = case Count of
- 1 -> rabbit_limiter:deactivate(Limiter);
- _ -> Limiter
- end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
- update_ch_record(C#cr{consumer_count = Count - 1,
- limiter = Limiter2,
- blocked_consumers = Blocked1}),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, ConsumerTag} -> none;
- _ -> Holder
- end,
- active_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.active_consumers)},
+ Consumers1 ->
+ Holder1 = case Holder of
+ {ChPid, ConsumerTag} -> none;
+ _ -> Holder
+ end,
+ State1 = State#q{consumers = Consumers1,
+ exclusive_consumer = Holder1},
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
notify_decorators(
basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of
@@ -1318,7 +1066,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_expiry_timer(State),
- reply({ok, BQ:len(BQS), consumer_count()}, State1);
+ reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1370,14 +1118,16 @@ handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State);
handle_call(force_event_refresh, _From,
- State = #q{exclusive_consumer = Exclusive}) ->
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
QName = qname(State),
+ AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
none -> [emit_consumer_created(
Ch, CTag, false, AckRequired, QName, Args) ||
- {Ch, CTag, AckRequired, Args} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State),
+ {Ch, CTag, AckRequired, Args} <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers,
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Args)
end,
@@ -1418,25 +1168,16 @@ handle_cast(delete_immediately, State) ->
stop(State);
handle_cast({resume, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{limiter = Limiter}) ->
- C#cr{limiter = rabbit_limiter:resume(Limiter)}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:resume_fun(),
+ ChPid, State));
handle_cast({notify_sent, ChPid, Credit}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - Credit}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:notify_sent_fun(Credit),
+ ChPid, State));
handle_cast({activate_limit, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{limiter = Limiter}) ->
- C#cr{limiter = rabbit_limiter:activate(Limiter)}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
+ ChPid, State));
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
@@ -1452,39 +1193,30 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State);
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
true = BQ =/= rabbit_mirror_queue_master, %% assertion
BQ1 = rabbit_mirror_queue_master,
BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+ backing_queue_state = BQS1});
handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
BQ = rabbit_mirror_queue_master, %% assertion
{BQ1, BQS1} = BQ:stop_mirroring(BQS),
noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+ backing_queue_state = BQS1});
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
Len = BQ:len(BQS),
rabbit_channel:send_credit_reply(ChPid, Len),
- C = #cr{limiter = Limiter} = lookup_ch(ChPid),
- C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
- noreply(case Drain andalso Len == 0 of
- true -> update_ch_record(C1),
- send_drained(C1),
- State;
- false -> case is_ch_blocked(C1) of
- true -> update_ch_record(C1),
- State;
- false -> unblock(State, C1)
- end
- end);
+ noreply(possibly_unblock(rabbit_queue_consumers:credit_fun(
+ Len == 0, Credit, Drain, CTag),
+ ChPid, State));
handle_cast(notify_decorators, State) ->
notify_decorators(refresh, [], State),
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index ae5bbf51..8eaac10d 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -119,52 +119,51 @@ create_frame(TypeInt, ChannelInt, Payload) ->
table_field_to_binary({FName, T, V}) ->
[short_string_to_binary(FName) | field_value_to_binary(T, V)].
-field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)];
-field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>];
+field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)];
+field_value_to_binary(signedint, V) -> [$I | <<V:32/signed>>];
field_value_to_binary(decimal, V) -> {Before, After} = V,
- ["D", Before, <<After:32>>];
-field_value_to_binary(timestamp, V) -> ["T", <<V:64>>];
-field_value_to_binary(table, V) -> ["F", table_to_binary(V)];
-field_value_to_binary(array, V) -> ["A", array_to_binary(V)];
-field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>];
-field_value_to_binary(double, V) -> ["d", <<V:64/float>>];
-field_value_to_binary(float, V) -> ["f", <<V:32/float>>];
-field_value_to_binary(long, V) -> ["l", <<V:64/signed>>];
-field_value_to_binary(short, V) -> ["s", <<V:16/signed>>];
-field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end];
-field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)];
-field_value_to_binary(void, _V) -> ["V"].
+ [$D | [Before, <<After:32>>]];
+field_value_to_binary(timestamp, V) -> [$T | <<V:64>>];
+field_value_to_binary(table, V) -> [$F | table_to_binary(V)];
+field_value_to_binary(array, V) -> [$A | array_to_binary(V)];
+field_value_to_binary(byte, V) -> [$b | <<V:8/unsigned>>];
+field_value_to_binary(double, V) -> [$d | <<V:64/float>>];
+field_value_to_binary(float, V) -> [$f | <<V:32/float>>];
+field_value_to_binary(long, V) -> [$l | <<V:64/signed>>];
+field_value_to_binary(short, V) -> [$s | <<V:16/signed>>];
+field_value_to_binary(bool, V) -> [$t | [if V -> 1; true -> 0 end]];
+field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)];
+field_value_to_binary(void, _V) -> [$V].
table_to_binary(Table) when is_list(Table) ->
- BinTable = generate_table(Table),
- [<<(size(BinTable)):32>>, BinTable].
+ BinTable = generate_table_iolist(Table),
+ [<<(iolist_size(BinTable)):32>> | BinTable].
array_to_binary(Array) when is_list(Array) ->
- BinArray = generate_array(Array),
- [<<(size(BinArray)):32>>, BinArray].
+ BinArray = generate_array_iolist(Array),
+ [<<(iolist_size(BinArray)):32>> | BinArray].
generate_table(Table) when is_list(Table) ->
- list_to_binary(lists:map(fun table_field_to_binary/1, Table)).
+ list_to_binary(generate_table_iolist(Table)).
-generate_array(Array) when is_list(Array) ->
- list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end,
- Array)).
+generate_table_iolist(Table) ->
+ lists:map(fun table_field_to_binary/1, Table).
+
+generate_array_iolist(Array) ->
+ lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, Array).
-short_string_to_binary(String) when is_binary(String) ->
- Len = size(String),
- if Len < 256 -> [<<Len:8>>, String];
- true -> exit(content_properties_shortstr_overflow)
- end;
short_string_to_binary(String) ->
- Len = length(String),
- if Len < 256 -> [<<Len:8>>, String];
+ Len = string_length(String),
+ if Len < 256 -> [<<Len:8>> | String];
true -> exit(content_properties_shortstr_overflow)
end.
-long_string_to_binary(String) when is_binary(String) ->
- [<<(size(String)):32>>, String];
long_string_to_binary(String) ->
- [<<(length(String)):32>>, String].
+ Len = string_length(String),
+ [<<Len:32>> | String].
+
+string_length(String) when is_binary(String) -> size(String);
+string_length(String) -> length(String).
check_empty_frame_size() ->
%% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 088ad0e5..f65d8ea7 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -53,35 +53,35 @@ parse_array(<<ValueAndRest/binary>>) ->
{Type, Value, Rest} = parse_field_value(ValueAndRest),
[{Type, Value} | parse_array(Rest)].
-parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
{longstr, V, R};
-parse_field_value(<<"I", V:32/signed, R/binary>>) ->
+parse_field_value(<<$I, V:32/signed, R/binary>>) ->
{signedint, V, R};
-parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) ->
+parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) ->
{decimal, {Before, After}, R};
-parse_field_value(<<"T", V:64/unsigned, R/binary>>) ->
+parse_field_value(<<$T, V:64/unsigned, R/binary>>) ->
{timestamp, V, R};
-parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
+parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
{table, parse_table(Table), R};
-parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
+parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
{array, parse_array(Array), R};
-parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R};
-parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R};
-parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R};
-parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R};
-parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R};
-parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
+parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R};
+parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R};
+parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R};
+parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R};
+parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R};
+parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
-parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
{binary, V, R};
-parse_field_value(<<"V", R/binary>>) ->
+parse_field_value(<<$V, R/binary>>) ->
{void, undefined, R}.
ensure_content_decoded(Content = #content{properties = Props})
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 6aca34ae..635bf75a 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -126,7 +126,7 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -169,8 +169,8 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
- -> qstate()).
+-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
+ boolean()) -> qstate()).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -278,7 +278,9 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
none -> false
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
+credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
+ Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)};
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) ->
Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
drained(Limiter = #qstate{credits = Credits}) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 10e68198..488f1df5 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -201,7 +201,7 @@ init([]) ->
%% writing out the cluster status files - bad things can then
%% happen.
process_flag(trap_exit, true),
- net_kernel:monitor_nodes(true),
+ net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
@@ -267,7 +267,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
-handle_info({nodedown, Node}, State) ->
+handle_info({nodedown, Node, Info}, State) ->
+ rabbit_log:info("node ~p down: ~p~n",
+ [Node, proplists:get_value(nodedown_reason, Info)]),
ok = handle_dead_node(Node),
{noreply, State};
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index b54fdd2e..5a1613a7 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,9 @@
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
- is_running/2, is_process_running/2]).
+ is_running/2, is_process_running/2, fqdn_nodename/0]).
+
+-include_lib("kernel/include/inet.hrl").
-define(EPMD_TIMEOUT, 30000).
@@ -35,6 +37,7 @@
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
+-spec(fqdn_nodename/0 :: () -> binary()).
-endif.
@@ -107,3 +110,9 @@ is_process_running(Node, Process) ->
undefined -> false;
P when is_pid(P) -> true
end.
+
+fqdn_nodename() ->
+ {ID, _} = rabbit_nodes:parts(node()),
+ {ok, Host} = inet:gethostname(),
+ {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
+ list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
new file mode 100644
index 00000000..702091dc
--- /dev/null
+++ b/src/rabbit_queue_consumers.erl
@@ -0,0 +1,426 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_consumers).
+
+-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
+ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ send_drained/0, deliver/5, record_ack/3, subtract_acks/2,
+ possibly_unblock/3,
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
+ utilisation/1]).
+
+%%----------------------------------------------------------------------------
+
+-define(UNSENT_MESSAGE_LIMIT, 200).
+
+-record(state, {consumers, use}).
+
+-record(consumer, {tag, ack_required, args}).
+
+%% These are held in our process dictionary
+-record(cr, {ch_pid,
+ monitor_ref,
+ acktags,
+ consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
+ blocked_consumers,
+ %% The limiter itself
+ limiter,
+ %% Internal flow control for queue -> writer
+ unsent_message_count}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type time_micros() :: non_neg_integer().
+-type ratio() :: float().
+-type state() :: #state{consumers ::priority_queue:q(),
+ use :: {'inactive',
+ time_micros(), time_micros(), ratio()} |
+ {'active', time_micros(), ratio()}}.
+-type ch() :: pid().
+-type ack() :: non_neg_integer().
+-type cr_fun() :: fun ((#cr{}) -> #cr{}).
+-type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
+-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
+
+-spec new() -> state().
+-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
+-spec inactive(state()) -> boolean().
+-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}].
+-spec count() -> non_neg_integer().
+-spec unacknowledged_message_count() -> non_neg_integer().
+-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
+ credit_args(), rabbit_framing:amqp_table(), boolean(),
+ state()) -> state().
+-spec remove(ch(), rabbit_types:ctag(), state()) ->
+ 'not_found' | state().
+-spec erase_ch(ch(), state()) ->
+ 'not_found' | {[ack()], [rabbit_types:ctag()],
+ state()}.
+-spec send_drained() -> 'ok'.
+-spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
+ boolean(), rabbit_amqqueue:name(), T, state()) ->
+ {boolean(), [{ch(), rabbit_types:ctag()}], T, state()}.
+-spec record_ack(ch(), pid(), ack()) -> 'ok'.
+-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+-spec possibly_unblock(cr_fun(), ch(), state()) ->
+ 'unchanged' |
+ {'unblocked', [rabbit_types:ctag()], state()}.
+-spec resume_fun() -> cr_fun().
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+-spec activate_limit_fun() -> cr_fun().
+-spec credit_fun(boolean(), non_neg_integer(), boolean(),
+ rabbit_types:ctag()) -> cr_fun().
+-spec utilisation(state()) -> ratio().
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+new() -> #state{consumers = priority_queue:new(),
+ use = {inactive, now_micros(), 0, 0.0}}.
+
+max_active_priority(#state{consumers = Consumers}) ->
+ priority_queue:highest(Consumers).
+
+inactive(#state{consumers = Consumers}) ->
+ priority_queue:is_empty(Consumers).
+
+all(#state{consumers = Consumers}) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
+ consumers(Consumers, []), all_ch_record()).
+
+consumers(Consumers, Acc) ->
+ priority_queue:fold(
+ fun ({ChPid, Consumer}, _P, Acc1) ->
+ #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Args} | Acc1]
+ end, Acc, Consumers).
+
+count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+
+unacknowledged_message_count() ->
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
+
+add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
+ IsEmpty, State = #state{consumers = Consumers}) ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag, Crd, IsEmpty, Drain)
+ end,
+ C1 = C#cr{consumer_count = Count + 1,
+ limiter = Limiter2},
+ update_ch_record(case IsEmpty of
+ true -> send_drained(C1);
+ false -> C1
+ end),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck,
+ args = OtherArgs},
+ State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
+
+remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
+ Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter2,
+ blocked_consumers = Blocked1}),
+ State#state{consumers =
+ remove_consumer(ChPid, ConsumerTag, Consumers)}
+ end.
+
+erase_ch(ChPid, State = #state{consumers = Consumers}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = BlockedQ} ->
+ AllConsumers = priority_queue:join(Consumers, BlockedQ),
+ ok = erase_ch_record(C),
+ {queue:to_list(ChAckTags),
+ tags(priority_queue:to_list(AllConsumers)),
+ State#state{consumers = remove_consumers(ChPid, Consumers)}}
+ end.
+
+send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
+ ok.
+
+deliver(FetchFun, Stop, QName, S, State) ->
+ deliver(FetchFun, Stop, QName, [], S, State).
+
+deliver(_FetchFun, true, _QName, Blocked, S, State) ->
+ {true, Blocked, S, State};
+deliver( FetchFun, false, QName, Blocked, S,
+ State = #state{consumers = Consumers, use = Use}) ->
+ case priority_queue:out_p(Consumers) of
+ {empty, _} ->
+ {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
+ {{value, QEntry, Priority}, Tail} ->
+ {Stop, Blocked1, S1, Consumers1} =
+ deliver_to_consumer(FetchFun, QEntry, Priority, QName,
+ Blocked, S, Tail),
+ deliver(FetchFun, Stop, QName, Blocked1, S1,
+ State#state{consumers = Consumers1})
+ end.
+
+deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
+ Blocked, S, Consumers) ->
+ C = lookup_ch(ChPid),
+ case is_ch_blocked(C) of
+ true -> block_consumer(C, E),
+ Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
+ {false, Blocked1, S, Consumers};
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
+ {false, Blocked1, S, Consumers};
+ {continue, Limiter} ->
+ {Stop, S1} = deliver_to_consumer(
+ FetchFun, Consumer,
+ C#cr{limiter = Limiter}, QName, S),
+ {Stop, Blocked, S1,
+ priority_queue:in(E, Priority, Consumers)}
+ end
+ end.
+
+deliver_to_consumer(FetchFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ QName, S) ->
+ {{Message, IsDelivered, AckTag}, Stop, S1} = FetchFun(AckRequired, S),
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> queue:in(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
+ update_ch_record(C#cr{acktags = ChAckTags1,
+ unsent_message_count = Count + 1}),
+ {Stop, S1}.
+
+record_ack(ChPid, LimiterPid, AckTag) ->
+ C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
+ update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
+ ok.
+
+subtract_acks(ChPid, AckTags) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
+ ok
+ end.
+
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
+possibly_unblock(Update, ChPid, State) ->
+ case lookup_ch(ChPid) of
+ not_found -> unchanged;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ unchanged;
+ true -> unblock(C1, State)
+ end
+ end.
+
+unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
+ State = #state{consumers = Consumers, use = Use}) ->
+ case lists:partition(
+ fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, priority_queue:to_list(BlockedQ)) of
+ {_, []} ->
+ update_ch_record(C),
+ unchanged;
+ {Blocked, Unblocked} ->
+ BlockedQ1 = priority_queue:from_list(Blocked),
+ UnblockedQ = priority_queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
+ {unblocked,
+ tags(Unblocked),
+ State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
+ use = update_use(Use, active)}}
+ end.
+
+resume_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end.
+
+notify_sent_fun(Credit) ->
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - Credit}
+ end.
+
+activate_limit_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end.
+
+credit_fun(IsEmpty, Credit, Drain, CTag) ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C1 = C#cr{limiter = rabbit_limiter:credit(
+ Limiter, CTag, Credit, IsEmpty, Drain)},
+ case Drain andalso IsEmpty of
+ true -> send_drained(C1);
+ false -> C1
+ end
+ end.
+
+utilisation(#state{use = {active, Since, Avg}}) ->
+ use_avg(now_micros() - Since, 0, Avg);
+utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
+ use_avg(Active, now_micros() - Since, Avg).
+
+%%----------------------------------------------------------------------------
+
+lookup_ch(ChPid) ->
+ case get({ch, ChPid}) of
+ undefined -> not_found;
+ C -> C
+ end.
+
+ch_record(ChPid, LimiterPid) ->
+ Key = {ch, ChPid},
+ case get(Key) of
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
+ C = #cr{ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = queue:new(),
+ consumer_count = 0,
+ blocked_consumers = priority_queue:new(),
+ limiter = Limiter,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
+ C = #cr{} -> C
+ end.
+
+update_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ unsent_message_count = UnsentMessageCount}) ->
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
+ end,
+ C.
+
+store_ch_record(C = #cr{ch_pid = ChPid}) ->
+ put({ch, ChPid}, C),
+ ok.
+
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}),
+ ok.
+
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
+
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+ update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).
+
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> C;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ C#cr{limiter = Limiter2}
+ end.
+
+tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].
+
+add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
+ Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, P} -> P;
+ _ -> 0
+ end,
+ priority_queue:in({ChPid, Consumer}, Priority, Queue).
+
+remove_consumer(ChPid, ConsumerTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
+ end, Queue).
+
+remove_consumers(ChPid, Queue) ->
+ priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
+ (_) -> true
+ end, Queue).
+
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, use_avg(Active, Now - Since, Avg)}.
+
+use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
+now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b3b341c5..fce50049 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/2, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/4, recvloop/4]).
-export([conserve_resources/3, server_properties/1]).
@@ -38,7 +38,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, helper_sup, queue_collector, heartbeater,
- stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
+ stats_timer, channel_sup_sup_pid, channel_count, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
@@ -91,9 +91,10 @@
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
--spec(mainloop/2 :: (_,#v1{}) -> any()).
+-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
--spec(system_continue/3 :: (_,_,#v1{}) -> any()).
+-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) ->
+ any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
-endif.
@@ -113,8 +114,8 @@ init(Parent, HelperSup) ->
start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
-system_continue(Parent, Deb, State) ->
- mainloop(Deb, State#v1{parent = Parent}).
+system_continue(Parent, Deb, {Buf, BufLen, State}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -239,8 +240,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
helper_sup = HelperSup,
heartbeater = none,
channel_sup_sup_pid = none,
- buf = [],
- buf_len = 0,
+ channel_count = 0,
throttle = #throttle{
alarmed_by = [],
last_blocked_by = none,
@@ -248,9 +248,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
blocked_sent = false}},
try
run({?MODULE, recvloop,
- [Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)]}),
+ [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -277,29 +277,38 @@ run({M, F, A}) ->
catch {become, MFA} -> run(MFA)
end.
-recvloop(Deb, State = #v1{pending_recv = true}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{connection_state = blocked}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
+recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, Buf, BufLen, State)});
+recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
ok = rabbit_net:setopts(Sock, [{active, once}]),
- mainloop(Deb, State#v1{pending_recv = true});
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
- {Data, Rest} = split_binary(case Buf of
- [B] -> B;
- _ -> list_to_binary(lists:reverse(Buf))
- end, RecvLen),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = [Rest],
- buf_len = BufLen - RecvLen})).
-
-mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true});
+recvloop(Deb, [B], _BufLen, State) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, [Rest], size(Rest), State1);
+recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
+ {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
+ Data = list_to_binary(lists:reverse(DataLRev)),
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
+
+binlist_split(0, L, Acc) ->
+ {L, Acc};
+binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
+ {H, T} = split_binary(Acc0, -Len),
+ {[H|L], [T|Acc]};
+binlist_split(Len, [H|T], Acc) ->
+ binlist_split(Len - size(H), T, [H|Acc]).
+
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
case rabbit_net:recv(Sock) of
{data, Data} ->
- recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
+ recvloop(Deb, [Data | Buf], BufLen + size(Data),
+ State#v1{pending_recv = false});
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
@@ -310,11 +319,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
throw({inet_error, Reason});
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
- ?MODULE, Deb, State);
+ ?MODULE, Deb, {Buf, BufLen, State});
{other, Other} ->
case handle_other(Other, State) of
stop -> ok;
- NewState -> recvloop(Deb, NewState)
+ NewState -> recvloop(Deb, Buf, BufLen, NewState)
end
end.
@@ -329,8 +338,8 @@ handle_other({conserve_resources, Source, Conserve},
control_throttle(State#v1{throttle = Throttle1});
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- maybe_close(control_throttle(State));
+ {_, State1} = channel_cleanup(ChPid, State),
+ maybe_close(control_throttle(State1));
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -487,63 +496,59 @@ close_connection(State = #v1{queue_collector = Collector,
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, controlled} -> State;
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, controlled} -> State1;
{undefined, uncontrolled} -> exit({abnormal_dependent_exit,
ChPid, Reason});
- {_Channel, controlled} -> maybe_close(control_throttle(State));
- {Channel, uncontrolled} -> State1 = handle_exception(
- State, Channel, Reason),
- maybe_close(control_throttle(State1))
+ {_, controlled} -> maybe_close(control_throttle(State1));
+ {_, uncontrolled} -> State2 = handle_exception(
+ State1, Channel, Reason),
+ maybe_close(control_throttle(State2))
end.
-terminate_channels() ->
- NChannels =
- length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
- if NChannels > 0 ->
- Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
- TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
- wait_for_channel_termination(NChannels, TimerRef);
- true -> ok
- end.
+terminate_channels(#v1{channel_count = 0} = State) ->
+ State;
+terminate_channels(#v1{channel_count = ChannelCount} = State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount,
+ TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
+ wait_for_channel_termination(ChannelCount, TimerRef, State).
-wait_for_channel_termination(0, TimerRef) ->
+wait_for_channel_termination(0, TimerRef, State) ->
case erlang:cancel_timer(TimerRef) of
false -> receive
- cancel_wait -> ok
+ cancel_wait -> State
end;
- _ -> ok
+ _ -> State
end;
-
-wait_for_channel_termination(N, TimerRef) ->
+wait_for_channel_termination(N, TimerRef, State) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, _} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- wait_for_channel_termination(N-1, TimerRef);
- {Channel, uncontrolled} ->
- log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(N-1, TimerRef)
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, _} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_, controlled} -> wait_for_channel_termination(
+ N-1, TimerRef, State1);
+ {_, uncontrolled} -> log(error,
+ "AMQP connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
+ wait_for_channel_termination(
+ N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
end.
maybe_close(State = #v1{connection_state = closing,
- connection = #connection{protocol = Protocol},
- sock = Sock}) ->
- case all_channels() of
- [] ->
- NewState = close_connection(State),
- ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- NewState;
- _ -> State
- end;
+ channel_count = 0,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ NewState = close_connection(State),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ NewState;
maybe_close(State) ->
State.
@@ -562,8 +567,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
[self(), CS, Channel, Reason]),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
+ State1 = close_connection(terminate_channels(State)),
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
State1;
handle_exception(State, Channel, Reason) ->
@@ -601,41 +605,43 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{name = Name,
- protocol = Protocol,
- frame_max = FrameMax,
- channel_max = ChannelMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- N = length(all_channels()),
- case ChannelMax == 0 orelse N < ChannelMax of
- true -> {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities,
- Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ok, {ChPid, AState}};
- false -> {error, rabbit_misc:amqp_error(
- not_allowed, "number of channels opened (~w) has "
- "reached the negotiated channel_max (~w)",
- [N, ChannelMax], 'none')}
- end.
-
-channel_cleanup(ChPid) ->
+create_channel(_Channel,
+ #v1{channel_count = ChannelCount,
+ connection = #connection{channel_max = ChannelMax}})
+ when ChannelMax /= 0 andalso ChannelCount >= ChannelMax ->
+ {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has reached the "
+ "negotiated channel_max (~w)",
+ [ChannelCount, ChannelMax], 'none')};
+create_channel(Channel,
+ #v1{sock = Sock,
+ queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ channel_count = ChannelCount,
+ connection =
+ #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State) ->
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}.
+
+channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
case get({ch_pid, ChPid}) of
- undefined -> undefined;
+ undefined -> {undefined, State};
{Channel, MRef} -> credit_flow:peer_down(ChPid),
erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
- Channel
+ {Channel, State#v1{channel_count = ChannelCount - 1}}
end.
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
@@ -676,34 +682,34 @@ process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
case (case get(ChKey) of
undefined -> create_channel(Channel, State);
- Other -> {ok, Other}
+ Other -> {ok, Other, State}
end) of
{error, Error} ->
handle_exception(State, Channel, Error);
- {ok, {ChPid, AState}} ->
+ {ok, {ChPid, AState}, State1} ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} ->
put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
+ post_process_frame(Frame, ChPid, State1);
{ok, Method, NewAState} ->
rabbit_channel:do(ChPid, Method),
put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
+ post_process_frame(Frame, ChPid, State1);
{ok, Method, Content, NewAState} ->
rabbit_channel:do_flow(ChPid, Method, Content),
put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
+ post_process_frame(Frame, ChPid, control_throttle(State1));
{error, Reason} ->
- handle_exception(State, Channel, Reason)
+ handle_exception(State1, Channel, Reason)
end
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
- channel_cleanup(ChPid),
+ {_, State1} = channel_cleanup(ChPid, State),
%% This is not strictly necessary, but more obviously
%% correct. Also note that we do not need to call maybe_close/1
%% since we cannot possibly be in the 'closing' state.
- control_throttle(State);
+ control_throttle(State1);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->
@@ -717,26 +723,38 @@ post_process_frame(_Frame, _ChPid, State) ->
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
fatal_frame_error(
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- ensure_stats_timer(
- switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1));
-
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- switch_callback(State1, frame_header, 7);
+ {Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_header, Other});
+handle_input(Callback, Data, _State) ->
+ throw({bad_input, Callback, Data}).
%% The two rules pertaining to version negotiation:
%%
@@ -746,37 +764,31 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
%%
%% * The server MUST provide a protocol version that is lower than or
%% equal to that requested by the client in the protocol header.
-handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+handshake({0, 0, 9, 1}, State) ->
start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
%% This is the protocol header for 0-9, which we can safely treat as
%% though it were 0-9-1.
-handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+handshake({1, 1, 0, 9}, State) ->
start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
%% defines the version as 8-0.
-handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+handshake({1, 1, 8, 0}, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
%% The 0-8 spec as on the AMQP web site actually has this as the
%% protocol header; some libraries e.g., py-amqplib, send it when they
%% want 0-8.
-handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
+handshake({1, 1, 9, 1}, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
-%% ... and finally, the 1.0 spec is crystal clear! Note that the
-handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) ->
+%% ... and finally, the 1.0 spec is crystal clear!
+handshake({Id, 1, 0, 0}, State) ->
become_1_0(Id, State);
-handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, {A, B, C, D}});
-
-handle_input(handshake, Other, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_header, Other});
-
-handle_input(Callback, Data, _State) ->
- throw({bad_input, Callback, Data}).
+handshake(Vsn, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, Vsn}).
%% Offer a protocol version to the client. Connection.start only
%% includes a major and minor version number, Luckily 0-9 and 0-9-1
@@ -820,7 +832,10 @@ handle_method0(MethodName, FieldsBin,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
- catch exit:#amqp_error{method = none} = Reason ->
+ catch throw:{inet_error, closed} ->
+ maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+ exit:#amqp_error{method = none} = Reason ->
handle_exception(State, 0, Reason#amqp_error{method = MethodName});
Type:Reason ->
Stack = erlang:get_stacktrace(),
@@ -1044,16 +1059,14 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(channels, #v1{}) -> length(all_channels());
+i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount;
i(state, #v1{connection_state = ConnectionState,
throttle = #throttle{last_blocked_by = BlockedBy,
last_blocked_at = T}}) ->
Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000,
- case {BlockedBy, ConnectionState, Recent} of
- {resourse, blocked, _} -> blocked;
- {_, blocking, _} -> blocking;
- {flow, _, true} -> flow;
- {_, _, _} -> ConnectionState
+ case {BlockedBy, Recent} of
+ {flow, true} -> flow;
+ {_, _} -> ConnectionState
end;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
@@ -1110,10 +1123,8 @@ emit_stats(State) ->
%% If we emit an event which looks like we are in flow control, it's not a
%% good idea for it to be our last even if we go idle. Keep emitting
%% events, either we stay busy or we drop out of flow control.
- %% The 5 is to match the test in formatters.js:fmt_connection_state().
- %% This magic number will go away when bug 24829 is merged.
- case proplists:get_value(last_blocked_age, Infos) < 5 of
- true -> ensure_stats_timer(State1);
+ case proplists:get_value(state, Infos) of
+ flow -> ensure_stats_timer(State1);
_ -> State1
end.
@@ -1131,15 +1142,16 @@ become_1_0(Id, State = #v1{sock = Sock}) ->
Sock, {unsupported_amqp1_0_protocol_id, Id},
{3, 1, 0, 0})
end,
- throw({become, {rabbit_amqp1_0_reader, init,
- [Mode, pack_for_1_0(State)]}})
+ F = fun (_Deb, Buf, BufLen, S) ->
+ {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(Buf, BufLen, S)]}
+ end,
+ State = #v1{connection_state = {become, F}}
end.
-pack_for_1_0(#v1{parent = Parent,
- sock = Sock,
- recv_len = RecvLen,
- pending_recv = PendingRecv,
- helper_sup = SupPid,
- buf = Buf,
- buf_len = BufLen}) ->
+pack_for_1_0(Buf, BufLen, #v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ helper_sup = SupPid}) ->
{Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 369ec655..fc4353dc 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -173,16 +173,19 @@ set_mem_limits(State, MemFraction) ->
?MEMORY_SIZE_FOR_UNKNOWN_OS;
M -> M
end,
- UsableMemory = case get_vm_limit() of
- Limit when Limit < TotalMemory ->
- error_logger:warning_msg(
- "Only ~pMB of ~pMB memory usable due to "
- "limited address space.~n",
- [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]),
- Limit;
- _ ->
- TotalMemory
- end,
+ UsableMemory =
+ case get_vm_limit() of
+ Limit when Limit < TotalMemory ->
+ error_logger:warning_msg(
+ "Only ~pMB of ~pMB memory usable due to "
+ "limited address space.~n"
+ "Crashes due to memory exhaustion are possible - see~n"
+ "http://www.rabbitmq.com/memory.html#address-space~n",
+ [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]),
+ Limit;
+ _ ->
+ TotalMemory
+ end,
MemLim = trunc(MemFraction * UsableMemory),
error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n",
[trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]),