summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-07-30 13:34:19 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-07-30 13:34:19 +0100
commit8425507c6987f1892a443b3796e4ae16dd781298 (patch)
treef4dbe531a37b99361ed59bfc54cd36b0c1966a84
parent0a5477e7801e57754f312715a8f784745ff9a3d9 (diff)
parentf620eb3dfe99ee83e24477a5fef1dda490ff286b (diff)
downloadrabbitmq-server-bug25048.tar.gz
merge defaultbug25048
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl49
-rw-r--r--src/rabbit_mirror_queue_misc.erl23
-rw-r--r--src/rabbit_mirror_queue_slave.erl22
-rw-r--r--src/rabbit_upgrade_functions.erl15
6 files changed, 67 insertions, 51 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index e8b4a623..d6fac46d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,7 +47,8 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes, policy}).
+ arguments, pid, slave_pids, sync_slave_pids, mirror_nodes,
+ policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d82ac266..a5f227bc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,7 +22,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([force_event_refresh/0]).
+-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
@@ -102,6 +102,7 @@
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
+-spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -215,6 +216,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = [],
+ sync_slave_pids = [],
mirror_nodes = MNodes}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
@@ -475,6 +477,8 @@ force_event_refresh(QNames) ->
force_event_refresh(Failed)
end.
+wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
+
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 388af413..b4071627 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -93,6 +93,7 @@
consumers,
memory,
slave_pids,
+ synchronised_slave_pids,
backing_queue_status
]).
@@ -102,9 +103,7 @@
durable,
auto_delete,
arguments,
- owner_pid,
- slave_pids,
- synchronised_slave_pids
+ owner_pid
]).
-define(INFO_KEYS,
@@ -893,37 +892,7 @@ make_dead_letter_msg(Reason,
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) ->
- {Prefix, Items1} =
- case lists:member(synchronised_slave_pids, Items) of
- true -> Prefix1 = slaves_status(State),
- case lists:member(slave_pids, Items) of
- true -> {Prefix1, Items -- [slave_pids]};
- false -> {proplists:delete(slave_pids, Prefix1), Items}
- end;
- false -> {[], Items}
- end,
- Prefix ++ [{Item, i(Item, State)}
- || Item <- (Items1 -- [synchronised_slave_pids])].
-
-slaves_status(#q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} ->
- [{slave_pids, ''}, {synchronised_slave_pids, ''}];
- {ok, #amqqueue{slave_pids = SPids}} ->
- {Results, _Bad} =
- delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
- {SPids1, SSPids} =
- lists:foldl(
- fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
- {[Pid | SPidsN],
- case proplists:get_bool(is_synchronised, Infos) of
- true -> [Pid | SSPidsN];
- false -> SSPidsN
- end}
- end, {[], []}, Results),
- [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
- end.
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -957,9 +926,14 @@ i(memory, _) ->
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> [];
+ {ok, #amqqueue{mirror_nodes = undefined}} -> '';
{ok, #amqqueue{slave_pids = SPids}} -> SPids
end;
+i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} -> '';
+ {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids
+ end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1307,7 +1281,10 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State);
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
- dead_letter_msg(Msg, AckTag, Reason, State).
+ dead_letter_msg(Msg, AckTag, Reason, State);
+
+handle_cast(wake_up, State) ->
+ noreply(State).
%% We need to not ignore this as we need to remove outstanding
%% confirms due to queue death.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ba62a734..29e2d29f 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -18,7 +18,7 @@
-export([remove_from_queue/2, on_node_up/0,
drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
- report_deaths/4]).
+ report_deaths/4, store_updated_slaves/1]).
-include("rabbit.hrl").
@@ -37,6 +37,8 @@
-spec(add_mirror/3 ::
(rabbit_types:vhost(), binary(), atom())
-> rabbit_types:ok_or_error(any())).
+-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
+ rabbit_types:amqqueue()).
-endif.
@@ -58,8 +60,8 @@ remove_from_queue(QueueName, DeadPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid),
@@ -72,9 +74,9 @@ remove_from_queue(QueueName, DeadPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 },
- ok = rabbit_amqqueue:store_queue(Q1),
+ store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
@@ -192,3 +194,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
end,
rabbit_misc:pid_to_string(MirrorPid),
[[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
+
+store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ ok = rabbit_amqqueue:store_queue(Q1),
+ %% Wake it up so that we emit a stats event
+ rabbit_amqqueue:wake_up(Q1),
+ Q1.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c4ae307c..e4d78c45 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -155,7 +155,8 @@ init_it(Self, Node, QueueName) ->
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
[] ->
MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
{new, QPid};
[QPid] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -166,8 +167,8 @@ init_it(Self, Node, QueueName) ->
case rabbit_misc:is_process_alive(SPid) of
true -> existing;
false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{ slave_pids = MPids1 }),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
{new, QPid}
end
end.
@@ -465,8 +466,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
- rabbit_event:notify(queue_slave_promoted, [{pid, self()},
- {name, QName}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
[rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
@@ -933,8 +932,17 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
%% unsynchronised: we assert that can never happen.
set_synchronised(true, State = #state { q = #amqqueue { name = QName },
synchronised = false }) ->
- rabbit_event:notify(queue_slave_synchronised, [{pid, self()},
- {name, QName}]),
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2)
+ end
+ end),
State #state { synchronised = true };
set_synchronised(true, State) ->
State;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 18704807..47b22b98 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -40,6 +40,7 @@
-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
-rabbit_upgrade({policy, mnesia,
[exchange_scratches, ha_mirrors]}).
+-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
%% -------------------------------------------------------------------
@@ -62,6 +63,7 @@
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
+-spec(sync_slave_pids/0 :: () -> 'ok').
-endif.
@@ -240,6 +242,19 @@ queue_policy(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid,
slave_pids, mirror_nodes, policy]).
+sync_slave_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddSyncSlavesFun =
+ fun ({amqqueue, N, D, AD, Excl, Args, Pid, SPids, MNodes, Pol}) ->
+ {amqqueue, N, D, AD, Excl, Args, Pid, SPids, [], MNodes, Pol}
+ end,
+ [ok = transform(T, AddSyncSlavesFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, mirror_nodes, policy])
+ || T <- Tables],
+ ok.
+
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->