summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl26
-rw-r--r--src/rabbit_variable_queue.erl9
2 files changed, 24 insertions, 11 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 7158058adb..29103de527 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -18,7 +18,7 @@
-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
- pre_publish/6, flush_pre_publish_cache/2,
+ pre_publish/7, flush_pre_publish_cache/2,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
@@ -295,11 +295,11 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
- State = #qistate{unconfirmed = UC,
- unconfirmed_msg = UCM,
- pre_publish_cache = PPC,
- delivered_cache = DC}) ->
+pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ pre_publish_cache = PPC,
+ delivered_cache = DC}) ->
MsgId = case MsgOrId of
#basic_message{id = Id} -> Id;
Id when is_binary(Id) -> Id
@@ -334,7 +334,19 @@ pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
end,
add_to_journal(SeqId, {IsPersistent, Bin, MsgBin},
- State1#qistate{pre_publish_cache = PPC1, delivered_cache = DC1}).
+ maybe_flush_pre_publish_cache(
+ JournalSizeHint,
+ State1#qistate{pre_publish_cache = PPC1,
+ delivered_cache = DC1})).
+
+%% pre_publish_cache is the entry with most elements when comapred to
+%% delivered_cache so we only check the former in the guard.
+maybe_flush_pre_publish_cache(JournalSizeHint,
+ #qistate{pre_publish_cache = PPC} = State)
+ when length(PPC) >= ?SEGMENT_ENTRY_COUNT ->
+ flush_pre_publish_cache(JournalSizeHint, State);
+maybe_flush_pre_publish_cache(_JournalSizeHint, State) ->
+ State.
flush_pre_publish_cache(JournalSizeHint, State) ->
State1 = flush_pre_publish_cache(State),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5c3ce90326..6ea4d332e3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1372,7 +1372,7 @@ maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
{MsgStatus, State}.
%% Due to certain optimizations made inside
-%% rabbit_queue_index:pre_publish/6 we need to have two separate
+%% rabbit_queue_index:pre_publish/7 we need to have two separate
%% functions for index persistence. This one is only used when paging
%% during memory pressure. We didn't want to modify
%% maybe_write_index_to_disk/3 because that function is used in other
@@ -1390,8 +1390,9 @@ maybe_batch_write_index_to_disk(Force,
is_delivered = IsDelivered,
msg_props = MsgProps},
State = #vqstate {
- disk_write_count = DiskWriteCount,
- index_state = IndexState })
+ target_ram_count = TargetRamCount,
+ disk_write_count = DiskWriteCount,
+ index_state = IndexState})
when Force orelse IsPersistent ->
{MsgOrId, DiskWriteCount1} =
case persist_to(MsgStatus) of
@@ -1400,7 +1401,7 @@ maybe_batch_write_index_to_disk(Force,
end,
IndexState1 = rabbit_queue_index:pre_publish(
MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
- IndexState),
+ TargetRamCount, IndexState),
{MsgStatus#msg_status{index_on_disk = true},
State#vqstate{index_state = IndexState1,
disk_write_count = DiskWriteCount1}};