diff options
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 315 |
1 files changed, 236 insertions, 79 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 800cc237..edafd52d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,10 +35,10 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1]). +-export([emit_stats/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -48,7 +48,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer}). + consumer_mapping, blocking, queue_collector_pid, stats_timer, + confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, + held_confirms, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -70,6 +72,8 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(FLUSH_CONFIRMS_INTERVAL, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -86,12 +90,15 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). +-spec(flush_confirms/1 :: (pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -115,6 +122,9 @@ do(Pid, Method) -> do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). +flush(Pid) -> + gen_server2:call(Pid, flush). + shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -127,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). + +flush_confirms(Pid) -> + gen_server2:cast(Pid, flush_confirms). + list() -> pg_local:get_members(rabbit_channels). @@ -150,9 +166,6 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). -flush(Pid) -> - gen_server2:call(Pid, flush). - %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -177,7 +190,13 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = StatsTimer}, + stats_timer = StatsTimer, + confirm_enabled = false, + publish_seqno = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + unconfirmed = gb_sets:new(), + queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -197,6 +216,9 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +handle_call(flush, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -206,9 +228,6 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(flush, _From, State) -> - reply(ok, State); - handle_call(_Request, _From, State) -> noreply(State). @@ -242,12 +261,24 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), noreply(State); -handle_cast({deliver, ConsumerTag, AckRequired, Msg}, +handle_cast({deliver, ConsumerTag, AckRequired, + Msg = {_QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> - State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), - {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, + State1 = lock_message(AckRequired, + ack_record(DeliveryTag, ConsumerTag, Msg), + State), + + M = #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), + maybe_incr_stats([{QPid, 1}], case AckRequired of true -> deliver; @@ -259,21 +290,38 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, - hibernate}. - -handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + hibernate}; + +handle_cast(flush_confirms, State) -> + {noreply, internal_flush_confirms(State)}; + +handle_cast({confirm, MsgSeqNo, From}, State) -> + {noreply, confirm(MsgSeqNo, From, State)}. + +handle_info({'DOWN', _MRef, process, QPid, _Reason}, + State = #ch{queues_for_msg = QFM}) -> + State1 = dict:fold( + fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> + Qs = sets:del_element(QPid, QPids), + case sets:size(Qs) of + 0 -> confirm(Msg, QPid, State0); + _ -> State0#ch{queues_for_msg = + dict:store(Msg, Qs, QFM0)} + end + end, State, QFM), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State), hibernate}. + {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), + State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), - {hibernate, - State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. + StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), + {hibernate, State1#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -352,6 +400,13 @@ check_write_permitted(Resource, #ch{username = Username}) -> check_read_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, read). +check_internal_exchange(#exchange{name = Name, internal = true}) -> + rabbit_misc:protocol_error(access_refused, + "cannot publish to internal ~s", + [rabbit_misc:rs(Name)]); +check_internal_exchange(_) -> + ok. + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -418,6 +473,52 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +confirm(undefined, _QPid, State) -> + State; +confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> + State; +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> + do_if_unconfirmed(MsgSeqNo, QPid, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{ + delivery_tag = MSN}), + State1 + end, State); +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> + do_if_unconfirmed(MsgSeqNo, QPid, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_confirm_timer( + State1#ch{held_confirms = gb_sets:add(MSN, As)}) + end, State). + +do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, + State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + %% clears references to MsgSeqNo and does ConfirmFun + case gb_sets:is_element(MsgSeqNo, UC) of + true -> + Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), + case QPid of + undefined -> + ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); + _ -> + {ok, Qs} = dict:find(MsgSeqNo, QFM), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> ConfirmFun(MsgSeqNo, + State#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM), + unconfirmed = Unconfirmed1}); + _ -> State#ch{queues_for_msg = + dict:store(MsgSeqNo, Qs1, QFM)} + end + end; + false -> + State + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -440,16 +541,26 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + confirm_enabled = ConfirmEnabled}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), + {MsgSeqNo, State1} + = case ConfirmEnabled of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, + State#ch{publish_seqno = SeqNo + 1, + unconfirmed = + gb_sets:add(SeqNo, State#ch.unconfirmed)}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -458,18 +569,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> ok; - unroutable -> ok = basic_return(Message, WriterPid, no_route); - not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) - end, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish, State2), {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -506,7 +615,9 @@ handle_method(#'basic.get'{queue = QueueNameBin, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> - State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + State1 = lock_message(not(NoAck), + ack_record(DeliveryTag, none, Msg), + State), maybe_incr_stats([{QPid, 1}], case NoAck of true -> get_no_ack; @@ -642,30 +753,8 @@ handle_method(#'basic.recover_async'{requeue = true}, %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{writer_pid = WriterPid, - unacked_message_q = UAMQ}) -> - ok = rabbit_misc:queue_fold( - fun ({_DeliveryTag, none, _Msg}, ok) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> - %% Was sent as a proper consumer delivery. Resend - %% it as before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - internal_deliver( - WriterPid, false, ConsumerTag, DeliveryTag, - {QName, QPid, MsgId, true, Message}) - end, ok, UAMQ), - %% No answer required - basic.recover is the newer, synchronous - %% variant of this method - {noreply, State}; +handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "requeue=false", []); handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> {noreply, State2 = #ch{writer_pid = WriterPid}} = @@ -691,7 +780,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = false, durable = Durable, auto_delete = AutoDelete, - internal = false, + internal = Internal, nowait = NoWait, arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> @@ -714,10 +803,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, CheckedType, Durable, AutoDelete, + Internal, Args) end, ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, - AutoDelete, Args), + AutoDelete, Internal, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, @@ -878,6 +968,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; @@ -898,6 +993,25 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) + when TxId =/= none -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = false}) -> + return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, + NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> + return_ok(State, NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot change confirm_multiple setting", []); + handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -987,6 +1101,10 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +ack_record(DeliveryTag, ConsumerTag, + _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> + {DeliveryTag, ConsumerTag, {QPid, MsgId}}. + collect_acks(Q, 0, true) -> {Q, queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> @@ -1059,8 +1177,7 @@ rollback_and_notify(State) -> fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, - {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> + fun ({_DTag, _CTag, {QPid, MsgId}}, D) -> %% dict:append would avoid the lists:reverse in %% handle_message({recover, true}, ...). However, it %% is significantly slower when going beyond a few @@ -1122,28 +1239,68 @@ is_message_persistent(Content) -> IsPersistent end. +process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_route), + confirm(MsgSeqNo, undefined, State); +process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_consumers), + confirm(MsgSeqNo, undefined, State); +process_routing_result(routed, [], MsgSeqNo, _, State) -> + confirm(MsgSeqNo, undefined, State); +process_routing_result(routed, _, undefined, _, State) -> + State; +process_routing_result(routed, QPids, MsgSeqNo, _, + State = #ch{queues_for_msg = QFM}) -> + QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + [maybe_monitor(QPid) || QPid <- QPids], + State#ch{queues_for_msg = QFM1}. + lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, - {_QName, QPid, _MsgId, Redelivered, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}}) -> - M = #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, - ok = case Notify of - true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, self(), M, Content); - false -> rabbit_writer:send_command(WriterPid, M, Content) - end. +start_confirm_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, + ?MODULE, flush_confirms, [self()]), + State#ch{confirm_tref = TRef}; +start_confirm_timer(State) -> + State. + +stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm_tref = undefined}. + +internal_flush_confirms(State = #ch{writer_pid = WriterPid, + held_confirms = Cs}) -> + case gb_sets:is_empty(Cs) of + true -> State#ch{confirm_tref = undefined}; + false -> [First | Rest] = gb_sets:to_list(Cs), + {Mult, Inds} = find_consecutive_sequence(First, Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, multiple = true}), + ok = lists:foldl( + fun(T, ok) -> rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = T}) + end, ok, Inds), + State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined} + end. + +%% Find longest sequence of consecutive numbers at the beginning. +find_consecutive_sequence(Last, []) -> + {Last, []}; +find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> + find_consecutive_sequence(N, Ns); +find_consecutive_sequence(Last, Ns) -> + {Last, Ns}. -terminate(_State) -> +terminate(State) -> + stop_confirm_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). |