summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-26 20:10:30 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-26 20:10:30 +0100
commitc9c48bf908368cbdc31531341d1e5099aae9c0d1 (patch)
tree721411395d3a1cefc635b0dcc3d5c9f4a2d4d993
parent5c9938c68d93cb29577cf5e3f245cbfb5d6291a7 (diff)
downloadrabbitmq-server-c9c48bf908368cbdc31531341d1e5099aae9c0d1.tar.gz
improve error handling code
-rw-r--r--src/rabbit_channel.erl72
1 files changed, 32 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0cdf7f2d..79cae8e0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -183,9 +183,24 @@ handle_call(_Request, _From, State) ->
noreply(State).
handle_cast({method, Method, Content}, State) ->
- handle_exiting_function(
- fun (State1) -> handle_method(Method, Content, State1) end, Method,
- State);
+ try handle_method(Method, Content, State) of
+ {reply, Reply, NewState} ->
+ ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
+ noreply(NewState);
+ {noreply, NewState} ->
+ noreply(NewState);
+ stop ->
+ {stop, normal, State#ch{state = terminating}}
+ catch
+ exit:Reason = #amqp_error{} ->
+ MethodName = rabbit_misc:method_record_type(Method),
+ {stop, normal, terminating(Reason#amqp_error{method = MethodName},
+ State)};
+ exit:normal ->
+ {stop, normal, State};
+ _:Reason ->
+ {stop, {Reason, erlang:get_stacktrace()}, State}
+ end;
handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State)};
@@ -207,18 +222,15 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
handle_cast({conserve_memory, Conserve}, State) ->
flow_control(not Conserve, State);
-handle_cast({flow_timeout, Ref}, State) ->
- handle_exiting_function(
- fun (#ch{flow = #flow{client = ClientFlow,
- pending = {Ref1, _TRef}}})
- when Ref =:= Ref1 ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not ClientFlow]);
- (State1) ->
- {noreply, State1}
- end, none, State).
+handle_cast({flow_timeout, Ref},
+ State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
+ {stop, normal, terminating(
+ rabbit_misc:amqp_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not Flow], none), State)};
+handle_cast({flow_timeout, _Ref}, State) ->
+ {noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -259,6 +271,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
+terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
+ ok = rollback_and_notify(State),
+ Reader ! {channel_exit, Channel, Reason},
+ State#ch{state = terminating}.
+
return_queue_declare_ok(State, NoWait, Q) ->
NewState = State#ch{most_recently_declared_queue =
(Q#amqqueue.name)#resource.name},
@@ -357,31 +374,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-handle_exiting_function(Fun, Method, State) ->
- try Fun(State) of
- {reply, Reply, NewState} ->
- ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- noreply(NewState);
- {noreply, NewState} ->
- noreply(NewState);
- stop ->
- {stop, normal, State#ch{state = terminating}}
- catch
- exit:Reason = #amqp_error{} ->
- ok = rollback_and_notify(State),
- MethodName = case Method of
- none -> none;
- _ -> rabbit_misc:method_record_type(Method)
- end,
- State#ch.reader_pid ! {channel_exit, State#ch.channel,
- Reason#amqp_error{method = MethodName}},
- {stop, normal, State#ch{state = terminating}};
- exit:normal ->
- {stop, normal, State};
- _:Reason ->
- {stop, {Reason, erlang:get_stacktrace()}, State}
- end.
-
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
{reply, #'channel.open_ok'{}, State#ch{state = running}};