diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 9 |
2 files changed, 24 insertions, 11 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 7158058adb..29103de527 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,7 +18,7 @@ -export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, - pre_publish/6, flush_pre_publish_cache/2, + pre_publish/7, flush_pre_publish_cache/2, publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). @@ -295,11 +295,11 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. -pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, - State = #qistate{unconfirmed = UC, - unconfirmed_msg = UCM, - pre_publish_cache = PPC, - delivered_cache = DC}) -> +pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM, + pre_publish_cache = PPC, + delivered_cache = DC}) -> MsgId = case MsgOrId of #basic_message{id = Id} -> Id; Id when is_binary(Id) -> Id @@ -334,7 +334,19 @@ pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, end, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, - State1#qistate{pre_publish_cache = PPC1, delivered_cache = DC1}). + maybe_flush_pre_publish_cache( + JournalSizeHint, + State1#qistate{pre_publish_cache = PPC1, + delivered_cache = DC1})). + +%% pre_publish_cache is the entry with most elements when comapred to +%% delivered_cache so we only check the former in the guard. +maybe_flush_pre_publish_cache(JournalSizeHint, + #qistate{pre_publish_cache = PPC} = State) + when length(PPC) >= ?SEGMENT_ENTRY_COUNT -> + flush_pre_publish_cache(JournalSizeHint, State); +maybe_flush_pre_publish_cache(_JournalSizeHint, State) -> + State. flush_pre_publish_cache(JournalSizeHint, State) -> State1 = flush_pre_publish_cache(State), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5c3ce90326..6ea4d332e3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1372,7 +1372,7 @@ maybe_write_msg_to_disk(_Force, MsgStatus, State) -> {MsgStatus, State}. %% Due to certain optimizations made inside -%% rabbit_queue_index:pre_publish/6 we need to have two separate +%% rabbit_queue_index:pre_publish/7 we need to have two separate %% functions for index persistence. This one is only used when paging %% during memory pressure. We didn't want to modify %% maybe_write_index_to_disk/3 because that function is used in other @@ -1390,8 +1390,9 @@ maybe_batch_write_index_to_disk(Force, is_delivered = IsDelivered, msg_props = MsgProps}, State = #vqstate { - disk_write_count = DiskWriteCount, - index_state = IndexState }) + target_ram_count = TargetRamCount, + disk_write_count = DiskWriteCount, + index_state = IndexState}) when Force orelse IsPersistent -> {MsgOrId, DiskWriteCount1} = case persist_to(MsgStatus) of @@ -1400,7 +1401,7 @@ maybe_batch_write_index_to_disk(Force, end, IndexState1 = rabbit_queue_index:pre_publish( MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, - IndexState), + TargetRamCount, IndexState), {MsgStatus#msg_status{index_on_disk = true}, State#vqstate{index_state = IndexState1, disk_write_count = DiskWriteCount1}}; |
