diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 106 |
3 files changed, 100 insertions, 54 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1184122022..aab336ca7c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -841,7 +841,9 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> noreply(State #q { mixed_state = MS1 }); handle_cast(report_memory, State) -> - {noreply, (report_memory(false, State)) #q { memory_report_timer = undefined }, binary}. + %% deliberately don't call noreply/2 as we don't want to restart the timer + {noreply, (report_memory(false, State)) + #q { memory_report_timer = undefined }, binary}. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 84fbd76018..86a47c38bd 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -92,7 +92,9 @@ last_sync_offset, %% current_offset at the last time we sync'd message_cache, %% ets message cache memory_report_timer, %% TRef for the memory report timer - wordsize %% bytes in a word on this platform + wordsize, %% bytes in a word on this platform + mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode + ets_bytes_per_record %% bytes per record in msg_location_ets }). %% The components: @@ -416,7 +418,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> message_cache = ets:new(?CACHE_ETS_NAME, [set, private]), memory_report_timer = TRef, - wordsize = erlang:system_info(wordsize) + wordsize = erlang:system_info(wordsize), + mnesia_bytes_per_record = undefined, + ets_bytes_per_record = undefined }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = @@ -518,7 +522,7 @@ handle_cast({set_mode, Mode}, State) -> end)(State)); handle_cast(report_memory, State) -> Bytes = memory_use(State), - rabbit_queue_mode_manager:report_memory(self(), 2 * Bytes), + rabbit_queue_mode_manager:report_memory(self(), 2.5 * Bytes), noreply(State). handle_info({'EXIT', _Pid, Reason}, State) -> @@ -568,34 +572,51 @@ memory_use(#dqstate { operation_mode = ram_disk, file_summary = FileSummary, sequences = Sequences, msg_location_ets = MsgLocationEts, + message_cache = Cache, wordsize = WordSize }) -> WordSize * (mnesia:table_info(rabbit_disk_queue, memory) + ets:info(MsgLocationEts, memory) + ets:info(FileSummary, memory) + + ets:info(Cache, memory) + ets:info(Sequences, memory)); memory_use(#dqstate { operation_mode = disk_only, file_summary = FileSummary, sequences = Sequences, msg_location_dets = MsgLocationDets, - wordsize = WordSize - }) -> + message_cache = Cache, + wordsize = WordSize, + mnesia_bytes_per_record = MnesiaBytesPerRecord, + ets_bytes_per_record = EtsBytesPerRecord }) -> + MnesiaSizeEstimate = + mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord, + MsgLocationSizeEstimate = + dets:info(MsgLocationDets, size) * EtsBytesPerRecord, (WordSize * (ets:info(FileSummary, memory) + + ets:info(Cache, memory) + ets:info(Sequences, memory))) + - mnesia:table_info(rabbit_disk_queue, memory) + - dets:info(MsgLocationDets, memory). + round(MnesiaSizeEstimate) + + round(MsgLocationSizeEstimate). to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) -> State; to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> + msg_location_ets = MsgLocationEts, + wordsize = WordSize }) -> rabbit_log:info("Converting disk queue to disk only mode~n", []), + MnesiaMemoryBytes = WordSize * mnesia:table_info(rabbit_disk_queue, memory), + MnesiaSize = lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]), + EtsMemoryBytes = WordSize * ets:info(MsgLocationEts, memory), + EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]), {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies), ok = dets:from_ets(MsgLocationDets, MsgLocationEts), true = ets:delete_all_objects(MsgLocationEts), - State #dqstate { operation_mode = disk_only }. + garbage_collect(), + State #dqstate { operation_mode = disk_only, + mnesia_bytes_per_record = MnesiaMemoryBytes / MnesiaSize, + ets_bytes_per_record = EtsMemoryBytes / EtsSize }. to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) -> State; @@ -607,7 +628,10 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, disc_copies), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), - State #dqstate { operation_mode = ram_disk }. + garbage_collect(), + State #dqstate { operation_mode = ram_disk, + mnesia_bytes_per_record = undefined, + ets_bytes_per_record = undefined }. noreply(NewState = #dqstate { on_sync_froms = [], timer_ref = undefined }) -> {noreply, NewState, infinity}; diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index df241a6db8..f415472719 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -53,6 +53,8 @@ } ). +-define(TO_DISK_MAX_FLUSH_SIZE, 100000). + -ifdef(use_specs). -type(mode() :: ( 'disk' | 'mixed' )). @@ -124,48 +126,7 @@ to_disk_only_mode(TxnMessages, State = %% message on disk. %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. - Msgs = queue:to_list(MsgBuf), - {Requeue, TxPublish} = - lists:foldl( - fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}, - {RQueueAcc, TxPublishAcc}) -> - case OnDisk of - true -> - ok = rabbit_disk_queue:tx_commit(Q, TxPublishAcc, []), - {MsgId, IsDelivered, AckTag, _PersistRemaining} = - rabbit_disk_queue:phantom_deliver(Q), - {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []}; - false -> - ok = if [] == RQueueAcc -> ok; - true -> - rabbit_disk_queue:requeue_with_seqs( - Q, lists:reverse(RQueueAcc)) - end, - ok = rabbit_disk_queue:tx_publish(Msg), - {[], [ MsgId | TxPublishAcc ]} - end; - ({disk, Count}, {RQueueAcc, TxPublishAcc}) -> - ok = if [] == TxPublishAcc -> ok; - true -> - rabbit_disk_queue:tx_commit(Q, TxPublishAcc, []) - end, - {RQueueAcc1, 0} = - rabbit_misc:unfold( - fun (0) -> false; - (N) -> - {_MsgId, IsDelivered, AckTag, _PersistRemaining} - = rabbit_disk_queue:phantom_deliver(Q), - {true, {AckTag, {next, IsDelivered}}, N - 1} - end, Count), - {RQueueAcc1 ++ RQueueAcc, []} - end, {[], []}, Msgs), - ok = if [] == TxPublish -> ok; - true -> rabbit_disk_queue:tx_commit(Q, TxPublish, []) - end, - ok = if [] == Requeue -> ok; - true -> - rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) - end, + ok = send_messages_to_disk(Q, MsgBuf, [], 0, []), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -177,8 +138,64 @@ to_disk_only_mode(TxnMessages, State = _ -> rabbit_disk_queue:tx_publish(Msg) end end, TxnMessages), + garbage_collect(), {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}. +send_messages_to_disk(Q, Queue, Requeue, PublishCount, Commit) -> + case queue:out(Queue) of + {empty, Queue} -> + ok = flush_messages_to_disk_queue(Q, Commit), + [] = flush_requeue_to_disk_queue(Q, Requeue, []), + ok; + {{value, {Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}}, + Queue1} -> + case OnDisk of + true -> + ok = flush_messages_to_disk_queue (Q, Commit), + {MsgId, IsDelivered, AckTag, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + send_messages_to_disk( + Q, Queue1, [{AckTag, {next, IsDelivered}} | Requeue], + 0, []); + false -> + Commit1 = + flush_requeue_to_disk_queue(Q, Requeue, Commit), + ok = rabbit_disk_queue:tx_publish(Msg), + case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of + true -> + ok = flush_messages_to_disk_queue(Q, Commit1), + send_messages_to_disk(Q, Queue1, [], 1, [MsgId]); + false -> + send_messages_to_disk + (Q, Queue1, [], PublishCount + 1, + [MsgId | Commit1]) + end + end; + {{value, {disk, Count}}, Queue2} -> + ok = flush_messages_to_disk_queue(Q, Commit), + {Requeue1, 0} = + rabbit_misc:unfold( + fun (0) -> false; + (N) -> + {_MsgId, IsDelivered, AckTag, _PersistRemaining} + = rabbit_disk_queue:phantom_deliver(Q), + {true, {AckTag, {next, IsDelivered}}, N - 1} + end, Count), + send_messages_to_disk(Q, Queue2, Requeue1 ++ Requeue, 0, []) + end. + +flush_messages_to_disk_queue(Q, Commit) -> + ok = if [] == Commit -> ok; + true -> rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), []) + end. + +flush_requeue_to_disk_queue(Q, Requeue, Commit) -> + if [] == Requeue -> Commit; + true -> ok = rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), []), + rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)), + [] + end. + to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) -> {ok, State}; to_mixed_mode(TxnMessages, State = @@ -204,7 +221,10 @@ to_mixed_mode(TxnMessages, State = _ -> [Msg #basic_message.guid | Acc] end end, [], TxnMessages), - ok = rabbit_disk_queue:tx_cancel(Cancel), + ok = if Cancel == [] -> ok; + true -> rabbit_disk_queue:tx_cancel(Cancel) + end, + garbage_collect(), {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, |
