diff options
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 15 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 21 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 20 |
5 files changed, 38 insertions, 39 deletions
@@ -347,7 +347,7 @@ $(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML)))) # Note that all targets which depend on clean must have clean in their # name. Also any target that doesn't depend on clean should not have # clean in its name, unless you know that you don't need any of the -# automatic dependency generation for that target (eg cleandb). +# automatic dependency generation for that target (e.g. cleandb). # We want to load the dep file if *any* target *doesn't* contain # "clean" - i.e. if removing all clean-like targets leaves something diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3a620fa..12cd0c93 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -710,15 +710,12 @@ infos(Items, State) -> || Item <- (Items1 -- [synchronised_slave_pids])]. slaves_status(#q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - _ -> + {ok, #amqqueue{slave_pids = SPids}} -> {Results, _Bad} = - delegate:invoke( - SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), {SPids1, SSPids} = lists:foldl( fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> @@ -762,11 +759,9 @@ i(memory, _) -> {memory, M} = process_info(self(), memory), M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, - slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> []; - _ -> SPids + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> []; + {ok, #amqqueue{slave_pids = SPids}} -> SPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9a6879b1..b6d38172 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -720,13 +720,14 @@ gb_trees_foreach(Fun, Tree) -> %% [{"-q",true},{"-p","/"}]} get_options(Defs, As) -> lists:foldl(fun(Def, {AsIn, RsIn}) -> - {AsOut, Value} = case Def of - {flag, Key} -> - get_flag(Key, AsIn); - {option, Key, Default} -> - get_option(Key, Default, AsIn) - end, - {AsOut, [{Key, Value} | RsIn]} + {K, {AsOut, V}} = + case Def of + {flag, Key} -> + {Key, get_flag(Key, AsIn)}; + {option, Key, Default} -> + {Key, get_option(Key, Default, AsIn)} + end, + {AsOut, [{K, V} | RsIn]} end, {As, []}, Defs). get_option(K, _Default, [K, V | As]) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 54a7add2..323cf0ce 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -61,23 +61,26 @@ notify_cluster() -> %%-------------------------------------------------------------------- init([]) -> - {ok, no_state}. + {ok, ordsets:new()}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node}, State) -> - rabbit_log:info("rabbit on ~p up~n", [Node]), - erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node), - {noreply, State}; +handle_cast({rabbit_running_on, Node}, Nodes) -> + case ordsets:is_element(Node, Nodes) of + true -> {noreply, Nodes}; + false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + ok = handle_live_rabbit(Node), + {noreply, ordsets:add_element(Node, Nodes)} + end; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> - rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) -> + rabbit_log:info("rabbit on node ~p down~n", [Node]), ok = handle_dead_rabbit(Node), - {noreply, State}; + {noreply, ordsets:del_element(Node, Nodes)}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 908a279c..01242e81 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -505,9 +505,11 @@ handle_frame(Type, Channel, Payload, process_frame(Frame, Channel, State) -> case get({channel, Channel}) of {ChPid, AState} -> - NewAState = process_channel_frame(Frame, Channel, ChPid, AState), - put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); + case process_channel_frame(Frame, ChPid, AState) of + {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {error, Reason} -> handle_exception(State, Channel, Reason) + end; undefined when ?IS_RUNNING(State) -> ok = create_channel(Channel, State), process_frame(Frame, Channel, State); @@ -910,17 +912,15 @@ create_channel(Channel, State) -> put({channel, Channel}, {ChPid, AState}), ok. -process_channel_frame(Frame, Channel, ChPid, AState) -> +process_channel_frame(Frame, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> NewAState; + {ok, NewAState} -> {ok, NewAState}; {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - NewAState; + {ok, NewAState}; {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( ChPid, Method, Content), - NewAState; - {error, Reason} -> self() ! {channel_exit, Channel, - Reason}, - AState + {ok, NewAState}; + {error, Reason} -> {error, Reason} end. handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> |