summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-12-14 11:32:04 -0800
committerJerry Kuch <jerryk@vmware.com>2011-12-14 11:32:04 -0800
commiteacc62290fb9675a849bc5c09c4607a19a392bed (patch)
treece370d8fc95b34f7e83067e979446ffaa6c219e5
parentebfec3cae85683b7020dc7967f8847f3f25084a5 (diff)
parent37a79fa3f83aa88c5114ef68ab3b2461defd3997 (diff)
downloadrabbitmq-server-eacc62290fb9675a849bc5c09c4607a19a392bed.tar.gz
Merge bug24582
-rw-r--r--docs/rabbitmq-service.xml3
-rw-r--r--docs/rabbitmqctl.1.xml16
-rw-r--r--src/gen_server2.erl18
-rw-r--r--src/gm.erl42
-rw-r--r--src/mirrored_supervisor.erl42
-rw-r--r--src/mirrored_supervisor_tests.erl29
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_binding.erl22
-rw-r--r--src/rabbit_control.erl24
-rw-r--r--src/rabbit_guid.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
-rw-r--r--src/rabbit_misc.erl19
-rw-r--r--src/rabbit_networking.erl10
-rw-r--r--src/rabbit_plugins.erl20
-rw-r--r--src/rabbit_queue_index.erl30
15 files changed, 230 insertions, 66 deletions
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
index 3368960b..a4bd1580 100644
--- a/docs/rabbitmq-service.xml
+++ b/docs/rabbitmq-service.xml
@@ -66,7 +66,8 @@ Display usage information.
<para>
Install the service. The service will not be started.
Subsequent invocations will update the service parameters if
-relevant environment variables were modified.
+relevant environment variables were modified or if the active
+plugins were changed.
</para>
</listitem>
</varlistentry>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index f21888bd..15755038 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1315,6 +1315,22 @@
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>eval</command> <arg choice="req"><replaceable>expr</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Evaluate an arbitrary Erlang expression.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl eval 'node().'</screen>
+ <para role="example">
+ This command returns the name of the node to which rabbitmqctl has connected.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ab6c4e64..49913d26 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1079,7 +1079,7 @@ get_proc_name({local, Name}) ->
exit(process_not_registered)
end;
get_proc_name({global, Name}) ->
- case global:safe_whereis_name(Name) of
+ case whereis_name(Name) of
undefined ->
exit(process_not_registered_globally);
Pid when Pid =:= self() ->
@@ -1101,7 +1101,7 @@ get_parent() ->
name_to_pid(Name) ->
case whereis(Name) of
undefined ->
- case global:safe_whereis_name(Name) of
+ case whereis_name(Name) of
undefined ->
exit(could_not_find_registerd_name);
Pid ->
@@ -1111,6 +1111,20 @@ name_to_pid(Name) ->
Pid
end.
+whereis_name(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] ->
+ if node(Pid) == node() ->
+ case is_process_alive(Pid) of
+ true -> Pid;
+ false -> undefined
+ end;
+ true ->
+ Pid
+ end;
+ [] -> undefined
+ end.
+
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
PrioriCall = function_exported_or_default(
Mod, 'prioritise_call', 3,
diff --git a/src/gm.erl b/src/gm.erl
index 8c838a70..6c899122 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -386,6 +386,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -515,8 +516,8 @@ group_members(Server) ->
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
+ Self = make_member(GroupName),
gen_server2:cast(self(), join),
- Self = self(),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
@@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
@@ -604,7 +606,8 @@ handle_cast({broadcast, Msg},
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ State});
handle_cast({broadcast, Msg}, State) ->
internal_broadcast(Msg, none, State);
@@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self,
State1 = check_neighbours(State #state { view = View,
members_state = MembersState }),
handle_callback_result(
- {Module:joined(Args, all_known_members(View)), State1});
+ {Module:joined(Args, get_pids(all_known_members(View))), State1});
handle_cast(leave, State) ->
{stop, normal, State}.
@@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
- Result = Module:handle_msg(Args, Self, Msg),
+ Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
@@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
end,
try
case gen_server2:call(
- Left, {add_on_right, Self}, infinity) of
+ get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
not_ready -> join_group(Self, GroupName)
end
@@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) ->
mnesia:sync_transaction(
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
- version = 0 },
+ version = ?VERSION_START },
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
@@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
{Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
- ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
+ ok = gen_server2:cast(get_pid(RealNeighbour),
+ {?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
true = erlang:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
- ok = gen_server2:cast(RealNeighbour, Msg),
+ ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
ok = case Neighbour of
Self -> ok;
- _ -> gen_server2:cast(Neighbour, Msg)
+ _ -> gen_server2:cast(get_pid(Neighbour), Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor(Self, Self) ->
undefined;
maybe_monitor(Other, _Self) ->
- erlang:monitor(process, Other).
+ erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) ->
build_members_state(MembersStateList) ->
?DICT:from_list(MembersStateList).
+make_member(GroupName) ->
+ {case read_group(GroupName) of
+ #gm_group { version = Version } -> Version;
+ {error, not_found} -> ?VERSION_START
+ end, self()}.
+
+get_pid({_Version, Pid}) -> Pid.
+
+get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% ---------------------------------------------------------------------------
%% Activity assembly
@@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, {activity, Self, Activity}).
send_right(Right, View, Msg) ->
- ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
+ ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
lists:foldl(
fun ({Id, Pubs, _Acks}, ok) ->
lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, Id, Pub);
+ Module:handle_msg(Args, get_pid(Id), Pub);
(_, Error) ->
Error
end, ok, Pubs);
@@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) ->
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> ok;
- _ -> Module:members_changed(Args, Births, Deaths)
+ _ -> Module:members_changed(Args, get_pids(Births),
+ get_pids(Deaths))
end.
handle_callback_result({Result, State}) ->
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 8dfe39f8..6e8f96d9 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -242,8 +242,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) ->
start_link0(Prefix, Group, Init) ->
case apply(?SUPERVISOR, start_link,
Prefix ++ [?MODULE, {overall, Group, Init}]) of
- {ok, Pid} -> call(Pid, {init, Pid}),
- {ok, Pid};
+ {ok, Pid} -> case catch call(Pid, {init, Pid}) of
+ ok -> {ok, Pid};
+ E -> E
+ end;
Other -> Other
end.
@@ -346,13 +348,20 @@ handle_call({init, Overall}, _From,
end || Pid <- Rest],
Delegate = child(Overall, delegate),
erlang:monitor(process, Delegate),
- [maybe_start(Group, Delegate, S) || S <- ChildSpecs],
- {reply, ok, State#state{overall = Overall, delegate = Delegate}};
+ State1 = State#state{overall = Overall, delegate = Delegate},
+ case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ true -> {reply, ok, State1};
+ false -> {stop, shutdown, State1}
+ end;
handle_call({start_child, ChildSpec}, _From,
State = #state{delegate = Delegate,
group = Group}) ->
- {reply, maybe_start(Group, Delegate, ChildSpec), State};
+ {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ already_in_mnesia -> {error, already_present};
+ {already_in_mnesia, Pid} -> {error, {already_started, Pid}};
+ Else -> Else
+ end, State};
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
group = Group}) ->
@@ -400,13 +409,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
Self = self(),
- case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> ok
- end,
- {noreply, State};
+ R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
+ [Self | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(fun() -> update_all(Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
+ end,
+ case all_started(R) of
+ true -> {noreply, State};
+ false -> {stop, shutdown, State}
+ end;
handle_info(Info, State) ->
{stop, {unexpected_info, Info}, State}.
@@ -428,8 +440,8 @@ maybe_start(Group, Delegate, ChildSpec) ->
check_start(Group, Delegate, ChildSpec)
end) of
{atomic, start} -> start(Delegate, ChildSpec);
- {atomic, undefined} -> {error, already_present};
- {atomic, Pid} -> {error, {already_started, Pid}};
+ {atomic, undefined} -> already_in_mnesia;
+ {atomic, Pid} -> {already_in_mnesia, Pid};
%% If we are torn down while in the transaction...
{aborted, E} -> {error, E}
end.
@@ -499,6 +511,8 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
+all_started(Results) -> [] =:= [R || R = {error, _} <- Results].
+
%%----------------------------------------------------------------------------
create_tables() ->
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 0900f56f..d48a9ca5 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -43,6 +43,7 @@ all_tests() ->
passed = test_start_idempotence(),
passed = test_unsupported(),
passed = test_ignore(),
+ passed = test_startup_failure(),
passed.
%% Simplest test
@@ -158,7 +159,7 @@ test_no_migration_on_shutdown() ->
try
call(worker, ping),
exit(worker_should_not_have_migrated)
- catch exit:{timeout_waiting_for_server, _} ->
+ catch exit:{timeout_waiting_for_server, _, _} ->
ok
end
end, [evil, good]).
@@ -195,6 +196,22 @@ test_ignore() ->
{sup, fake_strategy_for_ignore, []}),
passed.
+test_startup_failure() ->
+ [test_startup_failure(F) || F <- [want_error, want_exit]],
+ passed.
+
+test_startup_failure(Fail) ->
+ process_flag(trap_exit, true),
+ ?MS:start_link(get_group(group), ?MODULE,
+ {sup, one_for_one, [childspec(Fail)]}),
+ receive
+ {'EXIT', _, shutdown} ->
+ ok
+ after 1000 ->
+ exit({did_not_exit, Fail})
+ end,
+ process_flag(trap_exit, false).
+
%% ---------------------------------------------------------------------------
with_sups(Fun, Sups) ->
@@ -228,6 +245,12 @@ start_sup0(Name, Group, ChildSpecs) ->
childspec(Id) ->
{Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.
+start_gs(want_error) ->
+ {error, foo};
+
+start_gs(want_exit) ->
+ exit(foo);
+
start_gs(Id) ->
gen_server:start_link({local, Id}, ?MODULE, server, []).
@@ -245,10 +268,10 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 100, 10).
+call(Id, Msg) -> call(Id, Msg, 1000, 100).
call(Id, Msg, 0, _Decr) ->
- exit({timeout_waiting_for_server, {Id, Msg}});
+ exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
call(Id, Msg, MaxDelay, Decr) ->
try
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b3e92b69..96017df8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -244,7 +244,7 @@ determine_queue_nodes(Args) ->
case [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes] of
[Node] -> {Node, undefined};
- [First | Rest] -> {First, Rest}
+ [First | Rest] -> {First, [First | Rest]}
end;
{{_Type, <<"all">>}, _} ->
{node(), all};
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e625a427..0d221b05 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -277,6 +277,7 @@ has_for_source(SrcName) ->
contains(rabbit_semi_durable_route, Match).
remove_for_source(SrcName) ->
+ lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
Routes = lists:usort(
mnesia:match_object(rabbit_route, Match, write) ++
@@ -351,7 +352,28 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+%% For bulk operations we lock the tables we are operating on in order
+%% to reduce the time complexity. Without the table locks we end up
+%% with num_tables*num_bulk_bindings row-level locks. Takiing each
+%% lock takes time proportional to the number of existing locks, thus
+%% resulting in O(num_bulk_bindings^2) complexity.
+%%
+%% The locks need to be write locks since ultimately we end up
+%% removing all these rows.
+%%
+%% The downside of all this is that no other binding operations except
+%% lookup/routing (which uses dirty ops) can take place
+%% concurrently. However, that is the case already since the bulk
+%% operations involve mnesia:match_object calls with a partial key,
+%% which entails taking a table lock.
+lock_route_tables() ->
+ [mnesia:lock({table, T}, write) || T <- [rabbit_route,
+ rabbit_reverse_route,
+ rabbit_semi_durable_route,
+ rabbit_durable_route]].
+
remove_for_destination(DstName, DeleteFun) ->
+ lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index fa8dd262..20486af5 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -98,6 +98,9 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
+ rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
@@ -368,7 +371,23 @@ action(report, Node, _Args, _Opts, Inform) ->
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
io:format("End of server status report~n"),
- ok.
+ ok;
+
+action(eval, Node, [Expr], _Opts, _Inform) ->
+ case erl_scan:string(Expr) of
+ {ok, Scanned, _} ->
+ case erl_parse:parse_exprs(Scanned) of
+ {ok, Parsed} ->
+ {value, Value, _} = unsafe_rpc(
+ Node, erl_eval, exprs, [Parsed, []]),
+ io:format("~p~n", [Value]),
+ ok;
+ {error, E} ->
+ {error_string, format_parse_error(E)}
+ end;
+ {error, E, _} ->
+ {error_string, format_parse_error(E)}
+ end.
%%----------------------------------------------------------------------------
@@ -443,6 +462,9 @@ system(Cmd) ->
escape_quotes(Cmd) ->
lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+format_parse_error({_Line, Mod, Err}) ->
+ lists:flatten(Mod:format_error(Err)).
+
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 523af749..2d0f5014 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -89,8 +89,15 @@ guid() ->
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
+%%
+%% employs base64url encoding, which is safer in more contexts than
+%% plain base64.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(guid()).
+ Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
+ ($\/, Acc) -> [$\_ | Acc];
+ ($\=, Acc) -> Acc;
+ (Chr, Acc) -> [Chr | Acc]
+ end, [], base64:encode_to_string(guid())).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7182042d..d68063db 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -526,9 +526,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, SS, MonitoringPids),
- MTC = dict:from_list(
- [{MsgId, {ChPid, MsgSeqNo}} ||
- {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MSList),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
@@ -725,7 +727,7 @@ process_instruction(
never ->
{MQ2, PendingCh, MS};
eventually ->
- {MQ2, sets:add_element(MsgId, PendingCh),
+ {MQ2, PendingCh,
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 88192e8f..0578cf7d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -250,18 +250,23 @@ assert_args_equivalence(Orig, New, Name, Keys) ->
ok.
assert_args_equivalence1(Orig, New, Name, Key) ->
- case {table_lookup(Orig, Key), table_lookup(New, Key)} of
+ {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)},
+ FailureFun = fun () ->
+ protocol_error(precondition_failed, "inequivalent arg '~s'"
+ "for ~s: received ~s but current is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
+ end,
+ case {Orig1, New1} of
{Same, Same} ->
ok;
- {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} ->
+ {{OrigType, OrigVal}, {NewType, NewVal}} ->
case type_class(OrigType) == type_class(NewType) andalso
OrigVal == NewVal of
true -> ok;
- false -> protocol_error(precondition_failed, "inequivalent arg"
- " '~s' for ~s: received ~s but current"
- " is ~s",
- [Key, rs(Name), val(New1), val(Orig1)])
- end
+ false -> FailureFun()
+ end;
+ {_, _} ->
+ FailureFun()
end.
val(undefined) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2c0912df..045ab89a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -307,9 +307,15 @@ connections() ->
rabbit_networking, connections_local, []).
connections_local() ->
- [rabbit_connection_sup:reader(ConnSup) ||
+ [Reader ||
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children(rabbit_tcp_client_sup),
+ Reader <- [try
+ rabbit_connection_sup:reader(ConnSup)
+ catch exit:{noproc, _} ->
+ noproc
+ end],
+ Reader =/= noproc].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 0862f1b2..62c004f7 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -111,8 +111,7 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
NewImplicitlyEnabled -- ImplicitlyEnabled),
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n")
+ report_change()
end;
action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
@@ -140,8 +139,7 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
print_list("The following plugins have been disabled:",
ImplicitlyEnabled -- NewImplicitlyEnabled),
write_enabled_plugins(PluginsFile, NewEnabled),
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n")
+ report_change()
end.
%%----------------------------------------------------------------------------
@@ -374,3 +372,17 @@ maybe_warn_mochiweb(Enabled) ->
false ->
ok
end.
+
+report_change() ->
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n"),
+ case os:type() of
+ {win32, _OsName} ->
+ io:format("If you have RabbitMQ running as a service then you must"
+ " reinstall by running~n rabbitmq-service.bat stop~n"
+ " rabbitmq-service.bat install~n"
+ " rabbitmq-service.bat start~n~n");
+ _ ->
+ ok
+ end.
+
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4b545466..f03c1d1c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
- fun () -> queue_index_walker_reader(QueueName, Gatherer)
+ fun () -> link(Gatherer),
+ ok = queue_index_walker_reader(QueueName, Gatherer),
+ unlink(Gatherer),
+ ok
end)
end || QueueName <- DurableQueues],
queue_index_walker({next, Gatherer});
@@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
+ Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
- false -> {array_new(), 0};
+ false -> Empty;
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {ok, SegData} = file_handle_cache:read(
- Hdl, ?SEGMENT_TOTAL_SIZE),
- Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
+ Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
+ {ok, SegData} -> load_segment_entries(
+ KeepAcked, SegData, Empty);
+ eof -> Empty
+ end,
ok = file_handle_cache:close(Hdl),
Res
end.
@@ -853,15 +859,15 @@ load_segment_entries(KeepAcked,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1);
+ load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
load_segment_entries(KeepAcked,
<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
@@ -871,10 +877,10 @@ load_segment_entries(KeepAcked,
{_Pub, del, no_ack} ->
{-1, array:reset(RelSeq, SegEntries)}
end,
- load_segment_entries(KeepAcked, SegData, SegEntries1,
- UnackedCount + UnackedCountDelta);
-load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
- {SegEntries, UnackedCount}.
+ load_segment_entries(KeepAcked, SegData,
+ {SegEntries1, UnackedCount + UnackedCountDelta});
+load_segment_entries(_KeepAcked, _SegData, Res) ->
+ Res.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).