diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-07 14:26:20 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-07 14:26:20 +0100 |
| commit | 37b3778664a83f5d632a0a73110aa60d55e03e06 (patch) | |
| tree | 7f7bc177a76d9980f2d322d54529f4208bf66dd4 /src | |
| parent | 6978ff64d9d952603f5ac3859d65a332eaa4a876 (diff) | |
| parent | 5f810eca3ff2ec81a4760e0ad1e3c5ed71711918 (diff) | |
| download | rabbitmq-server-git-37b3778664a83f5d632a0a73110aa60d55e03e06.tar.gz | |
merge bug22896 into bug21673
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 182 |
5 files changed, 136 insertions, 92 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2fb60e9675..a2cbcf5517 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -179,7 +179,7 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), - case BQ:needs_sync(BQS)of + case BQ:needs_idle_timeout(BQS)of true -> {ensure_sync_timer(State1), 0}; false -> {stop_sync_timer(State1), hibernate} end. @@ -188,7 +188,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> BQ:sync(BQS) end]), + [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -822,7 +822,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:sync(BQS) end, State)); + fun (BQS) -> BQ:idle_timeout(BQS) end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 432d62900b..b76ae11ebc 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -113,14 +113,15 @@ behaviour_info(callbacks) -> %% queue. {ram_duration, 1}, - %% Should 'sync' be called as soon as the queue process can - %% manage (either on an empty mailbox, or when a timer fires)? - {needs_sync, 1}, - - %% Called (eventually) after needs_sync returns 'true'. Note this - %% may be called more than once for each 'true' returned from - %% needs_sync. - {sync, 1}, + %% Should 'idle_timeout' be called as soon as the queue process + %% can manage (either on an empty mailbox, or when a timer + %% fires)? 
+ {needs_idle_timeout, 1}, + + %% Called (eventually) after needs_idle_timeout returns + %% 'true'. Note this may be called more than once for each 'true' + %% returned from needs_idle_timeout. + {idle_timeout, 1}, %% Called immediately before the queue hibernates. {handle_pre_hibernate, 1}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index df8adb2e2f..0ae6ddb9d2 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -34,8 +34,8 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1, - handle_pre_hibernate/1, status/1]). + set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, + idle_timeout/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -197,9 +197,9 @@ set_ram_duration_target(_DurationTarget, State) -> State. ram_duration(State) -> {0, State}. -needs_sync(_State) -> false. +needs_idle_timeout(_State) -> false. -sync(State) -> State. +idle_timeout(State) -> State. handle_pre_hibernate(State) -> State. 
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b28dd839e5..dd6a90892a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1845,7 +1845,8 @@ test_variable_queue_partial_segments_delta_thing() -> VQ0 = fresh_variable_queue(), VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), - VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), + VQ3 = variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:set_ram_duration_target(0, VQ2)), %% one segment in q3 as betas, and half a segment in delta S3 = rabbit_variable_queue:status(VQ3), io:format("~p~n", [S3]), @@ -1854,7 +1855,8 @@ test_variable_queue_partial_segments_delta_thing() -> assert_prop(S3, q3, SegmentSize), assert_prop(S3, len, SegmentSize + HalfSegment), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), - VQ5 = variable_queue_publish(true, 1, VQ4), + VQ5 = variable_queue_wait_for_shuffling_end( + variable_queue_publish(true, 1, VQ4)), %% should have 1 alpha, but it's in the same segment as the deltas S5 = rabbit_variable_queue:status(VQ5), io:format("~p~n", [S5]), @@ -1881,6 +1883,13 @@ test_variable_queue_partial_segments_delta_thing() -> passed. +variable_queue_wait_for_shuffling_end(VQ) -> + case rabbit_variable_queue:needs_idle_timeout(VQ) of + true -> variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:idle_timeout(VQ)); + false -> VQ + end. 
+ test_queue_recover() -> Count = 2*rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), @@ -1939,7 +1948,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere() -> VQa2 = variable_queue_publish(false, 4, VQa1), {VQa3, AckTags} = variable_queue_fetch(2, false, false, 4, VQa2), VQa4 = rabbit_variable_queue:requeue(AckTags, VQa3), - VQa5 = rabbit_variable_queue:sync(VQa4), + VQa5 = rabbit_variable_queue:idle_timeout(VQa4), _VQa6 = rabbit_variable_queue:terminate(VQa5), VQa7 = rabbit_variable_queue:init(test_queue(), true, true), {empty, VQa8} = rabbit_variable_queue:fetch(false, VQa7), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e566544809..52845f63dd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -36,7 +36,8 @@ tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, - needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]). + needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, + status/1]). -export([start/1]). @@ -110,18 +111,22 @@ %% should be very few betas remaining, thus the transition is fast (no %% work needs to be done for the gamma -> delta transition). %% -%% The conversion of betas to gammas is done on all actions that can -%% increase the message count, such as publish and requeue, and when -%% the queue is asked to reduce its memory usage. The conversion is -%% done in batches of exactly ?RAM_INDEX_BATCH_SIZE. This value should -%% not be too small, otherwise the frequent operations on the queues -%% of q2 and q3 will not be effectively amortised (switching the -%% direction of queue access defeats amortisation), nor should it be -%% too big, otherwise converting a batch stalls the queue for too -%% long. Therefore, it must be just right. 
This approach is preferable -%% to doing work on a new queue-duration because converting all the -%% indicated betas to gammas at that point can be far too expensive, -%% thus requiring batching and segmented work anyway. +%% The conversion of betas to gammas is done in batches of exactly +%% ?IO_BATCH_SIZE. This value should not be too small, otherwise the +%% frequent operations on the queues of q2 and q3 will not be +%% effectively amortised (switching the direction of queue access +%% defeats amortisation), nor should it be too big, otherwise +%% converting a batch stalls the queue for too long. Therefore, it +%% must be just right. +%% +%% The conversion from alphas to betas is also chunked, but only to +%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at +%% any one time. This further smooths the effects of changes to the +%% target_ram_msg_count and ensures the queue remains responsive +%% even when there is a large amount of IO work to do. The +%% idle_timeout callback is utilised to ensure that conversions are +%% done as promptly as possible whilst ensuring the queue remains +%% responsive. %% %% In the queue we only keep track of messages that are pending %% delivery. This is fine for queue purging, but can be expensive for @@ -221,7 +226,7 @@ %% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't %% write more - we can always come back on the next publish to do %% more. --define(RAM_INDEX_BATCH_SIZE, 64). +-define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -632,10 +637,16 @@ ram_duration(State = #vqstate { egress_rate = Egress, out_counter = 0, ram_msg_count_prev = RamMsgCount })}. -needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; -needs_sync(_) -> true. 
+needs_idle_timeout(#vqstate { on_sync = {_, _, [_|_]}}) -> + true; +needs_idle_timeout(State) -> + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + State), + Res. -sync(State) -> a(tx_commit_index(State)). +idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -670,7 +681,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, persistent_count = PersistentCount, - target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }) -> E1 = queue:is_empty(Q1), @@ -678,13 +688,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, ED = Delta#delta.count == 0, E3 = bpqueue:is_empty(Q3), E4 = queue:is_empty(Q4), - TZ = TargetRamMsgCount == 0, LZ = Len == 0, true = E1 or not E3, true = E2 or not ED, true = ED or not E3, - true = (E1 and E2 and E4) or not TZ, true = LZ == (E3 and E4), true = Len >= 0, @@ -1107,54 +1115,67 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Phase changes %%---------------------------------------------------------------------------- -reduce_memory_use(State = #vqstate { - target_ram_msg_count = infinity }) -> - State; -reduce_memory_use(State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) - when TargetRamMsgCount >= RamMsgCount -> - limit_ram_index(State); -reduce_memory_use(State = #vqstate { - target_ram_msg_count = TargetRamMsgCount }) -> - State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), - case TargetRamMsgCount of - 0 -> push_betas_to_deltas(State1); - _ -> limit_ram_index(State1) +%% Determine whether a reduction in memory use is necessary, and call +%% functions 
to perform the required phase changes. The function can +%% also be used to just do the former, by passing in dummy phase +%% change functions. +%% +%% The function does not report on any needed beta->delta conversions, +%% though the conversion function for that is called as necessary. The +%% reason is twofold. Firstly, this is safe because the conversion is +%% only ever necessary just after a transition to a +%% target_ram_msg_count of zero or after an incremental alpha->beta +%% conversion. In the former case the conversion is performed straight +%% away (i.e. any betas present at the time are converted to deltas), +%% and in the latter case the need for a conversion is flagged up +%% anyway. Secondly, this is necessary because we do not have a +%% precise and cheap predicate for determining whether a beta->delta +%% conversion is necessary - due to the complexities of retaining up +%% to one segment's worth of messages in q3 - and thus would risk +%% perpetually reporting the need for a conversion when no such +%% conversion is needed. That in turn could cause an infinite loop. +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> + {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, + State #vqstate.target_ram_msg_count) of + 0 -> {false, State}; + S1 -> {true, AlphaBetaFun(S1, State)} + end, + case State1 #vqstate.target_ram_msg_count of + infinity -> {Reduce, State1}; + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end end. 
-limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> - Permitted = permitted_ram_index_count(State), - if Permitted =/= infinity andalso RamIndexCount > Permitted -> - Reduction = lists:min([RamIndexCount - Permitted, - ?RAM_INDEX_BATCH_SIZE]), - case Reduction < ?RAM_INDEX_BATCH_SIZE of - true -> State; - false -> #vqstate { q2 = Q2, q3 = Q3, - index_state = IndexState } = State, - {Q2a, {Reduction1, IndexState1}} = - limit_ram_index(fun bpqueue:map_fold_filter_r/4, - Q2, {Reduction, IndexState}), - %% TODO: we shouldn't be writing index - %% entries for messages that can never end up - %% in delta due them residing in the only - %% segment held by q3. - {Q3a, {Reduction2, IndexState2}} = - limit_ram_index(fun bpqueue:map_fold_filter_r/4, - Q3, {Reduction1, IndexState1}), - RamIndexCount1 = RamIndexCount - - (Reduction - Reduction2), - State #vqstate { q2 = Q2a, q3 = Q3a, - index_state = IndexState2, - ram_index_count = RamIndexCount1 } - end; - true -> - State - end. +reduce_memory_use(State) -> + {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, + fun limit_ram_index/2, + fun push_betas_to_deltas/1, + State), + State1. + +limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3, + index_state = IndexState, + ram_index_count = RamIndexCount }) -> + {Q2a, {Quota1, IndexState1}} = limit_ram_index( + fun bpqueue:map_fold_filter_r/4, + Q2, {Quota, IndexState}), + %% TODO: we shouldn't be writing index entries for messages that + %% can never end up in delta due to them residing in the only segment + %% held by q3. + {Q3a, {Quota2, IndexState2}} = limit_ram_index( + fun bpqueue:map_fold_filter_r/4, + Q3, {Quota1, IndexState1}), + State #vqstate { q2 = Q2a, q3 = Q3a, + index_state = IndexState2, + ram_index_count = RamIndexCount - (Quota - Quota2) }. 
limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) -> {Q, {0, IndexState}}; -limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> +limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) -> MapFoldFilterFun( fun erlang:'not'/1, fun (MsgStatus, {0, _IndexStateN}) -> @@ -1165,7 +1186,7 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> {MsgStatus1, IndexStateN1} = maybe_write_index_to_disk(true, MsgStatus, IndexStateN), {true, MsgStatus1, {N-1, IndexStateN1}} - end, {Reduction, IndexState}, Q). + end, {Quota, IndexState}, Q). permitted_ram_index_count(#vqstate { len = 0 }) -> infinity; @@ -1176,6 +1197,12 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). +chunk_size(Current, Permitted) + when Permitted =:= infinity orelse Permitted >= Current -> + 0; +chunk_size(Current, Permitted) -> + lists:min([Current - Permitted, ?IO_BATCH_SIZE]). + maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; maybe_deltas_to_betas(State = #vqstate { @@ -1226,7 +1253,12 @@ maybe_deltas_to_betas(State = #vqstate { end end. -maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> +push_alphas_to_betas(Quota, State) -> + { Quota1, State1} = maybe_push_q1_to_betas(Quota, State), + {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + State2. + +maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( fun queue:out/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, @@ -1237,26 +1269,28 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> Q1a, State1 = #vqstate { q2 = Q2 }) -> State1 #vqstate { q1 = Q1a, q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) } - end, Q1, State). + end, Quota, Q1, State). 
-maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> +maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( fun queue:out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), q4 = Q4a } - end, Q4, State). + end, Quota, Q4, State). -maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, +maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) - when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> - State; -maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> + when Quota =:= 0 orelse + TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> + {Quota, State}; +maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of - {empty, _Q} -> State; + {empty, _Q} -> + {Quota, State}; {{value, MsgStatus}, Qa} -> {MsgStatus1 = #msg_status { msg_on_disk = true, index_on_disk = IndexOnDisk }, @@ -1267,7 +1301,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1 }, - maybe_push_alphas_to_betas(Generator, Consumer, Qa, + maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, Consumer(MsgStatus2, Qa, State2)) end. |
