summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJim Apperly <jim@rabbitmq.com>2011-12-07 10:58:09 +0000
committerJim Apperly <jim@rabbitmq.com>2011-12-07 10:58:09 +0000
commit7c5faf36be6e4f4b21f96a6341a8c7debf6b3912 (patch)
treefbc93e6f0bc14c008bbd8b363c9ad86cbf47a2a7
parent3ab91d66007a31038b17cd78076bc1e070da1ce1 (diff)
parent76ba0cd2e756d76e7e9ce945afa99bf464d4460a (diff)
downloadrabbitmq-server-7c5faf36be6e4f4b21f96a6341a8c7debf6b3912.tar.gz
Merge bug24595 into default
-rw-r--r--src/mirrored_supervisor.erl42
-rw-r--r--src/mirrored_supervisor_tests.erl27
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_binding.erl22
-rw-r--r--src/rabbit_mirror_queue_slave.erl8
-rw-r--r--src/rabbit_networking.erl10
6 files changed, 90 insertions, 21 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 8dfe39f8..6e8f96d9 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -242,8 +242,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) ->
start_link0(Prefix, Group, Init) ->
case apply(?SUPERVISOR, start_link,
Prefix ++ [?MODULE, {overall, Group, Init}]) of
- {ok, Pid} -> call(Pid, {init, Pid}),
- {ok, Pid};
+ {ok, Pid} -> case catch call(Pid, {init, Pid}) of
+ ok -> {ok, Pid};
+ E -> E
+ end;
Other -> Other
end.
@@ -346,13 +348,20 @@ handle_call({init, Overall}, _From,
end || Pid <- Rest],
Delegate = child(Overall, delegate),
erlang:monitor(process, Delegate),
- [maybe_start(Group, Delegate, S) || S <- ChildSpecs],
- {reply, ok, State#state{overall = Overall, delegate = Delegate}};
+ State1 = State#state{overall = Overall, delegate = Delegate},
+ case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ true -> {reply, ok, State1};
+ false -> {stop, shutdown, State1}
+ end;
handle_call({start_child, ChildSpec}, _From,
State = #state{delegate = Delegate,
group = Group}) ->
- {reply, maybe_start(Group, Delegate, ChildSpec), State};
+ {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ already_in_mnesia -> {error, already_present};
+ {already_in_mnesia, Pid} -> {error, {already_started, Pid}};
+ Else -> Else
+ end, State};
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
group = Group}) ->
@@ -400,13 +409,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
Self = self(),
- case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> ok
- end,
- {noreply, State};
+ R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
+ [Self | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(fun() -> update_all(Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
+ end,
+ case all_started(R) of
+ true -> {noreply, State};
+ false -> {stop, shutdown, State}
+ end;
handle_info(Info, State) ->
{stop, {unexpected_info, Info}, State}.
@@ -428,8 +440,8 @@ maybe_start(Group, Delegate, ChildSpec) ->
check_start(Group, Delegate, ChildSpec)
end) of
{atomic, start} -> start(Delegate, ChildSpec);
- {atomic, undefined} -> {error, already_present};
- {atomic, Pid} -> {error, {already_started, Pid}};
+ {atomic, undefined} -> already_in_mnesia;
+ {atomic, Pid} -> {already_in_mnesia, Pid};
%% If we are torn down while in the transaction...
{aborted, E} -> {error, E}
end.
@@ -499,6 +511,8 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
+all_started(Results) -> [] =:= [R || R = {error, _} <- Results].
+
%%----------------------------------------------------------------------------
create_tables() ->
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 6c91bc4f..d87fddd1 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -36,13 +36,16 @@ all_tests() ->
passed = test_already_there(),
passed = test_delete_restart(),
passed = test_which_children(),
- passed = test_large_group(),
+%% commented out in order to determine whether this is the only test
+%% that is failing - see bug 24362
+%% passed = test_large_group(),
passed = test_childspecs_at_init(),
passed = test_anonymous_supervisors(),
passed = test_no_migration_on_shutdown(),
passed = test_start_idempotence(),
passed = test_unsupported(),
passed = test_ignore(),
+ passed = test_startup_failure(),
passed.
%% Simplest test
@@ -195,6 +198,22 @@ test_ignore() ->
{sup, fake_strategy_for_ignore, []}),
passed.
+test_startup_failure() ->
+ [test_startup_failure(F) || F <- [want_error, want_exit]],
+ passed.
+
+test_startup_failure(Fail) ->
+ process_flag(trap_exit, true),
+ ?MS:start_link(get_group(group), ?MODULE,
+ {sup, one_for_one, [childspec(Fail)]}),
+ receive
+ {'EXIT', _, shutdown} ->
+ ok
+ after 1000 ->
+ exit({did_not_exit, Fail})
+ end,
+ process_flag(trap_exit, false).
+
%% ---------------------------------------------------------------------------
with_sups(Fun, Sups) ->
@@ -228,6 +247,12 @@ start_sup0(Name, Group, ChildSpecs) ->
childspec(Id) ->
{Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.
+start_gs(want_error) ->
+ {error, foo};
+
+start_gs(want_exit) ->
+ exit(foo);
+
start_gs(Id) ->
gen_server:start_link({local, Id}, ?MODULE, server, []).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b3e92b69..96017df8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -244,7 +244,7 @@ determine_queue_nodes(Args) ->
case [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes] of
[Node] -> {Node, undefined};
- [First | Rest] -> {First, Rest}
+ [First | Rest] -> {First, [First | Rest]}
end;
{{_Type, <<"all">>}, _} ->
{node(), all};
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e625a427..0d221b05 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -277,6 +277,7 @@ has_for_source(SrcName) ->
contains(rabbit_semi_durable_route, Match).
remove_for_source(SrcName) ->
+ lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
Routes = lists:usort(
mnesia:match_object(rabbit_route, Match, write) ++
@@ -351,7 +352,28 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+%% For bulk operations we lock the tables we are operating on in order
+%% to reduce the time complexity. Without the table locks we end up
+%% with num_tables*num_bulk_bindings row-level locks. Takiing each
+%% lock takes time proportional to the number of existing locks, thus
+%% resulting in O(num_bulk_bindings^2) complexity.
+%%
+%% The locks need to be write locks since ultimately we end up
+%% removing all these rows.
+%%
+%% The downside of all this is that no other binding operations except
+%% lookup/routing (which uses dirty ops) can take place
+%% concurrently. However, that is the case already since the bulk
+%% operations involve mnesia:match_object calls with a partial key,
+%% which entails taking a table lock.
+lock_route_tables() ->
+ [mnesia:lock({table, T}, write) || T <- [rabbit_route,
+ rabbit_reverse_route,
+ rabbit_semi_durable_route,
+ rabbit_durable_route]].
+
remove_for_destination(DstName, DeleteFun) ->
+ lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 73eaed14..d68063db 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -526,9 +526,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, SS, MonitoringPids),
- MTC = dict:from_list(
- [{MsgId, {ChPid, MsgSeqNo}} ||
- {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MSList),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2c0912df..045ab89a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -307,9 +307,15 @@ connections() ->
rabbit_networking, connections_local, []).
connections_local() ->
- [rabbit_connection_sup:reader(ConnSup) ||
+ [Reader ||
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children(rabbit_tcp_client_sup),
+ Reader <- [try
+ rabbit_connection_sup:reader(ConnSup)
+ catch exit:{noproc, _} ->
+ noproc
+ end],
+ Reader =/= noproc].
connection_info_keys() -> rabbit_reader:info_keys().