diff options
| -rw-r--r-- | src/rabbit.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 105 |
6 files changed, 131 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index ce73f6ce6d..44e4dc7f25 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -138,6 +138,8 @@ start(normal, []) -> {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), + + ok = start_child(rabbit_queue_mode_manager), ok = rabbit_binary_generator: check_empty_content_body_frame_size(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 08c67946dc..97ffcda8e6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,6 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([constrain_memory/2]). -import(mnesia). -import(gen_server2). @@ -103,6 +104,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(constrain_memory/2 :: (pid(), bool()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -312,6 +314,9 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:cast(QPid, {unblock, ChPid}). +constrain_memory(QPid, Constrain) -> + gen_server2:cast(QPid, {constrain, Constrain}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f45f931e81..6ad4e4e6b7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -93,7 +93,8 @@ start_link(Q) -> init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, mixed), %% TODO, CHANGE ME + {ok, Mode} = rabbit_queue_mode_manager:register(self()), + {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, Mode), %% TODO, CHANGE ME {ok, #q{q = Q, owner = none, exclusive_consumer = none, @@ -779,7 +780,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) -> + {ok, MS2} = if Constrain -> rabbit_mixed_queue:to_disk_only_mode(MS); + true -> rabbit_mixed_queue:to_mixed_mode(MS) + end, + noreply(State #q { mixed_state = MS2 }). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 5c1f969e1e..1b30051f30 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -458,6 +458,7 @@ handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk, msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts }) -> + rabbit_log:info("Converting disk queue to disk only mode~n", []), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies), ok = dets:from_ets(MsgLocationDets, MsgLocationEts), @@ -470,6 +471,7 @@ handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only, msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts }) -> + rabbit_log:info("Converting disk queue to ram disk mode~n", []), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), true = ets:from_dets(MsgLocationEts, MsgLocationDets), @@ -514,6 +516,8 @@ handle_cast({delete_queue, Q}, State) -> {ok, State1} = internal_delete_queue(Q, State), {noreply, State1}. +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index dae4dad150..6a463242be 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -57,8 +57,11 @@ start_link(Queue, IsDurable, mixed) -> {ok, State} = start_link(Queue, IsDurable, disk), to_mixed_mode(State). +to_disk_only_mode(State = #mqstate { mode = disk }) -> + {ok, State}; to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextSeq }) -> + rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), %% We enqueue _everything_ here. This means that should a message %% already be in the disk queue we must remove it and add it back %% in. Fortunately, by using requeue, we avoid rewriting the @@ -93,7 +96,10 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, {ok, State #mqstate { mode = disk, msg_buf = queue:new(), next_write_seq = NextSeq1 }}. +to_mixed_mode(State = #mqstate { mode = mixed }) -> + {ok, State}; to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) -> + rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), %% load up a new queue with everything that's on disk. %% don't remove non-persistent messages that happen to be on disk QList = rabbit_disk_queue:dump_queue(Q), diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl new file mode 100644 index 0000000000..aee57ac3b7 --- /dev/null +++ b/src/rabbit_queue_mode_manager.erl @@ -0,0 +1,105 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_queue_mode_manager). + +-behaviour(gen_server2). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/1, change_memory_usage/2]). + +-define(SERVER, ?MODULE). + +-record(state, { mode, + queues + }). + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +register(Pid) -> + gen_server2:call(?SERVER, {register, Pid}). + +change_memory_usage(_Pid, Conserve) -> + gen_server2:cast(?SERVER, {change_memory_usage, Conserve}). + +init([]) -> + process_flag(trap_exit, true), + ok = rabbit_alarm:register(self(), {?MODULE, change_memory_usage, []}), + {ok, #state { mode = unlimited, + queues = [] + }}. + +handle_call({register, Pid}, _From, State = #state { queues = Qs, mode = Mode }) -> + Result = case Mode of + unlimited -> mixed; + _ -> disk + end, + {reply, {ok, Result}, State #state { queues = [Pid | Qs] }}. + +handle_cast({change_memory_usage, true}, State = #state { mode = disk_only }) -> + {noreply, State}; +handle_cast({change_memory_usage, true}, State = #state { mode = ram_disk }) -> + ok = rabbit_disk_queue:to_disk_only_mode(), + {noreply, State #state { mode = disk_only }}; +handle_cast({change_memory_usage, true}, State = #state { mode = unlimited }) -> + constrain_queues(true, State #state.queues), + {noreply, State #state { mode = ram_disk }}; + +handle_cast({change_memory_usage, false}, State = #state { mode = unlimited }) -> + {noreply, State}; +handle_cast({change_memory_usage, false}, State = #state { mode = ram_disk }) -> + constrain_queues(false, State #state.queues), + {noreply, State #state { mode = unlimited }}; +handle_cast({change_memory_usage, false}, State = #state { mode = disk_only }) -> + ok = rabbit_disk_queue:to_ram_disk_mode(), + {noreply, State #state { mode = ram_disk }}. + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +constrain_queues(Constrain, Qs) -> + lists:foreach( + fun (QPid) -> + ok = rabbit_amqqueue:constrain_memory(QPid, Constrain) + end, Qs). |
