summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-11 12:15:58 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-11 12:15:58 +0100
commit64298e9d2b3879be54b4297376c2a29d9d65175b (patch)
treebe3fb40a0543294de72f07ca7601f90aadd6268e /src/rabbit_reader.erl
parentfabe3df8c2eba2b852e34ca0b4a163550e7d6c9b (diff)
parenta1884877808f6c34c349e132b31db9e45de347d0 (diff)
downloadrabbitmq-server-64298e9d2b3879be54b4297376c2a29d9d65175b.tar.gz
stable to default
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl64
1 files changed, 40 insertions, 24 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 68cef56a..2cdd54a7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,7 +27,6 @@
-export([conserve_resources/3, server_properties/1]).
--define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
@@ -43,7 +42,7 @@
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, connected_at}).
-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
@@ -55,7 +54,7 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, channel_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties, connected_at]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -189,10 +188,10 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
socket_error(Reason) when is_atom(Reason) ->
- log(error, "error on AMQP connection ~p: ~s~n",
+ log(error, "Error on AMQP connection ~p: ~s~n",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
- log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+ log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
exit(normal)
end,
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
+ {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(list_to_binary(Name)),
@@ -231,13 +231,14 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
peer_port = PeerPort,
protocol = none,
user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
+ timeout_sec = (HandshakeTimeout / 1000),
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
capabilities = [],
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ connected_at = rabbit_misc:now_to_ms(os:timestamp())},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -410,7 +411,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
rabbit_event:notify(
connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
- State;
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
State;
@@ -548,21 +549,27 @@ wait_for_channel_termination(0, TimerRef, State) ->
end;
_ -> State
end;
-wait_for_channel_termination(N, TimerRef, State) ->
+wait_for_channel_termination(N, TimerRef,
+ State = #v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
{Channel, State1} = channel_cleanup(ChPid, State),
case {Channel, termination_kind(Reason)} of
- {undefined, _} -> exit({abnormal_dependent_exit,
- ChPid, Reason});
- {_, controlled} -> wait_for_channel_termination(
- N-1, TimerRef, State1);
- {_, uncontrolled} -> log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(
- N-1, TimerRef, State1)
+ {undefined, _} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef, State1);
+ {_, uncontrolled} ->
+ log(error, "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:"
+ "error while terminating:~n~p~n",
+ [self(), ConnName, VHost, User#user.username,
+ CS, Channel, Reason]),
+ wait_for_channel_termination(N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
@@ -581,16 +588,24 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+log_hard_error(#v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}, Channel, Reason) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:~n~p~n",
+ [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]).
+
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), closed, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
State;
handle_exception(State = #v1{connection = #connection{protocol = Protocol},
connection_state = CS},
Channel, Reason)
when ?IS_RUNNING(State) orelse CS =:= closing ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), CS, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
State1 = close_connection(terminate_channels(State)),
@@ -1130,6 +1145,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(connected_at, #connection{connected_at = T}) -> T;
ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->