diff options
| -rw-r--r-- | src/rabbit_quorum_memory_manager.erl | 76 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 29 |
2 files changed, 103 insertions, 2 deletions
diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl new file mode 100644 index 0000000000..347f7f205e --- /dev/null +++ b/src/rabbit_quorum_memory_manager.erl @@ -0,0 +1,76 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbit_quorum_memory_manager). + +-include("rabbit.hrl"). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). +-export([register/0, unregister/0]). + +-record(state, {last_roll_over, + interval}). + +-rabbit_boot_step({rabbit_quorum_memory_manager, + [{description, "quorum memory manager"}, + {mfa, {?MODULE, register, []}}, + {cleanup, {?MODULE, unregister, []}}, + {requires, rabbit_event}, + {enables, recovery}]}). + +register() -> + gen_event:add_handler(rabbit_alarm, ?MODULE, []). + +unregister() -> + gen_event:delete_handler(rabbit_alarm, ?MODULE, []). + +init([]) -> + {ok, #state{interval = interval()}}. + +handle_call( _, State) -> + {ok, ok, State}. + +handle_event({set_alarm, {{resource_limit, memory, Node}, []}}, + #state{last_roll_over = undefined} = State) when Node == node() -> + {ok, force_roll_over(State)}; +handle_event({set_alarm, {{resource_limit, memory, Node}, []}}, + #state{last_roll_over = Last, interval = Interval } = State) + when Node == node() -> + Now = erlang:system_time(millisecond), + case Now > (Last + Interval) of + true -> + {ok, force_roll_over(State)}; + false -> + {ok, State} + end; +handle_event(_, State) -> + {ok, State}. + +handle_info(_, State) -> + {ok, State}. + +terminate(_, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +force_roll_over(State) -> + ra_log_wal:force_roll_over(ra_log_wal), + State#state{last_roll_over = erlang:system_time(millisecond)}. + +interval() -> + application:get_env(rabbit, min_wal_roll_over_interval, 20000). diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 4190903409..5b87c5be20 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -119,7 +119,8 @@ all_tests() -> delete_immediately, delete_immediately_by_resource, consume_redelivery_count, - subscribe_redelivery_count + subscribe_redelivery_count, + memory_alarm_rolls_wal ]. %% ------------------------------------------------------------------- @@ -205,7 +206,10 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). merge_app_env(Config) -> - rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}). + rabbit_ct_helpers:merge_app_env( + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [{core_metrics_gc_interval, 100}]}), + {ra, [{min_wal_roll_over_interval, 30000}]}). end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; Testcase == reconnect_consumer_and_wait; @@ -2044,6 +2048,27 @@ consume_redelivery_count(Config) -> multiple = false, requeue = true}). +memory_alarm_rolls_wal(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []), + [Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"), + ok = rpc:call(Server, rabbit_alarm, set_alarm, + [{{resource_limit, memory, Server}, []}]), + timer:sleep(1000), + [Wal1] = filelib:wildcard(WalDataDir ++ "/*.wal"), + ?assert(Wal0 =/= Wal1), + %% roll over shouldn't happen if we trigger a new alarm in less than + %% min_wal_roll_over_interval + ok = rpc:call(Server, rabbit_alarm, set_alarm, + [{{resource_limit, memory, Server}, []}]), + timer:sleep(1000), + [Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"), + ?assert(Wal1 == Wal2). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> |
