diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 84 |
2 files changed, 143 insertions, 13 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 9bd917ee96..bfa449b2a0 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,6 +18,7 @@ -export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, + pre_publish/6, flush_pre_publish_cache/2, publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). @@ -177,7 +178,8 @@ -record(qistate, {dir, segments, journal_handle, dirty_count, max_journal_entries, on_sync, on_sync_msg, - unconfirmed, unconfirmed_msg}). + unconfirmed, unconfirmed_msg, + pre_publish_cache, delivered_cache}). -record(segment, {num, path, journal_entries, unacked}). @@ -210,7 +212,9 @@ on_sync :: on_sync_fun(), on_sync_msg :: on_sync_fun(), unconfirmed :: gb_sets:set(), - unconfirmed_msg :: gb_sets:set() + unconfirmed_msg :: gb_sets:set(), + pre_publish_cache :: list(), + delivered_cache :: list() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -289,6 +293,66 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. +pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM, + pre_publish_cache = PPC, + delivered_cache = DC}) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, + ?MSG_ID_BYTES = size(MsgId), + + State1 = + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State + end, + + {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), + + PPC1 = + [[<<(case IsPersistent of + true -> ?PUB_PERSIST_JPREFIX; + false -> ?PUB_TRANS_JPREFIX + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin], PPC], + + DC1 = + case IsDelivered of + true -> + [SeqId | DC]; + false -> + DC + end, + + add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, + State1#qistate{pre_publish_cache = PPC1, delivered_cache = DC1}). + +flush_pre_publish_cache(JournalSizeHint, State) -> + State1 = flush_pre_publish_cache(State), + State2 = flush_delivered_cache(State1), + maybe_flush_journal(JournalSizeHint, State2). + +flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) -> + State; +flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) -> + {JournalHdl, State1} = get_journal_handle(State), + file_handle_cache_stats:update(queue_index_journal_write), + ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)), + State1#qistate{pre_publish_cache = []}. + +flush_delivered_cache(#qistate{delivered_cache = []} = State) -> + State; +flush_delivered_cache(State = #qistate{delivered_cache = DC}) -> + State1 = deliver(lists:reverse(DC), State), + State1#qistate{delivered_cache = []}. + publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State = #qistate{unconfirmed = UC, unconfirmed_msg = UCM}) -> @@ -444,7 +508,9 @@ blank_state_dir(Dir) -> on_sync = fun (_) -> ok end, on_sync_msg = fun (_) -> ok end, unconfirmed = gb_sets:new(), - unconfirmed_msg = gb_sets:new() }. + unconfirmed_msg = gb_sets:new(), + pre_publish_cache = [], + delivered_cache = [] }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 691e4ce2e2..657215327e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1367,6 +1367,34 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { maybe_write_msg_to_disk(_Force, MsgStatus, State) -> {MsgStatus, State}. +%% Due to certain optimizations made inside +%% rabbit_queue_index:pre_publish/6 we need to have two separate +%% functions for index persistence. This one is only used when paging +%% during memory pressure. +write_index_to_disk_paging(MsgStatus = #msg_status { + index_on_disk = true }, State) -> + {MsgStatus, State}; +write_index_to_disk_paging(MsgStatus = #msg_status { + msg = Msg, + msg_id = MsgId, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_props = MsgProps}, + State = #vqstate { disk_write_count = DiskWriteCount, + index_state = IndexState }) -> + {MsgOrId, DiskWriteCount1} = + case persist_to(MsgStatus) of + msg_store -> {MsgId, DiskWriteCount}; + queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} + end, + IndexState1 = rabbit_queue_index:pre_publish( + MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, + IndexState), + {MsgStatus#msg_status{index_on_disk = true}, + State#vqstate{index_state = IndexState1, + disk_write_count = DiskWriteCount1}}. + maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, State) -> {MsgStatus, State}; @@ -1971,7 +1999,11 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, delta = Delta1, q3 = Q3a }. -push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> +push_betas_to_deltas(Generator, LimitFun, Q, + {_Quota, _Delta, + #vqstate{ + ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}} = PushState) -> case ?QUEUE:is_empty(Q) of true -> {Q, PushState}; @@ -1981,28 +2013,60 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Q, PushState}; - false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) + false -> push_betas_to_deltas1(Generator, Limit, Q, + CurrRamReadyCount, CurrRamBytes, + PushState) end end. -push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) -> - {Q, PushState}; -push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, + CurrRamReadyCount, CurrRamBytes, + {0, Delta, State = + #vqstate{index_state = IndexState, + target_ram_count = TargetRamCount}}) -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Q, {0, Delta, State#vqstate{index_state = IndexState1, + ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}}}; +push_betas_to_deltas1(Generator, Limit, Q, + CurrRamReadyCount, CurrRamBytes, + {Quota, Delta, State = + #vqstate{index_state = IndexState, + target_ram_count = TargetRamCount}}) -> case Generator(Q) of {empty, _Q} -> - {Q, PushState}; + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Q, {Quota, Delta, State#vqstate{index_state = IndexState1, + ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}}}; {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - {Q, PushState}; + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Q, {Quota, Delta, State#vqstate{index_state = IndexState1, + ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}}}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, State1} = - maybe_write_index_to_disk(true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), + write_index_to_disk_paging(MsgStatus, State), + {Size, DeltaRam} = size_and_delta_ram(MsgStatus), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, State2}) + CurrRamReadyCount + DeltaRam, + CurrRamBytes + DeltaRam * Size, + {Quota - 1, Delta1, State1}) end. +%% Ooptimised version for paging only, based on stats/3 being called +%% like this: stats(ready0, {MsgStatus, none}, State1). +size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size}, + msg = undefined}) -> + {Size, 0}; +size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size}}) -> + {Size, -1}. + %%---------------------------------------------------------------------------- %% Upgrading %%---------------------------------------------------------------------------- |
