summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_quorum_memory_manager.erl76
-rw-r--r--test/quorum_queue_SUITE.erl29
2 files changed, 103 insertions, 2 deletions
diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl
new file mode 100644
index 0000000000..347f7f205e
--- /dev/null
+++ b/src/rabbit_quorum_memory_manager.erl
@@ -0,0 +1,76 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbit_quorum_memory_manager).
+
+-include("rabbit.hrl").
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([register/0, unregister/0]).
+
+-record(state, {last_roll_over,
+ interval}).
+
+-rabbit_boot_step({rabbit_quorum_memory_manager,
+ [{description, "quorum memory manager"},
+ {mfa, {?MODULE, register, []}},
+ {cleanup, {?MODULE, unregister, []}},
+ {requires, rabbit_event},
+ {enables, recovery}]}).
+
+register() ->
+ gen_event:add_handler(rabbit_alarm, ?MODULE, []).
+
+unregister() ->
+ gen_event:delete_handler(rabbit_alarm, ?MODULE, []).
+
+init([]) ->
+ {ok, #state{interval = interval()}}.
+
+handle_call( _, State) ->
+ {ok, ok, State}.
+
+handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
+ #state{last_roll_over = undefined} = State) when Node == node() ->
+ {ok, force_roll_over(State)};
+handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
+ #state{last_roll_over = Last, interval = Interval } = State)
+ when Node == node() ->
+ Now = erlang:system_time(millisecond),
+ case Now > (Last + Interval) of
+ true ->
+ {ok, force_roll_over(State)};
+ false ->
+ {ok, State}
+ end;
+handle_event(_, State) ->
+ {ok, State}.
+
+handle_info(_, State) ->
+ {ok, State}.
+
+terminate(_, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+force_roll_over(State) ->
+ ra_log_wal:force_roll_over(ra_log_wal),
+ State#state{last_roll_over = erlang:system_time(millisecond)}.
+
+interval() ->
+ application:get_env(rabbit, min_wal_roll_over_interval, 20000).
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 4190903409..5b87c5be20 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -119,7 +119,8 @@ all_tests() ->
delete_immediately,
delete_immediately_by_resource,
consume_redelivery_count,
- subscribe_redelivery_count
+ subscribe_redelivery_count,
+ memory_alarm_rolls_wal
].
%% -------------------------------------------------------------------
@@ -205,7 +206,10 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
merge_app_env(Config) ->
- rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
+ rabbit_ct_helpers:merge_app_env(
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [{core_metrics_gc_interval, 100}]}),
+ {ra, [{min_wal_roll_over_interval, 30000}]}).
end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
Testcase == reconnect_consumer_and_wait;
@@ -2044,6 +2048,27 @@ consume_redelivery_count(Config) ->
multiple = false,
requeue = true}).
+memory_alarm_rolls_wal(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
+ [Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
+ ok = rpc:call(Server, rabbit_alarm, set_alarm,
+ [{{resource_limit, memory, Server}, []}]),
+ timer:sleep(1000),
+ [Wal1] = filelib:wildcard(WalDataDir ++ "/*.wal"),
+ ?assert(Wal0 =/= Wal1),
+ %% roll over shouldn't happen if we trigger a new alarm in less than
+ %% min_wal_roll_over_interval
+ ok = rpc:call(Server, rabbit_alarm, set_alarm,
+ [{{resource_limit, memory, Server}, []}]),
+ timer:sleep(1000),
+ [Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
+ ?assert(Wal1 == Wal2).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->