summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-16 18:08:41 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-16 18:08:41 +0100
commitc196178600ced8eec833902be30309d076f98e4a (patch)
tree0c275b0bd774ae21dbfcd1888ec94201c7b5b1ce
parent6467e9a1c93c5b88cee3b214f8e9fc89b83743cb (diff)
downloadrabbitmq-server-git-c196178600ced8eec833902be30309d076f98e4a.tar.gz
Well it's better. The memory size is now recovered at start up by doing a foldl on the entire queue. This seems excessive, but it does work. It only takes 75 seconds on my machine to get through 1e6 1024-byte messages, and 160 seconds to get through 2e6 1024-byte messages. So that doesn't worry me any more. Also, it's done in constant memory... ish[0].
Also fixed the queue_mode_manager. Registration does not now produce a mode. Instead, it assumes you're starting up in disk only mode and then the first memory_report will result in the correct mode being set. This is safe and prevents a potentially deadly prefetch being sent when a queue starts up in mixed mode only to be sent to disk_only mode. However, the disk_queue has to start up in mixed mode because if it doesn't it has no way to estimate its memory use for disk mode. As such, it registers and then sends a report of 0 memory use. This guarantees that it can be put in mixed mode, thus it can then respond as necessary to the queue_mode_manager. I've not done anything further at this stage with the use of the erlang queue in the mixed_queue module when in disk mode (the potential per-message cost). Really you don't want to send individual entries here to the disk_queue, you want to batch them up... makes this rather more complex. [0] Sort of wrong. It can use the cache, and if you think about not too big queues sharing messages, this is clearly a good thing. But if there are lots of shared messages then it all goes wrong because the cache will get over populated and exhaust memory. Furthermore, the foldl is entirely in the disk_queue process. This means that during the foldl it won't be reporting memory and it won't be able to respond to request to change its mode. All of which points pretty strongly to the requirement that the prefetch needs to be somewhat more sophisticated.
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_disk_queue.erl40
-rw-r--r--src/rabbit_mixed_queue.erl33
-rw-r--r--src/rabbit_queue_mode_manager.erl34
-rw-r--r--src/rabbit_tests.erl23
5 files changed, 94 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a1b5a895bd..0597215fb6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -100,19 +100,22 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, Mode} = rabbit_queue_mode_manager:register
- (self(), rabbit_amqqueue, set_mode, [self()]),
- {ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode),
- {ok, #q{q = Q,
- owner = none,
- exclusive_consumer = none,
- has_had_consumers = false,
- mixed_state = MS,
- next_msg_id = 1,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- memory_report_timer = start_memory_timer()
- }, {binary, ?HIBERNATE_AFTER_MIN}}.
+ ok = rabbit_queue_mode_manager:register
+ (self(), rabbit_amqqueue, set_mode, [self()]),
+ {ok, MS} = rabbit_mixed_queue:init(QName, Durable),
+ State = #q{q = Q,
+ owner = none,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ mixed_state = MS,
+ next_msg_id = 1,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ memory_report_timer = start_memory_timer()
+ },
+ %% first thing we must do is report_memory which will clear out
+ %% the 'undefined' values in gain and loss in mixed_queue state
+ {ok, report_memory(false, State), {binary, ?HIBERNATE_AFTER_MIN}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -553,14 +556,10 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
-report_memory(Hibernating, State = #q { mixed_state = MS }) ->
+report_memory(Hib, State = #q { mixed_state = MS }) ->
{MSize, Gain, Loss} =
rabbit_mixed_queue:estimate_queue_memory(MS),
- NewMem = case MSize of
- 0 -> 1; %% avoid / 0
- N -> N
- end,
- rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss, Hibernating),
+ rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib),
State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS) }.
%---------------------------------------------------------------------------
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8b1487770d..868eab4a5e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, prefetch/2, length/1
+ requeue_next_n/2, prefetch/2, length/1, foldl/3
]).
-export([filesync/0, cache_info/0]).
@@ -266,6 +266,9 @@
-spec(delete_queue/1 :: (queue_name()) -> 'ok').
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(foldl/3 :: (fun (({message(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}, A) ->
+ A), A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
@@ -328,6 +331,9 @@ delete_non_durable_queues(DurableQueues) ->
length(Q) ->
gen_server2:call(?SERVER, {length, Q}, infinity).
+foldl(Fun, Init, Acc) ->
+ gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity).
+
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -367,8 +373,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
- {ok, Mode} = rabbit_queue_mode_manager:register
- (self(), rabbit_disk_queue, set_mode, []),
+ ok = rabbit_queue_mode_manager:register
+ (self(), rabbit_disk_queue, set_mode, []),
Node = node(),
ok =
case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
@@ -440,10 +446,13 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = preallocate(FileHdl, FileSizeLimit, Offset)
end,
State2 = State1 #dqstate { current_file_handle = FileHdl },
- {ok, case Mode of
- mixed -> State2;
- disk -> to_disk_only_mode(State2)
- end, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
+ %% by reporting a memory use of 0, we guarantee the manager will
+ %% grant us to ram_disk mode. We have to start in ram_disk mode
+ %% because we can't find values for mnesia_bytes_per_record or
+ %% ets_bytes_per_record otherwise.
+ ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
+ ok = report_memory(false, State2),
+ {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
@@ -464,6 +473,9 @@ handle_call({purge, Q}, _From, State) ->
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
reply(WriteSeqId - ReadSeqId, State);
+handle_call({foldl, Fun, Init, Q}, _From, State) ->
+ {ok, Result, State1} = internal_foldl(Q, Fun, Init, State),
+ reply(Result, State1);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
@@ -588,7 +600,7 @@ start_memory_timer() ->
TRef.
start_memory_timer(State = #dqstate { memory_report_timer = undefined }) ->
- report_memory(false, State),
+ ok = report_memory(false, State),
State #dqstate { memory_report_timer = start_memory_timer() };
start_memory_timer(State) ->
State.
@@ -899,6 +911,18 @@ internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) ->
end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)),
{ok, StateN}.
+internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
+
+internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
+ {ok, Acc, State};
+internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
+ {ok, MsgStuff, State1}
+ = internal_read_message(Q, ReadSeqId, true, true, false, State),
+ Acc1 = Fun(MsgStuff, Acc),
+ internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1).
+
internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) ->
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index a9013f3d8b..d864d9b2f6 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
--export([init/3]).
+-export([init/2]).
-export([publish/2, publish_delivered/2, deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
@@ -70,7 +70,7 @@
-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
-type(okmqs() :: {'ok', mqstate()}).
--spec(init/3 :: (queue_name(), bool(), mode()) -> okmqs()).
+-spec(init/2 :: (queue_name(), bool()) -> okmqs()).
-spec(publish/2 :: (message(), mqstate()) -> okmqs()).
-spec(publish_delivered/2 :: (message(), mqstate()) ->
{'ok', acktag(), mqstate()}).
@@ -99,16 +99,18 @@
-endif.
-init(Queue, IsDurable, disk) ->
+init(Queue, IsDurable) ->
Len = rabbit_disk_queue:length(Queue),
ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)),
MsgBuf = inc_queue_length(Queue, queue:new(), Len),
+ Size = rabbit_disk_queue:foldl(
+ fun ({Msg, _Size, _IsDelivered, _AckTag}, Acc) ->
+ Acc + size_of_message(Msg)
+ end, 0, Queue),
{ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
is_durable = IsDurable, length = Len,
- memory_size = 0, memory_gain = 0, memory_loss = 0 }};
-init(Queue, IsDurable, mixed) ->
- {ok, State} = init(Queue, IsDurable, disk),
- to_mixed_mode([], State).
+ memory_size = Size, memory_gain = undefined,
+ memory_loss = undefined }}.
size_of_message(
#basic_message { content = #content { payload_fragments_rev = Payload }}) ->
@@ -214,7 +216,7 @@ to_mixed_mode(TxnMessages, State =
%% load up a new queue with a token that says how many messages
%% are on disk (this is already built for us by the disk mode)
%% don't actually do anything to the disk
- ok = maybe_prefetch(MsgBuf),
+ ok = maybe_prefetch(mixed, MsgBuf),
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -248,10 +250,10 @@ inc_queue_length(Queue, MsgBuf, Count) ->
queue:in({Queue, Count}, MsgBuf)
end.
-dec_queue_length(MsgBuf) ->
+dec_queue_length(Mode, MsgBuf) ->
{{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf),
MsgBuf2 = case Len of
- 1 -> ok = maybe_prefetch(MsgBuf1),
+ 1 -> ok = maybe_prefetch(Mode, MsgBuf1),
MsgBuf1;
_ -> queue:in_r({Queue, Len-1}, MsgBuf1)
end,
@@ -327,7 +329,8 @@ publish_delivered(Msg, State =
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
- is_durable = IsDurable, length = Length }) ->
+ is_durable = IsDurable, length = Length,
+ mode = Mode }) ->
{{value, Value}, MsgBuf1} = queue:out(MsgBuf),
{Msg, IsDelivered, AckTag, MsgBuf2} =
case Value of
@@ -343,10 +346,10 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
false ->
noack
end,
- ok = maybe_prefetch(MsgBuf1),
+ ok = maybe_prefetch(Mode, MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
_ ->
- {ReadQ, MsgBuf3} = dec_queue_length(MsgBuf),
+ {ReadQ, MsgBuf3} = dec_queue_length(Mode, MsgBuf),
{Msg1 = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered1, AckTag1, _PersistRem}
= rabbit_disk_queue:deliver(ReadQ),
@@ -364,7 +367,9 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
-maybe_prefetch(MsgBuf) ->
+maybe_prefetch(disk, MsgBuf) ->
+ ok;
+maybe_prefetch(mixed, MsgBuf) ->
case queue:peek(MsgBuf) of
empty ->
ok;
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index a5e9610a6b..d4bc21d42f 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -43,7 +43,6 @@
-define(TOTAL_TOKENS, 10000000).
-define(ACTIVITY_THRESHOLD, 25).
--define(INITIAL_TOKEN_ALLOCATION, 100).
-define(SERVER, ?MODULE).
@@ -53,7 +52,7 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}).
+-spec(register/4 :: (pid(), atom(), atom(), list()) -> 'ok').
-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
-spec(report_memory/5 :: (pid(), non_neg_integer(),
non_neg_integer(), non_neg_integer(), bool()) ->
@@ -141,7 +140,7 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
register(Pid, Module, Function, Args) ->
- gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}).
+ gen_server2:cast(?SERVER, {register, Pid, Module, Function, Args}).
pin_to_disk(Pid) ->
gen_server2:call(?SERVER, {pin_to_disk, Pid}).
@@ -173,27 +172,6 @@ init([]) ->
disk_mode_pins = sets:new()
}}.
-handle_call({register, Pid, Module, Function, Args}, _From,
- State = #state { callbacks = Callbacks }) ->
- _MRef = erlang:monitor(process, Pid),
- State1 = State #state { callbacks = dict:store
- (Pid, {Module, Function, Args}, Callbacks) },
- State2 = #state { available_tokens = Avail,
- mixed_queues = Mixed } =
- free_upto(Pid, ?INITIAL_TOKEN_ALLOCATION, State1),
- {Result, State3} =
- case ?INITIAL_TOKEN_ALLOCATION > Avail of
- true ->
- {disk, State2};
- false ->
- {mixed, State2 #state {
- available_tokens =
- Avail - ?INITIAL_TOKEN_ALLOCATION,
- mixed_queues = dict:store
- (Pid, {?INITIAL_TOKEN_ALLOCATION, active}, Mixed) }}
- end,
- {reply, {ok, Result}, State3};
-
handle_call({pin_to_disk, Pid}, _From,
State = #state { mixed_queues = Mixed,
callbacks = Callbacks,
@@ -317,7 +295,13 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
hibernate -> StateN #state { hibernate =
queue:in(Pid, Sleepy) }
end,
- {noreply, StateN1}.
+ {noreply, StateN1};
+
+handle_cast({register, Pid, Module, Function, Args},
+ State = #state { callbacks = Callbacks }) ->
+ _MRef = erlang:monitor(process, Pid),
+ {noreply, State #state { callbacks = dict:store
+ (Pid, {Module, Function, Args}, Callbacks) }}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 221279f7ed..58a9d0cddd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -990,11 +990,20 @@ rdq_test_purge() ->
rdq_stop(),
passed.
+rdq_new_mixed_queue(Q, Durable, Disk) ->
+ {ok, MS} = rabbit_mixed_queue:init(Q, Durable),
+ MS1 = rabbit_mixed_queue:reset_counters(MS),
+ case Disk of
+ true -> {ok, MS2} = rabbit_mixed_queue:to_disk_only_mode([], MS1),
+ MS2;
+ false -> MS1
+ end.
+
rdq_test_mixed_queue_modes() ->
rdq_virgin(),
rdq_start(),
Payload = <<0:(8*256)>>,
- {ok, MS} = rabbit_mixed_queue:init(q, true, mixed),
+ MS = rdq_new_mixed_queue(q, true, false),
MS2 = lists:foldl(
fun (_N, MS1) ->
Msg = rabbit_basic:message(x, <<>>, [], Payload),
@@ -1041,7 +1050,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS12} = rabbit_mixed_queue:init(q, true, mixed),
+ MS12 = rdq_new_mixed_queue(q, true, false),
10 = rabbit_mixed_queue:length(MS12),
io:format("Recovered queue~n"),
{MS14, AckTags} =
@@ -1061,7 +1070,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS17} = rabbit_mixed_queue:init(q, true, mixed),
+ MS17 = rdq_new_mixed_queue(q, true, false),
0 = rabbit_mixed_queue:length(MS17),
{0,0,0} = rabbit_mixed_queue:estimate_queue_memory(MS17),
io:format("Recovered queue~n"),
@@ -1081,23 +1090,23 @@ rdq_test_mode_conversion_mid_txn() ->
rdq_virgin(),
rdq_start(),
- {ok, MS0} = rabbit_mixed_queue:init(q, true, mixed),
+ MS0 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit),
rdq_stop_virgin_start(),
- {ok, MS1} = rabbit_mixed_queue:init(q, true, mixed),
+ MS1 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel),
rdq_stop_virgin_start(),
- {ok, MS2} = rabbit_mixed_queue:init(q, true, disk),
+ MS2 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit),
rdq_stop_virgin_start(),
- {ok, MS3} = rabbit_mixed_queue:init(q, true, disk),
+ MS3 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel),