author    | Daniil Fedotov <hairyhum@gmail.com> | 2018-07-31 17:25:19 +0100
committer | Luke Bakken <lbakken@pivotal.io> | 2020-01-09 17:59:11 -0800
commit    | a9746c3a992ffedcc32915ccecaf96227ca70ff9 (patch)
tree      | 730f4d0ec9e6d6dde05b0992976a3e81dafab28e
parent    | b3c5fae8fb12b5a194f7f6e652dbb03ebf520817 (diff)
download  | rabbitmq-server-git-mnevis-experimental.tar.gz
Experiment to integrate ramnesia into RabbitMQ. (mnevis-experimental)
Can run multiple nodes using predefined config.
Runs from a virgin node state.
Can declare a queue.
Logs are written to stdout to be visible on startup.
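For reference, the predefined config mentioned above is the rabbit.config added in this diff; condensed, it looks like this (the localhost node names are the example values from the diff):

```erlang
%% Condensed from the rabbit.config added in this diff: two local nodes are
%% listed both as RabbitMQ disc cluster nodes and as ramnesia initial nodes,
%% and the Ra data directory is pointed at /tmp/ramnesia.
[
 {rabbit,   [{cluster_nodes, {['rabbit@localhost', 'rabbit1@localhost'], disc}}]},
 {ramnesia, [{initial_nodes,  ['rabbit@localhost', 'rabbit1@localhost']}]},
 {ra,       [{data_dir, "/tmp/ramnesia"}]}
].
```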
Rename ramnesia to mnevis
WIP. Fixed startup
wip
Startup and recovery with mnevis
Cleanup, use the right branch
Use cluster_nodes as initial_nodes in mnevis
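In the diff below this is done by a small helper added to rabbit.erl, which copies rabbit's cluster_nodes into mnevis's initial_nodes when the latter is not configured directly:

```erlang
%% From rabbit.erl in this diff: seed mnevis's initial_nodes from rabbit's
%% cluster_nodes setting if the mnevis application has no explicit value.
set_mnevis_initial_nodes() ->
    case application:get_env(mnevis, initial_nodes, none) of
        none ->
            case application:get_env(rabbit, cluster_nodes, none) of
                none          -> ok;
                {Nodes, disc} -> application:set_env(mnevis, initial_nodes, Nodes)
            end;
        _ -> ok
    end.
```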
Use mnevis transaction options to ensure the local node can perform dirty reads after transactions.
RabbitMQ relies on data being readable with dirty_read after creating entries.
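A minimal sketch of the pattern this guards, using the rabbit_misc:mnevis_transaction/1 helper the diff introduces at call sites such as gm:forget_group/1 and rabbit_amqqueue:forget_all_durable/1 (the function, table, and key names here are illustrative, not part of the diff):

```erlang
%% Illustrative only: write a record inside a mnevis transaction and expect a
%% subsequent local dirty read to see it, which is the assumption RabbitMQ
%% makes. rabbit_misc:mnevis_transaction/1 is the wrapper used throughout
%% this diff; call sites expect it to return {atomic, Result} like
%% mnesia:sync_transaction/1 does.
create_then_read(Table, Record, Key) ->
    {atomic, ok} =
        rabbit_misc:mnevis_transaction(
          fun () -> mnesia:write(Table, Record, write) end),
    mnesia:dirty_read(Table, Key).
```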
Make sure mnevis node starts
Specify ra branch
Comment unused functions
Move mnevis dependency to rabbit_common
Testing out replicated rabbit_listener
fixup
Add leader node to status, add mnevis TODOs
Rename leader to mnevis_leader in node info
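The leader reported in node info comes from the Ra cluster behind mnevis; the diff adds a helper along these lines to rabbit_mnesia.erl:

```erlang
%% From rabbit_mnesia.erl in this diff: ra:members/1 returns
%% {ok, Members, LeaderId} where LeaderId is {RegisteredName, Node};
%% the node part is exposed as mnevis_leader in status/0.
get_mnevis_leader() ->
    {ok, _Members, {_RegName, Node}} = ra:members(mnevis_node:node_id()),
    Node.
```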
WIP. Remove rabbit_node_monitor.
rabbit_node_monitor exists to compensate for mnesia's partition handling
behaviour. Without mnesia clustering there is not much sense in keeping it.
The places that still used rabbit_node_monitor are marked with TODOs.
Feature flags through mnevis.
Make sure feature flags use the mnevis API instead of the mnesia API.
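Concretely, the diff replaces the mnesia:system_info/1-based node discovery in rabbit_feature_flags.erl with mnevis calls, for example:

```erlang
%% From rabbit_feature_flags.erl in this diff: cluster membership now comes
%% from mnevis rather than mnesia:system_info(db_nodes).
remote_nodes() ->
    mnevis:db_nodes() -- [node()].

query_running_remote_nodes(Node, Timeout) ->
    case rpc:call(Node, mnevis, db_nodes, [], Timeout) of
        {badrpc, _} = Error -> Error;
        Nodes               -> Nodes -- [node()]
    end.
```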
-rw-r--r-- | rabbit.config | 14
-rw-r--r-- | src/amqqueue_v1.erl | 4
-rw-r--r-- | src/gm.erl | 4
-rw-r--r-- | src/rabbit.erl | 59
-rw-r--r-- | src/rabbit_amqqueue.erl | 34
-rw-r--r-- | src/rabbit_autoheal.erl | 459
-rw-r--r-- | src/rabbit_channel.erl | 21
-rw-r--r-- | src/rabbit_connection_tracking.erl | 20
-rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 5
-rw-r--r-- | src/rabbit_core_ff.erl | 8
-rw-r--r-- | src/rabbit_feature_flags.erl | 9
-rw-r--r-- | src/rabbit_health_check.erl | 13
-rw-r--r-- | src/rabbit_mnesia.erl | 342
-rw-r--r-- | src/rabbit_mnesia_rename.erl | 8
-rw-r--r-- | src/rabbit_networking.erl | 19
-rw-r--r-- | src/rabbit_node_monitor.erl | 932
-rw-r--r-- | src/rabbit_prelaunch_cluster.erl | 2
-rw-r--r-- | src/rabbit_table.erl | 25
-rw-r--r-- | src/rabbit_upgrade.erl | 3
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 9
-rw-r--r-- | src/rabbit_vhost.erl | 4
-rw-r--r-- | test/amqqueue_backward_compatibility_SUITE.erl | 2
-rw-r--r-- | test/mirrored_supervisor_SUITE.erl | 4
23 files changed, 393 insertions, 1607 deletions
diff --git a/rabbit.config b/rabbit.config new file mode 100644 index 0000000000..030e3dbdbc --- /dev/null +++ b/rabbit.config @@ -0,0 +1,14 @@ +[ +{rabbit, [ + {cluster_nodes, {['rabbit@localhost' + , + 'rabbit1@localhost' + ], disc}} + ]}, +{ramnesia, [ + {initial_nodes, ['rabbit@localhost' + , + 'rabbit1@localhost' + ]}]}, +{ra, [{data_dir, "/tmp/ramnesia"}]} +]. diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl index 95d7083362..10cb36bc52 100644 --- a/src/amqqueue_v1.erl +++ b/src/amqqueue_v1.erl @@ -334,9 +334,9 @@ record_version_to_use() -> upgrade(#amqqueue{} = Queue) -> Queue. --spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue(). +-spec upgrade_to(amqqueue(), amqqueue_v1) -> amqqueue(). -upgrade_to(?record_version, #amqqueue{} = Queue) -> +upgrade_to(#amqqueue{} = Queue, ?record_version) -> Queue. % arguments diff --git a/src/gm.erl b/src/gm.erl index 353ad41add..e10be91677 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -488,7 +488,7 @@ create_tables() -> create_tables([]) -> ok; create_tables([{Table, Attributes} | Tables]) -> - case mnesia:create_table(Table, Attributes) of + case mnevis:create_table(Table, Attributes) of {atomic, ok} -> create_tables(Tables); {aborted, {already_exists, Table}} -> create_tables(Tables); Err -> Err @@ -535,7 +535,7 @@ validate_members(Server, Members) -> -spec forget_group(group_name()) -> 'ok'. forget_group(GroupName) -> - {atomic, ok} = mnesia:sync_transaction( + {atomic, ok} = rabbit_misc:mnevis_transaction( fun () -> mnesia:delete({?GROUP_TABLE, GroupName}) end), diff --git a/src/rabbit.erl b/src/rabbit.erl index 7f16d0b1d1..600e1893c5 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -142,12 +142,12 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_node_monitor, - [{description, "node monitor"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_node_monitor]}}, - {requires, [rabbit_alarm, guid_generator]}, - {enables, core_initialized}]}). +% -rabbit_boot_step({rabbit_node_monitor, +% [{description, "node monitor"}, +% {mfa, {rabbit_sup, start_restartable_child, +% [rabbit_node_monitor]}}, +% {requires, [rabbit_alarm, guid_generator]}, +% {enables, core_initialized}]}). -rabbit_boot_step({rabbit_epmd_monitor, [{description, "epmd monitor"}, @@ -167,13 +167,13 @@ [{description, "core initialized"}, {requires, kernel_ready}]}). --rabbit_boot_step({upgrade_queues, - [{description, "per-vhost message store migration"}, - {mfa, {rabbit_upgrade, - maybe_migrate_queues_to_per_vhost_storage, - []}}, - {requires, [core_initialized]}, - {enables, recovery}]}). +% -rabbit_boot_step({upgrade_queues, +% [{description, "per-vhost message store migration"}, +% {mfa, {rabbit_upgrade, +% maybe_migrate_queues_to_per_vhost_storage, +% []}}, +% {requires, [core_initialized]}, +% {enables, recovery}]}). -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, @@ -241,15 +241,15 @@ {requires, pre_flight} ]}). --rabbit_boot_step({notify_cluster, - [{description, "notifies cluster peers of our presence"}, - {mfa, {rabbit_node_monitor, notify_node_up, []}}, - {requires, pre_flight}]}). +% -rabbit_boot_step({notify_cluster, +% [{description, "notifies cluster peers of our presence"}, +% {mfa, {rabbit_node_monitor, notify_node_up, []}}, +% {requires, pre_flight}]}). -rabbit_boot_step({networking, [{description, "TCP and TLS listeners"}, {mfa, {rabbit_networking, boot, []}}, - {requires, notify_cluster}]}). + {requires, pre_flight}]}). 
%%--------------------------------------------------------------------------- @@ -280,6 +280,7 @@ start() -> %% start() vs. boot(): we want to throw an error in start(). + % TODO COMMENT OUT rabbit_mnesia:check_cluster_consistency(), start_it(temporary). -spec boot() -> 'ok'. @@ -320,6 +321,8 @@ run_prelaunch_second_phase() -> #{initial_pass := IsInitialPass} = Context = rabbit_prelaunch:get_context(), + set_mnevis_initial_nodes(), + case IsInitialPass of true -> rabbit_log_prelaunch:debug(""), @@ -367,6 +370,16 @@ run_prelaunch_second_phase() -> end, ok. +set_mnevis_initial_nodes() -> + case application:get_env(mnevis, initial_nodes, none) of + none -> + case application:get_env(rabbit, cluster_nodes, none) of + none -> ok; + {Nodes, disc} -> application:set_env(mnevis, initial_nodes, Nodes) + end; + _ -> ok + end. + %% Try to send systemd ready notification if it makes sense in the %% current environment. standard_error is used intentionally in all %% logging statements, so all this messages will end in systemd @@ -1049,7 +1062,9 @@ boot_delegate() -> -spec recover() -> 'ok'. recover() -> + io:format("Recover policy ~n"), ok = rabbit_policy:recover(), + io:format("Recover vhost ~n"), ok = rabbit_vhost:recover(), ok = lager_exchange_backend:maybe_init_exchange(). @@ -1057,7 +1072,13 @@ recover() -> maybe_insert_default_data() -> case rabbit_table:needs_default_data() of - true -> insert_default_data(); + true -> + {ok, _, {_, Leader}} = ra:members(mnevis_node:node_id()), + case Leader == node() of + true -> + insert_default_data(); + false -> ok + end; false -> ok end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5ff2e09636..3b5d43f4ff 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1794,7 +1794,7 @@ forget_all_durable(Node) -> %% Note rabbit is not running so we avoid e.g. the worker pool. Also why %% we don't invoke the return from rabbit_binding:process_deletions/1. {atomic, ok} = - mnesia:sync_transaction( + rabbit_misc:mnevis_transaction( fun () -> Qs = mnesia:match_object(rabbit_durable_queue, amqqueue:pattern_match_all(), write), @@ -1968,10 +1968,12 @@ delete_queues_on_node_down(Node) -> lists:unzip(lists:flatten([ rabbit_misc:execute_mnesia_transaction( fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end - ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node)) + ) + || Queues <- partition_queues(queues_to_delete_when_node_down(Node)) ])). delete_queue(QueueName) -> +io:format("Delete queue ~p~n", [QueueName]), ok = mnesia:delete({rabbit_queue, QueueName}), rabbit_binding:remove_transient_for_destination(QueueName). 
@@ -1990,13 +1992,27 @@ partition_queues(T) -> queues_to_delete_when_node_down(NodeDown) -> rabbit_misc:execute_mnesia_transaction(fun () -> - qlc:e(qlc:q([amqqueue:get_name(Q) || - Q <- mnesia:table(rabbit_queue), - amqqueue:qnode(Q) == NodeDown andalso - not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso - (not rabbit_amqqueue:is_replicated(Q) orelse - rabbit_amqqueue:is_dead_exclusive(Q))] - )) + % qlc:e(qlc:q([amqqueue:get_name(Q) || + % Q <- mnesia:table(rabbit_queue), + % amqqueue:qnode(Q) == NodeDown andalso + % not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso + % (not rabbit_amqqueue:is_replicated(Q) orelse + % rabbit_amqqueue:is_dead_exclusive(Q))] + % )) + mnesia:foldl(fun(Q, Acc) -> + case amqqueue:qnode(Q) == NodeDown andalso + not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso + (not rabbit_amqqueue:is_replicated(Q) orelse + rabbit_amqqueue:is_dead_exclusive(Q)) of + true -> + [amqqueue:get_name(Q) | Acc]; + false -> + Acc + end + end, + [], + rabbit_queue) + end). notify_queue_binding_deletions(QueueDeletions) -> diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl deleted file mode 100644 index 77165fc26c..0000000000 --- a/src/rabbit_autoheal.erl +++ /dev/null @@ -1,459 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at https://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_autoheal). - --export([init/0, enabled/0, maybe_start/1, rabbit_down/2, node_down/2, - handle_msg/3, process_down/2]). - -%% The named process we are running in. --define(SERVER, rabbit_node_monitor). - --define(MNESIA_STOPPED_PING_INTERNAL, 200). - --define(AUTOHEAL_STATE_AFTER_RESTART, rabbit_autoheal_state_after_restart). - -%%---------------------------------------------------------------------------- - -%% In order to autoheal we want to: -%% -%% * Find the winning partition -%% * Stop all nodes in other partitions -%% * Wait for them all to be stopped -%% * Start them again -%% -%% To keep things simple, we assume all nodes are up. We don't start -%% unless all nodes are up, and if a node goes down we abandon the -%% whole process. To further keep things simple we also defer the -%% decision as to the winning node to the "leader" - arbitrarily -%% selected as the first node in the cluster. -%% -%% To coordinate the restarting nodes we pick a special node from the -%% winning partition - the "winner". Restarting nodes then stop, and -%% wait for it to tell them it is safe to start again. The winner -%% determines that a node has stopped just by seeing if its rabbit app -%% stops - if a node stops for any other reason it just gets a message -%% it will ignore, and otherwise we carry on. -%% -%% Meanwhile, the leader may continue to receive new autoheal requests: -%% all of them are ignored. The winner notifies the leader when the -%% current autoheal process is finished (ie. when all losers stopped and -%% were asked to start again) or was aborted. 
When the leader receives -%% the notification or if it looses contact with the winner, it can -%% accept new autoheal requests. -%% -%% The winner and the leader are not necessarily the same node. -%% -%% The leader can be a loser and will restart in this case. It remembers -%% there is an autoheal in progress by temporarily saving the autoheal -%% state to the application environment. -%% -%% == Possible states == -%% -%% not_healing -%% - the default -%% -%% {winner_waiting, OutstandingStops, Notify} -%% - we are the winner and are waiting for all losing nodes to stop -%% before telling them they can restart -%% -%% {leader_waiting, Winner, Notify} -%% - we are the leader, and have already assigned the winner and losers. -%% We are waiting for a confirmation from the winner that the autoheal -%% process has ended. Meanwhile we can ignore autoheal requests. -%% Because we may be a loser too, this state is saved to the application -%% environment and restored on startup. -%% -%% restarting -%% - we are restarting. Of course the node monitor immediately dies -%% then so this state does not last long. We therefore send the -%% autoheal_safe_to_start message to the rabbit_outside_app_process -%% instead. -%% -%% == Message flow == -%% -%% 1. Any node (leader included) >> {request_start, node()} >> Leader -%% When Mnesia detects it is running partitioned or -%% when a remote node starts, rabbit_node_monitor calls -%% rabbit_autoheal:maybe_start/1. The message above is sent to the -%% leader so the leader can take a decision. -%% -%% 2. Leader >> {become_winner, Losers} >> Winner -%% The leader notifies the winner so the latter can proceed with -%% the autoheal. -%% -%% 3. Winner >> {winner_is, Winner} >> All losers -%% The winner notifies losers they must stop. -%% -%% 4. Winner >> autoheal_safe_to_start >> All losers -%% When either all losers stopped or the autoheal process was -%% aborted, the winner notifies losers they can start again. -%% -%% 5. Leader >> report_autoheal_status >> Winner -%% The leader asks the autoheal status to the winner. This only -%% happens when the leader is a loser too. If this is not the case, -%% this message is never sent. -%% -%% 6. Winner >> {autoheal_finished, Winner} >> Leader -%% The winner notifies the leader that the autoheal process was -%% either finished or aborted (ie. autoheal_safe_to_start was sent -%% to losers). - -%%---------------------------------------------------------------------------- - -init() -> - %% We check the application environment for a saved autoheal state - %% saved during a restart. If this node is a leader, it is used - %% to determine if it needs to ask the winner to report about the - %% autoheal progress. - State = case application:get_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART) of - {ok, S} -> S; - undefined -> not_healing - end, - ok = application:unset_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART), - case State of - {leader_waiting, Winner, _} -> - rabbit_log:info( - "Autoheal: in progress, requesting report from ~p~n", [Winner]), - send(Winner, report_autoheal_status); - _ -> - ok - end, - State. - -maybe_start(not_healing) -> - case enabled() of - true -> Leader = leader(), - send(Leader, {request_start, node()}), - rabbit_log:info("Autoheal request sent to ~p~n", [Leader]), - not_healing; - false -> not_healing - end; -maybe_start(State) -> - State. 
- -enabled() -> - case application:get_env(rabbit, cluster_partition_handling) of - {ok, autoheal} -> true; - {ok, {pause_if_all_down, _, autoheal}} -> true; - _ -> false - end. - -leader() -> - [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)), - Leader. - -%% This is the winner receiving its last notification that a node has -%% stopped - all nodes can now start again -rabbit_down(Node, {winner_waiting, [Node], Notify}) -> - rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), - winner_finish(Notify); - -rabbit_down(Node, {winner_waiting, WaitFor, Notify}) -> - {winner_waiting, WaitFor -- [Node], Notify}; - -rabbit_down(Winner, {leader_waiting, Winner, Losers}) -> - abort([Winner], Losers); - -rabbit_down(_Node, State) -> - %% Ignore. Either: - %% o we already cancelled the autoheal process; - %% o we are still waiting the winner's report. - State. - -node_down(_Node, not_healing) -> - not_healing; - -node_down(Node, {winner_waiting, _, Notify}) -> - abort([Node], Notify); - -node_down(Node, {leader_waiting, Node, _Notify}) -> - %% The winner went down, we don't know what to do so we simply abort. - rabbit_log:info("Autoheal: aborting - winner ~p went down~n", [Node]), - not_healing; - -node_down(Node, {leader_waiting, _, _} = St) -> - %% If it is a partial partition, the winner might continue with the - %% healing process. If it is a full partition, the winner will also - %% see it and abort. Let's wait for it. - rabbit_log:info("Autoheal: ~p went down, waiting for winner decision ~n", [Node]), - St; - -node_down(Node, _State) -> - rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), - not_healing. - -%% If the process that has to restart the node crashes for an unexpected reason, -%% we go back to a not healing state so the node is able to recover. -process_down({'EXIT', Pid, Reason}, {restarting, Pid}) when Reason =/= normal -> - rabbit_log:info("Autoheal: aborting - the process responsible for restarting the " - "node terminated with reason: ~p~n", [Reason]), - not_healing; - -process_down(_, State) -> - State. - -%% By receiving this message we become the leader -%% TODO should we try to debounce this? -handle_msg({request_start, Node}, - not_healing, Partitions) -> - rabbit_log:info("Autoheal request received from ~p~n", [Node]), - case check_other_nodes(Partitions) of - {error, E} -> - rabbit_log:info("Autoheal request denied: ~s~n", [fmt_error(E)]), - not_healing; - {ok, AllPartitions} -> - {Winner, Losers} = make_decision(AllPartitions), - rabbit_log:info("Autoheal decision~n" - " * Partitions: ~p~n" - " * Winner: ~p~n" - " * Losers: ~p~n", - [AllPartitions, Winner, Losers]), - case node() =:= Winner of - true -> handle_msg({become_winner, Losers}, - not_healing, Partitions); - false -> send(Winner, {become_winner, Losers}), - {leader_waiting, Winner, Losers} - end - end; - -handle_msg({request_start, Node}, - State, _Partitions) -> - rabbit_log:info("Autoheal request received from ~p when healing; " - "ignoring~n", [Node]), - State; - -handle_msg({become_winner, Losers}, - not_healing, _Partitions) -> - rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n", - [Losers]), - stop_partition(Losers); - -handle_msg({become_winner, Losers}, - {winner_waiting, _, Losers}, _Partitions) -> - %% The leader has aborted the healing, might have seen us down but - %% we didn't see the same. Let's try again as it is the same partition. 
- rabbit_log:info("Autoheal: I am the winner and received a duplicated " - "request, waiting again for ~p to stop~n", [Losers]), - stop_partition(Losers); - -handle_msg({become_winner, _}, - {winner_waiting, _, Losers}, _Partitions) -> - %% Something has happened to the leader, it might have seen us down but we - %% are still alive. Partitions have changed, cannot continue. - rabbit_log:info("Autoheal: I am the winner and received another healing " - "request, partitions have changed to ~p. Aborting ~n", [Losers]), - winner_finish(Losers), - not_healing; - -handle_msg({winner_is, Winner}, State = not_healing, - _Partitions) -> - %% This node is a loser, nothing else. - Pid = restart_loser(State, Winner), - {restarting, Pid}; -handle_msg({winner_is, Winner}, State = {leader_waiting, Winner, _}, - _Partitions) -> - %% This node is the leader and a loser at the same time. - Pid = restart_loser(State, Winner), - {restarting, Pid}; - -handle_msg(Request, {restarting, Pid} = St, _Partitions) -> - %% ignore, we can contribute no further - rabbit_log:info("Autoheal: Received the request ~p while waiting for ~p " - "to restart the node. Ignoring it ~n", [Request, Pid]), - St; - -handle_msg(report_autoheal_status, not_healing, _Partitions) -> - %% The leader is asking about the autoheal status to us (the - %% winner). This happens when the leader is a loser and it just - %% restarted. We are in the "not_healing" state, so the previous - %% autoheal process ended: let's tell this to the leader. - send(leader(), {autoheal_finished, node()}), - not_healing; - -handle_msg(report_autoheal_status, State, _Partitions) -> - %% Like above, the leader is asking about the autoheal status. We - %% are not finished with it. There is no need to send anything yet - %% to the leader: we will send the notification when it is over. - State; - -handle_msg({autoheal_finished, Winner}, - {leader_waiting, Winner, _}, _Partitions) -> - %% The winner is finished with the autoheal process and notified us - %% (the leader). We can transition to the "not_healing" state and - %% accept new requests. - rabbit_log:info("Autoheal finished according to winner ~p~n", [Winner]), - not_healing; - -handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) - when Winner =:= node() -> - %% We are the leader and the winner. The state already transitioned - %% to "not_healing" at the end of the autoheal process. - rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]), - not_healing; - -handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) -> - %% We might have seen the winner down during a partial partition and - %% transitioned to not_healing. However, the winner was still able - %% to finish. Let it pass. - rabbit_log:info("Autoheal finished according to winner ~p." - " Unexpected, I might have previously seen the winner down~n", [Winner]), - not_healing. - -%%---------------------------------------------------------------------------- - -send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}. - -abort(Down, Notify) -> - rabbit_log:info("Autoheal: aborting - ~p down~n", [Down]), - %% Make sure any nodes waiting for us start - it won't necessarily - %% heal the partition but at least they won't get stuck. - %% If we are executing this, we are not stopping. Thus, don't wait - %% for ourselves! - winner_finish(Notify -- [node()]). 
- -winner_finish(Notify) -> - %% There is a race in Mnesia causing a starting loser to hang - %% forever if another loser stops at the same time: the starting - %% node connects to the other node, negotiates the protocol and - %% attempts to acquire a write lock on the schema on the other node. - %% If the other node stops between the protocol negotiation and lock - %% request, the starting node never gets an answer to its lock - %% request. - %% - %% To work around the problem, we make sure Mnesia is stopped on all - %% losing nodes before sending the "autoheal_safe_to_start" signal. - wait_for_mnesia_shutdown(Notify), - [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], - send(leader(), {autoheal_finished, node()}), - not_healing. - -%% This improves the previous implementation, but could still potentially enter an infinity -%% loop. If it also possible that for when it finishes some of the nodes have been -%% manually restarted, but we can't do much more (apart from stop them again). So let it -%% continue and notify all the losers to restart. -wait_for_mnesia_shutdown(AllNodes) -> - Monitors = lists:foldl(fun(Node, Monitors0) -> - pmon:monitor({mnesia_sup, Node}, Monitors0) - end, pmon:new(), AllNodes), - wait_for_supervisors(Monitors). - -wait_for_supervisors(Monitors) -> - case pmon:is_empty(Monitors) of - true -> - ok; - false -> - receive - {'DOWN', _MRef, process, {mnesia_sup, _} = I, _Reason} -> - wait_for_supervisors(pmon:erase(I, Monitors)) - after - 60000 -> - AliveLosers = [Node || {_, Node} <- pmon:monitored(Monitors)], - rabbit_log:info("Autoheal: mnesia in nodes ~p is still up, sending " - "winner notification again to these ~n", [AliveLosers]), - [send(L, {winner_is, node()}) || L <- AliveLosers], - wait_for_mnesia_shutdown(AliveLosers) - end - end. - -restart_loser(State, Winner) -> - rabbit_log:warning( - "Autoheal: we were selected to restart; winner is ~p~n", [Winner]), - rabbit_node_monitor:run_outside_applications( - fun () -> - MRef = erlang:monitor(process, {?SERVER, Winner}), - rabbit:stop(), - NextState = receive - {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> - not_healing; - autoheal_safe_to_start -> - State - end, - erlang:demonitor(MRef, [flush]), - %% During the restart, the autoheal state is lost so we - %% store it in the application environment temporarily so - %% init/0 can pick it up. - %% - %% This is useful to the leader which is a loser at the - %% same time: because the leader is restarting, there - %% is a great chance it misses the "autoheal finished!" - %% notification from the winner. Thanks to the saved - %% state, it knows it needs to ask the winner if the - %% autoheal process is finished or not. - application:set_env(rabbit, - ?AUTOHEAL_STATE_AFTER_RESTART, NextState), - rabbit:start() - end, true). - -make_decision(AllPartitions) -> - Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), - [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), - {Winner, lists:append(Rest)}. - -partition_value(Partition) -> - Connections = [Res || Node <- Partition, - Res <- [rpc:call(Node, rabbit_networking, - connections_local, [])], - is_list(Res)], - {length(lists:append(Connections)), length(Partition)}. - -%% We have our local understanding of what partitions exist; but we -%% only know which nodes we have been partitioned from, not which -%% nodes are partitioned from each other. 
-check_other_nodes(LocalPartitions) -> - Nodes = rabbit_mnesia:cluster_nodes(all), - {Results, Bad} = rabbit_node_monitor:status(Nodes -- [node()]), - RemotePartitions = [{Node, proplists:get_value(partitions, Res)} - || {Node, Res} <- Results], - RemoteDown = [{Node, Down} - || {Node, Res} <- Results, - Down <- [Nodes -- proplists:get_value(nodes, Res)], - Down =/= []], - case {Bad, RemoteDown} of - {[], []} -> Partitions = [{node(), LocalPartitions} | RemotePartitions], - {ok, all_partitions(Partitions, [Nodes])}; - {[], _} -> {error, {remote_down, RemoteDown}}; - {_, _} -> {error, {nodes_down, Bad}} - end. - -all_partitions([], Partitions) -> - Partitions; -all_partitions([{Node, CantSee} | Rest], Partitions) -> - {[Containing], Others} = - lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions), - A = Containing -- CantSee, - B = Containing -- A, - Partitions1 = case {A, B} of - {[], _} -> Partitions; - {_, []} -> Partitions; - _ -> [A, B | Others] - end, - all_partitions(Rest, Partitions1). - -fmt_error({remote_down, RemoteDown}) -> - rabbit_misc:format("Remote nodes disconnected:~n ~p", [RemoteDown]); -fmt_error({nodes_down, NodesDown}) -> - rabbit_misc:format("Local nodes down: ~p", [NodesDown]). - -stop_partition(Losers) -> - %% The leader said everything was ready - do we agree? If not then - %% give up. - Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers), - case Down of - [] -> [send(L, {winner_is, node()}) || L <- Losers], - {winner_waiting, Losers, Losers}; - _ -> abort(Down, Losers) - end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4fd218754d..5570aca1b1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2265,8 +2265,9 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) -> send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> - case rabbit_node_monitor:pause_partition_guard() of - ok -> + %% TODO: rabbit_node_monitor + % case rabbit_node_monitor:pause_partition_guard() of + % ok -> Confirms = lists:append(C), Rejects = lists:append(R), ConfirmMsgSeqNos = @@ -2285,13 +2286,17 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> send_nacks(RejectMsgSeqNos, ConfirmMsgSeqNos, State1#ch{rejected = []}); - pausing -> State - end; + % pausing -> State + % end; send_confirms_and_nacks(State) -> - case rabbit_node_monitor:pause_partition_guard() of - ok -> maybe_complete_tx(State); - pausing -> State - end. + %% TODO: rabbit_node_monitor + % case rabbit_node_monitor:pause_partition_guard() of + % ok -> + maybe_complete_tx(State) + % ; + % pausing -> State + % end + . 
send_nacks([], _, State) -> State; diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index 403fbf4191..e1781a6253 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -186,7 +186,7 @@ ensure_per_vhost_tracked_connections_table_for_this_node() -> ensure_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_table_name_for(Node), - case mnesia:create_table(TableName, [{record_name, tracked_connection}, + case mnevis:create_table(TableName, [{record_name, tracked_connection}, {attributes, record_info(fields, tracked_connection)}]) of {atomic, ok} -> ok; {aborted, {already_exists, _}} -> ok; @@ -200,7 +200,7 @@ ensure_tracked_connections_table_for_node(Node) -> ensure_per_vhost_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_per_vhost_table_name_for(Node), - case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost}, + case mnevis:create_table(TableName, [{record_name, tracked_connection_per_vhost}, {attributes, record_info(fields, tracked_connection_per_vhost)}]) of {atomic, ok} -> ok; {aborted, {already_exists, _}} -> ok; @@ -227,7 +227,7 @@ clear_tracked_connection_tables_for_this_node() -> delete_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_table_name_for(Node), - case mnesia:delete_table(TableName) of + case mnevis:delete_table(TableName) of {atomic, ok} -> ok; {aborted, {no_exists, _}} -> ok; {aborted, Error} -> @@ -240,7 +240,7 @@ delete_tracked_connections_table_for_node(Node) -> delete_per_vhost_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_per_vhost_table_name_for(Node), - case mnesia:delete_table(TableName) of + case mnevis:delete_table(TableName) of {atomic, ok} -> ok; {aborted, {no_exists, _}} -> ok; {aborted, Error} -> @@ -368,9 +368,15 @@ count_connections_in(VirtualHost) -> lists:foldl(fun (Node, Acc) -> Tab = tracked_connection_per_vhost_table_name_for(Node), try - N = case mnesia:dirty_read(Tab, VirtualHost) of - [] -> 0; - [Val] -> Val#tracked_connection_per_vhost.connection_count + N = case rabbit_misc:mnevis_transaction( + fun() -> + case mnesia:dirty_read({Tab, VirtualHost}) of + [] -> 0; + [Val] -> Val#tracked_connection_per_vhost.connection_count + end + end) of + {atomic, Val} -> Val; + {aborted, _Reason} -> 0 end, Acc + N catch _:Err -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index bc27d59363..92bc6bc2dc 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -46,7 +46,10 @@ [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_connection_tracking]}}, - {requires, [rabbit_event, rabbit_node_monitor]}, + {requires, [rabbit_event + %% TODO: rabbit_node_monitor + % ,rabbit_node_monitor + ]}, {enables, ?MODULE}]}). %% diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl index 03f686fb33..fe83a68d2e 100644 --- a/src/rabbit_core_ff.erl +++ b/src/rabbit_core_ff.erl @@ -59,14 +59,13 @@ quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) -> Tables = ?quorum_queue_tables, rabbit_table:wait(Tables, _Retry = true), Fields = amqqueue:fields(amqqueue_v2), - mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso - mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields. + mnevis:table_info(rabbit_queue, attributes) =:= Fields andalso + mnevis:table_info(rabbit_durable_queue, attributes) =:= Fields. 
migrate_to_amqqueue_with_type(FeatureName, [Table | Rest], Fields) -> rabbit_log:info("Feature flag `~s`: migrating Mnesia table ~s...", [FeatureName, Table]), - Fun = fun(Queue) -> amqqueue:upgrade_to(amqqueue_v2, Queue) end, - case mnesia:transform_table(Table, Fun, Fields) of + case mnevis:transform_table(Table, {amqqueue, upgrade_to, [amqqueue_v2]}, Fields) of {atomic, ok} -> migrate_to_amqqueue_with_type(FeatureName, Rest, Fields); @@ -86,6 +85,7 @@ implicit_default_bindings_migration(FeatureName, _FeatureProps, %% Default exchange bindings are now implicit (not stored in the %% route tables). It should be safe to remove them outside of a %% transaction. + %% TODO: how safe is it in mnevis? rabbit_table:wait([rabbit_queue]), Queues = mnesia:dirty_all_keys(rabbit_queue), remove_explicit_default_bindings(FeatureName, Queues); diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl index 941d1617e7..d3c87d0f94 100644 --- a/src/rabbit_feature_flags.erl +++ b/src/rabbit_feature_flags.erl @@ -1612,16 +1612,19 @@ mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, Timeout) -> %% @private remote_nodes() -> - mnesia:system_info(db_nodes) -- [node()]. + mnevis:db_nodes() -- [node()]. -spec running_remote_nodes() -> [node()]. %% @private running_remote_nodes() -> - mnesia:system_info(running_db_nodes) -- [node()]. + %% TODO: why do we need only running nodes? + remote_nodes(). + % mnesia:system_info(running_db_nodes) -- [node()]. query_running_remote_nodes(Node, Timeout) -> - case rpc:call(Node, mnesia, system_info, [running_db_nodes], Timeout) of + %% TODO: this is weird. Why do we need running_db_nodes from remote? + case rpc:call(Node, mnevis, db_nodes, [], Timeout) of {badrpc, _} = Error -> Error; Nodes -> Nodes -- [node()] end. diff --git a/src/rabbit_health_check.erl b/src/rabbit_health_check.erl index 53666ac411..2a4ae773d1 100644 --- a/src/rabbit_health_check.erl +++ b/src/rabbit_health_check.erl @@ -61,13 +61,12 @@ node_health_check(list_queues) -> health_check_queues(rabbit_vhost:list_names()); node_health_check(rabbit_node_monitor) -> - case rabbit_node_monitor:partitions() of - [] -> - ok; - L when is_list(L), length(L) > 0 -> - ErrorMsg = io_lib:format("cluster partition in effect: ~p", [L]), - {error_string, ErrorMsg} - end; + ok; + %% TODO: rabbit_node_monitor + % case rabbit_node_monitor:partitions() of + % L when is_list(L) -> + % ok + % end; node_health_check(alarms) -> case proplists:get_value(alarms, rabbit:status()) of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b1921e032f..cec52b681b 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -54,10 +54,12 @@ %% Used internally in rpc calls -export([node_info/0, remove_node_if_mnesia_running/1]). --ifdef(TEST). --compile(export_all). --export([init_with_lock/3]). --endif. +-compile(nowarn_unused_function). + +% -ifdef(TEST). +% -compile(export_all). +% -export([init_with_lock/3]). +% -endif. %%---------------------------------------------------------------------------- @@ -75,6 +77,8 @@ init() -> ensure_mnesia_running(), ensure_mnesia_dir(), + %% TODO mnevis: node start + ok = mnevis_node:start(), case is_virgin_node() of true -> rabbit_log:info("Node database directory at ~ts is empty. 
" @@ -90,12 +94,68 @@ init() -> rabbit_peer_discovery:maybe_init(), rabbit_peer_discovery:maybe_register() end, + + %% Create schema on all nodes + case is_virgin_node() of + true -> + ok = create_schema(); + false -> + ok + end, + + io:format("~nDb nodes ~p~n", [mnevis:db_nodes()]), + io:format("~nRunning Db nodes ~p~n", [mnevis:running_db_nodes()]), + + %%% io:format("Get cluster status ~n"), + %%% {ok, Status} = cluster_status_from_mnesia(), + %%% io:format("Cluster status ~p~n", [Status]), + + %% TODO: rabbit_node_monitor + %%% rabbit_node_monitor:write_cluster_status(Status), + + %%% % rabbit_node_monitor:update_cluster_status(), + + %%% % case is_virgin_node() of + %%% % true -> + + %%% % rabbit_log:info("Node database directory at ~s is empty. " + %%% % "Assuming we need to join an existing cluster or initialise from scratch...~n", + %%% % [dir()]), + %%% % rabbit_peer_discovery:log_configured_backend(), + %%% % rabbit_peer_discovery:maybe_init(), + %%% % init_with_lock(); + %%% % false -> + + %%% % NodeType = node_type(), + %%% % init_db_and_upgrade(cluster_nodes(all), NodeType, + %%% % NodeType =:= ram, _Retry = true), + %%% % rabbit_peer_discovery:maybe_init(), + %%% % rabbit_peer_discovery:maybe_register() + %%% % end, %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - %% let's make it so. - ok = rabbit_node_monitor:global_sync(), + %% TODO: rabbit_node_monitor + % ok = rabbit_node_monitor:global_sync(), + ok. + +wait_for_tables() -> + Tables = [T || {T, _D} <- rabbit_table:definitions()], + lists:foreach(fun(Table) -> + wait_for_table(Table) + end, + Tables), ok. +wait_for_table(Table) -> + rabbit_log:error("Waiting for table ~p~n", [Table]), + case lists:member(Table, mnesia:system_info(tables)) of + true -> ok; + false -> + timer:sleep(1000), + wait_for_table(Table) + end. + init_with_lock() -> {Retries, Timeout} = rabbit_peer_discovery:retry_timeout(), init_with_lock(Retries, Timeout, fun init_from_config/0). @@ -134,7 +194,7 @@ init_from_config() -> (Name, BadNames) when is_atom(Name) -> BadNames; (Name, BadNames) -> [Name | BadNames] end, - {DiscoveredNodes, NodeType} = + {DiscoveredNodes, _NodeType} = case rabbit_peer_discovery:discover_cluster_nodes() of {ok, {Nodes, Type} = Config} when is_list(Nodes) andalso @@ -158,29 +218,28 @@ init_from_config() -> "Enabling debug logging might help troubleshoot."), init_db_and_upgrade([node()], disc, false, _Retry = true); _ -> - rabbit_log:info("Peer nodes we can cluster with: ~s~n", - [rabbit_peer_discovery:format_discovered_nodes(Peers)]), - join_discovered_peers(Peers, NodeType) + rabbit_log:info("Peer nodes we can cluster with: ~s~n", [rabbit_peer_discovery:format_discovered_nodes(Peers)]) + %% TODO LRB MNEVIS join_discovered_peers(Peers, NodeType) end. %% Attempts to join discovered, %% reachable and compatible (in terms of Mnesia internal protocol version and such) %% cluster peers in order. -join_discovered_peers(TryNodes, NodeType) -> - case find_reachable_peer_to_cluster_with(nodes_excl_me(TryNodes)) of - {ok, Node} -> - rabbit_log:info("Node '~s' selected for auto-clustering~n", [Node]), - {ok, {_, DiscNodes, _}} = discover_cluster0(Node), - init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true), - rabbit_connection_tracking:boot(), - rabbit_node_monitor:notify_joined_cluster(); - none -> - rabbit_log:warning( - "Could not successfully contact any node of: ~s (as in Erlang distribution). 
" - "Starting as a blank standalone node...~n", - [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]), - init_db_and_upgrade([node()], disc, false, _Retry = true) - end. +% join_discovered_peers(TryNodes, NodeType) -> +% case find_reachable_peer_to_cluster_with(nodes_excl_me(TryNodes)) of +% {ok, Node} -> +% rabbit_log:info("Node '~s' selected for auto-clustering~n", [Node]), +% {ok, {_, DiscNodes, _}} = discover_cluster0(Node), +% init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true), +% rabbit_connection_tracking:boot(), +% rabbit_node_monitor:notify_joined_cluster(); +% none -> +% rabbit_log:warning( +% "Could not successfully contact any node of: ~s (as in Erlang distribution). " +% "Starting as a blank standalone node...~n", +% [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]), +% init_db_and_upgrade([node()], disc, false, _Retry = true) +% end. %% Make the node join a cluster. The node will be reset automatically %% before we actually cluster it. The nodes provided will be used to @@ -225,7 +284,8 @@ join_cluster(DiscoveryNode, NodeType) -> ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true, _Retry = true), rabbit_connection_tracking:boot(), - rabbit_node_monitor:notify_joined_cluster(), + %% TODO: rabbit_node_monitor + % rabbit_node_monitor:notify_joined_cluster(), ok; {error, Reason} -> {error, Reason} @@ -284,7 +344,8 @@ wipe() -> [erlang:disconnect_node(N) || N <- cluster_nodes(all)], %% remove persisted messages and any other garbage we find ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), - ok = rabbit_node_monitor:reset_cluster_status(), + %% TODO: rabbit_node_monitor + % ok = rabbit_node_monitor:reset_cluster_status(), ok. -spec change_cluster_node_type(node_type()) -> 'ok'. @@ -311,14 +372,15 @@ change_cluster_node_type(Type) -> update_cluster_nodes(DiscoveryNode) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), - Status = {AllNodes, _, _} = discover_cluster([DiscoveryNode]), + _Status = {AllNodes, _, _} = discover_cluster([DiscoveryNode]), case me_in_nodes(AllNodes) of true -> %% As in `check_consistency/0', we can safely delete the %% schema here, since it'll be replicated from the other %% nodes mnesia:delete_schema([node()]), - rabbit_node_monitor:write_cluster_status(Status), + %% TODO: rabbit_node_monitor + % rabbit_node_monitor:write_cluster_status(Status), rabbit_log:info("Updating cluster nodes from ~p~n", [DiscoveryNode]), init_db_with_mnesia(AllNodes, node_type(), true, true, _Retry = false); @@ -365,7 +427,7 @@ remove_node_offline_node(Node) -> %% want - we need to know the running nodes *now*. If the current node is a %% RAM node it will return bogus results, but we don't care since we only do %% this operation from disc nodes. - case {mnesia:system_info(running_db_nodes) -- [Node], node_type()} of + case {mnevis:running_db_nodes() -- [Node], node_type()} of {[], disc} -> start_mnesia(), try @@ -402,18 +464,24 @@ status() -> [{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++ IfNonEmpty(ram, cluster_nodes(ram)))}] ++ case is_running() of - true -> RunningNodes = cluster_nodes(running), - [{running_nodes, RunningNodes}, - {cluster_name, rabbit_nodes:cluster_name()}, - {partitions, mnesia_partitions(RunningNodes)}]; - false -> [] + true -> + RunningNodes = cluster_nodes(running), + [{running_nodes, RunningNodes}, + {cluster_name, rabbit_nodes:cluster_name()}, + {partitions, mnesia_partitions(RunningNodes)}, + {mnevis_leader, get_mnevis_leader()}]; + false -> + [] end. 
-mnesia_partitions(Nodes) -> - Replies = rabbit_node_monitor:partitions(Nodes), - [Reply || Reply = {_, R} <- Replies, R =/= []]. +mnesia_partitions(_Nodes) -> []. + %% TODO: rabbit_node_monitor + % Replies = rabbit_node_monitor:partitions(Nodes), + % [Reply || Reply = {_, R} <- Replies, R =/= []]. -is_running() -> mnesia:system_info(is_running) =:= yes. +is_running() -> + % TODO mnevis + mnesia:system_info(is_running) =:= yes. -spec is_clustered() -> boolean(). @@ -459,27 +527,31 @@ cluster_status_from_mnesia() -> false -> {error, mnesia_not_running}; true -> + % TODO mnevis %% If the tables are not present, it means that %% `init_db/3' hasn't been run yet. In other words, either %% we are a virgin node or a restarted RAM node. In both %% cases we're not interested in what mnesia has to say. - NodeType = case mnesia:system_info(use_dir) of - true -> disc; - false -> ram - end, - case rabbit_table:is_present() of - true -> AllNodes = mnesia:system_info(db_nodes), - DiscCopies = mnesia:table_info(schema, disc_copies), - DiscNodes = case NodeType of - disc -> nodes_incl_me(DiscCopies); - ram -> DiscCopies - end, + % NodeType = case mnesia:system_info(use_dir) of + % true -> disc; + % false -> ram + % end, + % case rabbit_table:is_present() of + % true -> + AllNodes = mnevis:db_nodes(), + + %% Ignoring disk node setting + % DiscCopies = mnesia:table_info(schema, disc_copies), + % DiscNodes = case NodeType of + % disc -> nodes_incl_me(DiscCopies); + % ram -> DiscCopies + % end, %% `mnesia:system_info(running_db_nodes)' is safe since %% we know that mnesia is running - RunningNodes = mnesia:system_info(running_db_nodes), - {ok, {AllNodes, DiscNodes, RunningNodes}}; - false -> {error, tables_not_present} - end + RunningNodes = mnevis:running_db_nodes(), + {ok, {AllNodes, AllNodes, RunningNodes}} + % false -> {error, tables_not_present} + % end end. cluster_status(WhichNodes) -> @@ -488,12 +560,15 @@ cluster_status(WhichNodes) -> {ok, Nodes0} -> Nodes0; {error, _Reason} -> - {AllNodes0, DiscNodes0, RunningNodes0} = - rabbit_node_monitor:read_cluster_status(), + %% TODO: rabbit_node_monitor + %% Read cluster status from offline mnevis? + {[], [], []} + % {AllNodes0, DiscNodes0, RunningNodes0} = + % rabbit_node_monitor:read_cluster_status(), %% The cluster status file records the status when the node is %% online, but we know for sure that the node is offline now, so %% we can remove it from the list of running nodes. - {AllNodes0, DiscNodes0, nodes_excl_me(RunningNodes0)} + % {AllNodes0, DiscNodes0, nodes_excl_me(RunningNodes0)} end, case WhichNodes of status -> Nodes; @@ -510,13 +585,14 @@ node_info() -> -spec node_type() -> node_type(). -node_type() -> - {_AllNodes, DiscNodes, _RunningNodes} = - rabbit_node_monitor:read_cluster_status(), - case DiscNodes =:= [] orelse me_in_nodes(DiscNodes) of - true -> disc; - false -> ram - end. +node_type() -> disc. + %% TODO: rabbit_node_monitor + % {_AllNodes, DiscNodes, _RunningNodes} = + % rabbit_node_monitor:read_cluster_status(), + % case DiscNodes =:= [] orelse me_in_nodes(DiscNodes) of + % true -> disc; + % false -> ram + % end. -spec dir() -> file:filename(). @@ -557,7 +633,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> end, ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin), ensure_schema_integrity(), - rabbit_node_monitor:update_cluster_status(), + %% TODO: rabbit_node_monitor + % rabbit_node_monitor:update_cluster_status(), ok. -spec init_db_unchecked([node()], node_type()) -> 'ok'. 
@@ -672,14 +749,18 @@ check_cluster_consistency() -> case lists:foldl( fun (Node, {error, _}) -> check_cluster_consistency(Node, true); (_Node, {ok, Status}) -> {ok, Status} - end, {error, not_found}, nodes_excl_me(cluster_nodes(all))) + end, + {error, not_found}, + nodes_excl_me(cluster_nodes(all))) of - {ok, Status = {RemoteAllNodes, _, _}} -> + {ok, _Status = {RemoteAllNodes, _, _}} -> case ordsets:is_subset(ordsets:from_list(cluster_nodes(all)), ordsets:from_list(RemoteAllNodes)) of true -> ok; false -> + error({should_not_get_here, cluster_inconsistency}), + %% We delete the schema here since we think we are %% clustered with nodes that are no longer in the %% cluster and there is no other way to remove @@ -693,7 +774,9 @@ check_cluster_consistency() -> %% nodes. mnesia:delete_schema([node()]) end, - rabbit_node_monitor:write_cluster_status(Status); + ok; + %% TODO: rabbit_node_monitor + % rabbit_node_monitor:write_cluster_status(Status); {error, not_found} -> ok; {error, _} = E -> @@ -805,11 +888,21 @@ schema_ok_or_move() -> %% We only care about disc nodes since ram nodes are supposed to catch %% up only create_schema() -> - stop_mnesia(), - rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), - start_mnesia(), - ok = rabbit_table:create(), + % TODO mnevis + % io:format("Create schema ~n"), + % stop_mnesia(), + % rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), + % start_mnesia(), + case is_leader(node()) of + true -> + io:format("Create tables ~n"), + ok = rabbit_table:create(); + false -> + wait_for_tables() + end, + io:format("Check integrity ~n"), ensure_schema_integrity(), + io:format("Record version ~n"), ok = rabbit_version:record_desired(). move_db() -> @@ -844,7 +937,8 @@ remove_node_if_mnesia_running(Node) -> case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> rabbit_amqqueue:forget_all_durable(Node), - rabbit_node_monitor:notify_left_cluster(Node), + %% TODO: rabbit_node_monitor + % rabbit_node_monitor:notify_left_cluster(Node), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} @@ -873,11 +967,11 @@ wait_for(Condition) -> rabbit_log:info("Waiting for ~p...~n", [Condition]), timer:sleep(1000). -start_mnesia(CheckConsistency) -> - case CheckConsistency of - true -> check_cluster_consistency(); - false -> ok - end, +start_mnesia(_CheckConsistency) -> + % case CheckConsistency of + % true -> check_cluster_consistency(); + % false -> ok + % end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ensure_mnesia_running(). @@ -985,45 +1079,47 @@ check_rabbit_consistency(RemoteNode, RemoteVersion) -> %% exception of the cluster status files, which will be there thanks to %% `rabbit_node_monitor:prepare_cluster_status_file/0'. is_virgin_node() -> - case rabbit_file:list_dir(dir()) of - {error, enoent} -> - true; - {ok, []} -> - true; - {ok, List0} -> - IgnoredFiles0 = - [rabbit_node_monitor:cluster_status_filename(), - rabbit_node_monitor:running_nodes_filename(), - rabbit_node_monitor:quorum_filename(), - rabbit_feature_flags:enabled_feature_flags_list_file()], - IgnoredFiles = [filename:basename(File) || File <- IgnoredFiles0], - List = List0 -- IgnoredFiles, - List =:= [] - end. 
- -find_reachable_peer_to_cluster_with([]) -> - none; -find_reachable_peer_to_cluster_with([Node | Nodes]) -> - Fail = fun (Fmt, Args) -> - rabbit_log:warning( - "Could not auto-cluster with node ~s: " ++ Fmt, [Node | Args]), - find_reachable_peer_to_cluster_with(Nodes) - end, - case remote_node_info(Node) of - {badrpc, _} = Reason -> - Fail("~p~n", [Reason]); - %% old delegate hash check - {_OTP, RMQ, Hash, _} when is_binary(Hash) -> - Fail("version ~s~n", [RMQ]); - {_OTP, _RMQ, _Protocol, {error, _} = E} -> - Fail("~p~n", [E]); - {OTP, RMQ, Protocol, _} -> - case check_consistency(Node, OTP, RMQ, Protocol) of - {error, _} -> Fail("versions ~p~n", - [{OTP, RMQ}]); - ok -> {ok, Node} - end - end. + %%% case rabbit_file:list_dir(dir()) of + %%% {error, enoent} -> + %%% true; + %%% {ok, []} -> + %%% true; + %%% {ok, List0} -> + %%% IgnoredFiles0 = + %%% [rabbit_node_monitor:cluster_status_filename(), + %%% rabbit_node_monitor:running_nodes_filename(), + %%% rabbit_node_monitor:quorum_filename(), + %%% rabbit_feature_flags:enabled_feature_flags_list_file()], + %%% IgnoredFiles = [filename:basename(File) || File <- IgnoredFiles0], + %%% List = List0 -- IgnoredFiles, + %%% List =:= [] + %%% end. + {ok, Tables, _} = ra:consistent_query(mnevis_node:node_id(), fun(_) -> mnesia:system_info(tables) end), + not lists:member(rabbit_queue, Tables). + +% find_reachable_peer_to_cluster_with([]) -> +% none; +% find_reachable_peer_to_cluster_with([Node | Nodes]) -> +% Fail = fun (Fmt, Args) -> +% rabbit_log:warning( +% "Could not auto-cluster with node ~s: " ++ Fmt, [Node | Args]), +% find_reachable_peer_to_cluster_with(Nodes) +% end, +% case remote_node_info(Node) of +% {badrpc, _} = Reason -> +% Fail("~p~n", [Reason]); +% %% old delegate hash check +% {_OTP, RMQ, Hash, _} when is_binary(Hash) -> +% Fail("version ~s~n", [RMQ]); +% {_OTP, _RMQ, _Protocol, {error, _} = E} -> +% Fail("~p~n", [E]); +% {OTP, RMQ, Protocol, _} -> +% case check_consistency(Node, OTP, RMQ, Protocol) of +% {error, _} -> Fail("versions ~p~n", +% [{OTP, RMQ}]); +% ok -> {ok, Node} +% end +% end. is_only_clustered_disc_node() -> node_type() =:= disc andalso is_clustered() andalso @@ -1034,10 +1130,22 @@ are_we_clustered_with(Node) -> me_in_nodes(Nodes) -> lists:member(node(), Nodes). -nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]). +% nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]). nodes_excl_me(Nodes) -> Nodes -- [node()]. +is_leader(Node) -> + case ra:members(mnevis_node:node_id()) of + {ok, _, {_, Node}} -> + true; + _ -> + false + end. + +get_mnevis_leader() -> + {ok, _, {_, Node}} = ra:members(mnevis_node:node_id()), + Node. + -spec e(any()) -> no_return(). e(Tag) -> throw({error, {Tag, error_description(Tag)}}). diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl index c74a390ae2..5882a867d8 100644 --- a/src/rabbit_mnesia_rename.erl +++ b/src/rabbit_mnesia_rename.erl @@ -207,8 +207,10 @@ convert_backup(NodeMap, FromBackup, ToBackup) -> end, switched). config_files() -> - [rabbit_node_monitor:running_nodes_filename(), - rabbit_node_monitor:cluster_status_filename()]. + %% TODO: rabbit_node_monitor + []. + % [rabbit_node_monitor:running_nodes_filename(), + % rabbit_node_monitor:cluster_status_filename()]. backup_of_conf(Path) -> filename:join([dir(), filename:basename(Path)]). @@ -254,7 +256,7 @@ rename_in_running_mnesia(FromNode, ToNode) -> ok. 
transform_table(Table, Map) -> - mnesia:sync_transaction( + rabbit_misc:mnevis_transaction( fun () -> mnesia:lock({table, Table}, write), transform_table(Table, Map, mnesia:first(Table)) diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index c008b969ba..d11e925644 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -263,14 +263,17 @@ tcp_listener_started(Protocol, Opts, IPAddress, Port) -> %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1 %% We need the host so we can distinguish multiple instances of the above %% in a cluster. - ok = mnesia:dirty_write( - rabbit_listener, - #listener{node = node(), - protocol = Protocol, - host = tcp_host(IPAddress), - ip_address = IPAddress, - port = Port, - opts = Opts}). + TcpHost = tcp_host(IPAddress), + L = #listener{node = node(), + protocol = Protocol, + host = TcpHost, + ip_address = IPAddress, + port = Port, + opts = Opts}, + F = fun () -> + ok = mnesia:write(rabbit_listener, L, write) + end, + ok = rabbit_misc:execute_mnesia_transaction(F). -spec tcp_listener_stopped (_, _, diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl deleted file mode 100644 index 5461e176e3..0000000000 --- a/src/rabbit_node_monitor.erl +++ /dev/null @@ -1,932 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at https://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_node_monitor). - -%% Transitional step until we can require Erlang/OTP 21 and -%% use the now recommended try/catch syntax for obtaining the stack trace. --compile(nowarn_deprecated_function). - --behaviour(gen_server). - --export([start_link/0]). --export([running_nodes_filename/0, - cluster_status_filename/0, quorum_filename/0, - prepare_cluster_status_files/0, - write_cluster_status/1, read_cluster_status/0, - update_cluster_status/0, reset_cluster_status/0]). --export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). --export([partitions/0, partitions/1, status/1, subscribe/1]). --export([pause_partition_guard/0]). --export([global_sync/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - - %% Utils --export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0, - alive_nodes/1, alive_rabbit_nodes/1]). - --define(SERVER, ?MODULE). --define(NODE_REPLY_TIMEOUT, 5000). --define(RABBIT_UP_RPC_TIMEOUT, 2000). --define(RABBIT_DOWN_PING_INTERVAL, 1000). - --record(state, {monitors, partitions, subscribers, down_ping_timer, - keepalive_timer, autoheal, guid, node_guids}). - -%%---------------------------------------------------------------------------- -%% Start -%%---------------------------------------------------------------------------- - --spec start_link() -> rabbit_types:ok_pid_or_error(). - -start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). 
- -%%---------------------------------------------------------------------------- -%% Cluster file operations -%%---------------------------------------------------------------------------- - -%% The cluster file information is kept in two files. The "cluster -%% status file" contains all the clustered nodes and the disc nodes. -%% The "running nodes file" contains the currently running nodes or -%% the running nodes at shutdown when the node is down. -%% -%% We strive to keep the files up to date and we rely on this -%% assumption in various situations. Obviously when mnesia is offline -%% the information we have will be outdated, but it cannot be -%% otherwise. - --spec running_nodes_filename() -> string(). - -running_nodes_filename() -> - filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). - --spec cluster_status_filename() -> string(). - -cluster_status_filename() -> - filename:join(rabbit_mnesia:dir(), "cluster_nodes.config"). - -quorum_filename() -> - filename:join(rabbit_mnesia:dir(), "quorum"). - --spec prepare_cluster_status_files() -> 'ok' | no_return(). - -prepare_cluster_status_files() -> - rabbit_mnesia:ensure_mnesia_dir(), - RunningNodes1 = case try_read_file(running_nodes_filename()) of - {ok, [Nodes]} when is_list(Nodes) -> Nodes; - {ok, Other} -> corrupt_cluster_status_files(Other); - {error, enoent} -> [] - end, - ThisNode = [node()], - %% The running nodes file might contain a set or a list, in case - %% of the legacy file - RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1), - {AllNodes1, DiscNodes} = - case try_read_file(cluster_status_filename()) of - {ok, [{AllNodes, DiscNodes0}]} -> - {AllNodes, DiscNodes0}; - {ok, [AllNodes0]} when is_list(AllNodes0) -> - {legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)}; - {ok, Files} -> - corrupt_cluster_status_files(Files); - {error, enoent} -> - LegacyNodes = legacy_cluster_nodes([]), - {LegacyNodes, LegacyNodes} - end, - AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2), - ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}). - --spec corrupt_cluster_status_files(any()) -> no_return(). - -corrupt_cluster_status_files(F) -> - throw({error, corrupt_cluster_status_files, F}). - --spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'. - -write_cluster_status({All, Disc, Running}) -> - ClusterStatusFN = cluster_status_filename(), - Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of - ok -> - RunningNodesFN = running_nodes_filename(), - {RunningNodesFN, - rabbit_file:write_term_file(RunningNodesFN, [Running])}; - E1 = {error, _} -> - {ClusterStatusFN, E1} - end, - case Res of - {_, ok} -> ok; - {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}}) - end. - --spec read_cluster_status() -> rabbit_mnesia:cluster_status(). - -read_cluster_status() -> - case {try_read_file(cluster_status_filename()), - try_read_file(running_nodes_filename())} of - {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) -> - {All, Disc, Running}; - {Stat, Run} -> - throw({error, {corrupt_or_missing_cluster_files, Stat, Run}}) - end. - --spec update_cluster_status() -> 'ok'. - -update_cluster_status() -> - {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(), - write_cluster_status(Status). - --spec reset_cluster_status() -> 'ok'. - -reset_cluster_status() -> - write_cluster_status({[node()], [node()], [node()]}). 
- -%%---------------------------------------------------------------------------- -%% Cluster notifications -%%---------------------------------------------------------------------------- - --spec notify_node_up() -> 'ok'. - -notify_node_up() -> - gen_server:cast(?SERVER, notify_node_up). - --spec notify_joined_cluster() -> 'ok'. - -notify_joined_cluster() -> - Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], - gen_server:abcast(Nodes, ?SERVER, - {joined_cluster, node(), rabbit_mnesia:node_type()}), - ok. - --spec notify_left_cluster(node()) -> 'ok'. - -notify_left_cluster(Node) -> - Nodes = rabbit_mnesia:cluster_nodes(running), - gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}), - ok. - -%%---------------------------------------------------------------------------- -%% Server calls -%%---------------------------------------------------------------------------- - --spec partitions() -> [node()]. - -partitions() -> - gen_server:call(?SERVER, partitions, infinity). - --spec partitions([node()]) -> [{node(), [node()]}]. - -partitions(Nodes) -> - {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT), - Replies. - --spec status([node()]) -> {[{node(), [node()]}], [node()]}. - -status(Nodes) -> - gen_server:multi_call(Nodes, ?SERVER, status, infinity). - --spec subscribe(pid()) -> 'ok'. - -subscribe(Pid) -> - gen_server:cast(?SERVER, {subscribe, Pid}). - -%%---------------------------------------------------------------------------- -%% pause_minority/pause_if_all_down safety -%%---------------------------------------------------------------------------- - -%% If we are in a minority and pause_minority mode then a) we are -%% going to shut down imminently and b) we should not confirm anything -%% until then, since anything we confirm is likely to be lost. -%% -%% The same principles apply to a node which isn't part of the preferred -%% partition when we are in pause_if_all_down mode. -%% -%% We could confirm something by having an HA queue see the pausing -%% state (and fail over into it) before the node monitor stops us, or -%% by using unmirrored queues and just having them vanish (and -%% confirming messages as thrown away). -%% -%% So we have channels call in here before issuing confirms, to do a -%% lightweight check that we have not entered a pausing state. - --spec pause_partition_guard() -> 'ok' | 'pausing'. - -pause_partition_guard() -> - case get(pause_partition_guard) of - not_pause_mode -> - ok; - undefined -> - {ok, M} = application:get_env(rabbit, cluster_partition_handling), - case M of - pause_minority -> - pause_minority_guard([], ok); - {pause_if_all_down, PreferredNodes, _} -> - pause_if_all_down_guard(PreferredNodes, [], ok); - _ -> - put(pause_partition_guard, not_pause_mode), - ok - end; - {minority_mode, Nodes, LastState} -> - pause_minority_guard(Nodes, LastState); - {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} -> - pause_if_all_down_guard(PreferredNodes, Nodes, LastState) - end. - -pause_minority_guard(LastNodes, LastState) -> - case nodes() of - LastNodes -> LastState; - _ -> NewState = case majority() of - false -> pausing; - true -> ok - end, - put(pause_partition_guard, - {minority_mode, nodes(), NewState}), - NewState - end. 
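The guard above is the lightweight check the comment describes: channels consult it before issuing confirms and skip them once the node is about to pause. A minimal sketch of such a call site (maybe_confirm/1 and send_confirms/1 are hypothetical names, not taken from this diff):

maybe_confirm(State) ->
    case rabbit_node_monitor:pause_partition_guard() of
        ok      -> send_confirms(State);
        pausing -> State    %% node is pausing; anything confirmed now is likely lost
    end.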
- -pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) -> - case nodes() of - LastNodes -> LastState; - _ -> NewState = case in_preferred_partition(PreferredNodes) of - false -> pausing; - true -> ok - end, - put(pause_partition_guard, - {pause_if_all_down_mode, PreferredNodes, nodes(), - NewState}), - NewState - end. - -%%---------------------------------------------------------------------------- -%% "global" hang workaround. -%%---------------------------------------------------------------------------- - -%% This code works around a possible inconsistency in the "global" -%% state, causing global:sync/0 to never return. -%% -%% 1. A process is spawned. -%% 2. If after 15", global:sync() didn't return, the "global" -%% state is parsed. -%% 3. If it detects that a sync is blocked for more than 10", -%% the process sends fake nodedown/nodeup events to the two -%% nodes involved (one local, one remote). -%% 4. Both "global" instances restart their synchronisation. -%% 5. globao:sync() finally returns. -%% -%% FIXME: Remove this workaround, once we got rid of the change to -%% "dist_auto_connect" and fixed the bugs uncovered. - -global_sync() -> - Pid = spawn(fun workaround_global_hang/0), - ok = global:sync(), - Pid ! global_sync_done, - ok. - -workaround_global_hang() -> - receive - global_sync_done -> - ok - after 10000 -> - find_blocked_global_peers() - end. - -find_blocked_global_peers() -> - Snapshot1 = snapshot_global_dict(), - timer:sleep(10000), - Snapshot2 = snapshot_global_dict(), - find_blocked_global_peers1(Snapshot2, Snapshot1). - -snapshot_global_dict() -> - {status, _, _, [Dict | _]} = sys:get_status(global_name_server), - [E || {{sync_tag_his, _}, _} = E <- Dict]. - -find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest], - OlderSnapshot) -> - case lists:member(Item, OlderSnapshot) of - true -> unblock_global_peer(Peer); - false -> ok - end, - find_blocked_global_peers1(Rest, OlderSnapshot); -find_blocked_global_peers1([], _) -> - ok. - -unblock_global_peer(PeerNode) -> - ThisNode = node(), - PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]), - error_logger:info_msg( - "Global hang workaround: global state on ~s seems broken~n" - " * Peer global state: ~p~n" - " * Local global state: ~p~n" - "Faking nodedown/nodeup between ~s and ~s~n", - [PeerNode, PeerState, sys:get_status(global_name_server), - PeerNode, ThisNode]), - {global_name_server, ThisNode} ! {nodedown, PeerNode}, - {global_name_server, PeerNode} ! {nodedown, ThisNode}, - {global_name_server, ThisNode} ! {nodeup, PeerNode}, - {global_name_server, PeerNode} ! {nodeup, ThisNode}, - ok. - -%%---------------------------------------------------------------------------- -%% gen_server callbacks -%%---------------------------------------------------------------------------- - -init([]) -> - %% We trap exits so that the supervisor will not just kill us. We - %% want to be sure that we are not going to be killed while - %% writing out the cluster status files - bad things can then - %% happen. - process_flag(trap_exit, true), - net_kernel:monitor_nodes(true, [nodedown_reason]), - {ok, _} = mnesia:subscribe(system), - %% If the node has been restarted, Mnesia can trigger a system notification - %% before the monitor subscribes to receive them. To avoid autoheal blocking due to - %% the inconsistent database event never arriving, we being monitoring all running - %% nodes as early as possible. The rest of the monitoring ops will only be triggered - %% when notifications arrive. 
- Nodes = possibly_partitioned_nodes(), - startup_log(Nodes), - Monitors = lists:foldl(fun(Node, Monitors0) -> - pmon:monitor({rabbit, Node}, Monitors0) - end, pmon:new(), Nodes), - {ok, ensure_keepalive_timer(#state{monitors = Monitors, - subscribers = pmon:new(), - partitions = [], - guid = rabbit_guid:gen(), - node_guids = maps:new(), - autoheal = rabbit_autoheal:init()})}. - -handle_call(partitions, _From, State = #state{partitions = Partitions}) -> - {reply, Partitions, State}; - -handle_call(status, _From, State = #state{partitions = Partitions}) -> - {reply, [{partitions, Partitions}, - {nodes, [node() | nodes()]}], State}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(notify_node_up, State = #state{guid = GUID}) -> - Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], - gen_server:abcast(Nodes, ?SERVER, - {node_up, node(), rabbit_mnesia:node_type(), GUID}), - %% register other active rabbits with this rabbit - DiskNodes = rabbit_mnesia:cluster_nodes(disc), - [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of - true -> disc; - false -> ram - end}) || N <- Nodes], - {noreply, State}; - -%%---------------------------------------------------------------------------- -%% Partial partition detection -%% -%% Every node generates a GUID each time it starts, and announces that -%% GUID in 'node_up', with 'announce_guid' sent by return so the new -%% node knows the GUIDs of the others. These GUIDs are sent in all the -%% partial partition related messages to ensure that we ignore partial -%% partition messages from before we restarted (to avoid getting stuck -%% in a loop). -%% -%% When one node gets nodedown from another, it then sends -%% 'check_partial_partition' to all the nodes it still thinks are -%% alive. If any of those (intermediate) nodes still see the "down" -%% node as up, they inform it that this has happened. The original -%% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then -%% disconnect from the intermediate node to "upgrade" to a full -%% partition. -%% -%% In pause_minority mode it will instead immediately pause until all -%% nodes come back. This is because the contract for pause_minority is -%% that nodes should never sit in a partitioned state - if it just -%% disconnected, it would become a minority, pause, realise it's not -%% in a minority any more, and come back, still partitioned (albeit no -%% longer partially). 
-%% ---------------------------------------------------------------------------- - -handle_cast({node_up, Node, NodeType, GUID}, - State = #state{guid = MyGUID, - node_guids = GUIDs}) -> - cast(Node, {announce_guid, node(), MyGUID}), - GUIDs1 = maps:put(Node, GUID, GUIDs), - handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1}); - -handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> - {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}}; - -handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, - State = #state{guid = MyGUID, - node_guids = GUIDs}) -> - case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso - maps:find(Node, GUIDs) =:= {ok, NodeGUID} of - true -> spawn_link( %%[1] - fun () -> - case rpc:call(Node, rabbit, is_running, []) of - {badrpc, _} -> ok; - _ -> - rabbit_log:warning("Received a 'DOWN' message" - " from ~p but still can" - " communicate with it ~n", - [Node]), - cast(Rep, {partial_partition, - Node, node(), RepGUID}) - end - end); - false -> ok - end, - {noreply, State}; -%% [1] We checked that we haven't heard the node go down - but we -%% really should make sure we can actually communicate with -%% it. Otherwise there's a race where we falsely detect a partial -%% partition. -%% -%% Now of course the rpc:call/4 may take a long time to return if -%% connectivity with the node is actually interrupted - but that's OK, -%% we only really want to do something in a timely manner if -%% connectivity is OK. However, of course as always we must not block -%% the node monitor, so we do the check in a separate process. - -handle_cast({check_partial_partition, _Node, _Reporter, - _NodeGUID, _GUID, _ReporterGUID}, State) -> - {noreply, State}; - -handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID}, - State = #state{guid = MyGUID}) -> - FmtBase = "Partial partition detected:~n" - " * We saw DOWN from ~s~n" - " * We can still see ~s which can see ~s~n", - ArgsBase = [NotReallyDown, Proxy, NotReallyDown], - case application:get_env(rabbit, cluster_partition_handling) of - {ok, pause_minority} -> - rabbit_log:error( - FmtBase ++ " * pause_minority mode enabled~n" - "We will therefore pause until the *entire* cluster recovers~n", - ArgsBase), - await_cluster_recovery(fun all_nodes_up/0), - {noreply, State}; - {ok, {pause_if_all_down, PreferredNodes, _}} -> - case in_preferred_partition(PreferredNodes) of - true -> rabbit_log:error( - FmtBase ++ "We will therefore intentionally " - "disconnect from ~s~n", ArgsBase ++ [Proxy]), - upgrade_to_full_partition(Proxy); - false -> rabbit_log:info( - FmtBase ++ "We are about to pause, no need " - "for further actions~n", ArgsBase) - end, - {noreply, State}; - {ok, _} -> - rabbit_log:error( - FmtBase ++ "We will therefore intentionally disconnect from ~s~n", - ArgsBase ++ [Proxy]), - upgrade_to_full_partition(Proxy), - {noreply, State} - end; - -handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) -> - {noreply, State}; - -%% Sometimes it appears the Erlang VM does not give us nodedown -%% messages reliably when another node disconnects from us. Therefore -%% we are told just before the disconnection so we can reciprocate. 
-handle_cast({partial_partition_disconnect, Other}, State) -> - rabbit_log:error("Partial partition disconnect from ~s~n", [Other]), - disconnect(Other), - {noreply, State}; - -%% Note: when updating the status file, we can't simply write the -%% mnesia information since the message can (and will) overtake the -%% mnesia propagation. -handle_cast({node_up, Node, NodeType}, - State = #state{monitors = Monitors}) -> - rabbit_log:info("rabbit on node ~p up~n", [Node]), - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({add_node(Node, AllNodes), - case NodeType of - disc -> add_node(Node, DiscNodes); - ram -> DiscNodes - end, - add_node(Node, RunningNodes)}), - ok = handle_live_rabbit(Node), - Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of - true -> - Monitors; - false -> - pmon:monitor({rabbit, Node}, Monitors) - end, - {noreply, maybe_autoheal(State#state{monitors = Monitors1})}; - -handle_cast({joined_cluster, Node, NodeType}, State) -> - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({add_node(Node, AllNodes), - case NodeType of - disc -> add_node(Node, DiscNodes); - ram -> DiscNodes - end, - RunningNodes}), - {noreply, State}; - -handle_cast({left_cluster, Node}, State) -> - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes), - del_node(Node, RunningNodes)}), - {noreply, State}; - -handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) -> - {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}}; - -handle_cast(keepalive, State) -> - {noreply, State}; - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, - State = #state{monitors = Monitors, subscribers = Subscribers}) -> - rabbit_log:info("rabbit on node ~p down~n", [Node]), - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}), - [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)], - {noreply, handle_dead_rabbit( - Node, - State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})}; - -handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #state{subscribers = Subscribers}) -> - {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; - -handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID, - node_guids = GUIDs}) -> - rabbit_log:info("node ~p down: ~p~n", - [Node, proplists:get_value(nodedown_reason, Info)]), - Check = fun (N, CheckGUID, DownGUID) -> - cast(N, {check_partial_partition, - Node, node(), DownGUID, CheckGUID, MyGUID}) - end, - case maps:find(Node, GUIDs) of - {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running) - -- [node(), Node], - [case maps:find(N, GUIDs) of - {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); - error -> ok - end || N <- Alive]; - error -> ok - end, - {noreply, handle_dead_node(Node, State)}; - -handle_info({nodeup, Node, _Info}, State) -> - rabbit_log:info("node ~p up~n", [Node]), - {noreply, State}; - -handle_info({mnesia_system_event, - {inconsistent_database, running_partitioned_network, Node}}, - State = #state{partitions = Partitions, - monitors = Monitors}) -> - %% We will not get a node_up from this node - yet we should treat it as - %% up (mostly). 
- State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of - true -> State; - false -> State#state{ - monitors = pmon:monitor({rabbit, Node}, Monitors)} - end, - ok = handle_live_rabbit(Node), - Partitions1 = lists:usort([Node | Partitions]), - {noreply, maybe_autoheal(State1#state{partitions = Partitions1})}; - -handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState, - partitions = Partitions}) -> - AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions), - {noreply, State#state{autoheal = AState1}}; - -handle_info(ping_down_nodes, State) -> - %% We ping nodes when some are down to ensure that we find out - %% about healed partitions quickly. We ping all nodes rather than - %% just the ones we know are down for simplicity; it's not expensive - %% to ping the nodes that are up, after all. - State1 = State#state{down_ping_timer = undefined}, - Self = self(), - %% We ping in a separate process since in a partition it might - %% take some noticeable length of time and we don't want to block - %% the node monitor for that long. - spawn_link(fun () -> - ping_all(), - case all_nodes_up() of - true -> ok; - false -> Self ! ping_down_nodes_again - end - end), - {noreply, State1}; - -handle_info(ping_down_nodes_again, State) -> - {noreply, ensure_ping_timer(State)}; - -handle_info(ping_up_nodes, State) -> - %% In this case we need to ensure that we ping "quickly" - - %% i.e. only nodes that we know to be up. - [cast(N, keepalive) || N <- alive_nodes() -- [node()]], - {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})}; - -handle_info({'EXIT', _, _} = Info, State = #state{autoheal = AState0}) -> - AState = rabbit_autoheal:process_down(Info, AState0), - {noreply, State#state{autoheal = AState}}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State) -> - rabbit_misc:stop_timer(State, #state.down_ping_timer), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- -%% Functions that call the module specific hooks when nodes go up/down -%%---------------------------------------------------------------------------- - -handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> - %% In general in rabbit_node_monitor we care about whether the - %% rabbit application is up rather than the node; we do this so - %% that we can respond in the same way to "rabbitmqctl stop_app" - %% and "rabbitmqctl stop" as much as possible. - %% - %% However, for pause_minority and pause_if_all_down modes we can't do - %% this, since we depend on looking at whether other nodes are up - %% to decide whether to come back up ourselves - if we decide that - %% based on the rabbit application we would go down and never come - %% back. 
- case application:get_env(rabbit, cluster_partition_handling) of - {ok, pause_minority} -> - case majority([Node]) of - true -> ok; - false -> await_cluster_recovery(fun majority/0) - end, - State; - {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} -> - case in_preferred_partition(PreferredNodes, [Node]) of - true -> ok; - false -> await_cluster_recovery( - fun in_preferred_partition/0) - end, - case HowToRecover of - autoheal -> State#state{autoheal = - rabbit_autoheal:node_down(Node, Autoheal)}; - _ -> State - end; - {ok, ignore} -> - State; - {ok, autoheal} -> - State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)}; - {ok, Term} -> - rabbit_log:warning("cluster_partition_handling ~p unrecognised, " - "assuming 'ignore'~n", [Term]), - State - end. - -await_cluster_recovery(Condition) -> - rabbit_log:warning("Cluster minority/secondary status detected - " - "awaiting recovery~n", []), - run_outside_applications(fun () -> - rabbit:stop(), - wait_for_cluster_recovery(Condition) - end, false), - ok. - -run_outside_applications(Fun, WaitForExistingProcess) -> - spawn_link(fun () -> - %% Ignore exit messages from the monitor - the link is needed - %% to ensure the monitor detects abnormal exits from this process - %% and can reset the 'restarting' status on the autoheal, avoiding - %% a deadlock. The monitor is restarted when rabbit does, so messages - %% in the other direction should be ignored. - process_flag(trap_exit, true), - %% If our group leader is inside an application we are about - %% to stop, application:stop/1 does not return. - group_leader(whereis(init), self()), - register_outside_app_process(Fun, WaitForExistingProcess) - end). - -register_outside_app_process(Fun, WaitForExistingProcess) -> - %% Ensure only one such process at a time, the exit(badarg) is - %% harmless if one is already running. - %% - %% If WaitForExistingProcess is false, the given fun is simply not - %% executed at all and the process exits. - %% - %% If WaitForExistingProcess is true, we wait for the end of the - %% currently running process before executing the given function. - try register(rabbit_outside_app_process, self()) of - true -> - do_run_outside_app_fun(Fun) - catch - error:badarg when WaitForExistingProcess -> - MRef = erlang:monitor(process, rabbit_outside_app_process), - receive - {'DOWN', MRef, _, _, _} -> - %% The existing process exited, let's try to - %% register again. - register_outside_app_process(Fun, WaitForExistingProcess) - end; - error:badarg -> - ok - end. - -do_run_outside_app_fun(Fun) -> - try - Fun() - catch _:E -> - rabbit_log:error( - "rabbit_outside_app_process:~n~p~n~p~n", - [E, erlang:get_stacktrace()]) - end. - -wait_for_cluster_recovery(Condition) -> - ping_all(), - case Condition() of - true -> rabbit:start(); - false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL), - wait_for_cluster_recovery(Condition) - end. - -handle_dead_rabbit(Node, State = #state{partitions = Partitions, - autoheal = Autoheal}) -> - %% TODO: This may turn out to be a performance hog when there are - %% lots of nodes. We really only need to execute some of these - %% statements on *one* node, rather than all of them. - ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node), - ok = rabbit_alarm:on_node_down(Node), - ok = rabbit_mnesia:on_node_down(Node), - %% If we have been partitioned, and we are now in the only remaining - %% partition, we no longer care about partitions - forget them. 
Note - %% that we do not attempt to deal with individual (other) partitions - %% going away. It's only safe to forget anything about partitions when - %% there are no partitions. - Down = Partitions -- alive_rabbit_nodes(), - NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running), - Partitions1 = case Partitions -- Down -- NoLongerPartitioned of - [] -> []; - _ -> Partitions - end, - ensure_ping_timer( - State#state{partitions = Partitions1, - autoheal = rabbit_autoheal:rabbit_down(Node, Autoheal)}). - -ensure_ping_timer(State) -> - rabbit_misc:ensure_timer( - State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, - ping_down_nodes). - -ensure_keepalive_timer(State) -> - {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval), - rabbit_misc:ensure_timer( - State, #state.keepalive_timer, Interval, ping_up_nodes). - -handle_live_rabbit(Node) -> - ok = rabbit_amqqueue:on_node_up(Node), - ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node). - -maybe_autoheal(State = #state{partitions = []}) -> - State; - -maybe_autoheal(State = #state{autoheal = AState}) -> - case all_nodes_up() of - true -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)}; - false -> State - end. - -%%-------------------------------------------------------------------- -%% Internal utils -%%-------------------------------------------------------------------- - -try_read_file(FileName) -> - case rabbit_file:read_term_file(FileName) of - {ok, Term} -> {ok, Term}; - {error, enoent} -> {error, enoent}; - {error, E} -> throw({error, {cannot_read_file, FileName, E}}) - end. - -legacy_cluster_nodes(Nodes) -> - %% We get all the info that we can, including the nodes from - %% mnesia, which will be there if the node is a disc node (empty - %% list otherwise) - lists:usort(Nodes ++ mnesia:system_info(db_nodes)). - -legacy_disc_nodes(AllNodes) -> - case AllNodes == [] orelse lists:member(node(), AllNodes) of - true -> [node()]; - false -> [] - end. - -add_node(Node, Nodes) -> lists:usort([Node | Nodes]). - -del_node(Node, Nodes) -> Nodes -- [Node]. - -cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg). - -upgrade_to_full_partition(Proxy) -> - cast(Proxy, {partial_partition_disconnect, node()}), - disconnect(Proxy). - -%% When we call this, it's because we want to force Mnesia to detect a -%% partition. But if we just disconnect_node/1 then Mnesia won't -%% detect a very short partition. So we want to force a slightly -%% longer disconnect. Unfortunately we don't have a way to blacklist -%% individual nodes; the best we can do is turn off auto-connect -%% altogether. -disconnect(Node) -> - application:set_env(kernel, dist_auto_connect, never), - erlang:disconnect_node(Node), - timer:sleep(1000), - application:unset_env(kernel, dist_auto_connect), - ok. - -%%-------------------------------------------------------------------- - -%% mnesia:system_info(db_nodes) (and hence -%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes -%% when partitioned, just those that we are sharing Mnesia state -%% with. So we have a small set of replacement functions -%% here. "rabbit" in a function's name implies we test if the rabbit -%% application is up, not just the node. - -%% As we use these functions to decide what to do in pause_minority or -%% pause_if_all_down states, they *must* be fast, even in the case where -%% TCP connections are timing out. So that means we should be careful -%% about whether we connect to nodes which are currently disconnected. 
- -majority() -> - majority([]). - -majority(NodesDown) -> - Nodes = rabbit_mnesia:cluster_nodes(all), - AliveNodes = alive_nodes(Nodes) -- NodesDown, - length(AliveNodes) / length(Nodes) > 0.5. - -in_preferred_partition() -> - {ok, {pause_if_all_down, PreferredNodes, _}} = - application:get_env(rabbit, cluster_partition_handling), - in_preferred_partition(PreferredNodes). - -in_preferred_partition(PreferredNodes) -> - in_preferred_partition(PreferredNodes, []). - -in_preferred_partition(PreferredNodes, NodesDown) -> - Nodes = rabbit_mnesia:cluster_nodes(all), - RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)], - AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown, - RealPreferredNodes =:= [] orelse AliveNodes =/= []. - -all_nodes_up() -> - Nodes = rabbit_mnesia:cluster_nodes(all), - length(alive_nodes(Nodes)) =:= length(Nodes). - --spec all_rabbit_nodes_up() -> boolean(). - -all_rabbit_nodes_up() -> - Nodes = rabbit_mnesia:cluster_nodes(all), - length(alive_rabbit_nodes(Nodes)) =:= length(Nodes). - --spec alive_nodes([node()]) -> [node()]. - -alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)). -alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])]. - --spec alive_rabbit_nodes([node()]) -> [node()]. - -alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). - -alive_rabbit_nodes(Nodes) -> - [N || N <- alive_nodes(Nodes), rabbit:is_running(N)]. - -%% This one is allowed to connect! - --spec ping_all() -> 'ok'. - -ping_all() -> - [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)], - ok. - -possibly_partitioned_nodes() -> - alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running). - -startup_log([]) -> - rabbit_log:info("Starting rabbit_node_monitor~n", []); -startup_log(Nodes) -> - rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n", - [Nodes]). diff --git a/src/rabbit_prelaunch_cluster.erl b/src/rabbit_prelaunch_cluster.erl index 9d3cda99e3..556ec99720 100644 --- a/src/rabbit_prelaunch_cluster.erl +++ b/src/rabbit_prelaunch_cluster.erl @@ -6,7 +6,7 @@ setup(Context) -> rabbit_log_prelaunch:debug(""), rabbit_log_prelaunch:debug("== Clustering =="), rabbit_log_prelaunch:debug("Preparing cluster status files"), - rabbit_node_monitor:prepare_cluster_status_files(), + % TODO LRB MNEVIS rabbit_node_monitor:prepare_cluster_status_files(), case Context of #{initial_pass := true} -> rabbit_log_prelaunch:debug("Upgrading Mnesia schema"), diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 7e64e4fcd5..240366bdeb 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -39,7 +39,7 @@ create() -> lists:foreach(fun ({Tab, TabDef}) -> TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of + case mnevis:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> throw({error, {table_creation_failed, @@ -54,6 +54,7 @@ ensure_secondary_indexes() -> ensure_secondary_index(rabbit_queue, vhost), ok. +%% TODO mnevis add table index ensure_secondary_index(Table, Field) -> case mnesia:add_table_index(Table, Field) of {atomic, ok} -> ok; @@ -68,8 +69,8 @@ ensure_secondary_index(Table, Field) -> -spec create_local_copy('disc' | 'ram') -> 'ok'. create_local_copy(disc) -> - create_local_copy(schema, disc_copies), - create_local_copies(disc); + create_local_copy(schema, ram_copies), + create_local_copies(ram); create_local_copy(ram) -> create_local_copies(ram), create_local_copy(schema, ram_copies). 
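Two related rabbit_table changes meet here: create/0 above now goes through mnevis:create_table/2, and the hunks that follow drop the {disc_copies, [node()]} placement from most entries in definitions/0, with even a "disc" node redirected to ram_copies (persistence is presumably left to the mnevis/Ra layer rather than to mnesia disc copies). The net effect for a typical table, sketched here for rabbit_user (create/0 still strips the match option before the call; the snippet only compiles inside a module that defines the internal_user record):

%% Sketch of the effective call made by create/0 after this patch:
mnevis:create_table(rabbit_user,
                    [{record_name, internal_user},
                     {attributes, record_info(fields, internal_user)}]).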
@@ -195,14 +196,14 @@ clear_ram_only_tables() -> create_local_copies(Type) -> lists:foreach( fun ({Tab, TabDef}) -> - HasDiscCopies = has_copy_type(TabDef, disc_copies), + HasDiscCopies = has_copy_type(TabDef, ram_copies), HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies), LocalTab = proplists:get_bool(local_content, TabDef), StorageType = if Type =:= disc orelse LocalTab -> if - HasDiscCopies -> disc_copies; + HasDiscCopies -> ram_copies; HasDiscOnlyCopies -> disc_only_copies; true -> ram_copies end; @@ -272,28 +273,22 @@ names() -> [Tab || {Tab, _} <- definitions()]. definitions(disc) -> definitions(); definitions(ram) -> - [{Tab, [{disc_copies, []}, {ram_copies, [node()]} | - proplists:delete( - ram_copies, proplists:delete(disc_copies, TabDef))]} || - {Tab, TabDef} <- definitions()]. + definitions(). definitions() -> [{rabbit_user, [{record_name, internal_user}, {attributes, record_info(fields, internal_user)}, - {disc_copies, [node()]}, {match, #internal_user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}, {match, #user_permission{user_vhost = #user_vhost{_='_'}, permission = #permission{_='_'}, _='_'}}]}, {rabbit_topic_permission, [{record_name, topic_permission}, {attributes, record_info(fields, topic_permission)}, - {disc_copies, [node()]}, {match, #topic_permission{topic_permission_key = #topic_permission_key{_='_'}, permission = #permission{_='_'}, _='_'}}]}, @@ -301,7 +296,7 @@ definitions() -> [ {record_name, vhost}, {attributes, vhost:fields()}, - {disc_copies, [node()]}, + % TODO MNEVIS LRB {disc_copies, [node()]}, {match, vhost:pattern_match_all()}]}, {rabbit_listener, [{record_name, listener}, @@ -311,7 +306,6 @@ definitions() -> {rabbit_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {disc_copies, [node()]}, {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_semi_durable_route, [{record_name, route}, @@ -348,7 +342,6 @@ definitions() -> {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange, [{record_name, exchange}, @@ -361,12 +354,10 @@ definitions() -> {rabbit_runtime_parameters, [{record_name, runtime_parameters}, {attributes, record_info(fields, runtime_parameters)}, - {disc_copies, [node()]}, {match, #runtime_parameters{_='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, amqqueue:fields()}, - {disc_copies, [node()]}, {match, amqqueue:pattern_match_on_name(queue_name_match())}]}, {rabbit_queue, [{record_name, amqqueue}, diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 8cae363928..4d858bd7a5 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -159,7 +159,8 @@ upgrade_mode(AllNodes) -> {disc, []} -> primary; {disc, _} -> - Filename = rabbit_node_monitor:running_nodes_filename(), + %% TODO: rabbit_node_monitor + Filename = "<no filename>", % rabbit_node_monitor:running_nodes_filename(), die("Cluster upgrade needed but other disc nodes shut " "down after this one.~nPlease first start the last " "disc node to shut down.~n~nNote: if several disc " diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 3fd09af499..20df5353c0 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -418,7 +418,7 @@ internal_system_x() -> -spec cluster_name() -> 'ok'. 
cluster_name() -> - {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0), + {atomic, ok} = rabbit_misc:mnevis_transaction(fun cluster_name_tx/0), ok. cluster_name_tx() -> @@ -646,17 +646,18 @@ exchange_options(Table) -> transform(TableName, Fun, FieldList) -> rabbit_table:wait([TableName]), - {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), + %% TODO: make sure there is no conflicts + {atomic, ok} = mnevis:transform_table(TableName, Fun, FieldList), ok. transform(TableName, Fun, FieldList, NewRecordName) -> rabbit_table:wait([TableName]), - {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, + {atomic, ok} = mnevis:transform_table(TableName, Fun, FieldList, NewRecordName), ok. create(Tab, TabDef) -> - {atomic, ok} = mnesia:create_table(Tab, TabDef), + {atomic, ok} = mnevis:create_table(Tab, TabDef), ok. %% Dumb replacement for rabbit_exchange:declare that does not require diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 8914197aaf..b03531d0aa 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -36,15 +36,19 @@ recover() -> %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. + io:format("amqq on node down ~n"), rabbit_amqqueue:on_node_down(node()), + io:format("amqq warn limit ~n"), rabbit_amqqueue:warn_file_limit(), %% Prepare rabbit_semi_durable_route table + io:format("binding recover ~n"), rabbit_binding:recover(), %% rabbit_vhost_sup_sup will start the actual recovery. %% So recovery will be run every time a vhost supervisor is restarted. + io:format("vhost sup sup start ~n"), ok = rabbit_vhost_sup_sup:start(), [ok = rabbit_vhost_sup_sup:init_vhost(VHost) || VHost <- list_names()], diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl index a02c4721bc..8f606716ee 100644 --- a/test/amqqueue_backward_compatibility_SUITE.erl +++ b/test/amqqueue_backward_compatibility_SUITE.erl @@ -297,6 +297,6 @@ upgrade_v1_to_v2(_) -> ?amqqueue_v1_type), ?assert(?is_amqqueue_v1(OldQueue)), ?assert(not ?is_amqqueue_v2(OldQueue)), - NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue), + NewQueue = amqqueue:upgrade_to(OldQueue, amqqueue_v2), ?assert(not ?is_amqqueue_v1(NewQueue)), ?assert(?is_amqqueue_v2(NewQueue)). diff --git a/test/mirrored_supervisor_SUITE.erl b/test/mirrored_supervisor_SUITE.erl index d4bf6883da..f6893ea421 100644 --- a/test/mirrored_supervisor_SUITE.erl +++ b/test/mirrored_supervisor_SUITE.erl @@ -48,7 +48,7 @@ init_per_suite(Config) -> lists:foreach( fun ({Tab, TabDef}) -> TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of + case mnevis:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> @@ -277,7 +277,7 @@ pid_of(Id) -> Pid. tx_fun(Fun) -> - case mnesia:sync_transaction(Fun) of + case rabbit_misc:mnevis_transaction(Fun) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. |
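One helper this patch leans on but never shows is rabbit_misc:mnevis_transaction/1, used above as a drop-in for mnesia:sync_transaction/1 and mnesia:transaction/1. A speculative sketch, assuming mnevis:transaction/1 keeps mnesia's {atomic, Result} | {aborted, Reason} return shape that the {atomic, ok} matches at the call sites rely on; the real helper may well do more, for example pass mnevis-specific transaction options:

%% Speculative sketch only, not part of this diff.
mnevis_transaction(Fun) when is_function(Fun, 0) ->
    %% Assumes mnevis mirrors mnesia's transaction return contract.
    mnevis:transaction(Fun).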