summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-05-15 14:43:16 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-05-15 14:43:16 +0100
commitfc442faac16cc393036a771505420706d91d5e22 (patch)
tree98b9842826ca74f28f331f70dd689d935469e1f1
parent10c2f19d0576407b0148d3218d2d2fd66ca6c64e (diff)
downloadrabbitmq-server-bug26176.tar.gz
Notify about policy and parameter removal after the vhost is deleted, not during the attendant tx.bug26176
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_policy.erl16
-rw-r--r--src/rabbit_runtime_parameters.erl21
-rw-r--r--src/rabbit_vhost.erl21
4 files changed, 46 insertions, 28 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74f9cacf..eb9ed4ed 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -187,7 +187,7 @@ force_event_refresh(Ref) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
+init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
?store_proc_name({ConnName, Channel}),
@@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
- reader_pid = ReaderPid,
+ reader_pid = Foo,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
@@ -894,8 +894,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, State),
+ test(State, ExchangeName),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -1119,7 +1118,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ State1 = test2(State, Msgs),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1);
({Requeue, A}) -> reject(Requeue, Rev(A), Limiter)
@@ -1165,6 +1164,13 @@ handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
+test2(State, Msgs) ->
+ rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs).
+
+test(State, ExchangeName) ->
+ check_not_default_exchange(ExchangeName),
+ check_configure_permitted(ExchangeName, State).
+
%%----------------------------------------------------------------------------
%% We get the queue process to send the consume_ok on our behalf. This
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 6e0abd69..0a69fb32 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -213,17 +213,23 @@ notify_clear(VHost, <<"policy">>, Name) ->
%% [1] We need to prevent this from becoming O(n^2) in a similar
%% manner to rabbit_binding:remove_for_{source,destination}. So see
%% the comment in rabbit_binding:lock_route_tables/0 for more rationale.
+%% [2] We could be here in a post-tx fun after the vhost has been
+%% deleted; in which case it's fine to do nothing.
update_policies(VHost) ->
Tabs = [rabbit_queue, rabbit_durable_queue,
rabbit_exchange, rabbit_durable_exchange],
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() ->
[mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
- Policies = list(VHost),
- {[update_exchange(X, Policies) ||
- X <- rabbit_exchange:list(VHost)],
- [update_queue(Q, Policies) ||
- Q <- rabbit_amqqueue:list(VHost)]}
+ case catch list(VHost) of
+ {error, {no_such_vhost, _}} ->
+ ok; %% [2]
+ Policies ->
+ {[update_exchange(X, Policies) ||
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies) ||
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end
end),
[catch notify(X) || X <- Xs],
[catch notify(Q) || Q <- Qs],
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 7307330b..cf125913 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -29,6 +29,7 @@
-ifdef(use_specs).
-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
+-type(ok_thunk_or_error_string() :: ok_or_error_string() | fun(() -> 'ok')).
-spec(parse_set/5 :: (rabbit_types:vhost(), binary(), binary(), string(),
rabbit_types:user() | 'none') -> ok_or_error_string()).
@@ -38,9 +39,9 @@
rabbit_types:user() | 'none') -> ok_or_error_string()).
-spec(set_global/2 :: (atom(), term()) -> 'ok').
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
- -> ok_or_error_string()).
+ -> ok_thunk_or_error_string()).
-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
- -> ok_or_error_string()).
+ -> ok_thunk_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
-spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]).
-spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]).
@@ -137,16 +138,22 @@ clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
clear_any(VHost, Component, Name) ->
- case lookup(VHost, Component, Name) of
- not_found -> {error_string, "Parameter does not exist"};
- _ -> mnesia_clear(VHost, Component, Name),
+ Notify = fun () ->
case lookup_component(Component) of
{ok, Mod} -> event_notify(
- parameter_cleared, VHost, Component,
- [{name, Name}]),
+ parameter_cleared, VHost, Component,
+ [{name, Name}]),
Mod:notify_clear(VHost, Component, Name);
_ -> ok
end
+ end,
+ case lookup(VHost, Component, Name) of
+ not_found -> {error_string, "Parameter does not exist"};
+ _ -> mnesia_clear(VHost, Component, Name),
+ case mnesia:is_transaction() of
+ true -> Notify;
+ false -> Notify()
+ end
end.
mnesia_clear(VHost, Component, Name) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index b57627e4..cfa3add4 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -88,12 +88,11 @@ delete(VHostPath) ->
#amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)],
[assert_benign(rabbit_exchange:delete(Name, false)) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
- R = rabbit_misc:execute_mnesia_transaction(
- with(VHostPath, fun () ->
- ok = internal_delete(VHostPath)
- end)),
+ Funs = rabbit_misc:execute_mnesia_transaction(
+ with(VHostPath, fun () -> internal_delete(VHostPath) end)),
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
- R.
+ [ok = Fun() || Fun <- Funs],
+ ok.
assert_benign(ok) -> ok;
assert_benign({ok, _}) -> ok;
@@ -111,14 +110,14 @@ internal_delete(VHostPath) ->
[ok = rabbit_auth_backend_internal:clear_permissions(
proplists:get_value(user, Info), VHostPath)
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
- [ok = rabbit_runtime_parameters:clear(VHostPath,
- proplists:get_value(component, Info),
- proplists:get_value(name, Info))
+ Fs1 = [rabbit_runtime_parameters:clear(VHostPath,
+ proplists:get_value(component, Info),
+ proplists:get_value(name, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
- [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
- || Info <- rabbit_policy:list(VHostPath)],
+ Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
+ || Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
- ok.
+ Fs1 ++ Fs2.
exists(VHostPath) ->
mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].