From 5082c0bd74e981dddfdeef165574213485e4c728 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Thu, 15 Sep 2011 13:54:18 +0100 Subject: less arbitrary division of labour --- src/rabbit_channel.erl | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3f639d14..c73d85ac 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -328,11 +328,14 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> handle_info({'DOWN', _MRef, process, QPid, Reason}, State = #ch{consumer_monitors = ConsumerMonitors}) -> State1 = handle_publishing_queue_down(QPid, Reason, State), - noreply( - case gb_sets:is_member(QPid, ConsumerMonitors) of - false -> State1; - true -> handle_consuming_queue_down(QPid, State1) - end); + erase_queue_stats(QPid), + State2 = queue_blocked(QPid, State1), + State3 = case gb_sets:is_member(QPid, ConsumerMonitors) of + false -> State2; + true -> handle_consuming_queue_down(QPid, State1) + end, + noreply(State3#ch{queue_monitors = + dict:erase(QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -1200,9 +1203,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> {true, fun send_nacks/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - erase_queue_stats(QPid), - State3 = SendFun(MXs, State2), - queue_blocked(QPid, State3). + SendFun(MXs, State2). handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, -- cgit v1.2.1