diff options
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/rabbit_log.erl | 80 | ||||
-rw-r--r-- | src/rabbit_net.erl | 21 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 12 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 42 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 34 |
6 files changed, 103 insertions, 87 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9301af6b..2fee1114 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -37,6 +37,7 @@ {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, {trace_vhosts, []}, + {log_levels, [{connection, info}]}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 8f58f848..418b53c7 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -23,15 +23,25 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([info/1, info/2, warning/1, warning/2, error/1, error/2]). +-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). -define(SERVER, ?MODULE). +-define(LEVELS, [info, warning, error, none]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([level/0]). + +-type(category() :: atom()). +-type(level() :: 'info' | 'warning' | 'error'). + -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). + +-spec(log/3 :: (category(), level(), string()) -> 'ok'). +-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -43,51 +53,48 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -info(Fmt) -> - gen_server:cast(?SERVER, {info, Fmt}). - -info(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {info, Fmt, Args}). - -warning(Fmt) -> - gen_server:cast(?SERVER, {warning, Fmt}). +-record(state, {levels, config}). -warning(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {warning, Fmt, Args}). +%%---------------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [?LEVELS], []). +log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). -error(Fmt) -> - gen_server:cast(?SERVER, {error, Fmt}). +log(Category, Level, Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}). -error(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {error, Fmt, Args}). +info(Fmt) -> log(default, info, Fmt). +info(Fmt, Args) -> log(default, info, Fmt, Args). +warning(Fmt) -> log(default, warning, Fmt). +warning(Fmt, Args) -> log(default, warning, Fmt, Args). +error(Fmt) -> log(default, error, Fmt). +error(Fmt, Args) -> log(default, error, Fmt, Args). %%-------------------------------------------------------------------- -init([]) -> {ok, none}. +init([Levels]) -> + {ok, LevelConfig} = application:get_env(log_levels), + {ok, #state{levels = orddict:from_list( + lists:zip(Levels, lists:seq(1, length(Levels)))), + config = orddict:from_list(LevelConfig)}}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({info, Fmt}, State) -> - error_logger:info_msg(Fmt), - {noreply, State}; -handle_cast({info, Fmt, Args}, State) -> - error_logger:info_msg(Fmt, Args), - {noreply, State}; -handle_cast({warning, Fmt}, State) -> - error_logger:warning_msg(Fmt), - {noreply, State}; -handle_cast({warning, Fmt, Args}, State) -> - error_logger:warning_msg(Fmt, Args), - {noreply, State}; -handle_cast({error, Fmt}, State) -> - error_logger:error_msg(Fmt), - {noreply, State}; -handle_cast({error, Fmt, Args}, State) -> - error_logger:error_msg(Fmt, Args), +handle_cast({log, Category, Level, Fmt, Args}, + State = #state{levels = Levels, config = Config}) -> + CatLevel = case orddict:find(Category, Config) of + {ok, L} -> L; + error -> info + end, + case orddict:fetch(Level, Levels) >= orddict:fetch(CatLevel, Levels) of + false -> ok; + true -> (case Level of + info -> fun error_logger:info_msg/2; + warning -> fun error_logger:warning_msg/2; + error -> fun error_logger:error_msg/2 + end)(Fmt, Args) + end, {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -100,4 +107,3 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index b944ec81..fef8ae88 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -19,7 +19,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - sockname/1, peername/1, peercert/1]). + sockname/1, peername/1, peercert/1, connection_string/2]). %%--------------------------------------------------------------------------- @@ -62,6 +62,8 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). +-spec(connection_string/2 :: + (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). -endif. @@ -141,3 +143,20 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. + +connection_string(Sock, Direction) -> + {From, To} = case Direction of + inbound -> {fun peername/1, fun sockname/1}; + outbound -> {fun sockname/1, fun peername/1} + end, + case {From(Sock), To(Sock)} of + {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> + {ok, lists:flatten( + io_lib:format("~s:~p -> ~s:~p", + [rabbit_misc:ntoab(FromAddress), FromPort, + rabbit_misc:ntoab(ToAddress), ToPort]))}; + {{error, _Reason} = Error, _} -> + Error; + {_, {error, _Reason} = Error} -> + Error + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index e81f8134..99bfb765 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -164,8 +164,6 @@ ssl_transform_fun(SslOpts) -> fun (Sock) -> case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection ~p to SSL~n", - [self()]), {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; {error, Reason} -> {error, {ssl_upgrade_error, Reason}}; @@ -266,6 +264,16 @@ start_client(Sock, SockTransform) -> {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Reader), Reader ! {go, Sock, SockTransform}, + + %% In the event that somebody floods us with connections, the + %% reader processes can spew log events at error_logger faster + %% than it can keep up, causing its mailbox to grow unbounded + %% until we eat all the memory available and crash. So here is a + %% meaningless synchronous call to the underlying gen_event + %% mechanism. When it returns the mailbox is drained, and we + %% return to our caller to accept more connetions. + gen_event:which_handlers(error_logger), + Reader. start_client(Sock) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index fce61129..31be244b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -177,25 +177,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> server_capabilities(_) -> []. +log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; - {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Reason]), - rabbit_log:info("closing TCP connection ~p~n", - [self()]), + {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", + [self(), Reason]), exit(normal) end. start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), - PeerAddressS = rabbit_misc:ntoab(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + ConnStr = socket_op(Sock, fun (Sock0) -> + rabbit_net:connection_string( + Sock0, inbound) + end), + log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), @@ -226,15 +227,13 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, State, #v1.stats_timer), handshake, 8)) catch - Ex -> (if Ex == connection_closed_abruptly -> - fun rabbit_log:warning/2; - true -> - fun rabbit_log:error/2 - end)("exception on TCP connection ~p from ~s:~p~n~p~n", - [self(), PeerAddressS, PeerPort, Ex]) + Ex -> log(case Ex of + connection_closed_abruptly -> warning; + _ -> error + end, "exception on AMQP connection ~p (~s)~n~p~n", + [self(), ConnStr, Ex]) after - rabbit_log:info("closing TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]), %% We don't close the socket explicitly. The reader is the %% controlling process and hence its termination will close %% the socket. Furthermore, gen_tcp:close/1 waits for pending @@ -389,8 +388,8 @@ handle_dependent_exit(ChPid, Reason, State) -> {_Channel, controlled} -> maybe_close(State); {Channel, uncontrolled} -> - rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), + log(error, "connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), maybe_close(handle_exception(State, Channel, Reason)) end. @@ -432,9 +431,10 @@ wait_for_channel_termination(N, TimerRef) -> {_Channel, controlled} -> wait_for_channel_termination(N-1, TimerRef); {Channel, uncontrolled} -> - rabbit_log:error("connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), + log(error, + "connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), wait_for_channel_termination(N-1, TimerRef) end; cancel_wait -> diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 8678c2c9..88da74c5 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - try - %% report - {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), - {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [rabbit_misc:ntoab(Address), Port, - rabbit_misc:ntoab(PeerAddress), PeerPort]), - %% In the event that somebody floods us with connections we can spew - %% the above message at error_logger faster than it can keep up. - %% So error_logger's mailbox grows unbounded until we eat all the - %% memory available and crash. So here's a meaningless synchronous call - %% to the underlying gen_event mechanism - when it returns the mailbox - %% is drained. - gen_event:which_handlers(error_logger), - %% handle - file_handle_cache:transfer(apply(M, F, A ++ [Sock])), - ok = file_handle_cache:obtain() - catch {inet_error, Reason} -> - gen_tcp:close(Sock), - error_logger:error_msg("unable to accept TCP connection: ~p~n", - [Reason]) - end, + %% handle + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain(), %% accept more accept(State); @@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, handle_info({inet_async, LSock, Ref, {error, Reason}}, State=#state{sock=LSock, ref=Ref}) -> - {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + {AddressS, Port} = case inet:sockname(LSock) of + {ok, {A, P}} -> {rabbit_misc:ntoab(A), P}; + {error, _} -> {"unknown", unknown} + end, error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", - [rabbit_misc:ntoab(Address), Port, Reason]), + [AddressS, Port, Reason]), accept(State); handle_info(_Info, State) -> @@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). - accept(State = #state{sock=LSock}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; |