diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-27 11:41:25 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-27 11:41:25 +0000 |
| commit | 779c886a4fefd526dd2834df20ad25812f87902a (patch) | |
| tree | 9d7ec7282323c87a85a882b4fc471582e8f83dee | |
| parent | 3a0e16bbc9ba25c7edb43d11de5d721ca590db4b (diff) | |
| download | rabbitmq-server-git-779c886a4fefd526dd2834df20ad25812f87902a.tar.gz | |
Limit journal size ro target_ram_count or queue_index_max_journal_entries, whichever is lower.
| -rw-r--r-- | src/rabbit_queue_index.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 58 |
2 files changed, 41 insertions, 39 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c58f00ce0f..24dcd23cc8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,7 +18,7 @@ -export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, + publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). -export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). @@ -226,9 +226,9 @@ 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), - rabbit_types:message_properties(), boolean(), qistate()) - -> qistate()). +-spec(publish/6 :: (rabbit_types:msg_id(), seq_id(), + rabbit_types:message_properties(), boolean(), + non_neg_integer(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/1 :: (qistate()) -> qistate()). @@ -288,7 +288,7 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. -publish(MsgOrId, SeqId, MsgProps, IsPersistent, +publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State = #qistate{unconfirmed = UC, unconfirmed_msg = UCM}) -> MsgId = case MsgOrId of @@ -315,6 +315,7 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent, SeqId:?SEQ_BITS, Bin/binary, (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]), maybe_flush_journal( + JournalSizeHint, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)). deliver(SeqIds, State) -> @@ -674,11 +675,14 @@ add_to_journal(RelSeq, Action, JEntries) -> array:reset(RelSeq, JEntries) end. -maybe_flush_journal(State = #qistate { dirty_count = DCount, - max_journal_entries = MaxJournal }) - when DCount > MaxJournal -> - flush_journal(State); maybe_flush_journal(State) -> + maybe_flush_journal(infinity, State). + +maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount, + max_journal_entries = MaxJournal }) + when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) -> + flush_journal(State); +maybe_flush_journal(_Hint, State) -> State. flush_journal(State = #qistate { segments = Segments }) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index acdd4c7521..cf18e8ea6a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1351,33 +1351,35 @@ maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { - index_on_disk = true }, IndexState) -> - {MsgStatus, IndexState}; + index_on_disk = true }, State) -> + {MsgStatus, State}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_props = MsgProps}, IndexState) + msg_props = MsgProps}, + State = #vqstate{target_ram_count = TargetRamCount, + index_state = IndexState}) when Force orelse IsPersistent -> IndexState1 = rabbit_queue_index:publish( case persist_to(MsgStatus) of msg_store -> MsgId; queue_index -> prepare_to_store(Msg) - end, SeqId, MsgProps, IsPersistent, IndexState), + end, SeqId, MsgProps, IsPersistent, TargetRamCount, + IndexState), + IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1), {MsgStatus#msg_status{index_on_disk = true}, - maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; -maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> - {MsgStatus, IndexState}. + State#vqstate{index_state = IndexState2}}; + +maybe_write_index_to_disk(_Force, MsgStatus, State) -> + {MsgStatus, State}. maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> + State = #vqstate { msg_store_clients = MSCState }) -> MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = - maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1 }}. + maybe_write_index_to_disk(ForceIndex, MsgStatus1, State). determine_persist_to(#basic_message{ content = #content{properties = Props, @@ -1930,11 +1932,10 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> end end. -push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState }) -> - PushState = {Quota, Delta, IndexState, State}, +push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3}) -> + PushState = {Quota, Delta, State}, {Q3a, PushState1} = push_betas_to_deltas( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, @@ -1943,11 +1944,10 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, Q2, PushState1), - {_, Delta1, IndexState1, State1} = PushState2, - State1 #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a, - index_state = IndexState1 }. + {_, Delta1, State1} = PushState2, + State1 #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a }. push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of @@ -1963,11 +1963,9 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end end. -push_betas_to_deltas1(_Generator, _Limit, Q, - {0, _Delta, _IndexState, _State} = PushState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) -> {Q, PushState}; -push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, IndexState, State} = PushState) -> +push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) -> case Generator(Q) of {empty, _Q} -> {Q, PushState}; @@ -1975,12 +1973,12 @@ push_betas_to_deltas1(Generator, Limit, Q, when SeqId < Limit -> {Q, PushState}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(true, MsgStatus, IndexState), - State1 = stats(ready0, {MsgStatus, none}, State), + {#msg_status { index_on_disk = true }, State1} = + maybe_write_index_to_disk(true, MsgStatus, State), + State2 = stats(ready0, {MsgStatus, none}, State1), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, IndexState1, State1}) + {Quota - 1, Delta1, State2}) end. %%---------------------------------------------------------------------------- |
