diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 23 |
5 files changed, 94 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a1b5a895bd..0597215fb6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -100,19 +100,22 @@ start_link(Q) -> init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - {ok, Mode} = rabbit_queue_mode_manager:register - (self(), rabbit_amqqueue, set_mode, [self()]), - {ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode), - {ok, #q{q = Q, - owner = none, - exclusive_consumer = none, - has_had_consumers = false, - mixed_state = MS, - next_msg_id = 1, - active_consumers = queue:new(), - blocked_consumers = queue:new(), - memory_report_timer = start_memory_timer() - }, {binary, ?HIBERNATE_AFTER_MIN}}. + ok = rabbit_queue_mode_manager:register + (self(), rabbit_amqqueue, set_mode, [self()]), + {ok, MS} = rabbit_mixed_queue:init(QName, Durable), + State = #q{q = Q, + owner = none, + exclusive_consumer = none, + has_had_consumers = false, + mixed_state = MS, + next_msg_id = 1, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + memory_report_timer = start_memory_timer() + }, + %% first thing we must do is report_memory which will clear out + %% the 'undefined' values in gain and loss in mixed_queue state + {ok, report_memory(false, State), {binary, ?HIBERNATE_AFTER_MIN}}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -553,14 +556,10 @@ i(memory, _) -> i(Item, _) -> throw({bad_argument, Item}). -report_memory(Hibernating, State = #q { mixed_state = MS }) -> +report_memory(Hib, State = #q { mixed_state = MS }) -> {MSize, Gain, Loss} = rabbit_mixed_queue:estimate_queue_memory(MS), - NewMem = case MSize of - 0 -> 1; %% avoid / 0 - N -> N - end, - rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss, Hibernating), + rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib), State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS) }. %--------------------------------------------------------------------------- diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 8b1487770d..868eab4a5e 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -42,7 +42,7 @@ tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1, - requeue_next_n/2, prefetch/2, length/1 + requeue_next_n/2, prefetch/2, length/1, foldl/3 ]). -export([filesync/0, cache_info/0]). @@ -266,6 +266,9 @@ -spec(delete_queue/1 :: (queue_name()) -> 'ok'). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(length/1 :: (queue_name()) -> non_neg_integer()). +-spec(foldl/3 :: (fun (({message(), non_neg_integer(), + bool(), {msg_id(), seq_id()}}, A) -> + A), A, queue_name()) -> A). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_disk_only_mode/0 :: () -> 'ok'). @@ -328,6 +331,9 @@ delete_non_durable_queues(DurableQueues) -> length(Q) -> gen_server2:call(?SERVER, {length, Q}, infinity). +foldl(Fun, Init, Acc) -> + gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity). + stop() -> gen_server2:call(?SERVER, stop, infinity). @@ -367,8 +373,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% brutal_kill. %% Otherwise, the gen_server will be immediately terminated. process_flag(trap_exit, true), - {ok, Mode} = rabbit_queue_mode_manager:register - (self(), rabbit_disk_queue, set_mode, []), + ok = rabbit_queue_mode_manager:register + (self(), rabbit_disk_queue, set_mode, []), Node = node(), ok = case mnesia:change_table_copy_type(rabbit_disk_queue, Node, @@ -440,10 +446,13 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> ok = preallocate(FileHdl, FileSizeLimit, Offset) end, State2 = State1 #dqstate { current_file_handle = FileHdl }, - {ok, case Mode of - mixed -> State2; - disk -> to_disk_only_mode(State2) - end, {binary, ?HIBERNATE_AFTER_MIN}, 0}. + %% by reporting a memory use of 0, we guarantee the manager will + %% grant us to ram_disk mode. We have to start in ram_disk mode + %% because we can't find values for mnesia_bytes_per_record or + %% ets_bytes_per_record otherwise. + ok = rabbit_queue_mode_manager:report_memory(self(), 0, false), + ok = report_memory(false, State2), + {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}. handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, false, State), @@ -464,6 +473,9 @@ handle_call({purge, Q}, _From, State) -> handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), reply(WriteSeqId - ReadSeqId, State); +handle_call({foldl, Fun, Init, Q}, _From, State) -> + {ok, Result, State1} = internal_foldl(Q, Fun, Init, State), + reply(Result, State1); handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(stop_vaporise, _From, State) -> @@ -588,7 +600,7 @@ start_memory_timer() -> TRef. start_memory_timer(State = #dqstate { memory_report_timer = undefined }) -> - report_memory(false, State), + ok = report_memory(false, State), State #dqstate { memory_report_timer = start_memory_timer() }; start_memory_timer(State) -> State. @@ -899,6 +911,18 @@ internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) -> end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)), {ok, StateN}. +internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). + +internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> + {ok, Acc, State}; +internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) -> + {ok, MsgStuff, State1} + = internal_read_message(Q, ReadSeqId, true, true, false, State), + Acc1 = Fun(MsgStuff, Acc), + internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1). + internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -> [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] = diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index a9013f3d8b..d864d9b2f6 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). --export([init/3]). +-export([init/2]). -export([publish/2, publish_delivered/2, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, @@ -70,7 +70,7 @@ -type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })). -type(okmqs() :: {'ok', mqstate()}). --spec(init/3 :: (queue_name(), bool(), mode()) -> okmqs()). +-spec(init/2 :: (queue_name(), bool()) -> okmqs()). -spec(publish/2 :: (message(), mqstate()) -> okmqs()). -spec(publish_delivered/2 :: (message(), mqstate()) -> {'ok', acktag(), mqstate()}). @@ -99,16 +99,18 @@ -endif. -init(Queue, IsDurable, disk) -> +init(Queue, IsDurable) -> Len = rabbit_disk_queue:length(Queue), ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)), MsgBuf = inc_queue_length(Queue, queue:new(), Len), + Size = rabbit_disk_queue:foldl( + fun ({Msg, _Size, _IsDelivered, _AckTag}, Acc) -> + Acc + size_of_message(Msg) + end, 0, Queue), {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, is_durable = IsDurable, length = Len, - memory_size = 0, memory_gain = 0, memory_loss = 0 }}; -init(Queue, IsDurable, mixed) -> - {ok, State} = init(Queue, IsDurable, disk), - to_mixed_mode([], State). + memory_size = Size, memory_gain = undefined, + memory_loss = undefined }}. size_of_message( #basic_message { content = #content { payload_fragments_rev = Payload }}) -> @@ -214,7 +216,7 @@ to_mixed_mode(TxnMessages, State = %% load up a new queue with a token that says how many messages %% are on disk (this is already built for us by the disk mode) %% don't actually do anything to the disk - ok = maybe_prefetch(MsgBuf), + ok = maybe_prefetch(mixed, MsgBuf), %% remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty %% much the inverse behaviour of our own tx_cancel/2 which is why @@ -248,10 +250,10 @@ inc_queue_length(Queue, MsgBuf, Count) -> queue:in({Queue, Count}, MsgBuf) end. -dec_queue_length(MsgBuf) -> +dec_queue_length(Mode, MsgBuf) -> {{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf), MsgBuf2 = case Len of - 1 -> ok = maybe_prefetch(MsgBuf1), + 1 -> ok = maybe_prefetch(Mode, MsgBuf1), MsgBuf1; _ -> queue:in_r({Queue, Len-1}, MsgBuf1) end, @@ -327,7 +329,8 @@ publish_delivered(Msg, State = deliver(State = #mqstate { length = 0 }) -> {empty, State}; deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, - is_durable = IsDurable, length = Length }) -> + is_durable = IsDurable, length = Length, + mode = Mode }) -> {{value, Value}, MsgBuf1} = queue:out(MsgBuf), {Msg, IsDelivered, AckTag, MsgBuf2} = case Value of @@ -343,10 +346,10 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, false -> noack end, - ok = maybe_prefetch(MsgBuf1), + ok = maybe_prefetch(Mode, MsgBuf1), {Msg1, IsDelivered1, AckTag1, MsgBuf1}; _ -> - {ReadQ, MsgBuf3} = dec_queue_length(MsgBuf), + {ReadQ, MsgBuf3} = dec_queue_length(Mode, MsgBuf), {Msg1 = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered1, AckTag1, _PersistRem} = rabbit_disk_queue:deliver(ReadQ), @@ -364,7 +367,9 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, {{Msg, IsDelivered, AckTag, Rem}, State #mqstate { msg_buf = MsgBuf2, length = Rem }}. -maybe_prefetch(MsgBuf) -> +maybe_prefetch(disk, MsgBuf) -> + ok; +maybe_prefetch(mixed, MsgBuf) -> case queue:peek(MsgBuf) of empty -> ok; diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index a5e9610a6b..d4bc21d42f 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -43,7 +43,6 @@ -define(TOTAL_TOKENS, 10000000). -define(ACTIVITY_THRESHOLD, 25). --define(INITIAL_TOKEN_ALLOCATION, 100). -define(SERVER, ?MODULE). @@ -53,7 +52,7 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}). +-spec(register/4 :: (pid(), atom(), atom(), list()) -> 'ok'). -spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok'). -spec(report_memory/5 :: (pid(), non_neg_integer(), non_neg_integer(), non_neg_integer(), bool()) -> @@ -141,7 +140,7 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). register(Pid, Module, Function, Args) -> - gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}). + gen_server2:cast(?SERVER, {register, Pid, Module, Function, Args}). pin_to_disk(Pid) -> gen_server2:call(?SERVER, {pin_to_disk, Pid}). @@ -173,27 +172,6 @@ init([]) -> disk_mode_pins = sets:new() }}. -handle_call({register, Pid, Module, Function, Args}, _From, - State = #state { callbacks = Callbacks }) -> - _MRef = erlang:monitor(process, Pid), - State1 = State #state { callbacks = dict:store - (Pid, {Module, Function, Args}, Callbacks) }, - State2 = #state { available_tokens = Avail, - mixed_queues = Mixed } = - free_upto(Pid, ?INITIAL_TOKEN_ALLOCATION, State1), - {Result, State3} = - case ?INITIAL_TOKEN_ALLOCATION > Avail of - true -> - {disk, State2}; - false -> - {mixed, State2 #state { - available_tokens = - Avail - ?INITIAL_TOKEN_ALLOCATION, - mixed_queues = dict:store - (Pid, {?INITIAL_TOKEN_ALLOCATION, active}, Mixed) }} - end, - {reply, {ok, Result}, State3}; - handle_call({pin_to_disk, Pid}, _From, State = #state { mixed_queues = Mixed, callbacks = Callbacks, @@ -317,7 +295,13 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, hibernate -> StateN #state { hibernate = queue:in(Pid, Sleepy) } end, - {noreply, StateN1}. + {noreply, StateN1}; + +handle_cast({register, Pid, Module, Function, Args}, + State = #state { callbacks = Callbacks }) -> + _MRef = erlang:monitor(process, Pid), + {noreply, State #state { callbacks = dict:store + (Pid, {Module, Function, Args}, Callbacks) }}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state { available_tokens = Avail, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 221279f7ed..58a9d0cddd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -990,11 +990,20 @@ rdq_test_purge() -> rdq_stop(), passed. +rdq_new_mixed_queue(Q, Durable, Disk) -> + {ok, MS} = rabbit_mixed_queue:init(Q, Durable), + MS1 = rabbit_mixed_queue:reset_counters(MS), + case Disk of + true -> {ok, MS2} = rabbit_mixed_queue:to_disk_only_mode([], MS1), + MS2; + false -> MS1 + end. + rdq_test_mixed_queue_modes() -> rdq_virgin(), rdq_start(), Payload = <<0:(8*256)>>, - {ok, MS} = rabbit_mixed_queue:init(q, true, mixed), + MS = rdq_new_mixed_queue(q, true, false), MS2 = lists:foldl( fun (_N, MS1) -> Msg = rabbit_basic:message(x, <<>>, [], Payload), @@ -1041,7 +1050,7 @@ rdq_test_mixed_queue_modes() -> io:format("Converted to disk only mode~n"), rdq_stop(), rdq_start(), - {ok, MS12} = rabbit_mixed_queue:init(q, true, mixed), + MS12 = rdq_new_mixed_queue(q, true, false), 10 = rabbit_mixed_queue:length(MS12), io:format("Recovered queue~n"), {MS14, AckTags} = @@ -1061,7 +1070,7 @@ rdq_test_mixed_queue_modes() -> io:format("Converted to disk only mode~n"), rdq_stop(), rdq_start(), - {ok, MS17} = rabbit_mixed_queue:init(q, true, mixed), + MS17 = rdq_new_mixed_queue(q, true, false), 0 = rabbit_mixed_queue:length(MS17), {0,0,0} = rabbit_mixed_queue:estimate_queue_memory(MS17), io:format("Recovered queue~n"), @@ -1081,23 +1090,23 @@ rdq_test_mode_conversion_mid_txn() -> rdq_virgin(), rdq_start(), - {ok, MS0} = rabbit_mixed_queue:init(q, true, mixed), + MS0 = rdq_new_mixed_queue(q, true, false), passed = rdq_tx_publish_mixed_alter_commit_get( MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit), rdq_stop_virgin_start(), - {ok, MS1} = rabbit_mixed_queue:init(q, true, mixed), + MS1 = rdq_new_mixed_queue(q, true, false), passed = rdq_tx_publish_mixed_alter_commit_get( MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel), rdq_stop_virgin_start(), - {ok, MS2} = rabbit_mixed_queue:init(q, true, disk), + MS2 = rdq_new_mixed_queue(q, true, true), passed = rdq_tx_publish_mixed_alter_commit_get( MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit), rdq_stop_virgin_start(), - {ok, MS3} = rabbit_mixed_queue:init(q, true, disk), + MS3 = rdq_new_mixed_queue(q, true, true), passed = rdq_tx_publish_mixed_alter_commit_get( MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel), |
