summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_misc.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-22 12:47:50 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-22 12:47:50 +0000
commit4afc5770d7fa20fb954c8a06594bb668aa1f4579 (patch)
treea29d4106d02554d4f304eefb2d03d34a8cff9f5c /src/rabbit_mirror_queue_misc.erl
parente1e8117349e22065da6b48434c5677d4a3199343 (diff)
parentb6a6cf56afb68c45a409e9879c7629663cf655ea (diff)
downloadrabbitmq-server-4afc5770d7fa20fb954c8a06594bb668aa1f4579.tar.gz
Merge in stablebug25873
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r--src/rabbit_mirror_queue_misc.erl48
1 files changed, 19 insertions, 29 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 9ebe2cfe..ca495733 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,9 +17,10 @@
-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
--export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
+-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1]).
+ is_mirrored/1, update_mirrors/2, validate_policy/1,
+ maybe_auto_sync/1]).
%% for testing only
-export([module/1]).
@@ -45,10 +46,8 @@
(rabbit_amqqueue:name(), pid(), [pid()])
-> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
--spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
--spec(add_mirror/2 ::
- (rabbit_amqqueue:name(), node()) ->
- {'ok', atom()} | rabbit_types:error(any())).
+-spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async')
+ -> 'ok').
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
@@ -56,6 +55,7 @@
-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
-spec(update_mirrors/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
+-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -140,7 +140,7 @@ on_node_up() ->
end
end, [], rabbit_queue)
end),
- [add_mirror(QName, node()) || QName <- QNames],
+ [add_mirror(QName, node(), async) || QName <- QNames],
ok.
drop_mirrors(QName, Nodes) ->
@@ -165,45 +165,35 @@ drop_mirror(QName, MirrorNode) ->
end
end).
-add_mirrors(QName, Nodes) ->
- [add_mirror(QName, Node) || Node <- Nodes],
+add_mirrors(QName, Nodes, SyncMode) ->
+ [add_mirror(QName, Node, SyncMode) || Node <- Nodes],
ok.
-add_mirror(QName, MirrorNode) ->
+add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_amqqueue:with(
QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
- start_child(Name, MirrorNode, Q);
+ start_child(Name, MirrorNode, Q, SyncMode);
[SPid] ->
case rabbit_misc:is_process_alive(SPid) of
true -> {ok, already_mirrored};
- false -> start_child(Name, MirrorNode, Q)
+ false -> start_child(Name, MirrorNode, Q, SyncMode)
end
end
end).
-start_child(Name, MirrorNode, Q) ->
+start_child(Name, MirrorNode, Q, SyncMode) ->
case rabbit_misc:with_exit_handler(
- rabbit_misc:const({ok, down}),
+ rabbit_misc:const(down),
fun () ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
- {ok, SPid} when is_pid(SPid) ->
- maybe_auto_sync(Q),
- rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- {ok, started};
- {error, {{stale_master_pid, StalePid}, _}} ->
- rabbit_log:warning("Detected stale HA master while adding "
- "mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, StalePid]),
- {ok, stale_master};
- {error, {{duplicate_live_master, _}=Err, _}} ->
- Err;
- Other ->
- Other
+ {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
+ _ -> ok
end.
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
@@ -317,7 +307,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
{NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
- add_mirrors (QName, NewNodes -- OldNodes),
+ add_mirrors (QName, NewNodes -- OldNodes, async),
drop_mirrors(QName, OldNodes -- NewNodes),
%% This is for the case where no extra nodes were added but we changed to
%% a policy requiring auto-sync.