diff options
author | Jerry Kuch <jerryk@vmware.com> | 2011-12-14 11:32:04 -0800 |
---|---|---|
committer | Jerry Kuch <jerryk@vmware.com> | 2011-12-14 11:32:04 -0800 |
commit | eacc62290fb9675a849bc5c09c4607a19a392bed (patch) | |
tree | ce370d8fc95b34f7e83067e979446ffaa6c219e5 | |
parent | ebfec3cae85683b7020dc7967f8847f3f25084a5 (diff) | |
parent | 37a79fa3f83aa88c5114ef68ab3b2461defd3997 (diff) | |
download | rabbitmq-server-eacc62290fb9675a849bc5c09c4607a19a392bed.tar.gz |
Merge bug24582
-rw-r--r-- | docs/rabbitmq-service.xml | 3 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 16 | ||||
-rw-r--r-- | src/gen_server2.erl | 18 | ||||
-rw-r--r-- | src/gm.erl | 42 | ||||
-rw-r--r-- | src/mirrored_supervisor.erl | 42 | ||||
-rw-r--r-- | src/mirrored_supervisor_tests.erl | 29 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 22 | ||||
-rw-r--r-- | src/rabbit_control.erl | 24 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 19 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 10 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 20 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 30 |
15 files changed, 230 insertions, 66 deletions
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index 3368960b..a4bd1580 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -66,7 +66,8 @@ Display usage information. <para> Install the service. The service will not be started. Subsequent invocations will update the service parameters if -relevant environment variables were modified. +relevant environment variables were modified or if the active +plugins were changed. </para> </listitem> </varlistentry> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index f21888bd..15755038 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1315,6 +1315,22 @@ </para> </listitem> </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>eval</command> <arg choice="req"><replaceable>expr</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Evaluate an arbitrary Erlang expression. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl eval 'node().'</screen> + <para role="example"> + This command returns the name of the node to which rabbitmqctl has connected. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ab6c4e64..49913d26 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1079,7 +1079,7 @@ get_proc_name({local, Name}) -> exit(process_not_registered) end; get_proc_name({global, Name}) -> - case global:safe_whereis_name(Name) of + case whereis_name(Name) of undefined -> exit(process_not_registered_globally); Pid when Pid =:= self() -> @@ -1101,7 +1101,7 @@ get_parent() -> name_to_pid(Name) -> case whereis(Name) of undefined -> - case global:safe_whereis_name(Name) of + case whereis_name(Name) of undefined -> exit(could_not_find_registerd_name); Pid -> @@ -1111,6 +1111,20 @@ name_to_pid(Name) -> Pid end. +whereis_name(Name) -> + case ets:lookup(global_names, Name) of + [{_Name, Pid, _Method, _RPid, _Ref}] -> + if node(Pid) == node() -> + case is_process_alive(Pid) of + true -> Pid; + false -> undefined + end; + true -> + Pid + end; + [] -> undefined + end. + find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> PrioriCall = function_exported_or_default( Mod, 'prioritise_call', 3, @@ -386,6 +386,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -515,8 +516,8 @@ group_members(Server) -> init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), + Self = make_member(GroupName), gen_server2:cast(self(), join), - Self = self(), {ok, #state { self = Self, left = {Self, undefined}, right = {Self, undefined}, @@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); @@ -604,7 +606,8 @@ handle_cast({broadcast, Msg}, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + State}); handle_cast({broadcast, Msg}, State) -> internal_broadcast(Msg, none, State); @@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self, State1 = check_neighbours(State #state { view = View, members_state = MembersState }), handle_callback_result( - {Module:joined(Args, all_known_members(View)), State1}); + {Module:joined(Args, get_pids(all_known_members(View))), State1}); handle_cast(leave, State) -> {stop, normal, State}. @@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self, confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer }) -> - Result = Module:handle_msg(Args, Self, Msg), + Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; @@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> end, try case gen_server2:call( - Left, {add_on_right, Self}, infinity) of + get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); not_ready -> join_group(Self, GroupName) end @@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) -> mnesia:sync_transaction( fun () -> GroupNew = #gm_group { name = GroupName, members = [Self], - version = 0 }, + version = ?VERSION_START }, case mnesia:read({?GROUP_TABLE, GroupName}) of [] -> mnesia:write(GroupNew), @@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false. ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> {Self, undefined}; ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> - ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}), + ok = gen_server2:cast(get_pid(RealNeighbour), + {?TAG, Ver, check_neighbours}), {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> true = erlang:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, - ok = gen_server2:cast(RealNeighbour, Msg), + ok = gen_server2:cast(get_pid(RealNeighbour), Msg), ok = case Neighbour of Self -> ok; - _ -> gen_server2:cast(Neighbour, Msg) + _ -> gen_server2:cast(get_pid(Neighbour), Msg) end, {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor(Self, Self) -> undefined; maybe_monitor(Other, _Self) -> - erlang:monitor(process, Other). + erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) -> build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +make_member(GroupName) -> + {case read_group(GroupName) of + #gm_group { version = Version } -> Version; + {error, not_found} -> ?VERSION_START + end, self()}. + +get_pid({_Version, Pid}) -> Pid. + +get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% --------------------------------------------------------------------------- %% Activity assembly @@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self, send_right(Right, View, {activity, Self, Activity}). send_right(Right, View, Msg) -> - ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}). + ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> lists:foldl( fun ({Id, Pubs, _Acks}, ok) -> lists:foldl(fun ({_PubNum, Pub}, ok) -> - Module:handle_msg(Args, Id, Pub); + Module:handle_msg(Args, get_pid(Id), Pub); (_, Error) -> Error end, ok, Pubs); @@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) -> Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> ok; - _ -> Module:members_changed(Args, Births, Deaths) + _ -> Module:members_changed(Args, get_pids(Births), + get_pids(Deaths)) end. handle_callback_result({Result, State}) -> diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 8dfe39f8..6e8f96d9 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -242,8 +242,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) -> start_link0(Prefix, Group, Init) -> case apply(?SUPERVISOR, start_link, Prefix ++ [?MODULE, {overall, Group, Init}]) of - {ok, Pid} -> call(Pid, {init, Pid}), - {ok, Pid}; + {ok, Pid} -> case catch call(Pid, {init, Pid}) of + ok -> {ok, Pid}; + E -> E + end; Other -> Other end. @@ -346,13 +348,20 @@ handle_call({init, Overall}, _From, end || Pid <- Rest], Delegate = child(Overall, delegate), erlang:monitor(process, Delegate), - [maybe_start(Group, Delegate, S) || S <- ChildSpecs], - {reply, ok, State#state{overall = Overall, delegate = Delegate}}; + State1 = State#state{overall = Overall, delegate = Delegate}, + case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + true -> {reply, ok, State1}; + false -> {stop, shutdown, State1} + end; handle_call({start_child, ChildSpec}, _From, State = #state{delegate = Delegate, group = Group}) -> - {reply, maybe_start(Group, Delegate, ChildSpec), State}; + {reply, case maybe_start(Group, Delegate, ChildSpec) of + already_in_mnesia -> {error, already_present}; + {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; + Else -> Else + end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, group = Group}) -> @@ -400,13 +409,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. Self = self(), - case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, ChildSpecs} = - mnesia:transaction(fun() -> update_all(Pid) end), - [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; - _ -> ok - end, - {noreply, State}; + R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of + [Self | _] -> {atomic, ChildSpecs} = + mnesia:transaction(fun() -> update_all(Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] + end, + case all_started(R) of + true -> {noreply, State}; + false -> {stop, shutdown, State} + end; handle_info(Info, State) -> {stop, {unexpected_info, Info}, State}. @@ -428,8 +440,8 @@ maybe_start(Group, Delegate, ChildSpec) -> check_start(Group, Delegate, ChildSpec) end) of {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> {error, already_present}; - {atomic, Pid} -> {error, {already_started, Pid}}; + {atomic, undefined} -> already_in_mnesia; + {atomic, Pid} -> {already_in_mnesia, Pid}; %% If we are torn down while in the transaction... {aborted, E} -> {error, E} end. @@ -499,6 +511,8 @@ delete_all(Group) -> [delete(Group, id(C)) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. +all_started(Results) -> [] =:= [R || R = {error, _} <- Results]. + %%---------------------------------------------------------------------------- create_tables() -> diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 0900f56f..d48a9ca5 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -43,6 +43,7 @@ all_tests() -> passed = test_start_idempotence(), passed = test_unsupported(), passed = test_ignore(), + passed = test_startup_failure(), passed. %% Simplest test @@ -158,7 +159,7 @@ test_no_migration_on_shutdown() -> try call(worker, ping), exit(worker_should_not_have_migrated) - catch exit:{timeout_waiting_for_server, _} -> + catch exit:{timeout_waiting_for_server, _, _} -> ok end end, [evil, good]). @@ -195,6 +196,22 @@ test_ignore() -> {sup, fake_strategy_for_ignore, []}), passed. +test_startup_failure() -> + [test_startup_failure(F) || F <- [want_error, want_exit]], + passed. + +test_startup_failure(Fail) -> + process_flag(trap_exit, true), + ?MS:start_link(get_group(group), ?MODULE, + {sup, one_for_one, [childspec(Fail)]}), + receive + {'EXIT', _, shutdown} -> + ok + after 1000 -> + exit({did_not_exit, Fail}) + end, + process_flag(trap_exit, false). + %% --------------------------------------------------------------------------- with_sups(Fun, Sups) -> @@ -228,6 +245,12 @@ start_sup0(Name, Group, ChildSpecs) -> childspec(Id) -> {Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. +start_gs(want_error) -> + {error, foo}; + +start_gs(want_exit) -> + exit(foo); + start_gs(Id) -> gen_server:start_link({local, Id}, ?MODULE, server, []). @@ -245,10 +268,10 @@ inc_group() -> get_group(Group) -> {Group, get(counter)}. -call(Id, Msg) -> call(Id, Msg, 100, 10). +call(Id, Msg) -> call(Id, Msg, 1000, 100). call(Id, Msg, 0, _Decr) -> - exit({timeout_waiting_for_server, {Id, Msg}}); + exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); call(Id, Msg, MaxDelay, Decr) -> try diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b3e92b69..96017df8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -244,7 +244,7 @@ determine_queue_nodes(Args) -> case [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] of [Node] -> {Node, undefined}; - [First | Rest] -> {First, Rest} + [First | Rest] -> {First, [First | Rest]} end; {{_Type, <<"all">>}, _} -> {node(), all}; diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e625a427..0d221b05 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -277,6 +277,7 @@ has_for_source(SrcName) -> contains(rabbit_semi_durable_route, Match). remove_for_source(SrcName) -> + lock_route_tables(), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, Routes = lists:usort( mnesia:match_object(rabbit_route, Match, write) ++ @@ -351,7 +352,28 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). +%% For bulk operations we lock the tables we are operating on in order +%% to reduce the time complexity. Without the table locks we end up +%% with num_tables*num_bulk_bindings row-level locks. Takiing each +%% lock takes time proportional to the number of existing locks, thus +%% resulting in O(num_bulk_bindings^2) complexity. +%% +%% The locks need to be write locks since ultimately we end up +%% removing all these rows. +%% +%% The downside of all this is that no other binding operations except +%% lookup/routing (which uses dirty ops) can take place +%% concurrently. However, that is the case already since the bulk +%% operations involve mnesia:match_object calls with a partial key, +%% which entails taking a table lock. +lock_route_tables() -> + [mnesia:lock({table, T}, write) || T <- [rabbit_route, + rabbit_reverse_route, + rabbit_semi_durable_route, + rabbit_durable_route]]. + remove_for_destination(DstName, DeleteFun) -> + lock_route_tables(), Match = reverse_route( #route{binding = #binding{destination = DstName, _ = '_'}}), ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write), diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index fa8dd262..20486af5 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -98,6 +98,9 @@ start() -> {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); + {error_string, Reason} -> + print_error("~s", [Reason]), + rabbit_misc:quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), rabbit_misc:quit(2); @@ -368,7 +371,23 @@ action(report, Node, _Args, _Opts, Inform) -> [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES], [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], io:format("End of server status report~n"), - ok. + ok; + +action(eval, Node, [Expr], _Opts, _Inform) -> + case erl_scan:string(Expr) of + {ok, Scanned, _} -> + case erl_parse:parse_exprs(Scanned) of + {ok, Parsed} -> + {value, Value, _} = unsafe_rpc( + Node, erl_eval, exprs, [Parsed, []]), + io:format("~p~n", [Value]), + ok; + {error, E} -> + {error_string, format_parse_error(E)} + end; + {error, E, _} -> + {error_string, format_parse_error(E)} + end. %%---------------------------------------------------------------------------- @@ -443,6 +462,9 @@ system(Cmd) -> escape_quotes(Cmd) -> lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). +format_parse_error({_Line, Mod, Err}) -> + lists:flatten(Mod:format_error(Err)). + %%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 523af749..2d0f5014 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -89,8 +89,15 @@ guid() -> erlang:md5(term_to_binary(G)). %% generate a readable string representation of a GUID. +%% +%% employs base64url encoding, which is safer in more contexts than +%% plain base64. string_guid(Prefix) -> - Prefix ++ "-" ++ base64:encode_to_string(guid()). + Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; + ($\/, Acc) -> [$\_ | Acc]; + ($\=, Acc) -> Acc; + (Chr, Acc) -> [Chr | Acc] + end, [], base64:encode_to_string(guid())). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7182042d..d68063db 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -526,9 +526,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, SS, MonitoringPids), - MTC = dict:from_list( - [{MsgId, {ChPid, MsgSeqNo}} || - {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_, MTC0) -> + MTC0 + end, gb_trees:empty(), MSList), NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), @@ -725,7 +727,7 @@ process_instruction( never -> {MQ2, PendingCh, MS}; eventually -> - {MQ2, sets:add_element(MsgId, PendingCh), + {MQ2, PendingCh, dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 88192e8f..0578cf7d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -250,18 +250,23 @@ assert_args_equivalence(Orig, New, Name, Keys) -> ok. assert_args_equivalence1(Orig, New, Name, Key) -> - case {table_lookup(Orig, Key), table_lookup(New, Key)} of + {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)}, + FailureFun = fun () -> + protocol_error(precondition_failed, "inequivalent arg '~s'" + "for ~s: received ~s but current is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) + end, + case {Orig1, New1} of {Same, Same} -> ok; - {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} -> + {{OrigType, OrigVal}, {NewType, NewVal}} -> case type_class(OrigType) == type_class(NewType) andalso OrigVal == NewVal of true -> ok; - false -> protocol_error(precondition_failed, "inequivalent arg" - " '~s' for ~s: received ~s but current" - " is ~s", - [Key, rs(Name), val(New1), val(Orig1)]) - end + false -> FailureFun() + end; + {_, _} -> + FailureFun() end. val(undefined) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2c0912df..045ab89a 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -307,9 +307,15 @@ connections() -> rabbit_networking, connections_local, []). connections_local() -> - [rabbit_connection_sup:reader(ConnSup) || + [Reader || {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup)]. + <- supervisor:which_children(rabbit_tcp_client_sup), + Reader <- [try + rabbit_connection_sup:reader(ConnSup) + catch exit:{noproc, _} -> + noproc + end], + Reader =/= noproc]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 0862f1b2..62c004f7 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -111,8 +111,7 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", NewImplicitlyEnabled -- ImplicitlyEnabled), - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n") + report_change() end; action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> @@ -140,8 +139,7 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> print_list("The following plugins have been disabled:", ImplicitlyEnabled -- NewImplicitlyEnabled), write_enabled_plugins(PluginsFile, NewEnabled), - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n") + report_change() end. %%---------------------------------------------------------------------------- @@ -374,3 +372,17 @@ maybe_warn_mochiweb(Enabled) -> false -> ok end. + +report_change() -> + io:format("Plugin configuration has changed. " + "Restart RabbitMQ for changes to take effect.~n"), + case os:type() of + {win32, _OsName} -> + io:format("If you have RabbitMQ running as a service then you must" + " reinstall by running~n rabbitmq-service.bat stop~n" + " rabbitmq-service.bat install~n" + " rabbitmq-service.bat start~n~n"); + _ -> + ok + end. + diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4b545466..f03c1d1c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> [begin ok = gatherer:fork(Gatherer), ok = worker_pool:submit_async( - fun () -> queue_index_walker_reader(QueueName, Gatherer) + fun () -> link(Gatherer), + ok = queue_index_walker_reader(QueueName, Gatherer), + unlink(Gatherer), + ok end) end || QueueName <- DurableQueues], queue_index_walker({next, Gatherer}); @@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> + Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of - false -> {array_new(), 0}; + false -> Empty; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - {ok, SegData} = file_handle_cache:read( - Hdl, ?SEGMENT_TOTAL_SIZE), - Res = load_segment_entries(KeepAcked, SegData, array_new(), 0), + Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of + {ok, SegData} -> load_segment_entries( + KeepAcked, SegData, Empty); + eof -> Empty + end, ok = file_handle_cache:close(Hdl), Res end. @@ -853,15 +859,15 @@ load_segment_entries(KeepAcked, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, SegData/binary>>, - SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}) -> {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1); + load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1}); load_segment_entries(KeepAcked, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS, SegData/binary>>, - SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}) -> {UnackedCountDelta, SegEntries1} = case array:get(RelSeq, SegEntries) of {Pub, no_del, no_ack} -> @@ -871,10 +877,10 @@ load_segment_entries(KeepAcked, {_Pub, del, no_ack} -> {-1, array:reset(RelSeq, SegEntries)} end, - load_segment_entries(KeepAcked, SegData, SegEntries1, - UnackedCount + UnackedCountDelta); -load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) -> - {SegEntries, UnackedCount}. + load_segment_entries(KeepAcked, SegData, + {SegEntries1, UnackedCount + UnackedCountDelta}); +load_segment_entries(_KeepAcked, _SegData, Res) -> + Res. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). |