summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Matthias Radestock <matthias@rabbitmq.com> 2010-07-07 14:26:20 +0100
committer Matthias Radestock <matthias@rabbitmq.com> 2010-07-07 14:26:20 +0100
commit 37b3778664a83f5d632a0a73110aa60d55e03e06 (patch)
tree 7f7bc177a76d9980f2d322d54529f4208bf66dd4 /src
parent 6978ff64d9d952603f5ac3859d65a332eaa4a876 (diff)
parent 5f810eca3ff2ec81a4760e0ad1e3c5ed71711918 (diff)
download rabbitmq-server-git-37b3778664a83f5d632a0a73110aa60d55e03e06.tar.gz
merge bug22896 into bug21673
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_amqqueue_process.erl 6
-rw-r--r-- src/rabbit_backing_queue.erl 17
-rw-r--r-- src/rabbit_invariable_queue.erl 8
-rw-r--r-- src/rabbit_tests.erl 15
-rw-r--r-- src/rabbit_variable_queue.erl 182
5 files changed, 136 insertions, 92 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2fb60e9675..a2cbcf5517 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -179,7 +179,7 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
- case BQ:needs_sync(BQS)of
+ case BQ:needs_idle_timeout(BQS)of
true -> {ensure_sync_timer(State1), 0};
false -> {stop_sync_timer(State1), hibernate}
end.
@@ -188,7 +188,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:sync(BQS) end]),
+ [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -822,7 +822,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:sync(BQS) end, State));
+ fun (BQS) -> BQ:idle_timeout(BQS) end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 432d62900b..b76ae11ebc 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -113,14 +113,15 @@ behaviour_info(callbacks) ->
%% queue.
{ram_duration, 1},
- %% Should 'sync' be called as soon as the queue process can
- %% manage (either on an empty mailbox, or when a timer fires)?
- {needs_sync, 1},
-
- %% Called (eventually) after needs_sync returns 'true'. Note this
- %% may be called more than once for each 'true' returned from
- %% needs_sync.
- {sync, 1},
+ %% Should 'idle_timeout' be called as soon as the queue process
+ %% can manage (either on an empty mailbox, or when a timer
+ %% fires)?
+ {needs_idle_timeout, 1},
+
+ %% Called (eventually) after needs_idle_timeout returns
+ %% 'true'. Note this may be called more than once for each 'true'
+ %% returned from needs_idle_timeout.
+ {idle_timeout, 1},
%% Called immediately before the queue hibernates.
{handle_pre_hibernate, 1},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index df8adb2e2f..0ae6ddb9d2 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -34,8 +34,8 @@
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1,
- handle_pre_hibernate/1, status/1]).
+ set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
+ idle_timeout/1, handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -197,9 +197,9 @@ set_ram_duration_target(_DurationTarget, State) -> State.
ram_duration(State) -> {0, State}.
-needs_sync(_State) -> false.
+needs_idle_timeout(_State) -> false.
-sync(State) -> State.
+idle_timeout(State) -> State.
handle_pre_hibernate(State) -> State.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b28dd839e5..dd6a90892a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1845,7 +1845,8 @@ test_variable_queue_partial_segments_delta_thing() ->
VQ0 = fresh_variable_queue(),
VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
{_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1),
- VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ VQ3 = variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:set_ram_duration_target(0, VQ2)),
%% one segment in q3 as betas, and half a segment in delta
S3 = rabbit_variable_queue:status(VQ3),
io:format("~p~n", [S3]),
@@ -1854,7 +1855,8 @@ test_variable_queue_partial_segments_delta_thing() ->
assert_prop(S3, q3, SegmentSize),
assert_prop(S3, len, SegmentSize + HalfSegment),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
- VQ5 = variable_queue_publish(true, 1, VQ4),
+ VQ5 = variable_queue_wait_for_shuffling_end(
+ variable_queue_publish(true, 1, VQ4)),
%% should have 1 alpha, but it's in the same segment as the deltas
S5 = rabbit_variable_queue:status(VQ5),
io:format("~p~n", [S5]),
@@ -1881,6 +1883,13 @@ test_variable_queue_partial_segments_delta_thing() ->
passed.
+variable_queue_wait_for_shuffling_end(VQ) ->
+ case rabbit_variable_queue:needs_idle_timeout(VQ) of
+ true -> variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:idle_timeout(VQ));
+ false -> VQ
+ end.
+
test_queue_recover() ->
Count = 2*rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
@@ -1939,7 +1948,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere() ->
VQa2 = variable_queue_publish(false, 4, VQa1),
{VQa3, AckTags} = variable_queue_fetch(2, false, false, 4, VQa2),
VQa4 = rabbit_variable_queue:requeue(AckTags, VQa3),
- VQa5 = rabbit_variable_queue:sync(VQa4),
+ VQa5 = rabbit_variable_queue:idle_timeout(VQa4),
_VQa6 = rabbit_variable_queue:terminate(VQa5),
VQa7 = rabbit_variable_queue:init(test_queue(), true, true),
{empty, VQa8} = rabbit_variable_queue:fetch(false, VQa7),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e566544809..52845f63dd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -36,7 +36,8 @@
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
- needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]).
+ needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ status/1]).
-export([start/1]).
@@ -110,18 +111,22 @@
%% should be very few betas remaining, thus the transition is fast (no
%% work needs to be done for the gamma -> delta transition).
%%
-%% The conversion of betas to gammas is done on all actions that can
-%% increase the message count, such as publish and requeue, and when
-%% the queue is asked to reduce its memory usage. The conversion is
-%% done in batches of exactly ?RAM_INDEX_BATCH_SIZE. This value should
-%% not be too small, otherwise the frequent operations on the queues
-%% of q2 and q3 will not be effectively amortised (switching the
-%% direction of queue access defeats amortisation), nor should it be
-%% too big, otherwise converting a batch stalls the queue for too
-%% long. Therefore, it must be just right. This approach is preferable
-%% to doing work on a new queue-duration because converting all the
-%% indicated betas to gammas at that point can be far too expensive,
-%% thus requiring batching and segmented work anyway.
+%% The conversion of betas to gammas is done in batches of exactly
+%% ?IO_BATCH_SIZE. This value should not be too small, otherwise the
+%% frequent operations on the queues of q2 and q3 will not be
+%% effectively amortised (switching the direction of queue access
+%% defeats amortisation), nor should it be too big, otherwise
+%% converting a batch stalls the queue for too long. Therefore, it
+%% must be just right.
+%%
+%% The conversion from alphas to betas is also chunked, but only to
+%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
+%% any one time. This further smooths the effects of changes to the
+%% target_ram_msg_count and ensures the queue remains responsive
+%% even when there is a large amount of IO work to do. The
+%% idle_timeout callback is utilised to ensure that conversions are
+%% done as promptly as possible whilst ensuring the queue remains
+%% responsive.
%%
%% In the queue we only keep track of messages that are pending
%% delivery. This is fine for queue purging, but can be expensive for
@@ -221,7 +226,7 @@
%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
%% write more - we can always come back on the next publish to do
%% more.
--define(RAM_INDEX_BATCH_SIZE, 64).
+-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -632,10 +637,16 @@ ram_duration(State = #vqstate { egress_rate = Egress,
out_counter = 0,
ram_msg_count_prev = RamMsgCount })}.
-needs_sync(#vqstate { on_sync = {_, _, []} }) -> false;
-needs_sync(_) -> true.
+needs_idle_timeout(#vqstate { on_sync = {_, _, [_|_]}}) ->
+ true;
+needs_idle_timeout(State) ->
+ {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ State),
+ Res.
-sync(State) -> a(tx_commit_index(State)).
+idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -670,7 +681,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
persistent_count = PersistentCount,
- target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }) ->
E1 = queue:is_empty(Q1),
@@ -678,13 +688,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
ED = Delta#delta.count == 0,
E3 = bpqueue:is_empty(Q3),
E4 = queue:is_empty(Q4),
- TZ = TargetRamMsgCount == 0,
LZ = Len == 0,
true = E1 or not E3,
true = E2 or not ED,
true = ED or not E3,
- true = (E1 and E2 and E4) or not TZ,
true = LZ == (E3 and E4),
true = Len >= 0,
@@ -1107,54 +1115,67 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Phase changes
%%----------------------------------------------------------------------------
-reduce_memory_use(State = #vqstate {
- target_ram_msg_count = infinity }) ->
- State;
-reduce_memory_use(State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount >= RamMsgCount ->
- limit_ram_index(State);
-reduce_memory_use(State = #vqstate {
- target_ram_msg_count = TargetRamMsgCount }) ->
- State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
- case TargetRamMsgCount of
- 0 -> push_betas_to_deltas(State1);
- _ -> limit_ram_index(State1)
+%% Determine whether a reduction in memory use is necessary, and call
+%% functions to perform the required phase changes. The function can
+%% also be used to just do the former, by passing in dummy phase
+%% change functions.
+%%
+%% The function does not report on any needed beta->delta conversions,
+%% though the conversion function for that is called as necessary. The
+%% reason is twofold. Firstly, this is safe because the conversion is
+%% only ever necessary just after a transition to a
+%% target_ram_msg_count of zero or after an incremental alpha->beta
+%% conversion. In the former case the conversion is performed straight
+%% away (i.e. any betas present at the time are converted to deltas),
+%% and in the latter case the need for a conversion is flagged up
+%% anyway. Secondly, this is necessary because we do not have a
+%% precise and cheap predicate for determining whether a beta->delta
+%% conversion is necessary - due to the complexities of retaining up
+%% to one segment's worth of messages in q3 - and thus would risk
+%% perpetually reporting the need for a conversion when no such
+%% conversion is needed. That in turn could cause an infinite loop.
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
+ {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
+ State #vqstate.target_ram_msg_count) of
+ 0 -> {false, State};
+ S1 -> {true, AlphaBetaFun(S1, State)}
+ end,
+ case State1 #vqstate.target_ram_msg_count of
+ infinity -> {Reduce, State1};
+ 0 -> {Reduce, BetaDeltaFun(State1)};
+ _ -> case chunk_size(State1 #vqstate.ram_index_count,
+ permitted_ram_index_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end
end.
-limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
- Permitted = permitted_ram_index_count(State),
- if Permitted =/= infinity andalso RamIndexCount > Permitted ->
- Reduction = lists:min([RamIndexCount - Permitted,
- ?RAM_INDEX_BATCH_SIZE]),
- case Reduction < ?RAM_INDEX_BATCH_SIZE of
- true -> State;
- false -> #vqstate { q2 = Q2, q3 = Q3,
- index_state = IndexState } = State,
- {Q2a, {Reduction1, IndexState1}} =
- limit_ram_index(fun bpqueue:map_fold_filter_r/4,
- Q2, {Reduction, IndexState}),
- %% TODO: we shouldn't be writing index
- %% entries for messages that can never end up
- %% in delta due them residing in the only
- %% segment held by q3.
- {Q3a, {Reduction2, IndexState2}} =
- limit_ram_index(fun bpqueue:map_fold_filter_r/4,
- Q3, {Reduction1, IndexState1}),
- RamIndexCount1 = RamIndexCount -
- (Reduction - Reduction2),
- State #vqstate { q2 = Q2a, q3 = Q3a,
- index_state = IndexState2,
- ram_index_count = RamIndexCount1 }
- end;
- true ->
- State
- end.
+reduce_memory_use(State) ->
+ {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
+ fun limit_ram_index/2,
+ fun push_betas_to_deltas/1,
+ State),
+ State1.
+
+limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
+ index_state = IndexState,
+ ram_index_count = RamIndexCount }) ->
+ {Q2a, {Quota1, IndexState1}} = limit_ram_index(
+ fun bpqueue:map_fold_filter_r/4,
+ Q2, {Quota, IndexState}),
+ %% TODO: we shouldn't be writing index entries for messages that
+ %% can never end up in delta due to them residing in the only segment
+ %% held by q3.
+ {Q3a, {Quota2, IndexState2}} = limit_ram_index(
+ fun bpqueue:map_fold_filter_r/4,
+ Q3, {Quota1, IndexState1}),
+ State #vqstate { q2 = Q2a, q3 = Q3a,
+ index_state = IndexState2,
+ ram_index_count = RamIndexCount - (Quota - Quota2) }.
limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) ->
{Q, {0, IndexState}};
-limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
+limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) ->
MapFoldFilterFun(
fun erlang:'not'/1,
fun (MsgStatus, {0, _IndexStateN}) ->
@@ -1165,7 +1186,7 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
{MsgStatus1, IndexStateN1} =
maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
{true, MsgStatus1, {N-1, IndexStateN1}}
- end, {Reduction, IndexState}, Q).
+ end, {Quota, IndexState}, Q).
permitted_ram_index_count(#vqstate { len = 0 }) ->
infinity;
@@ -1176,6 +1197,12 @@ permitted_ram_index_count(#vqstate { len = Len,
BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
+chunk_size(Current, Permitted)
+ when Permitted =:= infinity orelse Permitted >= Current ->
+ 0;
+chunk_size(Current, Permitted) ->
+ lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
+
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
maybe_deltas_to_betas(State = #vqstate {
@@ -1226,7 +1253,12 @@ maybe_deltas_to_betas(State = #vqstate {
end
end.
-maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
+push_alphas_to_betas(Quota, State) ->
+ { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
+ {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ State2.
+
+maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
fun queue:out/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
@@ -1237,26 +1269,28 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
Q1a, State1 = #vqstate { q2 = Q2 }) ->
State1 #vqstate { q1 = Q1a,
q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) }
- end, Q1, State).
+ end, Quota, Q1, State).
-maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
+maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
fun queue:out_r/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q4a, State1 = #vqstate { q3 = Q3 }) ->
State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
q4 = Q4a }
- end, Q4, State).
+ end, Quota, Q4, State).
-maybe_push_alphas_to_betas(_Generator, _Consumer, _Q,
+maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
- State;
-maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
+ when Quota =:= 0 orelse
+ TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
+ {Quota, State};
+maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of
- {empty, _Q} -> State;
+ {empty, _Q} ->
+ {Quota, State};
{{value, MsgStatus}, Qa} ->
{MsgStatus1 = #msg_status { msg_on_disk = true,
index_on_disk = IndexOnDisk },
@@ -1267,7 +1301,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1 },
- maybe_push_alphas_to_betas(Generator, Consumer, Qa,
+ maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))
end.