summaryrefslogtreecommitdiff
path: root/src/rabbit_table.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_table.erl')
-rw-r--r--src/rabbit_table.erl311
1 files changed, 311 insertions, 0 deletions
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
new file mode 100644
index 00000000..a29c57d5
--- /dev/null
+++ b/src/rabbit_table.erl
@@ -0,0 +1,311 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_table).
+
+-export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1,
+ force_load/0, is_present/0, is_empty/0,
+ check_schema_integrity/0, clear_ram_only_tables/0]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(create/0 :: () -> 'ok').
+-spec(create_local_copy/1 :: ('disc' | 'ram') -> 'ok').
+-spec(wait_for_replicated/0 :: () -> 'ok').
+-spec(wait/1 :: ([atom()]) -> 'ok').
+-spec(force_load/0 :: () -> 'ok').
+-spec(is_present/0 :: () -> boolean()).
+-spec(is_empty/0 :: () -> boolean()).
+-spec(check_schema_integrity/0 :: () -> rabbit_types:ok_or_error(any())).
+-spec(clear_ram_only_tables/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Main interface
+%%----------------------------------------------------------------------------
+
+create() ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end, definitions()),
+ ok.
+
+%% The sequence in which we delete the schema and then the other
+%% tables is important: if we delete the schema first when moving to
+%% RAM mnesia will loudly complain since it doesn't make much sense to
+%% do that. But when moving to disc, we need to move the schema first.
+create_local_copy(disc) ->
+ create_local_copy(schema, disc_copies),
+ create_local_copies(disc);
+create_local_copy(ram) ->
+ create_local_copies(ram),
+ create_local_copy(schema, ram_copies).
+
+wait_for_replicated() ->
+ wait([Tab || {Tab, TabDef} <- definitions(),
+ not lists:member({local_content, true}, TabDef)]).
+
+wait(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end.
+
+force_load() -> [mnesia:force_load_table(T) || T <- names()], ok.
+
+is_present() -> names() -- mnesia:system_info(tables) =:= [].
+
+is_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ names()).
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait(names()),
+ check(fun check_content/2);
+ Other -> Other
+ end.
+
+clear_ram_only_tables() ->
+ Node = node(),
+ lists:foreach(
+ fun (TabName) ->
+ case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
+ true -> {atomic, ok} = mnesia:clear_table(TabName);
+ false -> ok
+ end
+ end, names()),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
+%%--------------------------------------------------------------------
+
+create_local_copies(Type) ->
+ lists:foreach(
+ fun ({Tab, TabDef}) ->
+ HasDiscCopies = has_copy_type(TabDef, disc_copies),
+ HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies),
+ LocalTab = proplists:get_bool(local_content, TabDef),
+ StorageType =
+ if
+ Type =:= disc orelse LocalTab ->
+ if
+ HasDiscCopies -> disc_copies;
+ HasDiscOnlyCopies -> disc_only_copies;
+ true -> ram_copies
+ end;
+ Type =:= ram ->
+ ram_copies
+ end,
+ ok = create_local_copy(Tab, StorageType)
+ end, definitions(Type)),
+ ok.
+
+create_local_copy(Tab, Type) ->
+ StorageType = mnesia:table_info(Tab, storage_type),
+ {atomic, ok} =
+ if
+ StorageType == unknown ->
+ mnesia:add_table_copy(Tab, node(), Type);
+ StorageType /= Type ->
+ mnesia:change_table_copy_type(Tab, node(), Type);
+ true -> {atomic, ok}
+ end,
+ ok.
+
+has_copy_type(TabDef, DiscType) ->
+ lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
+
+check_attributes(Tab, TabDef) ->
+ {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
+ case mnesia:table_info(Tab, attributes) of
+ ExpAttrs -> ok;
+ Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}}
+ end.
+
+check_content(Tab, TabDef) ->
+ {_, Match} = proplists:lookup(match, TabDef),
+ case mnesia:dirty_first(Tab) of
+ '$end_of_table' ->
+ ok;
+ Key ->
+ ObjList = mnesia:dirty_read(Tab, Key),
+ MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
+ case ets:match_spec_run(ObjList, MatchComp) of
+ ObjList -> ok;
+ _ -> {error, {table_content_invalid, Tab, Match, ObjList}}
+ end
+ end.
+
+check(Fun) ->
+ case [Error || {Tab, TabDef} <- definitions(),
+ case Fun(Tab, TabDef) of
+ ok -> Error = none, false;
+ {error, Error} -> true
+ end] of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
+%%--------------------------------------------------------------------
+%% Table definitions
+%%--------------------------------------------------------------------
+
+names() -> [Tab || {Tab, _} <- definitions()].
+
+%% The tables aren't supposed to be on disk on a ram node
+definitions(disc) ->
+ definitions();
+definitions(ram) ->
+ [{Tab, [{disc_copies, []}, {ram_copies, [node()]} |
+ proplists:delete(
+ ram_copies, proplists:delete(disc_copies, TabDef))]} ||
+ {Tab, TabDef} <- definitions()].
+
+definitions() ->
+ [{rabbit_user,
+ [{record_name, internal_user},
+ {attributes, record_info(fields, internal_user)},
+ {disc_copies, [node()]},
+ {match, #internal_user{_='_'}}]},
+ {rabbit_user_permission,
+ [{record_name, user_permission},
+ {attributes, record_info(fields, user_permission)},
+ {disc_copies, [node()]},
+ {match, #user_permission{user_vhost = #user_vhost{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
+ {rabbit_vhost,
+ [{record_name, vhost},
+ {attributes, record_info(fields, vhost)},
+ {disc_copies, [node()]},
+ {match, #vhost{_='_'}}]},
+ {rabbit_listener,
+ [{record_name, listener},
+ {attributes, record_info(fields, listener)},
+ {type, bag},
+ {match, #listener{_='_'}}]},
+ {rabbit_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {disc_copies, [node()]},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_reverse_route,
+ [{record_name, reverse_route},
+ {attributes, record_info(fields, reverse_route)},
+ {type, ordered_set},
+ {match, #reverse_route{reverse_binding = reverse_binding_match(),
+ _='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
+ {rabbit_topic_trie_edge,
+ [{record_name, topic_trie_edge},
+ {attributes, record_info(fields, topic_trie_edge)},
+ {type, ordered_set},
+ {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
+ {rabbit_topic_trie_binding,
+ [{record_name, topic_trie_binding},
+ {attributes, record_info(fields, topic_trie_binding)},
+ {type, ordered_set},
+ {match, #topic_trie_binding{trie_binding = trie_binding_match(),
+ _='_'}}]},
+ {rabbit_durable_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {disc_copies, [node()]},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange_serial,
+ [{record_name, exchange_serial},
+ {attributes, record_info(fields, exchange_serial)},
+ {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
+ {rabbit_runtime_parameters,
+ [{record_name, runtime_parameters},
+ {attributes, record_info(fields, runtime_parameters)},
+ {disc_copies, [node()]},
+ {match, #runtime_parameters{_='_'}}]},
+ {rabbit_durable_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)},
+ {disc_copies, [node()]},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
+ {rabbit_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ ++ gm:table_definitions()
+ ++ mirrored_supervisor:table_definitions().
+
+binding_match() ->
+ #binding{source = exchange_name_match(),
+ destination = binding_destination_match(),
+ _='_'}.
+reverse_binding_match() ->
+ #reverse_binding{destination = binding_destination_match(),
+ source = exchange_name_match(),
+ _='_'}.
+binding_destination_match() ->
+ resource_match('_').
+trie_node_match() ->
+ #trie_node{ exchange_name = exchange_name_match(), _='_'}.
+trie_edge_match() ->
+ #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
+trie_binding_match() ->
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
+exchange_name_match() ->
+ resource_match(exchange).
+queue_name_match() ->
+ resource_match(queue).
+resource_match(Kind) ->
+ #resource{kind = Kind, _='_'}.