diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-07-30 13:34:19 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-07-30 13:34:19 +0100 |
commit | 8425507c6987f1892a443b3796e4ae16dd781298 (patch) | |
tree | f4dbe531a37b99361ed59bfc54cd36b0c1966a84 | |
parent | 0a5477e7801e57754f312715a8f784745ff9a3d9 (diff) | |
parent | f620eb3dfe99ee83e24477a5fef1dda490ff286b (diff) | |
download | rabbitmq-server-bug25048.tar.gz |
merge defaultbug25048
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 49 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 23 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 22 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 |
6 files changed, 67 insertions, 51 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e8b4a623..d6fac46d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,7 +47,8 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes, policy}). + arguments, pid, slave_pids, sync_slave_pids, mirror_nodes, + policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d82ac266..a5f227bc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,7 +22,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0]). +-export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). @@ -102,6 +102,7 @@ -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). +-spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -215,6 +216,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = [], + sync_slave_pids = [], mirror_nodes = MNodes}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); @@ -475,6 +477,8 @@ force_event_refresh(QNames) -> force_event_refresh(Failed) end. +wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). + consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 388af413..b4071627 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -93,6 +93,7 @@ consumers, memory, slave_pids, + synchronised_slave_pids, backing_queue_status ]). @@ -102,9 +103,7 @@ durable, auto_delete, arguments, - owner_pid, - slave_pids, - synchronised_slave_pids + owner_pid ]). -define(INFO_KEYS, @@ -893,37 +892,7 @@ make_dead_letter_msg(Reason, now_micros() -> timer:now_diff(now(), {0,0,0}). -infos(Items, State) -> - {Prefix, Items1} = - case lists:member(synchronised_slave_pids, Items) of - true -> Prefix1 = slaves_status(State), - case lists:member(slave_pids, Items) of - true -> {Prefix1, Items -- [slave_pids]}; - false -> {proplists:delete(slave_pids, Prefix1), Items} - end; - false -> {[], Items} - end, - Prefix ++ [{Item, i(Item, State)} - || Item <- (Items1 -- [synchronised_slave_pids])]. - -slaves_status(#q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> - [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - {ok, #amqqueue{slave_pids = SPids}} -> - {Results, _Bad} = - delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), - {SPids1, SSPids} = - lists:foldl( - fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> - {[Pid | SPidsN], - case proplists:get_bool(is_synchronised, Infos) of - true -> [Pid | SSPidsN]; - false -> SSPidsN - end} - end, {[], []}, Results), - [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}] - end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; @@ -957,9 +926,14 @@ i(memory, _) -> M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> []; + {ok, #amqqueue{mirror_nodes = undefined}} -> ''; {ok, #amqqueue{slave_pids = SPids}} -> SPids end; +i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> ''; + {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids + end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1307,7 +1281,10 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State); handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> - dead_letter_msg(Msg, AckTag, Reason, State). + dead_letter_msg(Msg, AckTag, Reason, State); + +handle_cast(wake_up, State) -> + noreply(State). %% We need to not ignore this as we need to remove outstanding %% confirms due to queue death. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ba62a734..29e2d29f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,7 @@ -export([remove_from_queue/2, on_node_up/0, drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4]). + report_deaths/4, store_updated_slaves/1]). -include("rabbit.hrl"). @@ -37,6 +37,8 @@ -spec(add_mirror/3 :: (rabbit_types:vhost(), binary(), atom()) -> rabbit_types:ok_or_error(any())). +-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> + rabbit_types:amqqueue()). -endif. @@ -58,8 +60,8 @@ remove_from_queue(QueueName, DeadPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids }] -> [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), @@ -72,9 +74,9 @@ remove_from_queue(QueueName, DeadPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }, - ok = rabbit_amqqueue:store_queue(Q1), + store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, @@ -192,3 +194,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> end, rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). + +store_updated_slaves(Q = #amqqueue{slave_pids = SPids, + sync_slave_pids = SSPids}) -> + SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], + Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, + ok = rabbit_amqqueue:store_queue(Q1), + %% Wake it up so that we emit a stats event + rabbit_amqqueue:wake_up(Q1), + Q1. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c4ae307c..e4d78c45 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -155,7 +155,8 @@ init_it(Self, Node, QueueName) -> case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of [] -> MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}), + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of @@ -166,8 +167,8 @@ init_it(Self, Node, QueueName) -> case rabbit_misc:is_process_alive(SPid) of true -> existing; false -> MPids1 = (MPids -- [SPid]) ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{ slave_pids = MPids1 }), + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), {new, QPid} end end. @@ -465,8 +466,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> - rabbit_event:notify(queue_slave_promoted, [{pid, self()}, - {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, @@ -933,8 +932,17 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, %% unsynchronised: we assert that can never happen. set_synchronised(true, State = #state { q = #amqqueue { name = QName }, synchronised = false }) -> - rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, - {name, QName}]), + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2) + end + end), State #state { synchronised = true }; set_synchronised(true, State) -> State; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 18704807..47b22b98 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -40,6 +40,7 @@ -rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). +-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -62,6 +63,7 @@ -spec(topic_trie_node/0 :: () -> 'ok'). -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). +-spec(sync_slave_pids/0 :: () -> 'ok'). -endif. @@ -240,6 +242,19 @@ queue_policy(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, mirror_nodes, policy]). +sync_slave_pids() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddSyncSlavesFun = + fun ({amqqueue, N, D, AD, Excl, Args, Pid, SPids, MNodes, Pol}) -> + {amqqueue, N, D, AD, Excl, Args, Pid, SPids, [], MNodes, Pol} + end, + [ok = transform(T, AddSyncSlavesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, mirror_nodes, policy]) + || T <- Tables], + ok. + + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |