summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_disk_queue.erl44
-rw-r--r--src/rabbit_mixed_queue.erl106
3 files changed, 100 insertions, 54 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1184122022..aab336ca7c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -841,7 +841,9 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
noreply(State #q { mixed_state = MS1 });
handle_cast(report_memory, State) ->
- {noreply, (report_memory(false, State)) #q { memory_report_timer = undefined }, binary}.
+ %% deliberately don't call noreply/2 as we don't want to restart the timer
+ {noreply, (report_memory(false, State))
+ #q { memory_report_timer = undefined }, binary}.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 84fbd76018..86a47c38bd 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -92,7 +92,9 @@
last_sync_offset, %% current_offset at the last time we sync'd
message_cache, %% ets message cache
memory_report_timer, %% TRef for the memory report timer
- wordsize %% bytes in a word on this platform
+ wordsize, %% bytes in a word on this platform
+ mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode
+ ets_bytes_per_record %% bytes per record in msg_location_ets
}).
%% The components:
@@ -416,7 +418,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
message_cache = ets:new(?CACHE_ETS_NAME,
[set, private]),
memory_report_timer = TRef,
- wordsize = erlang:system_info(wordsize)
+ wordsize = erlang:system_info(wordsize),
+ mnesia_bytes_per_record = undefined,
+ ets_bytes_per_record = undefined
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -518,7 +522,7 @@ handle_cast({set_mode, Mode}, State) ->
end)(State));
handle_cast(report_memory, State) ->
Bytes = memory_use(State),
- rabbit_queue_mode_manager:report_memory(self(), 2 * Bytes),
+ rabbit_queue_mode_manager:report_memory(self(), 2.5 * Bytes),
noreply(State).
handle_info({'EXIT', _Pid, Reason}, State) ->
@@ -568,34 +572,51 @@ memory_use(#dqstate { operation_mode = ram_disk,
file_summary = FileSummary,
sequences = Sequences,
msg_location_ets = MsgLocationEts,
+ message_cache = Cache,
wordsize = WordSize
}) ->
WordSize * (mnesia:table_info(rabbit_disk_queue, memory) +
ets:info(MsgLocationEts, memory) +
ets:info(FileSummary, memory) +
+ ets:info(Cache, memory) +
ets:info(Sequences, memory));
memory_use(#dqstate { operation_mode = disk_only,
file_summary = FileSummary,
sequences = Sequences,
msg_location_dets = MsgLocationDets,
- wordsize = WordSize
- }) ->
+ message_cache = Cache,
+ wordsize = WordSize,
+ mnesia_bytes_per_record = MnesiaBytesPerRecord,
+ ets_bytes_per_record = EtsBytesPerRecord }) ->
+ MnesiaSizeEstimate =
+ mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord,
+ MsgLocationSizeEstimate =
+ dets:info(MsgLocationDets, size) * EtsBytesPerRecord,
(WordSize * (ets:info(FileSummary, memory) +
+ ets:info(Cache, memory) +
ets:info(Sequences, memory))) +
- mnesia:table_info(rabbit_disk_queue, memory) +
- dets:info(MsgLocationDets, memory).
+ round(MnesiaSizeEstimate) +
+ round(MsgLocationSizeEstimate).
to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
State;
to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
+ msg_location_ets = MsgLocationEts,
+ wordsize = WordSize }) ->
rabbit_log:info("Converting disk queue to disk only mode~n", []),
+ MnesiaMemoryBytes = WordSize * mnesia:table_info(rabbit_disk_queue, memory),
+ MnesiaSize = lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]),
+ EtsMemoryBytes = WordSize * ets:info(MsgLocationEts, memory),
+ EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_only_copies),
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
true = ets:delete_all_objects(MsgLocationEts),
- State #dqstate { operation_mode = disk_only }.
+ garbage_collect(),
+ State #dqstate { operation_mode = disk_only,
+ mnesia_bytes_per_record = MnesiaMemoryBytes / MnesiaSize,
+ ets_bytes_per_record = EtsMemoryBytes / EtsSize }.
to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) ->
State;
@@ -607,7 +628,10 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
disc_copies),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
ok = dets:delete_all_objects(MsgLocationDets),
- State #dqstate { operation_mode = ram_disk }.
+ garbage_collect(),
+ State #dqstate { operation_mode = ram_disk,
+ mnesia_bytes_per_record = undefined,
+ ets_bytes_per_record = undefined }.
noreply(NewState = #dqstate { on_sync_froms = [], timer_ref = undefined }) ->
{noreply, NewState, infinity};
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index df241a6db8..f415472719 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -53,6 +53,8 @@
}
).
+-define(TO_DISK_MAX_FLUSH_SIZE, 100000).
+
-ifdef(use_specs).
-type(mode() :: ( 'disk' | 'mixed' )).
@@ -124,48 +126,7 @@ to_disk_only_mode(TxnMessages, State =
%% message on disk.
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
- Msgs = queue:to_list(MsgBuf),
- {Requeue, TxPublish} =
- lists:foldl(
- fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk},
- {RQueueAcc, TxPublishAcc}) ->
- case OnDisk of
- true ->
- ok = rabbit_disk_queue:tx_commit(Q, TxPublishAcc, []),
- {MsgId, IsDelivered, AckTag, _PersistRemaining} =
- rabbit_disk_queue:phantom_deliver(Q),
- {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []};
- false ->
- ok = if [] == RQueueAcc -> ok;
- true ->
- rabbit_disk_queue:requeue_with_seqs(
- Q, lists:reverse(RQueueAcc))
- end,
- ok = rabbit_disk_queue:tx_publish(Msg),
- {[], [ MsgId | TxPublishAcc ]}
- end;
- ({disk, Count}, {RQueueAcc, TxPublishAcc}) ->
- ok = if [] == TxPublishAcc -> ok;
- true ->
- rabbit_disk_queue:tx_commit(Q, TxPublishAcc, [])
- end,
- {RQueueAcc1, 0} =
- rabbit_misc:unfold(
- fun (0) -> false;
- (N) ->
- {_MsgId, IsDelivered, AckTag, _PersistRemaining}
- = rabbit_disk_queue:phantom_deliver(Q),
- {true, {AckTag, {next, IsDelivered}}, N - 1}
- end, Count),
- {RQueueAcc1 ++ RQueueAcc, []}
- end, {[], []}, Msgs),
- ok = if [] == TxPublish -> ok;
- true -> rabbit_disk_queue:tx_commit(Q, TxPublish, [])
- end,
- ok = if [] == Requeue -> ok;
- true ->
- rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
- end,
+ ok = send_messages_to_disk(Q, MsgBuf, [], 0, []),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -177,8 +138,64 @@ to_disk_only_mode(TxnMessages, State =
_ -> rabbit_disk_queue:tx_publish(Msg)
end
end, TxnMessages),
+ garbage_collect(),
{ok, State #mqstate { mode = disk, msg_buf = queue:new() }}.
+send_messages_to_disk(Q, Queue, Requeue, PublishCount, Commit) ->
+ case queue:out(Queue) of
+ {empty, Queue} ->
+ ok = flush_messages_to_disk_queue(Q, Commit),
+ [] = flush_requeue_to_disk_queue(Q, Requeue, []),
+ ok;
+ {{value, {Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}},
+ Queue1} ->
+ case OnDisk of
+ true ->
+ ok = flush_messages_to_disk_queue (Q, Commit),
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ send_messages_to_disk(
+ Q, Queue1, [{AckTag, {next, IsDelivered}} | Requeue],
+ 0, []);
+ false ->
+ Commit1 =
+ flush_requeue_to_disk_queue(Q, Requeue, Commit),
+ ok = rabbit_disk_queue:tx_publish(Msg),
+ case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
+ true ->
+ ok = flush_messages_to_disk_queue(Q, Commit1),
+ send_messages_to_disk(Q, Queue1, [], 1, [MsgId]);
+ false ->
+ send_messages_to_disk
+ (Q, Queue1, [], PublishCount + 1,
+ [MsgId | Commit1])
+ end
+ end;
+ {{value, {disk, Count}}, Queue2} ->
+ ok = flush_messages_to_disk_queue(Q, Commit),
+ {Requeue1, 0} =
+ rabbit_misc:unfold(
+ fun (0) -> false;
+ (N) ->
+ {_MsgId, IsDelivered, AckTag, _PersistRemaining}
+ = rabbit_disk_queue:phantom_deliver(Q),
+ {true, {AckTag, {next, IsDelivered}}, N - 1}
+ end, Count),
+ send_messages_to_disk(Q, Queue2, Requeue1 ++ Requeue, 0, [])
+ end.
+
+flush_messages_to_disk_queue(Q, Commit) ->
+ ok = if [] == Commit -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), [])
+ end.
+
+flush_requeue_to_disk_queue(Q, Requeue, Commit) ->
+ if [] == Requeue -> Commit;
+ true -> ok = rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), []),
+ rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)),
+ []
+ end.
+
to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) ->
{ok, State};
to_mixed_mode(TxnMessages, State =
@@ -204,7 +221,10 @@ to_mixed_mode(TxnMessages, State =
_ -> [Msg #basic_message.guid | Acc]
end
end, [], TxnMessages),
- ok = rabbit_disk_queue:tx_cancel(Cancel),
+ ok = if Cancel == [] -> ok;
+ true -> rabbit_disk_queue:tx_cancel(Cancel)
+ end,
+ garbage_collect(),
{ok, State #mqstate { mode = mixed, msg_buf = MsgBuf }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,