From 1accdc881477f68566fead5b239f7644b7109a45 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 25 Nov 2013 15:19:50 +0400 Subject: Make connection.tune.channel_max configurable --- src/rabbit_reader.erl | 61 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e00732fd..220db174 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -32,6 +32,7 @@ -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +-define(CHANNEL_MIN, 4). %%-------------------------------------------------------------------------- @@ -838,8 +839,9 @@ handle_method0(#'connection.secure_ok'{response = Response}, State = #v1{connection_state = securing}) -> auth_phase(Response, State); -handle_method0(#'connection.tune_ok'{frame_max = FrameMax, - heartbeat = ClientHeartbeat}, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, + heartbeat = ClientHeartbeat, + channel_max = ChannelMax}, State = #v1{connection_state = tuning, connection = Connection, helper_sup = SupPid, @@ -854,21 +856,40 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ServerFrameMax]); true -> - {ok, Collector} = + ServerChannelMax = server_channel_max(), + Protocol = Connection#connection.protocol, + if ChannelMax /= 0 andalso ChannelMax < ?CHANNEL_MIN -> + AmqpError = rabbit_misc:amqp_error( + not_allowed, "negotiated channel_max=~w < ~w min size", + [ChannelMax, ServerChannelMax], none), + {0, CloseMethod} = rabbit_binary_generator:map_exception( + 0, AmqpError, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); + ChannelMax /= 0 andalso ChannelMax > ServerChannelMax -> + AmqpError = rabbit_misc:amqp_error( + not_allowed, "negotiated channel_max=~w > ~w max size", + [ChannelMax, ServerChannelMax], none), + {0, CloseMethod} = rabbit_binary_generator:map_exception( + 0, AmqpError, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); + true -> + {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(SupPid), - Frame = rabbit_binary_generator:build_heartbeat_frame(), - SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, - Parent = self(), - ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, ReceiveFun), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, - queue_collector = Collector, - heartbeater = Heartbeater} + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, + Parent = self(), + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, + ReceiveFun), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}, + queue_collector = Collector, + heartbeater = Heartbeater} + end end; handle_method0(#'connection.open'{virtual_host = VHostPath}, @@ -921,6 +942,12 @@ server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. +server_channel_max() -> + case application:get_env(rabbit, channel_max) of + {ok, ChannelMax} -> ChannelMax; + undefined -> 0 + end. + server_heartbeat() -> {ok, Heartbeat} = application:get_env(rabbit, heartbeat), Heartbeat. @@ -989,7 +1016,7 @@ auth_phase(Response, State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User} -> - Tune = #'connection.tune'{channel_max = 0, + Tune = #'connection.tune'{channel_max = server_channel_max(), frame_max = server_frame_max(), heartbeat = server_heartbeat()}, ok = send_on_channel0(Sock, Tune, Protocol), -- cgit v1.2.1 From 47a5118aa394e1d6f4736ae93e950549247b4e05 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 25 Nov 2013 15:20:11 +0400 Subject: Add channel_max default --- ebin/rabbit_app.in | 1 + 1 file changed, 1 insertion(+) diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index a10bad45..0254bb21 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -25,6 +25,7 @@ %% 0 ("no limit") would make a better default, but that %% breaks the QPid Java client {frame_max, 131072}, + {channel_max, 0}, {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, -- cgit v1.2.1 From f33267221b56fd78e7459945cbd70f4f432e297d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Nov 2013 13:25:02 +0400 Subject: No need to handle undefined case here --- src/rabbit_reader.erl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 220db174..188368f7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -943,10 +943,8 @@ server_frame_max() -> FrameMax. server_channel_max() -> - case application:get_env(rabbit, channel_max) of - {ok, ChannelMax} -> ChannelMax; - undefined -> 0 - end. + {ok, ChannelMax} = application:get_env(rabbit, channel_max), + ChannelMax. server_heartbeat() -> {ok, Heartbeat} = application:get_env(rabbit, heartbeat), -- cgit v1.2.1 From de8a27f1a463ba020b1742c4816cb334617171b6 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Nov 2013 14:30:37 +0400 Subject: Extract function, improve error messages --- src/rabbit_reader.erl | 95 +++++++++++++++++++++++++++------------------------ 1 file changed, 50 insertions(+), 45 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 188368f7..945e38a4 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -846,51 +846,35 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, connection = Connection, helper_sup = SupPid, sock = Sock}) -> - ServerFrameMax = server_frame_max(), - if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w < ~w min size", - [FrameMax, ?FRAME_MIN_SIZE]); - ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w > ~w max size", - [FrameMax, ServerFrameMax]); - true -> - ServerChannelMax = server_channel_max(), - Protocol = Connection#connection.protocol, - if ChannelMax /= 0 andalso ChannelMax < ?CHANNEL_MIN -> - AmqpError = rabbit_misc:amqp_error( - not_allowed, "negotiated channel_max=~w < ~w min size", - [ChannelMax, ServerChannelMax], none), - {0, CloseMethod} = rabbit_binary_generator:map_exception( - 0, AmqpError, Protocol), - ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); - ChannelMax /= 0 andalso ChannelMax > ServerChannelMax -> - AmqpError = rabbit_misc:amqp_error( - not_allowed, "negotiated channel_max=~w > ~w max size", - [ChannelMax, ServerChannelMax], none), - {0, CloseMethod} = rabbit_binary_generator:map_exception( - 0, AmqpError, Protocol), - ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); - true -> - {ok, Collector} = - rabbit_connection_helper_sup:start_queue_collector(SupPid), - Frame = rabbit_binary_generator:build_heartbeat_frame(), - SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, - Parent = self(), - ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, - ReceiveFun), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, - queue_collector = Collector, - heartbeater = Heartbeater} - end - end; + Protocol = Connection#connection.protocol, + ok = validate_negotiated_integer_value(State, + frame_max, + FrameMax, + server_frame_max(), + ?FRAME_MIN_SIZE, + Protocol), + ok = validate_negotiated_integer_value(State, + channel_max, + ChannelMax, + server_channel_max(), + ?CHANNEL_MIN, + Protocol), + {ok, Collector} = + rabbit_connection_helper_sup:start_queue_collector(SupPid), + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, + Parent = self(), + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, + ReceiveFun), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}, + queue_collector = Collector, + heartbeater = Heartbeater}; handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, @@ -938,6 +922,27 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). +validate_negotiated_integer_value(State, Field, ClientValue, ServerValue, Min, Protocol) -> + if ClientValue /= 0 andalso ClientValue < Min -> + AmqpError = rabbit_misc:amqp_error( + not_allowed, "negotiated ~p = ~w is lower than the minimum allowedvalue (~w)", + [Field, ClientValue, ServerValue], none), + {0, CloseMethod} = + rabbit_binary_generator:map_exception(0, AmqpError, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol), + rabbit_misc:protocol_error(AmqpError); + ServerValue /= 0 andalso ClientValue > ServerValue -> + AmqpError = rabbit_misc:amqp_error( + not_allowed, "negotiated ~p = ~w is greater than the maximum allowed value (~w)", + [Field, ClientValue, ServerValue], none), + {0, CloseMethod} = + rabbit_binary_generator:map_exception(0, AmqpError, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol), + rabbit_misc:protocol_error(AmqpError); + true -> + ok + end. + server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. -- cgit v1.2.1 From 58cdc04c08ca968716194bbd14234776e738a5f4 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Nov 2013 14:58:21 +0400 Subject: Pass socket around instead of the entire state --- src/rabbit_reader.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 945e38a4..b1daeb91 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -847,13 +847,13 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, helper_sup = SupPid, sock = Sock}) -> Protocol = Connection#connection.protocol, - ok = validate_negotiated_integer_value(State, + ok = validate_negotiated_integer_value(State#v1.sock, frame_max, FrameMax, server_frame_max(), ?FRAME_MIN_SIZE, Protocol), - ok = validate_negotiated_integer_value(State, + ok = validate_negotiated_integer_value(State#v1.sock, channel_max, ChannelMax, server_channel_max(), @@ -922,14 +922,14 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -validate_negotiated_integer_value(State, Field, ClientValue, ServerValue, Min, Protocol) -> +validate_negotiated_integer_value(Socket, Field, ClientValue, ServerValue, Min, Protocol) -> if ClientValue /= 0 andalso ClientValue < Min -> AmqpError = rabbit_misc:amqp_error( not_allowed, "negotiated ~p = ~w is lower than the minimum allowedvalue (~w)", [Field, ClientValue, ServerValue], none), {0, CloseMethod} = rabbit_binary_generator:map_exception(0, AmqpError, Protocol), - ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol), + ok = send_on_channel0(Socket, CloseMethod, Protocol), rabbit_misc:protocol_error(AmqpError); ServerValue /= 0 andalso ClientValue > ServerValue -> AmqpError = rabbit_misc:amqp_error( @@ -937,7 +937,7 @@ validate_negotiated_integer_value(State, Field, ClientValue, ServerValue, Min, P [Field, ClientValue, ServerValue], none), {0, CloseMethod} = rabbit_binary_generator:map_exception(0, AmqpError, Protocol), - ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol), + ok = send_on_channel0(Socket, CloseMethod, Protocol), rabbit_misc:protocol_error(AmqpError); true -> ok -- cgit v1.2.1 From c55fde8f805f56ef61be7501d711c31ed7803dbd Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Nov 2013 15:17:53 +0400 Subject: Refactor * Passing State allows the function take one less argument --- src/rabbit_reader.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b1daeb91..d720ef0e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -846,19 +846,16 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, connection = Connection, helper_sup = SupPid, sock = Sock}) -> - Protocol = Connection#connection.protocol, - ok = validate_negotiated_integer_value(State#v1.sock, - frame_max, + ok = validate_negotiated_integer_value(frame_max, FrameMax, server_frame_max(), ?FRAME_MIN_SIZE, - Protocol), - ok = validate_negotiated_integer_value(State#v1.sock, - channel_max, + State), + ok = validate_negotiated_integer_value(channel_max, ChannelMax, server_channel_max(), ?CHANNEL_MIN, - Protocol), + State), {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(SupPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), @@ -922,14 +919,16 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -validate_negotiated_integer_value(Socket, Field, ClientValue, ServerValue, Min, Protocol) -> +validate_negotiated_integer_value(Field, ClientValue, ServerValue, Min, + State = #v1{sock = Sock, connection = Connection}) -> + Protocol = Connection#connection.protocol, if ClientValue /= 0 andalso ClientValue < Min -> AmqpError = rabbit_misc:amqp_error( not_allowed, "negotiated ~p = ~w is lower than the minimum allowedvalue (~w)", [Field, ClientValue, ServerValue], none), {0, CloseMethod} = rabbit_binary_generator:map_exception(0, AmqpError, Protocol), - ok = send_on_channel0(Socket, CloseMethod, Protocol), + ok = send_on_channel0(Sock, CloseMethod, Protocol), rabbit_misc:protocol_error(AmqpError); ServerValue /= 0 andalso ClientValue > ServerValue -> AmqpError = rabbit_misc:amqp_error( @@ -937,7 +936,7 @@ validate_negotiated_integer_value(Socket, Field, ClientValue, ServerValue, Min, [Field, ClientValue, ServerValue], none), {0, CloseMethod} = rabbit_binary_generator:map_exception(0, AmqpError, Protocol), - ok = send_on_channel0(Socket, CloseMethod, Protocol), + ok = send_on_channel0(Sock, CloseMethod, Protocol), rabbit_misc:protocol_error(AmqpError); true -> ok -- cgit v1.2.1 From 58285a490f9ab2c9f3e5e6bf8907607a5296c6d5 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Nov 2013 15:29:02 +0400 Subject: Extract function --- src/rabbit_reader.erl | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d720ef0e..59b4fec1 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -923,25 +923,34 @@ validate_negotiated_integer_value(Field, ClientValue, ServerValue, Min, State = #v1{sock = Sock, connection = Connection}) -> Protocol = Connection#connection.protocol, if ClientValue /= 0 andalso ClientValue < Min -> - AmqpError = rabbit_misc:amqp_error( - not_allowed, "negotiated ~p = ~w is lower than the minimum allowedvalue (~w)", - [Field, ClientValue, ServerValue], none), - {0, CloseMethod} = - rabbit_binary_generator:map_exception(0, AmqpError, Protocol), - ok = send_on_channel0(Sock, CloseMethod, Protocol), - rabbit_misc:protocol_error(AmqpError); + fail_negotiation(Field, ClientValue, ServerValue, min, State); ServerValue /= 0 andalso ClientValue > ServerValue -> - AmqpError = rabbit_misc:amqp_error( - not_allowed, "negotiated ~p = ~w is greater than the maximum allowed value (~w)", - [Field, ClientValue, ServerValue], none), - {0, CloseMethod} = - rabbit_binary_generator:map_exception(0, AmqpError, Protocol), - ok = send_on_channel0(Sock, CloseMethod, Protocol), - rabbit_misc:protocol_error(AmqpError); + fail_negotiation(Field, ClientValue, ServerValue, max, State); true -> ok end. +fail_negotiation(Field, ClientValue, + ServerValue, MinOrMax, + State = #v1{sock = Sock, connection = Connection}) -> + Protocol = Connection#connection.protocol, + S1 = case MinOrMax of + min -> lower; + max -> greater + end, + S2 = case MinOrMax of + min -> minimum; + max -> maximum + end, + AmqpError = rabbit_misc:amqp_error( + not_allowed, + "negotiated ~p = ~w is ~p than the ~p allowed value (~w)", + [Field, ClientValue, S1, S2, ServerValue], none), + {0, CloseMethod} = + rabbit_binary_generator:map_exception(0, AmqpError, Protocol), + ok = send_on_channel0(Sock, CloseMethod, Protocol), + rabbit_misc:protocol_error(AmqpError). + server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. -- cgit v1.2.1 From 449165cc5dde258fb313901fdbdf0a9087cb6a2c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Nov 2013 16:02:44 +0400 Subject: Don't send connection.close when connection.tune negotiation fails This keeps us spec-compliant. It only happens with bogus clients and there will be a sensible error message in the log. Per discussion with Simon. --- src/rabbit_reader.erl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 59b4fec1..1600b680 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -933,7 +933,6 @@ validate_negotiated_integer_value(Field, ClientValue, ServerValue, Min, fail_negotiation(Field, ClientValue, ServerValue, MinOrMax, State = #v1{sock = Sock, connection = Connection}) -> - Protocol = Connection#connection.protocol, S1 = case MinOrMax of min -> lower; max -> greater @@ -946,9 +945,6 @@ fail_negotiation(Field, ClientValue, not_allowed, "negotiated ~p = ~w is ~p than the ~p allowed value (~w)", [Field, ClientValue, S1, S2, ServerValue], none), - {0, CloseMethod} = - rabbit_binary_generator:map_exception(0, AmqpError, Protocol), - ok = send_on_channel0(Sock, CloseMethod, Protocol), rabbit_misc:protocol_error(AmqpError). server_frame_max() -> -- cgit v1.2.1 From a7c27a13f8f004c961a131d8ca7985b016126853 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Nov 2013 22:09:19 +0400 Subject: Eliminate warnings --- src/rabbit_reader.erl | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1600b680..04c2a159 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -849,13 +849,11 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, ok = validate_negotiated_integer_value(frame_max, FrameMax, server_frame_max(), - ?FRAME_MIN_SIZE, - State), + ?FRAME_MIN_SIZE), ok = validate_negotiated_integer_value(channel_max, ChannelMax, server_channel_max(), - ?CHANNEL_MIN, - State), + ?CHANNEL_MIN), {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(SupPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), @@ -919,20 +917,17 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -validate_negotiated_integer_value(Field, ClientValue, ServerValue, Min, - State = #v1{sock = Sock, connection = Connection}) -> - Protocol = Connection#connection.protocol, +validate_negotiated_integer_value(Field, ClientValue, ServerValue, Min) -> if ClientValue /= 0 andalso ClientValue < Min -> - fail_negotiation(Field, ClientValue, ServerValue, min, State); + fail_negotiation(Field, ClientValue, ServerValue, min); ServerValue /= 0 andalso ClientValue > ServerValue -> - fail_negotiation(Field, ClientValue, ServerValue, max, State); + fail_negotiation(Field, ClientValue, ServerValue, max); true -> ok end. fail_negotiation(Field, ClientValue, - ServerValue, MinOrMax, - State = #v1{sock = Sock, connection = Connection}) -> + ServerValue, MinOrMax) -> S1 = case MinOrMax of min -> lower; max -> greater -- cgit v1.2.1 From 4d28d1b6dd4f9bf138ff825598ebddd8a03ffeb0 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 26 Nov 2013 18:31:32 +0000 Subject: various fixes and refactors - replace two 'case's with one - the opposite of 'lower' is not 'greater' but 'higher' - eliminate unnecessary intermediate var - use rabbit_misc:protocol_error/4 instead of rabbit_misc:protocol_error/1 + rabbit_misc:amqp_error/4 - don't use ~p unnecessarily --- src/rabbit_reader.erl | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 04c2a159..d0ab1df3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -926,21 +926,14 @@ validate_negotiated_integer_value(Field, ClientValue, ServerValue, Min) -> ok end. -fail_negotiation(Field, ClientValue, - ServerValue, MinOrMax) -> - S1 = case MinOrMax of - min -> lower; - max -> greater - end, - S2 = case MinOrMax of - min -> minimum; - max -> maximum - end, - AmqpError = rabbit_misc:amqp_error( - not_allowed, - "negotiated ~p = ~w is ~p than the ~p allowed value (~w)", - [Field, ClientValue, S1, S2, ServerValue], none), - rabbit_misc:protocol_error(AmqpError). +fail_negotiation(Field, ClientValue, ServerValue, MinOrMax) -> + {S1,S2} = case MinOrMax of + min -> {lower, minimum}; + max -> {higher, maximum} + end, + rabbit_misc:protocol_error( + not_allowed, "negotiated ~p = ~w is ~p than the ~p allowed value (~w)", + [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), -- cgit v1.2.1 From dbfedd11c54682e4b502d9399d924216bdd64071 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 26 Nov 2013 18:37:05 +0000 Subject: refactor - more sensible order of arguments - less vertical space --- src/rabbit_reader.erl | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d0ab1df3..984bd8d0 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -846,14 +846,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, connection = Connection, helper_sup = SupPid, sock = Sock}) -> - ok = validate_negotiated_integer_value(frame_max, - FrameMax, - server_frame_max(), - ?FRAME_MIN_SIZE), - ok = validate_negotiated_integer_value(channel_max, - ChannelMax, - server_channel_max(), - ?CHANNEL_MIN), + ok = validate_negotiated_integer_value( + frame_max, ?FRAME_MIN_SIZE, server_frame_max(), FrameMax), + ok = validate_negotiated_integer_value( + channel_max, ?CHANNEL_MIN, server_channel_max(), ChannelMax), {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(SupPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), @@ -917,16 +913,16 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -validate_negotiated_integer_value(Field, ClientValue, ServerValue, Min) -> +validate_negotiated_integer_value(Field, Min, ServerValue, ClientValue) -> if ClientValue /= 0 andalso ClientValue < Min -> - fail_negotiation(Field, ClientValue, ServerValue, min); + fail_negotiation(Field, min, ServerValue, ClientValue); ServerValue /= 0 andalso ClientValue > ServerValue -> - fail_negotiation(Field, ClientValue, ServerValue, max); + fail_negotiation(Field, max, ServerValue, ClientValue); true -> ok end. -fail_negotiation(Field, ClientValue, ServerValue, MinOrMax) -> +fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> {S1,S2} = case MinOrMax of min -> {lower, minimum}; max -> {higher, maximum} -- cgit v1.2.1 From b88e8e23d3158341534c494209d5e09ec2103a8d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 26 Nov 2013 18:37:36 +0000 Subject: (really) don't use ~p unnecessarily --- src/rabbit_reader.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 984bd8d0..5ec2b23d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -928,7 +928,7 @@ fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> max -> {higher, maximum} end, rabbit_misc:protocol_error( - not_allowed, "negotiated ~p = ~w is ~p than the ~p allowed value (~w)", + not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)", [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). server_frame_max() -> -- cgit v1.2.1 From d9af8a9f11897dd67014a7bf15013e3d9b4421c7 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 26 Nov 2013 18:45:24 +0000 Subject: refactor: make handling of server config values more generic and shrink the code as a result --- src/rabbit_reader.erl | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 5ec2b23d..589584bd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -847,9 +847,9 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, helper_sup = SupPid, sock = Sock}) -> ok = validate_negotiated_integer_value( - frame_max, ?FRAME_MIN_SIZE, server_frame_max(), FrameMax), + frame_max, ?FRAME_MIN_SIZE, FrameMax), ok = validate_negotiated_integer_value( - channel_max, ?CHANNEL_MIN, server_channel_max(), ChannelMax), + channel_max, ?CHANNEL_MIN, ChannelMax), {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(SupPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), @@ -913,7 +913,8 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -validate_negotiated_integer_value(Field, Min, ServerValue, ClientValue) -> +validate_negotiated_integer_value(Field, Min, ClientValue) -> + ServerValue = get_env(Field), if ClientValue /= 0 andalso ClientValue < Min -> fail_negotiation(Field, min, ServerValue, ClientValue); ServerValue /= 0 andalso ClientValue > ServerValue -> @@ -931,17 +932,9 @@ fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)", [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). -server_frame_max() -> - {ok, FrameMax} = application:get_env(rabbit, frame_max), - FrameMax. - -server_channel_max() -> - {ok, ChannelMax} = application:get_env(rabbit, channel_max), - ChannelMax. - -server_heartbeat() -> - {ok, Heartbeat} = application:get_env(rabbit, heartbeat), - Heartbeat. +get_env(Key) -> + {ok, Value} = application:get_env(rabbit, Key), + Value. send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). @@ -1007,9 +1000,9 @@ auth_phase(Response, State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User} -> - Tune = #'connection.tune'{channel_max = server_channel_max(), - frame_max = server_frame_max(), - heartbeat = server_heartbeat()}, + Tune = #'connection.tune'{channel_max = get_env(channel_max), + frame_max = get_env(frame_max), + heartbeat = get_env(heartbeat)}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{user = User, -- cgit v1.2.1 From 4d79dfc227eb1e01ed2ec684f7c4a684b851d67b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 26 Nov 2013 19:16:26 +0000 Subject: refactor: consistent order of handling configurable params --- src/rabbit_reader.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 589584bd..c3b52307 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -840,8 +840,8 @@ handle_method0(#'connection.secure_ok'{response = Response}, auth_phase(Response, State); handle_method0(#'connection.tune_ok'{frame_max = FrameMax, - heartbeat = ClientHeartbeat, - channel_max = ChannelMax}, + channel_max = ChannelMax, + heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, helper_sup = SupPid, @@ -1000,8 +1000,8 @@ auth_phase(Response, State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User} -> - Tune = #'connection.tune'{channel_max = get_env(channel_max), - frame_max = get_env(frame_max), + Tune = #'connection.tune'{frame_max = get_env(frame_max), + channel_max = get_env(channel_max), heartbeat = get_env(heartbeat)}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, -- cgit v1.2.1 From 2bdabb22e265903c73d1209641698ca9b5e1b182 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 26 Nov 2013 19:16:56 +0000 Subject: cosmetic: reduce vertical space and distance to default --- src/rabbit_reader.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c3b52307..b9751b31 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -858,8 +858,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, ReceiveFun = fun() -> Parent ! heartbeat_timeout end, Heartbeater = rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, - ReceiveFun), + SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, -- cgit v1.2.1 From c319f8942aa00cce9367f9cb0d216ac736837593 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 27 Nov 2013 14:01:05 +0400 Subject: Set CHANNEL_MIN to 1 --- src/rabbit_reader.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b9751b31..3676d9bc 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -32,7 +32,7 @@ -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). --define(CHANNEL_MIN, 4). +-define(CHANNEL_MIN, 1). %%-------------------------------------------------------------------------- -- cgit v1.2.1 From 5bd86f63364722fedeffddf68d14b376554898ec Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 27 Nov 2013 18:00:13 +0400 Subject: Reject channel.open method with channel number > negotiated channel_max --- src/rabbit_reader.erl | 72 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3676d9bc..968fb157 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, - protocol, user, timeout_sec, frame_max, vhost, + protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, auth_mechanism, auth_state}). @@ -607,17 +607,28 @@ create_channel(Channel, State) -> connection = #connection{name = Name, protocol = Protocol, frame_max = FrameMax, + channel_max = ChannelMax, user = User, vhost = VHost, capabilities = Capabilities}} = State, - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ChPid, AState}. + case ChannelMax /= 0 andalso Channel > ChannelMax of + true -> + %% we cannot use rabbit_misc:protocol_error here because amqp_error is caught + %% only for the methods on channel 0. + AmqpError = rabbit_misc:amqp_error( + not_allowed, "channel ~w is greater than negotiated channel_max (~w)", + [Channel, ChannelMax], 'channel.open'), + throw({error, AmqpError}); + false -> + {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, + Protocol, User, VHost, Capabilities, Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + {ChPid, AState} + end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of @@ -665,24 +676,28 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - {ChPid, AState} = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> - rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); - {error, Reason} -> - handle_exception(State, Channel, Reason) + try + {ChPid, AState} = case get(ChKey) of + undefined -> create_channel(Channel, State); + Other -> Other + end, + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State)); + {error, Reason} -> + handle_exception(State, Channel, Reason) + end + catch {error, Error} -> + handle_exception(State, Channel, Error) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> @@ -862,7 +877,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, + frame_max = FrameMax, + channel_max = ChannelMax}, queue_collector = Collector, heartbeater = Heartbeater}; -- cgit v1.2.1 From 81d1b61309ae604acf8876ebf213c08890f92c09 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 28 Nov 2013 01:29:27 +0400 Subject: Update config example --- docs/rabbitmq.config.example | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 91402649..c0d6cc70 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -138,6 +138,11 @@ %% %% {frame_max, 131072}, + %% Set the max permissible number of channels per connection. + %% 0 means "no limit". + %% + %% {channel_max, 128}, + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for -- cgit v1.2.1 From 8c09ac3b742fefa6a5e17d5887e19c220429e46d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 28 Nov 2013 14:55:17 +0400 Subject: Reject channel.open when the total number of open channel goes above channel_max --- src/rabbit_reader.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 968fb157..b55fde5a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -611,13 +611,14 @@ create_channel(Channel, State) -> user = User, vhost = VHost, capabilities = Capabilities}} = State, - case ChannelMax /= 0 andalso Channel > ChannelMax of + N = length(all_channels()), + case ChannelMax /= 0 andalso N > ChannelMax of true -> %% we cannot use rabbit_misc:protocol_error here because amqp_error is caught %% only for the methods on channel 0. AmqpError = rabbit_misc:amqp_error( - not_allowed, "channel ~w is greater than negotiated channel_max (~w)", - [Channel, ChannelMax], 'channel.open'), + not_allowed, "number of channels opened (~w) is greater than the negotiated channel_max (~w)", + [N, ChannelMax], 'channel.open'), throw({error, AmqpError}); false -> {ok, _ChSupPid, {ChPid, AState}} = -- cgit v1.2.1 From 2f507c70d7d5c45690b9209864e36b750f7bd569 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 28 Nov 2013 16:51:15 +0000 Subject: Introduce halt_on_upgrade_failure app env param and set it to true. Only throw an error instead of halting if it has been set to false. --- ebin/rabbit_app.in | 1 + src/rabbit_upgrade.erl | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index a10bad45..f0fee96a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -52,6 +52,7 @@ {nodelay, true}, {linger, {true, 0}}, {exit_on_close, false}]}, + {halt_on_upgrade_failure, true}, {hipe_compile, false}, %% see bug 24513 for how this list was created {hipe_modules, diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 1047b823..44a1fe0d 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -191,9 +191,14 @@ die(Msg, Args) -> %% straight out into do_boot, generating an erl_crash.dump %% and displaying any error message in a confusing way. error_logger:error_msg(Msg, Args), - io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + Str = rabbit_misc:format( + "~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + io:format(Str), error_logger:logfile(close), - halt(1). + case application:get_env(rabbit, halt_on_upgrade_failure) of + false -> throw({upgrade_error, Str}); + _ -> halt(1) %% i.e. true or undefined + end. primary_upgrade(Upgrades, Nodes) -> Others = Nodes -- [node()], -- cgit v1.2.1 From 4883b6e6a4fa9fb757d0f8ed8b0220bae1e34bed Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 28 Nov 2013 17:20:56 +0000 Subject: Ooops. --- src/rabbit_upgrade.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 44a1fe0d..c1f142d7 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -196,8 +196,8 @@ die(Msg, Args) -> io:format(Str), error_logger:logfile(close), case application:get_env(rabbit, halt_on_upgrade_failure) of - false -> throw({upgrade_error, Str}); - _ -> halt(1) %% i.e. true or undefined + {ok, false} -> throw({upgrade_error, Str}); + _ -> halt(1) %% i.e. true or undefined end. primary_upgrade(Upgrades, Nodes) -> -- cgit v1.2.1 From 94bbcb166303ae93b31ed223be72a1531661030a Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 12:44:54 +0400 Subject: Fix off-by-1 issue --- src/rabbit_reader.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b55fde5a..00ff4b19 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -612,12 +612,12 @@ create_channel(Channel, State) -> vhost = VHost, capabilities = Capabilities}} = State, N = length(all_channels()), - case ChannelMax /= 0 andalso N > ChannelMax of + case ChannelMax /= 0 andalso N + 1 > ChannelMax of true -> %% we cannot use rabbit_misc:protocol_error here because amqp_error is caught %% only for the methods on channel 0. AmqpError = rabbit_misc:amqp_error( - not_allowed, "number of channels opened (~w) is greater than the negotiated channel_max (~w)", + not_allowed, "number of channels opened (~w) has reached the negotiated channel_max (~w)", [N, ChannelMax], 'channel.open'), throw({error, AmqpError}); false -> -- cgit v1.2.1 From b162ff71c79b62cae58297933f1f6667f22931e6 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 12:45:18 +0400 Subject: We don't know what method caused the problem, so specify it as 'none' --- src/rabbit_reader.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 00ff4b19..4e268f45 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -618,7 +618,7 @@ create_channel(Channel, State) -> %% only for the methods on channel 0. AmqpError = rabbit_misc:amqp_error( not_allowed, "number of channels opened (~w) has reached the negotiated channel_max (~w)", - [N, ChannelMax], 'channel.open'), + [N, ChannelMax], 'none'), throw({error, AmqpError}); false -> {ok, _ChSupPid, {ChPid, AState}} = -- cgit v1.2.1 From ee5f7e72885aa87501191c2d41d9cc7a241f7893 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 12:49:34 +0400 Subject: Use rabbit_misc:protocol_error and catch exits --- src/rabbit_reader.erl | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4e268f45..023d0da9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -614,12 +614,9 @@ create_channel(Channel, State) -> N = length(all_channels()), case ChannelMax /= 0 andalso N + 1 > ChannelMax of true -> - %% we cannot use rabbit_misc:protocol_error here because amqp_error is caught - %% only for the methods on channel 0. - AmqpError = rabbit_misc:amqp_error( - not_allowed, "number of channels opened (~w) has reached the negotiated channel_max (~w)", - [N, ChannelMax], 'none'), - throw({error, AmqpError}); + rabbit_misc:protocol_error( + not_allowed, "number of channels opened (~w) has reached the negotiated channel_max (~w)", + [N, ChannelMax], 'none'); false -> {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( @@ -697,7 +694,7 @@ process_frame(Frame, Channel, State) -> {error, Reason} -> handle_exception(State, Channel, Reason) end - catch {error, Error} -> + catch exit:Error -> handle_exception(State, Channel, Error) end. -- cgit v1.2.1 From 82d648055b672bfd70cbba106b46477be8495b7d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 12:51:46 +0400 Subject: Narrow down try .. catch here --- src/rabbit_reader.erl | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 023d0da9..8caae3b8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -674,28 +674,28 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - try - {ChPid, AState} = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> - rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); - {error, Reason} -> - handle_exception(State, Channel, Reason) - end - catch exit:Error -> - handle_exception(State, Channel, Error) + {ChPid, AState} = case get(ChKey) of + undefined -> try + create_channel(Channel, State) + catch exit:Error -> + handle_exception(State, Channel, Error) + end; + Other -> Other + end, + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State)); + {error, Reason} -> + handle_exception(State, Channel, Reason) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> -- cgit v1.2.1 From cef495ab29605d3e19db8cc20d80cdda44ba9cf5 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 14:27:00 +0400 Subject: Add channel_max to info keys --- src/rabbit_reader.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 8caae3b8..713d6f36 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,7 +57,7 @@ peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, client_properties]). + timeout, frame_max, channel_max, client_properties]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). -- cgit v1.2.1 From f53dbd93602c280b4a93f5eeb8fa1094c3ab16b2 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 15:49:48 +0400 Subject: Missing fn head for channel_max --- src/rabbit_reader.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 713d6f36..957233f6 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1063,6 +1063,7 @@ ic(user, #connection{user = U}) -> U#user.username; ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; +ic(channel_max, #connection{channel_max = ChannelMax}) -> ChannelMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; -- cgit v1.2.1 From 9ad8491c3e1163da3f8e4fc8776c3ff68e5d2674 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 15:51:58 +0400 Subject: Cosmetics --- src/rabbit_reader.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 957233f6..cfe80d12 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1064,8 +1064,8 @@ ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; ic(channel_max, #connection{channel_max = ChannelMax}) -> ChannelMax; -ic(client_properties, #connection{client_properties = CP}) -> CP; -ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; +ic(client_properties, #connection{client_properties = CP}) -> CP; +ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; ic(Item, #connection{}) -> throw({bad_argument, Item}). -- cgit v1.2.1 From d86b1e26d7731fc8e1d6f570ccca5c58bdfbfd82 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 29 Nov 2013 16:14:40 +0400 Subject: Formatting --- src/rabbit_reader.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index cfe80d12..407ae67f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -678,7 +678,9 @@ process_frame(Frame, Channel, State) -> undefined -> try create_channel(Channel, State) catch exit:Error -> - handle_exception(State, Channel, Error) + handle_exception(State, + Channel, + Error) end; Other -> Other end, -- cgit v1.2.1 From 5e8be4f6086ab292cc49e3cddfe22ea4b98acf8f Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 2 Dec 2013 11:38:26 +0400 Subject: Make this message fit in 80 chars --- src/rabbit_reader.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 407ae67f..cd920d50 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -615,7 +615,9 @@ create_channel(Channel, State) -> case ChannelMax /= 0 andalso N + 1 > ChannelMax of true -> rabbit_misc:protocol_error( - not_allowed, "number of channels opened (~w) has reached the negotiated channel_max (~w)", + not_allowed, + "number of channels opened (~w) has reached " + ++ "the negotiated channel_max (~w)", [N, ChannelMax], 'none'); false -> {ok, _ChSupPid, {ChPid, AState}} = -- cgit v1.2.1 From fdfe9457f14e9afb9fe01710ed8f2ac7a16ae078 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 2 Dec 2013 12:46:15 +0400 Subject: Indicate channel creation errors via returned value --- src/rabbit_reader.erl | 50 ++++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index cd920d50..d4bc1fa0 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -614,11 +614,12 @@ create_channel(Channel, State) -> N = length(all_channels()), case ChannelMax /= 0 andalso N + 1 > ChannelMax of true -> - rabbit_misc:protocol_error( + Err = rabbit_misc:amqp_error( not_allowed, "number of channels opened (~w) has reached " ++ "the negotiated channel_max (~w)", - [N, ChannelMax], 'none'); + [N, ChannelMax], 'none'), + {error, Err}; false -> {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( @@ -676,30 +677,31 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - {ChPid, AState} = case get(ChKey) of - undefined -> try - create_channel(Channel, State) - catch exit:Error -> - handle_exception(State, - Channel, - Error) - end; + Ch = case get(ChKey) of + undefined -> create_channel(Channel, State); Other -> Other end, - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> - rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); - {error, Reason} -> - handle_exception(State, Channel, Reason) + case Ch of + {error, Error} -> + handle_exception(State, + Channel, + Error); + {ChPid, AState} -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State)); + {error, Reason} -> + handle_exception(State, Channel, Reason) + end end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> -- cgit v1.2.1 From 1fed52383f8a7a6fcec748af24f153ceb09ca2f5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 4 Dec 2013 14:58:48 +0000 Subject: Cosmetic --- src/rabbit_reader.erl | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d4bc1fa0..4cba5dc8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -615,10 +615,8 @@ create_channel(Channel, State) -> case ChannelMax /= 0 andalso N + 1 > ChannelMax of true -> Err = rabbit_misc:amqp_error( - not_allowed, - "number of channels opened (~w) has reached " - ++ "the negotiated channel_max (~w)", - [N, ChannelMax], 'none'), + not_allowed, "number of channels opened (~w) has reached " + "the negotiated channel_max (~w)", [N, ChannelMax], 'none'), {error, Err}; false -> {ok, _ChSupPid, {ChPid, AState}} = @@ -678,15 +676,13 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, Ch = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, + undefined -> create_channel(Channel, State); + Other -> Other + end, case Ch of {error, Error} -> - handle_exception(State, - Channel, - Error); - {ChPid, AState} -> + handle_exception(State, Channel, Error); + {ChPid, AState} -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> put(ChKey, {ChPid, NewAState}), @@ -943,10 +939,10 @@ validate_negotiated_integer_value(Field, Min, ClientValue) -> end. fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> - {S1,S2} = case MinOrMax of - min -> {lower, minimum}; - max -> {higher, maximum} - end, + {S1, S2} = case MinOrMax of + min -> {lower, minimum}; + max -> {higher, maximum} + end, rabbit_misc:protocol_error( not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)", [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). -- cgit v1.2.1 From e49c748ce7abc252fb12e7211572d21780de104e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 4 Dec 2013 17:22:22 +0000 Subject: Rather than just querying a single remote node, query them all, and accomodate remote nodes that might be running older versions than ourselves, provided we can find someone who is running the same as us. I did try just limiting Nodes to Nodes with the rabbit process running on them. That seemed to break the tests, and trying to figure out why by looking at rabbit_mnesia made me want to slit my wrists and other body parts. Thus I wrote more code instead. --- src/rabbit_mnesia.erl | 34 ++++++++++++++++++++++++---------- src/rabbit_version.erl | 19 ++++++++++++++++++- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 3a8fae7f..8147a91a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -434,10 +434,9 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> %% First disc node up maybe_force_load(), ok; - {[AnotherNode | _], _, _} -> + {[_ | _], _, _} -> %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, recorded, [])), + ok = ensure_version_ok(Nodes), maybe_force_load(), ok = rabbit_table:wait_for_replicated(), ok = rabbit_table:create_local_copy(NodeType) @@ -639,14 +638,29 @@ schema_ok_or_move() -> ok = create_schema() end. -ensure_version_ok({ok, DiscVersion}) -> - DesiredVersion = rabbit_version:desired(), - case rabbit_version:matches(DesiredVersion, DiscVersion) of +ensure_version_ok(OtherNodes) -> + Desired = rabbit_version:desired(), + Fun = fun (Node, FoundMatch) -> + case rpc:call(Node, rabbit_version, recorded, []) of + {error, _} -> + FoundMatch; %% Node probably isn't fully up. + {ok, NodeVersion} -> + case rabbit_version:compare(Desired, NodeVersion) of + eq -> + true; + gt -> + FoundMatch; %% Remote is just older than us. + _ -> + throw({error, {version_mismatch, + Desired, NodeVersion}}) + end + end + end, + FoundMatch = lists:foldl(Fun, false, OtherNodes), + case FoundMatch of true -> ok; - false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) - end; -ensure_version_ok({error, _}) -> - ok = rabbit_version:record_desired(). + false -> throw({error, {version_mismatch, Desired}}) + end. %% We only care about disc nodes since ram nodes are supposed to catch %% up only diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index c629180e..047737a4 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -16,7 +16,7 @@ -module(rabbit_version). --export([recorded/0, matches/2, desired/0, desired_for_scope/1, +-export([recorded/0, matches/2, compare/2, desired/0, desired_for_scope/1, record_desired/0, record_desired_for_scope/1, upgrades_required/1]). @@ -33,6 +33,7 @@ -spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(matches/2 :: ([A], [A]) -> boolean()). +-spec(compare/2 :: ([A], [A]) -> ('eq' | 'lt' | 'gt' | 'incomparable')). -spec(desired/0 :: () -> version()). -spec(desired_for_scope/1 :: (scope()) -> scope_version()). -spec(record_desired/0 :: () -> 'ok'). @@ -82,6 +83,22 @@ record_for_scope(Scope, ScopeVersion) -> matches(VerA, VerB) -> lists:usort(VerA) =:= lists:usort(VerB). +compare(VerA, VerB) -> + VerAS = lists:usort(VerA), + VerBS = lists:usort(VerB), + VerASPrefixVerBS = lists:prefix(VerAS, VerBS), + VerBSPrefixVerAS = lists:prefix(VerBS, VerAS), + if + VerAS =:= VerBS -> + eq; + VerASPrefixVerBS -> + lt; + VerBSPrefixVerAS -> + gt; + true -> + incomparable + end. + %% ------------------------------------------------------------------- desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)]. -- cgit v1.2.1 From fee0d2612f16d67a1a00dd4a8d74c03904e6a5bd Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 5 Dec 2013 16:58:05 +0400 Subject: Disambiguate return values --- src/rabbit_reader.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4cba5dc8..ddacbf26 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -625,8 +625,8 @@ create_channel(Channel, State) -> Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ChPid, AState} + put({channel, Channel}, {ok, ChPid, AState}), + {ok, ChPid, AState} end. channel_cleanup(ChPid) -> @@ -682,18 +682,18 @@ process_frame(Frame, Channel, State) -> case Ch of {error, Error} -> handle_exception(State, Channel, Error); - {ChPid, AState} -> + {ok, ChPid, AState} -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), + put(ChKey, {ok, ChPid, NewAState}), post_process_frame(Frame, ChPid, State); {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), + put(ChKey, {ok, ChPid, NewAState}), post_process_frame(Frame, ChPid, State); {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), + put(ChKey, {ok, ChPid, NewAState}), post_process_frame(Frame, ChPid, control_throttle(State)); {error, Reason} -> handle_exception(State, Channel, Reason) -- cgit v1.2.1 From 652be3a76a239ee3f82991452d660743310460f2 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 5 Dec 2013 16:59:09 +0400 Subject: Add channel_max to rabbitmqctl(1) --- docs/rabbitmqctl.1.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 19d29577..77e4ed5d 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1444,6 +1444,10 @@ frame_max Maximum frame size (bytes). + + channel_max + Maximum number of channels per connection. + client_properties Informational properties transmitted by the client -- cgit v1.2.1 From 2b67b14ddb0e4c2e3f5a4e1b28c5e44d7d54cc84 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 5 Dec 2013 14:27:08 +0000 Subject: better --- src/rabbit_reader.erl | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ddacbf26..41640f97 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -625,8 +625,8 @@ create_channel(Channel, State) -> Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ok, ChPid, AState}), - {ok, ChPid, AState} + put({channel, Channel}, {ChPid, AState}), + {ok, {ChPid, AState}} end. channel_cleanup(ChPid) -> @@ -675,25 +675,24 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - Ch = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, - case Ch of + case (case get(ChKey) of + undefined -> create_channel(Channel, State); + Other -> {ok, Other} + end) of {error, Error} -> handle_exception(State, Channel, Error); - {ok, ChPid, AState} -> + {ok, {ChPid, AState}} -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> - put(ChKey, {ok, ChPid, NewAState}), + put(ChKey, {ChPid, NewAState}), post_process_frame(Frame, ChPid, State); {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - put(ChKey, {ok, ChPid, NewAState}), + put(ChKey, {ChPid, NewAState}), post_process_frame(Frame, ChPid, State); {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ok, ChPid, NewAState}), + put(ChKey, {ChPid, NewAState}), post_process_frame(Frame, ChPid, control_throttle(State)); {error, Reason} -> handle_exception(State, Channel, Reason) -- cgit v1.2.1 From 12431f57fb847fb9f16c8a6ff477d18b4e5f7213 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 5 Dec 2013 14:28:00 +0000 Subject: cosmetic --- src/rabbit_reader.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 41640f97..84d98825 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1064,9 +1064,9 @@ ic(user, #connection{user = U}) -> U#user.username; ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; -ic(channel_max, #connection{channel_max = ChannelMax}) -> ChannelMax; -ic(client_properties, #connection{client_properties = CP}) -> CP; -ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; +ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; +ic(client_properties, #connection{client_properties = CP}) -> CP; +ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; ic(Item, #connection{}) -> throw({bad_argument, Item}). -- cgit v1.2.1 From 267acee1d2444e32ab0fd0159974aeec89bfd2c0 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 5 Dec 2013 14:32:57 +0000 Subject: cosmetic --- src/rabbit_reader.erl | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 84d98825..f6693a46 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -613,20 +613,19 @@ create_channel(Channel, State) -> capabilities = Capabilities}} = State, N = length(all_channels()), case ChannelMax /= 0 andalso N + 1 > ChannelMax of - true -> - Err = rabbit_misc:amqp_error( - not_allowed, "number of channels opened (~w) has reached " - "the negotiated channel_max (~w)", [N, ChannelMax], 'none'), - {error, Err}; - false -> - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ok, {ChPid, AState}} + true -> {error, rabbit_misc:amqp_error( + not_allowed, "number of channels opened (~w) has " + "reached the negotiated channel_max (~w)", + [N, ChannelMax], 'none')}; + false -> {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, + Protocol, User, VHost, Capabilities, + Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + {ok, {ChPid, AState}} end. channel_cleanup(ChPid) -> @@ -875,9 +874,9 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax, - channel_max = ChannelMax}, + frame_max = FrameMax, + channel_max = ChannelMax, + timeout_sec = ClientHeartbeat}, queue_collector = Collector, heartbeater = Heartbeater}; -- cgit v1.2.1 From e0515da4e165e8534469258696646ccfa677493d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 5 Dec 2013 14:39:16 +0000 Subject: refactor for clarity --- src/rabbit_reader.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f6693a46..2b33293d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -612,12 +612,8 @@ create_channel(Channel, State) -> vhost = VHost, capabilities = Capabilities}} = State, N = length(all_channels()), - case ChannelMax /= 0 andalso N + 1 > ChannelMax of - true -> {error, rabbit_misc:amqp_error( - not_allowed, "number of channels opened (~w) has " - "reached the negotiated channel_max (~w)", - [N, ChannelMax], 'none')}; - false -> {ok, _ChSupPid, {ChPid, AState}} = + case ChannelMax == 0 orelse N < ChannelMax of + true -> {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, @@ -625,8 +621,12 @@ create_channel(Channel, State) -> MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), - {ok, {ChPid, AState}} - end. + {ok, {ChPid, AState}}; + false -> {error, rabbit_misc:amqp_error( + not_allowed, "number of channels opened (~w) has " + "reached the negotiated channel_max (~w)", + [N, ChannelMax], 'none')} + end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of -- cgit v1.2.1 From 9d7ca484130d69d6755fda31f83b061974055e33 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 5 Dec 2013 15:50:33 +0000 Subject: Clarify --- docs/rabbitmqctl.1.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 77e4ed5d..8e99dba9 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1446,7 +1446,7 @@ channel_max - Maximum number of channels per connection. + Maximum number of channels on this connection. client_properties -- cgit v1.2.1 From 549a9ca6b3da26d68e13bcfdb628039b5e0b2117 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 5 Dec 2013 18:23:11 +0000 Subject: Make sure that our last event before going sorta-idle is not one in which we claim to be in flow control. --- src/rabbit_reader.erl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e00732fd..2c19b2bf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1073,8 +1073,16 @@ maybe_emit_stats(State) -> fun() -> emit_stats(State) end). emit_stats(State) -> - rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), - rabbit_event:reset_stats_timer(State, #v1.stats_timer). + Infos = infos(?STATISTICS_KEYS, State), + rabbit_event:notify(connection_stats, Infos), + State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer), + %% If we emit an event which looks like we are in flow control, it's not a + %% good idea for it to be our last even if we go idle. Keep emitting + %% events, either we stay busy or we drop out of flow control. + case proplists:get_value(last_blocked_age, Infos) < 5 of + true -> ensure_stats_timer(State1); + _ -> State1 + end. %% 1.0 stub -ifdef(use_specs). -- cgit v1.2.1 From 3817f05efa7c6199bd5f6ea350280786f8564c80 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 6 Dec 2013 11:31:32 +0000 Subject: Explain --- src/rabbit_reader.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2c19b2bf..46c5c42a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1079,6 +1079,8 @@ emit_stats(State) -> %% If we emit an event which looks like we are in flow control, it's not a %% good idea for it to be our last even if we go idle. Keep emitting %% events, either we stay busy or we drop out of flow control. + %% The 5 is to match the test in formatters.js:fmt_connection_state(). + %% This magic number will go away when bug 24829 is merged. case proplists:get_value(last_blocked_age, Infos) < 5 of true -> ensure_stats_timer(State1); _ -> State1 -- cgit v1.2.1 From 08c6e4053b517a54f34ccad447e7076cfcb2b16c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 6 Dec 2013 12:31:22 +0000 Subject: Undo some changes. --- src/rabbit_mnesia.erl | 34 ++++++++++------------------------ src/rabbit_version.erl | 19 +------------------ 2 files changed, 11 insertions(+), 42 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8147a91a..3a8fae7f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -434,9 +434,10 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> %% First disc node up maybe_force_load(), ok; - {[_ | _], _, _} -> + {[AnotherNode | _], _, _} -> %% Subsequent node in cluster, catch up - ok = ensure_version_ok(Nodes), + ensure_version_ok( + rpc:call(AnotherNode, rabbit_version, recorded, [])), maybe_force_load(), ok = rabbit_table:wait_for_replicated(), ok = rabbit_table:create_local_copy(NodeType) @@ -638,29 +639,14 @@ schema_ok_or_move() -> ok = create_schema() end. -ensure_version_ok(OtherNodes) -> - Desired = rabbit_version:desired(), - Fun = fun (Node, FoundMatch) -> - case rpc:call(Node, rabbit_version, recorded, []) of - {error, _} -> - FoundMatch; %% Node probably isn't fully up. - {ok, NodeVersion} -> - case rabbit_version:compare(Desired, NodeVersion) of - eq -> - true; - gt -> - FoundMatch; %% Remote is just older than us. - _ -> - throw({error, {version_mismatch, - Desired, NodeVersion}}) - end - end - end, - FoundMatch = lists:foldl(Fun, false, OtherNodes), - case FoundMatch of +ensure_version_ok({ok, DiscVersion}) -> + DesiredVersion = rabbit_version:desired(), + case rabbit_version:matches(DesiredVersion, DiscVersion) of true -> ok; - false -> throw({error, {version_mismatch, Desired}}) - end. + false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) + end; +ensure_version_ok({error, _}) -> + ok = rabbit_version:record_desired(). %% We only care about disc nodes since ram nodes are supposed to catch %% up only diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 047737a4..c629180e 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -16,7 +16,7 @@ -module(rabbit_version). --export([recorded/0, matches/2, compare/2, desired/0, desired_for_scope/1, +-export([recorded/0, matches/2, desired/0, desired_for_scope/1, record_desired/0, record_desired_for_scope/1, upgrades_required/1]). @@ -33,7 +33,6 @@ -spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(matches/2 :: ([A], [A]) -> boolean()). --spec(compare/2 :: ([A], [A]) -> ('eq' | 'lt' | 'gt' | 'incomparable')). -spec(desired/0 :: () -> version()). -spec(desired_for_scope/1 :: (scope()) -> scope_version()). -spec(record_desired/0 :: () -> 'ok'). @@ -83,22 +82,6 @@ record_for_scope(Scope, ScopeVersion) -> matches(VerA, VerB) -> lists:usort(VerA) =:= lists:usort(VerB). -compare(VerA, VerB) -> - VerAS = lists:usort(VerA), - VerBS = lists:usort(VerB), - VerASPrefixVerBS = lists:prefix(VerAS, VerBS), - VerBSPrefixVerAS = lists:prefix(VerBS, VerAS), - if - VerAS =:= VerBS -> - eq; - VerASPrefixVerBS -> - lt; - VerBSPrefixVerAS -> - gt; - true -> - incomparable - end. - %% ------------------------------------------------------------------- desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)]. -- cgit v1.2.1 From 29a80ca7ed7d46ddd39d6e381df7e212c07b45e2 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 6 Dec 2013 12:50:28 +0000 Subject: Remove unnecessary test. --- src/rabbit_mnesia.erl | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 3a8fae7f..f27f77c6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -434,10 +434,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> %% First disc node up maybe_force_load(), ok; - {[AnotherNode | _], _, _} -> + {[_ | _], _, _} -> %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, recorded, [])), maybe_force_load(), ok = rabbit_table:wait_for_replicated(), ok = rabbit_table:create_local_copy(NodeType) @@ -639,15 +637,6 @@ schema_ok_or_move() -> ok = create_schema() end. -ensure_version_ok({ok, DiscVersion}) -> - DesiredVersion = rabbit_version:desired(), - case rabbit_version:matches(DesiredVersion, DiscVersion) of - true -> ok; - false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) - end; -ensure_version_ok({error, _}) -> - ok = rabbit_version:record_desired(). - %% We only care about disc nodes since ram nodes are supposed to catch %% up only create_schema() -> -- cgit v1.2.1