From e54434c75490ff3995fb97afb5fafd1541ec81f7 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 11 Nov 2013 14:08:08 +0000 Subject: Port the deadlock-breaking code from bug 25852. --- src/rabbit_mirror_queue_misc.erl | 34 ++++++++++++++-------------------- src/rabbit_mirror_queue_slave.erl | 36 ++++++++++++++++++++++++++---------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 8ad7c62f..47a44278 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -19,7 +19,8 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1]). + is_mirrored/1, update_mirrors/2, validate_policy/1, + maybe_auto_sync/1]). %% for testing only -export([module/1]). @@ -94,10 +95,14 @@ remove_from_queue(QueueName, Self, LiveGMPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - store_updated_slaves( - Q #amqqueue { pid = QPid1, + Q1 = Q#amqqueue{pid = QPid1, slave_pids = SPids1, - gm_pids = GMPids1 }), + gm_pids = GMPids1}, + store_updated_slaves(Q1), + %% If we add and remove nodes at the same time + %% it's possible we need to sync after removing + %% the master. Let's check. + maybe_auto_sync(Q1), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, @@ -179,26 +184,15 @@ add_mirror(QName, MirrorNode) -> end end). -start_child(Name, MirrorNode, Q) -> +start_child(_Name, MirrorNode, Q) -> + %% TODO re-add some log stuff here. case rabbit_misc:with_exit_handler( - rabbit_misc:const({ok, down}), + rabbit_misc:const(down), fun () -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of - {ok, SPid} when is_pid(SPid) -> - maybe_auto_sync(Q), - rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - {ok, started}; - {error, {{stale_master_pid, StalePid}, _}} -> - rabbit_log:warning("Detected stale HA master while adding " - "mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, StalePid]), - {ok, stale_master}; - {error, {{duplicate_live_master, _}=Err, _}} -> - Err; - Other -> - Other + {ok, SPid} -> rabbit_mirror_queue_slave:go(SPid); + _ -> ok end. report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6f78d1d2..aa69aeed 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2, info/1]). +-export([start_link/1, set_maximum_since_use/2, info/1, go/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -78,7 +78,14 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init(Q = #amqqueue { name = QName }) -> +init(Q) -> + {ok, {not_started, Q}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}. + +go(SPid) -> gen_server2:cast(SPid, go). + +handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -124,22 +131,24 @@ init(Q = #amqqueue { name = QName }) -> }, ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}; + rabbit_mirror_queue_misc:maybe_auto_sync(Q1), + {noreply, State}; {stale, StalePid} -> - {stop, {stale_master_pid, StalePid}}; + gm:leave(GM), + {stop, {stale_master_pid, StalePid}, NotStarted}; duplicate_live_master -> - {stop, {duplicate_live_master, Node}}; + gm:leave(GM), + {stop, {duplicate_live_master, Node}, NotStarted}; existing -> gm:leave(GM), - ignore; + {stop, normal, NotStarted}; master_in_recovery -> + gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - ignore + {stop, normal, NotStarted} end. init_it(Self, GM, Node, QName) -> @@ -208,6 +217,9 @@ handle_call({gm_deaths, LiveGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). +handle_cast(go, {not_started, _Q} = NotStarted) -> + handle_go(NotStarted); + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -293,6 +305,8 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. +terminate(normal, {not_started, _Q}) -> + ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. @@ -403,7 +417,9 @@ handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). inform_deaths(SPid, Live) -> - case gen_server2:call(SPid, {gm_deaths, Live}, infinity) of + case rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of ok -> ok; {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} end. -- cgit v1.2.1 From f867807a58ccb611c8fc22eef3d01ceb7c8fc312 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 11 Nov 2013 14:22:57 +0000 Subject: Re-add the log invocations we lost. This does lead to the stale master pid warning getting logged on a different node (the slave node rather than the node that attempted to start the slave) but I doubt we care about that. --- src/rabbit_mirror_queue_misc.erl | 7 ++++--- src/rabbit_mirror_queue_slave.erl | 3 +++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 47a44278..c53ad7b9 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -184,14 +184,15 @@ add_mirror(QName, MirrorNode) -> end end). -start_child(_Name, MirrorNode, Q) -> - %% TODO re-add some log stuff here. +start_child(Name, MirrorNode, Q) -> case rabbit_misc:with_exit_handler( rabbit_misc:const(down), fun () -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of - {ok, SPid} -> rabbit_mirror_queue_slave:go(SPid); + {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid); _ -> ok end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index aa69aeed..69e68d20 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -134,6 +134,9 @@ handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) -> rabbit_mirror_queue_misc:maybe_auto_sync(Q1), {noreply, State}; {stale, StalePid} -> + rabbit_log:warning("Detected stale HA master while adding " + "mirror of ~s: ~p~n", + [rabbit_misc:rs(QName), StalePid]), gm:leave(GM), {stop, {stale_master_pid, StalePid}, NotStarted}; duplicate_live_master -> -- cgit v1.2.1 From 451849adf9a5e0b589d5ed85af1663d129eebf19 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 11 Nov 2013 15:01:37 +0000 Subject: Forgot to add a spec --- src/rabbit_mirror_queue_misc.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index c53ad7b9..65cc58be 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -57,6 +57,7 @@ -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -endif. -- cgit v1.2.1 From cd629f5c15b4113b366bca43b159a55fa851dcfd Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 12 Nov 2013 15:03:12 +0000 Subject: Restore synchronous addition of slaves in the queue.declare case. --- Makefile | 2 +- src/rabbit_mirror_queue_master.erl | 8 +++++++- src/rabbit_mirror_queue_misc.erl | 26 ++++++++++++-------------- src/rabbit_mirror_queue_slave.erl | 30 ++++++++++++++++++++---------- 4 files changed, 40 insertions(+), 26 deletions(-) diff --git a/Makefile b/Makefile index e413f879..859825f5 100644 --- a/Makefile +++ b/Makefile @@ -222,7 +222,7 @@ start-background-node: all start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) - ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid + #./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 3abd81f5..d9cef642 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -110,7 +110,13 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), + %% We need synchronous add here (i.e. do not return until the + %% slave is running) so that when queue declaration is finished + %% all slaves are up; we don't want to end up with unsynced slaves + %% just by declaring a new queue. But add can't be synchronous all + %% the time as it can be called by slaves and that's + %% deadlock-prone. + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), #state { name = QName, gm = GM, coordinator = CPid, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 65cc58be..f2c8b211 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_misc). -behaviour(rabbit_policy_validator). --export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, +-export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, validate_policy/1, maybe_auto_sync/1]). @@ -46,10 +46,8 @@ (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). --spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). --spec(add_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> - {'ok', atom()} | rabbit_types:error(any())). +-spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async') + -> 'ok'). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> @@ -141,7 +139,7 @@ on_node_up() -> end end, [], rabbit_queue) end), - [add_mirror(QName, node()) || QName <- QNames], + [add_mirror(QName, node(), async) || QName <- QNames], ok. drop_mirrors(QName, Nodes) -> @@ -166,26 +164,26 @@ drop_mirror(QName, MirrorNode) -> end end). -add_mirrors(QName, Nodes) -> - [add_mirror(QName, Node) || Node <- Nodes], +add_mirrors(QName, Nodes, SyncMode) -> + [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. -add_mirror(QName, MirrorNode) -> +add_mirror(QName, MirrorNode, SyncMode) -> rabbit_amqqueue:with( QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> - start_child(Name, MirrorNode, Q); + start_child(Name, MirrorNode, Q, SyncMode); [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q) + false -> start_child(Name, MirrorNode, Q, SyncMode) end end end). -start_child(Name, MirrorNode, Q) -> +start_child(Name, MirrorNode, Q, SyncMode) -> case rabbit_misc:with_exit_handler( rabbit_misc:const(down), fun () -> @@ -193,7 +191,7 @@ start_child(Name, MirrorNode, Q) -> end) of {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid); + rabbit_mirror_queue_slave:go(SPid, SyncMode); _ -> ok end. @@ -308,7 +306,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], - add_mirrors (QName, NewNodes -- OldNodes), + add_mirrors (QName, NewNodes -- OldNodes, async), drop_mirrors(QName, OldNodes -- NewNodes), maybe_auto_sync(NewQ), ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 69e68d20..96f89ecc 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2, info/1, go/1]). +-export([start_link/1, set_maximum_since_use/2, info/1, go/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -83,9 +83,10 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -go(SPid) -> gen_server2:cast(SPid, go). +go(SPid, sync) -> gen_server2:call(SPid, go, infinity); +go(SPid, async) -> gen_server2:cast(SPid, go). -handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) -> +handle_go(Q = #amqqueue{name = QName}) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -132,26 +133,26 @@ handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) -> ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), rabbit_mirror_queue_misc:maybe_auto_sync(Q1), - {noreply, State}; + {ok, State}; {stale, StalePid} -> rabbit_log:warning("Detected stale HA master while adding " "mirror of ~s: ~p~n", [rabbit_misc:rs(QName), StalePid]), gm:leave(GM), - {stop, {stale_master_pid, StalePid}, NotStarted}; + {error, {stale_master_pid, StalePid}}; duplicate_live_master -> gm:leave(GM), - {stop, {duplicate_live_master, Node}, NotStarted}; + {error, {duplicate_live_master, Node}}; existing -> gm:leave(GM), - {stop, normal, NotStarted}; + {error, normal}; master_in_recovery -> gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - {stop, normal, NotStarted} + {error, normal} end. init_it(Self, GM, Node, QName) -> @@ -185,6 +186,12 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). +handle_call(go, _From, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {reply, ok, State}; + {error, Error} -> {stop, Error, NotStarted} + end; + handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), @@ -220,8 +227,11 @@ handle_call({gm_deaths, LiveGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). -handle_cast(go, {not_started, _Q} = NotStarted) -> - handle_go(NotStarted); +handle_cast(go, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {noreply, State}; + {error, Error} -> {stop, Error, NotStarted} + end; handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -- cgit v1.2.1 From b6a6cf56afb68c45a409e9879c7629663cf655ea Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 12 Nov 2013 15:06:45 +0000 Subject: *cough* --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 859825f5..e413f879 100644 --- a/Makefile +++ b/Makefile @@ -222,7 +222,7 @@ start-background-node: all start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) - #./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid + ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) -- cgit v1.2.1