summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-12-10 12:03:22 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-12-10 12:03:22 +0000
commita3a42f14505cb1e366a931a74ae21a29a6aee444 (patch)
treea8e9e45fd4d563b6ce8914f5377d1013c1455ac4
parent8909dce2a277b5723eef6595c00c13f9c356184c (diff)
parent79c905c57216c2128a74f9f976a35686a586d7c2 (diff)
downloadrabbitmq-server-a3a42f14505cb1e366a931a74ae21a29a6aee444.tar.gz
Merge in default
-rw-r--r--docs/rabbitmq.config.example5
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_reader.erl163
-rw-r--r--src/rabbit_upgrade.erl9
6 files changed, 118 insertions, 78 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 91402649..c0d6cc70 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -138,6 +138,11 @@
%%
%% {frame_max, 131072},
+ %% Set the max permissible number of channels per connection.
+ %% 0 means "no limit".
+ %%
+ %% {channel_max, 128},
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6410235d..d2a3f7c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1430,6 +1430,10 @@
<listitem><para>Maximum frame size (bytes).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>channel_max</term>
+ <listitem><para>Maximum number of channels on this connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>client_properties</term>
<listitem><para>Informational properties transmitted by the client
during connection establishment.</para></listitem>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index a10bad45..29f06e79 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -25,6 +25,7 @@
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
{frame_max, 131072},
+ {channel_max, 0},
{heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
@@ -52,6 +53,7 @@
{nodelay, true},
{linger, {true, 0}},
{exit_on_close, false}]},
+ {halt_on_upgrade_failure, true},
{hipe_compile, false},
%% see bug 24513 for how this list was created
{hipe_modules,
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 3a8fae7f..f27f77c6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -434,10 +434,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
%% First disc node up
maybe_force_load(),
ok;
- {[AnotherNode | _], _, _} ->
+ {[_ | _], _, _} ->
%% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
maybe_force_load(),
ok = rabbit_table:wait_for_replicated(),
ok = rabbit_table:create_local_copy(NodeType)
@@ -639,15 +637,6 @@ schema_ok_or_move() ->
ok = create_schema()
end.
-ensure_version_ok({ok, DiscVersion}) ->
- DesiredVersion = rabbit_version:desired(),
- case rabbit_version:matches(DesiredVersion, DiscVersion) of
- true -> ok;
- false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}})
- end;
-ensure_version_ok({error, _}) ->
- ok = rabbit_version:record_desired().
-
%% We only care about disc nodes since ram nodes are supposed to catch
%% up only
create_schema() ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e5c8964c..e510f3f1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -32,6 +32,7 @@
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
%%--------------------------------------------------------------------------
@@ -40,7 +41,7 @@
stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, vhost,
+ protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
@@ -55,7 +56,7 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -606,17 +607,26 @@ create_channel(Channel, State) ->
connection = #connection{name = Name,
protocol = Protocol,
frame_max = FrameMax,
+ channel_max = ChannelMax,
user = User,
vhost = VHost,
capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ChPid, AState}.
+ N = length(all_channels()),
+ case ChannelMax == 0 orelse N < ChannelMax of
+ true -> {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities,
+ Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}};
+ false -> {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has "
+ "reached the negotiated channel_max (~w)",
+ [N, ChannelMax], 'none')}
+ end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
@@ -664,24 +674,28 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- {ChPid, AState} = case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> Other
- end,
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -838,40 +852,35 @@ handle_method0(#'connection.secure_ok'{response = Response},
State = #v1{connection_state = securing}) ->
auth_phase(Response, State);
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- heartbeat = ClientHeartbeat},
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
helper_sup = SupPid,
sock = Sock}) ->
- ServerFrameMax = server_frame_max(),
- if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w < ~w min size",
- [FrameMax, ?FRAME_MIN_SIZE]);
- ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ServerFrameMax]);
- true ->
- {ok, Collector} =
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} =
rabbit_connection_helper_sup:start_queue_collector(
SupPid, Connection#connection.name),
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater =
rabbit_heartbeat:start(
SupPid, Sock, Connection#connection.name,
ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
- queue_collector = Collector,
- heartbeater = Heartbeater}
- end;
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
@@ -919,13 +928,28 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-server_frame_max() ->
- {ok, FrameMax} = application:get_env(rabbit, frame_max),
- FrameMax.
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, ServerValue, ClientValue);
+ ServerValue /= 0 andalso ClientValue > ServerValue ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
-server_heartbeat() ->
- {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
- Heartbeat.
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
+
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -991,9 +1015,9 @@ auth_phase(Response,
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
{ok, User} ->
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = server_frame_max(),
- heartbeat = server_heartbeat()},
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User,
@@ -1045,6 +1069,7 @@ ic(user, #connection{user = U}) -> U#user.username;
ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
@@ -1079,8 +1104,18 @@ maybe_emit_stats(State) ->
fun() -> emit_stats(State) end).
emit_stats(State) ->
- rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
- rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+ Infos = infos(?STATISTICS_KEYS, State),
+ rabbit_event:notify(connection_stats, Infos),
+ State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
+ %% If we emit an event which looks like we are in flow control, it's not a
+ %% good idea for it to be our last even if we go idle. Keep emitting
+ %% events, either we stay busy or we drop out of flow control.
+ %% The 5 is to match the test in formatters.js:fmt_connection_state().
+ %% This magic number will go away when bug 24829 is merged.
+ case proplists:get_value(last_blocked_age, Infos) < 5 of
+ true -> ensure_stats_timer(State1);
+ _ -> State1
+ end.
%% 1.0 stub
-ifdef(use_specs).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 1047b823..c1f142d7 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -191,9 +191,14 @@ die(Msg, Args) ->
%% straight out into do_boot, generating an erl_crash.dump
%% and displaying any error message in a confusing way.
error_logger:error_msg(Msg, Args),
- io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ Str = rabbit_misc:format(
+ "~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ io:format(Str),
error_logger:logfile(close),
- halt(1).
+ case application:get_env(rabbit, halt_on_upgrade_failure) of
+ {ok, false} -> throw({upgrade_error, Str});
+ _ -> halt(1) %% i.e. true or undefined
+ end.
primary_upgrade(Upgrades, Nodes) ->
Others = Nodes -- [node()],