summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-23 19:01:59 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-23 19:01:59 +0100
commit29c67607331e9c0c781c81249c174e1c045b47f8 (patch)
treed91545bc5e6cff1609f5243c3ec2533943da0e9a /src
parenta964c1d5cb2a40515bff14e4d4aa93b9ea55c420 (diff)
downloadrabbitmq-server-git-29c67607331e9c0c781c81249c174e1c045b47f8.tar.gz
Chunk up work to do when converting α to β. Unfortunately this violates one of the invariants (Q1 or Q2 or Q4) -> ¬(TargetRamMsgCount > 0) because it might be ==0 but the work is still in progress. I prefer relaxing the invariant over requiring a "big push" once we get to 0. Some of the tests now don't pass as they assume certain values for δ - this should be fixable.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue.erl17
-rw-r--r--src/rabbit_invariable_queue.erl8
-rw-r--r--src/rabbit_variable_queue.erl56
3 files changed, 47 insertions, 34 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 432d62900b..b76ae11ebc 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -113,14 +113,15 @@ behaviour_info(callbacks) ->
%% queue.
{ram_duration, 1},
- %% Should 'sync' be called as soon as the queue process can
- %% manage (either on an empty mailbox, or when a timer fires)?
- {needs_sync, 1},
-
- %% Called (eventually) after needs_sync returns 'true'. Note this
- %% may be called more than once for each 'true' returned from
- %% needs_sync.
- {sync, 1},
+ %% Should 'idle_timeout' be called as soon as the queue process
+ %% can manage (either on an empty mailbox, or when a timer
+ %% fires)?
+ {needs_idle_timeout, 1},
+
+ %% Called (eventually) after needs_idle_timeout returns
+ %% 'true'. Note this may be called more than once for each 'true'
+ %% returned from needs_idle_timeout.
+ {idle_timeout, 1},
%% Called immediately before the queue hibernates.
{handle_pre_hibernate, 1},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index a7ca20c80b..e6bd11e33d 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -34,8 +34,8 @@
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1,
- handle_pre_hibernate/1, status/1]).
+ set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
+ idle_timeout/1, handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -197,9 +197,9 @@ set_ram_duration_target(_DurationTarget, State) -> State.
ram_duration(State) -> {0, State}.
-needs_sync(_State) -> false.
+needs_idle_timeout(_State) -> false.
-sync(State) -> State.
+idle_timeout(State) -> State.
handle_pre_hibernate(State) -> State.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 958a2903b7..d5d48e58b9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -36,7 +36,8 @@
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
- needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]).
+ needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ status/1]).
-export([start/1]).
@@ -224,7 +225,7 @@
%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
%% write more - we can always come back on the next publish to do
%% more.
--define(RAM_INDEX_BATCH_SIZE, 64).
+-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -626,10 +627,14 @@ ram_duration(State = #vqstate { egress_rate = Egress,
out_counter = 0,
ram_msg_count_prev = RamMsgCount })}.
-needs_sync(#vqstate { on_sync = {_, _, []} }) -> false;
-needs_sync(_) -> true.
+needs_idle_timeout(State = #vqstate { on_sync = {_, _, []},
+ ram_index_count = RamIndexCount }) ->
+ Permitted = permitted_ram_index_count(State),
+ Permitted =:= infinity orelse RamIndexCount =< Permitted;
+needs_idle_timeout(_) ->
+ true.
-sync(State) -> a(tx_commit_index(State)).
+idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -672,13 +677,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
ED = Delta#delta.count == 0,
E3 = bpqueue:is_empty(Q3),
E4 = queue:is_empty(Q4),
- TZ = TargetRamMsgCount == 0,
LZ = Len == 0,
true = E1 or not E3,
true = E2 or not ED,
true = ED or not E3,
- true = (E1 and E2 and E4) or not TZ,
true = LZ == (E3 and E4),
true = Len >= 0,
@@ -1110,19 +1113,22 @@ reduce_memory_use(State = #vqstate {
when TargetRamMsgCount >= RamMsgCount ->
limit_ram_index(State);
reduce_memory_use(State = #vqstate {
+ ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount }) ->
- State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
+ Reduction = lists:min([RamMsgCount - TargetRamMsgCount, ?IO_BATCH_SIZE]),
+ {Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State),
+ {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1),
case TargetRamMsgCount of
- 0 -> push_betas_to_deltas(State1);
- _ -> limit_ram_index(State1)
+ 0 -> push_betas_to_deltas(State2);
+ _ -> limit_ram_index(State2)
end.
limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
Permitted = permitted_ram_index_count(State),
if Permitted =/= infinity andalso RamIndexCount > Permitted ->
Reduction = lists:min([RamIndexCount - Permitted,
- ?RAM_INDEX_BATCH_SIZE]),
- case Reduction < ?RAM_INDEX_BATCH_SIZE of
+ ?IO_BATCH_SIZE]),
+ case Reduction < ?IO_BATCH_SIZE of
true -> State;
false -> #vqstate { q2 = Q2, q3 = Q3,
index_state = IndexState } = State,
@@ -1217,7 +1223,9 @@ maybe_deltas_to_betas(State = #vqstate {
end
end.
-maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
+maybe_push_q1_to_betas(0, State) ->
+ {0, State};
+maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
fun queue:out/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
@@ -1228,26 +1236,30 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
Q1a, State1 = #vqstate { q2 = Q2 }) ->
State1 #vqstate { q1 = Q1a,
q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) }
- end, Q1, State).
+ end, Quota, Q1, State).
-maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
+maybe_push_q4_to_betas(0, State) ->
+ {0, State};
+maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
fun queue:out_r/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q4a, State1 = #vqstate { q3 = Q3 }) ->
State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
q4 = Q4a }
- end, Q4, State).
+ end, Quota, Q4, State).
-maybe_push_alphas_to_betas(_Generator, _Consumer, _Q,
+maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
- State;
-maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
+ when Quota =:= 0 orelse TargetRamMsgCount =:= infinity orelse
+ TargetRamMsgCount >= RamMsgCount ->
+ {Quota, State};
+maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of
- {empty, _Q} -> State;
+ {empty, _Q} ->
+ {Quota, State};
{{value, MsgStatus}, Qa} ->
{MsgStatus1 = #msg_status { msg_on_disk = true,
index_on_disk = IndexOnDisk },
@@ -1258,7 +1270,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1 },
- maybe_push_alphas_to_betas(Generator, Consumer, Qa,
+ maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))
end.