diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-11-22 12:47:50 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-11-22 12:47:50 +0000 |
commit | 4afc5770d7fa20fb954c8a06594bb668aa1f4579 (patch) | |
tree | a29d4106d02554d4f304eefb2d03d34a8cff9f5c /src/rabbit_mirror_queue_misc.erl | |
parent | e1e8117349e22065da6b48434c5677d4a3199343 (diff) | |
parent | b6a6cf56afb68c45a409e9879c7629663cf655ea (diff) | |
download | rabbitmq-server-4afc5770d7fa20fb954c8a06594bb668aa1f4579.tar.gz |
Merge in stablebug25873
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 48 |
1 files changed, 19 insertions, 29 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9ebe2cfe..ca495733 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,9 +17,10 @@ -module(rabbit_mirror_queue_misc). -behaviour(rabbit_policy_validator). --export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, +-export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1]). + is_mirrored/1, update_mirrors/2, validate_policy/1, + maybe_auto_sync/1]). %% for testing only -export([module/1]). @@ -45,10 +46,8 @@ (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). --spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). --spec(add_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> - {'ok', atom()} | rabbit_types:error(any())). +-spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async') + -> 'ok'). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> @@ -56,6 +55,7 @@ -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -140,7 +140,7 @@ on_node_up() -> end end, [], rabbit_queue) end), - [add_mirror(QName, node()) || QName <- QNames], + [add_mirror(QName, node(), async) || QName <- QNames], ok. drop_mirrors(QName, Nodes) -> @@ -165,45 +165,35 @@ drop_mirror(QName, MirrorNode) -> end end). -add_mirrors(QName, Nodes) -> - [add_mirror(QName, Node) || Node <- Nodes], +add_mirrors(QName, Nodes, SyncMode) -> + [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. -add_mirror(QName, MirrorNode) -> +add_mirror(QName, MirrorNode, SyncMode) -> rabbit_amqqueue:with( QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> - start_child(Name, MirrorNode, Q); + start_child(Name, MirrorNode, Q, SyncMode); [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q) + false -> start_child(Name, MirrorNode, Q, SyncMode) end end end). -start_child(Name, MirrorNode, Q) -> +start_child(Name, MirrorNode, Q, SyncMode) -> case rabbit_misc:with_exit_handler( - rabbit_misc:const({ok, down}), + rabbit_misc:const(down), fun () -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of - {ok, SPid} when is_pid(SPid) -> - maybe_auto_sync(Q), - rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - {ok, started}; - {error, {{stale_master_pid, StalePid}, _}} -> - rabbit_log:warning("Detected stale HA master while adding " - "mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, StalePid]), - {ok, stale_master}; - {error, {{duplicate_live_master, _}=Err, _}} -> - Err; - Other -> - Other + {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode); + _ -> ok end. report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> @@ -317,7 +307,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], - add_mirrors (QName, NewNodes -- OldNodes), + add_mirrors (QName, NewNodes -- OldNodes, async), drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. |