diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-11 16:54:55 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-11 16:54:55 +0100 |
| commit | 215589110977e13e95d29024bb4f12e488d4d156 (patch) | |
| tree | 698e0ea96d853a9252cc070924edf8f6c7428d16 | |
| parent | f9b8ae679c9955c1be9dbfd1ad77e192d3269049 (diff) | |
| parent | 52c0123ccaae1a5b6a400d21b01a7c15c842e6bb (diff) | |
| download | rabbitmq-server-git-215589110977e13e95d29024bb4f12e488d4d156.tar.gz | |
merge bug23104 into default
| -rw-r--r-- | src/rabbit_mnesia.erl | 122 |
1 files changed, 89 insertions, 33 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d9a70797dd..4a5adfaeed 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -91,7 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = wait_for_tables(), ok. is_db_empty() -> @@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try ok = init_db(ClusterNodes, Force), - ok = wait_for_tables(), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -157,50 +155,80 @@ table_definitions() -> [{rabbit_user, [{record_name, user}, {attributes, record_info(fields, user)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user_permission{user_vhost = #user_vhost{_='_'}, + permission = #permission{_='_'}, + _='_'}}]}, {rabbit_vhost, [{record_name, vhost}, {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #vhost{_='_'}}]}, {rabbit_config, [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, {rabbit_config, '_', '_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, - {type, bag}]}, + {type, bag}, + {match, #listener{_='_'}}]}, {rabbit_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_reverse_route, [{record_name, reverse_route}, {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #reverse_route{reverse_binding = reverse_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange, [{record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, + {attributes, record_info(fields, exchange)}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + {attributes, record_info(fields, amqqueue)}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + +binding_match() -> + #binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +reverse_binding_match() -> + #reverse_binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +exchange_name_match() -> + resource_match(exchange). +queue_name_match() -> + resource_match(queue). +resource_match(Kind) -> + #resource{kind = Kind, _='_'}. table_names() -> [Tab || {Tab, _} <- table_definitions()]. @@ -232,26 +260,55 @@ ensure_mnesia_not_running() -> yes -> throw({error, mnesia_unexpectedly_running}) end. +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + check_schema_integrity() -> - TabDefs = table_definitions(), Tables = mnesia:system_info(tables), - case [Error || Tab <- table_names(), + case [Error || {Tab, TabDef} <- table_definitions(), case lists:member(Tab, Tables) of false -> Error = {table_missing, Tab}, true; true -> - {_, TabDef} = proplists:lookup(Tab, TabDefs), {_, ExpAttrs} = proplists:lookup(attributes, TabDef), Attrs = mnesia:table_info(Tab, attributes), Error = {table_attributes_mismatch, Tab, ExpAttrs, Attrs}, Attrs /= ExpAttrs end] of - [] -> ok; + [] -> check_table_integrity(); Errors -> {error, Errors} end. +check_table_integrity() -> + ok = wait_for_tables(), + case lists:all(fun ({Tab, TabDef}) -> + {_, Match} = proplists:lookup(match, TabDef), + read_test_table(Tab, Match) + end, table_definitions()) of + true -> ok; + false -> {error, invalid_table_content} + end. + +read_test_table(Tab, Match) -> + case mnesia:dirty_first(Tab) of + '$end_of_table' -> + true; + Key -> + ObjList = mnesia:dirty_read(Tab, Key), + MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), + case ets:match_spec_run(ObjList, MatchComp) of + ObjList -> true; + _ -> false + end + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -347,8 +404,9 @@ init_db(ClusterNodes, Force) -> ok = create_local_table_copies(case IsDiskNode of true -> disc; false -> ram - end) - end; + end), + ok = ensure_schema_integrity() + end; {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -363,7 +421,9 @@ create_schema() -> cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - create_tables(). + ok = create_tables(), + ok = ensure_schema_integrity(), + ok = wait_for_tables(). move_db() -> mnesia:stop(), @@ -389,11 +449,12 @@ move_db() -> create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> - case mnesia:create_table(Tab, TabDef) of + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> throw({error, {table_creation_failed, - Tab, TabDef, Reason}}) + Tab, TabDef1, Reason}}) end end, table_definitions()), @@ -448,17 +509,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> - case check_schema_integrity() of - ok -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) - end; + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) + throw({error, {failed_waiting_for_tables, Reason}}) end. reset(Force) -> |
