summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl39
-rw-r--r--src/rabbit_exchange.erl25
-rw-r--r--src/rabbit_misc.erl17
3 files changed, 52 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb29..69c97dfe 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -122,19 +122,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ durable_queues, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(durable_queues),
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 925c335c..e72669ac 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -102,22 +102,15 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:foldl(
- fun (Exchange, Acc) ->
- ok = mnesia:write(Exchange),
- Acc
- end, ok, durable_exchanges),
- mnesia:foldl(
- fun (Route, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(Route),
- ok = mnesia:write(ReverseRoute),
- Acc
- end, ok, durable_routes),
- ok
- end).
+ ok = rabbit_misc:table_foreach(
+ fun(Exchange) -> ok = mnesia:write(Exchange) end,
+ durable_exchanges),
+ ok = rabbit_misc:table_foreach(
+ fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(Route),
+ ok = mnesia:write(ReverseRoute)
+ end, durable_routes),
+ ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 053bde54..5730fdc0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
+-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
@@ -97,6 +98,7 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
@@ -295,6 +297,21 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
+%% For each entry in a table, execute a function in a transaction.
+%% This is often far more efficient than wrapping a tx around the lot.
+%%
+%% We ignore entries that have been modified or removed.
+table_foreach(F, TableName) ->
+ lists:foreach(
+ fun (E) -> execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> ok;
+ _ -> F(E)
+ end
+ end)
+ end, dirty_read_all(TableName)),
+ ok.
+
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).