diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 25 |
4 files changed, 33 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9d3cead654..57269c5341 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,7 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([constrain_memory/2]). +-export([constrain_memory/2, report_desired_memory/1]). -import(mnesia). -import(gen_server2). @@ -109,6 +109,7 @@ -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). +-spec(report_desired_memory/1 :: (pid()) -> non_neg_integer()). -endif. @@ -356,6 +357,9 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. +report_desired_memory(QPid) -> + gen_server2:pcall(QPid, 9, report_desired_memory, infinity). + safe_pmap_ok(H, F, L) -> case [R || R <- rabbit_misc:upmap( fun (V) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6869846ddc..084529a4af 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -720,7 +720,12 @@ handle_call({claim_queue, ReaderPid}, _From, reply(ok, State); _ -> reply(locked, State) - end. + end; + +handle_call(report_desired_memory, _From, State = #q { mixed_state = MS }) -> + MSize = rabbit_mixed_queue:estimate_extra_memory(MS), + {memory, PSize} = process_info(self(), memory), + reply(PSize + MSize, State). handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 586c06c0c6..9c1553b874 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -284,7 +284,7 @@ action(reduce_memory_footprint, Node, _Args, Inform) -> call(Node, {rabbit_queue_mode_manager, reduce_memory_footprint, []}); action(increase_memory_footprint, Node, _Args, Inform) -> - Inform("Reducing memory footprint", []), + Inform("Increasing memory footprint", []), call(Node, {rabbit_queue_mode_manager, increase_memory_footprint, []}); action(Command, Node, Args, Inform) -> diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index b36bb8bef2..e317fedaa3 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -39,13 +39,14 @@ terminate/2, code_change/3]). -export([register/1, change_memory_footprint/2, - reduce_memory_footprint/0, increase_memory_footprint/0]). + reduce_memory_footprint/0, increase_memory_footprint/0, + gather_memory_estimates/0 + ]). -define(SERVER, ?MODULE). -ifdef(use_specs). --type(mode() :: ( 'unlimited' | 'ram_disk' | 'disk_only' )). -type(queue_mode() :: ( 'mixed' | 'disk' )). -spec(start_link/0 :: () -> @@ -75,10 +76,14 @@ reduce_memory_footprint() -> increase_memory_footprint() -> gen_server2:cast(?SERVER, {change_memory_footprint, false}). - + +gather_memory_estimates() -> + gen_server2:cast(?SERVER, gather_memory_estimates). + init([]) -> process_flag(trap_exit, true), ok = rabbit_alarm:register(self(), {?MODULE, change_memory_footprint, []}), + {ok, _TRef} = timer:apply_interval(5000, ?MODULE, gather_memory_estimates, []), {ok, #state { mode = unlimited, queues = [] }}. @@ -113,7 +118,11 @@ handle_cast({change_memory_footprint, false}, handle_cast({change_memory_footprint, false}, State = #state { mode = disk_only }) -> constrain_queues(false, State #state.queues), - {noreply, State #state { mode = ram_disk }}. + {noreply, State #state { mode = ram_disk }}; + +handle_cast(gather_memory_estimates, State) -> + State1 = internal_gather(State), + {noreply, State1}. handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -131,3 +140,11 @@ constrain_queues(Constrain, Qs) -> fun (QPid) -> ok = rabbit_amqqueue:constrain_memory(QPid, Constrain) end, Qs). + +internal_gather(State = #state { queues = Qs }) -> + lists:foreach(fun(Q) -> + io:format("Queue memory request: ~w is ~w bytes~n", + [Q, rabbit_amqqueue:report_desired_memory(Q) + ]) + end, Qs), + State. |
