diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-05-26 20:10:30 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-05-26 20:10:30 +0100 |
commit | c9c48bf908368cbdc31531341d1e5099aae9c0d1 (patch) | |
tree | 721411395d3a1cefc635b0dcc3d5c9f4a2d4d993 | |
parent | 5c9938c68d93cb29577cf5e3f245cbfb5d6291a7 (diff) | |
download | rabbitmq-server-c9c48bf908368cbdc31531341d1e5099aae9c0d1.tar.gz |
improve error handling code
-rw-r--r-- | src/rabbit_channel.erl | 72 |
1 files changed, 32 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0cdf7f2d..79cae8e0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -183,9 +183,24 @@ handle_call(_Request, _From, State) -> noreply(State). handle_cast({method, Method, Content}, State) -> - handle_exiting_function( - fun (State1) -> handle_method(Method, Content, State1) end, Method, - State); + try handle_method(Method, Content, State) of + {reply, Reply, NewState} -> + ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + noreply(NewState); + {noreply, NewState} -> + noreply(NewState); + stop -> + {stop, normal, State#ch{state = terminating}} + catch + exit:Reason = #amqp_error{} -> + MethodName = rabbit_misc:method_record_type(Method), + {stop, normal, terminating(Reason#amqp_error{method = MethodName}, + State)}; + exit:normal -> + {stop, normal, State}; + _:Reason -> + {stop, {Reason, erlang:get_stacktrace()}, State} + end; handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; @@ -207,18 +222,15 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast({conserve_memory, Conserve}, State) -> flow_control(not Conserve, State); -handle_cast({flow_timeout, Ref}, State) -> - handle_exiting_function( - fun (#ch{flow = #flow{client = ClientFlow, - pending = {Ref1, _TRef}}}) - when Ref =:= Ref1 -> - rabbit_misc:protocol_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not ClientFlow]); - (State1) -> - {noreply, State1} - end, none, State). +handle_cast({flow_timeout, Ref}, + State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> + {stop, normal, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; +handle_cast({flow_timeout, _Ref}, State) -> + {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -259,6 +271,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. +terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> + ok = rollback_and_notify(State), + Reader ! {channel_exit, Channel, Reason}, + State#ch{state = terminating}. + return_queue_declare_ok(State, NoWait, Q) -> NewState = State#ch{most_recently_declared_queue = (Q#amqqueue.name)#resource.name}, @@ -357,31 +374,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -handle_exiting_function(Fun, Method, State) -> - try Fun(State) of - {reply, Reply, NewState} -> - ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - noreply(NewState); - {noreply, NewState} -> - noreply(NewState); - stop -> - {stop, normal, State#ch{state = terminating}} - catch - exit:Reason = #amqp_error{} -> - ok = rollback_and_notify(State), - MethodName = case Method of - none -> none; - _ -> rabbit_misc:method_record_type(Method) - end, - State#ch.reader_pid ! {channel_exit, State#ch.channel, - Reason#amqp_error{method = MethodName}}, - {stop, normal, State#ch{state = terminating}}; - exit:normal -> - {stop, normal, State}; - _:Reason -> - {stop, {Reason, erlang:get_stacktrace()}, State} - end. - handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {reply, #'channel.open_ok'{}, State#ch{state = running}}; |