diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-09-05 04:23:36 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-09-05 04:23:36 +0300 |
| commit | 7ba2571e9550f9030d69bc29119bc814f935db59 (patch) | |
| tree | 85dda0694a463e86944b12071943a483d14d15b5 | |
| parent | 6a084b62c62809c5bdedabcb4b0a998cb42c6eb7 (diff) | |
| parent | 4527bea53d0e79331ecb298b056e6c2410c5e2fa (diff) | |
| download | rabbitmq-server-git-7ba2571e9550f9030d69bc29119bc814f935db59.tar.gz | |
Merge branch 'master' into rabbitmq-stomp-24
| -rw-r--r-- | src/rabbit_queue_index.erl | 84 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 75 |
2 files changed, 143 insertions, 16 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6f36d4f0dc..7c8fdf38c0 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,6 +18,7 @@ -export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, + pre_publish/7, flush_pre_publish_cache/2, publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). @@ -177,7 +178,8 @@ -record(qistate, {dir, segments, journal_handle, dirty_count, max_journal_entries, on_sync, on_sync_msg, - unconfirmed, unconfirmed_msg}). + unconfirmed, unconfirmed_msg, + pre_publish_cache, delivered_cache}). -record(segment, {num, path, journal_entries, entries_to_segment, unacked}). @@ -212,7 +214,9 @@ on_sync :: on_sync_fun(), on_sync_msg :: on_sync_fun(), unconfirmed :: gb_sets:set(), - unconfirmed_msg :: gb_sets:set() + unconfirmed_msg :: gb_sets:set(), + pre_publish_cache :: list(), + delivered_cache :: list() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -291,6 +295,78 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. 
+pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM, + pre_publish_cache = PPC, + delivered_cache = DC}) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, + ?MSG_ID_BYTES = size(MsgId), + + State1 = + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State + end, + + {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), + + PPC1 = + [[<<(case IsPersistent of + true -> ?PUB_PERSIST_JPREFIX; + false -> ?PUB_TRANS_JPREFIX + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin], PPC], + + DC1 = + case IsDelivered of + true -> + [SeqId | DC]; + false -> + DC + end, + + add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, + maybe_flush_pre_publish_cache( + JournalSizeHint, + State1#qistate{pre_publish_cache = PPC1, + delivered_cache = DC1})). + +%% pre_publish_cache is the entry with most elements when compared to +%% delivered_cache so we only check the former in the guard. +maybe_flush_pre_publish_cache(JournalSizeHint, + #qistate{pre_publish_cache = PPC} = State) + when length(PPC) >= ?SEGMENT_ENTRY_COUNT -> + flush_pre_publish_cache(JournalSizeHint, State); +maybe_flush_pre_publish_cache(_JournalSizeHint, State) -> + State. + +flush_pre_publish_cache(JournalSizeHint, State) -> + State1 = flush_pre_publish_cache(State), + State2 = flush_delivered_cache(State1), + maybe_flush_journal(JournalSizeHint, State2). 
+ +flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) -> + State; +flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) -> + {JournalHdl, State1} = get_journal_handle(State), + file_handle_cache_stats:update(queue_index_journal_write), + ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)), + State1#qistate{pre_publish_cache = []}. + +flush_delivered_cache(#qistate{delivered_cache = []} = State) -> + State; +flush_delivered_cache(State = #qistate{delivered_cache = DC}) -> + State1 = deliver(lists:reverse(DC), State), + State1#qistate{delivered_cache = []}. + publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State = #qistate{unconfirmed = UC, unconfirmed_msg = UCM}) -> @@ -446,7 +522,9 @@ blank_state_dir(Dir) -> on_sync = fun (_) -> ok end, on_sync_msg = fun (_) -> ok end, unconfirmed = gb_sets:new(), - unconfirmed_msg = gb_sets:new() }. + unconfirmed_msg = gb_sets:new(), + pre_publish_cache = [], + delivered_cache = [] }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 859dc6051c..41a13ac9b6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1440,6 +1440,43 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { maybe_write_msg_to_disk(_Force, MsgStatus, State) -> {MsgStatus, State}. +%% Due to certain optimizations made inside +%% rabbit_queue_index:pre_publish/7 we need to have two separate +%% functions for index persistence. This one is only used when paging +%% during memory pressure. We didn't want to modify +%% maybe_write_index_to_disk/3 because that function is used in other +%% places. 
+maybe_batch_write_index_to_disk(_Force, + MsgStatus = #msg_status { + index_on_disk = true }, State) -> + {MsgStatus, State}; +maybe_batch_write_index_to_disk(Force, + MsgStatus = #msg_status { + msg = Msg, + msg_id = MsgId, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_props = MsgProps}, + State = #vqstate { + target_ram_count = TargetRamCount, + disk_write_count = DiskWriteCount, + index_state = IndexState}) + when Force orelse IsPersistent -> + {MsgOrId, DiskWriteCount1} = + case persist_to(MsgStatus) of + msg_store -> {MsgId, DiskWriteCount}; + queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} + end, + IndexState1 = rabbit_queue_index:pre_publish( + MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, + TargetRamCount, IndexState), + {MsgStatus#msg_status{index_on_disk = true}, + State#vqstate{index_state = IndexState1, + disk_write_count = DiskWriteCount1}}; +maybe_batch_write_index_to_disk(_Force, MsgStatus, State) -> + {MsgStatus, State}. + maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, State) -> {MsgStatus, State}; @@ -1474,6 +1511,10 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). +maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> + {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), + maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1). + determine_persist_to(#basic_message{ content = #content{properties = Props, properties_bin = PropsBin}}, @@ -1861,16 +1902,16 @@ reduce_memory_use(State = #vqstate { end. 
limit_ram_acks(0, State) -> - {0, State}; + {0, ui(State)}; limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> case gb_trees:is_empty(RPA) of true -> - {Quota, State}; + {Quota, ui(State)}; false -> {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = - maybe_write_to_disk(true, false, MsgStatus, State), + maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, @@ -2008,16 +2049,17 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, when Quota =:= 0 orelse TargetRamCount =:= infinity orelse TargetRamCount >= RamMsgCount -> - {Quota, State}; + {Quota, ui(State)}; push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case credit_flow:blocked() of - true -> {Quota, State}; + true -> {Quota, ui(State)}; false -> case Generator(Q) of {empty, _Q} -> - {Quota, State}; + {Quota, ui(State)}; {{value, MsgStatus}, Qa} -> {MsgStatus1, State1} = - maybe_write_to_disk(true, false, MsgStatus, State), + maybe_prepare_write_to_disk(true, false, MsgStatus, + State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), State2 = stats( ready0, {MsgStatus, MsgStatus2}, State1), @@ -2058,24 +2100,31 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end end. 
-push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) -> - {Q, PushState}; -push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) -> + {Q, {0, Delta, ui(State)}}; +push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) -> case Generator(Q) of {empty, _Q} -> - {Q, PushState}; + {Q, {Quota, Delta, ui(State)}}; {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - {Q, PushState}; + {Q, {Quota, Delta, ui(State)}}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, State1} = - maybe_write_index_to_disk(true, MsgStatus, State), + maybe_batch_write_index_to_disk(true, MsgStatus, State), State2 = stats(ready0, {MsgStatus, none}, State1), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, {Quota - 1, Delta1, State2}) end. +%% Flushes queue index batch caches and updates queue index state. +ui(#vqstate{index_state = IndexState, + target_ram_count = TargetRamCount} = State) -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + State#vqstate{index_state = IndexState1}. + %%---------------------------------------------------------------------------- %% Upgrading %%---------------------------------------------------------------------------- |
