diff options
author | Jim Apperly <jim@rabbitmq.com> | 2011-12-07 10:58:09 +0000 |
---|---|---|
committer | Jim Apperly <jim@rabbitmq.com> | 2011-12-07 10:58:09 +0000 |
commit | 7c5faf36be6e4f4b21f96a6341a8c7debf6b3912 (patch) | |
tree | fbc93e6f0bc14c008bbd8b363c9ad86cbf47a2a7 | |
parent | 3ab91d66007a31038b17cd78076bc1e070da1ce1 (diff) | |
parent | 76ba0cd2e756d76e7e9ce945afa99bf464d4460a (diff) | |
download | rabbitmq-server-7c5faf36be6e4f4b21f96a6341a8c7debf6b3912.tar.gz |
Merge bug24595 into default
-rw-r--r-- | src/mirrored_supervisor.erl | 42 | ||||
-rw-r--r-- | src/mirrored_supervisor_tests.erl | 27 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 22 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 8 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 10 |
6 files changed, 90 insertions, 21 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 8dfe39f8..6e8f96d9 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -242,8 +242,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) -> start_link0(Prefix, Group, Init) -> case apply(?SUPERVISOR, start_link, Prefix ++ [?MODULE, {overall, Group, Init}]) of - {ok, Pid} -> call(Pid, {init, Pid}), - {ok, Pid}; + {ok, Pid} -> case catch call(Pid, {init, Pid}) of + ok -> {ok, Pid}; + E -> E + end; Other -> Other end. @@ -346,13 +348,20 @@ handle_call({init, Overall}, _From, end || Pid <- Rest], Delegate = child(Overall, delegate), erlang:monitor(process, Delegate), - [maybe_start(Group, Delegate, S) || S <- ChildSpecs], - {reply, ok, State#state{overall = Overall, delegate = Delegate}}; + State1 = State#state{overall = Overall, delegate = Delegate}, + case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + true -> {reply, ok, State1}; + false -> {stop, shutdown, State1} + end; handle_call({start_child, ChildSpec}, _From, State = #state{delegate = Delegate, group = Group}) -> - {reply, maybe_start(Group, Delegate, ChildSpec), State}; + {reply, case maybe_start(Group, Delegate, ChildSpec) of + already_in_mnesia -> {error, already_present}; + {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; + Else -> Else + end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, group = Group}) -> @@ -400,13 +409,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. 
Self = self(), - case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, ChildSpecs} = - mnesia:transaction(fun() -> update_all(Pid) end), - [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; - _ -> ok - end, - {noreply, State}; + R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of + [Self | _] -> {atomic, ChildSpecs} = + mnesia:transaction(fun() -> update_all(Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] + end, + case all_started(R) of + true -> {noreply, State}; + false -> {stop, shutdown, State} + end; handle_info(Info, State) -> {stop, {unexpected_info, Info}, State}. @@ -428,8 +440,8 @@ maybe_start(Group, Delegate, ChildSpec) -> check_start(Group, Delegate, ChildSpec) end) of {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> {error, already_present}; - {atomic, Pid} -> {error, {already_started, Pid}}; + {atomic, undefined} -> already_in_mnesia; + {atomic, Pid} -> {already_in_mnesia, Pid}; %% If we are torn down while in the transaction... {aborted, E} -> {error, E} end. @@ -499,6 +511,8 @@ delete_all(Group) -> [delete(Group, id(C)) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. +all_started(Results) -> [] =:= [R || R = {error, _} <- Results]. 
+ %%---------------------------------------------------------------------------- create_tables() -> diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 6c91bc4f..d87fddd1 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -36,13 +36,16 @@ all_tests() -> passed = test_already_there(), passed = test_delete_restart(), passed = test_which_children(), - passed = test_large_group(), +%% commented out in order to determine whether this is the only test +%% that is failing - see bug 24362 +%% passed = test_large_group(), passed = test_childspecs_at_init(), passed = test_anonymous_supervisors(), passed = test_no_migration_on_shutdown(), passed = test_start_idempotence(), passed = test_unsupported(), passed = test_ignore(), + passed = test_startup_failure(), passed. %% Simplest test @@ -195,6 +198,22 @@ test_ignore() -> {sup, fake_strategy_for_ignore, []}), passed. +test_startup_failure() -> + [test_startup_failure(F) || F <- [want_error, want_exit]], + passed. + +test_startup_failure(Fail) -> + process_flag(trap_exit, true), + ?MS:start_link(get_group(group), ?MODULE, + {sup, one_for_one, [childspec(Fail)]}), + receive + {'EXIT', _, shutdown} -> + ok + after 1000 -> + exit({did_not_exit, Fail}) + end, + process_flag(trap_exit, false). + %% --------------------------------------------------------------------------- with_sups(Fun, Sups) -> @@ -228,6 +247,12 @@ start_sup0(Name, Group, ChildSpecs) -> childspec(Id) -> {Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. +start_gs(want_error) -> + {error, foo}; + +start_gs(want_exit) -> + exit(foo); + start_gs(Id) -> gen_server:start_link({local, Id}, ?MODULE, server, []). 
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b3e92b69..96017df8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -244,7 +244,7 @@ determine_queue_nodes(Args) -> case [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] of [Node] -> {Node, undefined}; - [First | Rest] -> {First, Rest} + [First | Rest] -> {First, [First | Rest]} end; {{_Type, <<"all">>}, _} -> {node(), all}; diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e625a427..0d221b05 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -277,6 +277,7 @@ has_for_source(SrcName) -> contains(rabbit_semi_durable_route, Match). remove_for_source(SrcName) -> + lock_route_tables(), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, Routes = lists:usort( mnesia:match_object(rabbit_route, Match, write) ++ @@ -351,7 +352,28 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). +%% For bulk operations we lock the tables we are operating on in order +%% to reduce the time complexity. Without the table locks we end up +%% with num_tables*num_bulk_bindings row-level locks. Taking each +%% lock takes time proportional to the number of existing locks, thus +%% resulting in O(num_bulk_bindings^2) complexity. +%% +%% The locks need to be write locks since ultimately we end up +%% removing all these rows. +%% +%% The downside of all this is that no other binding operations except +%% lookup/routing (which uses dirty ops) can take place +%% concurrently. However, that is the case already since the bulk +%% operations involve mnesia:match_object calls with a partial key, +%% which entails taking a table lock. +lock_route_tables() -> + [mnesia:lock({table, T}, write) || T <- [rabbit_route, + rabbit_reverse_route, + rabbit_semi_durable_route, + rabbit_durable_route]]. 
+ remove_for_destination(DstName, DeleteFun) -> + lock_route_tables(), Match = reverse_route( #route{binding = #binding{destination = DstName, _ = '_'}}), ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 73eaed14..d68063db 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -526,9 +526,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, SS, MonitoringPids), - MTC = dict:from_list( - [{MsgId, {ChPid, MsgSeqNo}} || - {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_, MTC0) -> + MTC0 + end, gb_trees:empty(), MSList), NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2c0912df..045ab89a 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -307,9 +307,15 @@ connections() -> rabbit_networking, connections_local, []). connections_local() -> - [rabbit_connection_sup:reader(ConnSup) || + [Reader || {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup)]. + <- supervisor:which_children(rabbit_tcp_client_sup), + Reader <- [try + rabbit_connection_sup:reader(ConnSup) + catch exit:{noproc, _} -> + noproc + end], + Reader =/= noproc]. connection_info_keys() -> rabbit_reader:info_keys(). |