author     Daniil Fedotov <hairyhum@gmail.com>  2018-07-31 17:25:19 +0100
committer  Luke Bakken <lbakken@pivotal.io>     2020-01-09 17:59:11 -0800
commit     a9746c3a992ffedcc32915ccecaf96227ca70ff9 (patch)
tree       730f4d0ec9e6d6dde05b0992976a3e81dafab28e
parent     b3c5fae8fb12b5a194f7f6e652dbb03ebf520817 (diff)
download   rabbitmq-server-git-mnevis-experimental.tar.gz

Experiment to integrate ramnesia into RabbitMQ. (branch: mnevis-experimental)
Can run multiple nodes using a predefined config. Runs from the virgin node state. Can declare a queue. Logs are written to stdout to be visible on startup.
Rename ramnesia to mnevis (WIP).
Fixed startup (WIP).
Startup and recovery with mnevis.
Cleanup; use the right branch.
Use cluster_nodes as initial_nodes in mnevis.
Use mnevis transaction options to ensure the local node may do dirty reads after transactions. RabbitMQ relies on data being readable with dirty_read after creating entries.
Make sure the mnevis node starts.
Specify ra branch.
Comment unused functions.
Move mnevis dependency to rabbit_common.
Testing out replicated rabbit_listener (fixup).
Add leader node to status, add mnevis TODOs.
Rename leader to mnevis_leader in node info.
WIP: remove rabbit_node_monitor. rabbit_node_monitor is there to compensate for mnesia partition handling behaviour; without mnesia clustering there is not much sense in it. There are still some places where rabbit_node_monitor was used, marked with TODOs.
Feature flags through mnevis. Make sure feature flags use the mnevis API instead of the mnesia API.
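Throughout this patch, calls to mnesia:sync_transaction/1 are replaced with rabbit_misc:mnevis_transaction/1 (see gm.erl, rabbit_amqqueue.erl and rabbit_mnesia_rename.erl below). The helper itself lives in rabbit_common and is not part of this diff; a minimal sketch of what it might look like, assuming mnevis:transaction/1 keeps mnesia's {atomic, Result} | {aborted, Reason} return shape (callers in this patch pattern-match on {atomic, ok}):

    %% Hypothetical sketch only -- the real helper is in rabbit_common and is
    %% not shown in this diff. Assumes mnevis:transaction/1 mirrors
    %% mnesia:transaction/1, returning {atomic, Result} | {aborted, Reason}.
    mnevis_transaction(Fun) when is_function(Fun, 0) ->
        mnevis:transaction(Fun).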
-rw-r--r--  rabbit.config                                    14
-rw-r--r--  src/amqqueue_v1.erl                               4
-rw-r--r--  src/gm.erl                                        4
-rw-r--r--  src/rabbit.erl                                   59
-rw-r--r--  src/rabbit_amqqueue.erl                          34
-rw-r--r--  src/rabbit_autoheal.erl                         459
-rw-r--r--  src/rabbit_channel.erl                           21
-rw-r--r--  src/rabbit_connection_tracking.erl               20
-rw-r--r--  src/rabbit_connection_tracking_handler.erl        5
-rw-r--r--  src/rabbit_core_ff.erl                            8
-rw-r--r--  src/rabbit_feature_flags.erl                      9
-rw-r--r--  src/rabbit_health_check.erl                      13
-rw-r--r--  src/rabbit_mnesia.erl                           342
-rw-r--r--  src/rabbit_mnesia_rename.erl                      8
-rw-r--r--  src/rabbit_networking.erl                        19
-rw-r--r--  src/rabbit_node_monitor.erl                     932
-rw-r--r--  src/rabbit_prelaunch_cluster.erl                  2
-rw-r--r--  src/rabbit_table.erl                             25
-rw-r--r--  src/rabbit_upgrade.erl                            3
-rw-r--r--  src/rabbit_upgrade_functions.erl                  9
-rw-r--r--  src/rabbit_vhost.erl                              4
-rw-r--r--  test/amqqueue_backward_compatibility_SUITE.erl    2
-rw-r--r--  test/mirrored_supervisor_SUITE.erl                4
23 files changed, 393 insertions, 1607 deletions
diff --git a/rabbit.config b/rabbit.config
new file mode 100644
index 0000000000..030e3dbdbc
--- /dev/null
+++ b/rabbit.config
@@ -0,0 +1,14 @@
+[
+{rabbit, [
+ {cluster_nodes, {['rabbit@localhost'
+ ,
+ 'rabbit1@localhost'
+ ], disc}}
+ ]},
+{ramnesia, [
+ {initial_nodes, ['rabbit@localhost'
+ ,
+ 'rabbit1@localhost'
+ ]}]},
+{ra, [{data_dir, "/tmp/ramnesia"}]}
+].
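The config above pre-seeds a two-node cluster (rabbit@localhost, rabbit1@localhost) and points ra's data_dir at /tmp/ramnesia. Note it still uses the pre-rename application name ramnesia for initial_nodes, while set_mnevis_initial_nodes/0 added below reads the mnevis application environment and otherwise falls back to rabbit's cluster_nodes. A hedged way to check from a remote shell that the mnevis Raft cluster actually formed, using the same call this patch relies on elsewhere:

    %% Illustration only: ra:members/1 on the mnevis node id is how later
    %% hunks in this diff locate the leader (see rabbit.erl, rabbit_mnesia.erl).
    {ok, Members, {_, LeaderNode}} = ra:members(mnevis_node:node_id()),
    io:format("members: ~p~nleader: ~p~n", [Members, LeaderNode]).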
diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl
index 95d7083362..10cb36bc52 100644
--- a/src/amqqueue_v1.erl
+++ b/src/amqqueue_v1.erl
@@ -334,9 +334,9 @@ record_version_to_use() ->
upgrade(#amqqueue{} = Queue) -> Queue.
--spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue().
+-spec upgrade_to(amqqueue(), amqqueue_v1) -> amqqueue().
-upgrade_to(?record_version, #amqqueue{} = Queue) ->
+upgrade_to(#amqqueue{} = Queue, ?record_version) ->
Queue.
% arguments
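The argument swap in upgrade_to/2 above pairs with the rabbit_core_ff.erl hunk further down, which hands mnevis:transform_table/3 an {M, F, A} triple ({amqqueue, upgrade_to, [amqqueue_v2]}) instead of a fun. Presumably the record being transformed is prepended to the extra arguments, so the queue must now come first; roughly (assumed, not verified against mnevis):

    %% Assumed application of an {M, F, ExtraArgs} transform to each record:
    %% {amqqueue, upgrade_to, [amqqueue_v2]} applied to Q becomes
    %% amqqueue:upgrade_to(Q, amqqueue_v2), matching the new argument order.
    apply_transform({M, F, ExtraArgs}, Record) ->
        erlang:apply(M, F, [Record | ExtraArgs]).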
diff --git a/src/gm.erl b/src/gm.erl
index 353ad41add..e10be91677 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -488,7 +488,7 @@ create_tables() ->
create_tables([]) ->
ok;
create_tables([{Table, Attributes} | Tables]) ->
- case mnesia:create_table(Table, Attributes) of
+ case mnevis:create_table(Table, Attributes) of
{atomic, ok} -> create_tables(Tables);
{aborted, {already_exists, Table}} -> create_tables(Tables);
Err -> Err
@@ -535,7 +535,7 @@ validate_members(Server, Members) ->
-spec forget_group(group_name()) -> 'ok'.
forget_group(GroupName) ->
- {atomic, ok} = mnesia:sync_transaction(
+ {atomic, ok} = rabbit_misc:mnevis_transaction(
fun () ->
mnesia:delete({?GROUP_TABLE, GroupName})
end),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7f16d0b1d1..600e1893c5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -142,12 +142,12 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_node_monitor,
- [{description, "node monitor"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_node_monitor]}},
- {requires, [rabbit_alarm, guid_generator]},
- {enables, core_initialized}]}).
+% -rabbit_boot_step({rabbit_node_monitor,
+% [{description, "node monitor"},
+% {mfa, {rabbit_sup, start_restartable_child,
+% [rabbit_node_monitor]}},
+% {requires, [rabbit_alarm, guid_generator]},
+% {enables, core_initialized}]}).
-rabbit_boot_step({rabbit_epmd_monitor,
[{description, "epmd monitor"},
@@ -167,13 +167,13 @@
[{description, "core initialized"},
{requires, kernel_ready}]}).
--rabbit_boot_step({upgrade_queues,
- [{description, "per-vhost message store migration"},
- {mfa, {rabbit_upgrade,
- maybe_migrate_queues_to_per_vhost_storage,
- []}},
- {requires, [core_initialized]},
- {enables, recovery}]}).
+% -rabbit_boot_step({upgrade_queues,
+% [{description, "per-vhost message store migration"},
+% {mfa, {rabbit_upgrade,
+% maybe_migrate_queues_to_per_vhost_storage,
+% []}},
+% {requires, [core_initialized]},
+% {enables, recovery}]}).
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
@@ -241,15 +241,15 @@
{requires, pre_flight}
]}).
--rabbit_boot_step({notify_cluster,
- [{description, "notifies cluster peers of our presence"},
- {mfa, {rabbit_node_monitor, notify_node_up, []}},
- {requires, pre_flight}]}).
+% -rabbit_boot_step({notify_cluster,
+% [{description, "notifies cluster peers of our presence"},
+% {mfa, {rabbit_node_monitor, notify_node_up, []}},
+% {requires, pre_flight}]}).
-rabbit_boot_step({networking,
[{description, "TCP and TLS listeners"},
{mfa, {rabbit_networking, boot, []}},
- {requires, notify_cluster}]}).
+ {requires, pre_flight}]}).
%%---------------------------------------------------------------------------
@@ -280,6 +280,7 @@
start() ->
%% start() vs. boot(): we want to throw an error in start().
+ % TODO COMMENT OUT rabbit_mnesia:check_cluster_consistency(),
start_it(temporary).
-spec boot() -> 'ok'.
@@ -320,6 +321,8 @@ run_prelaunch_second_phase() ->
#{initial_pass := IsInitialPass} =
Context = rabbit_prelaunch:get_context(),
+ set_mnevis_initial_nodes(),
+
case IsInitialPass of
true ->
rabbit_log_prelaunch:debug(""),
@@ -367,6 +370,16 @@ run_prelaunch_second_phase() ->
end,
ok.
+set_mnevis_initial_nodes() ->
+ case application:get_env(mnevis, initial_nodes, none) of
+ none ->
+ case application:get_env(rabbit, cluster_nodes, none) of
+ none -> ok;
+ {Nodes, disc} -> application:set_env(mnevis, initial_nodes, Nodes)
+ end;
+ _ -> ok
+ end.
+
%% Try to send systemd ready notification if it makes sense in the
%% current environment. standard_error is used intentionally in all
%% logging statements, so all this messages will end in systemd
@@ -1049,7 +1062,9 @@ boot_delegate() ->
-spec recover() -> 'ok'.
recover() ->
+ io:format("Recover policy ~n"),
ok = rabbit_policy:recover(),
+ io:format("Recover vhost ~n"),
ok = rabbit_vhost:recover(),
ok = lager_exchange_backend:maybe_init_exchange().
@@ -1057,7 +1072,13 @@ recover() ->
maybe_insert_default_data() ->
case rabbit_table:needs_default_data() of
- true -> insert_default_data();
+ true ->
+ {ok, _, {_, Leader}} = ra:members(mnevis_node:node_id()),
+ case Leader == node() of
+ true ->
+ insert_default_data();
+ false -> ok
+ end;
false -> ok
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5ff2e09636..3b5d43f4ff 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1794,7 +1794,7 @@ forget_all_durable(Node) ->
%% Note rabbit is not running so we avoid e.g. the worker pool. Also why
%% we don't invoke the return from rabbit_binding:process_deletions/1.
{atomic, ok} =
- mnesia:sync_transaction(
+ rabbit_misc:mnevis_transaction(
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
amqqueue:pattern_match_all(), write),
@@ -1968,10 +1968,12 @@ delete_queues_on_node_down(Node) ->
lists:unzip(lists:flatten([
rabbit_misc:execute_mnesia_transaction(
fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end
- ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node))
+ )
+ || Queues <- partition_queues(queues_to_delete_when_node_down(Node))
])).
delete_queue(QueueName) ->
+io:format("Delete queue ~p~n", [QueueName]),
ok = mnesia:delete({rabbit_queue, QueueName}),
rabbit_binding:remove_transient_for_destination(QueueName).
@@ -1990,13 +1992,27 @@ partition_queues(T) ->
queues_to_delete_when_node_down(NodeDown) ->
rabbit_misc:execute_mnesia_transaction(fun () ->
- qlc:e(qlc:q([amqqueue:get_name(Q) ||
- Q <- mnesia:table(rabbit_queue),
- amqqueue:qnode(Q) == NodeDown andalso
- not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso
- (not rabbit_amqqueue:is_replicated(Q) orelse
- rabbit_amqqueue:is_dead_exclusive(Q))]
- ))
+ % qlc:e(qlc:q([amqqueue:get_name(Q) ||
+ % Q <- mnesia:table(rabbit_queue),
+ % amqqueue:qnode(Q) == NodeDown andalso
+ % not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso
+ % (not rabbit_amqqueue:is_replicated(Q) orelse
+ % rabbit_amqqueue:is_dead_exclusive(Q))]
+ % ))
+ mnesia:foldl(fun(Q, Acc) ->
+ case amqqueue:qnode(Q) == NodeDown andalso
+ not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso
+ (not rabbit_amqqueue:is_replicated(Q) orelse
+ rabbit_amqqueue:is_dead_exclusive(Q)) of
+ true ->
+ [amqqueue:get_name(Q) | Acc];
+ false ->
+ Acc
+ end
+ end,
+ [],
+ rabbit_queue)
+
end).
notify_queue_binding_deletions(QueueDeletions) ->
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
deleted file mode 100644
index 77165fc26c..0000000000
--- a/src/rabbit_autoheal.erl
+++ /dev/null
@@ -1,459 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(rabbit_autoheal).
-
--export([init/0, enabled/0, maybe_start/1, rabbit_down/2, node_down/2,
- handle_msg/3, process_down/2]).
-
-%% The named process we are running in.
--define(SERVER, rabbit_node_monitor).
-
--define(MNESIA_STOPPED_PING_INTERNAL, 200).
-
--define(AUTOHEAL_STATE_AFTER_RESTART, rabbit_autoheal_state_after_restart).
-
-%%----------------------------------------------------------------------------
-
-%% In order to autoheal we want to:
-%%
-%% * Find the winning partition
-%% * Stop all nodes in other partitions
-%% * Wait for them all to be stopped
-%% * Start them again
-%%
-%% To keep things simple, we assume all nodes are up. We don't start
-%% unless all nodes are up, and if a node goes down we abandon the
-%% whole process. To further keep things simple we also defer the
-%% decision as to the winning node to the "leader" - arbitrarily
-%% selected as the first node in the cluster.
-%%
-%% To coordinate the restarting nodes we pick a special node from the
-%% winning partition - the "winner". Restarting nodes then stop, and
-%% wait for it to tell them it is safe to start again. The winner
-%% determines that a node has stopped just by seeing if its rabbit app
-%% stops - if a node stops for any other reason it just gets a message
-%% it will ignore, and otherwise we carry on.
-%%
-%% Meanwhile, the leader may continue to receive new autoheal requests:
-%% all of them are ignored. The winner notifies the leader when the
-%% current autoheal process is finished (ie. when all losers stopped and
-%% were asked to start again) or was aborted. When the leader receives
-%% the notification or if it looses contact with the winner, it can
-%% accept new autoheal requests.
-%%
-%% The winner and the leader are not necessarily the same node.
-%%
-%% The leader can be a loser and will restart in this case. It remembers
-%% there is an autoheal in progress by temporarily saving the autoheal
-%% state to the application environment.
-%%
-%% == Possible states ==
-%%
-%% not_healing
-%% - the default
-%%
-%% {winner_waiting, OutstandingStops, Notify}
-%% - we are the winner and are waiting for all losing nodes to stop
-%% before telling them they can restart
-%%
-%% {leader_waiting, Winner, Notify}
-%% - we are the leader, and have already assigned the winner and losers.
-%% We are waiting for a confirmation from the winner that the autoheal
-%% process has ended. Meanwhile we can ignore autoheal requests.
-%% Because we may be a loser too, this state is saved to the application
-%% environment and restored on startup.
-%%
-%% restarting
-%% - we are restarting. Of course the node monitor immediately dies
-%% then so this state does not last long. We therefore send the
-%% autoheal_safe_to_start message to the rabbit_outside_app_process
-%% instead.
-%%
-%% == Message flow ==
-%%
-%% 1. Any node (leader included) >> {request_start, node()} >> Leader
-%% When Mnesia detects it is running partitioned or
-%% when a remote node starts, rabbit_node_monitor calls
-%% rabbit_autoheal:maybe_start/1. The message above is sent to the
-%% leader so the leader can take a decision.
-%%
-%% 2. Leader >> {become_winner, Losers} >> Winner
-%% The leader notifies the winner so the latter can proceed with
-%% the autoheal.
-%%
-%% 3. Winner >> {winner_is, Winner} >> All losers
-%% The winner notifies losers they must stop.
-%%
-%% 4. Winner >> autoheal_safe_to_start >> All losers
-%% When either all losers stopped or the autoheal process was
-%% aborted, the winner notifies losers they can start again.
-%%
-%% 5. Leader >> report_autoheal_status >> Winner
-%% The leader asks the autoheal status to the winner. This only
-%% happens when the leader is a loser too. If this is not the case,
-%% this message is never sent.
-%%
-%% 6. Winner >> {autoheal_finished, Winner} >> Leader
-%% The winner notifies the leader that the autoheal process was
-%% either finished or aborted (ie. autoheal_safe_to_start was sent
-%% to losers).
-
-%%----------------------------------------------------------------------------
-
-init() ->
- %% We check the application environment for a saved autoheal state
- %% saved during a restart. If this node is a leader, it is used
- %% to determine if it needs to ask the winner to report about the
- %% autoheal progress.
- State = case application:get_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART) of
- {ok, S} -> S;
- undefined -> not_healing
- end,
- ok = application:unset_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART),
- case State of
- {leader_waiting, Winner, _} ->
- rabbit_log:info(
- "Autoheal: in progress, requesting report from ~p~n", [Winner]),
- send(Winner, report_autoheal_status);
- _ ->
- ok
- end,
- State.
-
-maybe_start(not_healing) ->
- case enabled() of
- true -> Leader = leader(),
- send(Leader, {request_start, node()}),
- rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
- not_healing;
- false -> not_healing
- end;
-maybe_start(State) ->
- State.
-
-enabled() ->
- case application:get_env(rabbit, cluster_partition_handling) of
- {ok, autoheal} -> true;
- {ok, {pause_if_all_down, _, autoheal}} -> true;
- _ -> false
- end.
-
-leader() ->
- [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
- Leader.
-
-%% This is the winner receiving its last notification that a node has
-%% stopped - all nodes can now start again
-rabbit_down(Node, {winner_waiting, [Node], Notify}) ->
- rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
- winner_finish(Notify);
-
-rabbit_down(Node, {winner_waiting, WaitFor, Notify}) ->
- {winner_waiting, WaitFor -- [Node], Notify};
-
-rabbit_down(Winner, {leader_waiting, Winner, Losers}) ->
- abort([Winner], Losers);
-
-rabbit_down(_Node, State) ->
- %% Ignore. Either:
- %% o we already cancelled the autoheal process;
- %% o we are still waiting the winner's report.
- State.
-
-node_down(_Node, not_healing) ->
- not_healing;
-
-node_down(Node, {winner_waiting, _, Notify}) ->
- abort([Node], Notify);
-
-node_down(Node, {leader_waiting, Node, _Notify}) ->
- %% The winner went down, we don't know what to do so we simply abort.
- rabbit_log:info("Autoheal: aborting - winner ~p went down~n", [Node]),
- not_healing;
-
-node_down(Node, {leader_waiting, _, _} = St) ->
- %% If it is a partial partition, the winner might continue with the
- %% healing process. If it is a full partition, the winner will also
- %% see it and abort. Let's wait for it.
- rabbit_log:info("Autoheal: ~p went down, waiting for winner decision ~n", [Node]),
- St;
-
-node_down(Node, _State) ->
- rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
- not_healing.
-
-%% If the process that has to restart the node crashes for an unexpected reason,
-%% we go back to a not healing state so the node is able to recover.
-process_down({'EXIT', Pid, Reason}, {restarting, Pid}) when Reason =/= normal ->
- rabbit_log:info("Autoheal: aborting - the process responsible for restarting the "
- "node terminated with reason: ~p~n", [Reason]),
- not_healing;
-
-process_down(_, State) ->
- State.
-
-%% By receiving this message we become the leader
-%% TODO should we try to debounce this?
-handle_msg({request_start, Node},
- not_healing, Partitions) ->
- rabbit_log:info("Autoheal request received from ~p~n", [Node]),
- case check_other_nodes(Partitions) of
- {error, E} ->
- rabbit_log:info("Autoheal request denied: ~s~n", [fmt_error(E)]),
- not_healing;
- {ok, AllPartitions} ->
- {Winner, Losers} = make_decision(AllPartitions),
- rabbit_log:info("Autoheal decision~n"
- " * Partitions: ~p~n"
- " * Winner: ~p~n"
- " * Losers: ~p~n",
- [AllPartitions, Winner, Losers]),
- case node() =:= Winner of
- true -> handle_msg({become_winner, Losers},
- not_healing, Partitions);
- false -> send(Winner, {become_winner, Losers}),
- {leader_waiting, Winner, Losers}
- end
- end;
-
-handle_msg({request_start, Node},
- State, _Partitions) ->
- rabbit_log:info("Autoheal request received from ~p when healing; "
- "ignoring~n", [Node]),
- State;
-
-handle_msg({become_winner, Losers},
- not_healing, _Partitions) ->
- rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
- [Losers]),
- stop_partition(Losers);
-
-handle_msg({become_winner, Losers},
- {winner_waiting, _, Losers}, _Partitions) ->
- %% The leader has aborted the healing, might have seen us down but
- %% we didn't see the same. Let's try again as it is the same partition.
- rabbit_log:info("Autoheal: I am the winner and received a duplicated "
- "request, waiting again for ~p to stop~n", [Losers]),
- stop_partition(Losers);
-
-handle_msg({become_winner, _},
- {winner_waiting, _, Losers}, _Partitions) ->
- %% Something has happened to the leader, it might have seen us down but we
- %% are still alive. Partitions have changed, cannot continue.
- rabbit_log:info("Autoheal: I am the winner and received another healing "
- "request, partitions have changed to ~p. Aborting ~n", [Losers]),
- winner_finish(Losers),
- not_healing;
-
-handle_msg({winner_is, Winner}, State = not_healing,
- _Partitions) ->
- %% This node is a loser, nothing else.
- Pid = restart_loser(State, Winner),
- {restarting, Pid};
-handle_msg({winner_is, Winner}, State = {leader_waiting, Winner, _},
- _Partitions) ->
- %% This node is the leader and a loser at the same time.
- Pid = restart_loser(State, Winner),
- {restarting, Pid};
-
-handle_msg(Request, {restarting, Pid} = St, _Partitions) ->
- %% ignore, we can contribute no further
- rabbit_log:info("Autoheal: Received the request ~p while waiting for ~p "
- "to restart the node. Ignoring it ~n", [Request, Pid]),
- St;
-
-handle_msg(report_autoheal_status, not_healing, _Partitions) ->
- %% The leader is asking about the autoheal status to us (the
- %% winner). This happens when the leader is a loser and it just
- %% restarted. We are in the "not_healing" state, so the previous
- %% autoheal process ended: let's tell this to the leader.
- send(leader(), {autoheal_finished, node()}),
- not_healing;
-
-handle_msg(report_autoheal_status, State, _Partitions) ->
- %% Like above, the leader is asking about the autoheal status. We
- %% are not finished with it. There is no need to send anything yet
- %% to the leader: we will send the notification when it is over.
- State;
-
-handle_msg({autoheal_finished, Winner},
- {leader_waiting, Winner, _}, _Partitions) ->
- %% The winner is finished with the autoheal process and notified us
- %% (the leader). We can transition to the "not_healing" state and
- %% accept new requests.
- rabbit_log:info("Autoheal finished according to winner ~p~n", [Winner]),
- not_healing;
-
-handle_msg({autoheal_finished, Winner}, not_healing, _Partitions)
- when Winner =:= node() ->
- %% We are the leader and the winner. The state already transitioned
- %% to "not_healing" at the end of the autoheal process.
- rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]),
- not_healing;
-
-handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) ->
- %% We might have seen the winner down during a partial partition and
- %% transitioned to not_healing. However, the winner was still able
- %% to finish. Let it pass.
- rabbit_log:info("Autoheal finished according to winner ~p."
- " Unexpected, I might have previously seen the winner down~n", [Winner]),
- not_healing.
-
-%%----------------------------------------------------------------------------
-
-send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}.
-
-abort(Down, Notify) ->
- rabbit_log:info("Autoheal: aborting - ~p down~n", [Down]),
- %% Make sure any nodes waiting for us start - it won't necessarily
- %% heal the partition but at least they won't get stuck.
- %% If we are executing this, we are not stopping. Thus, don't wait
- %% for ourselves!
- winner_finish(Notify -- [node()]).
-
-winner_finish(Notify) ->
- %% There is a race in Mnesia causing a starting loser to hang
- %% forever if another loser stops at the same time: the starting
- %% node connects to the other node, negotiates the protocol and
- %% attempts to acquire a write lock on the schema on the other node.
- %% If the other node stops between the protocol negotiation and lock
- %% request, the starting node never gets an answer to its lock
- %% request.
- %%
- %% To work around the problem, we make sure Mnesia is stopped on all
- %% losing nodes before sending the "autoheal_safe_to_start" signal.
- wait_for_mnesia_shutdown(Notify),
- [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
- send(leader(), {autoheal_finished, node()}),
- not_healing.
-
-%% This improves the previous implementation, but could still potentially enter an infinity
-%% loop. If it also possible that for when it finishes some of the nodes have been
-%% manually restarted, but we can't do much more (apart from stop them again). So let it
-%% continue and notify all the losers to restart.
-wait_for_mnesia_shutdown(AllNodes) ->
- Monitors = lists:foldl(fun(Node, Monitors0) ->
- pmon:monitor({mnesia_sup, Node}, Monitors0)
- end, pmon:new(), AllNodes),
- wait_for_supervisors(Monitors).
-
-wait_for_supervisors(Monitors) ->
- case pmon:is_empty(Monitors) of
- true ->
- ok;
- false ->
- receive
- {'DOWN', _MRef, process, {mnesia_sup, _} = I, _Reason} ->
- wait_for_supervisors(pmon:erase(I, Monitors))
- after
- 60000 ->
- AliveLosers = [Node || {_, Node} <- pmon:monitored(Monitors)],
- rabbit_log:info("Autoheal: mnesia in nodes ~p is still up, sending "
- "winner notification again to these ~n", [AliveLosers]),
- [send(L, {winner_is, node()}) || L <- AliveLosers],
- wait_for_mnesia_shutdown(AliveLosers)
- end
- end.
-
-restart_loser(State, Winner) ->
- rabbit_log:warning(
- "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
- rabbit_node_monitor:run_outside_applications(
- fun () ->
- MRef = erlang:monitor(process, {?SERVER, Winner}),
- rabbit:stop(),
- NextState = receive
- {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} ->
- not_healing;
- autoheal_safe_to_start ->
- State
- end,
- erlang:demonitor(MRef, [flush]),
- %% During the restart, the autoheal state is lost so we
- %% store it in the application environment temporarily so
- %% init/0 can pick it up.
- %%
- %% This is useful to the leader which is a loser at the
- %% same time: because the leader is restarting, there
- %% is a great chance it misses the "autoheal finished!"
- %% notification from the winner. Thanks to the saved
- %% state, it knows it needs to ask the winner if the
- %% autoheal process is finished or not.
- application:set_env(rabbit,
- ?AUTOHEAL_STATE_AFTER_RESTART, NextState),
- rabbit:start()
- end, true).
-
-make_decision(AllPartitions) ->
- Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
- [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
- {Winner, lists:append(Rest)}.
-
-partition_value(Partition) ->
- Connections = [Res || Node <- Partition,
- Res <- [rpc:call(Node, rabbit_networking,
- connections_local, [])],
- is_list(Res)],
- {length(lists:append(Connections)), length(Partition)}.
-
-%% We have our local understanding of what partitions exist; but we
-%% only know which nodes we have been partitioned from, not which
-%% nodes are partitioned from each other.
-check_other_nodes(LocalPartitions) ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- {Results, Bad} = rabbit_node_monitor:status(Nodes -- [node()]),
- RemotePartitions = [{Node, proplists:get_value(partitions, Res)}
- || {Node, Res} <- Results],
- RemoteDown = [{Node, Down}
- || {Node, Res} <- Results,
- Down <- [Nodes -- proplists:get_value(nodes, Res)],
- Down =/= []],
- case {Bad, RemoteDown} of
- {[], []} -> Partitions = [{node(), LocalPartitions} | RemotePartitions],
- {ok, all_partitions(Partitions, [Nodes])};
- {[], _} -> {error, {remote_down, RemoteDown}};
- {_, _} -> {error, {nodes_down, Bad}}
- end.
-
-all_partitions([], Partitions) ->
- Partitions;
-all_partitions([{Node, CantSee} | Rest], Partitions) ->
- {[Containing], Others} =
- lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions),
- A = Containing -- CantSee,
- B = Containing -- A,
- Partitions1 = case {A, B} of
- {[], _} -> Partitions;
- {_, []} -> Partitions;
- _ -> [A, B | Others]
- end,
- all_partitions(Rest, Partitions1).
-
-fmt_error({remote_down, RemoteDown}) ->
- rabbit_misc:format("Remote nodes disconnected:~n ~p", [RemoteDown]);
-fmt_error({nodes_down, NodesDown}) ->
- rabbit_misc:format("Local nodes down: ~p", [NodesDown]).
-
-stop_partition(Losers) ->
- %% The leader said everything was ready - do we agree? If not then
- %% give up.
- Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers),
- case Down of
- [] -> [send(L, {winner_is, node()}) || L <- Losers],
- {winner_waiting, Losers, Losers};
- _ -> abort(Down, Losers)
- end.
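The comments in the deleted module above describe a hand-rolled leader/winner election (first node in the sorted cluster list) plus a message protocol for healing mnesia partitions. With mnevis, cluster-wide agreement is delegated to Raft, and the rest of this patch simply asks ra for the current leader wherever a single coordinating node is needed (default data insertion in rabbit.erl, is_leader/1 and get_mnevis_leader/0 in rabbit_mnesia.erl). A hedged sketch of that pattern, with a hypothetical helper name:

    %% Hypothetical helper (not in the patch): run Fun on the mnevis Raft
    %% leader only, mirroring maybe_insert_default_data/0 in rabbit.erl above.
    only_on_leader(Fun) ->
        {ok, _Members, {_, Leader}} = ra:members(mnevis_node:node_id()),
        case Leader =:= node() of
            true  -> Fun();
            false -> ok
        end.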
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4fd218754d..5570aca1b1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2265,8 +2265,9 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) ->
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
- case rabbit_node_monitor:pause_partition_guard() of
- ok ->
+ %% TODO: rabbit_node_monitor
+ % case rabbit_node_monitor:pause_partition_guard() of
+ % ok ->
Confirms = lists:append(C),
Rejects = lists:append(R),
ConfirmMsgSeqNos =
@@ -2285,13 +2286,17 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
send_nacks(RejectMsgSeqNos,
ConfirmMsgSeqNos,
State1#ch{rejected = []});
- pausing -> State
- end;
+ % pausing -> State
+ % end;
send_confirms_and_nacks(State) ->
- case rabbit_node_monitor:pause_partition_guard() of
- ok -> maybe_complete_tx(State);
- pausing -> State
- end.
+ %% TODO: rabbit_node_monitor
+ % case rabbit_node_monitor:pause_partition_guard() of
+ % ok ->
+ maybe_complete_tx(State)
+ % ;
+ % pausing -> State
+ % end
+ .
send_nacks([], _, State) ->
State;
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index 403fbf4191..e1781a6253 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -186,7 +186,7 @@ ensure_per_vhost_tracked_connections_table_for_this_node() ->
ensure_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_table_name_for(Node),
- case mnesia:create_table(TableName, [{record_name, tracked_connection},
+ case mnevis:create_table(TableName, [{record_name, tracked_connection},
{attributes, record_info(fields, tracked_connection)}]) of
{atomic, ok} -> ok;
{aborted, {already_exists, _}} -> ok;
@@ -200,7 +200,7 @@ ensure_tracked_connections_table_for_node(Node) ->
ensure_per_vhost_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_per_vhost_table_name_for(Node),
- case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost},
+ case mnevis:create_table(TableName, [{record_name, tracked_connection_per_vhost},
{attributes, record_info(fields, tracked_connection_per_vhost)}]) of
{atomic, ok} -> ok;
{aborted, {already_exists, _}} -> ok;
@@ -227,7 +227,7 @@ clear_tracked_connection_tables_for_this_node() ->
delete_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_table_name_for(Node),
- case mnesia:delete_table(TableName) of
+ case mnevis:delete_table(TableName) of
{atomic, ok} -> ok;
{aborted, {no_exists, _}} -> ok;
{aborted, Error} ->
@@ -240,7 +240,7 @@ delete_tracked_connections_table_for_node(Node) ->
delete_per_vhost_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_per_vhost_table_name_for(Node),
- case mnesia:delete_table(TableName) of
+ case mnevis:delete_table(TableName) of
{atomic, ok} -> ok;
{aborted, {no_exists, _}} -> ok;
{aborted, Error} ->
@@ -368,9 +368,15 @@ count_connections_in(VirtualHost) ->
lists:foldl(fun (Node, Acc) ->
Tab = tracked_connection_per_vhost_table_name_for(Node),
try
- N = case mnesia:dirty_read(Tab, VirtualHost) of
- [] -> 0;
- [Val] -> Val#tracked_connection_per_vhost.connection_count
+ N = case rabbit_misc:mnevis_transaction(
+ fun() ->
+ case mnesia:dirty_read({Tab, VirtualHost}) of
+ [] -> 0;
+ [Val] -> Val#tracked_connection_per_vhost.connection_count
+ end
+ end) of
+ {atomic, Val} -> Val;
+ {aborted, _Reason} -> 0
end,
Acc + N
catch _:Err ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index bc27d59363..92bc6bc2dc 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -46,7 +46,10 @@
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_connection_tracking]}},
- {requires, [rabbit_event, rabbit_node_monitor]},
+ {requires, [rabbit_event
+ %% TODO: rabbit_node_monitor
+ % ,rabbit_node_monitor
+ ]},
{enables, ?MODULE}]}).
%%
diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl
index 03f686fb33..fe83a68d2e 100644
--- a/src/rabbit_core_ff.erl
+++ b/src/rabbit_core_ff.erl
@@ -59,14 +59,13 @@ quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) ->
Tables = ?quorum_queue_tables,
rabbit_table:wait(Tables, _Retry = true),
Fields = amqqueue:fields(amqqueue_v2),
- mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso
- mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields.
+ mnevis:table_info(rabbit_queue, attributes) =:= Fields andalso
+ mnevis:table_info(rabbit_durable_queue, attributes) =:= Fields.
migrate_to_amqqueue_with_type(FeatureName, [Table | Rest], Fields) ->
rabbit_log:info("Feature flag `~s`: migrating Mnesia table ~s...",
[FeatureName, Table]),
- Fun = fun(Queue) -> amqqueue:upgrade_to(amqqueue_v2, Queue) end,
- case mnesia:transform_table(Table, Fun, Fields) of
+ case mnevis:transform_table(Table, {amqqueue, upgrade_to, [amqqueue_v2]}, Fields) of
{atomic, ok} -> migrate_to_amqqueue_with_type(FeatureName,
Rest,
Fields);
@@ -86,6 +85,7 @@ implicit_default_bindings_migration(FeatureName, _FeatureProps,
%% Default exchange bindings are now implicit (not stored in the
%% route tables). It should be safe to remove them outside of a
%% transaction.
+ %% TODO: how safe is it in mnevis?
rabbit_table:wait([rabbit_queue]),
Queues = mnesia:dirty_all_keys(rabbit_queue),
remove_explicit_default_bindings(FeatureName, Queues);
diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl
index 941d1617e7..d3c87d0f94 100644
--- a/src/rabbit_feature_flags.erl
+++ b/src/rabbit_feature_flags.erl
@@ -1612,16 +1612,19 @@ mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, Timeout) ->
%% @private
remote_nodes() ->
- mnesia:system_info(db_nodes) -- [node()].
+ mnevis:db_nodes() -- [node()].
-spec running_remote_nodes() -> [node()].
%% @private
running_remote_nodes() ->
- mnesia:system_info(running_db_nodes) -- [node()].
+ %% TODO: why do we need only running nodes?
+ remote_nodes().
+ % mnesia:system_info(running_db_nodes) -- [node()].
query_running_remote_nodes(Node, Timeout) ->
- case rpc:call(Node, mnesia, system_info, [running_db_nodes], Timeout) of
+ %% TODO: this is weird. Why do we need running_db_nodes from remote?
+ case rpc:call(Node, mnevis, db_nodes, [], Timeout) of
{badrpc, _} = Error -> Error;
Nodes -> Nodes -- [node()]
end.
diff --git a/src/rabbit_health_check.erl b/src/rabbit_health_check.erl
index 53666ac411..2a4ae773d1 100644
--- a/src/rabbit_health_check.erl
+++ b/src/rabbit_health_check.erl
@@ -61,13 +61,12 @@ node_health_check(list_queues) ->
health_check_queues(rabbit_vhost:list_names());
node_health_check(rabbit_node_monitor) ->
- case rabbit_node_monitor:partitions() of
- [] ->
- ok;
- L when is_list(L), length(L) > 0 ->
- ErrorMsg = io_lib:format("cluster partition in effect: ~p", [L]),
- {error_string, ErrorMsg}
- end;
+ ok;
+ %% TODO: rabbit_node_monitor
+ % case rabbit_node_monitor:partitions() of
+ % L when is_list(L) ->
+ % ok
+ % end;
node_health_check(alarms) ->
case proplists:get_value(alarms, rabbit:status()) of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b1921e032f..cec52b681b 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -54,10 +54,12 @@
%% Used internally in rpc calls
-export([node_info/0, remove_node_if_mnesia_running/1]).
--ifdef(TEST).
--compile(export_all).
--export([init_with_lock/3]).
--endif.
+-compile(nowarn_unused_function).
+
+% -ifdef(TEST).
+% -compile(export_all).
+% -export([init_with_lock/3]).
+% -endif.
%%----------------------------------------------------------------------------
@@ -75,6 +77,8 @@
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
+ %% TODO mnevis: node start
+ ok = mnevis_node:start(),
case is_virgin_node() of
true ->
rabbit_log:info("Node database directory at ~ts is empty. "
@@ -90,12 +94,68 @@ init() ->
rabbit_peer_discovery:maybe_init(),
rabbit_peer_discovery:maybe_register()
end,
+
+ %% Create schema on all nodes
+ case is_virgin_node() of
+ true ->
+ ok = create_schema();
+ false ->
+ ok
+ end,
+
+ io:format("~nDb nodes ~p~n", [mnevis:db_nodes()]),
+ io:format("~nRunning Db nodes ~p~n", [mnevis:running_db_nodes()]),
+
+ %%% io:format("Get cluster status ~n"),
+ %%% {ok, Status} = cluster_status_from_mnesia(),
+ %%% io:format("Cluster status ~p~n", [Status]),
+
+ %% TODO: rabbit_node_monitor
+ %%% rabbit_node_monitor:write_cluster_status(Status),
+
+ %%% % rabbit_node_monitor:update_cluster_status(),
+
+ %%% % case is_virgin_node() of
+ %%% % true ->
+
+ %%% % rabbit_log:info("Node database directory at ~s is empty. "
+ %%% % "Assuming we need to join an existing cluster or initialise from scratch...~n",
+ %%% % [dir()]),
+ %%% % rabbit_peer_discovery:log_configured_backend(),
+ %%% % rabbit_peer_discovery:maybe_init(),
+ %%% % init_with_lock();
+ %%% % false ->
+
+ %%% % NodeType = node_type(),
+ %%% % init_db_and_upgrade(cluster_nodes(all), NodeType,
+ %%% % NodeType =:= ram, _Retry = true),
+ %%% % rabbit_peer_discovery:maybe_init(),
+ %%% % rabbit_peer_discovery:maybe_register()
+ %%% % end,
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case -
%% let's make it so.
- ok = rabbit_node_monitor:global_sync(),
+ %% TODO: rabbit_node_monitor
+ % ok = rabbit_node_monitor:global_sync(),
+ ok.
+
+wait_for_tables() ->
+ Tables = [T || {T, _D} <- rabbit_table:definitions()],
+ lists:foreach(fun(Table) ->
+ wait_for_table(Table)
+ end,
+ Tables),
ok.
+wait_for_table(Table) ->
+ rabbit_log:error("Waiting for table ~p~n", [Table]),
+ case lists:member(Table, mnesia:system_info(tables)) of
+ true -> ok;
+ false ->
+ timer:sleep(1000),
+ wait_for_table(Table)
+ end.
+
init_with_lock() ->
{Retries, Timeout} = rabbit_peer_discovery:retry_timeout(),
init_with_lock(Retries, Timeout, fun init_from_config/0).
@@ -134,7 +194,7 @@ init_from_config() ->
(Name, BadNames) when is_atom(Name) -> BadNames;
(Name, BadNames) -> [Name | BadNames]
end,
- {DiscoveredNodes, NodeType} =
+ {DiscoveredNodes, _NodeType} =
case rabbit_peer_discovery:discover_cluster_nodes() of
{ok, {Nodes, Type} = Config}
when is_list(Nodes) andalso
@@ -158,29 +218,28 @@ init_from_config() ->
"Enabling debug logging might help troubleshoot."),
init_db_and_upgrade([node()], disc, false, _Retry = true);
_ ->
- rabbit_log:info("Peer nodes we can cluster with: ~s~n",
- [rabbit_peer_discovery:format_discovered_nodes(Peers)]),
- join_discovered_peers(Peers, NodeType)
+ rabbit_log:info("Peer nodes we can cluster with: ~s~n", [rabbit_peer_discovery:format_discovered_nodes(Peers)])
+ %% TODO LRB MNEVIS join_discovered_peers(Peers, NodeType)
end.
%% Attempts to join discovered,
%% reachable and compatible (in terms of Mnesia internal protocol version and such)
%% cluster peers in order.
-join_discovered_peers(TryNodes, NodeType) ->
- case find_reachable_peer_to_cluster_with(nodes_excl_me(TryNodes)) of
- {ok, Node} ->
- rabbit_log:info("Node '~s' selected for auto-clustering~n", [Node]),
- {ok, {_, DiscNodes, _}} = discover_cluster0(Node),
- init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true),
- rabbit_connection_tracking:boot(),
- rabbit_node_monitor:notify_joined_cluster();
- none ->
- rabbit_log:warning(
- "Could not successfully contact any node of: ~s (as in Erlang distribution). "
- "Starting as a blank standalone node...~n",
- [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]),
- init_db_and_upgrade([node()], disc, false, _Retry = true)
- end.
+% join_discovered_peers(TryNodes, NodeType) ->
+% case find_reachable_peer_to_cluster_with(nodes_excl_me(TryNodes)) of
+% {ok, Node} ->
+% rabbit_log:info("Node '~s' selected for auto-clustering~n", [Node]),
+% {ok, {_, DiscNodes, _}} = discover_cluster0(Node),
+% init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true),
+% rabbit_connection_tracking:boot(),
+% rabbit_node_monitor:notify_joined_cluster();
+% none ->
+% rabbit_log:warning(
+% "Could not successfully contact any node of: ~s (as in Erlang distribution). "
+% "Starting as a blank standalone node...~n",
+% [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]),
+% init_db_and_upgrade([node()], disc, false, _Retry = true)
+% end.
%% Make the node join a cluster. The node will be reset automatically
%% before we actually cluster it. The nodes provided will be used to
@@ -225,7 +284,8 @@ join_cluster(DiscoveryNode, NodeType) ->
ok = init_db_with_mnesia(ClusterNodes, NodeType,
true, true, _Retry = true),
rabbit_connection_tracking:boot(),
- rabbit_node_monitor:notify_joined_cluster(),
+ %% TODO: rabbit_node_monitor
+ % rabbit_node_monitor:notify_joined_cluster(),
ok;
{error, Reason} ->
{error, Reason}
@@ -284,7 +344,8 @@ wipe() ->
[erlang:disconnect_node(N) || N <- cluster_nodes(all)],
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok = rabbit_node_monitor:reset_cluster_status(),
+ %% TODO: rabbit_node_monitor
+ % ok = rabbit_node_monitor:reset_cluster_status(),
ok.
-spec change_cluster_node_type(node_type()) -> 'ok'.
@@ -311,14 +372,15 @@ change_cluster_node_type(Type) ->
update_cluster_nodes(DiscoveryNode) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- Status = {AllNodes, _, _} = discover_cluster([DiscoveryNode]),
+ _Status = {AllNodes, _, _} = discover_cluster([DiscoveryNode]),
case me_in_nodes(AllNodes) of
true ->
%% As in `check_consistency/0', we can safely delete the
%% schema here, since it'll be replicated from the other
%% nodes
mnesia:delete_schema([node()]),
- rabbit_node_monitor:write_cluster_status(Status),
+ %% TODO: rabbit_node_monitor
+ % rabbit_node_monitor:write_cluster_status(Status),
rabbit_log:info("Updating cluster nodes from ~p~n",
[DiscoveryNode]),
init_db_with_mnesia(AllNodes, node_type(), true, true, _Retry = false);
@@ -365,7 +427,7 @@ remove_node_offline_node(Node) ->
%% want - we need to know the running nodes *now*. If the current node is a
%% RAM node it will return bogus results, but we don't care since we only do
%% this operation from disc nodes.
- case {mnesia:system_info(running_db_nodes) -- [Node], node_type()} of
+ case {mnevis:running_db_nodes() -- [Node], node_type()} of
{[], disc} ->
start_mnesia(),
try
@@ -402,18 +464,24 @@ status() ->
[{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++
IfNonEmpty(ram, cluster_nodes(ram)))}] ++
case is_running() of
- true -> RunningNodes = cluster_nodes(running),
- [{running_nodes, RunningNodes},
- {cluster_name, rabbit_nodes:cluster_name()},
- {partitions, mnesia_partitions(RunningNodes)}];
- false -> []
+ true ->
+ RunningNodes = cluster_nodes(running),
+ [{running_nodes, RunningNodes},
+ {cluster_name, rabbit_nodes:cluster_name()},
+ {partitions, mnesia_partitions(RunningNodes)},
+ {mnevis_leader, get_mnevis_leader()}];
+ false ->
+ []
end.
-mnesia_partitions(Nodes) ->
- Replies = rabbit_node_monitor:partitions(Nodes),
- [Reply || Reply = {_, R} <- Replies, R =/= []].
+mnesia_partitions(_Nodes) -> [].
+ %% TODO: rabbit_node_monitor
+ % Replies = rabbit_node_monitor:partitions(Nodes),
+ % [Reply || Reply = {_, R} <- Replies, R =/= []].
-is_running() -> mnesia:system_info(is_running) =:= yes.
+is_running() ->
+ % TODO mnevis
+ mnesia:system_info(is_running) =:= yes.
-spec is_clustered() -> boolean().
@@ -459,27 +527,31 @@ cluster_status_from_mnesia() ->
false ->
{error, mnesia_not_running};
true ->
+ % TODO mnevis
%% If the tables are not present, it means that
%% `init_db/3' hasn't been run yet. In other words, either
%% we are a virgin node or a restarted RAM node. In both
%% cases we're not interested in what mnesia has to say.
- NodeType = case mnesia:system_info(use_dir) of
- true -> disc;
- false -> ram
- end,
- case rabbit_table:is_present() of
- true -> AllNodes = mnesia:system_info(db_nodes),
- DiscCopies = mnesia:table_info(schema, disc_copies),
- DiscNodes = case NodeType of
- disc -> nodes_incl_me(DiscCopies);
- ram -> DiscCopies
- end,
+ % NodeType = case mnesia:system_info(use_dir) of
+ % true -> disc;
+ % false -> ram
+ % end,
+ % case rabbit_table:is_present() of
+ % true ->
+ AllNodes = mnevis:db_nodes(),
+
+ %% Ignoring disk node setting
+ % DiscCopies = mnesia:table_info(schema, disc_copies),
+ % DiscNodes = case NodeType of
+ % disc -> nodes_incl_me(DiscCopies);
+ % ram -> DiscCopies
+ % end,
%% `mnesia:system_info(running_db_nodes)' is safe since
%% we know that mnesia is running
- RunningNodes = mnesia:system_info(running_db_nodes),
- {ok, {AllNodes, DiscNodes, RunningNodes}};
- false -> {error, tables_not_present}
- end
+ RunningNodes = mnevis:running_db_nodes(),
+ {ok, {AllNodes, AllNodes, RunningNodes}}
+ % false -> {error, tables_not_present}
+ % end
end.
cluster_status(WhichNodes) ->
@@ -488,12 +560,15 @@ cluster_status(WhichNodes) ->
{ok, Nodes0} ->
Nodes0;
{error, _Reason} ->
- {AllNodes0, DiscNodes0, RunningNodes0} =
- rabbit_node_monitor:read_cluster_status(),
+ %% TODO: rabbit_node_monitor
+ %% Read cluster status from offline mnevis?
+ {[], [], []}
+ % {AllNodes0, DiscNodes0, RunningNodes0} =
+ % rabbit_node_monitor:read_cluster_status(),
%% The cluster status file records the status when the node is
%% online, but we know for sure that the node is offline now, so
%% we can remove it from the list of running nodes.
- {AllNodes0, DiscNodes0, nodes_excl_me(RunningNodes0)}
+ % {AllNodes0, DiscNodes0, nodes_excl_me(RunningNodes0)}
end,
case WhichNodes of
status -> Nodes;
@@ -510,13 +585,14 @@ node_info() ->
-spec node_type() -> node_type().
-node_type() ->
- {_AllNodes, DiscNodes, _RunningNodes} =
- rabbit_node_monitor:read_cluster_status(),
- case DiscNodes =:= [] orelse me_in_nodes(DiscNodes) of
- true -> disc;
- false -> ram
- end.
+node_type() -> disc.
+ %% TODO: rabbit_node_monitor
+ % {_AllNodes, DiscNodes, _RunningNodes} =
+ % rabbit_node_monitor:read_cluster_status(),
+ % case DiscNodes =:= [] orelse me_in_nodes(DiscNodes) of
+ % true -> disc;
+ % false -> ram
+ % end.
-spec dir() -> file:filename().
@@ -557,7 +633,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
end,
ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin),
ensure_schema_integrity(),
- rabbit_node_monitor:update_cluster_status(),
+ %% TODO: rabbit_node_monitor
+ % rabbit_node_monitor:update_cluster_status(),
ok.
-spec init_db_unchecked([node()], node_type()) -> 'ok'.
@@ -672,14 +749,18 @@ check_cluster_consistency() ->
case lists:foldl(
fun (Node, {error, _}) -> check_cluster_consistency(Node, true);
(_Node, {ok, Status}) -> {ok, Status}
- end, {error, not_found}, nodes_excl_me(cluster_nodes(all)))
+ end,
+ {error, not_found},
+ nodes_excl_me(cluster_nodes(all)))
of
- {ok, Status = {RemoteAllNodes, _, _}} ->
+ {ok, _Status = {RemoteAllNodes, _, _}} ->
case ordsets:is_subset(ordsets:from_list(cluster_nodes(all)),
ordsets:from_list(RemoteAllNodes)) of
true ->
ok;
false ->
+ error({should_not_get_here, cluster_inconsistency}),
+
%% We delete the schema here since we think we are
%% clustered with nodes that are no longer in the
%% cluster and there is no other way to remove
@@ -693,7 +774,9 @@ check_cluster_consistency() ->
%% nodes.
mnesia:delete_schema([node()])
end,
- rabbit_node_monitor:write_cluster_status(Status);
+ ok;
+ %% TODO: rabbit_node_monitor
+ % rabbit_node_monitor:write_cluster_status(Status);
{error, not_found} ->
ok;
{error, _} = E ->
@@ -805,11 +888,21 @@ schema_ok_or_move() ->
%% We only care about disc nodes since ram nodes are supposed to catch
%% up only
create_schema() ->
- stop_mnesia(),
- rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
- start_mnesia(),
- ok = rabbit_table:create(),
+ % TODO mnevis
+ % io:format("Create schema ~n"),
+ % stop_mnesia(),
+ % rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
+ % start_mnesia(),
+ case is_leader(node()) of
+ true ->
+ io:format("Create tables ~n"),
+ ok = rabbit_table:create();
+ false ->
+ wait_for_tables()
+ end,
+ io:format("Check integrity ~n"),
ensure_schema_integrity(),
+ io:format("Record version ~n"),
ok = rabbit_version:record_desired().
move_db() ->
@@ -844,7 +937,8 @@ remove_node_if_mnesia_running(Node) ->
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
rabbit_amqqueue:forget_all_durable(Node),
- rabbit_node_monitor:notify_left_cluster(Node),
+ %% TODO: rabbit_node_monitor
+ % rabbit_node_monitor:notify_left_cluster(Node),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
@@ -873,11 +967,11 @@ wait_for(Condition) ->
rabbit_log:info("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
-start_mnesia(CheckConsistency) ->
- case CheckConsistency of
- true -> check_cluster_consistency();
- false -> ok
- end,
+start_mnesia(_CheckConsistency) ->
+ % case CheckConsistency of
+ % true -> check_cluster_consistency();
+ % false -> ok
+ % end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ensure_mnesia_running().
@@ -985,45 +1079,47 @@ check_rabbit_consistency(RemoteNode, RemoteVersion) ->
%% exception of the cluster status files, which will be there thanks to
%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
is_virgin_node() ->
- case rabbit_file:list_dir(dir()) of
- {error, enoent} ->
- true;
- {ok, []} ->
- true;
- {ok, List0} ->
- IgnoredFiles0 =
- [rabbit_node_monitor:cluster_status_filename(),
- rabbit_node_monitor:running_nodes_filename(),
- rabbit_node_monitor:quorum_filename(),
- rabbit_feature_flags:enabled_feature_flags_list_file()],
- IgnoredFiles = [filename:basename(File) || File <- IgnoredFiles0],
- List = List0 -- IgnoredFiles,
- List =:= []
- end.
-
-find_reachable_peer_to_cluster_with([]) ->
- none;
-find_reachable_peer_to_cluster_with([Node | Nodes]) ->
- Fail = fun (Fmt, Args) ->
- rabbit_log:warning(
- "Could not auto-cluster with node ~s: " ++ Fmt, [Node | Args]),
- find_reachable_peer_to_cluster_with(Nodes)
- end,
- case remote_node_info(Node) of
- {badrpc, _} = Reason ->
- Fail("~p~n", [Reason]);
- %% old delegate hash check
- {_OTP, RMQ, Hash, _} when is_binary(Hash) ->
- Fail("version ~s~n", [RMQ]);
- {_OTP, _RMQ, _Protocol, {error, _} = E} ->
- Fail("~p~n", [E]);
- {OTP, RMQ, Protocol, _} ->
- case check_consistency(Node, OTP, RMQ, Protocol) of
- {error, _} -> Fail("versions ~p~n",
- [{OTP, RMQ}]);
- ok -> {ok, Node}
- end
- end.
+ %%% case rabbit_file:list_dir(dir()) of
+ %%% {error, enoent} ->
+ %%% true;
+ %%% {ok, []} ->
+ %%% true;
+ %%% {ok, List0} ->
+ %%% IgnoredFiles0 =
+ %%% [rabbit_node_monitor:cluster_status_filename(),
+ %%% rabbit_node_monitor:running_nodes_filename(),
+ %%% rabbit_node_monitor:quorum_filename(),
+ %%% rabbit_feature_flags:enabled_feature_flags_list_file()],
+ %%% IgnoredFiles = [filename:basename(File) || File <- IgnoredFiles0],
+ %%% List = List0 -- IgnoredFiles,
+ %%% List =:= []
+ %%% end.
+ {ok, Tables, _} = ra:consistent_query(mnevis_node:node_id(), fun(_) -> mnesia:system_info(tables) end),
+ not lists:member(rabbit_queue, Tables).
+
+% find_reachable_peer_to_cluster_with([]) ->
+% none;
+% find_reachable_peer_to_cluster_with([Node | Nodes]) ->
+% Fail = fun (Fmt, Args) ->
+% rabbit_log:warning(
+% "Could not auto-cluster with node ~s: " ++ Fmt, [Node | Args]),
+% find_reachable_peer_to_cluster_with(Nodes)
+% end,
+% case remote_node_info(Node) of
+% {badrpc, _} = Reason ->
+% Fail("~p~n", [Reason]);
+% %% old delegate hash check
+% {_OTP, RMQ, Hash, _} when is_binary(Hash) ->
+% Fail("version ~s~n", [RMQ]);
+% {_OTP, _RMQ, _Protocol, {error, _} = E} ->
+% Fail("~p~n", [E]);
+% {OTP, RMQ, Protocol, _} ->
+% case check_consistency(Node, OTP, RMQ, Protocol) of
+% {error, _} -> Fail("versions ~p~n",
+% [{OTP, RMQ}]);
+% ok -> {ok, Node}
+% end
+% end.
is_only_clustered_disc_node() ->
node_type() =:= disc andalso is_clustered() andalso
@@ -1034,10 +1130,22 @@ are_we_clustered_with(Node) ->
me_in_nodes(Nodes) -> lists:member(node(), Nodes).
-nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]).
+% nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]).
nodes_excl_me(Nodes) -> Nodes -- [node()].
+is_leader(Node) ->
+ case ra:members(mnevis_node:node_id()) of
+ {ok, _, {_, Node}} ->
+ true;
+ _ ->
+ false
+ end.
+
+get_mnevis_leader() ->
+ {ok, _, {_, Node}} = ra:members(mnevis_node:node_id()),
+ Node.
+
-spec e(any()) -> no_return().
e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index c74a390ae2..5882a867d8 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -207,8 +207,10 @@ convert_backup(NodeMap, FromBackup, ToBackup) ->
end, switched).
config_files() ->
- [rabbit_node_monitor:running_nodes_filename(),
- rabbit_node_monitor:cluster_status_filename()].
+ %% TODO: rabbit_node_monitor
+ [].
+ % [rabbit_node_monitor:running_nodes_filename(),
+ % rabbit_node_monitor:cluster_status_filename()].
backup_of_conf(Path) ->
filename:join([dir(), filename:basename(Path)]).
@@ -254,7 +256,7 @@ rename_in_running_mnesia(FromNode, ToNode) ->
ok.
transform_table(Table, Map) ->
- mnesia:sync_transaction(
+ rabbit_misc:mnevis_transaction(
fun () ->
mnesia:lock({table, Table}, write),
transform_table(Table, Map, mnesia:first(Table))
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index c008b969ba..d11e925644 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -263,14 +263,17 @@ tcp_listener_started(Protocol, Opts, IPAddress, Port) ->
%% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
%% We need the host so we can distinguish multiple instances of the above
%% in a cluster.
- ok = mnesia:dirty_write(
- rabbit_listener,
- #listener{node = node(),
- protocol = Protocol,
- host = tcp_host(IPAddress),
- ip_address = IPAddress,
- port = Port,
- opts = Opts}).
+ TcpHost = tcp_host(IPAddress),
+ L = #listener{node = node(),
+ protocol = Protocol,
+ host = TcpHost,
+ ip_address = IPAddress,
+ port = Port,
+ opts = Opts},
+ F = fun () ->
+ ok = mnesia:write(rabbit_listener, L, write)
+ end,
+ ok = rabbit_misc:execute_mnesia_transaction(F).
-spec tcp_listener_stopped
(_, _,
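tcp_listener_started/4 above now records listeners through a transaction instead of mnesia:dirty_write/1, which is what the commit message calls "testing out replicated rabbit_listener". Listener lookups elsewhere in RabbitMQ remain dirty reads keyed on the node, which is why the commit message stresses that dirty_read must see data immediately after a transaction; roughly:

    %% Illustration only: rabbit_listener is keyed on the node field, so a
    %% local dirty read like this is expected to see the row written above.
    Listeners = mnesia:dirty_read(rabbit_listener, node()).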
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
deleted file mode 100644
index 5461e176e3..0000000000
--- a/src/rabbit_node_monitor.erl
+++ /dev/null
@@ -1,932 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(rabbit_node_monitor).
-
-%% Transitional step until we can require Erlang/OTP 21 and
-%% use the now recommended try/catch syntax for obtaining the stack trace.
--compile(nowarn_deprecated_function).
-
--behaviour(gen_server).
-
--export([start_link/0]).
--export([running_nodes_filename/0,
- cluster_status_filename/0, quorum_filename/0,
- prepare_cluster_status_files/0,
- write_cluster_status/1, read_cluster_status/0,
- update_cluster_status/0, reset_cluster_status/0]).
--export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
--export([partitions/0, partitions/1, status/1, subscribe/1]).
--export([pause_partition_guard/0]).
--export([global_sync/0]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
- %% Utils
--export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0,
- alive_nodes/1, alive_rabbit_nodes/1]).
-
--define(SERVER, ?MODULE).
--define(NODE_REPLY_TIMEOUT, 5000).
--define(RABBIT_UP_RPC_TIMEOUT, 2000).
--define(RABBIT_DOWN_PING_INTERVAL, 1000).
-
--record(state, {monitors, partitions, subscribers, down_ping_timer,
- keepalive_timer, autoheal, guid, node_guids}).
-
-%%----------------------------------------------------------------------------
-%% Start
-%%----------------------------------------------------------------------------
-
--spec start_link() -> rabbit_types:ok_pid_or_error().
-
-start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%%----------------------------------------------------------------------------
-%% Cluster file operations
-%%----------------------------------------------------------------------------
-
-%% The cluster file information is kept in two files. The "cluster
-%% status file" contains all the clustered nodes and the disc nodes.
-%% The "running nodes file" contains the currently running nodes or
-%% the running nodes at shutdown when the node is down.
-%%
-%% We strive to keep the files up to date and we rely on this
-%% assumption in various situations. Obviously when mnesia is offline
-%% the information we have will be outdated, but it cannot be
-%% otherwise.
-
--spec running_nodes_filename() -> string().
-
-running_nodes_filename() ->
- filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
-
--spec cluster_status_filename() -> string().
-
-cluster_status_filename() ->
- filename:join(rabbit_mnesia:dir(), "cluster_nodes.config").
-
-quorum_filename() ->
- filename:join(rabbit_mnesia:dir(), "quorum").
-
--spec prepare_cluster_status_files() -> 'ok' | no_return().
-
-prepare_cluster_status_files() ->
- rabbit_mnesia:ensure_mnesia_dir(),
- RunningNodes1 = case try_read_file(running_nodes_filename()) of
- {ok, [Nodes]} when is_list(Nodes) -> Nodes;
- {ok, Other} -> corrupt_cluster_status_files(Other);
- {error, enoent} -> []
- end,
- ThisNode = [node()],
- %% The running nodes file might contain a set or a list, in case
- %% of the legacy file
- RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
- {AllNodes1, DiscNodes} =
- case try_read_file(cluster_status_filename()) of
- {ok, [{AllNodes, DiscNodes0}]} ->
- {AllNodes, DiscNodes0};
- {ok, [AllNodes0]} when is_list(AllNodes0) ->
- {legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)};
- {ok, Files} ->
- corrupt_cluster_status_files(Files);
- {error, enoent} ->
- LegacyNodes = legacy_cluster_nodes([]),
- {LegacyNodes, LegacyNodes}
- end,
- AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
- ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
-
--spec corrupt_cluster_status_files(any()) -> no_return().
-
-corrupt_cluster_status_files(F) ->
- throw({error, corrupt_cluster_status_files, F}).
-
--spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.
-
-write_cluster_status({All, Disc, Running}) ->
- ClusterStatusFN = cluster_status_filename(),
- Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
- ok ->
- RunningNodesFN = running_nodes_filename(),
- {RunningNodesFN,
- rabbit_file:write_term_file(RunningNodesFN, [Running])};
- E1 = {error, _} ->
- {ClusterStatusFN, E1}
- end,
- case Res of
- {_, ok} -> ok;
- {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
- end.
-
--spec read_cluster_status() -> rabbit_mnesia:cluster_status().
-
-read_cluster_status() ->
- case {try_read_file(cluster_status_filename()),
- try_read_file(running_nodes_filename())} of
- {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
- {All, Disc, Running};
- {Stat, Run} ->
- throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
- end.
-
--spec update_cluster_status() -> 'ok'.
-
-update_cluster_status() ->
- {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
- write_cluster_status(Status).
-
--spec reset_cluster_status() -> 'ok'.
-
-reset_cluster_status() ->
- write_cluster_status({[node()], [node()], [node()]}).
-
-%%----------------------------------------------------------------------------
-%% Cluster notifications
-%%----------------------------------------------------------------------------
-
--spec notify_node_up() -> 'ok'.
-
-notify_node_up() ->
- gen_server:cast(?SERVER, notify_node_up).
-
--spec notify_joined_cluster() -> 'ok'.
-
-notify_joined_cluster() ->
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
- gen_server:abcast(Nodes, ?SERVER,
- {joined_cluster, node(), rabbit_mnesia:node_type()}),
- ok.
-
--spec notify_left_cluster(node()) -> 'ok'.
-
-notify_left_cluster(Node) ->
- Nodes = rabbit_mnesia:cluster_nodes(running),
- gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
- ok.
-
-%%----------------------------------------------------------------------------
-%% Server calls
-%%----------------------------------------------------------------------------
-
--spec partitions() -> [node()].
-
-partitions() ->
- gen_server:call(?SERVER, partitions, infinity).
-
--spec partitions([node()]) -> [{node(), [node()]}].
-
-partitions(Nodes) ->
- {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT),
- Replies.
-
--spec status([node()]) -> {[{node(), [node()]}], [node()]}.
-
-status(Nodes) ->
- gen_server:multi_call(Nodes, ?SERVER, status, infinity).
-
--spec subscribe(pid()) -> 'ok'.
-
-subscribe(Pid) ->
- gen_server:cast(?SERVER, {subscribe, Pid}).
-
-%%----------------------------------------------------------------------------
-%% pause_minority/pause_if_all_down safety
-%%----------------------------------------------------------------------------
-
-%% If we are in a minority and pause_minority mode then a) we are
-%% going to shut down imminently and b) we should not confirm anything
-%% until then, since anything we confirm is likely to be lost.
-%%
-%% The same principles apply to a node which isn't part of the preferred
-%% partition when we are in pause_if_all_down mode.
-%%
-%% We could confirm something by having an HA queue see the pausing
-%% state (and fail over into it) before the node monitor stops us, or
-%% by using unmirrored queues and just having them vanish (and
-%% confirming messages as thrown away).
-%%
-%% So we have channels call in here before issuing confirms, to do a
-%% lightweight check that we have not entered a pausing state.
-
--spec pause_partition_guard() -> 'ok' | 'pausing'.
-
-pause_partition_guard() ->
- case get(pause_partition_guard) of
- not_pause_mode ->
- ok;
- undefined ->
- {ok, M} = application:get_env(rabbit, cluster_partition_handling),
- case M of
- pause_minority ->
- pause_minority_guard([], ok);
- {pause_if_all_down, PreferredNodes, _} ->
- pause_if_all_down_guard(PreferredNodes, [], ok);
- _ ->
- put(pause_partition_guard, not_pause_mode),
- ok
- end;
- {minority_mode, Nodes, LastState} ->
- pause_minority_guard(Nodes, LastState);
- {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} ->
- pause_if_all_down_guard(PreferredNodes, Nodes, LastState)
- end.
-
-pause_minority_guard(LastNodes, LastState) ->
- case nodes() of
- LastNodes -> LastState;
- _ -> NewState = case majority() of
- false -> pausing;
- true -> ok
- end,
- put(pause_partition_guard,
- {minority_mode, nodes(), NewState}),
- NewState
- end.
-
-pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) ->
- case nodes() of
- LastNodes -> LastState;
- _ -> NewState = case in_preferred_partition(PreferredNodes) of
- false -> pausing;
- true -> ok
- end,
- put(pause_partition_guard,
- {pause_if_all_down_mode, PreferredNodes, nodes(),
- NewState}),
- NewState
- end.
-
-%%----------------------------------------------------------------------------
-%% "global" hang workaround.
-%%----------------------------------------------------------------------------
-
-%% This code works around a possible inconsistency in the "global"
-%% state, causing global:sync/0 to never return.
-%%
-%% 1. A process is spawned.
-%% 2. If global:sync() hasn't returned after 15 seconds, the "global"
-%%    state is parsed.
-%% 3. If it detects that a sync has been blocked for more than 10 seconds,
-%%    the process sends fake nodedown/nodeup events to the two
-%%    nodes involved (one local, one remote).
-%% 4. Both "global" instances restart their synchronisation.
-%% 5. global:sync() finally returns.
-%%
-%% FIXME: Remove this workaround, once we got rid of the change to
-%% "dist_auto_connect" and fixed the bugs uncovered.
-
-global_sync() ->
- Pid = spawn(fun workaround_global_hang/0),
- ok = global:sync(),
- Pid ! global_sync_done,
- ok.
-
-workaround_global_hang() ->
- receive
- global_sync_done ->
- ok
- after 10000 ->
- find_blocked_global_peers()
- end.
-
-find_blocked_global_peers() ->
- Snapshot1 = snapshot_global_dict(),
- timer:sleep(10000),
- Snapshot2 = snapshot_global_dict(),
- find_blocked_global_peers1(Snapshot2, Snapshot1).
-
-snapshot_global_dict() ->
- {status, _, _, [Dict | _]} = sys:get_status(global_name_server),
- [E || {{sync_tag_his, _}, _} = E <- Dict].
-
-find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest],
- OlderSnapshot) ->
- case lists:member(Item, OlderSnapshot) of
- true -> unblock_global_peer(Peer);
- false -> ok
- end,
- find_blocked_global_peers1(Rest, OlderSnapshot);
-find_blocked_global_peers1([], _) ->
- ok.
-
-unblock_global_peer(PeerNode) ->
- ThisNode = node(),
- PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]),
- error_logger:info_msg(
- "Global hang workaround: global state on ~s seems broken~n"
- " * Peer global state: ~p~n"
- " * Local global state: ~p~n"
- "Faking nodedown/nodeup between ~s and ~s~n",
- [PeerNode, PeerState, sys:get_status(global_name_server),
- PeerNode, ThisNode]),
- {global_name_server, ThisNode} ! {nodedown, PeerNode},
- {global_name_server, PeerNode} ! {nodedown, ThisNode},
- {global_name_server, ThisNode} ! {nodeup, PeerNode},
- {global_name_server, PeerNode} ! {nodeup, ThisNode},
- ok.
-
-%%----------------------------------------------------------------------------
-%% gen_server callbacks
-%%----------------------------------------------------------------------------
-
-init([]) ->
- %% We trap exits so that the supervisor will not just kill us. We
- %% want to be sure that we are not going to be killed while
- %% writing out the cluster status files - bad things can then
- %% happen.
- process_flag(trap_exit, true),
- net_kernel:monitor_nodes(true, [nodedown_reason]),
- {ok, _} = mnesia:subscribe(system),
- %% If the node has been restarted, Mnesia can trigger system notifications
- %% before the monitor subscribes to receive them. To avoid autoheal blocking due to
- %% the inconsistent database event never arriving, we begin monitoring all running
- %% nodes as early as possible. The rest of the monitoring ops will only be triggered
- %% when notifications arrive.
- Nodes = possibly_partitioned_nodes(),
- startup_log(Nodes),
- Monitors = lists:foldl(fun(Node, Monitors0) ->
- pmon:monitor({rabbit, Node}, Monitors0)
- end, pmon:new(), Nodes),
- {ok, ensure_keepalive_timer(#state{monitors = Monitors,
- subscribers = pmon:new(),
- partitions = [],
- guid = rabbit_guid:gen(),
- node_guids = maps:new(),
- autoheal = rabbit_autoheal:init()})}.
-
-handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
- {reply, Partitions, State};
-
-handle_call(status, _From, State = #state{partitions = Partitions}) ->
- {reply, [{partitions, Partitions},
- {nodes, [node() | nodes()]}], State};
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast(notify_node_up, State = #state{guid = GUID}) ->
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
- gen_server:abcast(Nodes, ?SERVER,
- {node_up, node(), rabbit_mnesia:node_type(), GUID}),
- %% register other active rabbits with this rabbit
- DiskNodes = rabbit_mnesia:cluster_nodes(disc),
- [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
- true -> disc;
- false -> ram
- end}) || N <- Nodes],
- {noreply, State};
-
-%%----------------------------------------------------------------------------
-%% Partial partition detection
-%%
-%% Every node generates a GUID each time it starts, and announces that
-%% GUID in 'node_up', with 'announce_guid' sent by return so the new
-%% node knows the GUIDs of the others. These GUIDs are sent in all the
-%% partial partition related messages to ensure that we ignore partial
-%% partition messages from before we restarted (to avoid getting stuck
-%% in a loop).
-%%
-%% When one node gets nodedown from another, it then sends
-%% 'check_partial_partition' to all the nodes it still thinks are
-%% alive. If any of those (intermediate) nodes still see the "down"
-%% node as up, they inform it that this has happened. The original
-%% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then
-%% disconnect from the intermediate node to "upgrade" to a full
-%% partition.
-%%
-%% In pause_minority mode it will instead immediately pause until all
-%% nodes come back. This is because the contract for pause_minority is
-%% that nodes should never sit in a partitioned state - if it just
-%% disconnected, it would become a minority, pause, realise it's not
-%% in a minority any more, and come back, still partitioned (albeit no
-%% longer partially).
-%% ----------------------------------------------------------------------------
-
-handle_cast({node_up, Node, NodeType, GUID},
- State = #state{guid = MyGUID,
- node_guids = GUIDs}) ->
- cast(Node, {announce_guid, node(), MyGUID}),
- GUIDs1 = maps:put(Node, GUID, GUIDs),
- handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});
-
-handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
- {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}};
-
-handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
- State = #state{guid = MyGUID,
- node_guids = GUIDs}) ->
- case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
- maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
- true -> spawn_link( %%[1]
- fun () ->
- case rpc:call(Node, rabbit, is_running, []) of
- {badrpc, _} -> ok;
- _ ->
- rabbit_log:warning("Received a 'DOWN' message"
- " from ~p but still can"
- " communicate with it ~n",
- [Node]),
- cast(Rep, {partial_partition,
- Node, node(), RepGUID})
- end
- end);
- false -> ok
- end,
- {noreply, State};
-%% [1] We checked that we haven't heard the node go down - but we
-%% really should make sure we can actually communicate with
-%% it. Otherwise there's a race where we falsely detect a partial
-%% partition.
-%%
-%% Now of course the rpc:call/4 may take a long time to return if
-%% connectivity with the node is actually interrupted - but that's OK,
-%% we only really want to do something in a timely manner if
-%% connectivity is OK. However, of course as always we must not block
-%% the node monitor, so we do the check in a separate process.
-
-handle_cast({check_partial_partition, _Node, _Reporter,
- _NodeGUID, _GUID, _ReporterGUID}, State) ->
- {noreply, State};
-
-handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID},
- State = #state{guid = MyGUID}) ->
- FmtBase = "Partial partition detected:~n"
- " * We saw DOWN from ~s~n"
- " * We can still see ~s which can see ~s~n",
- ArgsBase = [NotReallyDown, Proxy, NotReallyDown],
- case application:get_env(rabbit, cluster_partition_handling) of
- {ok, pause_minority} ->
- rabbit_log:error(
- FmtBase ++ " * pause_minority mode enabled~n"
- "We will therefore pause until the *entire* cluster recovers~n",
- ArgsBase),
- await_cluster_recovery(fun all_nodes_up/0),
- {noreply, State};
- {ok, {pause_if_all_down, PreferredNodes, _}} ->
- case in_preferred_partition(PreferredNodes) of
- true -> rabbit_log:error(
- FmtBase ++ "We will therefore intentionally "
- "disconnect from ~s~n", ArgsBase ++ [Proxy]),
- upgrade_to_full_partition(Proxy);
- false -> rabbit_log:info(
- FmtBase ++ "We are about to pause, no need "
- "for further actions~n", ArgsBase)
- end,
- {noreply, State};
- {ok, _} ->
- rabbit_log:error(
- FmtBase ++ "We will therefore intentionally disconnect from ~s~n",
- ArgsBase ++ [Proxy]),
- upgrade_to_full_partition(Proxy),
- {noreply, State}
- end;
-
-handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) ->
- {noreply, State};
-
-%% Sometimes it appears the Erlang VM does not give us nodedown
-%% messages reliably when another node disconnects from us. Therefore
-%% we are told just before the disconnection so we can reciprocate.
-handle_cast({partial_partition_disconnect, Other}, State) ->
- rabbit_log:error("Partial partition disconnect from ~s~n", [Other]),
- disconnect(Other),
- {noreply, State};
-
-%% Note: when updating the status file, we can't simply write the
-%% mnesia information since the message can (and will) overtake the
-%% mnesia propagation.
-handle_cast({node_up, Node, NodeType},
- State = #state{monitors = Monitors}) ->
- rabbit_log:info("rabbit on node ~p up~n", [Node]),
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({add_node(Node, AllNodes),
- case NodeType of
- disc -> add_node(Node, DiscNodes);
- ram -> DiscNodes
- end,
- add_node(Node, RunningNodes)}),
- ok = handle_live_rabbit(Node),
- Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
- true ->
- Monitors;
- false ->
- pmon:monitor({rabbit, Node}, Monitors)
- end,
- {noreply, maybe_autoheal(State#state{monitors = Monitors1})};
-
-handle_cast({joined_cluster, Node, NodeType}, State) ->
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({add_node(Node, AllNodes),
- case NodeType of
- disc -> add_node(Node, DiscNodes);
- ram -> DiscNodes
- end,
- RunningNodes}),
- {noreply, State};
-
-handle_cast({left_cluster, Node}, State) ->
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
- del_node(Node, RunningNodes)}),
- {noreply, State};
-
-handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
- {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};
-
-handle_cast(keepalive, State) ->
- {noreply, State};
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
- State = #state{monitors = Monitors, subscribers = Subscribers}) ->
- rabbit_log:info("rabbit on node ~p down~n", [Node]),
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
- [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
- {noreply, handle_dead_rabbit(
- Node,
- State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
-
-handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #state{subscribers = Subscribers}) ->
- {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
-
-handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
- node_guids = GUIDs}) ->
- rabbit_log:info("node ~p down: ~p~n",
- [Node, proplists:get_value(nodedown_reason, Info)]),
- Check = fun (N, CheckGUID, DownGUID) ->
- cast(N, {check_partial_partition,
- Node, node(), DownGUID, CheckGUID, MyGUID})
- end,
- case maps:find(Node, GUIDs) of
- {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
- -- [node(), Node],
- [case maps:find(N, GUIDs) of
- {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
- error -> ok
- end || N <- Alive];
- error -> ok
- end,
- {noreply, handle_dead_node(Node, State)};
-
-handle_info({nodeup, Node, _Info}, State) ->
- rabbit_log:info("node ~p up~n", [Node]),
- {noreply, State};
-
-handle_info({mnesia_system_event,
- {inconsistent_database, running_partitioned_network, Node}},
- State = #state{partitions = Partitions,
- monitors = Monitors}) ->
- %% We will not get a node_up from this node - yet we should treat it as
- %% up (mostly).
- State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
- true -> State;
- false -> State#state{
- monitors = pmon:monitor({rabbit, Node}, Monitors)}
- end,
- ok = handle_live_rabbit(Node),
- Partitions1 = lists:usort([Node | Partitions]),
- {noreply, maybe_autoheal(State1#state{partitions = Partitions1})};
-
-handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState,
- partitions = Partitions}) ->
- AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
- {noreply, State#state{autoheal = AState1}};
-
-handle_info(ping_down_nodes, State) ->
- %% We ping nodes when some are down to ensure that we find out
- %% about healed partitions quickly. We ping all nodes rather than
- %% just the ones we know are down for simplicity; it's not expensive
- %% to ping the nodes that are up, after all.
- State1 = State#state{down_ping_timer = undefined},
- Self = self(),
- %% We ping in a separate process since in a partition it might
- %% take some noticeable length of time and we don't want to block
- %% the node monitor for that long.
- spawn_link(fun () ->
- ping_all(),
- case all_nodes_up() of
- true -> ok;
- false -> Self ! ping_down_nodes_again
- end
- end),
- {noreply, State1};
-
-handle_info(ping_down_nodes_again, State) ->
- {noreply, ensure_ping_timer(State)};
-
-handle_info(ping_up_nodes, State) ->
- %% In this case we need to ensure that we ping "quickly" -
- %% i.e. only nodes that we know to be up.
- [cast(N, keepalive) || N <- alive_nodes() -- [node()]],
- {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};
-
-handle_info({'EXIT', _, _} = Info, State = #state{autoheal = AState0}) ->
- AState = rabbit_autoheal:process_down(Info, AState0),
- {noreply, State#state{autoheal = AState}};
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, State) ->
- rabbit_misc:stop_timer(State, #state.down_ping_timer),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%----------------------------------------------------------------------------
-%% Functions that call the module specific hooks when nodes go up/down
-%%----------------------------------------------------------------------------
-
-handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
- %% In general in rabbit_node_monitor we care about whether the
- %% rabbit application is up rather than the node; we do this so
- %% that we can respond in the same way to "rabbitmqctl stop_app"
- %% and "rabbitmqctl stop" as much as possible.
- %%
- %% However, for pause_minority and pause_if_all_down modes we can't do
- %% this, since we depend on looking at whether other nodes are up
- %% to decide whether to come back up ourselves - if we decide that
- %% based on the rabbit application we would go down and never come
- %% back.
- case application:get_env(rabbit, cluster_partition_handling) of
- {ok, pause_minority} ->
- case majority([Node]) of
- true -> ok;
- false -> await_cluster_recovery(fun majority/0)
- end,
- State;
- {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} ->
- case in_preferred_partition(PreferredNodes, [Node]) of
- true -> ok;
- false -> await_cluster_recovery(
- fun in_preferred_partition/0)
- end,
- case HowToRecover of
- autoheal -> State#state{autoheal =
- rabbit_autoheal:node_down(Node, Autoheal)};
- _ -> State
- end;
- {ok, ignore} ->
- State;
- {ok, autoheal} ->
- State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)};
- {ok, Term} ->
- rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
- "assuming 'ignore'~n", [Term]),
- State
- end.
-
-await_cluster_recovery(Condition) ->
- rabbit_log:warning("Cluster minority/secondary status detected - "
- "awaiting recovery~n", []),
- run_outside_applications(fun () ->
- rabbit:stop(),
- wait_for_cluster_recovery(Condition)
- end, false),
- ok.
-
-run_outside_applications(Fun, WaitForExistingProcess) ->
- spawn_link(fun () ->
- %% Ignore exit messages from the monitor - the link is needed
- %% to ensure the monitor detects abnormal exits from this process
- %% and can reset the 'restarting' status on the autoheal, avoiding
- %% a deadlock. The monitor is restarted when rabbit restarts, so messages
- %% in the other direction should be ignored.
- process_flag(trap_exit, true),
- %% If our group leader is inside an application we are about
- %% to stop, application:stop/1 does not return.
- group_leader(whereis(init), self()),
- register_outside_app_process(Fun, WaitForExistingProcess)
- end).
-
-register_outside_app_process(Fun, WaitForExistingProcess) ->
- %% Ensure only one such process at a time, the exit(badarg) is
- %% harmless if one is already running.
- %%
- %% If WaitForExistingProcess is false, the given fun is simply not
- %% executed at all and the process exits.
- %%
- %% If WaitForExistingProcess is true, we wait for the end of the
- %% currently running process before executing the given function.
- try register(rabbit_outside_app_process, self()) of
- true ->
- do_run_outside_app_fun(Fun)
- catch
- error:badarg when WaitForExistingProcess ->
- MRef = erlang:monitor(process, rabbit_outside_app_process),
- receive
- {'DOWN', MRef, _, _, _} ->
- %% The existing process exited, let's try to
- %% register again.
- register_outside_app_process(Fun, WaitForExistingProcess)
- end;
- error:badarg ->
- ok
- end.
-
-do_run_outside_app_fun(Fun) ->
- try
- Fun()
- catch _:E ->
- rabbit_log:error(
- "rabbit_outside_app_process:~n~p~n~p~n",
- [E, erlang:get_stacktrace()])
- end.
-
-wait_for_cluster_recovery(Condition) ->
- ping_all(),
- case Condition() of
- true -> rabbit:start();
- false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
- wait_for_cluster_recovery(Condition)
- end.
-
-handle_dead_rabbit(Node, State = #state{partitions = Partitions,
- autoheal = Autoheal}) ->
- %% TODO: This may turn out to be a performance hog when there are
- %% lots of nodes. We really only need to execute some of these
- %% statements on *one* node, rather than all of them.
- ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node),
- ok = rabbit_alarm:on_node_down(Node),
- ok = rabbit_mnesia:on_node_down(Node),
- %% If we have been partitioned, and we are now in the only remaining
- %% partition, we no longer care about partitions - forget them. Note
- %% that we do not attempt to deal with individual (other) partitions
- %% going away. It's only safe to forget anything about partitions when
- %% there are no partitions.
- Down = Partitions -- alive_rabbit_nodes(),
- NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
- Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
- [] -> [];
- _ -> Partitions
- end,
- ensure_ping_timer(
- State#state{partitions = Partitions1,
- autoheal = rabbit_autoheal:rabbit_down(Node, Autoheal)}).
-
-ensure_ping_timer(State) ->
- rabbit_misc:ensure_timer(
- State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL,
- ping_down_nodes).
-
-ensure_keepalive_timer(State) ->
- {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval),
- rabbit_misc:ensure_timer(
- State, #state.keepalive_timer, Interval, ping_up_nodes).
-
-handle_live_rabbit(Node) ->
- ok = rabbit_amqqueue:on_node_up(Node),
- ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node).
-
-maybe_autoheal(State = #state{partitions = []}) ->
- State;
-
-maybe_autoheal(State = #state{autoheal = AState}) ->
- case all_nodes_up() of
- true -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)};
- false -> State
- end.
-
-%%--------------------------------------------------------------------
-%% Internal utils
-%%--------------------------------------------------------------------
-
-try_read_file(FileName) ->
- case rabbit_file:read_term_file(FileName) of
- {ok, Term} -> {ok, Term};
- {error, enoent} -> {error, enoent};
- {error, E} -> throw({error, {cannot_read_file, FileName, E}})
- end.
-
-legacy_cluster_nodes(Nodes) ->
- %% We get all the info that we can, including the nodes from
- %% mnesia, which will be there if the node is a disc node (empty
- %% list otherwise)
- lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
-
-legacy_disc_nodes(AllNodes) ->
- case AllNodes == [] orelse lists:member(node(), AllNodes) of
- true -> [node()];
- false -> []
- end.
-
-add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
-
-del_node(Node, Nodes) -> Nodes -- [Node].
-
-cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).
-
-upgrade_to_full_partition(Proxy) ->
- cast(Proxy, {partial_partition_disconnect, node()}),
- disconnect(Proxy).
-
-%% When we call this, it's because we want to force Mnesia to detect a
-%% partition. But if we just disconnect_node/1 then Mnesia won't
-%% detect a very short partition. So we want to force a slightly
-%% longer disconnect. Unfortunately we don't have a way to blacklist
-%% individual nodes; the best we can do is turn off auto-connect
-%% altogether.
-disconnect(Node) ->
- application:set_env(kernel, dist_auto_connect, never),
- erlang:disconnect_node(Node),
- timer:sleep(1000),
- application:unset_env(kernel, dist_auto_connect),
- ok.
-
-%%--------------------------------------------------------------------
-
-%% mnesia:system_info(db_nodes) (and hence
-%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes
-%% when partitioned, just those that we are sharing Mnesia state
-%% with. So we have a small set of replacement functions
-%% here. "rabbit" in a function's name implies we test if the rabbit
-%% application is up, not just the node.
-
-%% As we use these functions to decide what to do in pause_minority or
-%% pause_if_all_down states, they *must* be fast, even in the case where
-%% TCP connections are timing out. So that means we should be careful
-%% about whether we connect to nodes which are currently disconnected.
-
-majority() ->
- majority([]).
-
-majority(NodesDown) ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- AliveNodes = alive_nodes(Nodes) -- NodesDown,
- length(AliveNodes) / length(Nodes) > 0.5.
-
-in_preferred_partition() ->
- {ok, {pause_if_all_down, PreferredNodes, _}} =
- application:get_env(rabbit, cluster_partition_handling),
- in_preferred_partition(PreferredNodes).
-
-in_preferred_partition(PreferredNodes) ->
- in_preferred_partition(PreferredNodes, []).
-
-in_preferred_partition(PreferredNodes, NodesDown) ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
- AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
- RealPreferredNodes =:= [] orelse AliveNodes =/= [].
-
-all_nodes_up() ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_nodes(Nodes)) =:= length(Nodes).
-
--spec all_rabbit_nodes_up() -> boolean().
-
-all_rabbit_nodes_up() ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
-
--spec alive_nodes([node()]) -> [node()].
-
-alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
-alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
-
--spec alive_rabbit_nodes([node()]) -> [node()].
-
-alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
-
-alive_rabbit_nodes(Nodes) ->
- [N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
-
-%% This one is allowed to connect!
-
--spec ping_all() -> 'ok'.
-
-ping_all() ->
- [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
- ok.
-
-possibly_partitioned_nodes() ->
- alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).
-
-startup_log([]) ->
- rabbit_log:info("Starting rabbit_node_monitor~n", []);
-startup_log(Nodes) ->
- rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n",
- [Nodes]).
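The pause_minority decision in the module deleted above came down to one quorum rule: a node stays up only while strictly more than half of the known cluster nodes are reachable over Erlang distribution. A minimal, self-contained sketch of that rule, condensed from the majority/1 and alive_nodes/1 functions removed here (ClusterNodes is passed in explicitly rather than read from rabbit_mnesia; that is an adaptation for the sketch, not how the deleted code obtained it):

-module(majority_example).
-export([majority/2]).

%% Condensed from the deleted rabbit_node_monitor:majority/1 above.
%% ClusterNodes would come from rabbit_mnesia:cluster_nodes(all) in the
%% original module; here it is an explicit argument so the sketch is
%% self-contained.
majority(ClusterNodes, NodesDown) ->
    Connected = [node() | nodes()],
    Alive = [N || N <- ClusterNodes, lists:member(N, Connected)] -- NodesDown,
    length(Alive) / length(ClusterNodes) > 0.5.
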
diff --git a/src/rabbit_prelaunch_cluster.erl b/src/rabbit_prelaunch_cluster.erl
index 9d3cda99e3..556ec99720 100644
--- a/src/rabbit_prelaunch_cluster.erl
+++ b/src/rabbit_prelaunch_cluster.erl
@@ -6,7 +6,7 @@ setup(Context) ->
rabbit_log_prelaunch:debug(""),
rabbit_log_prelaunch:debug("== Clustering =="),
rabbit_log_prelaunch:debug("Preparing cluster status files"),
- rabbit_node_monitor:prepare_cluster_status_files(),
+ % TODO LRB MNEVIS rabbit_node_monitor:prepare_cluster_status_files(),
case Context of
#{initial_pass := true} ->
rabbit_log_prelaunch:debug("Upgrading Mnesia schema"),
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 7e64e4fcd5..240366bdeb 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -39,7 +39,7 @@
create() ->
lists:foreach(fun ({Tab, TabDef}) ->
TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
+ case mnevis:create_table(Tab, TabDef1) of
{atomic, ok} -> ok;
{aborted, Reason} ->
throw({error, {table_creation_failed,
@@ -54,6 +54,7 @@ ensure_secondary_indexes() ->
ensure_secondary_index(rabbit_queue, vhost),
ok.
+%% TODO mnevis add table index
ensure_secondary_index(Table, Field) ->
case mnesia:add_table_index(Table, Field) of
{atomic, ok} -> ok;
@@ -68,8 +69,8 @@ ensure_secondary_index(Table, Field) ->
-spec create_local_copy('disc' | 'ram') -> 'ok'.
create_local_copy(disc) ->
- create_local_copy(schema, disc_copies),
- create_local_copies(disc);
+ create_local_copy(schema, ram_copies),
+ create_local_copies(ram);
create_local_copy(ram) ->
create_local_copies(ram),
create_local_copy(schema, ram_copies).
@@ -195,14 +196,14 @@ clear_ram_only_tables() ->
create_local_copies(Type) ->
lists:foreach(
fun ({Tab, TabDef}) ->
- HasDiscCopies = has_copy_type(TabDef, disc_copies),
+ HasDiscCopies = has_copy_type(TabDef, ram_copies),
HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies),
LocalTab = proplists:get_bool(local_content, TabDef),
StorageType =
if
Type =:= disc orelse LocalTab ->
if
- HasDiscCopies -> disc_copies;
+ HasDiscCopies -> ram_copies;
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
@@ -272,28 +273,22 @@ names() -> [Tab || {Tab, _} <- definitions()].
definitions(disc) ->
definitions();
definitions(ram) ->
- [{Tab, [{disc_copies, []}, {ram_copies, [node()]} |
- proplists:delete(
- ram_copies, proplists:delete(disc_copies, TabDef))]} ||
- {Tab, TabDef} <- definitions()].
+ definitions().
definitions() ->
[{rabbit_user,
[{record_name, internal_user},
{attributes, record_info(fields, internal_user)},
- {disc_copies, [node()]},
{match, #internal_user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
- {disc_copies, [node()]},
{match, #user_permission{user_vhost = #user_vhost{_='_'},
permission = #permission{_='_'},
_='_'}}]},
{rabbit_topic_permission,
[{record_name, topic_permission},
{attributes, record_info(fields, topic_permission)},
- {disc_copies, [node()]},
{match, #topic_permission{topic_permission_key = #topic_permission_key{_='_'},
permission = #permission{_='_'},
_='_'}}]},
@@ -301,7 +296,7 @@ definitions() ->
[
{record_name, vhost},
{attributes, vhost:fields()},
- {disc_copies, [node()]},
+ % TODO MNEVIS LRB {disc_copies, [node()]},
{match, vhost:pattern_match_all()}]},
{rabbit_listener,
[{record_name, listener},
@@ -311,7 +306,6 @@ definitions() ->
{rabbit_durable_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {disc_copies, [node()]},
{match, #route{binding = binding_match(), _='_'}}]},
{rabbit_semi_durable_route,
[{record_name, route},
@@ -348,7 +342,6 @@ definitions() ->
{rabbit_durable_exchange,
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
- {disc_copies, [node()]},
{match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_exchange,
[{record_name, exchange},
@@ -361,12 +354,10 @@ definitions() ->
{rabbit_runtime_parameters,
[{record_name, runtime_parameters},
{attributes, record_info(fields, runtime_parameters)},
- {disc_copies, [node()]},
{match, #runtime_parameters{_='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, amqqueue:fields()},
- {disc_copies, [node()]},
{match, amqqueue:pattern_match_on_name(queue_name_match())}]},
{rabbit_queue,
[{record_name, amqqueue},
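Note how the table definitions above lose their {disc_copies, [node()]} entries, create/0 goes through mnevis:create_table/2, and create_local_copy/1 forces ram_copies even in the disc case; durability is presumably expected to come from the mnevis/ra layer rather than from mnesia disc copies. A minimal sketch of the resulting creation pattern, with a hypothetical table name and attribute list (only the mnevis:create_table/2 call shape and the error handling mirror the code above):

%% Hypothetical table; only the mnevis:create_table/2 call and the
%% error handling are taken from the rabbit_table diff above.
create_example_table() ->
    TabDef = [{record_name, example_record},
              {attributes, [key, value]}],
    case mnevis:create_table(example_table, TabDef) of
        {atomic, ok}      -> ok;
        {aborted, Reason} -> throw({error, {table_creation_failed,
                                            example_table, TabDef, Reason}})
    end.
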
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 8cae363928..4d858bd7a5 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -159,7 +159,8 @@ upgrade_mode(AllNodes) ->
{disc, []} ->
primary;
{disc, _} ->
- Filename = rabbit_node_monitor:running_nodes_filename(),
+ %% TODO: rabbit_node_monitor
+ Filename = "<no filename>", % rabbit_node_monitor:running_nodes_filename(),
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
"disc node to shut down.~n~nNote: if several disc "
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 3fd09af499..20df5353c0 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -418,7 +418,7 @@ internal_system_x() ->
-spec cluster_name() -> 'ok'.
cluster_name() ->
- {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0),
+ {atomic, ok} = rabbit_misc:mnevis_transaction(fun cluster_name_tx/0),
ok.
cluster_name_tx() ->
@@ -646,17 +646,18 @@ exchange_options(Table) ->
transform(TableName, Fun, FieldList) ->
rabbit_table:wait([TableName]),
- {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
+ %% TODO: make sure there are no conflicts
+ {atomic, ok} = mnevis:transform_table(TableName, Fun, FieldList),
ok.
transform(TableName, Fun, FieldList, NewRecordName) ->
rabbit_table:wait([TableName]),
- {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
+ {atomic, ok} = mnevis:transform_table(TableName, Fun, FieldList,
NewRecordName),
ok.
create(Tab, TabDef) ->
- {atomic, ok} = mnesia:create_table(Tab, TabDef),
+ {atomic, ok} = mnevis:create_table(Tab, TabDef),
ok.
%% Dumb replacement for rabbit_exchange:declare that does not require
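The upgrade functions follow the same substitution applied elsewhere in this commit: the transactional entry point moves from mnesia to the mnevis-backed wrapper (rabbit_misc:mnevis_transaction/1, mnevis:transform_table, mnevis:create_table) while the transaction funs themselves stay untouched. A minimal sketch of the call-site shape, assuming only the {atomic, Result} / {aborted, Reason} return convention visible in the hunks above and in the mirrored_supervisor suite below (run_in_tx/1 is an illustrative name, not part of the codebase):

%% Illustrative wrapper showing the before/after call-site pattern.
run_in_tx(Fun) ->
    %% previously: {atomic, Result} = mnesia:transaction(Fun)
    case rabbit_misc:mnevis_transaction(Fun) of
        {atomic, Result}  -> Result;
        {aborted, Reason} -> throw({error, Reason})
    end.
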
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8914197aaf..b03531d0aa 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -36,15 +36,19 @@
recover() ->
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
+ io:format("amqq on node down ~n"),
rabbit_amqqueue:on_node_down(node()),
+ io:format("amqq warn limit ~n"),
rabbit_amqqueue:warn_file_limit(),
%% Prepare rabbit_semi_durable_route table
+ io:format("binding recover ~n"),
rabbit_binding:recover(),
%% rabbit_vhost_sup_sup will start the actual recovery.
%% So recovery will be run every time a vhost supervisor is restarted.
+ io:format("vhost sup sup start ~n"),
ok = rabbit_vhost_sup_sup:start(),
[ok = rabbit_vhost_sup_sup:init_vhost(VHost) || VHost <- list_names()],
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl
index a02c4721bc..8f606716ee 100644
--- a/test/amqqueue_backward_compatibility_SUITE.erl
+++ b/test/amqqueue_backward_compatibility_SUITE.erl
@@ -297,6 +297,6 @@ upgrade_v1_to_v2(_) ->
?amqqueue_v1_type),
?assert(?is_amqqueue_v1(OldQueue)),
?assert(not ?is_amqqueue_v2(OldQueue)),
- NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue),
+ NewQueue = amqqueue:upgrade_to(OldQueue, amqqueue_v2),
?assert(not ?is_amqqueue_v1(NewQueue)),
?assert(?is_amqqueue_v2(NewQueue)).
diff --git a/test/mirrored_supervisor_SUITE.erl b/test/mirrored_supervisor_SUITE.erl
index d4bf6883da..f6893ea421 100644
--- a/test/mirrored_supervisor_SUITE.erl
+++ b/test/mirrored_supervisor_SUITE.erl
@@ -48,7 +48,7 @@ init_per_suite(Config) ->
lists:foreach(
fun ({Tab, TabDef}) ->
TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
+ case mnevis:create_table(Tab, TabDef1) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
@@ -277,7 +277,7 @@ pid_of(Id) ->
Pid.
tx_fun(Fun) ->
- case mnesia:sync_transaction(Fun) of
+ case rabbit_misc:mnevis_transaction(Fun) of
{atomic, Result} -> Result;
{aborted, Reason} -> throw({error, Reason})
end.