summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-29 18:01:10 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-29 18:01:10 +0100
commit5c64ee0c50de0d9b3f6e382b9ec8d66ea5a4d51b (patch)
tree7487fcde855afe77ee5cf29b05c521b86705e677 /src
parent10aa354784bb711c8c0329a5623fc9a6a90c40d2 (diff)
downloadrabbitmq-server-git-5c64ee0c50de0d9b3f6e382b9ec8d66ea5a4d51b.tar.gz
mmmm. It maybe sort of works. Needs work though
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl76
-rw-r--r--src/rabbit_disk_queue.erl16
-rw-r--r--src/rabbit_mixed_queue.erl2
-rw-r--r--src/rabbit_queue_mode_manager.erl156
6 files changed, 181 insertions, 85 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7d5e2a796f..9587238835 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -149,9 +149,7 @@ start(normal, []) ->
end},
{"disk queue",
fun () ->
- ok = start_child(rabbit_disk_queue),
- %% TODO, CHANGE ME, waiting on bug 20980
- ok = rabbit_disk_queue:to_ram_disk_mode()
+ ok = start_child(rabbit_disk_queue)
end},
{"recovery",
fun () ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c045b3cae3..92272f0cdc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([set_mode/3]).
+-export([set_mode/3, set_mode/2, report_memory/1]).
-import(mnesia).
-import(gen_server2).
@@ -105,10 +105,12 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(set_mode/3 :: (vhost(), amqqueue(), ('disk' | 'mixed')) -> 'ok').
+-spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
+-spec(report_memory/1 :: (pid()) -> 'ok').
-endif.
@@ -229,7 +231,13 @@ set_mode(VHostPath, Queue, ModeBin)
when is_binary(VHostPath) andalso is_binary(Queue) ->
Mode = list_to_atom(binary_to_list(ModeBin)),
with(rabbit_misc:r(VHostPath, queue, Queue),
- fun(Q) -> gen_server2:pcast(Q #amqqueue.pid, 10, {set_mode, Mode}) end).
+ fun(Q) -> set_mode(Q #amqqueue.pid, Mode) end).
+
+set_mode(QPid, Mode) ->
+ gen_server2:pcast(QPid, 10, {set_mode, Mode}).
+
+report_memory(QPid) ->
+ gen_server2:cast(QPid, report_memory).
info(#amqqueue{ pid = QPid }) ->
gen_server2:pcall(QPid, 9, info, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b6353beff6..2bd170a265 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -37,8 +37,7 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
--define(MEMORY_REPORT_INTERVAL, 500).
--define(MEMORY_REPORT_TIME_INTERVAL, 1000000). %% 1 second in microseconds
+-define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
-export([start_link/1]).
@@ -58,8 +57,7 @@
next_msg_id,
active_consumers,
blocked_consumers,
- memory_report_counter,
- old_memory_report
+ memory_report_timer
}).
-record(consumer, {tag, ack_required}).
@@ -112,8 +110,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
- memory_report_counter = 0,
- old_memory_report = {1, now()}
+ memory_report_timer = start_memory_timer()
}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
@@ -124,6 +121,7 @@ terminate(_Reason, State) ->
rollback_transaction(Txn, State1)
end, State, all_tx()),
rabbit_mixed_queue:delete_queue(NewState #q.mixed_state),
+ stop_memory_timer(NewState),
ok = rabbit_amqqueue:internal_delete(QName).
code_change(_OldVsn, State, _Extra) ->
@@ -131,16 +129,30 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState = #q { memory_report_counter = 0 }) ->
- {reply, Reply, report_memory(NewState), ?HIBERNATE_AFTER};
-reply(Reply, NewState = #q { memory_report_counter = C }) ->
- {reply, Reply, NewState #q { memory_report_counter = C - 1 },
- ?HIBERNATE_AFTER}.
-
-noreply(NewState = #q { memory_report_counter = 0}) ->
- {noreply, report_memory(NewState), ?HIBERNATE_AFTER};
-noreply(NewState = #q { memory_report_counter = C}) ->
- {noreply, NewState #q { memory_report_counter = C - 1 }, ?HIBERNATE_AFTER}.
+reply(Reply, NewState = #q { memory_report_timer = undefined }) ->
+ {reply, Reply, start_memory_timer(NewState), ?HIBERNATE_AFTER};
+reply(Reply, NewState) ->
+ {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
+noreply(NewState = #q { memory_report_timer = undefined }) ->
+ {noreply, start_memory_timer(NewState), ?HIBERNATE_AFTER};
+noreply(NewState) ->
+ {noreply, NewState, ?HIBERNATE_AFTER}.
+
+start_memory_timer() ->
+ {ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL,
+ rabbit_amqqueue, report_memory, [self()]),
+ TRef.
+start_memory_timer(State = #q { memory_report_timer = undefined }) ->
+ State #q { memory_report_timer = start_memory_timer() };
+start_memory_timer(State) ->
+ State.
+
+stop_memory_timer(State = #q { memory_report_timer = undefined }) ->
+ State;
+stop_memory_timer(State = #q { memory_report_timer = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #q { memory_report_timer = undefined }.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -543,24 +555,15 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
-report_memory(State = #q { old_memory_report = {OldMem, Then},
- mixed_state = MS }) ->
+report_memory(State = #q { mixed_state = MS }) ->
{MSize, Gain, Loss} =
rabbit_mixed_queue:estimate_queue_memory(MS),
NewMem = case MSize of
0 -> 1; %% avoid / 0
N -> N
end,
- State1 = State #q { memory_report_counter = ?MEMORY_REPORT_INTERVAL },
- Now = now(),
- case ((NewMem / OldMem) > 1.1 orelse (OldMem / NewMem) > 1.1) andalso
- (?MEMORY_REPORT_TIME_INTERVAL < timer:now_diff(Now, Then)) of
- true ->
- rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss),
- State1 #q { old_memory_report = {NewMem, Now},
- mixed_state = rabbit_mixed_queue:reset_counters(MS) };
- false -> State1
- end.
+ rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss),
+ State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS) }.
%---------------------------------------------------------------------------
@@ -834,8 +837,10 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
disk -> fun rabbit_mixed_queue:to_disk_only_mode/2;
mixed -> fun rabbit_mixed_queue:to_mixed_mode/2
end)(PendingMessages, MS),
- noreply(State #q { mixed_state = MS1 }).
-
+ noreply(State #q { mixed_state = MS1 });
+
+handle_cast(report_memory, State) ->
+ noreply(report_memory(State)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -853,16 +858,11 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
-handle_info(timeout, State = #q { memory_report_counter = Count })
- when Count == ?MEMORY_REPORT_INTERVAL ->
- %% Have to do the +1 because the timeout below, with noreply, will -1
+handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
-
-handle_info(timeout, State) ->
- State1 = report_memory(State),
- noreply(State1 #q { memory_report_counter = 1 + ?MEMORY_REPORT_INTERVAL });
+ State1 = stop_memory_timer(report_memory(State)),
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4333f667cd..8db8f24932 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -46,7 +46,7 @@
-export([length/1, filesync/0, cache_info/0]).
--export([stop/0, stop_and_obliterate/0, change_memory_footprint/2,
+-export([stop/0, stop_and_obliterate/0, conserve_memory/2,
to_disk_only_mode/0, to_ram_disk_mode/0]).
-include("rabbit.hrl").
@@ -270,7 +270,7 @@
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
-spec(filesync/0 :: () -> 'ok').
-spec(cache_info/0 :: () -> [{atom(), term()}]).
--spec(change_memory_footprint/2 :: (pid(), bool()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
-endif.
@@ -345,8 +345,8 @@ filesync() ->
cache_info() ->
gen_server2:call(?SERVER, cache_info, infinity).
-change_memory_footprint(_Pid, Conserve) ->
- gen_server2:pcast(?SERVER, 9, {change_memory_footprint, Conserve}).
+conserve_memory(_Pid, Conserve) ->
+ gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}).
%% ---- GEN-SERVER INTERNAL API ----
@@ -360,11 +360,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
- ok = rabbit_alarm:register(self(), {?MODULE, change_memory_footprint, []}),
+ ok = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
Node = node(),
ok =
case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
- disc_only_copies) of
+ disc_copies) of
{atomic, ok} -> ok;
{aborted, {already_exists, rabbit_disk_queue, Node,
disc_only_copies}} -> ok;
@@ -391,7 +391,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
State =
#dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
- operation_mode = disk_only,
+ operation_mode = ram_disk,
file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
[set, private]),
sequences = ets:new(?SEQUENCE_ETS_NAME,
@@ -502,7 +502,7 @@ handle_cast({delete_queue, Q}, State) ->
noreply(State1);
handle_cast(filesync, State) ->
noreply(sync_current_file_handle(State));
-handle_cast({change_memory_footprint, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State) ->
noreply((case Conserve of
true -> fun to_disk_only_mode/1;
false -> fun to_ram_disk_mode/1
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 12fede1728..d171cf186f 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -502,7 +502,7 @@ is_empty(#mqstate { length = Length }) ->
estimate_queue_memory(#mqstate { memory_size = Size, memory_gain = Gain,
memory_loss = Loss }) ->
- {Size, Gain, Loss}.
+ {2*Size, Gain, Loss}.
reset_counters(State) ->
State #mqstate { memory_gain = 0, memory_loss = 0 }.
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 4ed56fd33c..3a55833b34 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -42,8 +42,7 @@
-define(TOTAL_TOKENS, 1000).
-define(LOW_WATER_MARK_FRACTION, 0.25).
--define(EXPIRY_INTERVAL_MICROSECONDS, 5000000).
--define(ACTIVITY_THRESHOLD, 10).
+-define(ACTIVITY_THRESHOLD, 25).
-define(INITIAL_TOKEN_ALLOCATION, 10).
-define(SERVER, ?MODULE).
@@ -60,10 +59,10 @@
-endif.
--record(state, { remaining_tokens,
+-record(state, { available_tokens,
+ available_etokens,
mixed_queues,
- disk_queues,
- bytes_per_token
+ tokens_per_byte
}).
start_link() ->
@@ -78,44 +77,131 @@ report_memory(Pid, Memory, Gain, Loss) ->
init([]) ->
process_flag(trap_exit, true),
%% todo, fix up this call as os_mon may not be running
- {MemTotal, _MemUsed, _BigProc} = memsup:get_memory_data(),
- {ok, #state { remaining_tokens = ?TOTAL_TOKENS,
+ {MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(),
+ MemAvail = MemTotal - MemUsed,
+ Avail = ceil(?TOTAL_TOKENS * (1 - ?LOW_WATER_MARK_FRACTION)),
+ EAvail = ?TOTAL_TOKENS - Avail,
+ {ok, #state { available_tokens = Avail,
+ available_etokens = EAvail,
mixed_queues = dict:new(),
- disk_queues = sets:new(),
- bytes_per_token = MemTotal / ?TOTAL_TOKENS
+ tokens_per_byte = ?TOTAL_TOKENS / MemAvail
}}.
handle_call({register, Pid}, _From,
- State = #state { remaining_tokens = Remaining,
- mixed_queues = Mixed,
- disk_queues = Disk }) ->
+ State = #state { available_tokens = Avail,
+ mixed_queues = Mixed }) ->
_MRef = erlang:monitor(process, Pid),
{Result, State1} =
- case Remaining >= ?INITIAL_TOKEN_ALLOCATION of
+ case ?INITIAL_TOKEN_ALLOCATION > Avail of
true ->
- {mixed, State #state { remaining_tokens =
- Remaining - ?INITIAL_TOKEN_ALLOCATION,
- mixed_queues = dict:store
- (Pid, {?INITIAL_TOKEN_ALLOCATION, now()},
- Mixed) }};
-
+ {disk, State};
false ->
- {disk, State #state { disk_queues =
- sets:add_element(Pid, Disk) }}
+ {mixed, State #state { mixed_queues = dict:store
+ (Pid, {?INITIAL_TOKEN_ALLOCATION, 0}, Mixed) }}
end,
- {reply, {ok, Result}, State1 }.
-
-handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost}, State) ->
- {noreply, State}.
+ {reply, {ok, Result}, State1}.
+
+handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost},
+ State = #state { available_tokens = Avail,
+ available_etokens = EAvail,
+ tokens_per_byte = TPB,
+ mixed_queues = Mixed
+ }) ->
+ Req = ceil(Memory * TPB),
+ io:format("~w : ~w ~w ~n", [Pid, Memory, Req]),
+ LowRate = (BytesGained < ?ACTIVITY_THRESHOLD)
+ andalso (BytesLost < ?ACTIVITY_THRESHOLD),
+ io:format("~w ~w~n", [O, LowRate]),
+ State1 =
+ case find_queue(Pid, State) of
+ disk ->
+ case Req > Avail orelse (2*Req) > (Avail + EAvail) orelse
+ LowRate of
+ true -> State; %% remain as disk queue
+ false ->
+ %% go to mixed, allocate double Req, and use Extra
+ rabbit_amqqueue:set_mode(Pid, mixed),
+ Alloc = lists:min([2*Req, Avail]),
+ EAlloc = (2*Req) - Alloc,
+ State #state { available_tokens = Avail - Alloc,
+ available_etokens = EAvail - EAlloc,
+ mixed_queues = dict:store
+ (Pid, {Alloc, EAlloc}, Mixed)
+ }
+ end;
+ {mixed, {OAlloc, OEAlloc}} ->
+ io:format("~w ; ~w ~w ~n", [Pid, OAlloc, OEAlloc]),
+ Avail1 = Avail + OAlloc,
+ EAvail1 = EAvail + OEAlloc,
+ case Req > (OAlloc + OEAlloc) of
+ true -> %% getting bigger
+ case Req > Avail1 of
+ true -> %% go to disk
+ rabbit_amqqueue:set_mode(Pid, disk),
+ State #state { available_tokens = Avail1,
+ available_etokens = EAvail1,
+ mixed_queues =
+ dict:erase(Pid, Mixed) };
+ false -> %% request not too big, stay mixed
+ State #state { available_tokens = Avail1 - Req,
+ available_etokens = EAvail1,
+ mixed_queues = dict:store
+ (Pid, {Req, 0}, Mixed) }
+ end;
+ false -> %% getting smaller (or staying same)
+ case 0 =:= OEAlloc of
+ true ->
+ case Req > Avail1 orelse LowRate of
+ true -> %% go to disk
+ rabbit_amqqueue:set_mode(Pid, disk),
+ State #state { available_tokens = Avail1,
+ available_etokens = EAvail1,
+ mixed_queues =
+ dict:erase(Pid, Mixed) };
+ false -> %% request not too big, stay mixed
+ State #state { available_tokens = Avail1 - Req,
+ available_etokens = EAvail1,
+ mixed_queues = dict:store
+ (Pid, {Req, 0}, Mixed) }
+ end;
+ false ->
+ case Req > Avail1 of
+ true ->
+ EReq = Req - Avail1,
+ case EReq > EAvail1 of
+ true -> %% go to disk
+ rabbit_amqqueue:set_mode(Pid, disk),
+ State #state { available_tokens = Avail1,
+ available_etokens = EAvail1,
+ mixed_queues =
+ dict:erase(Pid, Mixed) };
+ false -> %% request not too big, stay mixed
+ State #state { available_tokens = 0,
+ available_etokens = EAvail1 - EReq,
+ mixed_queues = dict:store
+ (Pid, {Avail1, EReq}, Mixed) }
+ end;
+ false -> %% request not too big, stay mixed
+ State #state { available_tokens = Avail1 - Req,
+ available_etokens = EAvail1,
+ mixed_queues = dict:store
+ (Pid, {Req, 0}, Mixed) }
+ end
+ end
+ end
+ end,
+ {noreply, State1}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #state { remaining_tokens = Remaining,
+ State = #state { available_tokens = Avail,
+ available_etokens = EAvail,
mixed_queues = Mixed }) ->
State1 = case find_queue(Pid, State) of
disk ->
State;
- {mixed, {Tokens, _When}} ->
- State #state { remaining_tokens = Remaining + Tokens,
+ {mixed, {Alloc, EAlloc}} ->
+ State #state { available_tokens = Avail + Alloc,
+ available_etokens = EAvail + EAlloc,
mixed_queues = dict:erase(Pid, Mixed) }
end,
{noreply, State1};
@@ -130,9 +216,13 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-find_queue(Pid, #state { disk_queues = Disk, mixed_queues = Mixed }) ->
- case sets:is_element(Pid, Disk) of
- true -> disk;
- false -> {mixed, dict:fetch(Pid, Mixed)}
+find_queue(Pid, #state { mixed_queues = Mixed }) ->
+ case dict:find(Pid, Mixed) of
+ {ok, Value} -> {mixed, Value};
+ error -> disk
end.
-
+
+ceil(N) when N - trunc(N) > 0 ->
+ 1 + trunc(N);
+ceil(N) ->
+ N.