summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_disk_queue.erl40
-rw-r--r--src/rabbit_mixed_queue.erl33
-rw-r--r--src/rabbit_queue_mode_manager.erl34
-rw-r--r--src/rabbit_tests.erl23
5 files changed, 94 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a1b5a895bd..0597215fb6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -100,19 +100,22 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, Mode} = rabbit_queue_mode_manager:register
- (self(), rabbit_amqqueue, set_mode, [self()]),
- {ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode),
- {ok, #q{q = Q,
- owner = none,
- exclusive_consumer = none,
- has_had_consumers = false,
- mixed_state = MS,
- next_msg_id = 1,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- memory_report_timer = start_memory_timer()
- }, {binary, ?HIBERNATE_AFTER_MIN}}.
+ ok = rabbit_queue_mode_manager:register
+ (self(), rabbit_amqqueue, set_mode, [self()]),
+ {ok, MS} = rabbit_mixed_queue:init(QName, Durable),
+ State = #q{q = Q,
+ owner = none,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ mixed_state = MS,
+ next_msg_id = 1,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ memory_report_timer = start_memory_timer()
+ },
+ %% first thing we must do is report_memory which will clear out
+ %% the 'undefined' values in gain and loss in mixed_queue state
+ {ok, report_memory(false, State), {binary, ?HIBERNATE_AFTER_MIN}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -553,14 +556,10 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
-report_memory(Hibernating, State = #q { mixed_state = MS }) ->
+report_memory(Hib, State = #q { mixed_state = MS }) ->
{MSize, Gain, Loss} =
rabbit_mixed_queue:estimate_queue_memory(MS),
- NewMem = case MSize of
- 0 -> 1; %% avoid / 0
- N -> N
- end,
- rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss, Hibernating),
+ rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib),
State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS) }.
%---------------------------------------------------------------------------
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8b1487770d..868eab4a5e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, prefetch/2, length/1
+ requeue_next_n/2, prefetch/2, length/1, foldl/3
]).
-export([filesync/0, cache_info/0]).
@@ -266,6 +266,9 @@
-spec(delete_queue/1 :: (queue_name()) -> 'ok').
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(foldl/3 :: (fun (({message(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}, A) ->
+ A), A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
@@ -328,6 +331,9 @@ delete_non_durable_queues(DurableQueues) ->
length(Q) ->
gen_server2:call(?SERVER, {length, Q}, infinity).
+foldl(Fun, Init, Acc) ->
+ gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity).
+
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -367,8 +373,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
- {ok, Mode} = rabbit_queue_mode_manager:register
- (self(), rabbit_disk_queue, set_mode, []),
+ ok = rabbit_queue_mode_manager:register
+ (self(), rabbit_disk_queue, set_mode, []),
Node = node(),
ok =
case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
@@ -440,10 +446,13 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = preallocate(FileHdl, FileSizeLimit, Offset)
end,
State2 = State1 #dqstate { current_file_handle = FileHdl },
- {ok, case Mode of
- mixed -> State2;
- disk -> to_disk_only_mode(State2)
- end, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
+ %% by reporting a memory use of 0, we guarantee the manager will
+ %% grant us to ram_disk mode. We have to start in ram_disk mode
+ %% because we can't find values for mnesia_bytes_per_record or
+ %% ets_bytes_per_record otherwise.
+ ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
+ ok = report_memory(false, State2),
+ {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
@@ -464,6 +473,9 @@ handle_call({purge, Q}, _From, State) ->
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
reply(WriteSeqId - ReadSeqId, State);
+handle_call({foldl, Fun, Init, Q}, _From, State) ->
+ {ok, Result, State1} = internal_foldl(Q, Fun, Init, State),
+ reply(Result, State1);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
@@ -588,7 +600,7 @@ start_memory_timer() ->
TRef.
start_memory_timer(State = #dqstate { memory_report_timer = undefined }) ->
- report_memory(false, State),
+ ok = report_memory(false, State),
State #dqstate { memory_report_timer = start_memory_timer() };
start_memory_timer(State) ->
State.
@@ -899,6 +911,18 @@ internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) ->
end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)),
{ok, StateN}.
+internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
+
+internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
+ {ok, Acc, State};
+internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
+ {ok, MsgStuff, State1}
+ = internal_read_message(Q, ReadSeqId, true, true, false, State),
+ Acc1 = Fun(MsgStuff, Acc),
+ internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1).
+
internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) ->
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index a9013f3d8b..d864d9b2f6 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
--export([init/3]).
+-export([init/2]).
-export([publish/2, publish_delivered/2, deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
@@ -70,7 +70,7 @@
-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
-type(okmqs() :: {'ok', mqstate()}).
--spec(init/3 :: (queue_name(), bool(), mode()) -> okmqs()).
+-spec(init/2 :: (queue_name(), bool()) -> okmqs()).
-spec(publish/2 :: (message(), mqstate()) -> okmqs()).
-spec(publish_delivered/2 :: (message(), mqstate()) ->
{'ok', acktag(), mqstate()}).
@@ -99,16 +99,18 @@
-endif.
-init(Queue, IsDurable, disk) ->
+init(Queue, IsDurable) ->
Len = rabbit_disk_queue:length(Queue),
ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)),
MsgBuf = inc_queue_length(Queue, queue:new(), Len),
+ Size = rabbit_disk_queue:foldl(
+ fun ({Msg, _Size, _IsDelivered, _AckTag}, Acc) ->
+ Acc + size_of_message(Msg)
+ end, 0, Queue),
{ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
is_durable = IsDurable, length = Len,
- memory_size = 0, memory_gain = 0, memory_loss = 0 }};
-init(Queue, IsDurable, mixed) ->
- {ok, State} = init(Queue, IsDurable, disk),
- to_mixed_mode([], State).
+ memory_size = Size, memory_gain = undefined,
+ memory_loss = undefined }}.
size_of_message(
#basic_message { content = #content { payload_fragments_rev = Payload }}) ->
@@ -214,7 +216,7 @@ to_mixed_mode(TxnMessages, State =
%% load up a new queue with a token that says how many messages
%% are on disk (this is already built for us by the disk mode)
%% don't actually do anything to the disk
- ok = maybe_prefetch(MsgBuf),
+ ok = maybe_prefetch(mixed, MsgBuf),
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -248,10 +250,10 @@ inc_queue_length(Queue, MsgBuf, Count) ->
queue:in({Queue, Count}, MsgBuf)
end.
-dec_queue_length(MsgBuf) ->
+dec_queue_length(Mode, MsgBuf) ->
{{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf),
MsgBuf2 = case Len of
- 1 -> ok = maybe_prefetch(MsgBuf1),
+ 1 -> ok = maybe_prefetch(Mode, MsgBuf1),
MsgBuf1;
_ -> queue:in_r({Queue, Len-1}, MsgBuf1)
end,
@@ -327,7 +329,8 @@ publish_delivered(Msg, State =
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
- is_durable = IsDurable, length = Length }) ->
+ is_durable = IsDurable, length = Length,
+ mode = Mode }) ->
{{value, Value}, MsgBuf1} = queue:out(MsgBuf),
{Msg, IsDelivered, AckTag, MsgBuf2} =
case Value of
@@ -343,10 +346,10 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
false ->
noack
end,
- ok = maybe_prefetch(MsgBuf1),
+ ok = maybe_prefetch(Mode, MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
_ ->
- {ReadQ, MsgBuf3} = dec_queue_length(MsgBuf),
+ {ReadQ, MsgBuf3} = dec_queue_length(Mode, MsgBuf),
{Msg1 = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered1, AckTag1, _PersistRem}
= rabbit_disk_queue:deliver(ReadQ),
@@ -364,7 +367,9 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
-maybe_prefetch(MsgBuf) ->
+maybe_prefetch(disk, MsgBuf) ->
+ ok;
+maybe_prefetch(mixed, MsgBuf) ->
case queue:peek(MsgBuf) of
empty ->
ok;
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index a5e9610a6b..d4bc21d42f 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -43,7 +43,6 @@
-define(TOTAL_TOKENS, 10000000).
-define(ACTIVITY_THRESHOLD, 25).
--define(INITIAL_TOKEN_ALLOCATION, 100).
-define(SERVER, ?MODULE).
@@ -53,7 +52,7 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}).
+-spec(register/4 :: (pid(), atom(), atom(), list()) -> 'ok').
-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
-spec(report_memory/5 :: (pid(), non_neg_integer(),
non_neg_integer(), non_neg_integer(), bool()) ->
@@ -141,7 +140,7 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
register(Pid, Module, Function, Args) ->
- gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}).
+ gen_server2:cast(?SERVER, {register, Pid, Module, Function, Args}).
pin_to_disk(Pid) ->
gen_server2:call(?SERVER, {pin_to_disk, Pid}).
@@ -173,27 +172,6 @@ init([]) ->
disk_mode_pins = sets:new()
}}.
-handle_call({register, Pid, Module, Function, Args}, _From,
- State = #state { callbacks = Callbacks }) ->
- _MRef = erlang:monitor(process, Pid),
- State1 = State #state { callbacks = dict:store
- (Pid, {Module, Function, Args}, Callbacks) },
- State2 = #state { available_tokens = Avail,
- mixed_queues = Mixed } =
- free_upto(Pid, ?INITIAL_TOKEN_ALLOCATION, State1),
- {Result, State3} =
- case ?INITIAL_TOKEN_ALLOCATION > Avail of
- true ->
- {disk, State2};
- false ->
- {mixed, State2 #state {
- available_tokens =
- Avail - ?INITIAL_TOKEN_ALLOCATION,
- mixed_queues = dict:store
- (Pid, {?INITIAL_TOKEN_ALLOCATION, active}, Mixed) }}
- end,
- {reply, {ok, Result}, State3};
-
handle_call({pin_to_disk, Pid}, _From,
State = #state { mixed_queues = Mixed,
callbacks = Callbacks,
@@ -317,7 +295,13 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
hibernate -> StateN #state { hibernate =
queue:in(Pid, Sleepy) }
end,
- {noreply, StateN1}.
+ {noreply, StateN1};
+
+handle_cast({register, Pid, Module, Function, Args},
+ State = #state { callbacks = Callbacks }) ->
+ _MRef = erlang:monitor(process, Pid),
+ {noreply, State #state { callbacks = dict:store
+ (Pid, {Module, Function, Args}, Callbacks) }}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 221279f7ed..58a9d0cddd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -990,11 +990,20 @@ rdq_test_purge() ->
rdq_stop(),
passed.
+rdq_new_mixed_queue(Q, Durable, Disk) ->
+ {ok, MS} = rabbit_mixed_queue:init(Q, Durable),
+ MS1 = rabbit_mixed_queue:reset_counters(MS),
+ case Disk of
+ true -> {ok, MS2} = rabbit_mixed_queue:to_disk_only_mode([], MS1),
+ MS2;
+ false -> MS1
+ end.
+
rdq_test_mixed_queue_modes() ->
rdq_virgin(),
rdq_start(),
Payload = <<0:(8*256)>>,
- {ok, MS} = rabbit_mixed_queue:init(q, true, mixed),
+ MS = rdq_new_mixed_queue(q, true, false),
MS2 = lists:foldl(
fun (_N, MS1) ->
Msg = rabbit_basic:message(x, <<>>, [], Payload),
@@ -1041,7 +1050,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS12} = rabbit_mixed_queue:init(q, true, mixed),
+ MS12 = rdq_new_mixed_queue(q, true, false),
10 = rabbit_mixed_queue:length(MS12),
io:format("Recovered queue~n"),
{MS14, AckTags} =
@@ -1061,7 +1070,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS17} = rabbit_mixed_queue:init(q, true, mixed),
+ MS17 = rdq_new_mixed_queue(q, true, false),
0 = rabbit_mixed_queue:length(MS17),
{0,0,0} = rabbit_mixed_queue:estimate_queue_memory(MS17),
io:format("Recovered queue~n"),
@@ -1081,23 +1090,23 @@ rdq_test_mode_conversion_mid_txn() ->
rdq_virgin(),
rdq_start(),
- {ok, MS0} = rabbit_mixed_queue:init(q, true, mixed),
+ MS0 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit),
rdq_stop_virgin_start(),
- {ok, MS1} = rabbit_mixed_queue:init(q, true, mixed),
+ MS1 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel),
rdq_stop_virgin_start(),
- {ok, MS2} = rabbit_mixed_queue:init(q, true, disk),
+ MS2 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit),
rdq_stop_virgin_start(),
- {ok, MS3} = rabbit_mixed_queue:init(q, true, disk),
+ MS3 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel),