%%   The contents of this file are subject to the Mozilla Public License
%%   Version 1.1 (the "License"); you may not use this file except in
%%   compliance with the License. You may obtain a copy of the License at
%%   http://www.mozilla.org/MPL/
%%
%%   Software distributed under the License is distributed on an "AS IS"
%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%%   License for the specific language governing rights and limitations
%%   under the License.
%%
%%   The Original Code is RabbitMQ.
%%
%%   The Initial Developers of the Original Code are LShift Ltd,
%%   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%%   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%%   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%%   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%%   Technologies LLC, and Rabbit Technologies Ltd.
%%
%%   Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%%   Ltd. Portions created by Cohesive Financial Technologies LLC are
%%   Copyright (C) 2007-2010 Cohesive Financial Technologies
%%   LLC. Portions created by Rabbit Technologies Ltd are Copyright
%%   (C) 2007-2010 Rabbit Technologies Ltd.
%%
%%   All Rights Reserved.
%%
%%   Contributor(s): ______________________________________.
%%

%% AMQP channel process: a gen_server2 that receives decoded methods
%% from the reader, acts on them and sends replies and deliveries
%% through the writer.
-module(rabbit_channel).
-include("rabbit_framing.hrl").
-include("rabbit.hrl").

-behaviour(gen_server2).

%% Channel lifecycle and method dispatch.
-export([start_link/6, do/2, do/3, shutdown/1]).
%% Asynchronous notifications sent to a channel.
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
%% Introspection.
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).

%% Callback target of the channel.flow_ok timer.
-export([flow_timeout/2]).

%% gen_server2 callbacks.
-export([init/1, terminate/2, code_change/3,
         handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).

%% Channel state.
%%   state                        - starting | running | terminating
%%   channel                      - channel number given to start_link/6
%%   reader_pid, writer_pid       - connection reader and writer processes
%%   limiter_pid                  - undefined until a limiter is started
%%   transaction_id               - none, or the id of the open transaction
%%   tx_participants              - set of queue pids touched by the
%%                                  current transaction
%%   next_tag                     - delivery tag for the next delivery
%%                                  (starts at 1)
%%   uncommitted_ack_q            - entries acked inside the transaction but
%%                                  not yet committed
%%   unacked_message_q            - {DeliveryTag, ConsumerTag, Msg} entries
%%                                  awaiting acknowledgement
%%   username, virtual_host       - identity used for permission checks and
%%                                  resource naming
%%   most_recently_declared_queue - queue name substituted for an empty
%%                                  queue name (<<>> when there is none)
%%   consumer_mapping             - dict: consumer tag -> queue name
%%   blocking                     - dict: queue pid -> monitor ref, for
%%                                  queues that still have to confirm a flush
%%   queue_collector_pid          - process that exclusive queues are
%%                                  registered with
%%   flow                         - #flow{} record
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
             transaction_id, tx_participants, next_tag,
             uncommitted_ack_q, unacked_message_q,
             username, virtual_host, most_recently_declared_queue,
             consumer_mapping, blocking, queue_collector_pid, flow}).
%% Flow-control handshake state. `server' is the flow state this side
%% wants, `client' is the state the peer has acknowledged, and `pending'
%% is `none' or the {Ref, TRef} pair of an unanswered channel.flow.
-record(flow, {server, client, pending}).

-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds

-define(INFO_KEYS,
        [pid,
         connection,
         number,
         user,
         vhost,
         transactional,
         consumer_count,
         messages_unacknowledged,
         acks_uncommitted,
         prefetch_count]).

%%----------------------------------------------------------------------------

-ifdef(use_specs).

-type(ref() :: any()).

-spec(start_link/6 ::
      (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
-spec(info_all/0 :: () -> [[info()]]).
-spec(info_all/1 :: ([info_key()]) -> [[info()]]).

-endif.

%%----------------------------------------------------------------------------

%% Start a channel process linked to the caller and return its pid;
%% any result other than {ok, Pid} crashes the caller.
start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
    InitArgs = [Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid],
    {ok, ChPid} = gen_server2:start_link(?MODULE, InitArgs, []),
    ChPid.

%% Hand a content-less method to the channel.
do(Pid, Method) ->
    do(Pid, Method, none).

%% Hand a method and its content to the channel (asynchronous).
do(Pid, Method, Content) ->
    gen_server2:cast(Pid, {method, Method, Content}).

%% Ask the channel to stop.
shutdown(Pid) ->
    gen_server2:cast(Pid, terminate).

%% Have the channel forward Msg to its writer.
send_command(Pid, Msg) ->
    gen_server2:cast(Pid, {command, Msg}).

%% Deliver a queue message to the consumer identified by ConsumerTag.
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
    gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).

%% Memory alarm notification; sent as a priority-8 cast.
conserve_memory(Pid, Conserve) ->
    gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).

%% Notification that queue QPid has flushed its messages to this channel.
flushed(Pid, QPid) ->
    gen_server2:cast(Pid, {flushed, QPid}).
%% Timer callback: tell the channel that the channel.flow_ok identified
%% by Ref did not arrive in time. Sent as a priority-7 cast.
flow_timeout(Pid, Ref) ->
    gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).

%% Pids of all channels in the local rabbit_channels group.
list() ->
    pg_local:get_members(rabbit_channels).

info_keys() ->
    ?INFO_KEYS.

%% All info items of one channel.
info(Pid) ->
    gen_server2:pcall(Pid, 9, info, infinity).

%% Selected info items of one channel; throws the channel's error term
%% when an item is rejected.
info(Pid, Items) ->
    case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
        {error, Error} -> throw(Error);
        {ok, Res}      -> Res
    end.

info_all() ->
    rabbit_misc:filter_exit_map(fun (ChPid) -> info(ChPid) end, list()).

info_all(Items) ->
    rabbit_misc:filter_exit_map(fun (ChPid) -> info(ChPid, Items) end,
                                list()).

%%---------------------------------------------------------------------------

%% Set up the initial channel state: trap exits, link to the writer and
%% join the rabbit_channels group. The channel starts in state `starting'
%% and asks to hibernate straight away.
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
    process_flag(trap_exit, true),
    link(WriterPid),
    ok = pg_local:join(rabbit_channels, self()),
    InitialFlow = #flow{server = true, client = true, pending = none},
    InitialState = #ch{state                        = starting,
                       channel                      = Channel,
                       reader_pid                   = ReaderPid,
                       writer_pid                   = WriterPid,
                       limiter_pid                  = undefined,
                       transaction_id               = none,
                       tx_participants              = sets:new(),
                       next_tag                     = 1,
                       uncommitted_ack_q            = queue:new(),
                       unacked_message_q            = queue:new(),
                       username                     = Username,
                       virtual_host                 = VHost,
                       most_recently_declared_queue = <<>>,
                       consumer_mapping             = dict:new(),
                       blocking                     = dict:new(),
                       queue_collector_pid          = CollectorPid,
                       flow                         = InitialFlow},
    {ok, InitialState, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.

%% `info' returns every item in ?INFO_KEYS. {info, Items} returns
%% {ok, Infos}, or {error, Thrown} when looking up an item throws.
%% Any other request is left unanswered.
handle_call(info, _From, State) ->
    reply(infos(?INFO_KEYS, State), State);

handle_call({info, Items}, _From, State) ->
    Result = try
                 {ok, infos(Items, State)}
             catch
                 throw:Error -> {error, Error}
             end,
    reply(Result, State);

handle_call(_Request, _From, State) ->
    noreply(State).
%% Asynchronous requests to the channel.
%%
%% Every clause that keeps the channel running returns through
%% noreply/1, so the process consistently asks to hibernate once it has
%% dealt with a message.
%%
%% {method, Method, Content}: run handle_method/3 and forward any reply
%% to the writer. An #amqp_error{} exit is reported to the reader via
%% terminating/2 and stops the channel normally; any other exception
%% stops it with the reason and stack trace.
handle_cast({method, Method, Content}, State) ->
    try handle_method(Method, Content, State) of
        {reply, Reply, NewState} ->
            ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
            noreply(NewState);
        {noreply, NewState} ->
            noreply(NewState);
        stop ->
            {stop, normal, State#ch{state = terminating}}
    catch
        exit:Reason = #amqp_error{} ->
            MethodName = rabbit_misc:method_record_type(Method),
            {stop, normal, terminating(Reason#amqp_error{method = MethodName},
                                       State)};
        exit:normal ->
            {stop, normal, State};
        _:Reason ->
            {stop, {Reason, erlang:get_stacktrace()}, State}
    end;

%% A queue has confirmed that it flushed its messages to us.
handle_cast({flushed, QPid}, State) ->
    noreply(queue_blocked(QPid, State));

handle_cast(terminate, State) ->
    {stop, normal, State};

handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
    ok = rabbit_writer:send_command(WriterPid, Msg),
    noreply(State);

%% Delivery from a queue: remember the message if it needs an ack, pass
%% it to the writer and move on to the next delivery tag.
handle_cast({deliver, ConsumerTag, AckRequired, Msg},
            State = #ch{writer_pid = WriterPid,
                        next_tag = DeliveryTag}) ->
    State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
    ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
    noreply(State1#ch{next_tag = DeliveryTag + 1});

%% Memory alarm notifications. While the channel is still `starting',
%% channel.open_ok is sent only once the alarm reports `false'.
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
    noreply(State);
handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
    ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
    noreply(State#ch{state = running});
handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
    flow_control(not Conserve, State);
handle_cast({conserve_memory, _Conserve}, State) ->
    noreply(State);

%% The timer for the currently pending channel.flow fired before the
%% matching channel.flow_ok arrived: close the channel with an error.
handle_cast({flow_timeout, Ref},
            State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
    {stop, normal, terminating(
                     rabbit_misc:amqp_error(
                       precondition_failed,
                       "timeout waiting for channel.flow_ok{active=~w}",
                       [not Flow], none), State)};
%% Stale timeout (Ref is no longer the pending one): ignore it.
handle_cast({flow_timeout, _Ref}, State) ->
    noreply(State).
%% Out-of-band messages.
%%
%% A writer exit caused by a failed send is reported to the reader and
%% stops the channel normally. Any other 'EXIT' stops the channel with
%% the same reason. A 'DOWN' for a monitored queue is treated like a
%% flush confirmation from that queue, and returns through noreply/1 so
%% that the channel asks to hibernate like every other non-stopping path.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
            State = #ch{writer_pid = WriterPid}) ->
    State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
    {stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
    noreply(queue_blocked(QPid, State)).

%% Drop the permission cache held in the process dictionary before
%% hibernating.
handle_pre_hibernate(State) ->
    ok = clear_permission_cache(),
    {hibernate, State}.

%% If the channel is already `terminating' the rollback/notify step has
%% been done (see terminating/2); otherwise do it now. Its result is
%% only asserted for a normal shutdown.
terminate(_Reason, State = #ch{state = terminating}) ->
    terminate(State);

terminate(Reason, State) ->
    Res = rollback_and_notify(State),
    case Reason of
        normal -> ok = Res;
        _      -> ok
    end,
    terminate(State).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%---------------------------------------------------------------------------

%% gen_server2 return values that also request hibernation.
reply(Reply, NewState) ->
    {reply, Reply, NewState, hibernate}.

noreply(NewState) ->
    {noreply, NewState, hibernate}.

%% handle_method/3 result for methods with a `nowait' flag: no reply
%% when NoWait is true, otherwise reply with Msg.
return_ok(State, true, _Msg)  -> {noreply, State};
return_ok(State, false, Msg)  -> {reply, Msg, State}.

%% The message to send on our behalf, or `undefined' when NoWait is true.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.

%% Roll back and notify queues, report Reason to the reader and mark the
%% state as `terminating' so terminate/2 does not repeat the work.
terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
    ok = rollback_and_notify(State),
    Reader ! {channel_exit, Channel, Reason},
    State#ch{state = terminating}.

%% Remember Q as the most recently declared queue and, unless NoWait is
%% set, reply with queue.declare_ok. If the stat call exits, the exit
%% handler substitutes zero message and consumer counts.
return_queue_declare_ok(State, NoWait, Q) ->
    NewState = State#ch{most_recently_declared_queue =
                            (Q#amqqueue.name)#resource.name},
    case NoWait of
        true  -> {noreply, NewState};
        false ->
            {ok, ActualName, MessageCount, ConsumerCount} =
                rabbit_misc:with_exit_handler(
                  fun () -> {ok, Q#amqqueue.name, 0, 0} end,
                  fun () -> rabbit_amqqueue:stat(Q) end),
            Reply = #'queue.declare_ok'{queue          = ActualName#resource.name,
                                        message_count  = MessageCount,
                                        consumer_count = ConsumerCount},
            {reply, Reply, NewState}
    end.
%% Check that Username holds permission Perm on Resource, using a small
%% most-recently-used cache kept in the process dictionary. A cache hit
%% moves the entry to the front. A miss performs the real check (which
%% must return ok) and trims the cache so that it never holds more than
%% ?MAX_PERMISSION_CACHE_SIZE entries.
check_resource_access(Username, Resource, Perm) ->
    Entry = {Resource, Perm},
    Cached = case get(permission_cache) of
                 undefined -> [];
                 Entries   -> Entries
             end,
    Rest = case lists:member(Entry, Cached) of
               false ->
                   ok = rabbit_access_control:check_resource_access(
                          Username, Resource, Perm),
                   lists:sublist(Cached, ?MAX_PERMISSION_CACHE_SIZE - 1);
               true ->
                   lists:delete(Entry, Cached)
           end,
    put(permission_cache, [Entry | Rest]),
    ok.

clear_permission_cache() ->
    erase(permission_cache),
    ok.

check_configure_permitted(Resource, #ch{username = User}) ->
    check_resource_access(User, Resource, configure).

check_write_permitted(Resource, #ch{username = User}) ->
    check_resource_access(User, Resource, write).

check_read_permitted(Resource, #ch{username = User}) ->
    check_resource_access(User, Resource, read).

%% A queue may be used when it has no exclusive owner or when its owner
%% is exactly ReaderPid; otherwise a resource_locked protocol error is
%% raised.
check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid) ->
    ok;
check_exclusive_access(#amqqueue{exclusive_owner = ReaderPid}, ReaderPid) ->
    ok;
check_exclusive_access(#amqqueue{name = QName}, _ReaderPid) ->
    rabbit_misc:protocol_error(
      resource_locked,
      "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).

%% Apply F to the queue named QName after the exclusive-access check.
with_exclusive_access_or_die(QName, ReaderPid, F) ->
    CheckThenApply = fun (Q) ->
                             check_exclusive_access(Q, ReaderPid),
                             F(Q)
                     end,
    rabbit_amqqueue:with_or_die(QName, CheckThenApply).

%% Turn a queue name from a method into a queue resource. An empty name
%% stands for the most recently declared queue, and is an error if there
%% is none.
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
    rabbit_misc:protocol_error(
      not_allowed, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHost,
                                     most_recently_declared_queue = Recent}) ->
    rabbit_misc:r(VHost, queue, Recent);
expand_queue_name_shortcut(NameBin, #ch{virtual_host = VHost}) ->
    rabbit_misc:r(VHost, queue, NameBin).
%% When both the queue name and the routing key are empty, the routing
%% key defaults to the name of the most recently declared queue (an
%% error if there is none). In every other case the routing key is used
%% unchanged.
expand_routing_key_shortcut(<<>>, <<>>,
                            #ch{most_recently_declared_queue = <<>>}) ->
    rabbit_misc:protocol_error(
      not_allowed, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
                            #ch{most_recently_declared_queue = Recent}) ->
    Recent;
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
    RoutingKey.

%% Check that an exchange/queue name does not start with the reserved
%% "amq." prefix.
%%
%% One, quite reasonable, interpretation of the spec, taken by the
%% QPid M1 Java client, is that the exclusion of "amq." prefixed names
%% only applies on actual creation, and not in the cases where the
%% entity already exists. This is how this function is used in the
%% code below. However, AMQP JIRA 123 changes that in 0-10, and
%% possibly 0-9SP1, making it illegal to attempt to declare an
%% exchange/queue with an amq.* name when passive=false. So this will
%% need revisiting.
%%
%% TODO: enforce other constraints on name. See AMQP JIRA 69.
check_name(Kind, Name = <<"amq.", _/binary>>) ->
    rabbit_misc:protocol_error(
      access_refused,
      "~s name '~s' contains reserved prefix 'amq.*'",[Kind, Name]);
check_name(_Kind, Name) ->
    Name.

%% Queue QPid no longer needs waiting for. If it was in the `blocking'
%% dict, stop monitoring it and remove it; once the dict becomes empty,
%% send channel.flow_ok{active = false}. Queues that are not in the dict
%% leave the state untouched.
queue_blocked(QPid, State = #ch{blocking = Waiting}) ->
    case dict:find(QPid, Waiting) of
        {ok, MonitorRef} ->
            true = erlang:demonitor(MonitorRef),
            StillWaiting = dict:erase(QPid, Waiting),
            ok = case dict:size(StillWaiting) of
                     0 -> rabbit_writer:send_command(
                            State#ch.writer_pid,
                            #'channel.flow_ok'{active = false});
                     _ -> ok
                 end,
            State#ch{blocking = StillWaiting};
        error ->
            State
    end.
%% Execute one AMQP method on this channel.
%%
%% Returns {reply, Method, State} when a reply must be written,
%% {noreply, State} when nothing is to be sent by the channel itself,
%% or `stop' after channel.close. Protocol violations are signalled
%% through rabbit_misc:protocol_error/3 and rabbit_misc:not_found/1.

%% --- channel.open / channel.close / access.request ------------------------

handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
    %% If the alarm is currently raised (register returns true), stay in
    %% `starting' without replying.
    case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
        true  -> {noreply, State};
        false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
    end;

handle_method(#'channel.open'{}, _, _State) ->
    rabbit_misc:protocol_error(
      command_invalid, "second 'channel.open' seen", []);

handle_method(_Method, _, #ch{state = starting}) ->
    rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);

handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
    ok = rollback_and_notify(State),
    ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
    stop;

handle_method(#'access.request'{},_, State) ->
    {reply, #'access.request_ok'{ticket = 1}, State};

%% --- basic.publish --------------------------------------------------------

handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
    rabbit_misc:protocol_error(
      command_invalid,
      "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory,
                               immediate   = Immediate},
              Content, State = #ch{virtual_host   = VHostPath,
                                   transaction_id = TxnKey,
                                   writer_pid     = WriterPid}) ->
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_write_permitted(ExchangeName, State),
    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
    %% We decode the content's properties here because we're almost
    %% certain to want to look at delivery-mode and priority.
    DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
    IsPersistent = is_message_persistent(DecodedContent),
    Message = #basic_message{exchange_name = ExchangeName,
                             routing_key   = RoutingKey,
                             content       = DecodedContent,
                             guid          = rabbit_guid:guid(),
                             is_persistent = IsPersistent},
    {RoutingRes, DeliveredQPids} =
        rabbit_exchange:publish(
          Exchange,
          rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
    case RoutingRes of
        routed ->
            ok;
        unroutable ->
            %% FIXME: 312 should be replaced by the ?NO_ROUTE
            %% definition, when we move to >=0-9
            ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
        not_delivered ->
            %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
            %% definition, when we move to >=0-9
            ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
    end,
    {noreply, case TxnKey of
                  none -> State;
                  _    -> add_tx_participants(DeliveredQPids, State)
              end};

%% --- basic.ack ------------------------------------------------------------

handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
                           multiple = Multiple},
              _, State = #ch{transaction_id = TxnKey,
                             next_tag = NextDeliveryTag,
                             unacked_message_q = UAMQ}) ->
    %% A tag we have not handed out yet cannot be acknowledged.
    if DeliveryTag >= NextDeliveryTag ->
            rabbit_misc:protocol_error(
              command_invalid, "unknown delivery tag ~w", [DeliveryTag]);
       true -> ok
    end,
    {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
    Participants = ack(TxnKey, Acked),
    {noreply, case TxnKey of
                  none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
                          State#ch{unacked_message_q = Remaining};
                  _    -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
                                              Acked),
                          add_tx_participants(
                            Participants,
                            State#ch{unacked_message_q = Remaining,
                                     uncommitted_ack_q = NewUAQ})
              end};

%% --- basic.get ------------------------------------------------------------

handle_method(#'basic.get'{queue = QueueNameBin,
                           no_ack = NoAck},
              _, State = #ch{ writer_pid = WriterPid,
                              reader_pid = ReaderPid,
                              next_tag = DeliveryTag }) ->
    QueueName = expand_queue_name_shortcut(QueueNameBin, State),
    check_read_permitted(QueueName, State),
    case with_exclusive_access_or_die(
           QueueName, ReaderPid,
           fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
        {ok, MessageCount,
         Msg = {_QName, _QPid, _MsgId, Redelivered,
                #basic_message{exchange_name = ExchangeName,
                               routing_key = RoutingKey,
                               content = Content}}} ->
            %% basic.get deliveries are recorded with consumer tag `none'.
            State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
            ok = rabbit_writer:send_command(
                   WriterPid,
                   #'basic.get_ok'{delivery_tag = DeliveryTag,
                                   redelivered = Redelivered,
                                   exchange = ExchangeName#resource.name,
                                   routing_key = RoutingKey,
                                   message_count = MessageCount},
                   Content),
            {noreply, State1#ch{next_tag = DeliveryTag + 1}};
        empty ->
            {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
    end;

%% --- basic.consume / basic.cancel -----------------------------------------

handle_method(#'basic.consume'{queue = QueueNameBin,
                               consumer_tag = ConsumerTag,
                               no_local = _, % FIXME: implement
                               no_ack = NoAck,
                               exclusive = ExclusiveConsume,
                               nowait = NoWait},
              _, State = #ch{ reader_pid = ReaderPid,
                              limiter_pid = LimiterPid,
                              consumer_mapping = ConsumerMapping }) ->
    case dict:find(ConsumerTag, ConsumerMapping) of
        error ->
            QueueName = expand_queue_name_shortcut(QueueNameBin, State),
            check_read_permitted(QueueName, State),
            ActualConsumerTag =
                case ConsumerTag of
                    <<>>  -> rabbit_guid:binstring_guid("amq.ctag");
                    Other -> Other
                end,

            %% In order to ensure that the consume_ok gets sent before
            %% any messages are sent to the consumer, we get the queue
            %% process to send the consume_ok on our behalf.
            case with_exclusive_access_or_die(
                   QueueName, ReaderPid,
                   fun (Q) ->
                           rabbit_amqqueue:basic_consume(
                             Q, NoAck, self(), LimiterPid,
                             ActualConsumerTag, ExclusiveConsume,
                             ok_msg(NoWait, #'basic.consume_ok'{
                                      consumer_tag = ActualConsumerTag}))
                   end) of
                ok ->
                    {noreply, State#ch{consumer_mapping =
                                       dict:store(ActualConsumerTag,
                                                  QueueName,
                                                  ConsumerMapping)}};
                {error, exclusive_consume_unavailable} ->
                    rabbit_misc:protocol_error(
                      access_refused, "~s in exclusive use",
                      [rabbit_misc:rs(QueueName)])
            end;
        {ok, _} ->
            %% Attempted reuse of consumer tag.
            rabbit_misc:protocol_error(
              not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
    end;

handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
                              nowait = NoWait},
              _, State = #ch{consumer_mapping = ConsumerMapping }) ->
    OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
    case dict:find(ConsumerTag, ConsumerMapping) of
        error ->
            %% Spec requires we ignore this situation.
            return_ok(State, NoWait, OkMsg);
        {ok, QueueName} ->
            NewState = State#ch{consumer_mapping =
                                dict:erase(ConsumerTag,
                                           ConsumerMapping)},
            case rabbit_amqqueue:with(
                   QueueName,
                   fun (Q) ->
                           %% In order to ensure that no more messages
                           %% are sent to the consumer after the
                           %% cancel_ok has been sent, we get the
                           %% queue process to send the cancel_ok on
                           %% our behalf. If we were sending the
                           %% cancel_ok ourselves it might overtake a
                           %% message sent previously by the queue.
                           rabbit_amqqueue:basic_cancel(
                             Q, self(), ConsumerTag,
                             ok_msg(NoWait, #'basic.cancel_ok'{
                                      consumer_tag = ConsumerTag}))
                   end) of
                ok ->
                    {noreply, NewState};
                {error, not_found} ->
                    %% Spec requires we ignore this situation.
                    return_ok(NewState, NoWait, OkMsg)
            end
    end;

%% --- basic.qos ------------------------------------------------------------

handle_method(#'basic.qos'{global = true}, _, _State) ->
    rabbit_misc:protocol_error(not_implemented, "global=true", []);

handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
    rabbit_misc:protocol_error(not_implemented,
                               "prefetch_size!=0 (~w)", [Size]);

handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
              _, State = #ch{limiter_pid = LimiterPid}) ->
    %% A limiter is only started when a non-zero prefetch count is
    %% requested and none is running yet.
    LimiterPid1 = case {LimiterPid, PrefetchCount} of
                      {undefined, 0} -> undefined;
                      {undefined, _} -> start_limiter(State);
                      {_, _}         -> LimiterPid
                  end,
    LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
                      ok      -> LimiterPid1;
                      stopped -> unlimit_queues(State)
                  end,
    {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};

%% --- basic.recover --------------------------------------------------------

handle_method(#'basic.recover'{requeue = true},
              _, State = #ch{ transaction_id = none,
                              unacked_message_q = UAMQ }) ->
    ok = fold_per_queue(
           fun (QPid, MsgIds, ok) ->
                   %% The Qpid python test suite incorrectly assumes
                   %% that messages will be requeued in their original
                   %% order. To keep it happy we reverse the id list
                   %% since we are given them in reverse order.
                   rabbit_amqqueue:requeue(
                     QPid, lists:reverse(MsgIds), self())
           end, ok, UAMQ),
    %% No answer required, apparently!
    {noreply, State#ch{unacked_message_q = queue:new()}};

handle_method(#'basic.recover'{requeue = false},
              _, State = #ch{ transaction_id = none,
                              writer_pid = WriterPid,
                              unacked_message_q = UAMQ }) ->
    ok = rabbit_misc:queue_fold(
           fun ({_DeliveryTag, none, _Msg}, ok) ->
                   %% Was sent as a basic.get_ok. Don't redeliver
                   %% it. FIXME: appropriate?
                   ok;
               ({DeliveryTag, ConsumerTag,
                 {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
                   %% Was sent as a proper consumer delivery.  Resend
                   %% it as before.
                   %%
                   %% FIXME: What should happen if the consumer's been
                   %% cancelled since?
                   %%
                   %% FIXME: should we allocate a fresh DeliveryTag?
                   internal_deliver(
                     WriterPid, false, ConsumerTag, DeliveryTag,
                     {QName, QPid, MsgId, true, Message})
           end, ok, UAMQ),
    %% No answer required, apparently!
    {noreply, State};

handle_method(#'basic.recover'{}, _, _State) ->
    rabbit_misc:protocol_error(
      not_allowed, "attempt to recover a transactional channel",[]);

%% --- exchange.declare / exchange.delete -----------------------------------

handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
                                  type = TypeNameBin,
                                  passive = false,
                                  durable = Durable,
                                  auto_delete = AutoDelete,
                                  internal = false,
                                  nowait = NoWait,
                                  arguments = Args},
              _, State = #ch{ virtual_host = VHostPath }) ->
    CheckedType = rabbit_exchange:check_type(TypeNameBin),
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_configure_permitted(ExchangeName, State),
    X = case rabbit_exchange:lookup(ExchangeName) of
            {ok, FoundX} -> FoundX;
            {error, not_found} ->
                %% The reserved-prefix check only applies on creation.
                check_name('exchange', ExchangeNameBin),
                case rabbit_misc:r_arg(VHostPath, exchange, Args,
                                       <<"alternate-exchange">>) of
                    undefined -> ok;
                    AName     -> check_read_permitted(ExchangeName, State),
                                 check_write_permitted(AName, State),
                                 ok
                end,
                rabbit_exchange:declare(ExchangeName,
                                        CheckedType,
                                        Durable,
                                        AutoDelete,
                                        Args)
        end,
    ok = rabbit_exchange:assert_type(X, CheckedType),
    return_ok(State, NoWait, #'exchange.declare_ok'{});

handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
                                  type = TypeNameBin,
                                  passive = true,
                                  nowait = NoWait},
              _, State = #ch{ virtual_host = VHostPath }) ->
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_configure_permitted(ExchangeName, State),
    X = rabbit_exchange:lookup_or_die(ExchangeName),
    ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
    return_ok(State, NoWait, #'exchange.declare_ok'{});

handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
                                 if_unused = IfUnused,
                                 nowait = NoWait},
              _, State = #ch { virtual_host = VHostPath }) ->
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_configure_permitted(ExchangeName, State),
    case rabbit_exchange:delete(ExchangeName, IfUnused) of
        {error, not_found} ->
            rabbit_misc:not_found(ExchangeName);
        {error, in_use} ->
            rabbit_misc:protocol_error(
              precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
        ok ->
            return_ok(State, NoWait,  #'exchange.delete_ok'{})
    end;

%% --- queue.declare / queue.delete / queue.bind / queue.unbind / purge -----

handle_method(#'queue.declare'{queue = QueueNameBin,
                               passive = false,
                               durable = Durable,
                               exclusive = ExclusiveDeclare,
                               auto_delete = AutoDelete,
                               nowait = NoWait,
                               arguments = Args},
              _, State = #ch{virtual_host = VHostPath,
                             reader_pid = ReaderPid,
                             queue_collector_pid = CollectorPid}) ->
    Owner = case ExclusiveDeclare of
                true  -> ReaderPid;
                false -> none
            end,
    %% We use this in both branches, because queue_declare may yet return an
    %% existing queue.
    Finish =
        fun (#amqqueue{name = QueueName} = Q) ->
                check_exclusive_access(Q, Owner),
                check_configure_permitted(QueueName, State),
                case Owner of
                    none -> ok;
                    _    -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
                end,
                Q
        end,
    Q = case rabbit_amqqueue:with(
               rabbit_misc:r(VHostPath, queue, QueueNameBin),
               Finish) of
            {error, not_found} ->
                %% An empty name asks for a server-generated one.
                ActualNameBin =
                    case QueueNameBin of
                        <<>>  -> rabbit_guid:binstring_guid("amq.gen");
                        Other -> check_name('queue', Other)
                    end,
                QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
                Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
                                               Args, Owner));
            #amqqueue{} = Other ->
                Other
        end,
    return_queue_declare_ok(State, NoWait, Q);

handle_method(#'queue.declare'{queue = QueueNameBin,
                               passive = true,
                               nowait = NoWait},
              _, State = #ch{virtual_host = VHostPath,
                             reader_pid = ReaderPid}) ->
    QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
    check_configure_permitted(QueueName, State),
    Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
    return_queue_declare_ok(State, NoWait, Q);

handle_method(#'queue.delete'{queue = QueueNameBin,
                              if_unused = IfUnused,
                              if_empty = IfEmpty,
                              nowait = NoWait},
              _, State = #ch{reader_pid = ReaderPid}) ->
    QueueName = expand_queue_name_shortcut(QueueNameBin, State),
    check_configure_permitted(QueueName, State),
    case with_exclusive_access_or_die(
           QueueName, ReaderPid,
           fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
        {error, in_use} ->
            rabbit_misc:protocol_error(
              precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
        {error, not_empty} ->
            rabbit_misc:protocol_error(
              precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
        {ok, PurgedMessageCount} ->
            return_ok(State, NoWait,
                      #'queue.delete_ok'{message_count = PurgedMessageCount})
    end;

handle_method(#'queue.bind'{queue = QueueNameBin,
                            exchange = ExchangeNameBin,
                            routing_key = RoutingKey,
                            nowait = NoWait,
                            arguments = Arguments}, _, State) ->
    binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
                   QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
                   NoWait, State);

handle_method(#'queue.unbind'{queue = QueueNameBin,
                              exchange = ExchangeNameBin,
                              routing_key = RoutingKey,
                              arguments = Arguments}, _, State) ->
    binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
                   QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
                   false, State);

handle_method(#'queue.purge'{queue = QueueNameBin,
                             nowait = NoWait},
              _, State = #ch{reader_pid = ReaderPid}) ->
    QueueName = expand_queue_name_shortcut(QueueNameBin, State),
    check_read_permitted(QueueName, State),
    {ok, PurgedMessageCount} = with_exclusive_access_or_die(
                                 QueueName, ReaderPid,
                                 fun (Q) -> rabbit_amqqueue:purge(Q) end),
    return_ok(State, NoWait,
              #'queue.purge_ok'{message_count = PurgedMessageCount});

%% --- tx.select / tx.commit / tx.rollback ----------------------------------

handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
    {reply, #'tx.select_ok'{}, new_tx(State)};

handle_method(#'tx.select'{}, _, State) ->
    {reply, #'tx.select_ok'{}, State};

handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
    rabbit_misc:protocol_error(
      not_allowed, "channel is not transactional", []);

handle_method(#'tx.commit'{}, _, State) ->
    {reply, #'tx.commit_ok'{}, internal_commit(State)};

handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
    rabbit_misc:protocol_error(
      not_allowed, "channel is not transactional", []);

handle_method(#'tx.rollback'{}, _, State) ->
    {reply, #'tx.rollback_ok'{}, internal_rollback(State)};

%% --- channel.flow (client-initiated) --------------------------------------

handle_method(#'channel.flow'{active = true}, _,
              State = #ch{limiter_pid = LimiterPid}) ->
    LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
                      ok      -> LimiterPid;
                      stopped -> unlimit_queues(State)
                  end,
    {reply, #'channel.flow_ok'{active = true},
     State#ch{limiter_pid = LimiterPid1}};

handle_method(#'channel.flow'{active = false}, _,
              State = #ch{limiter_pid = LimiterPid,
                          consumer_mapping = Consumers}) ->
    LimiterPid1 = case LimiterPid of
                      undefined -> start_limiter(State);
                      Other     -> Other
                  end,
    %% Record the (possibly freshly started) limiter before branching,
    %% so that it is kept in the state whether or not there are consumer
    %% queues to wait for.
    State1 = State#ch{limiter_pid = LimiterPid1},
    ok = rabbit_limiter:block(LimiterPid1),
    QPids = consumer_queues(Consumers),
    Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
    ok = rabbit_amqqueue:flush_all(QPids, self()),
    case Queues of
        [] -> {reply, #'channel.flow_ok'{active = false}, State1};
        %% Otherwise flow_ok is sent by queue_blocked/2 once every
        %% monitored queue has flushed or gone down.
        _  -> {noreply, State1#ch{blocking = dict:from_list(Queues)}}
    end;

%% --- channel.flow_ok (answer to a server-initiated channel.flow) ----------

%% The client confirmed the state we currently want: cancel the timer
%% and record the confirmation.
handle_method(#'channel.flow_ok'{active = Active}, _,
              State = #ch{flow = #flow{server = Active, client = Flow,
                                       pending = {_Ref, TRef}} = F})
  when Flow =:= not Active ->
    {ok, cancel} = timer:cancel(TRef),
    {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
%% The client confirmed the request we sent, but the desired state has
%% changed in the meantime: issue a new channel.flow for it.
handle_method(#'channel.flow_ok'{active = Active}, _,
              State = #ch{flow = #flow{server = Flow, client = Flow,
                                       pending = {_Ref, TRef}}})
  when Flow =:= not Active ->
    {ok, cancel} = timer:cancel(TRef),
    {noreply, issue_flow(Flow, State)};
handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
    rabbit_misc:protocol_error(
      command_invalid, "unsolicited channel.flow_ok", []);
handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
    rabbit_misc:protocol_error(
      command_invalid,
      "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);

handle_method(_MethodRecord, _Content, _State) ->
    rabbit_misc:protocol_error(
      command_invalid, "unimplemented method", []).

%%----------------------------------------------------------------------------

%% React to a change of the desired flow state. If it differs from the
%% current one and no channel.flow is outstanding, send one now;
%% otherwise only record the desired state.
flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
  when Flow =:= not Active ->
    ok = clear_permission_cache(),
    noreply(issue_flow(Active, State));
flow_control(Active, State = #ch{flow = F}) ->
    noreply(State#ch{flow = F#flow{server = Active}}).

%% Send channel.flow{active = Active} and arm a timer that calls
%% flow_timeout/2 after ?FLOW_OK_TIMEOUT milliseconds.
issue_flow(Active, State) ->
    ok = rabbit_writer:send_command(
           State#ch.writer_pid, #'channel.flow'{active = Active}),
    Ref = make_ref(),
    {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
                                   [self(), Ref]),
    State#ch{flow = #flow{server = Active, client = not Active,
                          pending = {Ref, TRef}}}.

%% Shared implementation of queue.bind and queue.unbind. Fun is the
%% five-argument rabbit_exchange function that performs the action; its
%% error results are translated into protocol errors.
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
               ReturnMethod, NoWait,
               State = #ch{virtual_host = VHostPath,
                           reader_pid = ReaderPid}) ->
    %% FIXME: connection exception (!) on failure??
    %% (see rule named "failure" in spec-XML)
    %% FIXME: don't allow binding to internal exchanges -
    %% including the one named "" !
    QueueName = expand_queue_name_shortcut(QueueNameBin, State),
    check_write_permitted(QueueName, State),
    ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
                                                   State),
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_read_permitted(ExchangeName, State),
    case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
             fun (_X, Q) -> check_exclusive_access(Q, ReaderPid) end) of
        {error, exchange_not_found} ->
            rabbit_misc:not_found(ExchangeName);
        {error, queue_not_found} ->
            rabbit_misc:not_found(QueueName);
        {error, exchange_and_queue_not_found} ->
            rabbit_misc:protocol_error(
              not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
                                             rabbit_misc:rs(QueueName)]);
        {error, binding_not_found} ->
            rabbit_misc:protocol_error(
              not_found, "no binding ~s between ~s and ~s",
              [RoutingKey, rabbit_misc:rs(ExchangeName),
               rabbit_misc:rs(QueueName)]);
        {error, durability_settings_incompatible} ->
            rabbit_misc:protocol_error(
              not_allowed, "durability settings of ~s incompatible with ~s",
              [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
        ok -> return_ok(State, NoWait, ReturnMethod)
    end.

%% Send a basic.return for a message that could not be routed or
%% delivered.
basic_return(#basic_message{exchange_name = ExchangeName,
                            routing_key   = RoutingKey,
                            content       = Content},
             WriterPid, ReplyCode, ReplyText) ->
    ok = rabbit_writer:send_command(
           WriterPid,
           #'basic.return'{reply_code  = ReplyCode,
                           reply_text  = ReplyText,
                           exchange    = ExchangeName#resource.name,
                           routing_key = RoutingKey},
           Content).

%% Split the unacked queue Q into {Acked, Remaining}. Delivery tag 0
%% with multiple = true acknowledges everything outstanding.
collect_acks(Q, 0, true) ->
    {Q, queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
    collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple).
%% Walk Q from the front, building {Acked, Remaining}. The walk stops at
%% the entry whose tag equals DeliveryTag: that entry is acked and
%% everything behind it stays outstanding. Entries seen before it are
%% acked when Multiple is true and kept otherwise. If Q runs out first,
%% the accumulators are returned as they are.
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
    case queue:out(Q) of
        {empty, _} ->
            {ToAcc, PrefixAcc};
        {{value, Entry = {Tag, _ConsumerTag, _Msg}}, Rest} ->
            case {Tag == DeliveryTag, Multiple} of
                {true, _} ->
                    {queue:in(Entry, ToAcc), queue:join(PrefixAcc, Rest)};
                {false, true} ->
                    collect_acks(queue:in(Entry, ToAcc), PrefixAcc,
                                 Rest, DeliveryTag, Multiple);
                {false, _} ->
                    collect_acks(ToAcc, queue:in(Entry, PrefixAcc),
                                 Rest, DeliveryTag, Multiple)
            end
    end.

%% Add the queue pids in MoreP to the current transaction's participants.
add_tx_participants(MoreP, State = #ch{tx_participants = Current}) ->
    Added = sets:from_list(MoreP),
    State#ch{tx_participants = sets:union(Current, Added)}.

%% Ack every message in UAQ at its queue and return the pids of the
%% queues involved.
ack(TxnKey, UAQ) ->
    AckAtQueue = fun (QPid, MsgIds, QPids) ->
                         ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
                         [QPid | QPids]
                 end,
    fold_per_queue(AckAtQueue, [], UAQ).

make_tx_id() -> rabbit_guid:guid().

%% Start a fresh transaction: new id, no participants, no uncommitted
%% acks.
new_tx(State) ->
    State#ch{transaction_id    = make_tx_id(),
             tx_participants   = sets:new(),
             uncommitted_ack_q = queue:new()}.

%% Commit at every participating queue. On success the limiter is told
%% about the acks that were committed and a new transaction begins; on
%% failure an internal_error protocol error is raised.
internal_commit(State = #ch{transaction_id    = TxnKey,
                            tx_participants   = Participants,
                            limiter_pid       = LimiterPid,
                            uncommitted_ack_q = UAQ}) ->
    QPids = sets:to_list(Participants),
    case rabbit_amqqueue:commit_all(QPids, TxnKey, self()) of
        {error, Errors} ->
            rabbit_misc:protocol_error(
              internal_error, "commit failed: ~w", [Errors]);
        ok ->
            ok = notify_limiter(LimiterPid, UAQ),
            new_tx(State)
    end.

%% Roll back at every participating queue, put the uncommitted acks back
%% in front of the unacked messages and begin a new transaction.
internal_rollback(State = #ch{transaction_id    = TxnKey,
                              tx_participants   = Participants,
                              uncommitted_ack_q = UAQ,
                              unacked_message_q = UAMQ}) ->
    ?LOGDEBUG("rollback ~p~n  - ~p acks uncommitted, ~p messages unacked~n",
              [self(),
               queue:len(UAQ),
               queue:len(UAMQ)]),
    QPids = sets:to_list(Participants),
    ok = rabbit_amqqueue:rollback_all(QPids, TxnKey, self()),
    Restored = queue:join(UAQ, UAMQ),
    new_tx(State#ch{unacked_message_q = Restored}).

%% Notify the consumer queues that the channel is going down, rolling
%% back first when a transaction is open.
rollback_and_notify(State = #ch{transaction_id = none}) ->
    notify_queues(State);
rollback_and_notify(State) ->
    RolledBack = internal_rollback(State),
    notify_queues(RolledBack).
%% Group the message ids in UAQ by queue pid, then fold F over the
%% groups. F is called as F(QPid, MsgIds, Acc), starting from Acc0.
fold_per_queue(F, Acc0, UAQ) ->
    %% dict:append would avoid the lists:reverse in
    %% handle_message({recover, true}, ...). However, it is
    %% significantly slower when going beyond a few thousand elements.
    GroupByQueue =
        fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}},
             Groups) ->
                rabbit_misc:dict_cons(QPid, MsgId, Groups)
        end,
    PerQueue = rabbit_misc:queue_fold(GroupByQueue, dict:new(), UAQ),
    dict:fold(F, Acc0, PerQueue).

%% Start a limiter linked to this channel, seeded with the current
%% number of unacked messages, and register it with the queues of all
%% consumers. Returns the limiter pid.
start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
    UnackedCount = queue:len(UAMQ),
    LimiterPid   = rabbit_limiter:start_link(self(), UnackedCount),
    ok = limit_queues(LimiterPid, State),
    LimiterPid.

%% Call rabbit_amqqueue:notify_down_all/2 for this channel on every
%% queue that one of its consumers is attached to.
notify_queues(#ch{consumer_mapping = Consumers}) ->
    QPids = consumer_queues(Consumers),
    rabbit_amqqueue:notify_down_all(QPids, self()).

%% Remove the limiter from the consumers' queues by registering
%% 'undefined' in its place. Returns undefined.
unlimit_queues(State) ->
    NoLimiter = undefined,
    ok = limit_queues(NoLimiter, State),
    NoLimiter.

%% Register LPid as this channel's limiter with every queue that one of
%% its consumers is attached to.
limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
    QPids = consumer_queues(Consumers),
    rabbit_amqqueue:limit_all(QPids, self(), LPid).

%% Return the pids of the distinct queues referenced by the consumer
%% mapping. Queue names that can no longer be looked up are left out.
consumer_queues(Consumers) ->
    CollectName = fun (_ConsumerTag, QueueName, Names) ->
                          sets:add_element(QueueName, Names)
                  end,
    UniqueNames = sets:to_list(dict:fold(CollectName, sets:new(), Consumers)),
    lists:flatmap(
      fun (QueueName) ->
              case rabbit_amqqueue:lookup(QueueName) of
                  {ok, Q}            -> [Q#amqqueue.pid];
                  %% queue has been deleted in the meantime
                  {error, not_found} -> []
              end
      end, UniqueNames).

%% Tell the limiter how many acks have been received for messages that
%% were delivered to subscribed consumers. Acks for messages sent in
%% response to a basic.get (identified by their 'none' consumer tag)
%% are not counted. Nothing is sent when there is no limiter or when
%% the count is zero.
notify_limiter(undefined, _Acked) ->
    ok;
notify_limiter(LimiterPid, Acked) ->
    CountConsumerAck = fun ({_DTag, none, _Msg}, N)  -> N;
                           ({_DTag, _CTag, _Msg}, N) -> N + 1
                       end,
    case rabbit_misc:queue_fold(CountConsumerAck, 0, Acked) of
        0     -> ok;
        Count -> rabbit_limiter:ack(LimiterPid, Count)
    end.
%% Decide whether Content is to be treated as persistent. The boolean
%% answer of rabbit_basic:is_message_persistent/1 is passed through; an
%% {invalid, Mode} answer is logged as a warning and treated as
%% non-persistent.
is_message_persistent(Content) ->
    case rabbit_basic:is_message_persistent(Content) of
        true ->
            true;
        false ->
            false;
        {invalid, Other} ->
            rabbit_log:warning("Unknown delivery mode ~p - "
                               "treating as 1, non-persistent~n",
                               [Other]),
            false
    end.

%% When the first argument is true, append MsgStruct to the channel's
%% queue of unacked messages; when it is false, return the state
%% untouched.
lock_message(false, _MsgStruct, State) ->
    State;
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
    Extended = queue:in(MsgStruct, UAMQ),
    State#ch{unacked_message_q = Extended}.

%% Send a basic.deliver for the given message to the writer.
%%
%% With Notify = true the command goes out through
%% rabbit_writer:send_command_and_notify/5, which is also handed the
%% queue pid and this channel's pid; with Notify = false the plain
%% rabbit_writer:send_command/3 is used. Either call must return ok.
internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
                 {_QName, QPid, _MsgId, Redelivered,
                  #basic_message{exchange_name = ExchangeName,
                                 routing_key   = RoutingKey,
                                 content       = Content}}) ->
    Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
                               delivery_tag = DeliveryTag,
                               redelivered  = Redelivered,
                               exchange     = ExchangeName#resource.name,
                               routing_key  = RoutingKey},
    case Notify of
        true ->
            ok = rabbit_writer:send_command_and_notify(
                   WriterPid, QPid, self(), Deliver, Content);
        false ->
            ok = rabbit_writer:send_command(WriterPid, Deliver, Content)
    end.

%% Channel teardown: leave the rabbit_channels pg_local group, then
%% shut down the writer and the limiter. The result is whatever
%% rabbit_limiter:shutdown/1 returns.
terminate(#ch{writer_pid  = WriterPid,
              limiter_pid = LimiterPid}) ->
    Self = self(),
    pg_local:leave(rabbit_channels, Self),
    rabbit_writer:shutdown(WriterPid),
    rabbit_limiter:shutdown(LimiterPid).

%% Build an {Item, Value} list for the requested info items, in the
%% order given.
infos(Items, State) ->
    [{Item, i(Item, State)} || Item <- Items].

%% Look up a single info item in the channel state. An item that is
%% not handled below is rejected by throwing {bad_argument, Item}.
i(pid, _) ->
    self();
i(connection, #ch{reader_pid = ReaderPid}) ->
    ReaderPid;
i(number, #ch{channel = Channel}) ->
    Channel;
i(user, #ch{username = Username}) ->
    Username;
i(vhost, #ch{virtual_host = VHost}) ->
    VHost;
%% a channel is transactional unless its transaction id is 'none'
i(transactional, #ch{transaction_id = none}) ->
    false;
i(transactional, #ch{}) ->
    true;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
    dict:size(ConsumerMapping);
%% uncommitted acks still count as unacknowledged messages
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
                               uncommitted_ack_q = UAQ}) ->
    Unacked     = queue:len(UAMQ),
    Uncommitted = queue:len(UAQ),
    Unacked + Uncommitted;
i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
    queue:len(UAQ);
i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
    rabbit_limiter:get_limit(LimiterPid);
i(Item, _) ->
    throw({bad_argument, Item}).