%% The contents of this file are subject to the Mozilla Public License %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License %% at http://www.mozilla.org/MPL/ %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and %% limitations under the License. %% %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. %% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. %% -module(rabbit_reader). -include("rabbit_framing.hrl"). -include("rabbit.hrl"). -export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). -export([init/2, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). -define(CHANNEL_MIN, 1). %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, channel_count, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, auth_mechanism, auth_state, connected_at}). -record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, name, port, peer_port, host, peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, timeout, frame_max, channel_max, client_properties, connected_at]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). -define(AUTH_NOTIFICATION_INFO_KEYS, [host, vhost, name, peer_host, peer_port, protocol, auth_mechanism, ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject, peer_cert_validity]). -define(IS_RUNNING(State), (State#v1.connection_state =:= running orelse State#v1.connection_state =:= blocking orelse State#v1.connection_state =:= blocked)). -define(IS_STOPPING(State), (State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed)). %%-------------------------------------------------------------------------- -ifdef(use_specs). -spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/2 :: (pid(), reference()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy -spec(init/2 :: (pid(), pid()) -> no_return()). -spec(start_connection/5 :: (pid(), pid(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). -spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). -spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) -> any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). -endif. %%-------------------------------------------------------------------------- start_link(HelperSup) -> {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). init(Parent, HelperSup) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, {Buf, BufLen, State}) -> mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. info_keys() -> ?INFO_KEYS. info(Pid) -> gen_server:call(Pid, info, infinity). info(Pid, Items) -> case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. force_event_refresh(Pid, Ref) -> gen_server:cast(Pid, {force_event_refresh, Ref}). conserve_resources(Pid, Source, Conserve) -> Pid ! {conserve_resources, Source, Conserve}, ok. server_properties(Protocol) -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), %% Get any configuration-specified server properties {ok, RawConfigServerProps} = application:get_env(rabbit, server_properties), %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms %% from the config and merge them with the generated built-in properties NormalizedConfigServerProps = [{<<"capabilities">>, table, server_capabilities(Protocol)} | [case X of {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), longstr, maybe_list_to_binary(Value)}; {BinKey, Type, Value} -> {BinKey, Type, Value} end || X <- RawConfigServerProps ++ [{product, Product}, {version, Version}, {cluster_name, rabbit_nodes:cluster_name()}, {platform, "Erlang/OTP"}, {copyright, ?COPYRIGHT_MESSAGE}, {information, ?INFORMATION_MESSAGE}]]], %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). maybe_list_to_binary(V) when is_binary(V) -> V; maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V). server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, {<<"basic.nack">>, bool, true}, {<<"consumer_cancel_notify">>, bool, true}, {<<"connection.blocked">>, bool, true}, {<<"consumer_priorities">>, bool, true}, {<<"authentication_failure_close">>, bool, true}, {<<"per_consumer_qos">>, bool, true}]; server_capabilities(_) -> []. %%-------------------------------------------------------------------------- log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). socket_error(Reason) when is_atom(Reason) -> log(error, "Error on AMQP connection ~p: ~s~n", [self(), rabbit_misc:format_inet_error(Reason)]); socket_error(Reason) -> log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; {error, Reason} -> socket_error(Reason), %% NB: this is tcp socket, even in case of ssl rabbit_net:fast_close(Sock), exit(normal) end. start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of {ok, Str} -> Str; {error, enotconn} -> rabbit_net:fast_close(Sock), exit(normal); {error, Reason} -> socket_error(Reason), rabbit_net:fast_close(Sock), exit(normal) end, {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(HandshakeTimeout, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), ?store_proc_name(list_to_binary(Name)), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ name = list_to_binary(Name), host = Host, peer_host = PeerHost, port = Port, peer_port = PeerPort, protocol = none, user = none, timeout_sec = (HandshakeTimeout / 1000), frame_max = ?FRAME_MIN_SIZE, vhost = none, client_properties = none, capabilities = [], auth_mechanism = none, auth_state = none, connected_at = rabbit_misc:now_to_ms(os:timestamp())}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, connection_state = pre_init, queue_collector = undefined, %% started on tune-ok helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, channel_count = 0, throttle = #throttle{ alarmed_by = [], last_blocked_by = none, last_blocked_at = never}}, try run({?MODULE, recvloop, [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of connection_closed_with_no_data_received -> debug; connection_closed_abruptly -> warning; _ -> error end, "closing AMQP connection ~p (~s):~n~p~n", [self(), Name, Ex]) after %% We don't call gen_tcp:close/1 here since it waits for %% pending output to be sent, which results in unnecessary %% delays. We could just terminate - the reader is the %% controlling process and hence its termination will close %% the socket. However, to keep the file_handle_cache %% accounting as accurate as possible we ought to close the %% socket w/o delay before termination. rabbit_net:fast_close(ClientSock), rabbit_networking:unregister_connection(self()), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. run({M, F, A}) -> try apply(M, F, A) catch {become, MFA} -> run(MFA) end. recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) -> mainloop(Deb, Buf, BufLen, State); recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) -> mainloop(Deb, Buf, BufLen, State); recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> throw({become, F(Deb, Buf, BufLen, State)}); recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> case rabbit_net:setopts(Sock, [{active, once}]) of ok -> mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); {error, Reason} -> stop(Reason, State) end; recvloop(Deb, [B], _BufLen, State) -> {Rest, State1} = handle_input(State#v1.callback, B, State), recvloop(Deb, [Rest], size(Rest), State1); recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) -> {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []), Data = list_to_binary(lists:reverse(DataLRev)), {<<>>, State1} = handle_input(State#v1.callback, Data, State), recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1). binlist_split(0, L, Acc) -> {L, Acc}; binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> {H, T} = split_binary(Acc0, -Len), {[H|L], [T|Acc]}; binlist_split(Len, [H|T], Acc) -> binlist_split(Len - size(H), T, [H|Acc]). mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, connection_state = CS, connection = #connection{ name = ConnName}}) -> Recv = rabbit_net:recv(Sock), case CS of pre_init when Buf =:= [] -> %% We only new incoming connections only when either the %% first byte was received or there was an error (eg. a %% timeout). %% %% The goal is to not log TCP healthchecks (a connection %% with no data received) unless specified otherwise. log(case Recv of closed -> debug; _ -> info end, "accepting AMQP connection ~p (~s)~n", [self(), ConnName]); _ -> ok end, case Recv of {data, Data} -> recvloop(Deb, [Data | Buf], BufLen + size(Data), State#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> stop(closed, State); {error, Reason} -> stop(Reason, State); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, ?MODULE, Deb, {Buf, BufLen, State}); {other, Other} -> case handle_other(Other, State) of stop -> ok; NewState -> recvloop(Deb, Buf, BufLen, NewState) end end. stop(closed, #v1{connection_state = pre_init} = State) -> %% The connection was closed before any packet was received. It's %% probably a load-balancer healthcheck: don't consider this a %% failure. maybe_emit_stats(State), throw(connection_closed_with_no_data_received); stop(closed, State) -> maybe_emit_stats(State), throw(connection_closed_abruptly); stop(Reason, State) -> maybe_emit_stats(State), throw({inet_error, Reason}). handle_other({conserve_resources, Source, Conserve}, State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> CR1 = case Conserve of true -> lists:usort([Source | CR]); false -> CR -- [Source] end, State1 = control_throttle( State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}), case {blocked_by_alarm(State), blocked_by_alarm(State1)} of {false, true} -> ok = send_blocked(State1); {true, false} -> ok = send_unblocked(State1); {_, _} -> ok end, State1; handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), {_, State1} = channel_cleanup(ChPid, State), maybe_close(control_throttle(State1)); handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% %% If we wanted to be *really* nice we should wait for a while for %% clients to close the socket at their end, just as we do in the %% ordinary error case. However, since this termination is %% initiated by our parent it is probably more important to exit %% quickly. maybe_emit_stats(State), exit(Reason); handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) -> maybe_emit_stats(State), throw(E); handle_other({channel_exit, Channel, Reason}, State) -> handle_exception(State, Channel, Reason); handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> handle_dependent_exit(ChPid, Reason, State); handle_other(terminate_connection, State) -> maybe_emit_stats(State), stop; handle_other(handshake_timeout, State) when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> State; handle_other(handshake_timeout, State) -> maybe_emit_stats(State), throw({handshake_timeout, State#v1.callback}); handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> State; handle_other(heartbeat_timeout, State = #v1{connection_state = S}) -> maybe_emit_stats(State), throw({heartbeat_timeout, S}); handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), case ForceTermination of force -> stop; normal -> NewState end; handle_other({'$gen_call', From, info}, State) -> gen_server:reply(From, infos(?INFO_KEYS, State)), State; handle_other({'$gen_call', From, {info, Items}}, State) -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), State; handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) when ?IS_RUNNING(State) -> rabbit_event:notify( connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), rabbit_event:init_stats_timer(State, #v1.stats_timer); handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> %% Ignore, we will emit a created event once we start running. State; handle_other(ensure_stats, State) -> ensure_stats_timer(State); handle_other(emit_stats, State) -> emit_stats(State); handle_other({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), control_throttle(State); handle_other(Other, State) -> %% internal error -> something worth dying for maybe_emit_stats(State), exit({unexpected_message, Other}). switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, handle_exception(State, 0, rabbit_misc:amqp_error( connection_forced, Explanation, [], none))}; terminate(_Explanation, State) -> {force, State}. control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse credit_flow:blocked()), case {CS, IsThrottled} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), State#v1{connection_state = running}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State end. maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), State1 = State#v1{connection_state = blocked, throttle = update_last_blocked_by( Throttle#throttle{ last_blocked_at = erlang:now()})}, case {blocked_by_alarm(State), blocked_by_alarm(State1)} of {false, true} -> ok = send_blocked(State1); {_, _} -> ok end, State1; maybe_block(State) -> State. blocked_by_alarm(#v1{connection_state = blocked, throttle = #throttle{alarmed_by = CR}}) when CR =/= [] -> true; blocked_by_alarm(#v1{}) -> false. send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, connection = #connection{protocol = Protocol, capabilities = Capabilities}, sock = Sock}) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> RStr = string:join([atom_to_list(A) || A <- CR], " & "), Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, Protocol); _ -> ok end. send_unblocked(#v1{connection = #connection{protocol = Protocol, capabilities = Capabilities}, sock = Sock}) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol); _ -> ok end. update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> Throttle#throttle{last_blocked_by = flow}; update_last_blocked_by(Throttle) -> Throttle#throttle{last_blocked_by = resource}. %%-------------------------------------------------------------------------- %% error handling / termination close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> %% The spec says "Exclusive queues may only be accessed by the %% current connection, and are deleted when that connection %% closes." This does not strictly imply synchrony, but in %% practice it seems to be what people assume. rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. erlang:send_after((if TimeoutSec > 0 andalso TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; true -> ?CLOSING_TIMEOUT end) * 1000, self(), terminate_connection), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> {Channel, State1} = channel_cleanup(ChPid, State), case {Channel, termination_kind(Reason)} of {undefined, controlled} -> State1; {undefined, uncontrolled} -> exit({abnormal_dependent_exit, ChPid, Reason}); {_, controlled} -> maybe_close(control_throttle(State1)); {_, uncontrolled} -> State2 = handle_exception( State1, Channel, Reason), maybe_close(control_throttle(State2)) end. terminate_channels(#v1{channel_count = 0} = State) -> State; terminate_channels(#v1{channel_count = ChannelCount} = State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), wait_for_channel_termination(ChannelCount, TimerRef, State). wait_for_channel_termination(0, TimerRef, State) -> case erlang:cancel_timer(TimerRef) of false -> receive cancel_wait -> State end; _ -> State end; wait_for_channel_termination(N, TimerRef, State = #v1{connection_state = CS, connection = #connection{ name = ConnName, user = User, vhost = VHost}}) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> {Channel, State1} = channel_cleanup(ChPid, State), case {Channel, termination_kind(Reason)} of {undefined, _} -> exit({abnormal_dependent_exit, ChPid, Reason}); {_, controlled} -> wait_for_channel_termination(N-1, TimerRef, State1); {_, uncontrolled} -> log(error, "Error on AMQP connection ~p (~s, vhost: '~s'," " user: '~s', state: ~p), channel ~p:" "error while terminating:~n~p~n", [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]), wait_for_channel_termination(N-1, TimerRef, State1) end; cancel_wait -> exit(channel_termination_timeout) end. maybe_close(State = #v1{connection_state = closing, channel_count = 0, connection = #connection{protocol = Protocol}, sock = Sock}) -> NewState = close_connection(State), ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), NewState; maybe_close(State) -> State. termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. log_hard_error(#v1{connection_state = CS, connection = #connection{ name = ConnName, user = User, vhost = VHost}}, Channel, Reason) -> log(error, "Error on AMQP connection ~p (~s, vhost: '~s'," " user: '~s', state: ~p), channel ~p:~n~p~n", [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]). handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> log_hard_error(State, Channel, Reason), State; handle_exception(State = #v1{connection = #connection{protocol = Protocol}, connection_state = CS}, Channel, Reason) when ?IS_RUNNING(State) orelse CS =:= closing -> log_hard_error(State, Channel, Reason), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), State1 = close_connection(terminate_channels(State)), ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), State1; handle_exception(State, Channel, Reason) -> %% We don't trust the client at this point - force them to wait %% for a bit so they can't DOS us with repeated failed logins etc. timer:sleep(?SILENT_CLOSE_DELAY * 1000), throw({handshake_error, State#v1.connection_state, Channel, Reason}). %% we've "lost sync" with the client and hence must not accept any %% more input fatal_frame_error(Error, Type, Channel, Payload, State) -> frame_error(Error, Type, Channel, Payload, State), %% grace period to allow transmission of error timer:sleep(?SILENT_CLOSE_DELAY * 1000), throw(fatal_frame_error). frame_error(Error, Type, Channel, Payload, State) -> {Str, Bin} = payload_snippet(Payload), handle_exception(State, Channel, rabbit_misc:amqp_error(frame_error, "type ~p, ~s octets = ~p: ~p", [Type, Str, Bin, Error], none)). unexpected_frame(Type, Channel, Payload, State) -> {Str, Bin} = payload_snippet(Payload), handle_exception(State, Channel, rabbit_misc:amqp_error(unexpected_frame, "type ~p, ~s octets = ~p", [Type, Str, Bin], none)). payload_snippet(Payload) when size(Payload) =< 16 -> {"all", Payload}; payload_snippet(<>) -> {"first 16", Snippet}. %%-------------------------------------------------------------------------- create_channel(_Channel, #v1{channel_count = ChannelCount, connection = #connection{channel_max = ChannelMax}}) when ChannelMax /= 0 andalso ChannelCount >= ChannelMax -> {error, rabbit_misc:amqp_error( not_allowed, "number of channels opened (~w) has reached the " "negotiated channel_max (~w)", [ChannelCount, ChannelMax], 'none')}; create_channel(Channel, #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, channel_count = ChannelCount, connection = #connection{name = Name, protocol = Protocol, frame_max = FrameMax, user = User, vhost = VHost, capabilities = Capabilities}} = State) -> {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}. channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> case get({ch_pid, ChPid}) of undefined -> {undefined, State}; {Channel, MRef} -> credit_flow:peer_down(ChPid), erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), {Channel, State#v1{channel_count = ChannelCount - 1}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. %%-------------------------------------------------------------------------- handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) when ?IS_STOPPING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State end; handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> frame_error(unknown_frame, Type, 0, Payload, State); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> unexpected_frame(Type, 0, Payload, State) end; handle_frame(Type, Channel, Payload, State = #v1{connection = #connection{protocol = Protocol}}) when ?IS_RUNNING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> frame_error(unknown_frame, Type, Channel, Payload, State); heartbeat -> unexpected_frame(Type, Channel, Payload, State); Frame -> process_frame(Frame, Channel, State) end; handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) -> State; handle_frame(Type, Channel, Payload, State) -> unexpected_frame(Type, Channel, Payload, State). process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, case (case get(ChKey) of undefined -> create_channel(Channel, State); Other -> {ok, Other, State} end) of {error, Error} -> handle_exception(State, Channel, Error); {ok, {ChPid, AState}, State1} -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> put(ChKey, {ChPid, NewAState}), post_process_frame(Frame, ChPid, State1); {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), put(ChKey, {ChPid, NewAState}), post_process_frame(Frame, ChPid, State1); {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(ChPid, Method, Content), put(ChKey, {ChPid, NewAState}), post_process_frame(Frame, ChPid, control_throttle(State1)); {error, Reason} -> handle_exception(State1, Channel, Reason) end end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> {_, State1} = channel_cleanup(ChPid, State), %% This is not strictly necessary, but more obviously %% correct. Also note that we do not need to call maybe_close/1 %% since we cannot possibly be in the 'closing' state. control_throttle(State1); post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> maybe_block(State); post_process_frame({content_body, _}, _ChPid, State) -> maybe_block(State); post_process_frame(_Frame, _ChPid, State) -> State. %%-------------------------------------------------------------------------- %% We allow clients to exceed the frame size a little bit since quite %% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. -define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). handle_input(frame_header, <>, State = #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> fatal_frame_error( {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, Type, Channel, <<>>, State); handle_input(frame_header, <>, State) -> {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))}; handle_input(frame_header, <>, State) -> {Rest, ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1))}; handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> <> = Data, case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), {Rest, switch_callback(State1, frame_header, 7)}; _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, Type, Channel, Payload, State) end; handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) -> {Rest, handshake({A, B, C, D}, State)}; handle_input(handshake, <>, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). %% The two rules pertaining to version negotiation: %% %% * If the server cannot support the protocol specified in the %% protocol header, it MUST respond with a valid protocol header and %% then close the socket connection. %% %% * The server MUST provide a protocol version that is lower than or %% equal to that requested by the client in the protocol header. handshake({0, 0, 9, 1}, State) -> start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); %% This is the protocol header for 0-9, which we can safely treat as %% though it were 0-9-1. handshake({1, 1, 0, 9}, State) -> start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); %% This is what most clients send for 0-8. The 0-8 spec, confusingly, %% defines the version as 8-0. handshake({1, 1, 8, 0}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); %% The 0-8 spec as on the AMQP web site actually has this as the %% protocol header; some libraries e.g., py-amqplib, send it when they %% want 0-8. handshake({1, 1, 9, 1}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); %% ... and finally, the 1.0 spec is crystal clear! handshake({Id, 1, 0, 0}, State) -> become_1_0(Id, State); handshake(Vsn, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_version, Vsn}). %% Offer a protocol version to the client. Connection.start only %% includes a major and minor version number, Luckily 0-9 and 0-9-1 %% are similar enough that clients will be happy with either. start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> rabbit_networking:register_connection(self()), Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, server_properties = server_properties(Protocol), mechanisms = auth_mechanisms_binary(Sock), locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), switch_callback(State#v1{connection = Connection#connection{ timeout_sec = ?NORMAL_TIMEOUT, protocol = Protocol}, connection_state = starting}, frame_header, 7). refuse_connection(Sock, Exception, {A, B, C, D}) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end), throw(Exception). -ifdef(use_specs). -spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()). -endif. refuse_connection(Sock, Exception) -> refuse_connection(Sock, Exception, {0, 0, 9, 1}). ensure_stats_timer(State = #v1{connection_state = running}) -> rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); ensure_stats_timer(State) -> State. %%-------------------------------------------------------------------------- handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch throw:{inet_error, E} when E =:= closed; E =:= enotconn -> maybe_emit_stats(State), throw(connection_closed_abruptly); exit:#amqp_error{method = none} = Reason -> handle_exception(State, 0, Reason#amqp_error{method = MethodName}); Type:Reason -> Stack = erlang:get_stacktrace(), handle_exception(State, 0, {Type, Reason, MethodName, Stack}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, State0 = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> AuthMechanism = auth_mechanism_to_module(Mechanism, Sock), Capabilities = case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of {table, Capabilities1} -> Capabilities1; _ -> [] end, State = State0#v1{connection_state = securing, connection = Connection#connection{ client_properties = ClientProperties, capabilities = Capabilities, auth_mechanism = {Mechanism, AuthMechanism}, auth_state = AuthMechanism:init(Sock)}}, auth_phase(Response, State); handle_method0(#'connection.secure_ok'{response = Response}, State = #v1{connection_state = securing}) -> auth_phase(Response, State); handle_method0(#'connection.tune_ok'{frame_max = FrameMax, channel_max = ChannelMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, helper_sup = SupPid, sock = Sock}) -> ok = validate_negotiated_integer_value( frame_max, ?FRAME_MIN_SIZE, FrameMax), ok = validate_negotiated_integer_value( channel_max, ?CHANNEL_MIN, ChannelMax), {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector( SupPid, Connection#connection.name), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), ReceiveFun = fun() -> Parent ! heartbeat_timeout end, Heartbeater = rabbit_heartbeat:start( SupPid, Sock, Connection#connection.name, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ frame_max = FrameMax, channel_max = ChannelMax, timeout_sec = ClientHeartbeat}, queue_collector = Collector, heartbeater = Heartbeater}; handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, connection = Connection = #connection{ user = User, protocol = Protocol}, helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), Throttle1 = Throttle#throttle{alarmed_by = Conserve}, {ok, ChannelSupSupPid} = rabbit_connection_helper_sup:start_channel_sup_sup(SupPid), State1 = control_throttle( State#v1{connection_state = running, connection = NewConnection, channel_sup_sup_pid = ChannelSupSupPid, throttle = Throttle1}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), maybe_emit_stats(State1), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection = #connection{protocol = Protocol}, sock = Sock}) when ?IS_STOPPING(State) -> %% We're already closed or closing, so we don't need to cleanup %% anything. ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, State; handle_method0(_Method, State) when ?IS_STOPPING(State) -> State; handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). validate_negotiated_integer_value(Field, Min, ClientValue) -> ServerValue = get_env(Field), if ClientValue /= 0 andalso ClientValue < Min -> fail_negotiation(Field, min, ServerValue, ClientValue); ServerValue /= 0 andalso (ClientValue =:= 0 orelse ClientValue > ServerValue) -> fail_negotiation(Field, max, ServerValue, ClientValue); true -> ok end. %% keep dialyzer happy -spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) -> no_return(). fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> {S1, S2} = case MinOrMax of min -> {lower, minimum}; max -> {higher, maximum} end, rabbit_misc:protocol_error( not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)", [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). get_env(Key) -> {ok, Value} = application:get_env(rabbit, Key), Value. send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown authentication mechanism '~s'", [TypeBin]); T -> case {lists:member(T, auth_mechanisms(Sock)), rabbit_registry:lookup_module(auth_mechanism, T)} of {true, {ok, Module}} -> Module; _ -> rabbit_misc:protocol_error( command_invalid, "invalid authentication mechanism '~s'", [T]) end end. auth_mechanisms(Sock) -> {ok, Configured} = application:get_env(auth_mechanisms), [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism), Module:should_offer(Sock), lists:member(Name, Configured)]. auth_mechanisms_binary(Sock) -> list_to_binary( string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). auth_phase(Response, State = #v1{connection = Connection = #connection{protocol = Protocol, auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Username, Msg, Args} -> auth_fail(Username, Msg, Args, Name, State); {protocol_error, Msg, Args} -> notify_auth_result(none, user_authentication_failure, [{error, rabbit_misc:format(Msg, Args)}], State), rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> Secure = #'connection.secure'{challenge = Challenge}, ok = send_on_channel0(Sock, Secure, Protocol), State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User = #user{username = Username}} -> case rabbit_access_control:check_user_loopback(Username, Sock) of ok -> notify_auth_result(Username, user_authentication_success, [], State); not_allowed -> auth_fail(Username, "user '~s' can only connect via " "localhost", [Username], Name, State) end, Tune = #'connection.tune'{frame_max = get_env(frame_max), channel_max = get_env(channel_max), heartbeat = get_env(heartbeat)}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{user = User, auth_state = none}} end. -ifdef(use_specs). -spec(auth_fail/5 :: (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) -> no_return()). -endif. auth_fail(Username, Msg, Args, AuthName, State = #v1{connection = #connection{protocol = Protocol, capabilities = Capabilities}}) -> notify_auth_result(Username, user_authentication_failure, [{error, rabbit_misc:format(Msg, Args)}], State), AmqpError = rabbit_misc:amqp_error( access_refused, "~s login refused: ~s", [AuthName, io_lib:format(Msg, Args)], none), case rabbit_misc:table_lookup(Capabilities, <<"authentication_failure_close">>) of {bool, true} -> SafeMsg = io_lib:format( "Login was refused using authentication " "mechanism ~s. For details see the broker " "logfile.", [AuthName]), AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg}, {0, CloseMethod} = rabbit_binary_generator:map_exception( 0, AmqpError1, Protocol), ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); _ -> ok end, rabbit_misc:protocol_error(AmqpError). notify_auth_result(Username, AuthResult, ExtraProps, State) -> EventProps = [{connection_type, network}, {name, case Username of none -> ''; _ -> Username end}] ++ [case Item of name -> {connection_name, i(name, State)}; _ -> {Item, i(Item, State)} end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++ ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); i(SockStat, S) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end, fun ([{_, I}]) -> I end, S); i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S); i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S); i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S); i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount; i(state, #v1{connection_state = ConnectionState, throttle = #throttle{alarmed_by = Alarms, last_blocked_by = WasBlockedBy, last_blocked_at = T}}) -> case Alarms =:= [] andalso %% not throttled by resource alarms (credit_flow:blocked() %% throttled by flow now orelse %% throttled by flow recently (WasBlockedBy =:= flow andalso T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000)) of true -> flow; false -> ConnectionState end; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). ic(name, #connection{name = Name}) -> Name; ic(host, #connection{host = Host}) -> Host; ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost; ic(port, #connection{port = Port}) -> Port; ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort; ic(protocol, #connection{protocol = none}) -> none; ic(protocol, #connection{protocol = P}) -> P:version(); ic(user, #connection{user = none}) -> ''; ic(user, #connection{user = U}) -> U#user.username; ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; ic(connected_at, #connection{connected_at = T}) -> T; ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> case Get(Sock) of {ok, T} -> Select(T); {error, _} -> '' end. ssl_info(F, #v1{sock = Sock}) -> %% The first ok form is R14 %% The second is R13 - the extra term is exportability (by inspection, %% the docs are wrong) case rabbit_net:ssl_info(Sock) of nossl -> ''; {error, _} -> ''; {ok, {P, {K, C, H}}} -> F({P, {K, C, H}}); {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}}) end. cert_info(F, #v1{sock = Sock}) -> case rabbit_net:peercert(Sock) of nossl -> ''; {error, no_peercert} -> ''; {ok, Cert} -> list_to_binary(F(Cert)) end. maybe_emit_stats(State) -> rabbit_event:if_enabled(State, #v1.stats_timer, fun() -> emit_stats(State) end). emit_stats(State) -> Infos = infos(?STATISTICS_KEYS, State), rabbit_event:notify(connection_stats, Infos), State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer), %% If we emit an event which looks like we are in flow control, it's not a %% good idea for it to be our last even if we go idle. Keep emitting %% events, either we stay busy or we drop out of flow control. case proplists:get_value(state, Infos) of flow -> ensure_stats_timer(State1); _ -> State1 end. %% 1.0 stub -ifdef(use_specs). -spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()). -endif. become_1_0(Id, State = #v1{sock = Sock}) -> case code:is_loaded(rabbit_amqp1_0_reader) of false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled); _ -> Mode = case Id of 0 -> amqp; 3 -> sasl; _ -> refuse_connection( Sock, {unsupported_amqp1_0_protocol_id, Id}, {3, 1, 0, 0}) end, F = fun (_Deb, Buf, BufLen, S) -> {rabbit_amqp1_0_reader, init, [Mode, pack_for_1_0(Buf, BufLen, S)]} end, State#v1{connection_state = {become, F}} end. pack_for_1_0(Buf, BufLen, #v1{parent = Parent, sock = Sock, recv_len = RecvLen, pending_recv = PendingRecv, helper_sup = SupPid}) -> {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.