diff options
Diffstat (limited to 'src/rabbit_table.erl')
-rw-r--r-- | src/rabbit_table.erl | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl new file mode 100644 index 00000000..a29c57d5 --- /dev/null +++ b/src/rabbit_table.erl @@ -0,0 +1,311 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_table). + +-export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1, + force_load/0, is_present/0, is_empty/0, + check_schema_integrity/0, clear_ram_only_tables/0]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(create/0 :: () -> 'ok'). +-spec(create_local_copy/1 :: ('disc' | 'ram') -> 'ok'). +-spec(wait_for_replicated/0 :: () -> 'ok'). +-spec(wait/1 :: ([atom()]) -> 'ok'). +-spec(force_load/0 :: () -> 'ok'). +-spec(is_present/0 :: () -> boolean()). +-spec(is_empty/0 :: () -> boolean()). +-spec(check_schema_integrity/0 :: () -> rabbit_types:ok_or_error(any())). +-spec(clear_ram_only_tables/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- +%% Main interface +%%---------------------------------------------------------------------------- + +create() -> + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabDef1, Reason}}) + end + end, definitions()), + ok. + +%% The sequence in which we delete the schema and then the other +%% tables is important: if we delete the schema first when moving to +%% RAM mnesia will loudly complain since it doesn't make much sense to +%% do that. But when moving to disc, we need to move the schema first. +create_local_copy(disc) -> + create_local_copy(schema, disc_copies), + create_local_copies(disc); +create_local_copy(ram) -> + create_local_copies(ram), + create_local_copy(schema, ram_copies). + +wait_for_replicated() -> + wait([Tab || {Tab, TabDef} <- definitions(), + not lists:member({local_content, true}, TabDef)]). + +wait(TableNames) -> + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> + ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end. + +force_load() -> [mnesia:force_load_table(T) || T <- names()], ok. + +is_present() -> names() -- mnesia:system_info(tables) =:= []. + +is_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + names()). + +check_schema_integrity() -> + Tables = mnesia:system_info(tables), + case check(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait(names()), + check(fun check_content/2); + Other -> Other + end. + +clear_ram_only_tables() -> + Node = node(), + lists:foreach( + fun (TabName) -> + case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of + true -> {atomic, ok} = mnesia:clear_table(TabName); + false -> ok + end + end, names()), + ok. + +%%-------------------------------------------------------------------- +%% Internal helpers +%%-------------------------------------------------------------------- + +create_local_copies(Type) -> + lists:foreach( + fun ({Tab, TabDef}) -> + HasDiscCopies = has_copy_type(TabDef, disc_copies), + HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies), + LocalTab = proplists:get_bool(local_content, TabDef), + StorageType = + if + Type =:= disc orelse LocalTab -> + if + HasDiscCopies -> disc_copies; + HasDiscOnlyCopies -> disc_only_copies; + true -> ram_copies + end; + Type =:= ram -> + ram_copies + end, + ok = create_local_copy(Tab, StorageType) + end, definitions(Type)), + ok. + +create_local_copy(Tab, Type) -> + StorageType = mnesia:table_info(Tab, storage_type), + {atomic, ok} = + if + StorageType == unknown -> + mnesia:add_table_copy(Tab, node(), Type); + StorageType /= Type -> + mnesia:change_table_copy_type(Tab, node(), Type); + true -> {atomic, ok} + end, + ok. + +has_copy_type(TabDef, DiscType) -> + lists:member(node(), proplists:get_value(DiscType, TabDef, [])). + +check_attributes(Tab, TabDef) -> + {_, ExpAttrs} = proplists:lookup(attributes, TabDef), + case mnesia:table_info(Tab, attributes) of + ExpAttrs -> ok; + Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} + end. + +check_content(Tab, TabDef) -> + {_, Match} = proplists:lookup(match, TabDef), + case mnesia:dirty_first(Tab) of + '$end_of_table' -> + ok; + Key -> + ObjList = mnesia:dirty_read(Tab, Key), + MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), + case ets:match_spec_run(ObjList, MatchComp) of + ObjList -> ok; + _ -> {error, {table_content_invalid, Tab, Match, ObjList}} + end + end. + +check(Fun) -> + case [Error || {Tab, TabDef} <- definitions(), + case Fun(Tab, TabDef) of + ok -> Error = none, false; + {error, Error} -> true + end] of + [] -> ok; + Errors -> {error, Errors} + end. + +%%-------------------------------------------------------------------- +%% Table definitions +%%-------------------------------------------------------------------- + +names() -> [Tab || {Tab, _} <- definitions()]. + +%% The tables aren't supposed to be on disk on a ram node +definitions(disc) -> + definitions(); +definitions(ram) -> + [{Tab, [{disc_copies, []}, {ram_copies, [node()]} | + proplists:delete( + ram_copies, proplists:delete(disc_copies, TabDef))]} || + {Tab, TabDef} <- definitions()]. + +definitions() -> + [{rabbit_user, + [{record_name, internal_user}, + {attributes, record_info(fields, internal_user)}, + {disc_copies, [node()]}, + {match, #internal_user{_='_'}}]}, + {rabbit_user_permission, + [{record_name, user_permission}, + {attributes, record_info(fields, user_permission)}, + {disc_copies, [node()]}, + {match, #user_permission{user_vhost = #user_vhost{_='_'}, + permission = #permission{_='_'}, + _='_'}}]}, + {rabbit_vhost, + [{record_name, vhost}, + {attributes, record_info(fields, vhost)}, + {disc_copies, [node()]}, + {match, #vhost{_='_'}}]}, + {rabbit_listener, + [{record_name, listener}, + {attributes, record_info(fields, listener)}, + {type, bag}, + {match, #listener{_='_'}}]}, + {rabbit_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {disc_copies, [node()]}, + {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_semi_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_reverse_route, + [{record_name, reverse_route}, + {attributes, record_info(fields, reverse_route)}, + {type, ordered_set}, + {match, #reverse_route{reverse_binding = reverse_binding_match(), + _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, + {rabbit_durable_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}, + {disc_copies, [node()]}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange_serial, + [{record_name, exchange_serial}, + {attributes, record_info(fields, exchange_serial)}, + {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, + {rabbit_runtime_parameters, + [{record_name, runtime_parameters}, + {attributes, record_info(fields, runtime_parameters)}, + {disc_copies, [node()]}, + {match, #runtime_parameters{_='_'}}]}, + {rabbit_durable_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {disc_copies, [node()]}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, + {rabbit_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + ++ gm:table_definitions() + ++ mirrored_supervisor:table_definitions(). + +binding_match() -> + #binding{source = exchange_name_match(), + destination = binding_destination_match(), + _='_'}. +reverse_binding_match() -> + #reverse_binding{destination = binding_destination_match(), + source = exchange_name_match(), + _='_'}. +binding_destination_match() -> + resource_match('_'). +trie_node_match() -> + #trie_node{ exchange_name = exchange_name_match(), _='_'}. +trie_edge_match() -> + #trie_edge{ exchange_name = exchange_name_match(), _='_'}. +trie_binding_match() -> + #trie_binding{exchange_name = exchange_name_match(), _='_'}. +exchange_name_match() -> + resource_match(exchange). +queue_name_match() -> + resource_match(queue). +resource_match(Kind) -> + #resource{kind = Kind, _='_'}. |