summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-18 14:20:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-18 14:20:33 +0000
commit8326fecaca6f864633c3e98fc15df6a081c112d5 (patch)
treef2c85e2bada34e3207f0ec915b2d9050ca998c5f /src/rabbit_amqqueue.erl
parente2170d47dfb8ab2e3685e284c6d2f8d0cea1bd33 (diff)
parentfcee3e87e62f5c33fad955c0d1f8c616d4ea3cb5 (diff)
downloadrabbitmq-server-8326fecaca6f864633c3e98fc15df6a081c112d5.tar.gz
Merge in default
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl72
1 files changed, 37 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b71410fe..67bf000d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -24,7 +24,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([force_event_refresh/0, notify_policy_changed/1]).
+-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
@@ -110,7 +110,7 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
--spec(force_event_refresh/0 :: () -> 'ok').
+-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 :: (rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
@@ -221,36 +221,37 @@ start(Qs) ->
find_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
+ mnesia:async_dirty(
fun () ->
qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
pid = Pid}
<- mnesia:table(rabbit_durable_queue),
- mnesia:read(rabbit_queue, Name, read) =:= [],
- node(Pid) == Node]))
+ node(Pid) == Node,
+ mnesia:read(rabbit_queue, Name, read) =:= []]))
end).
recover_durable_queues(QueuesAndRecoveryTerms) ->
- Qs = [{start_queue_process(node(), Q), Terms} ||
- {Q, Terms} <- QueuesAndRecoveryTerms],
- [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs,
- gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}].
+ {Results, Failures} =
+ gen_server2:mcall([{start_queue_process(node(), Q),
+ {init, {self(), Terms}}} ||
+ {Q, Terms} <- QueuesAndRecoveryTerms]),
+ [rabbit_log:error("Queue ~p failed to initialise: ~p~n",
+ [Pid, Error]) || {Pid, Error} <- Failures],
+ [Q || {_, {new, Q}} <- Results].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q0 = rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []}),
- {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
- Q1 = start_queue_process(Node, Q0),
- gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity).
+ Q = rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = []}),
+ {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -313,7 +314,7 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
- Q#amqqueue{pid = Pid}.
+ Pid.
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
@@ -354,14 +355,14 @@ with(Name, F, E) ->
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
- %% with the QPid.
+ %% with the QPid. F() should be written s.t. that this
+ %% cannot happen, so we bail if it does since that
+ %% indicates a code bug and we don't want to get stuck in
+ %% the retry loop.
rabbit_misc:with_exit_handler(
- fun () ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> E(not_found_or_absent_dirty(Name));
- false -> timer:sleep(25),
- with(Name, F, E)
- end
+ fun () -> false = rabbit_misc:is_process_alive(QPid),
+ timer:sleep(25),
+ with(Name, F, E)
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -503,19 +504,20 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
%% the first place since a node failed). Therefore we keep poking at
%% the list of queues until we were able to talk to a live process or
%% the queue no longer exists.
-force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]).
+force_event_refresh(Ref) ->
+ force_event_refresh([Q#amqqueue.name || Q <- list()], Ref).
-force_event_refresh(QNames) ->
+force_event_refresh(QNames, Ref) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
- {_, Bad} = rabbit_misc:multi_call(
- [Q#amqqueue.pid || Q <- Qs], force_event_refresh),
+ {_, Bad} = gen_server2:mcall(
+ [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]),
FailedPids = [Pid || {Pid, _Reason} <- Bad],
Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
lists:member(Pid, FailedPids)],
case Failed of
[] -> ok;
_ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
- force_event_refresh(Failed)
+ force_event_refresh(Failed, Ref)
end.
notify_policy_changed(#amqqueue{pid = QPid}) ->