diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-23 19:01:59 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-23 19:01:59 +0100 |
| commit | 29c67607331e9c0c781c81249c174e1c045b47f8 (patch) | |
| tree | d91545bc5e6cff1609f5243c3ec2533943da0e9a /src | |
| parent | a964c1d5cb2a40515bff14e4d4aa93b9ea55c420 (diff) | |
| download | rabbitmq-server-git-29c67607331e9c0c781c81249c174e1c045b47f8.tar.gz | |
Chunk up work to do when converting α to β. Unfortunately this violates one of the invariants (Q1 or Q2 or Q4) -> ¬(TargetRamMsgCount > 0) because it might be ==0 but the work is still in progress. I prefer relaxing the invariant over requiring a "big push" once we get to 0. Some of the tests now don't pass as they assume certain values for δ - this should be fixable.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 56 |
3 files changed, 47 insertions, 34 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 432d62900b..b76ae11ebc 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -113,14 +113,15 @@ behaviour_info(callbacks) -> %% queue. {ram_duration, 1}, - %% Should 'sync' be called as soon as the queue process can - %% manage (either on an empty mailbox, or when a timer fires)? - {needs_sync, 1}, - - %% Called (eventually) after needs_sync returns 'true'. Note this - %% may be called more than once for each 'true' returned from - %% needs_sync. - {sync, 1}, + %% Should 'idle_timeout' be called as soon as the queue process + %% can manage (either on an empty mailbox, or when a timer + %% fires)? + {needs_idle_timeout, 1}, + + %% Called (eventually) after needs_idle_timeout returns + %% 'true'. Note this may be called more than once for each 'true' + %% returned from needs_idle_timeout. + {idle_timeout, 1}, %% Called immediately before the queue hibernates. {handle_pre_hibernate, 1}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index a7ca20c80b..e6bd11e33d 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -34,8 +34,8 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1, - handle_pre_hibernate/1, status/1]). + set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, + idle_timeout/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -197,9 +197,9 @@ set_ram_duration_target(_DurationTarget, State) -> State. ram_duration(State) -> {0, State}. -needs_sync(_State) -> false. +needs_idle_timeout(_State) -> false. -sync(State) -> State. +idle_timeout(State) -> State. handle_pre_hibernate(State) -> State. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 958a2903b7..d5d48e58b9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -36,7 +36,8 @@ tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, - needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]). + needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, + status/1]). -export([start/1]). @@ -224,7 +225,7 @@ %% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't %% write more - we can always come back on the next publish to do %% more. --define(RAM_INDEX_BATCH_SIZE, 64). +-define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -626,10 +627,14 @@ ram_duration(State = #vqstate { egress_rate = Egress, out_counter = 0, ram_msg_count_prev = RamMsgCount })}. -needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; -needs_sync(_) -> true. +needs_idle_timeout(State = #vqstate { on_sync = {_, _, []}, + ram_index_count = RamIndexCount }) -> + Permitted = permitted_ram_index_count(State), + Permitted =:= infinity orelse RamIndexCount =< Permitted; +needs_idle_timeout(_) -> + true. -sync(State) -> a(tx_commit_index(State)). +idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -672,13 +677,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, ED = Delta#delta.count == 0, E3 = bpqueue:is_empty(Q3), E4 = queue:is_empty(Q4), - TZ = TargetRamMsgCount == 0, LZ = Len == 0, true = E1 or not E3, true = E2 or not ED, true = ED or not E3, - true = (E1 and E2 and E4) or not TZ, true = LZ == (E3 and E4), true = Len >= 0, @@ -1110,19 +1113,22 @@ reduce_memory_use(State = #vqstate { when TargetRamMsgCount >= RamMsgCount -> limit_ram_index(State); reduce_memory_use(State = #vqstate { + ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) -> - State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), + Reduction = lists:min([RamMsgCount - TargetRamMsgCount, ?IO_BATCH_SIZE]), + {Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State), + {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1), case TargetRamMsgCount of - 0 -> push_betas_to_deltas(State1); - _ -> limit_ram_index(State1) + 0 -> push_betas_to_deltas(State2); + _ -> limit_ram_index(State2) end. limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> Permitted = permitted_ram_index_count(State), if Permitted =/= infinity andalso RamIndexCount > Permitted -> Reduction = lists:min([RamIndexCount - Permitted, - ?RAM_INDEX_BATCH_SIZE]), - case Reduction < ?RAM_INDEX_BATCH_SIZE of + ?IO_BATCH_SIZE]), + case Reduction < ?IO_BATCH_SIZE of true -> State; false -> #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState } = State, @@ -1217,7 +1223,9 @@ maybe_deltas_to_betas(State = #vqstate { end end. -maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> +maybe_push_q1_to_betas(0, State) -> + {0, State}; +maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( fun queue:out/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, @@ -1228,26 +1236,30 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> Q1a, State1 = #vqstate { q2 = Q2 }) -> State1 #vqstate { q1 = Q1a, q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) } - end, Q1, State). + end, Quota, Q1, State). -maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> +maybe_push_q4_to_betas(0, State) -> + {0, State}; +maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( fun queue:out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), q4 = Q4a } - end, Q4, State). + end, Quota, Q4, State). -maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, +maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) - when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> - State; -maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> + when Quota =:= 0 orelse TargetRamMsgCount =:= infinity orelse + TargetRamMsgCount >= RamMsgCount -> + {Quota, State}; +maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of - {empty, _Q} -> State; + {empty, _Q} -> + {Quota, State}; {{value, MsgStatus}, Qa} -> {MsgStatus1 = #msg_status { msg_on_disk = true, index_on_disk = IndexOnDisk }, @@ -1258,7 +1270,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1 }, - maybe_push_alphas_to_betas(Generator, Consumer, Qa, + maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, Consumer(MsgStatus2, Qa, State2)) end. |
