diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-18 12:56:41 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-18 12:56:41 +0100 |
commit | 508e72d273eb223be2f0b36f04eecb88f87054e8 (patch) | |
tree | 81cfd3acae89e09e2329785b0e77ebda979e0c62 /src/rabbit_amqqueue.erl | |
parent | 94ecaab70dcd2ce7ceac95e8802d3a54267f4b81 (diff) | |
parent | 79e0a773bdfd72ceb82e54fa25407a2620e95914 (diff) | |
download | rabbitmq-server-508e72d273eb223be2f0b36f04eecb88f87054e8.tar.gz |
stable to default
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 94 |
1 files changed, 71 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e45e026e..692179fc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,7 +18,7 @@ -export([recover/0, stop/0, start/1, declare/5, declare/6, delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). --export([pseudo_queue/2]). +-export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -29,8 +29,8 @@ -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). --export([on_node_down/1]). --export([update/2, store_queue/1, policy_changed/2]). +-export([on_node_up/1, on_node_down/1]). +-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). @@ -174,9 +174,12 @@ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). +-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(update_decorators/1 :: (name()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). @@ -254,15 +257,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> %% effect) this might not be possible to satisfy. declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), - Q = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), + Q = rabbit_queue_decorator:set( + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + down_slave_nodes = [], + gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). @@ -308,12 +313,24 @@ store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = [], sync_slave_pids = [], - gm_pids = []}, write), - ok = mnesia:write(rabbit_queue, Q, write), - ok; + gm_pids = [], + decorators = undefined}, write), + store_queue_ram(Q); store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(rabbit_queue, Q, write), - ok. + store_queue_ram(Q). + +store_queue_ram(Q) -> + ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). + +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> store_queue_ram(Q), + ok; + [] -> ok + end + end). policy_changed(Q1 = #amqqueue{decorators = Decorators1}, Q2 = #amqqueue{decorators = Decorators2}) -> @@ -444,7 +461,8 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -650,15 +668,23 @@ forget_all_durable(Node) -> fun () -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), - [rabbit_binding:process_deletions( - internal_delete1(Name, true)) || - #amqqueue{name = Name, pid = Pid} = Q <- Qs, - node(Pid) =:= Node, - rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined], + [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs, + node(Pid) =:= Node], ok end), ok. +forget_node_for_queue(#amqqueue{name = Name, + down_slave_nodes = []}) -> + %% No slaves to recover from, queue is gone + rabbit_binding:process_deletions(internal_delete1(Name, true)); + +forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) -> + %% Promote a slave while down - it'll happily recover as a master + Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H), + down_slave_nodes = T}, + ok = mnesia:write(rabbit_durable_queue, Q1, write). + run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). @@ -674,6 +700,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +on_node_up(Node) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_queue, + #amqqueue{_ = '_'}, write), + [case lists:member(Node, DSNs) of + true -> DSNs1 = DSNs -- [Node], + store_queue( + Q#amqqueue{down_slave_nodes = DSNs1}); + false -> ok + end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs], + ok + end). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -709,6 +749,14 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + down_slave_nodes = none, + gm_pids = none, + policy = none, + decorators = none}. + deliver([], _Delivery, _Flow) -> %% /dev/null optimisation []; |