summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-27 11:41:25 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-27 11:41:25 +0000
commit779c886a4fefd526dd2834df20ad25812f87902a (patch)
tree9d7ec7282323c87a85a882b4fc471582e8f83dee
parent3a0e16bbc9ba25c7edb43d11de5d721ca590db4b (diff)
downloadrabbitmq-server-git-779c886a4fefd526dd2834df20ad25812f87902a.tar.gz
Limit journal size ro target_ram_count or queue_index_max_journal_entries, whichever is lower.
-rw-r--r--src/rabbit_queue_index.erl22
-rw-r--r--src/rabbit_variable_queue.erl58
2 files changed, 41 insertions, 39 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c58f00ce0f..24dcd23cc8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -18,7 +18,7 @@
-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
+ publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
@@ -226,9 +226,9 @@
'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
- rabbit_types:message_properties(), boolean(), qistate())
- -> qistate()).
+-spec(publish/6 :: (rabbit_types:msg_id(), seq_id(),
+ rabbit_types:message_properties(), boolean(),
+ non_neg_integer(), qistate()) -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
@@ -288,7 +288,7 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-publish(MsgOrId, SeqId, MsgProps, IsPersistent,
+publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
State = #qistate{unconfirmed = UC,
unconfirmed_msg = UCM}) ->
MsgId = case MsgOrId of
@@ -315,6 +315,7 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent,
SeqId:?SEQ_BITS, Bin/binary,
(size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
maybe_flush_journal(
+ JournalSizeHint,
add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).
deliver(SeqIds, State) ->
@@ -674,11 +675,14 @@ add_to_journal(RelSeq, Action, JEntries) ->
array:reset(RelSeq, JEntries)
end.
-maybe_flush_journal(State = #qistate { dirty_count = DCount,
- max_journal_entries = MaxJournal })
- when DCount > MaxJournal ->
- flush_journal(State);
maybe_flush_journal(State) ->
+ maybe_flush_journal(infinity, State).
+
+maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount,
+ max_journal_entries = MaxJournal })
+ when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) ->
+ flush_journal(State);
+maybe_flush_journal(_Hint, State) ->
State.
flush_journal(State = #qistate { segments = Segments }) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index acdd4c7521..cf18e8ea6a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1351,33 +1351,35 @@ maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
- index_on_disk = true }, IndexState) ->
- {MsgStatus, IndexState};
+ index_on_disk = true }, State) ->
+ {MsgStatus, State};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
msg = Msg,
msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_props = MsgProps}, IndexState)
+ msg_props = MsgProps},
+ State = #vqstate{target_ram_count = TargetRamCount,
+ index_state = IndexState})
when Force orelse IsPersistent ->
IndexState1 = rabbit_queue_index:publish(
case persist_to(MsgStatus) of
msg_store -> MsgId;
queue_index -> prepare_to_store(Msg)
- end, SeqId, MsgProps, IsPersistent, IndexState),
+ end, SeqId, MsgProps, IsPersistent, TargetRamCount,
+ IndexState),
+ IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
{MsgStatus#msg_status{index_on_disk = true},
- maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
-maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
- {MsgStatus, IndexState}.
+ State#vqstate{index_state = IndexState2}};
+
+maybe_write_index_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
- State = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState }) ->
+ State = #vqstate { msg_store_clients = MSCState }) ->
MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
- {MsgStatus2, IndexState1} =
- maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
- {MsgStatus2, State #vqstate { index_state = IndexState1 }}.
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, State).
determine_persist_to(#basic_message{
content = #content{properties = Props,
@@ -1930,11 +1932,10 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
end
end.
-push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState }) ->
- PushState = {Quota, Delta, IndexState, State},
+push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3}) ->
+ PushState = {Quota, Delta, State},
{Q3a, PushState1} = push_betas_to_deltas(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
@@ -1943,11 +1944,10 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
Q2, PushState1),
- {_, Delta1, IndexState1, State1} = PushState2,
- State1 #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1 }.
+ {_, Delta1, State1} = PushState2,
+ State1 #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a }.
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
@@ -1963,11 +1963,9 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end
end.
-push_betas_to_deltas1(_Generator, _Limit, Q,
- {0, _Delta, _IndexState, _State} = PushState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) ->
{Q, PushState};
-push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, IndexState, State} = PushState) ->
+push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1975,12 +1973,12 @@ push_betas_to_deltas1(Generator, Limit, Q,
when SeqId < Limit ->
{Q, PushState};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
- {#msg_status { index_on_disk = true }, IndexState1} =
- maybe_write_index_to_disk(true, MsgStatus, IndexState),
- State1 = stats(ready0, {MsgStatus, none}, State),
+ {#msg_status { index_on_disk = true }, State1} =
+ maybe_write_index_to_disk(true, MsgStatus, State),
+ State2 = stats(ready0, {MsgStatus, none}, State1),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, IndexState1, State1})
+ {Quota - 1, Delta1, State2})
end.
%%----------------------------------------------------------------------------