summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-06-08 13:37:54 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-07-14 03:50:30 +0300
commit10dc8b29687e132cee6e8bc910f1b043969d5c1e (patch)
tree66517a7ce2a84952fdbbb96a54c32b2110af02ec
parentb6d818136e7cc6a03529ec233270fcc559532580 (diff)
downloadrabbitmq-server-git-10dc8b29687e132cee6e8bc910f1b043969d5c1e.tar.gz
Initial API bits for marking a node as being drained (or not)
Part of #2321.
-rw-r--r--src/rabbit_maintenance.erl69
-rw-r--r--src/rabbit_mnesia.erl7
-rw-r--r--src/rabbit_table.erl8
-rw-r--r--src/rabbit_upgrade_functions.erl1
4 files changed, 68 insertions, 17 deletions
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index d3f355dc79..108381039d 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -19,34 +19,71 @@
-include("rabbit.hrl").
-export([
- mark_as_drained/0,
- unmark_as_drained/0,
- is_drained_using_dirty_read/1,
- is_drained_using_consistent_read/1,
+ mark_as_being_drained/0,
+ unmark_as_being_drained/0,
+ is_being_drained_local_read/1,
+ is_being_drained_consistent_read/1,
suspend_all_client_listeners/0,
resume_all_client_listeners/0,
close_all_client_connections/0]).
+ -define(TABLE, rabbit_node_maintenance_states).
+ -define(DEFAULT_STATUS, regular).
+ -define(DRAINING_STATUS, draining).
+
%%
%% API
%%
-mark_as_drained() ->
- ok.
-
-unmark_as_drained() ->
- ok.
+-spec mark_as_being_drained() -> boolean().
+mark_as_being_drained() ->
+ set_maintenance_state_status(?DRAINING_STATUS).
+
+-spec unmark_as_being_drained() -> boolean().
+unmark_as_being_drained() ->
+ set_maintenance_state_status(?DEFAULT_STATUS).
--spec is_drained_using_dirty_read(node()) -> boolean().
-is_drained_using_dirty_read(_Node) ->
- false.
+set_maintenance_state_status(Status) ->
+ Res = mnesia:transaction(fun () ->
+ case mnesia:wread({?TABLE, node()}) of
+ [] ->
+ Row = #node_maintenance_state{
+ node = node(),
+ status = Status
+ },
+ mnesia:write(?TABLE, Row, write);
+ [Row0] ->
+ Row = Row0#node_maintenance_state{
+ node = node(),
+ status = Status
+ },
+ mnesia:write(?TABLE, Row, write)
+ end
+ end),
+ case Res of
+ {atomic, ok} -> true;
+ _ -> false
+ end.
+
+
+-spec is_being_drained_local_read(node()) -> boolean().
+is_being_drained_local_read(Node) ->
+ case mnesia:dirty_read(?TABLE, Node) of
+ [] -> false;
+ [#node_maintenance_state{node = Node, status = Status}] ->
+ Status =:= ?DRAINING_STATUS
+ end.
--spec is_drained_using_consistent_read(node()) -> boolean().
-is_drained_using_consistent_read(_Node) ->
- false.
+-spec is_being_drained_consistent_read(node()) -> boolean().
+is_being_drained_consistent_read(Node) ->
+ case mnesia:transaction(fun() -> mnesia:read(?TABLE, Node) end) of
+ {atomic, []} -> false;
+ {atomic, [#node_maintenance_state{node = Node, status = Status}]} ->
+ Status =:= ?DRAINING_STATUS;
+ {aborted, _Reason} -> false
+ end.
-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
-
%% Pauses all listeners on the current node except for
%% Erlang distribution (clustering and CLI tools).
%% A respausedumed listener will not accept any new client connections
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index ba87617a74..f22e975b82 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -824,10 +824,17 @@ schema_ok_or_move() ->
%% up only
create_schema() ->
stop_mnesia(),
+ rabbit_log:debug("Will bootstrap a schema database..."),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
+ rabbit_log:debug("Bootstraped a schema database successfully"),
start_mnesia(),
+
+ rabbit_log:debug("Will create schema database tables"),
ok = rabbit_table:create(),
+ rabbit_log:debug("Created schema database tables successfully"),
+ rabbit_log:debug("Will check schema database integrity..."),
ensure_schema_integrity(),
+ rabbit_log:debug("Schema database schema integrity check passed"),
ok = rabbit_version:record_desired().
move_db() ->
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 3a7d2dd1bf..a922190647 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -30,6 +30,7 @@
create() ->
lists:foreach(fun ({Tab, TabDef}) ->
TabDef1 = proplists:delete(match, TabDef),
+ rabbit_log:debug("Will create a schema database table named '~s'", [Tab]),
case mnesia:create_table(Tab, TabDef1) of
{atomic, ok} -> ok;
{aborted, Reason} ->
@@ -363,7 +364,12 @@ definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, amqqueue:fields()},
- {match, amqqueue:pattern_match_on_name(queue_name_match())}]}]
+ {match, amqqueue:pattern_match_on_name(queue_name_match())}]},
+ {rabbit_node_maintenance_states,
+ [{record_name, node_maintenance_state},
+ {attributes, record_info(fields, node_maintenance_state)},
+ {match, #node_maintenance_state{_='_'}}]}
+ ]
++ gm:table_definitions()
++ mirrored_supervisor:table_definitions().
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 676ebda2ec..ae80075e8f 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -656,6 +656,7 @@ transform(TableName, Fun, FieldList, NewRecordName) ->
ok.
create(Tab, TabDef) ->
+ rabbit_log:debug("Will create a schema table named '~s'", [Tab]),
{atomic, ok} = mnesia:create_table(Tab, TabDef),
ok.