diff options
-rw-r--r-- | src/rabbit_channel.erl | 71 |
1 files changed, 37 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2b9cffd4..f3a2eb92 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -258,7 +258,7 @@ handle_cast({method, Method, Content, Flow}, end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> - ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + ok = send(Reply, NewState), noreply(NewState); {noreply, NewState} -> noreply(NewState); @@ -284,15 +284,16 @@ handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; -handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, - State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(consumer_monitor(ConsumerTag, State)); +handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) -> + ok = send(Msg, State), + noreply(consumer_monitor(CTag, State)); -handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, Msg), +handle_cast({command, Msg}, State) -> + ok = send(Msg, State), noreply(State); +handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) -> + noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -401,6 +402,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. +send(_Command, #ch{state = closing}) -> + ok; +send(Command, #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command(WriterPid, Command). + handle_exception(Reason, State = #ch{protocol = Protocol, channel = Channel, writer_pid = WriterPid, @@ -545,12 +551,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case sets:is_element(QPid, Blocking) of false -> State; true -> Blocking1 = sets:del_element(QPid, Blocking), - ok = case sets:size(Blocking1) of - 0 -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - _ -> ok - end, + case sets:size(Blocking1) of + 0 -> ok = send(#'channel.flow_ok'{active = false}, State); + _ -> ok + end, State#ch{blocking = Blocking1} end. @@ -833,12 +837,9 @@ handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> rabbit_misc:protocol_error(not_implemented, "requeue=false", []); handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> - {noreply, State2 = #ch{writer_pid = WriterPid}} = - handle_method(#'basic.recover_async'{requeue = Requeue}, - Content, - State), - ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), - {noreply, State2}; + {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue}, + Content, State), + {reply, #'basic.recover_ok'{}, State1}; handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, @@ -1145,17 +1146,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QCons, - writer_pid = WriterPid}) -> + queue_consumers = QCons}) -> ConsumerTags = case dict:find(QPid, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> - Cancel = #'basic.cancel'{consumer_tag = CTag, - nowait = true}, - ok = rabbit_writer:send_command(WriterPid, Cancel), + ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, + State), dict:erase(CTag, CMap) end, ConsumerMapping, ConsumerTags), State#ch{consumer_mapping = ConsumerMapping1, @@ -1368,12 +1368,17 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> send_nacks([], State) -> State; +send_nacks(_MXs, State = #ch{state = closing, + tx_status = none}) -> %% optimisation + State; send_nacks(MXs, State = #ch{tx_status = none}) -> coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} end, State); +send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation + State#ch{tx_status = failed}; send_nacks(_, State) -> maybe_complete_tx(State#ch{tx_status = failed}). @@ -1392,9 +1397,10 @@ send_confirms(State) -> send_confirms([], State) -> State; -send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = MsgSeqNo}), +send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation + State; +send_confirms([MsgSeqNo], State) -> + ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), State; send_confirms(Cs, State) -> coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> @@ -1402,8 +1408,7 @@ send_confirms(Cs, State) -> multiple = Multiple} end, State). -coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> +coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; @@ -1412,11 +1417,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; - _ -> ok = rabbit_writer:send_command( - WriterPid, MkMsgFun(lists:last(Ms), true)) + _ -> ok = send(MkMsgFun(lists:last(Ms), true), State) end, - [ok = rabbit_writer:send_command( - WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], + [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss], State. maybe_complete_tx(State = #ch{tx_status = in_progress}) -> @@ -1428,7 +1431,7 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) -> end. complete_tx(State = #ch{tx_status = committing}) -> - ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), + ok = send(#'tx.commit_ok'{}, State), State#ch{tx_status = in_progress}; complete_tx(State = #ch{tx_status = failed}) -> {noreply, State1} = handle_exception( |