summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_disk_queue.erl4
-rw-r--r--src/rabbit_mixed_queue.erl6
-rw-r--r--src/rabbit_queue_mode_manager.erl105
6 files changed, 131 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ce73f6ce6d..44e4dc7f25 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -138,6 +138,8 @@ start(normal, []) ->
{ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
+
+ ok = start_child(rabbit_queue_mode_manager),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 08c67946dc..97ffcda8e6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,6 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([constrain_memory/2]).
-import(mnesia).
-import(gen_server2).
@@ -103,6 +104,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(constrain_memory/2 :: (pid(), bool()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -312,6 +314,9 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:cast(QPid, {unblock, ChPid}).
+constrain_memory(QPid, Constrain) ->
+ gen_server2:cast(QPid, {constrain, Constrain}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f45f931e81..6ad4e4e6b7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -93,7 +93,8 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, mixed), %% TODO, CHANGE ME
+ {ok, Mode} = rabbit_queue_mode_manager:register(self()),
+ {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, Mode), %% TODO, CHANGE ME
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
@@ -779,7 +780,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) ->
+ {ok, MS2} = if Constrain -> rabbit_mixed_queue:to_disk_only_mode(MS);
+ true -> rabbit_mixed_queue:to_mixed_mode(MS)
+ end,
+ noreply(State #q { mixed_state = MS2 }).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 5c1f969e1e..1b30051f30 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -458,6 +458,7 @@ handle_call(to_disk_only_mode, _From,
State = #dqstate { operation_mode = ram_disk,
msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts }) ->
+ rabbit_log:info("Converting disk queue to disk only mode~n", []),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_only_copies),
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
@@ -470,6 +471,7 @@ handle_call(to_ram_disk_mode, _From,
State = #dqstate { operation_mode = disk_only,
msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts }) ->
+ rabbit_log:info("Converting disk queue to ram disk mode~n", []),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_copies),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
@@ -514,6 +516,8 @@ handle_cast({delete_queue, Q}, State) ->
{ok, State1} = internal_delete_queue(Q, State),
{noreply, State1}.
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index dae4dad150..6a463242be 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -57,8 +57,11 @@ start_link(Queue, IsDurable, mixed) ->
{ok, State} = start_link(Queue, IsDurable, disk),
to_mixed_mode(State).
+to_disk_only_mode(State = #mqstate { mode = disk }) ->
+ {ok, State};
to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
next_write_seq = NextSeq }) ->
+ rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]),
%% We enqueue _everything_ here. This means that should a message
%% already be in the disk queue we must remove it and add it back
%% in. Fortunately, by using requeue, we avoid rewriting the
@@ -93,7 +96,10 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
{ok, State #mqstate { mode = disk, msg_buf = queue:new(),
next_write_seq = NextSeq1 }}.
+to_mixed_mode(State = #mqstate { mode = mixed }) ->
+ {ok, State};
to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) ->
+ rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
%% load up a new queue with everything that's on disk.
%% don't remove non-persistent messages that happen to be on disk
QList = rabbit_disk_queue:dump_queue(Q),
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
new file mode 100644
index 0000000000..aee57ac3b7
--- /dev/null
+++ b/src/rabbit_queue_mode_manager.erl
@@ -0,0 +1,105 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_queue_mode_manager).
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([register/1, change_memory_usage/2]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, { mode,
+ queues
+ }).
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+register(Pid) ->
+ gen_server2:call(?SERVER, {register, Pid}).
+
+change_memory_usage(_Pid, Conserve) ->
+ gen_server2:cast(?SERVER, {change_memory_usage, Conserve}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ ok = rabbit_alarm:register(self(), {?MODULE, change_memory_usage, []}),
+ {ok, #state { mode = unlimited,
+ queues = []
+ }}.
+
+handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) ->
+ Result = case Mode of
+ unlimited -> mixed;
+ _ -> disk
+ end,
+ {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}.
+
+handle_cast({change_memory_usage, true}, State = #state { mode = disk_only }) ->
+ {noreply, State};
+handle_cast({change_memory_usage, true}, State = #state { mode = ram_disk }) ->
+ ok = rabbit_disk_queue:to_disk_only_mode(),
+ {noreply, State #state { mode = disk_only }};
+handle_cast({change_memory_usage, true}, State = #state { mode = unlimited }) ->
+ constrain_queues(true, State #state.queues),
+ {noreply, State #state { mode = ram_disk }};
+
+handle_cast({change_memory_usage, false}, State = #state { mode = unlimited }) ->
+ {noreply, State};
+handle_cast({change_memory_usage, false}, State = #state { mode = ram_disk }) ->
+ constrain_queues(false, State #state.queues),
+ {noreply, State #state { mode = unlimited }};
+handle_cast({change_memory_usage, false}, State = #state { mode = disk_only }) ->
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ {noreply, State #state { mode = ram_disk }}.
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+constrain_queues(Constrain, Qs) ->
+ lists:foreach(
+ fun (QPid) ->
+ ok = rabbit_amqqueue:constrain_memory(QPid, Constrain)
+ end, Qs).