summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-10 19:04:58 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-10 19:04:58 +0100
commitbff27f50060f08aed88e16c7473750b371eeee3b (patch)
treee8fffe0007428d561c2ea2b7e91f8bf32849c672 /src
parente8f690e23c4f350f6eb8dc9ccd8350a9439afc35 (diff)
downloadrabbitmq-server-git-bff27f50060f08aed88e16c7473750b371eeee3b.tar.gz
Added means to alter all queues and switch to disk_only mode in the disk queue.
rabbit_queue_mode_manager:change_memory_usage(undef, true). this will first ask all queues to switch from mixed to disk mode, and will on a 2nd call, ask the disk queue to switch to disk only mode. rabbit_queue_mode_manager:change_memory_usage(undef, false). moves the other way. This all works, eg set MulticastMain pushing in messages and switch modes, and it's fine. One immediate problem is that as soon as everything becomes disk only, the performance suffers, so as a result messages build up. This is as expected. Then, going back to the middle mode (i.e. disk queue in ram_disk mode and queues in disk mode), the switch in the disk queue eats up a lot of memory. I suspect this is the effect of converting the mnesia table from disc_only_copies to disc_copies when there are 40k+ messages in there (one row per message). As a result, this conversion on its own is very dangerous to make. It might be more sensible to use the "weird" mode, where the queues are in mixed mode and the disk queue is in disk_only mode so as to try and get the queues to drain as fast as possible, reducing the size of the mnesia table so that when it is finally converted back, it's small. More experimentation is needed. I'll hook the above commands into rabbitmqctl soon.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_disk_queue.erl4
-rw-r--r--src/rabbit_mixed_queue.erl6
-rw-r--r--src/rabbit_queue_mode_manager.erl105
6 files changed, 131 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ce73f6ce6d..44e4dc7f25 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -138,6 +138,8 @@ start(normal, []) ->
{ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
+
+ ok = start_child(rabbit_queue_mode_manager),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 08c67946dc..97ffcda8e6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,6 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([constrain_memory/2]).
-import(mnesia).
-import(gen_server2).
@@ -103,6 +104,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(constrain_memory/2 :: (pid(), bool()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -312,6 +314,9 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:cast(QPid, {unblock, ChPid}).
+constrain_memory(QPid, Constrain) ->
+ gen_server2:cast(QPid, {constrain, Constrain}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f45f931e81..6ad4e4e6b7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -93,7 +93,8 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, mixed), %% TODO, CHANGE ME
+ {ok, Mode} = rabbit_queue_mode_manager:register(self()),
+ {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, Mode), %% TODO, CHANGE ME
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
@@ -779,7 +780,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) ->
+ {ok, MS2} = if Constrain -> rabbit_mixed_queue:to_disk_only_mode(MS);
+ true -> rabbit_mixed_queue:to_mixed_mode(MS)
+ end,
+ noreply(State #q { mixed_state = MS2 }).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 5c1f969e1e..1b30051f30 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -458,6 +458,7 @@ handle_call(to_disk_only_mode, _From,
State = #dqstate { operation_mode = ram_disk,
msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts }) ->
+ rabbit_log:info("Converting disk queue to disk only mode~n", []),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_only_copies),
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
@@ -470,6 +471,7 @@ handle_call(to_ram_disk_mode, _From,
State = #dqstate { operation_mode = disk_only,
msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts }) ->
+ rabbit_log:info("Converting disk queue to ram disk mode~n", []),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_copies),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
@@ -514,6 +516,8 @@ handle_cast({delete_queue, Q}, State) ->
{ok, State1} = internal_delete_queue(Q, State),
{noreply, State1}.
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index dae4dad150..6a463242be 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -57,8 +57,11 @@ start_link(Queue, IsDurable, mixed) ->
{ok, State} = start_link(Queue, IsDurable, disk),
to_mixed_mode(State).
+to_disk_only_mode(State = #mqstate { mode = disk }) ->
+ {ok, State};
to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
next_write_seq = NextSeq }) ->
+ rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]),
%% We enqueue _everything_ here. This means that should a message
%% already be in the disk queue we must remove it and add it back
%% in. Fortunately, by using requeue, we avoid rewriting the
@@ -93,7 +96,10 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
{ok, State #mqstate { mode = disk, msg_buf = queue:new(),
next_write_seq = NextSeq1 }}.
+to_mixed_mode(State = #mqstate { mode = mixed }) ->
+ {ok, State};
to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) ->
+ rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
%% load up a new queue with everything that's on disk.
%% don't remove non-persistent messages that happen to be on disk
QList = rabbit_disk_queue:dump_queue(Q),
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
new file mode 100644
index 0000000000..aee57ac3b7
--- /dev/null
+++ b/src/rabbit_queue_mode_manager.erl
@@ -0,0 +1,105 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_queue_mode_manager).
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([register/1, change_memory_usage/2]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, { mode,
+ queues
+ }).
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+register(Pid) ->
+ gen_server2:call(?SERVER, {register, Pid}).
+
+change_memory_usage(_Pid, Conserve) ->
+ gen_server2:cast(?SERVER, {change_memory_usage, Conserve}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ ok = rabbit_alarm:register(self(), {?MODULE, change_memory_usage, []}),
+ {ok, #state { mode = unlimited,
+ queues = []
+ }}.
+
+handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) ->
+ Result = case Mode of
+ unlimited -> mixed;
+ _ -> disk
+ end,
+ {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}.
+
+handle_cast({change_memory_usage, true}, State = #state { mode = disk_only }) ->
+ {noreply, State};
+handle_cast({change_memory_usage, true}, State = #state { mode = ram_disk }) ->
+ ok = rabbit_disk_queue:to_disk_only_mode(),
+ {noreply, State #state { mode = disk_only }};
+handle_cast({change_memory_usage, true}, State = #state { mode = unlimited }) ->
+ constrain_queues(true, State #state.queues),
+ {noreply, State #state { mode = ram_disk }};
+
+handle_cast({change_memory_usage, false}, State = #state { mode = unlimited }) ->
+ {noreply, State};
+handle_cast({change_memory_usage, false}, State = #state { mode = ram_disk }) ->
+ constrain_queues(false, State #state.queues),
+ {noreply, State #state { mode = unlimited }};
+handle_cast({change_memory_usage, false}, State = #state { mode = disk_only }) ->
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ {noreply, State #state { mode = ram_disk }}.
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+constrain_queues(Constrain, Qs) ->
+ lists:foreach(
+ fun (QPid) ->
+ ok = rabbit_amqqueue:constrain_memory(QPid, Constrain)
+ end, Qs).