summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-03 17:43:52 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-03 17:43:52 +0100
commite5962c4232be43111fb7ca4a60fa0360f49ee9f5 (patch)
tree7d2cc83a03ead10d150f231992fe48a33f91c692 /src
parent9fc504937632389a8570606f3b9518f13e6a8e24 (diff)
downloadrabbitmq-server-git-e5962c4232be43111fb7ca4a60fa0360f49ee9f5.tar.gz
Reworked. Because the disk->mixed transition doesn't eat up any ram, there is no need for the emergency tokens, nor any need for the weird doubling. So it's basically got much simpler.
We hold two queues, one of hibernating queues (ordered by when they hibernated) and another priority_queue of lowrate queues (ordered by the amount of memory allocated to them). We evict to disk from the hibernated and then the lowrate queues in their relevant orders. Seems to work. Oh and disk_queue is now managed by the tokens too.
Diffstat (limited to 'src')
-rw-r--r--src/priority_queue.erl58
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_disk_queue.erl104
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_mixed_queue.erl2
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_queue_mode_manager.erl406
-rw-r--r--src/rabbit_tests.erl58
8 files changed, 435 insertions, 212 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 732757c41c..9683809933 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -55,7 +55,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
+ out/1, pout/1, join/2]).
%%----------------------------------------------------------------------------
@@ -73,6 +74,8 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(pout/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
+-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -147,6 +150,59 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+pout({queue, [], []}) ->
+ {empty, {queue, [], []}};
+pout({queue, _, _} = Q) ->
+ {{value, V}, Q1} = out(Q),
+ {{value, V, 0}, Q1};
+pout({pqueue, [{P, Q} | Queues]}) ->
+ {{value, V}, Q1} = out(Q),
+ NewQ = case is_empty(Q1) of
+ true -> case Queues of
+ [] -> {queue, [], []};
+ [{0, OnlyQ}] -> OnlyQ;
+ [_|_] -> {pqueue, Queues}
+ end;
+ false -> {pqueue, [{P, Q1} | Queues]}
+ end,
+ {{value, V, -P}, NewQ}.
+
+join(A, {queue, [], []}) ->
+ A;
+join({queue, [], []}, B) ->
+ B;
+join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
+join(A = {queue, _, _}, {pqueue, BPQ}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ Post1 = case Post of
+ [] -> [ {0, A} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
+ _ -> [ {0, A} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, B = {queue, _, _}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ Post1 = case Post of
+ [] -> [ {0, B} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
+ _ -> [ {0, B} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, {pqueue, BPQ}) ->
+ {pqueue, merge(APQ, BPQ, [])}.
+
+merge([], BPQ, Acc) ->
+ lists:reverse(Acc, BPQ);
+merge(APQ, [], Acc) ->
+ lists:reverse(Acc, APQ);
+merge([{P, A}|As], [{P, B}|Bs], Acc) ->
+ merge(As, Bs, [ {P, join(A, B)} | Acc ]);
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+ merge(As, Bs, [ {PA, A} | Acc ]);
+merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
+ merge(As, Bs, [ {PB, B} | Acc ]).
+
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 986546dce7..6b19695157 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -102,7 +102,8 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, Mode} = rabbit_queue_mode_manager:register(self()),
+ {ok, Mode} = rabbit_queue_mode_manager:register
+ (self(), rabbit_amqqueue, set_mode, [self()]),
{ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode),
{ok, #q{q = Q,
owner = none,
@@ -141,7 +142,7 @@ reply(Reply, NewState) ->
reply1(Reply, NewState = #q { hibernated_at = undefined }) ->
{reply, Reply, NewState, NewState #q.hibernate_after};
reply1(Reply, NewState) ->
- NewState1 = report_memory(false, adjust_hibernate_after(NewState)),
+ NewState1 = adjust_hibernate_after(NewState),
{reply, Reply, NewState1, NewState1 #q.hibernate_after}.
noreply(NewState = #q { memory_report_timer = undefined }) ->
@@ -152,7 +153,7 @@ noreply(NewState) ->
noreply1(NewState = #q { hibernated_at = undefined }) ->
{noreply, NewState, NewState #q.hibernate_after};
noreply1(NewState) ->
- NewState1 = report_memory(false, adjust_hibernate_after(NewState)),
+ NewState1 = adjust_hibernate_after(NewState),
{noreply, NewState1, NewState1 #q.hibernate_after}.
adjust_hibernate_after(State = #q { hibernated_at = undefined }) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index dc328792a0..6674ce0e4d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -46,23 +46,24 @@
-export([length/1, filesync/0, cache_info/0]).
--export([stop/0, stop_and_obliterate/0, conserve_memory/2,
- to_disk_only_mode/0, to_ram_disk_mode/0]).
+-export([stop/0, stop_and_obliterate/0, report_memory/0,
+ set_mode/1, to_disk_only_mode/0, to_ram_disk_mode/0]).
-include("rabbit.hrl").
--define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK, 255).
--define(INTEGER_SIZE_BYTES, 8).
--define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
--define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
--define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
--define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
--define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
--define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
--define(FILE_EXTENSION_DETS, ".dets").
--define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+-define(WRITE_OK_SIZE_BITS, 8).
+-define(WRITE_OK, 255).
+-define(INTEGER_SIZE_BYTES, 8).
+-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
+-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
+-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_EXTENSION_DETS, ".dets").
+-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+-define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
-define(SERVER, ?MODULE).
@@ -89,7 +90,9 @@
on_sync_froms, %% list of commiters to run on sync (reversed)
timer_ref, %% TRef for our interval timer
last_sync_offset, %% current_offset at the last time we sync'd
- message_cache %% ets message cache
+ message_cache, %% ets message cache
+ memory_report_timer, %% TRef for the memory report timer
+ wordsize %% bytes in a word on this platform
}).
%% The components:
@@ -267,7 +270,8 @@
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
-spec(filesync/0 :: () -> 'ok').
-spec(cache_info/0 :: () -> [{atom(), term()}]).
--spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
+-spec(report_memory/0 :: () -> 'ok').
+-spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok').
-endif.
@@ -339,8 +343,11 @@ filesync() ->
cache_info() ->
gen_server2:call(?SERVER, cache_info, infinity).
-conserve_memory(_Pid, Conserve) ->
- gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}).
+report_memory() ->
+ gen_server2:cast(?SERVER, report_memory).
+
+set_mode(Mode) ->
+ gen_server2:cast(?SERVER, {set_mode, Mode}).
%% ---- GEN-SERVER INTERNAL API ----
@@ -354,7 +361,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
- ok = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ {ok, Mode} = rabbit_queue_mode_manager:register
+ (self(), rabbit_disk_queue, set_mode, []),
Node = node(),
ok =
case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
@@ -381,6 +389,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
+ {ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL,
+ rabbit_disk_queue, report_memory, []),
+
+
InitName = "0" ++ ?FILE_EXTENSION,
State =
#dqstate { msg_location_dets = MsgLocationDets,
@@ -402,7 +414,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
timer_ref = undefined,
last_sync_offset = 0,
message_cache = ets:new(?CACHE_ETS_NAME,
- [set, private])
+ [set, private]),
+ memory_report_timer = TRef,
+ wordsize = erlang:system_info(wordsize)
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -419,7 +433,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
false -> %% new file, so preallocate
ok = preallocate(FileHdl, FileSizeLimit, Offset)
end,
- {ok, State1 #dqstate { current_file_handle = FileHdl }}.
+ State2 = State1 #dqstate { current_file_handle = FileHdl },
+ {ok, case Mode of
+ mixed -> State2;
+ disk -> to_disk_only_mode(State2)
+ end}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
@@ -493,11 +511,15 @@ handle_cast({delete_queue, Q}, State) ->
noreply(State1);
handle_cast(filesync, State) ->
noreply(sync_current_file_handle(State));
-handle_cast({conserve_memory, Conserve}, State) ->
- noreply((case Conserve of
- true -> fun to_disk_only_mode/1;
- false -> fun to_ram_disk_mode/1
- end)(State)).
+handle_cast({set_mode, Mode}, State) ->
+ noreply((case Mode of
+ disk -> fun to_disk_only_mode/1;
+ mixed -> fun to_ram_disk_mode/1
+ end)(State));
+handle_cast(report_memory, State) ->
+ Bytes = memory_use(State),
+ rabbit_queue_mode_manager:report_memory(self(), Bytes),
+ noreply(State).
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -513,10 +535,12 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge}
+ read_file_handles = {ReadHdls, _ReadHdlsAge},
+ memory_report_timer = TRef
}) ->
- State1 = stop_commit_timer(State),
%% deliberately ignoring return codes here
+ timer:cancel(TRef),
+ State1 = stop_commit_timer(State),
dets:close(MsgLocationDets),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
@@ -531,7 +555,8 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
end, ok, ReadHdls),
State1 #dqstate { current_file_handle = undefined,
current_dirty = false,
- read_file_handles = {dict:new(), gb_trees:empty()}
+ read_file_handles = {dict:new(), gb_trees:empty()},
+ memory_report_timer = undefined
}.
code_change(_OldVsn, State, _Extra) ->
@@ -539,6 +564,27 @@ code_change(_OldVsn, State, _Extra) ->
%% ---- UTILITY FUNCTIONS ----
+memory_use(#dqstate { operation_mode = ram_disk,
+ file_summary = FileSummary,
+ sequences = Sequences,
+ msg_location_ets = MsgLocationEts,
+ wordsize = WordSize
+ }) ->
+ WordSize * (mnesia:table_info(rabbit_disk_queue, memory) +
+ ets:info(MsgLocationEts, memory) +
+ ets:info(FileSummary, memory) +
+ ets:info(Sequences, memory));
+memory_use(#dqstate { operation_mode = disk_only,
+ file_summary = FileSummary,
+ sequences = Sequences,
+ msg_location_dets = MsgLocationDets,
+ wordsize = WordSize
+ }) ->
+ (WordSize * (ets:info(FileSummary, memory) +
+ ets:info(Sequences, memory))) +
+ mnesia:table_info(rabbit_disk_queue, memory) +
+ dets:info(MsgLocationDets, memory).
+
to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
State;
to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2971e33265..e66eb6b088 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -52,7 +52,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2]).
+-export([unfold/2, ceil/1]).
-import(mnesia).
-import(lists).
@@ -115,7 +115,8 @@
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-
+-spec(ceil/1 :: (number()) -> number()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -442,3 +443,8 @@ unfold(Fun, Acc, Init) ->
{true, E, I} -> unfold(Fun, [E|Acc], I);
false -> {Acc, Init}
end.
+
+ceil(N) when N - trunc(N) > 0 ->
+ 1 + trunc(N);
+ceil(N) ->
+ N.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 26fa029db7..d9c4689851 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -541,7 +541,7 @@ is_empty(#mqstate { length = Length }) ->
estimate_queue_memory(#mqstate { memory_size = Size, memory_gain = Gain,
memory_loss = Loss }) ->
- {2*Size, Gain, Loss}.
+ {Size, Gain, Loss}.
reset_counters(State) ->
State #mqstate { memory_gain = 0, memory_loss = 0 }.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 149501f8b3..b40294f686 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -150,7 +150,7 @@ table_definitions() ->
{type, set},
{local_content, true},
{attributes, record_info(fields, dq_msg_loc)},
- {disc_only_copies, [node()]}]}
+ {disc_copies, [node()]}]}
].
replicated_table_definitions() ->
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 395249782b..30695404c3 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -38,10 +38,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([register/1, report_memory/5]).
+-export([register/4, report_memory/2, report_memory/5, info/0]).
-define(TOTAL_TOKENS, 1000).
--define(LOW_WATER_MARK_FRACTION, 0.25).
-define(ACTIVITY_THRESHOLD, 25).
-define(INITIAL_TOKEN_ALLOCATION, 10).
@@ -53,7 +52,7 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(register/1 :: (pid()) -> {'ok', queue_mode()}).
+-spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}).
-spec(report_memory/5 :: (pid(), non_neg_integer(),
non_neg_integer(), non_neg_integer(), bool()) ->
'ok').
@@ -61,153 +60,153 @@
-endif.
-record(state, { available_tokens,
- available_etokens,
mixed_queues,
+ callbacks,
tokens_per_byte,
- low_rate,
- hibernated
+ lowrate,
+ hibernate
}).
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-register(Pid) ->
- gen_server2:call(?SERVER, {register, Pid}).
+register(Pid, Module, Function, Args) ->
+ gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}).
+
+report_memory(Pid, Memory) ->
+ report_memory(Pid, Memory, undefined, undefined, false).
report_memory(Pid, Memory, Gain, Loss, Hibernating) ->
gen_server2:cast(?SERVER,
{report_memory, Pid, Memory, Gain, Loss, Hibernating}).
+info() ->
+ gen_server2:call(?SERVER, info).
+
init([]) ->
process_flag(trap_exit, true),
%% todo, fix up this call as os_mon may not be running
{MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(),
- MemAvail = MemTotal - MemUsed,
- Avail = ceil(?TOTAL_TOKENS * (1 - ?LOW_WATER_MARK_FRACTION)),
- EAvail = ?TOTAL_TOKENS - Avail,
- {ok, #state { available_tokens = Avail,
- available_etokens = EAvail,
+ MemAvail = (MemTotal - MemUsed) / 3, %% magic
+ {ok, #state { available_tokens = ?TOTAL_TOKENS,
mixed_queues = dict:new(),
+ callbacks = dict:new(),
tokens_per_byte = ?TOTAL_TOKENS / MemAvail,
- low_rate = sets:new(),
- hibernated = sets:new()
+ lowrate = priority_queue:new(),
+ hibernate = queue:new()
}}.
-handle_call({register, Pid}, _From,
- State = #state { available_tokens = Avail,
- mixed_queues = Mixed }) ->
+handle_call({register, Pid, Module, Function, Args}, _From,
+ State = #state { callbacks = Callbacks }) ->
_MRef = erlang:monitor(process, Pid),
- {Result, State1} =
+ State1 = State #state { callbacks = dict:store
+ (Pid, {Module, Function, Args}, Callbacks) },
+ State2 = #state { available_tokens = Avail,
+ mixed_queues = Mixed } =
+ free_upto(Pid, ?INITIAL_TOKEN_ALLOCATION, State1),
+ {Result, State3} =
case ?INITIAL_TOKEN_ALLOCATION > Avail of
true ->
- {disk, State};
+ {disk, State2};
false ->
- {mixed, State #state { mixed_queues = dict:store
- (Pid, {?INITIAL_TOKEN_ALLOCATION, 0}, Mixed) }}
+ {mixed, State2 #state {
+ available_tokens =
+ Avail - ?INITIAL_TOKEN_ALLOCATION,
+ mixed_queues = dict:store
+ (Pid, {?INITIAL_TOKEN_ALLOCATION, active}, Mixed) }}
end,
- {reply, {ok, Result}, State1}.
+ {reply, {ok, Result}, State3};
+
+handle_call(info, _From, State) ->
+ State1 = #state { available_tokens = Avail,
+ mixed_queues = Mixed,
+ lowrate = Lazy,
+ hibernate = Sleepy } =
+ free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying
+ {reply, [{ available_tokens, Avail },
+ { mixed_queues, dict:to_list(Mixed) },
+ { lowrate_queues, priority_queue:to_list(Lazy) },
+ { hibernated_queues, queue:to_list(Sleepy) }], State1}.
+
-handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
- State = #state { available_tokens = Avail,
- available_etokens = EAvail,
- tokens_per_byte = TPB,
- mixed_queues = Mixed
- }) ->
- Req = ceil(Memory * TPB),
- io:format("~w : ~w ~w ~n", [Pid, Memory, Req]),
- LowRate = (BytesGained < ?ACTIVITY_THRESHOLD)
- andalso (BytesLost < ?ACTIVITY_THRESHOLD),
- io:format("~w ~w~n", [O, LowRate]),
- State1 =
- case find_queue(Pid, State) of
- disk ->
- case Req > Avail orelse (2*Req) > (Avail + EAvail) orelse
- LowRate of
- true -> State; %% remain as disk queue
- false ->
- %% go to mixed, allocate double Req, and use Extra
- rabbit_amqqueue:set_mode(Pid, mixed),
- Alloc = lists:min([2*Req, Avail]),
- EAlloc = (2*Req) - Alloc,
- State #state { available_tokens = Avail - Alloc,
- available_etokens = EAvail - EAlloc,
- mixed_queues = dict:store
- (Pid, {Alloc, EAlloc}, Mixed)
- }
- end;
- {mixed, {OAlloc, OEAlloc}} ->
- io:format("~w ; ~w ~w ~n", [Pid, OAlloc, OEAlloc]),
+handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
+ State = #state { mixed_queues = Mixed,
+ available_tokens = Avail,
+ callbacks = Callbacks,
+ tokens_per_byte = TPB }) ->
+ Req = rabbit_misc:ceil(TPB * Memory),
+ LowRate = case {BytesGained, BytesLost} of
+ {undefined, _} -> false;
+ {_, undefined} -> false;
+ {G, L} -> G < ?ACTIVITY_THRESHOLD andalso
+ L < ?ACTIVITY_THRESHOLD
+ end,
+ {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} =
+ case find_queue(Pid, Mixed) of
+ {mixed, {OAlloc, _OActivity}} ->
Avail1 = Avail + OAlloc,
- EAvail1 = EAvail + OEAlloc,
- case Req > (OAlloc + OEAlloc) of
- true -> %% getting bigger
- case Req > Avail1 of
- true -> %% go to disk
- attempt_free_from_idle(Req, Pid,
- State #state { available_tokens = Avail1,
- available_etokens = EAvail1,
- mixed_queues =
- dict:erase(Pid, Mixed) });
- false -> %% request not too big, stay mixed
- State #state { available_tokens = Avail1 - Req,
- available_etokens = EAvail1,
- mixed_queues = dict:store
- (Pid, {Req, 0}, Mixed) }
- end;
- false -> %% getting smaller (or staying same)
- case 0 =:= OEAlloc of
- true ->
- case Req > Avail1 orelse LowRate of
- true -> %% go to disk
- attempt_free_from_idle(Req, Pid,
- State #state { available_tokens = Avail1,
- available_etokens = EAvail1,
- mixed_queues =
- dict:erase(Pid, Mixed) });
- false -> %% request not too big, stay mixed
- State #state { available_tokens = Avail1 - Req,
- available_etokens = EAvail1,
- mixed_queues = dict:store
- (Pid, {Req, 0}, Mixed) }
- end;
- false ->
- case Req > Avail1 of
- true ->
- EReq = Req - Avail1,
- case EReq > EAvail1 of
- true -> %% go to disk
- attempt_free_from_idle(Req, Pid,
- State #state { available_tokens = Avail1,
- available_etokens = EAvail1,
- mixed_queues =
- dict:erase(Pid, Mixed) });
- false -> %% request not too big, stay mixed
- State #state { available_tokens = 0,
- available_etokens = EAvail1 - EReq,
- mixed_queues = dict:store
- (Pid, {Avail1, EReq}, Mixed) }
- end;
- false -> %% request not too big, stay mixed
- State #state { available_tokens = Avail1 - Req,
- available_etokens = EAvail1,
- mixed_queues = dict:store
- (Pid, {Req, 0}, Mixed) }
- end
- end
+ State1 = #state { available_tokens = Avail2,
+ mixed_queues = Mixed1 } =
+ free_upto(Pid, Req,
+ State #state { available_tokens = Avail1 }),
+ case Req > Avail2 of
+ true -> %% nowt we can do, send to disk
+ {Module, Function, Args} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(Module, Function, Args ++ [disk]),
+ {State1 #state { mixed_queues =
+ dict:erase(Pid, Mixed1) },
+ disk};
+ false -> %% keep mixed
+ Activity = if Hibernating -> hibernate;
+ LowRate -> lowrate;
+ true -> active
+ end,
+ {State1 #state
+ { mixed_queues =
+ dict:store(Pid, {Req, Activity}, Mixed1),
+ available_tokens = Avail2 - Req },
+ Activity}
+ end;
+ disk ->
+ State1 = #state { available_tokens = Avail1,
+ mixed_queues = Mixed1 } =
+ free_upto(Pid, Req, State),
+ case Req > Avail1 of
+ true -> %% not enough space, stay as disk
+ {State1, disk};
+ false -> %% can go to mixed mode
+ {Module, Function, Args} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(Module, Function, Args ++ [mixed]),
+ Activity = if Hibernating -> hibernate;
+ LowRate -> lowrate;
+ true -> active
+ end,
+ {State1 #state {
+ mixed_queues =
+ dict:store(Pid, {Req, Activity}, Mixed1),
+ available_tokens = Avail1 - Req },
+ disk}
end
end,
- {noreply, State1}.
+ StateN1 =
+ case ActivityNew of
+ active -> StateN;
+ disk -> StateN;
+ lowrate -> StateN #state { lowrate =
+ priority_queue:in(Pid, Req, Lazy) };
+ hibernate -> StateN #state { hibernate =
+ queue:in(Pid, Sleepy) }
+ end,
+ {noreply, StateN1}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
- available_etokens = EAvail,
mixed_queues = Mixed }) ->
- State1 = case find_queue(Pid, State) of
+ State1 = case find_queue(Pid, Mixed) of
disk ->
State;
- {mixed, {Alloc, EAlloc}} ->
+ {mixed, {Alloc, _Activity}} ->
State #state { available_tokens = Avail + Alloc,
- available_etokens = EAvail + EAlloc,
mixed_queues = dict:erase(Pid, Mixed) }
end,
{noreply, State1};
@@ -222,69 +221,140 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-find_queue(Pid, #state { mixed_queues = Mixed }) ->
+find_queue(Pid, Mixed) ->
case dict:find(Pid, Mixed) of
{ok, Value} -> {mixed, Value};
error -> disk
end.
-ceil(N) when N - trunc(N) > 0 ->
- 1 + trunc(N);
-ceil(N) ->
- N.
+tidy_and_sum_lazy(IgnorePid, Lazy, Mixed) ->
+ tidy_and_sum_lazy(sets:add_element(IgnorePid, sets:new()),
+ Lazy, Mixed, 0, priority_queue:new()).
-attempt_free_from_idle(Req, Pid, State = #state { available_tokens = Avail,
- available_etokens = EAvail,
- low_rate = Lazy,
- hibernated = Sleepy,
- mixed_queues = Mixed }) ->
- case Req > Avail of
- true ->
- {Sleepy1, Freed, EFreed, State1} = free_upto(Req, sets:to_list(Sleepy), State),
- case Req > Avail + Freed of
+tidy_and_sum_lazy(DupCheckSet, Lazy, Mixed, FreeAcc, LazyAcc) ->
+ case priority_queue:pout(Lazy) of
+ {empty, Lazy} -> {FreeAcc, LazyAcc};
+ {{value, Pid, Alloc}, Lazy1} ->
+ case sets:is_element(Pid, DupCheckSet) of
true ->
- {Lazy1, Freed1, EFreed1, State2} = free_upto(Req, sets:to_list(Lazy), State1),
- case Req > Avail + Freed + Freed1 of
- true ->
- rabbit_amqqueue:set_mode(Pid, disk),
- State2 #state { available_tokens = Avail + Freed + Freed1,
- available_etokens = EAvail + EFreed + EFreed1,
- low_rate = Lazy1,
- hibernated = Sleepy1,
- mixed_queues = dict:erase(Pid, Mixed)
- };
- false ->
- State2 #state { available_tokens = Avail + Freed + Freed1 - Req,
- available_etokens = EAvail + EFreed + EFreed1,
- low_rate = Lazy1,
- hibernated = Sleepy1,
- mixed_queues = dict:store(Pid, {Req, 0}, Mixed)
- }
- end;
+ tidy_and_sum_lazy(DupCheckSet, Lazy1, Mixed, FreeAcc,
+ LazyAcc);
false ->
- State1 #state { available_tokens = Avail + Freed - Req,
- available_etokens = EAvail + EFreed,
- hibernated = Sleepy1,
- mixed_queues = dict:store(Pid, {Req, 0}, Mixed)
- }
- end;
- false ->
- State #state { mixed_queues = dict:store(Pid, {Req, 0}, Mixed) }
+ DupCheckSet1 = sets:add_element(Pid, DupCheckSet),
+ case find_queue(Pid, Mixed) of
+ {mixed, {Alloc, lowrate}} ->
+ tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed,
+ FreeAcc + Alloc, priority_queue:in
+ (Pid, Alloc, LazyAcc));
+ _ ->
+ tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed,
+ FreeAcc, LazyAcc)
+ end
+ end
+ end.
+
+tidy_and_sum_sleepy(IgnorePid, Sleepy, Mixed) ->
+ tidy_and_sum_sleepy(sets:add_element(IgnorePid, sets:new()),
+ Sleepy, Mixed, 0, queue:new()).
+
+tidy_and_sum_sleepy(DupCheckSet, Sleepy, Mixed, FreeAcc, SleepyAcc) ->
+ case queue:out(Sleepy) of
+ {empty, Sleepy} -> {FreeAcc, SleepyAcc};
+ {{value, Pid}, Sleepy1} ->
+ case sets:is_element(Pid, DupCheckSet) of
+ true ->
+ tidy_and_sum_sleepy(DupCheckSet, Sleepy1, Mixed, FreeAcc,
+ SleepyAcc);
+ false ->
+ DupCheckSet1 = sets:add_element(Pid, DupCheckSet),
+ case find_queue(Pid, Mixed) of
+ {mixed, {Alloc, hibernate}} ->
+ tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed,
+ FreeAcc + Alloc, queue:in
+ (Pid, SleepyAcc));
+ _ -> tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed,
+ FreeAcc, SleepyAcc)
+ end
+ end
+ end.
+
+free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req) ->
+ free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req,
+ priority_queue:new()).
+
+free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req, LazyAcc) ->
+ case priority_queue:pout(Lazy) of
+ {empty, Lazy} -> {priority_queue:join(Lazy, LazyAcc), Mixed, Req};
+ {{value, IgnorePid, Alloc}, Lazy1} ->
+ free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed, Req,
+ priority_queue:in(IgnorePid, Alloc, LazyAcc));
+ {{value, Pid, Alloc}, Lazy1} ->
+ {Module, Function, Args} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(Module, Function, Args ++ [disk]),
+ Mixed1 = dict:erase(Pid, Mixed),
+ case Req > Alloc of
+ true -> free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed1,
+ Req - Alloc, LazyAcc);
+ false -> {priority_queue:join(Lazy1, LazyAcc), Mixed1,
+ Req - Alloc}
+ end
+ end.
+
+free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req) ->
+ free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, queue:new()).
+
+free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, SleepyAcc) ->
+ case queue:out(Sleepy) of
+ {empty, Sleepy} -> {queue:join(Sleepy, SleepyAcc), Mixed, Req};
+ {{value, IgnorePid}, Sleepy1} ->
+ free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed, Req,
+ queue:in(IgnorePid, SleepyAcc));
+ {{value, Pid}, Sleepy1} ->
+ {Alloc, hibernate} = dict:fetch(Pid, Mixed),
+ {Module, Function, Args} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(Module, Function, Args ++ [disk]),
+ Mixed1 = dict:erase(Pid, Mixed),
+ case Req > Alloc of
+ true -> free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed1,
+ Req - Alloc, SleepyAcc);
+ false -> {queue:join(Sleepy1, SleepyAcc), Mixed1, Req - Alloc}
+ end
end.
-free_upto(Req, List, State) ->
- free_upto(Req, List, 0, 0, State).
-
-free_upto(_Req, [], Freed, EFreed, State) ->
- {[], Freed, EFreed, State};
-free_upto(Req, [Pid|Pids], Freed, EFreed, State = #state { available_tokens = Avail,
- mixed_queues = Mixed }) ->
- {mixed, {Alloc, EAlloc}} = find_queue(Pid, State),
- rabbit_amqqueue:set_mode(Pid, disk),
- State1 = State #state { mixed_queues = dict:erase(Pid, Mixed) },
- case Req > Avail + Freed + Alloc of
+free_upto(Pid, Req, State = #state { available_tokens = Avail,
+ mixed_queues = Mixed,
+ callbacks = Callbacks,
+ lowrate = Lazy,
+ hibernate = Sleepy }) ->
+ case Req > Avail of
true ->
- free_upto(Req, Pids, Freed + Alloc, EFreed + EAlloc, State1);
- false ->
- {Pids, Freed + Alloc, EFreed + EAlloc, State1}
+ {SleepySum, Sleepy1} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed),
+ case Req > Avail + SleepySum of
+ true -> %% not enough in sleepy, have a look in lazy too
+ {LazySum, Lazy1} = tidy_and_sum_lazy(Pid, Lazy, Mixed),
+ case Req > Avail + SleepySum + LazySum of
+ true -> %% can't free enough, just return tidied state
+ State #state { lowrate = Lazy1,
+ hibernate = Sleepy1 };
+ false -> %% need to free all of sleepy, and some of lazy
+ {Sleepy2, Mixed1, ReqRem} =
+ free_upto_sleepy
+ (Pid, Callbacks, Sleepy1, Mixed, Req),
+ {Lazy2, Mixed2, ReqRem1} =
+ free_upto_lazy(Pid, Callbacks, Lazy1, Mixed1,
+ ReqRem),
+ State #state { available_tokens =
+ Avail + (Req - ReqRem1),
+ mixed_queues = Mixed2,
+ lowrate = Lazy2,
+ hibernate = Sleepy2 }
+ end;
+ false -> %% enough available in sleepy, don't touch lazy
+ {Sleepy2, Mixed1, ReqRem} =
+ free_upto_sleepy(Pid, Callbacks, Sleepy1, Mixed, Req),
+ State #state { available_tokens = Avail + (Req - ReqRem),
+ mixed_queues = Mixed1,
+ hibernate = Sleepy2 }
+ end;
+ false -> State
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b56d71c8c2..f108285056 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -65,7 +65,7 @@ test_priority_queue() ->
%% empty Q
Q = priority_queue:new(),
- {true, true, 0, [], []} = test_priority_queue(Q),
+ {true, true, 0, [], [], []} = test_priority_queue(Q),
%% 1-4 element no-priority Q
true = lists:all(fun (X) -> X =:= passed end,
@@ -74,21 +74,57 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+ {true, false, 1, [{1, foo}], [foo], [{foo, 1}]} = test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
- {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} =
test_priority_queue(Q2),
%% 2-element different-priority Q
Q3 = priority_queue:in(bar, 2, Q1),
- {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} =
test_priority_queue(Q3),
%% 1-element negative priority Q
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
- {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+ {true, false, 1, [{-1, foo}], [foo], [{foo, -1}]} = test_priority_queue(Q4),
+
+ %% merge 2 * 1-element no-priority Qs
+ Q5 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [{foo, 0}, {bar, 0}]} =
+ test_priority_queue(Q5),
+
+ %% merge 1-element no-priority Q with 1-element priority Q
+ Q6 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [{bar, 1}, {foo, 0}]} =
+ test_priority_queue(Q6),
+
+ %% merge 1-element priority Q with 1-element no-priority Q
+ Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [{foo, 1}, {bar, 0}]} =
+ test_priority_queue(Q7),
+
+ %% merge 2 * 1-element same-priority Qs
+ Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} =
+ test_priority_queue(Q8),
+
+ %% merge 2 * 1-element different-priority Qs
+ Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 2, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} =
+ test_priority_queue(Q9),
+
+ %% merge 2 * 1-element different-priority Qs (other way around)
+ Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
+ priority_queue:in(foo, 1, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} =
+ test_priority_queue(Q10),
passed.
@@ -101,18 +137,26 @@ priority_queue_out_all(Q) ->
{{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
end.
+priority_queue_pout_all(Q) ->
+ case priority_queue:pout(Q) of
+ {empty, _} -> [];
+ {{value, V, P}, Q1} -> [{V, P} | priority_queue_pout_all(Q1)]
+ end.
+
test_priority_queue(Q) ->
{priority_queue:is_queue(Q),
priority_queue:is_empty(Q),
priority_queue:len(Q),
priority_queue:to_list(Q),
- priority_queue_out_all(Q)}.
+ priority_queue_out_all(Q),
+ priority_queue_pout_all(Q)}.
test_simple_n_element_queue(N) ->
Items = lists:seq(1, N),
Q = priority_queue_in_all(priority_queue:new(), Items),
ToListRes = [{0, X} || X <- Items],
- {true, false, N, ToListRes, Items} = test_priority_queue(Q),
+ POutAllRes = [{X, 0} || X <- Items],
+ {true, false, N, ToListRes, Items, POutAllRes} = test_priority_queue(Q),
passed.
test_parsing() ->