summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl35
1 files changed, 17 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 24320f51..6dcd04d5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -254,10 +254,10 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_binding:add(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = []}).
+ rabbit_binding:add(#binding{source = ExchangeName,
+ destination = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -439,7 +439,7 @@ internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
- rabbit_binding:remove_for_queue(QueueName).
+ rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
case rabbit_misc:execute_mnesia_transaction(
@@ -450,8 +450,7 @@ internal_delete(QueueName) ->
end
end) of
{error, _} = Err -> Err;
- PostHook -> PostHook(),
- ok
+ Deletions -> ok = rabbit_binding:process_deletions(Deletions)
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
@@ -470,19 +469,20 @@ maybe_expire(QPid) ->
gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
- [Hook() ||
- Hook <- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end)],
- ok.
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end))).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- rabbit_binding:remove_transient_for_queue(QueueName).
+ rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
@@ -508,4 +508,3 @@ delegate_call(Pid, Msg, Timeout) ->
delegate_cast(Pid, Msg) ->
delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
-