author     Tim Watson <tim@rabbitmq.com>  2013-04-30 11:28:02 +0100
committer  Tim Watson <tim@rabbitmq.com>  2013-04-30 11:28:02 +0100
commit     1cfb7c934656b4457b2c790d8949502be696c893 (patch)
tree       72a0803fa120d9445cb804d61dda568594348d85
parent     a103c073e56a87b0b2cc86eb1f1cc0ce68c9b91f (diff)
parent     8726fa142693ca8a68fb9a634d14322739891b89 (diff)
merge default (bug25501)
-rw-r--r--  Makefile                                2
-rw-r--r--  docs/rabbitmqctl.1.xml                  2
-rw-r--r--  include/rabbit.hrl                      2
-rw-r--r--  src/gen_server2.erl                    23
-rw-r--r--  src/rabbit_amqqueue_process.erl       171
-rw-r--r--  src/rabbit_auth_backend_internal.erl    4
-rw-r--r--  src/rabbit_autoheal.erl               199
-rw-r--r--  src/rabbit_exchange.erl                61
-rw-r--r--  src/rabbit_exchange_decorator.erl      41
-rw-r--r--  src/rabbit_mirror_queue_slave.erl      15
-rw-r--r--  src/rabbit_mnesia.erl                  46
-rw-r--r--  src/rabbit_net.erl                     13
-rw-r--r--  src/rabbit_node_monitor.erl           110
-rw-r--r--  src/rabbit_policy.erl                  14
-rw-r--r--  src/rabbit_ssl.erl                     29
-rw-r--r--  src/rabbit_tests.erl                   11
-rw-r--r--  src/rabbit_upgrade_functions.erl       16
-rw-r--r--  src/rabbit_vm.erl                     184
18 files changed, 682 insertions, 261 deletions
diff --git a/Makefile b/Makefile
index bf33b931..449d1edd 100644
--- a/Makefile
+++ b/Makefile
@@ -64,7 +64,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json
+AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json $(AMQP_CODEGEN_DIR)/credit_extension.json
AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index bbd2fe5b..0f3c0faf 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -289,7 +289,7 @@
<variablelist>
<varlistentry id="join_cluster">
- <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg><arg choice="opt"><replaceable>--ram</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg> <arg choice="opt">--ram</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index eeee799e..4282755d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -40,7 +40,7 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy}).
+ scratches, policy, decorators}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 9109febd..507d1cda 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -75,6 +75,12 @@
%% format_message_queue/2 which is the equivalent of format_status/2
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
+%%
+%% 9) The function with_state/2 can be used to debug a process with
+%% heavyweight state (without needing to copy the entire state out of
+%% process as sys:get_status/1 would). Pass through a function which
+%% can be invoked on the state, get back the result. The state is not
+%% modified.
%% All modifications are (C) 2009-2013 VMware, Inc.
@@ -184,6 +190,7 @@
cast/2, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
+ with_state/2,
enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
%% System exports
@@ -382,6 +389,16 @@ multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
+%% -----------------------------------------------------------------
+%% Apply a function to a generic server's state.
+%% -----------------------------------------------------------------
+with_state(Name, Fun) ->
+ case catch gen:call(Name, '$with_state', Fun, infinity) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, with_state, [Name, Fun]}})
+ end.
%%-----------------------------------------------------------------
%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
@@ -645,6 +662,8 @@ in({'$gen_cast', Msg} = Input,
in({'$gen_call', From, Msg} = Input,
GS2State = #gs2_state { prioritisers = {F, _, _} }) ->
in(Input, F(Msg, From, GS2State), GS2State);
+in({'$with_state', _From, _Fun} = Input, GS2State) ->
+ in(Input, 0, GS2State);
in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
in(Input, infinity, GS2State);
in({system, _From, _Req} = Input, GS2State) ->
@@ -663,6 +682,10 @@ process_msg({system, From, Req},
%% gen_server puts Hib on the end as the 7th arg, but that version
%% of the fun seems not to be documented so leaving out for now.
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
+process_msg({'$with_state', From, Fun},
+ GS2State = #gs2_state{state = State}) ->
+ reply(From, catch Fun(State)),
+ loop(GS2State);
process_msg({'EXIT', Parent, Reason} = Msg,
GS2State = #gs2_state { parent = Parent }) ->
terminate(Reason, Msg, GS2State);
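
A minimal usage sketch for the new gen_server2:with_state/2 (illustrative, not part of this diff). The fun runs inside the target process, so the full state is never copied out; this mirrors the new test_with_state/0 case in rabbit_tests.erl further down:

    %% Peek at the record tag of file_handle_cache's state without
    %% copying the whole state term out of the process.
    fhc_state = gen_server2:with_state(file_handle_cache,
                                       fun (S) -> element(1, S) end).
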
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b016c4d2..066392f8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,10 +49,6 @@
ttl_timer_ref,
ttl_timer_expiry,
senders,
- publish_seqno,
- unconfirmed,
- delayed_stop,
- queue_monitors,
dlx,
dlx_routing_key,
max_length,
@@ -151,9 +147,6 @@ init_state(Q) ->
has_had_consumers = false,
active_consumers = queue:new(),
senders = pmon:new(),
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
@@ -820,82 +813,34 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
State1.
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
- publish_seqno = SeqNo0,
- unconfirmed = UC0,
- queue_monitors = QMons0,
backing_queue_state = BQS,
backing_queue = BQ}) ->
QName = qname(State),
- {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
- Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
- case dead_letter_publish(Msg, Reason,
- X, RK, SeqNo, QName) of
- [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
- QPids -> {AckImm, SeqNo + 1,
- dtree:insert(SeqNo, QPids, AckTag, UC),
- pmon:monitor_all(QPids, QMons)}
- end
- end, {[], SeqNo0, UC0, QMons0}, BQS),
- {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
- {Res, State#q{publish_seqno = SeqNo1,
- unconfirmed = UC1,
- queue_monitors = QMons1,
- backing_queue_state = BQS2}}.
-
-dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ {Res, Acks1, BQS1} =
+ Fun(fun (Msg, AckTag, Acks) ->
+ dead_letter_publish(Msg, Reason, X, RK, QName),
+ [AckTag | Acks]
+ end, [], BQS),
+ {_Guids, BQS2} = BQ:ack(Acks1, BQS1),
+ {Res, State#q{backing_queue_state = BQS2}}.
+
+dead_letter_publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
{Queues, Cycles} = detect_dead_letter_cycles(
- DLMsg, rabbit_exchange:route(X, Delivery)),
+ Reason, DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(
- rabbit_amqqueue:lookup(Queues), Delivery),
- DeliveredQPids.
-
-handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed = UC}) ->
- case pmon:is_monitored(QPid, QMons) of
- false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
- QNameS = rabbit_misc:rs(qname(State)),
- rabbit_log:warning("DLQ ~p for ~s died with "
- "~p unconfirmed messages~n",
- [QPid, QNameS, length(Lost)]);
- false -> ok
- end,
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = pmon:erase(QPid, QMons),
- unconfirmed = UC1})
- end.
+ rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
-stop(State) -> stop(undefined, noreply, State).
+stop(State) -> stop(noreply, State).
-stop(From, Reply, State = #q{unconfirmed = UC}) ->
- case {dtree:is_empty(UC), Reply} of
- {true, noreply} -> {stop, normal, State};
- {true, _} -> {stop, normal, Reply, State};
- {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
- end.
+stop(noreply, State) -> {stop, normal, State};
+stop(Reply, State) -> {stop, normal, Reply, State}.
-cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
- unconfirmed = UC,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case dtree:is_empty(UC) andalso DS =/= undefined of
- true -> case DS of
- {_, noreply} -> ok;
- {From, Reply} -> gen_server2:reply(From, Reply)
- end,
- {stop, normal, State1};
- false -> noreply(State1)
- end.
-detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
+detect_dead_letter_cycles(expired,
+ #basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
NoCycles = {Queues, []},
@@ -904,22 +849,42 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
- {array, DeathTables} ->
- OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
- {table, D} <- DeathTables],
- OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- OldQueuesSet = ordsets:from_list(OldQueues1),
+ {array, Deaths} ->
{Cycling, NotCycling} =
lists:partition(
- fun(Queue) ->
- ordsets:is_element(Queue#resource.name,
- OldQueuesSet)
+ fun (#resource{name = Queue}) ->
+ is_dead_letter_cycle(Queue, Deaths)
end, Queues),
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- Deaths],
+ OldQueues1 = [QName || {longstr, QName} <- OldQueues],
{NotCycling, [[QName | OldQueues1] ||
#resource{name = QName} <- Cycling]};
_ ->
NoCycles
end
+ end;
+detect_dead_letter_cycles(_Reason, _Msg, Queues) ->
+ {Queues, []}.
+
+is_dead_letter_cycle(Queue, Deaths) ->
+ {Cycle, Rest} =
+ lists:splitwith(
+ fun ({table, D}) ->
+ {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
+ (_) ->
+ true
+ end, Deaths),
+ %% Is there a cycle, and if so, is it entirely due to expiry?
+ case Rest of
+ [] -> false;
+ [H|_] -> lists:all(
+ fun ({table, D}) ->
+ {longstr, <<"expired">>} =:=
+ rabbit_misc:table_lookup(D, <<"reason">>);
+ (_) ->
+ false
+ end, Cycle ++ [H])
end.
make_dead_letter_msg(Msg = #basic_message{content = Content,
@@ -1073,9 +1038,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -1115,16 +1077,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) ->
gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, Delivered, State));
-handle_call({notify_down, ChPid}, From, State) ->
+handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
- %% gen_server2 *before* the reply is sent. FIXME: in case of a
- %% delayed stop the reply is sent earlier.
+ %% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop(From, ok, State1)
+ {stop, State1} -> stop(ok, State1)
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
@@ -1186,7 +1147,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
@@ -1215,7 +1176,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop(From, ok, State1)
+ true -> stop(ok, State1)
end
end;
@@ -1224,14 +1185,14 @@ handle_call(stat, _From, State) ->
ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, From,
+handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop(From, {ok, BQ:len(BQS)}, State)
+ true -> stop({ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1286,19 +1247,6 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
- {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- State1 = case dtree:is_defined(QPid, UC1) of
- false -> QMons = State#q.queue_monitors,
- State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
- true -> State
- end,
- cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State1#q{unconfirmed = UC1});
-
-handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
@@ -1405,15 +1353,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
handle_cast(wake_up, State) ->
noreply(State).
-%% We need to not ignore this as we need to remove outstanding
-%% confirms due to queue death.
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason},
- State = #q{delayed_stop = DS}) when DS =/= undefined ->
- handle_queue_down(DownPid, Reason, State);
-
-handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> stop(State);
@@ -1442,9 +1381,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% unexpectedly.
stop(State);
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, State1} -> handle_queue_down(DownPid, Reason, State1);
+ {ok, State1} -> noreply(State1);
{stop, State1} -> stop(State1)
end;
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 2dc1cad3..483666b4 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -203,7 +203,9 @@ hash_password(Cleartext) ->
<<Salt/binary, Hash/binary>>.
check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
- Hash =:= salted_md5(Salt, Cleartext).
+ Hash =:= salted_md5(Salt, Cleartext);
+check_password(_Cleartext, _Any) ->
+ false.
make_salt() ->
{A1,A2,A3} = now(),
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
new file mode 100644
index 00000000..c00c2dd6
--- /dev/null
+++ b/src/rabbit_autoheal.erl
@@ -0,0 +1,199 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_autoheal).
+
+-export([init/0, maybe_start/1, node_down/2, handle_msg/3]).
+
+%% The named process we are running in.
+-define(SERVER, rabbit_node_monitor).
+
+%%----------------------------------------------------------------------------
+
+%% In order to autoheal we want to:
+%%
+%% * Find the winning partition
+%% * Stop all nodes in other partitions
+%% * Wait for them all to be stopped
+%% * Start them again
+%%
+%% To keep things simple, we assume all nodes are up. We don't start
+%% unless all nodes are up, and if a node goes down we abandon the
+%% whole process. To further keep things simple we also defer the
+%% decision as to the winning node to the "leader" - arbitrarily
+%% selected as the first node in the cluster.
+%%
+%% To coordinate the restarting nodes we pick a special node from the
+%% winning partition - the "winner". Restarting nodes then stop, tell
+%% the winner they have done so, and wait for it to tell them it is
+%% safe to start again.
+%%
+%% The winner and the leader are not necessarily the same node! Since
+%% the leader may end up restarting, we also make sure that it does
+%% not announce its decision (and thus cue other nodes to restart)
+%% until it has seen a request from every node that has experienced a
+%% partition.
+%%
+%% Possible states:
+%%
+%% not_healing
+%% - the default
+%%
+%% {winner_waiting, OutstandingStops, Notify}
+%% - we are the winner and are waiting for all losing nodes to stop
+%% before telling them they can restart
+%%
+%% restarting
+%% - we are restarting. Of course the node monitor immediately dies
+%% then so this state does not last long. We therefore send the
+%% autoheal_safe_to_start message to the rabbit_outside_app_process
+%% instead.
+
+%%----------------------------------------------------------------------------
+
+init() -> not_healing.
+
+maybe_start(not_healing) ->
+ case enabled() of
+ true -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+ send(Leader, {request_start, node()}),
+ rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
+ not_healing;
+ false -> not_healing
+ end;
+maybe_start(State) ->
+ State.
+
+enabled() ->
+ {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling).
+
+node_down(_Node, {winner_waiting, _Nodes, _Notify} = Autoheal) ->
+ Autoheal;
+node_down(_Node, not_healing) ->
+ not_healing;
+node_down(Node, _State) ->
+ rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
+ not_healing.
+
+%% By receiving this message we become the leader
+%% TODO should we try to debounce this?
+handle_msg({request_start, Node},
+ not_healing, Partitions) ->
+ rabbit_log:info("Autoheal request received from ~p~n", [Node]),
+ case rabbit_node_monitor:all_rabbit_nodes_up() of
+ false -> not_healing;
+ true -> AllPartitions = all_partitions(Partitions),
+ {Winner, Losers} = make_decision(AllPartitions),
+ rabbit_log:info("Autoheal decision~n"
+ " * Partitions: ~p~n"
+ " * Winner: ~p~n"
+ " * Losers: ~p~n",
+ [AllPartitions, Winner, Losers]),
+ send(Winner, {become_winner, Losers}),
+ [send(L, {winner_is, Winner}) || L <- Losers],
+ not_healing
+ end;
+
+handle_msg({become_winner, Losers},
+ not_healing, _Partitions) ->
+ rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
+ [Losers]),
+ {winner_waiting, Losers, Losers};
+
+handle_msg({become_winner, Losers},
+ {winner_waiting, WaitFor, Notify}, _Partitions) ->
+ rabbit_log:info("Autoheal: I am the winner, waiting additionally for "
+ "~p to stop~n", [Losers]),
+ {winner_waiting, lists:usort(Losers ++ WaitFor),
+ lists:usort(Losers ++ Notify)};
+
+handle_msg({winner_is, Winner},
+ not_healing, _Partitions) ->
+ rabbit_log:warning(
+ "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
+ rabbit_node_monitor:run_outside_applications(
+ fun () ->
+ MRef = erlang:monitor(process, {?SERVER, Winner}),
+ rabbit:stop(),
+ send(Winner, {node_stopped, node()}),
+ receive
+ {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok;
+ autoheal_safe_to_start -> ok
+ end,
+ erlang:demonitor(MRef, [flush]),
+ rabbit:start()
+ end),
+ restarting;
+
+%% This is the winner receiving its last notification that a node has
+%% stopped - all nodes can now start again
+handle_msg({node_stopped, Node},
+ {winner_waiting, [Node], Notify}, _Partitions) ->
+ rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
+ [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+ not_healing;
+
+handle_msg({node_stopped, Node},
+ {winner_waiting, WaitFor, Notify}, _Partitions) ->
+ {winner_waiting, WaitFor -- [Node], Notify};
+
+handle_msg(_, restarting, _Partitions) ->
+ %% ignore, we can contribute no further
+ restarting;
+
+handle_msg({node_stopped, _Node}, State, _Partitions) ->
+ %% ignore, we already cancelled the autoheal process
+ State.
+
+%%----------------------------------------------------------------------------
+
+send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}.
+
+make_decision(AllPartitions) ->
+ Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
+ [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
+ {Winner, lists:append(Rest)}.
+
+partition_value(Partition) ->
+ Connections = [Res || Node <- Partition,
+ Res <- [rpc:call(Node, rabbit_networking,
+ connections_local, [])],
+ is_list(Res)],
+ {length(lists:append(Connections)), length(Partition)}.
+
+%% We have our local understanding of what partitions exist; but we
+%% only know which nodes we have been partitioned from, not which
+%% nodes are partitioned from each other.
+all_partitions(PartitionedWith) ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ Partitions = [{node(), PartitionedWith} |
+ [rpc:call(Node, rabbit_node_monitor, partitions, [])
+ || Node <- Nodes -- [node()]]],
+ all_partitions(Partitions, [Nodes]).
+
+all_partitions([], Partitions) ->
+ Partitions;
+all_partitions([{Node, CantSee} | Rest], Partitions) ->
+ {[Containing], Others} =
+ lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions),
+ A = Containing -- CantSee,
+ B = Containing -- A,
+ Partitions1 = case {A, B} of
+ {[], _} -> Partitions;
+ {_, []} -> Partitions;
+ _ -> [A, B | Others]
+ end,
+ all_partitions(Rest, Partitions1).
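
Autoheal only activates when cluster_partition_handling is set to autoheal (see enabled/0 above; rabbit_node_monitor below also recognises ignore and pause_minority). A minimal configuration sketch, assuming the standard rabbitmq.config file:

    %% rabbitmq.config (illustrative)
    [
     {rabbit, [{cluster_partition_handling, autoheal}]}
    ].
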
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9e98448d..b4bdd348 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -68,7 +68,8 @@
-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
-spec(update/2 ::
(name(),
- fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
+ fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
+ -> not_found | rabbit_types:exchange()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -113,25 +114,39 @@ recover() ->
callback(X, create, map_create_tx(Tx), [X])
end,
rabbit_durable_exchange),
+ report_missing_decorators(Xs),
[XName || #exchange{name = XName} <- Xs].
-callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
+report_missing_decorators(Xs) ->
+ Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) ||
+ #exchange{decorators = D} <- Xs])),
+ case [M || M <- Mods, code:which(M) =:= non_existing] of
+ [] -> ok;
+ M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M])
+ end.
+
+callback(X = #exchange{type = XType,
+ decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
- M <- registry_lookup(exchange_decorator)],
+ M <- rabbit_exchange_decorator:select(all, Decorators)],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
-policy_changed(X = #exchange{type = XType}, X1) ->
- [ok = M:policy_changed(X, X1) ||
- M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
+policy_changed(X = #exchange{type = XType,
+ decorators = Decorators},
+ X1 = #exchange{decorators = Decorators1}) ->
+ D = rabbit_exchange_decorator:select(all, Decorators),
+ D1 = rabbit_exchange_decorator:select(all, Decorators1),
+ DAll = lists:usort(D ++ D1),
+ [ok = M:policy_changed(X, X1) || M <- [type_to_module(XType) | DAll]],
ok.
-serialise_events(X = #exchange{type = Type}) ->
+serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
lists:any(fun (M) -> M:serialise_events(X) end,
- registry_lookup(exchange_decorator))
+ rabbit_exchange_decorator:select(all, Decorators))
orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
@@ -143,16 +158,6 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-registry_lookup(exchange_decorator_route = Class) ->
- case get(exchange_decorator_route_modules) of
- undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
- put(exchange_decorator_route_modules, Mods),
- Mods;
- Mods -> Mods
- end;
-registry_lookup(Class) ->
- [M || {_, M} <- rabbit_registry:lookup_all(Class)].
-
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = rabbit_policy:set(#exchange{name = XName,
type = Type,
@@ -273,7 +278,8 @@ update_scratch(Name, App, Fun) ->
Scratches2 = orddict:store(
App, Fun(Scratch), Scratches1),
X#exchange{scratches = Scratches2}
- end)
+ end),
+ ok
end).
update(Name, Fun) ->
@@ -284,9 +290,10 @@ update(Name, Fun) ->
case Durable of
true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
_ -> ok
- end;
+ end,
+ X1;
[] ->
- ok
+ not_found
end.
info_keys() -> ?INFO_KEYS.
@@ -318,15 +325,15 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-route(#exchange{name = #resource{virtual_host = VHost,
- name = RName} = XName} = X,
+route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
+ decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
- case {registry_lookup(exchange_decorator_route), RName == <<"">>} of
- {[], true} ->
+ case {RName, rabbit_exchange_decorator:select(route, Decorators)} of
+ {<<"">>, []} ->
%% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
- {Decorators, _} ->
- lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
+ {_, SelectedDecorators} ->
+ lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []}))
end.
route1(_, _, {[], _, QNames}) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 040b55db..3abaa48c 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -16,6 +16,10 @@
-module(rabbit_exchange_decorator).
+-include("rabbit.hrl").
+
+-export([select/2, set/1]).
+
%% This is like an exchange type except that:
%%
%% 1) It applies to all exchanges as soon as it is installed, therefore
@@ -57,10 +61,13 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% Decorators can optionally implement route/2 which allows additional
-%% destinations to be added to the routing decision.
-%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
-%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
+%% Allows additional destinations to be added to the routing decision.
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ [rabbit_amqqueue:name() | rabbit_exchange:name()].
+
+%% Whether the decorator wishes to receive callbacks for the exchange
+%% none:no callbacks, noroute:all callbacks except route, all:all callbacks
+-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'.
-else.
@@ -68,8 +75,32 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3},
+ {route, 2}, {active_for, 1}];
behaviour_info(_Other) ->
undefined.
-endif.
+
+%%----------------------------------------------------------------------------
+
+%% select a subset of active decorators
+select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute);
+select(route, {Route, _NoRoute}) -> filter(Route);
+select(raw, {Route, NoRoute}) -> Route ++ NoRoute.
+
+filter(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(X) ->
+ Decs = lists:foldl(fun (D, {Route, NoRoute}) ->
+ ActiveFor = D:active_for(X),
+ {cons_if_eq(all, ActiveFor, D, Route),
+ cons_if_eq(noroute, ActiveFor, D, NoRoute)}
+ end, {[], []}, list()),
+ X#exchange{decorators = Decs}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+
+cons_if_eq(Select, Select, Item, List) -> [Item | List];
+cons_if_eq(_Select, _Other, _Item, List) -> List.
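
For orientation, a skeleton decorator satisfying the callback list above, including the two new callbacks route/2 and active_for/1. This is an illustrative no-op sketch, not part of this diff; the module name is hypothetical, and a real decorator would still need to be registered with rabbit_registry under exchange_decorator for list/0 to find it:

    -module(my_noop_exchange_decorator).
    -behaviour(rabbit_exchange_decorator).

    -export([description/0, serialise_events/1, create/2, delete/3,
             policy_changed/2, add_binding/3, remove_bindings/3,
             route/2, active_for/1]).

    description()                     -> [{description, <<"no-op decorator sketch">>}].
    serialise_events(_X)              -> false.
    create(_Serial, _X)               -> ok.
    delete(_Serial, _X, _Bs)          -> ok.
    policy_changed(_X1, _X2)          -> ok.
    add_binding(_Serial, _X, _B)      -> ok.
    remove_bindings(_Serial, _X, _Bs) -> ok.
    route(_X, _Delivery)              -> [].   %% no extra destinations
    active_for(_X)                    -> none. %% 'none' | 'noroute' | 'all'
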
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 22edfcb6..964b0eb4 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -605,10 +605,13 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+ %% The channel will be monitored iff we have received a delivery
+ %% from it but not heard about its death from the master. So if it
+ %% is monitored we need to point the death out to the master (see
+ %% essay).
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
- true -> credit_flow:peer_down(ChPid),
- confirm_sender_death(ChPid)
+ true -> confirm_sender_death(ChPid)
end,
State.
@@ -621,6 +624,10 @@ confirm_sender_death(Pid) ->
fun (?MODULE, State = #state { known_senders = KS,
gm = GM }) ->
%% We're running still as a slave
+ %%
+ %% See comment in local_sender_death/2; we might have
+ %% received a sender_death in the meanwhile so check
+ %% again.
ok = case pmon:is_monitored(Pid, KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
@@ -766,6 +773,9 @@ process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
+ %% The channel will be monitored iff we have received a message
+ %% from it. In this case we just want to avoid doing work if we
+ %% never got any messages.
{ok, case pmon:is_monitored(ChPid, KS) of
false -> State;
true -> MS1 = case dict:find(ChPid, SQ) of
@@ -775,6 +785,7 @@ process_instruction({sender_death, ChPid},
lists:foldl(fun dict:erase/2, MS,
sets:to_list(PendingCh))
end,
+ credit_flow:peer_down(ChPid),
State #state { sender_queues = dict:erase(ChPid, SQ),
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c39e898c..8cd976fa 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -404,7 +404,7 @@ cluster_status(WhichNodes) ->
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:version(),
- cluster_status_from_mnesia()}.
+ delegate_beam_hash(), cluster_status_from_mnesia()}.
node_type() ->
DiscNodes = cluster_nodes(disc),
@@ -562,10 +562,13 @@ check_cluster_consistency(Node) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
{badrpc, _Reason} ->
{error, not_found};
- {_OTP, _Rabbit, {error, _}} ->
+ {_OTP, _Rabbit, _Hash, {error, _}} ->
{error, not_found};
- {OTP, Rabbit, {ok, Status}} ->
- case check_consistency(OTP, Rabbit, Node, Status) of
+ {_OTP, Rabbit, _Status} ->
+ %% pre-2013/04 format implies version mismatch
+ version_error("Rabbit", rabbit_misc:version(), Rabbit);
+ {OTP, Rabbit, Hash, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Hash, Node, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
end
@@ -732,14 +735,17 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-check_consistency(OTP, Rabbit) ->
+check_consistency(OTP, Rabbit, Hash) ->
rabbit_misc:sequence_error(
- [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]).
+ [check_otp_consistency(OTP),
+ check_rabbit_consistency(Rabbit),
+ check_beam_compatibility(Hash)]).
-check_consistency(OTP, Rabbit, Node, Status) ->
+check_consistency(OTP, Rabbit, Hash, Node, Status) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP),
check_rabbit_consistency(Rabbit),
+ check_beam_compatibility(Hash),
check_nodes_consistency(Node, Status)]).
check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
@@ -780,6 +786,21 @@ check_rabbit_consistency(Remote) ->
rabbit_misc:version(), Remote, "Rabbit",
fun rabbit_misc:version_minor_equivalent/2).
+check_beam_compatibility(RemoteHash) ->
+ case RemoteHash == delegate_beam_hash() of
+ true -> ok;
+ false -> {error, {incompatible_bytecode,
+ "Incompatible Erlang bytecode found on nodes"}}
+ end.
+
+%% The delegate module sends functions across the cluster; if it is
+%% out of sync (say due to mixed compilers), we will get badfun
+%% exceptions when trying to do so. Let's detect that at startup.
+delegate_beam_hash() ->
+ {delegate, Obj, _} = code:get_object_code(delegate),
+ {ok, {delegate, Hash}} = beam_lib:md5(Obj),
+ Hash.
+
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
%% mnesia tables aren't there because restarted RAM nodes won't have
@@ -805,11 +826,12 @@ find_good_node([]) ->
none;
find_good_node([Node | Nodes]) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} -> find_good_node(Nodes);
- {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ {_OTP, _Rabbit, _} -> find_good_node(Nodes);
+ {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
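
The new compatibility check amounts to comparing the MD5 of the local delegate beam with the hash a remote node reports via node_info/0. A hedged sketch of that comparison, written as if it lived inside rabbit_mnesia (Node is a placeholder; it relies on the new four-element node_info tuple above):

    %% true iff the remote node's delegate.beam bytecode matches ours
    same_delegate_beam(Node) ->
        case rpc:call(Node, rabbit_mnesia, node_info, []) of
            {_OTP, _Rabbit, RemoteHash, _Status} ->
                RemoteHash =:= delegate_beam_hash();
            _ ->
                false
        end.
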
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b53c16bf..b7f4d019 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
- close/1, fast_close/1, sockname/1, peername/1, peercert/1,
- connection_string/2, socket_ends/2]).
+ recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
+ setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
+ peercert/1, connection_string/2, socket_ends/2]).
%%---------------------------------------------------------------------------
@@ -48,6 +48,8 @@
-spec(recv/1 :: (socket()) ->
{'data', [char()] | binary()} | 'closed' |
rabbit_types:error(any()) | {'other', any()}).
+-spec(sync_recv/2 :: (socket(), integer()) -> rabbit_types:ok(binary()) |
+ rabbit_types:error(any())).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
@@ -114,6 +116,11 @@ recv(S, {DataTag, ClosedTag, ErrorTag}) ->
Other -> {other, Other}
end.
+sync_recv(Sock, Length) when ?IS_SSL(Sock) ->
+ ssl:recv(Sock#ssl_socket.ssl, Length);
+sync_recv(Sock, Length) ->
+ gen_tcp:recv(Sock, Length).
+
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
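
A small usage sketch for the new sync_recv/2 wrapper (illustrative; the byte count and the handle_header/1 helper are hypothetical). Unlike async_recv/3 it blocks the caller and returns the data directly, dispatching to ssl:recv/2 or gen_tcp:recv/2 as appropriate:

    case rabbit_net:sync_recv(Sock, 8) of
        {ok, Data}      -> handle_header(Data);
        {error, Reason} -> exit({socket_error, Reason})
    end.
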
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index fb74d4a3..7d844c72 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -30,11 +30,14 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
+ %% Utils
+-export([all_rabbit_nodes_up/0, run_outside_applications/1]).
+
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
--record(state, {monitors, partitions, subscribers, down_ping_timer}).
+-record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}).
%%----------------------------------------------------------------------------
@@ -57,6 +60,9 @@
-spec(partitions/0 :: () -> {node(), [node()]}).
-spec(subscribe/1 :: (pid()) -> 'ok').
+-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
+-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -194,10 +200,12 @@ init([]) ->
%% writing out the cluster status files - bad things can then
%% happen.
process_flag(trap_exit, true),
+ net_kernel:monitor_nodes(true),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
- partitions = []}}.
+ partitions = [],
+ autoheal = rabbit_autoheal:init()}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
@@ -251,16 +259,22 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
ok = handle_dead_rabbit(Node),
[P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
{noreply, handle_dead_rabbit_state(
+ Node,
State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
+handle_info({nodedown, Node}, State) ->
+ ok = handle_dead_node(Node),
+ {noreply, State};
+
handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
State = #state{partitions = Partitions,
- monitors = Monitors}) ->
+ monitors = Monitors,
+ autoheal = AState}) ->
%% We will not get a node_up from this node - yet we should treat it as
%% up (mostly).
State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
@@ -271,7 +285,13 @@ handle_info({mnesia_system_event,
ok = handle_live_rabbit(Node),
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State1#state{partitions = Partitions1}};
+ {noreply, State1#state{partitions = Partitions1,
+ autoheal = rabbit_autoheal:maybe_start(AState)}};
+
+handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState,
+ partitions = Partitions}) ->
+ AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
+ {noreply, State#state{autoheal = AState1}};
handle_info(ping_nodes, State) ->
%% We ping nodes when some are down to ensure that we find out
@@ -318,6 +338,18 @@ handle_dead_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node),
+ ok.
+
+handle_dead_node(_Node) ->
+ %% In general in rabbit_node_monitor we care about whether the
+ %% rabbit application is up rather than the node; we do this so
+ %% that we can respond in the same way to "rabbitmqctl stop_app"
+ %% and "rabbitmqctl stop" as much as possible.
+ %%
+ %% However, for pause_minority mode we can't do this, since we
+ %% depend on looking at whether other nodes are up to decide
+ %% whether to come back up ourselves - if we decide that based on
+ %% the rabbit application we would go down and never come back.
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
case majority() of
@@ -326,44 +358,32 @@ handle_dead_rabbit(Node) ->
end;
{ok, ignore} ->
ok;
+ {ok, autoheal} ->
+ ok;
{ok, Term} ->
rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
"assuming 'ignore'~n", [Term]),
ok
- end,
- ok.
-
-majority() ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
-
-all_nodes_up() ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_nodes(Nodes)) =:= length(Nodes).
-
-%% mnesia:system_info(db_nodes) (and hence
-%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
-%% when partitioned.
-alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
-
-alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
-
-alive_rabbit_nodes() ->
- [N || N <- alive_nodes(), rabbit_nodes:is_process_running(N, rabbit)].
+ end.
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
Nodes = rabbit_mnesia:cluster_nodes(all),
+ run_outside_applications(fun () ->
+ rabbit:stop(),
+ wait_for_cluster_recovery(Nodes)
+ end).
+
+run_outside_applications(Fun) ->
spawn(fun () ->
%% If our group leader is inside an application we are about
%% to stop, application:stop/1 does not return.
group_leader(whereis(init), self()),
- %% Ensure only one restarting process at a time, will
+ %% Ensure only one such process at a time, will
%% exit(badarg) (harmlessly) if one is already running
- register(rabbit_restarting_process, self()),
- rabbit:stop(),
- wait_for_cluster_recovery(Nodes)
+ register(rabbit_outside_app_process, self()),
+ Fun()
end).
wait_for_cluster_recovery(Nodes) ->
@@ -373,7 +393,8 @@ wait_for_cluster_recovery(Nodes) ->
wait_for_cluster_recovery(Nodes)
end.
-handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+handle_dead_rabbit_state(Node, State = #state{partitions = Partitions,
+ autoheal = Autoheal}) ->
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -383,7 +404,9 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
[] -> [];
_ -> Partitions
end,
- ensure_ping_timer(State#state{partitions = Partitions1}).
+ ensure_ping_timer(
+ State#state{partitions = Partitions1,
+ autoheal = rabbit_autoheal:node_down(Node, Autoheal)}).
ensure_ping_timer(State) ->
rabbit_misc:ensure_timer(
@@ -416,3 +439,30 @@ legacy_should_be_disc_node(DiscNodes) ->
add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
del_node(Node, Nodes) -> Nodes -- [Node].
+
+%%--------------------------------------------------------------------
+
+%% mnesia:system_info(db_nodes) (and hence
+%% rabbit_mnesia:cluster_nodes(running)) does not give reliable
+%% results when partitioned. So we have a small set of replacement
+%% functions here. "rabbit" in a function's name implies we test if
+%% the rabbit application is up, not just the node.
+
+majority() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
+
+all_nodes_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_nodes(Nodes)) =:= length(Nodes).
+
+all_rabbit_nodes_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
+
+alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
+
+alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
+
+alive_rabbit_nodes(Nodes) ->
+ [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)].
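
A short sketch of the monitor's surface as extended here (illustrative; return shapes taken from the specs above):

    %% Which nodes does the local node currently believe it is
    %% partitioned from?
    {ThisNode, PartitionedFrom} = rabbit_node_monitor:partitions(),

    %% Is the rabbit application up on every node in the cluster?
    %% (This is what autoheal checks before making its decision.)
    AllUp = rabbit_node_monitor:all_rabbit_nodes_up().
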
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7398cd2d..0990c662 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,7 +46,8 @@ name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
-set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
+ X#exchange{policy = set0(Name)}).
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
@@ -170,9 +171,14 @@ update_policies(VHost) ->
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
case match(XName, Policies) of
OldPolicy -> no_change;
- NewPolicy -> rabbit_exchange:update(
- XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
- {X, X#exchange{policy = NewPolicy}}
+ NewPolicy -> case rabbit_exchange:update(
+ XName, fun (X0) ->
+ rabbit_exchange_decorator:set(
+ X0 #exchange{policy = NewPolicy})
+ end) of
+ #exchange{} = X1 -> {X, X1};
+ not_found -> {X, X }
+ end
end.
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index b1238623..96277b68 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -162,15 +162,16 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
{?'id-at-pseudonym' , "PSEUDONYM"},
{?'id-domainComponent' , "DC"},
{?'id-emailAddress' , "EMAILADDRESS"},
- {?'street-address' , "STREET"}],
+ {?'street-address' , "STREET"},
+ {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl
case proplists:lookup(T, Fmts) of
{_, Fmt} ->
- io_lib:format(Fmt ++ "=~s", [FV]);
+ rabbit_misc:format(Fmt ++ "=~s", [FV]);
none when is_tuple(T) ->
- TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
- io_lib:format("~s:~s", [string:join(TypeL, "."), FV]);
+ TypeL = [rabbit_misc:format("~w", [X]) || X <- tuple_to_list(T)],
+ rabbit_misc:format("~s=~s", [string:join(TypeL, "."), FV]);
none ->
- io_lib:format("~p:~s", [T, FV])
+ rabbit_misc:format("~p=~s", [T, FV])
end.
%% Escape a string as per RFC4514.
@@ -204,14 +205,26 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
format_directory_string(ST, S);
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
- io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
- [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+ rabbit_misc:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
+ [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
%% We appear to get an untagged value back for an ia5string
%% (e.g. domainComponent).
format_asn1_value(V) when is_list(V) ->
V;
+format_asn1_value(V) when is_binary(V) ->
+ %% OTP does not decode some values when combined with an unknown
+ %% type. That's probably wrong, so as a last ditch effort let's
+ %% try manually decoding. 'DirectoryString' is semi-arbitrary -
+ %% but it is the type which covers the various string types we
+ %% handle below.
+ try
+ {ST, S} = public_key:der_decode('DirectoryString', V),
+ format_directory_string(ST, S)
+ catch _:_ ->
+ rabbit_misc:format("~p", [V])
+ end;
format_asn1_value(V) ->
- io_lib:format("~p", [V]).
+ rabbit_misc:format("~p", [V]).
%% DirectoryString { INTEGER : maxSize } ::= CHOICE {
%% teletexString TeletexString (SIZE (1..maxSize)),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fc529797..2e46304f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -63,6 +63,7 @@ all_tests() ->
passed = test_server_status(),
passed = test_amqp_connection_refusal(),
passed = test_confirms(),
+ passed = test_with_state(),
passed =
do_if_secondary_node(
fun run_cluster_dependent_tests/1,
@@ -563,8 +564,9 @@ test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>,
kind = exchange,
name = <<"test_exchange">>},
- X = #exchange{name = XName, type = topic, durable = false,
- auto_delete = false, arguments = []},
+ X0 = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ X = rabbit_exchange_decorator:set(X0),
%% create
rabbit_exchange_type_topic:validate(X),
exchange_op_callback(X, create, []),
@@ -1299,6 +1301,11 @@ test_confirms() ->
passed.
+test_with_state() ->
+ fhc_state = gen_server2:with_state(file_handle_cache,
+ fun (S) -> element(1, S) end),
+ passed.
+
test_statistics_event_receiver(Pid) ->
receive
Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 457b1567..b7b1635b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -43,6 +43,7 @@
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
+-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
%% -------------------------------------------------------------------
@@ -68,6 +69,7 @@
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
+-spec(exchange_decorators/0 :: () -> 'ok').
-endif.
@@ -282,6 +284,20 @@ gm_pids() ->
|| T <- Tables],
ok.
+exchange_decorators() ->
+ ok = exchange_decorators(rabbit_exchange),
+ ok = exchange_decorators(rabbit_durable_exchange).
+
+exchange_decorators(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches,
+ Policy}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, Policy,
+ {[], []}}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index b3e9ec66..c28b0cd5 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -33,18 +33,23 @@
%% Like erlang:memory(), but with awareness of rabbit-y things
memory() ->
- Conns = (sup_memory(rabbit_tcp_client_sup) +
- sup_memory(ssl_connection_sup) +
- sup_memory(amqp_sup)),
- Qs = (sup_memory(rabbit_amqqueue_sup) +
- sup_memory(rabbit_mirror_queue_slave_sup)),
+ ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup],
+ QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup],
+ MsgIndexProcs = [msg_store_transient, msg_store_persistent],
+ MgmtDbProcs = [rabbit_mgmt_sup],
+ PluginProcs = plugin_sups(),
+
+ All = [ConnProcs, QProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs],
+
+ {Sums, _Other} = sum_processes(lists:append(All), [memory]),
+
+ [Conns, Qs, MsgIndexProc, MgmtDbProc, AllPlugins] =
+ [aggregate_memory(Names, Sums) || Names <- All],
+
Mnesia = mnesia_memory(),
MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
- MsgIndexProc = (pid_memory(msg_store_transient) +
- pid_memory(msg_store_persistent)),
MgmtDbETS = ets_memory(rabbit_mgmt_db),
- MgmtDbProc = sup_memory(rabbit_mgmt_sup),
- Plugins = plugin_memory() - MgmtDbProc,
+ Plugins = AllPlugins - MgmtDbProc,
[{total, Total},
{processes, Processes},
@@ -55,7 +60,7 @@ memory() ->
{system, System}] =
erlang:memory([total, processes, ets, atom, binary, code, system]),
- OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins,
+ OtherProc = Processes - Conns - Qs - MsgIndexProc - AllPlugins,
[{total, Total},
{connection_procs, Conns},
@@ -78,35 +83,6 @@ memory() ->
%%----------------------------------------------------------------------------
-sup_memory(Sup) ->
- lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) +
- pid_memory(Sup).
-
-sup_children(Sup) ->
- rabbit_misc:with_exit_handler(
- rabbit_misc:const([]),
- fun () ->
- %% Just in case we end up talking to something that is
- %% not a supervisor by mistake.
- case supervisor:which_children(Sup) of
- L when is_list(L) -> L;
- _ -> []
- end
- end).
-
-pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
- {memory, M} -> M;
- _ -> 0
- end;
-pid_memory(Name) when is_atom(Name) -> case whereis(Name) of
- P when is_pid(P) -> pid_memory(P);
- _ -> 0
- end.
-
-child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid);
-child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid);
-child_memory(_, _) -> 0.
-
mnesia_memory() ->
case mnesia:system_info(is_running) of
yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
@@ -121,20 +97,132 @@ ets_memory(Name) ->
bytes(Words) -> Words * erlang:system_info(wordsize).
-plugin_memory() ->
- lists:sum([plugin_memory(App) ||
- {App, _, _} <- application:which_applications(),
- is_plugin(atom_to_list(App))]).
+plugin_sups() ->
+ lists:append([plugin_sup(App) ||
+ {App, _, _} <- application:which_applications(),
+ is_plugin(atom_to_list(App))]).
-plugin_memory(App) ->
+plugin_sup(App) ->
case application_controller:get_master(App) of
- undefined -> 0;
+ undefined -> [];
Master -> case application_master:get_child(Master) of
- {Pid, _} when is_pid(Pid) -> sup_memory(Pid);
- Pid when is_pid(Pid) -> sup_memory(Pid);
- _ -> 0
+ {Pid, _} when is_pid(Pid) -> [process_name(Pid)];
+ Pid when is_pid(Pid) -> [process_name(Pid)];
+ _ -> []
end
end.
+process_name(Pid) ->
+ case process_info(Pid, registered_name) of
+ {registered_name, Name} -> Name;
+ _ -> Pid
+ end.
+
is_plugin("rabbitmq_" ++ _) -> true;
is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS).
+
+aggregate_memory(Names, Sums) ->
+ lists:sum([extract_memory(Name, Sums) || Name <- Names]).
+
+extract_memory(Name, Sums) ->
+ {value, {_, Accs}} = lists:keysearch(Name, 1, Sums),
+ {value, {memory, V}} = lists:keysearch(memory, 1, Accs),
+ V.
+
+%%----------------------------------------------------------------------------
+
+%% NB: this code is non-rabbit specific.
+
+-ifdef(use_specs).
+-type(process() :: pid() | atom()).
+-type(info_key() :: atom()).
+-type(info_value() :: any()).
+-type(info_item() :: {info_key(), info_value()}).
+-type(accumulate() :: fun ((info_key(), info_value(), info_value()) ->
+ info_value())).
+-spec(sum_processes/2 :: ([process()], [info_key()]) ->
+ {[{process(), [info_item()]}], [info_item()]}).
+-spec(sum_processes/3 :: ([process()], accumulate(), [info_item()]) ->
+ {[{process(), [info_item()]}], [info_item()]}).
+-endif.
+
+sum_processes(Names, Items) ->
+ sum_processes(Names, fun (_, X, Y) -> X + Y end,
+ [{Item, 0} || Item <- Items]).
+
+%% summarize the process_info of all processes based on their
+%% '$ancestor' hierarchy, recorded in their process dictionary.
+%%
+%% The function takes
+%%
+%% 1) a list of names/pids of processes that are accumulation points
+%% in the hierarchy.
+%%
+%% 2) a function that aggregates individual info items -taking the
+%% info item key, value and accumulated value as the input and
+%% producing a new accumulated value.
+%%
+%% 3) a list of info item key / initial accumulator value pairs.
+%%
+%% The process_info of a process is accumulated at the nearest of its
+%% ancestors that is mentioned in the first argument, or, if no such
+%% ancestor exists or the ancestor information is absent, in a special
+%% 'other' bucket.
+%%
+%% The result is a pair consisting of
+%%
+%% 1) a k/v list, containing for each of the accumulation names/pids a
+%% list of info items, containing the accumulated data, and
+%%
+%% 2) the 'other' bucket - a list of info items containing the
+%% accumulated data of all processes with no matching ancestors
+%%
+%% Note that this function operates on names as well as pids, but
+%% these must match whatever is contained in the '$ancestor' process
+%% dictionary entry. Generally that means for all registered processes
+%% the name should be used.
+sum_processes(Names, Fun, Acc0) ->
+ Items = [Item || {Item, _Val0} <- Acc0],
+ Acc0Dict = orddict:from_list(Acc0),
+ NameAccs0 = orddict:from_list([{Name, Acc0Dict} || Name <- Names]),
+ {NameAccs, OtherAcc} =
+ lists:foldl(
+ fun (Pid, Acc) ->
+ InfoItems = [registered_name, dictionary | Items],
+ case process_info(Pid, InfoItems) of
+ undefined ->
+ Acc;
+ [{registered_name, RegName}, {dictionary, D} | Vals] ->
+ %% see docs for process_info/2 for the
+ %% special handling of 'registered_name'
+ %% info items
+ Extra = case RegName of
+ [] -> [];
+ N -> [N]
+ end,
+ accumulate(find_ancestor(Extra, D, Names), Fun,
+ orddict:from_list(Vals), Acc)
+ end
+ end, {NameAccs0, Acc0Dict}, processes()),
+ %% these conversions aren't strictly necessary; we do them simply
+ %% for the sake of encapsulating the representation.
+ {[{Name, orddict:to_list(Accs)} ||
+ {Name, Accs} <- orddict:to_list(NameAccs)],
+ orddict:to_list(OtherAcc)}.
+
+find_ancestor(Extra, D, Names) ->
+ Ancestors = case lists:keysearch('$ancestors', 1, D) of
+ {value, {_, Ancs}} -> Ancs;
+ false -> []
+ end,
+ case lists:splitwith(fun (A) -> not lists:member(A, Names) end,
+ Extra ++ Ancestors) of
+ {_, []} -> undefined;
+ {_, [Name | _]} -> Name
+ end.
+
+accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}) ->
+ {NameAccs, orddict:merge(Fun, ValsDict, OtherAcc)};
+accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}) ->
+ F = fun (NameAcc) -> orddict:merge(Fun, ValsDict, NameAcc) end,
+ {orddict:update(Name, F, NameAccs), OtherAcc}.
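
To make the shape of the new aggregation concrete, this is roughly how memory/0 above uses it, shown for the queue supervisors only (a sketch written as if inside rabbit_vm; whether sum_processes/2 is exported is not shown in this hunk):

    QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup],
    %% Sums :: [{Name, [{memory, Bytes}]}], _Other :: [{memory, Bytes}]
    {Sums, _Other} = sum_processes(QProcs, [memory]),
    QueueBytes = aggregate_memory(QProcs, Sums).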