summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-26 20:17:04 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-26 20:17:04 +0000
commitcd58656595005d7fd7266c72ab30763f0b676328 (patch)
treec0ce4c7ccc520c46de893041bd1352cec0404309
parent4b80eef39e8101d1bf2ad4e485ce21007ceed652 (diff)
parent3dd75c74e124cf0fd29f7f6cc6da016520a69e54 (diff)
downloadrabbitmq-server-cd58656595005d7fd7266c72ab30763f0b676328.tar.gz
merge default into bug22470
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/rabbit_log.erl80
-rw-r--r--src/rabbit_net.erl21
-rw-r--r--src/rabbit_networking.erl12
-rw-r--r--src/rabbit_reader.erl42
-rw-r--r--src/tcp_acceptor.erl34
6 files changed, 103 insertions, 87 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9301af6b..2fee1114 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -37,6 +37,7 @@
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
+ {log_levels, [{connection, info}]},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8f58f848..418b53c7 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -23,15 +23,25 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([info/1, info/2, warning/1, warning/2, error/1, error/2]).
+-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
+-define(LEVELS, [info, warning, error, none]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([level/0]).
+
+-type(category() :: atom()).
+-type(level() :: 'info' | 'warning' | 'error').
+
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+
+-spec(log/3 :: (category(), level(), string()) -> 'ok').
+-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -43,51 +53,48 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-info(Fmt) ->
- gen_server:cast(?SERVER, {info, Fmt}).
-
-info(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {info, Fmt, Args}).
-
-warning(Fmt) ->
- gen_server:cast(?SERVER, {warning, Fmt}).
+-record(state, {levels, config}).
-warning(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {warning, Fmt, Args}).
+%%----------------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [?LEVELS], []).
+log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
-error(Fmt) ->
- gen_server:cast(?SERVER, {error, Fmt}).
+log(Category, Level, Fmt, Args) when is_list(Args) ->
+ gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
-error(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {error, Fmt, Args}).
+info(Fmt) -> log(default, info, Fmt).
+info(Fmt, Args) -> log(default, info, Fmt, Args).
+warning(Fmt) -> log(default, warning, Fmt).
+warning(Fmt, Args) -> log(default, warning, Fmt, Args).
+error(Fmt) -> log(default, error, Fmt).
+error(Fmt, Args) -> log(default, error, Fmt, Args).
%%--------------------------------------------------------------------
-init([]) -> {ok, none}.
+init([Levels]) ->
+ {ok, LevelConfig} = application:get_env(log_levels),
+ {ok, #state{levels = orddict:from_list(
+ lists:zip(Levels, lists:seq(1, length(Levels)))),
+ config = orddict:from_list(LevelConfig)}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({info, Fmt}, State) ->
- error_logger:info_msg(Fmt),
- {noreply, State};
-handle_cast({info, Fmt, Args}, State) ->
- error_logger:info_msg(Fmt, Args),
- {noreply, State};
-handle_cast({warning, Fmt}, State) ->
- error_logger:warning_msg(Fmt),
- {noreply, State};
-handle_cast({warning, Fmt, Args}, State) ->
- error_logger:warning_msg(Fmt, Args),
- {noreply, State};
-handle_cast({error, Fmt}, State) ->
- error_logger:error_msg(Fmt),
- {noreply, State};
-handle_cast({error, Fmt, Args}, State) ->
- error_logger:error_msg(Fmt, Args),
+handle_cast({log, Category, Level, Fmt, Args},
+ State = #state{levels = Levels, config = Config}) ->
+ CatLevel = case orddict:find(Category, Config) of
+ {ok, L} -> L;
+ error -> info
+ end,
+ case orddict:fetch(Level, Levels) >= orddict:fetch(CatLevel, Levels) of
+ false -> ok;
+ true -> (case Level of
+ info -> fun error_logger:info_msg/2;
+ warning -> fun error_logger:warning_msg/2;
+ error -> fun error_logger:error_msg/2
+ end)(Fmt, Args)
+ end,
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -100,4 +107,3 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b944ec81..fef8ae88 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- sockname/1, peername/1, peercert/1]).
+ sockname/1, peername/1, peercert/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -62,6 +62,8 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(connection_string/2 ::
+ (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
-endif.
@@ -141,3 +143,20 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+
+connection_string(Sock, Direction) ->
+ {From, To} = case Direction of
+ inbound -> {fun peername/1, fun sockname/1};
+ outbound -> {fun sockname/1, fun peername/1}
+ end,
+ case {From(Sock), To(Sock)} of
+ {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
+ {ok, lists:flatten(
+ io_lib:format("~s:~p -> ~s:~p",
+ [rabbit_misc:ntoab(FromAddress), FromPort,
+ rabbit_misc:ntoab(ToAddress), ToPort]))};
+ {{error, _Reason} = Error, _} ->
+ Error;
+ {_, {error, _Reason} = Error} ->
+ Error
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index e81f8134..99bfb765 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -164,8 +164,6 @@ ssl_transform_fun(SslOpts) ->
fun (Sock) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection ~p to SSL~n",
- [self()]),
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, Reason} ->
{error, {ssl_upgrade_error, Reason}};
@@ -266,6 +264,16 @@ start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Reader),
Reader ! {go, Sock, SockTransform},
+
+ %% In the event that somebody floods us with connections, the
+ %% reader processes can spew log events at error_logger faster
+ %% than it can keep up, causing its mailbox to grow unbounded
+ %% until we eat all the memory available and crash. So here is a
+ %% meaningless synchronous call to the underlying gen_event
+ %% mechanism. When it returns the mailbox is drained, and we
+ %% return to our caller to accept more connetions.
+ gen_event:which_handlers(error_logger),
+
Reader.
start_client(Sock) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index fce61129..31be244b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -177,25 +177,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Reason]),
- rabbit_log:info("closing TCP connection ~p~n",
- [self()]),
+ {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
+ [self(), Reason]),
exit(normal)
end.
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
- PeerAddressS = rabbit_misc:ntoab(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ ConnStr = socket_op(Sock, fun (Sock0) ->
+ rabbit_net:connection_string(
+ Sock0, inbound)
+ end),
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
@@ -226,15 +227,13 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
State, #v1.stats_timer),
handshake, 8))
catch
- Ex -> (if Ex == connection_closed_abruptly ->
- fun rabbit_log:warning/2;
- true ->
- fun rabbit_log:error/2
- end)("exception on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> log(case Ex of
+ connection_closed_abruptly -> warning;
+ _ -> error
+ end, "exception on AMQP connection ~p (~s)~n~p~n",
+ [self(), ConnStr, Ex])
after
- rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]),
%% We don't close the socket explicitly. The reader is the
%% controlling process and hence its termination will close
%% the socket. Furthermore, gen_tcp:close/1 waits for pending
@@ -389,8 +388,8 @@ handle_dependent_exit(ChPid, Reason, State) ->
{_Channel, controlled} ->
maybe_close(State);
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
+ log(error, "connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
maybe_close(handle_exception(State, Channel, Reason))
end.
@@ -432,9 +431,10 @@ wait_for_channel_termination(N, TimerRef) ->
{_Channel, controlled} ->
wait_for_channel_termination(N-1, TimerRef);
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
+ log(error,
+ "connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 8678c2c9..88da74c5 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- try
- %% report
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
- {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [rabbit_misc:ntoab(Address), Port,
- rabbit_misc:ntoab(PeerAddress), PeerPort]),
- %% In the event that somebody floods us with connections we can spew
- %% the above message at error_logger faster than it can keep up.
- %% So error_logger's mailbox grows unbounded until we eat all the
- %% memory available and crash. So here's a meaningless synchronous call
- %% to the underlying gen_event mechanism - when it returns the mailbox
- %% is drained.
- gen_event:which_handlers(error_logger),
- %% handle
- file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain()
- catch {inet_error, Reason} ->
- gen_tcp:close(Sock),
- error_logger:error_msg("unable to accept TCP connection: ~p~n",
- [Reason])
- end,
+ %% handle
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain(),
%% accept more
accept(State);
@@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ {AddressS, Port} = case inet:sockname(LSock) of
+ {ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
+ {error, _} -> {"unknown", unknown}
+ end,
error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
- [rabbit_misc:ntoab(Address), Port, Reason]),
+ [AddressS, Port, Reason]),
accept(State);
handle_info(_Info, State) ->
@@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};