summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-18 18:16:26 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-18 18:16:26 +0100
commit58ea63ab02b7bebd9c442e018bc32ed1955095ba (patch)
treecd167b5cb700ade8b7b175e3cc7d7199996501fa
parent31789bf730f2e6d70719b7096da5479bf4480190 (diff)
downloadrabbitmq-server-git-58ea63ab02b7bebd9c442e018bc32ed1955095ba.tar.gz
wiring things up
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_queue_mode_manager.erl25
4 files changed, 33 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9d3cead654..57269c5341 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([constrain_memory/2]).
+-export([constrain_memory/2, report_desired_memory/1]).
-import(mnesia).
-import(gen_server2).
@@ -109,6 +109,7 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
+-spec(report_desired_memory/1 :: (pid()) -> non_neg_integer()).
-endif.
@@ -356,6 +357,9 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
+report_desired_memory(QPid) ->
+ gen_server2:pcall(QPid, 9, report_desired_memory, infinity).
+
safe_pmap_ok(H, F, L) ->
case [R || R <- rabbit_misc:upmap(
fun (V) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6869846ddc..084529a4af 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -720,7 +720,12 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(ok, State);
_ ->
reply(locked, State)
- end.
+ end;
+
+handle_call(report_desired_memory, _From, State = #q { mixed_state = MS }) ->
+ MSize = rabbit_mixed_queue:estimate_extra_memory(MS),
+ {memory, PSize} = process_info(self(), memory),
+ reply(PSize + MSize, State).
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 586c06c0c6..9c1553b874 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -284,7 +284,7 @@ action(reduce_memory_footprint, Node, _Args, Inform) ->
call(Node, {rabbit_queue_mode_manager, reduce_memory_footprint, []});
action(increase_memory_footprint, Node, _Args, Inform) ->
- Inform("Reducing memory footprint", []),
+ Inform("Increasing memory footprint", []),
call(Node, {rabbit_queue_mode_manager, increase_memory_footprint, []});
action(Command, Node, Args, Inform) ->
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index b36bb8bef2..e317fedaa3 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -39,13 +39,14 @@
terminate/2, code_change/3]).
-export([register/1, change_memory_footprint/2,
- reduce_memory_footprint/0, increase_memory_footprint/0]).
+ reduce_memory_footprint/0, increase_memory_footprint/0,
+ gather_memory_estimates/0
+ ]).
-define(SERVER, ?MODULE).
-ifdef(use_specs).
--type(mode() :: ( 'unlimited' | 'ram_disk' | 'disk_only' )).
-type(queue_mode() :: ( 'mixed' | 'disk' )).
-spec(start_link/0 :: () ->
@@ -75,10 +76,14 @@ reduce_memory_footprint() ->
increase_memory_footprint() ->
gen_server2:cast(?SERVER, {change_memory_footprint, false}).
-
+
+gather_memory_estimates() ->
+ gen_server2:cast(?SERVER, gather_memory_estimates).
+
init([]) ->
process_flag(trap_exit, true),
ok = rabbit_alarm:register(self(), {?MODULE, change_memory_footprint, []}),
+ {ok, _TRef} = timer:apply_interval(5000, ?MODULE, gather_memory_estimates, []),
{ok, #state { mode = unlimited,
queues = []
}}.
@@ -113,7 +118,11 @@ handle_cast({change_memory_footprint, false},
handle_cast({change_memory_footprint, false},
State = #state { mode = disk_only }) ->
constrain_queues(false, State #state.queues),
- {noreply, State #state { mode = ram_disk }}.
+ {noreply, State #state { mode = ram_disk }};
+
+handle_cast(gather_memory_estimates, State) ->
+ State1 = internal_gather(State),
+ {noreply, State1}.
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -131,3 +140,11 @@ constrain_queues(Constrain, Qs) ->
fun (QPid) ->
ok = rabbit_amqqueue:constrain_memory(QPid, Constrain)
end, Qs).
+
+internal_gather(State = #state { queues = Qs }) ->
+ lists:foreach(fun(Q) ->
+ io:format("Queue memory request: ~w is ~w bytes~n",
+ [Q, rabbit_amqqueue:report_desired_memory(Q)
+ ])
+ end, Qs),
+ State.