diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-18 14:20:33 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-18 14:20:33 +0000 |
commit | 8326fecaca6f864633c3e98fc15df6a081c112d5 (patch) | |
tree | f2c85e2bada34e3207f0ec915b2d9050ca998c5f /src/rabbit_amqqueue.erl | |
parent | e2170d47dfb8ab2e3685e284c6d2f8d0cea1bd33 (diff) | |
parent | fcee3e87e62f5c33fad955c0d1f8c616d4ea3cb5 (diff) | |
download | rabbitmq-server-8326fecaca6f864633c3e98fc15df6a081c112d5.tar.gz |
Merge in default
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 72 |
1 files changed, 37 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b71410fe..67bf000d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -24,7 +24,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0, notify_policy_changed/1]). +-export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). @@ -110,7 +110,7 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), @@ -221,36 +221,37 @@ start(Qs) -> find_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( + mnesia:async_dirty( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, pid = Pid} <- mnesia:table(rabbit_durable_queue), - mnesia:read(rabbit_queue, Name, read) =:= [], - node(Pid) == Node])) + node(Pid) == Node, + mnesia:read(rabbit_queue, Name, read) =:= []])) end). recover_durable_queues(QueuesAndRecoveryTerms) -> - Qs = [{start_queue_process(node(), Q), Terms} || - {Q, Terms} <- QueuesAndRecoveryTerms], - [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs, - gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}]. + {Results, Failures} = + gen_server2:mcall([{start_queue_process(node(), Q), + {init, {self(), Terms}}} || + {Q, Terms} <- QueuesAndRecoveryTerms]), + [rabbit_log:error("Queue ~p failed to initialise: ~p~n", + [Pid, Error]) || {Pid, Error} <- Failures], + [Q || {_, {new, Q}} <- Results]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q0 = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), - Q1 = start_queue_process(Node, Q0), - gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity). + Q = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + gm_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -313,7 +314,7 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), - Q#amqqueue{pid = Pid}. + Pid. add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), @@ -354,14 +355,14 @@ with(Name, F, E) -> {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do - %% with the QPid. + %% with the QPid. F() should be written s.t. that this + %% cannot happen, so we bail if it does since that + %% indicates a code bug and we don't want to get stuck in + %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> - case rabbit_misc:is_process_alive(QPid) of - true -> E(not_found_or_absent_dirty(Name)); - false -> timer:sleep(25), - with(Name, F, E) - end + fun () -> false = rabbit_misc:is_process_alive(QPid), + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -503,19 +504,20 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). %% the first place since a node failed). Therefore we keep poking at %% the list of queues until we were able to talk to a live process or %% the queue no longer exists. -force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]). +force_event_refresh(Ref) -> + force_event_refresh([Q#amqqueue.name || Q <- list()], Ref). -force_event_refresh(QNames) -> +force_event_refresh(QNames, Ref) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - {_, Bad} = rabbit_misc:multi_call( - [Q#amqqueue.pid || Q <- Qs], force_event_refresh), + {_, Bad} = gen_server2:mcall( + [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]), FailedPids = [Pid || {Pid, _Reason} <- Bad], Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, lists:member(Pid, FailedPids)], case Failed of [] -> ok; _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), - force_event_refresh(Failed) + force_event_refresh(Failed, Ref) end. notify_policy_changed(#amqqueue{pid = QPid}) -> |