summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_table.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_table.erl')
-rw-r--r--deps/rabbit/src/rabbit_table.erl416
1 files changed, 416 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_table.erl b/deps/rabbit/src/rabbit_table.erl
new file mode 100644
index 0000000000..77534763d0
--- /dev/null
+++ b/deps/rabbit/src/rabbit_table.erl
@@ -0,0 +1,416 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_table).
+
+-export([
+ create/0, create/2, ensure_local_copies/1, ensure_table_copy/2,
+ wait_for_replicated/1, wait/1, wait/2,
+ force_load/0, is_present/0, is_empty/0, needs_default_data/0,
+ check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
+ wait_for_replicated/0, exists/1]).
+
+%% for testing purposes
+-export([definitions/0]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-type retry() :: boolean().
+-type mnesia_table() :: atom().
+
+%%----------------------------------------------------------------------------
+%% Main interface
+%%----------------------------------------------------------------------------
+
+-spec create() -> 'ok'.
+
+create() ->
+ lists:foreach(
+ fun ({Table, Def}) -> create(Table, Def) end,
+ definitions()),
+ ensure_secondary_indexes(),
+ ok.
+
+-spec create(mnesia_table(), list()) -> rabbit_types:ok_or_error(any()).
+
+create(TableName, TableDefinition) ->
+ TableDefinition1 = proplists:delete(match, TableDefinition),
+ rabbit_log:debug("Will create a schema database table '~s'", [TableName]),
+ case mnesia:create_table(TableName, TableDefinition1) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists, TableName}} -> ok;
+ {aborted, {already_exists, TableName, _}} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed, TableName, TableDefinition1, Reason}})
+ end.
+
+-spec exists(mnesia_table()) -> boolean().
+exists(Table) ->
+ lists:member(Table, mnesia:system_info(tables)).
+
+%% Sets up secondary indexes in a blank node database.
+ensure_secondary_indexes() ->
+ ensure_secondary_index(rabbit_queue, vhost),
+ ok.
+
+ensure_secondary_index(Table, Field) ->
+ case mnesia:add_table_index(Table, Field) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, Table, _}} -> ok
+ end.
+
+-spec ensure_table_copy(mnesia_table(), node()) -> ok | {error, any()}.
+ensure_table_copy(TableName, Node) ->
+ rabbit_log:debug("Will add a local schema database copy for table '~s'", [TableName]),
+ case mnesia:add_table_copy(TableName, Node, disc_copies) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists, TableName}} -> ok;
+ {aborted, {already_exists, TableName, _}} -> ok;
+ {aborted, Reason} -> {error, Reason}
+ end.
+
+%% This arity only exists for backwards compatibility with certain
+%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.
+
+-spec wait_for_replicated() -> 'ok'.
+
+wait_for_replicated() ->
+ wait_for_replicated(false).
+
+-spec wait_for_replicated(retry()) -> 'ok'.
+
+wait_for_replicated(Retry) ->
+ wait([Tab || {Tab, TabDef} <- definitions(),
+ not lists:member({local_content, true}, TabDef)], Retry).
+
+-spec wait([atom()]) -> 'ok'.
+
+wait(TableNames) ->
+ wait(TableNames, _Retry = false).
+
+wait(TableNames, Retry) ->
+ {Timeout, Retries} = retry_timeout(Retry),
+ wait(TableNames, Timeout, Retries).
+
+wait(TableNames, Timeout, Retries) ->
+ %% We might be in ctl here for offline ops, in which case we can't
+ %% get_env() for the rabbit app.
+ rabbit_log:info("Waiting for Mnesia tables for ~p ms, ~p retries left~n",
+ [Timeout, Retries - 1]),
+ Result = case mnesia:wait_for_tables(TableNames, Timeout) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
+ {error, {timeout_waiting_for_tables, AllNodes, BadTabs}};
+ {error, Reason} ->
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
+ {error, {failed_waiting_for_tables, AllNodes, Reason}}
+ end,
+ case {Retries, Result} of
+ {_, ok} ->
+ rabbit_log:info("Successfully synced tables from a peer"),
+ ok;
+ {1, {error, _} = Error} ->
+ throw(Error);
+ {_, {error, Error}} ->
+ rabbit_log:warning("Error while waiting for Mnesia tables: ~p~n", [Error]),
+ wait(TableNames, Timeout, Retries - 1)
+ end.
+
+retry_timeout(_Retry = false) ->
+ {retry_timeout(), 1};
+retry_timeout(_Retry = true) ->
+ Retries = case application:get_env(rabbit, mnesia_table_loading_retry_limit) of
+ {ok, T} -> T;
+ undefined -> 10
+ end,
+ {retry_timeout(), Retries}.
+
+-spec retry_timeout() -> non_neg_integer() | infinity.
+
+retry_timeout() ->
+ case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of
+ {ok, T} -> T;
+ undefined -> 30000
+ end.
+
+-spec force_load() -> 'ok'.
+
+force_load() -> [mnesia:force_load_table(T) || T <- names()], ok.
+
+-spec is_present() -> boolean().
+
+is_present() -> names() -- mnesia:system_info(tables) =:= [].
+
+-spec is_empty() -> boolean().
+
+is_empty() -> is_empty(names()).
+
+-spec needs_default_data() -> boolean().
+
+needs_default_data() -> is_empty([rabbit_user, rabbit_user_permission,
+ rabbit_vhost]).
+
+is_empty(Names) ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ Names).
+
+-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()).
+
+check_schema_integrity(Retry) ->
+ Tables = mnesia:system_info(tables),
+ case check(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> wait(names(), Retry),
+ check(fun check_content/2);
+ Other -> Other
+ end.
+
+-spec clear_ram_only_tables() -> 'ok'.
+
+clear_ram_only_tables() ->
+ Node = node(),
+ lists:foreach(
+ fun (TabName) ->
+ case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
+ true -> {atomic, ok} = mnesia:clear_table(TabName);
+ false -> ok
+ end
+ end, names()),
+ ok.
+
+%% The sequence in which we delete the schema and then the other
+%% tables is important: if we delete the schema first when moving to
+%% RAM mnesia will loudly complain since it doesn't make much sense to
+%% do that. But when moving to disc, we need to move the schema first.
+
+-spec ensure_local_copies('disc' | 'ram') -> 'ok'.
+
+ensure_local_copies(disc) ->
+ create_local_copy(schema, disc_copies),
+ create_local_copies(disc);
+ensure_local_copies(ram) ->
+ create_local_copies(ram),
+ create_local_copy(schema, ram_copies).
+
+%%--------------------------------------------------------------------
+%% Internal helpers
+%%--------------------------------------------------------------------
+
+create_local_copies(Type) ->
+ lists:foreach(
+ fun ({Tab, TabDef}) ->
+ HasDiscCopies = has_copy_type(TabDef, disc_copies),
+ HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies),
+ LocalTab = proplists:get_bool(local_content, TabDef),
+ StorageType =
+ if
+ Type =:= disc orelse LocalTab ->
+ if
+ HasDiscCopies -> disc_copies;
+ HasDiscOnlyCopies -> disc_only_copies;
+ true -> ram_copies
+ end;
+ Type =:= ram ->
+ ram_copies
+ end,
+ ok = create_local_copy(Tab, StorageType)
+ end, definitions(Type)),
+ ok.
+
+create_local_copy(Tab, Type) ->
+ StorageType = mnesia:table_info(Tab, storage_type),
+ {atomic, ok} =
+ if
+ StorageType == unknown ->
+ mnesia:add_table_copy(Tab, node(), Type);
+ StorageType /= Type ->
+ mnesia:change_table_copy_type(Tab, node(), Type);
+ true -> {atomic, ok}
+ end,
+ ok.
+
+has_copy_type(TabDef, DiscType) ->
+ lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
+
+check_attributes(Tab, TabDef) ->
+ {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
+ case mnesia:table_info(Tab, attributes) of
+ ExpAttrs -> ok;
+ Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}}
+ end.
+
+check_content(Tab, TabDef) ->
+ {_, Match} = proplists:lookup(match, TabDef),
+ case mnesia:dirty_first(Tab) of
+ '$end_of_table' ->
+ ok;
+ Key ->
+ ObjList = mnesia:dirty_read(Tab, Key),
+ MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
+ case ets:match_spec_run(ObjList, MatchComp) of
+ ObjList -> ok;
+ _ -> {error, {table_content_invalid, Tab, Match, ObjList}}
+ end
+ end.
+
+check(Fun) ->
+ case [Error || {Tab, TabDef} <- definitions(),
+ begin
+ {Ret, Error} = case Fun(Tab, TabDef) of
+ ok -> {false, none};
+ {error, E} -> {true, E}
+ end,
+ Ret
+ end] of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
+%%--------------------------------------------------------------------
+%% Table definitions
+%%--------------------------------------------------------------------
+
+names() -> [Tab || {Tab, _} <- definitions()].
+
+%% The tables aren't supposed to be on disk on a ram node
+definitions(disc) ->
+ definitions();
+definitions(ram) ->
+ [{Tab, [{disc_copies, []}, {ram_copies, [node()]} |
+ proplists:delete(
+ ram_copies, proplists:delete(disc_copies, TabDef))]} ||
+ {Tab, TabDef} <- definitions()].
+
+definitions() ->
+ [{rabbit_user,
+ [{record_name, internal_user},
+ {attributes, internal_user:fields()},
+ {disc_copies, [node()]},
+ {match, internal_user:pattern_match_all()}]},
+ {rabbit_user_permission,
+ [{record_name, user_permission},
+ {attributes, record_info(fields, user_permission)},
+ {disc_copies, [node()]},
+ {match, #user_permission{user_vhost = #user_vhost{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
+ {rabbit_topic_permission,
+ [{record_name, topic_permission},
+ {attributes, record_info(fields, topic_permission)},
+ {disc_copies, [node()]},
+ {match, #topic_permission{topic_permission_key = #topic_permission_key{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
+ {rabbit_vhost,
+ [
+ {record_name, vhost},
+ {attributes, vhost:fields()},
+ {disc_copies, [node()]},
+ {match, vhost:pattern_match_all()}]},
+ {rabbit_listener,
+ [{record_name, listener},
+ {attributes, record_info(fields, listener)},
+ {type, bag},
+ {match, #listener{_='_'}}]},
+ {rabbit_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {disc_copies, [node()]},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_reverse_route,
+ [{record_name, reverse_route},
+ {attributes, record_info(fields, reverse_route)},
+ {type, ordered_set},
+ {match, #reverse_route{reverse_binding = reverse_binding_match(),
+ _='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
+ {rabbit_topic_trie_edge,
+ [{record_name, topic_trie_edge},
+ {attributes, record_info(fields, topic_trie_edge)},
+ {type, ordered_set},
+ {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
+ {rabbit_topic_trie_binding,
+ [{record_name, topic_trie_binding},
+ {attributes, record_info(fields, topic_trie_binding)},
+ {type, ordered_set},
+ {match, #topic_trie_binding{trie_binding = trie_binding_match(),
+ _='_'}}]},
+ {rabbit_durable_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {disc_copies, [node()]},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange_serial,
+ [{record_name, exchange_serial},
+ {attributes, record_info(fields, exchange_serial)},
+ {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
+ {rabbit_runtime_parameters,
+ [{record_name, runtime_parameters},
+ {attributes, record_info(fields, runtime_parameters)},
+ {disc_copies, [node()]},
+ {match, #runtime_parameters{_='_'}}]},
+ {rabbit_durable_queue,
+ [{record_name, amqqueue},
+ {attributes, amqqueue:fields()},
+ {disc_copies, [node()]},
+ {match, amqqueue:pattern_match_on_name(queue_name_match())}]},
+ {rabbit_queue,
+ [{record_name, amqqueue},
+ {attributes, amqqueue:fields()},
+ {match, amqqueue:pattern_match_on_name(queue_name_match())}]}
+ ]
+ ++ gm:table_definitions()
+ ++ mirrored_supervisor:table_definitions().
+
+binding_match() ->
+ #binding{source = exchange_name_match(),
+ destination = binding_destination_match(),
+ _='_'}.
+reverse_binding_match() ->
+ #reverse_binding{destination = binding_destination_match(),
+ source = exchange_name_match(),
+ _='_'}.
+binding_destination_match() ->
+ resource_match('_').
+trie_node_match() ->
+ #trie_node{exchange_name = exchange_name_match(), _='_'}.
+trie_edge_match() ->
+ #trie_edge{exchange_name = exchange_name_match(), _='_'}.
+trie_binding_match() ->
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
+exchange_name_match() ->
+ resource_match(exchange).
+queue_name_match() ->
+ resource_match(queue).
+resource_match(Kind) ->
+ #resource{kind = Kind, _='_'}.