summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-07 16:27:23 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-07 16:27:23 +0100
commit686c68c85696a2916f65de4127956a806ce4318f (patch)
tree1e9c5c37bb8db1928c0689769cc75bafe2f28ce5 /src
parentb1c6afd12806ac531cec46c9007db49e29ee9016 (diff)
downloadrabbitmq-server-git-686c68c85696a2916f65de4127956a806ce4318f.tar.gz
More work on variable queue
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl6
-rw-r--r--src/rabbit_variable_queue.erl211
2 files changed, 174 insertions, 43 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e4111f82fe..c59b12dd29 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index).
-export([init/1, write_published/4, write_delivered/2, write_acks/2,
- flush_journal/1, read_segment_entries/2]).
+ flush_journal/1, read_segment_entries/2, next_segment_boundary/1]).
%%----------------------------------------------------------------------------
%% The queue disk index
@@ -242,6 +242,10 @@ read_segment_entries(InitSeqId, State =
end, [], RelSeqs),
State}.
+next_segment_boundary(SeqId) ->
+ {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ reconstruct_seq_id(SegNum + 1, 0).
+
%%----------------------------------------------------------------------------
%% Minor Helpers
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 784cda3967..73c3c3395b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/1, in/3]).
+-export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1]).
-record(vqstate,
{ q1,
@@ -39,12 +39,16 @@
gamma,
q3,
q4,
- egress_rate,
target_ram_msg_count,
ram_msg_count,
queue,
index_state,
- next_seq_id
+ next_seq_id,
+ out_counter,
+ egress_rate,
+ old_egress_rate,
+ avg_egress_rate,
+ egress_rate_timestamp
}).
-include("rabbit.hrl").
@@ -54,35 +58,19 @@ init(QueueName) ->
#vqstate { q1 = queue:new(), q2 = queue:new(),
gamma = 0,
q3 = queue:new(), q4 = queue:new(),
- egress_rate = 0,
target_ram_msg_count = undefined,
ram_msg_count = 0,
queue = QueueName,
index_state = IndexState,
- next_seq_id = NextSeqId
+ next_seq_id = NextSeqId,
+ out_counter = 0,
+ egress_rate = 0,
+ old_egress_rate = 0,
+ avg_egress_rate = 0,
+ egress_rate_timestamp = now()
}.
-maybe_write_msg_to_disk(Bool, Msg = #basic_message {
- guid = MsgId, is_persistent = IsPersistent })
- when Bool orelse IsPersistent ->
- ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)),
- true;
-maybe_write_msg_to_disk(_Bool, _Msg) ->
- false.
-
-maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered,
- IndexState) when Bool orelse IsPersistent ->
- IndexState1 = rabbit_queue_index:write_published(
- MsgId, SeqId, IsPersistent, IndexState),
- {true, case IsDelivered of
- true -> rabbit_queue_index:write_delivered(SeqId, IndexState1);
- false -> IndexState1
- end};
-maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered,
- IndexState) ->
- {false, IndexState}.
-
-in(Msg = #basic_message {}, IsDelivered, State) ->
+in(Msg, IsDelivered, State) ->
in(test_keep_msg_in_ram(State), Msg, IsDelivered, State).
in(msg_and_index, Msg = #basic_message { guid = MsgId,
@@ -110,7 +98,7 @@ in(just_index, Msg = #basic_message { guid = MsgId,
{IndexOnDisk, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
- Entry = {index, MsgId, SeqId, IsDelivered, true, IndexOnDisk},
+ Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, true, IndexOnDisk},
State1 = State #vqstate { next_seq_id = SeqId + 1,
index_state = IndexState1 },
true = queue:is_empty(Q1), %% ASSERTION
@@ -130,6 +118,73 @@ in(neither, Msg = #basic_message { guid = MsgId,
index_state = IndexState1,
gamma = Gamma + 1 }.
+set_queue_ram_duration_target(
+ DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
+ target_ram_msg_count = TargetRamMsgCount
+ }) ->
+ TargetRamMsgCount1 = trunc(DurationTarget * EgressRate), %% msgs = sec * msgs/sec
+ State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 },
+ if TargetRamMsgCount == TargetRamMsgCount1 ->
+ State1;
+ TargetRamMsgCount < TargetRamMsgCount1 ->
+ maybe_start_prefetcher(State1);
+ true ->
+ reduce_memory_use(State1)
+ end.
+
+remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
+ egress_rate_timestamp = Timestamp,
+ out_counter = OutCount }) ->
+ Now = now(),
+ EgressRate = OutCount / timer:now_diff(Now, Timestamp),
+ AvgEgressRate = (EgressRate + OldEgressRate) / 2,
+ State #vqstate { old_egress_rate = OldEgressRate,
+ egress_rate = EgressRate,
+ avg_egress_rate = AvgEgressRate,
+ egress_rate_timestamp = Now,
+ out_counter = 0 }.
+
+maybe_start_prefetcher(State) ->
+ %% TODO
+ State.
+
+reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount })
+ when TargetRamMsgCount >= RamMsgCount ->
+ State;
+reduce_memory_use(State =
+ #vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
+ State1 = #vqstate { ram_msg_count = RamMsgCount } =
+ maybe_push_q1_to_betas(State),
+ State2 = case TargetRamMsgCount >= RamMsgCount of
+ true -> State1;
+ false -> maybe_push_q4_to_betas(State)
+ end,
+ case TargetRamMsgCount of
+ 0 -> push_betas_to_gammas(State);
+ _ -> State2
+ end.
+
+maybe_write_msg_to_disk(Bool, Msg = #basic_message {
+ guid = MsgId, is_persistent = IsPersistent })
+ when Bool orelse IsPersistent ->
+ ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)),
+ true;
+maybe_write_msg_to_disk(_Bool, _Msg) ->
+ false.
+
+maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered,
+ IndexState) when Bool orelse IsPersistent ->
+ IndexState1 = rabbit_queue_index:write_published(
+ MsgId, SeqId, IsPersistent, IndexState),
+ {true, case IsDelivered of
+ true -> rabbit_queue_index:write_delivered(SeqId, IndexState1);
+ false -> IndexState1
+ end};
+maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered,
+ IndexState) ->
+ {false, IndexState}.
+
test_keep_msg_in_ram(#vqstate { target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
q1 = Q1 }) ->
@@ -156,7 +211,7 @@ store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, gamma = Gamma,
true ->
State #vqstate { q4 = queue:in(Entry, Q4) };
false ->
- maybe_push_q1_out(State #vqstate { q1 = queue:in(Entry, Q1) })
+ maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) })
end.
store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) ->
@@ -165,18 +220,90 @@ store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) ->
false -> State #vqstate { q2 = queue:in(Entry, Q2) }
end.
-maybe_push_q1_out(State = #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount
- }) when TargetRamMsgCount > RamMsgCount ->
+maybe_push_q1_to_betas(State =
+ #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount
+ }) when TargetRamMsgCount >= RamMsgCount ->
+ State;
+maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
+ q1 = Q1 }) ->
+ case queue:out(Q1) of
+ {empty, _Q1} -> State;
+ {{value, {msg_and_index, Msg = #basic_message {
+ guid = MsgId, is_persistent = IsPersistent },
+ SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q1a} ->
+ true = case MsgOnDisk of
+ true -> true;
+ false -> maybe_write_msg_to_disk(true, Msg)
+ end,
+ maybe_push_q1_to_betas(
+ store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered,
+ true, IndexOnDisk},
+ State #vqstate { ram_msg_count = RamMsgCount - 1,
+ q1 = Q1a }))
+ end.
+
+maybe_push_q4_to_betas(State =
+ #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount
+ }) when TargetRamMsgCount >= RamMsgCount ->
State;
-maybe_push_q1_out(State = #vqstate { ram_msg_count = RamMsgCount, q1 = Q1 }) ->
- {{value, {msg_and_index, Msg = #basic_message { guid = MsgId }, SeqId,
- IsDelivered, MsgOnDisk, IndexOnDisk}}, Q1a} = queue:out(Q1),
- true = case MsgOnDisk of
- true -> true;
- false -> maybe_write_msg_to_disk(true, Msg)
- end,
- maybe_push_q1_out(
- store_beta_entry({index, MsgId, SeqId, IsDelivered, true, IndexOnDisk},
- State #vqstate { ram_msg_count = RamMsgCount - 1,
- q1 = Q1a })).
+maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
+ q4 = Q4, q3 = Q3 }) ->
+ case queue:out_r(Q4) of
+ {empty, _Q4} -> State;
+ {{value, {msg_and_index, Msg = #basic_message {
+ guid = MsgId, is_persistent = IsPersistent },
+ SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} ->
+ true = case MsgOnDisk of
+ true -> true;
+ false -> maybe_write_msg_to_disk(true, Msg)
+ end,
+ Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered,
+ true, IndexOnDisk}, Q3),
+ maybe_push_q4_to_betas(
+ State #vqstate { ram_msg_count = RamMsgCount - 1,
+ q3 = Q3a, q4 = Q4a })
+ end.
+
+push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
+ index_state = IndexState }) ->
+ {Len1, Q2a, IndexState1} =
+ push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState),
+ State1 = State #vqstate { q2 = Q2a, gamma = Gamma + Len1,
+ index_state = IndexState1 },
+ case queue:out(Q3) of
+ {empty, _Q3} -> State1;
+ {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered,
+ true, _IndexOnDisk}}, _Q3a} ->
+ Limit = rabbit_queue_index:next_segment_boundary(SeqId) - 1,
+ {Len2, Q3b, IndexState2} =
+ push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1),
+ State1 #vqstate { q3 = Q3b, gamma = Gamma + Len1 + Len2,
+ index_state = IndexState2 }
+ end.
+
+push_betas_to_gammas(Generator, Limit, Q, IndexState) ->
+ push_betas_to_gammas(Generator, Limit, Q, 0, IndexState).
+
+push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
+ case Generator(Q) of
+ {empty, Qa} -> {Count, Qa, IndexState};
+ {{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered,
+ _MsgOnDisk, _IndexOnDisk}}, _Qa} ->
+ {Count, Q, IndexState};
+ {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered,
+ true, IndexOnDisk}}, Qa} ->
+ IndexState1 =
+ case IndexOnDisk of
+ true -> IndexState;
+ false ->
+ {true, IndexState2} =
+ maybe_write_index_to_disk(
+ true, IsPersistent, MsgId,
+ SeqId, IsDelivered, IndexState),
+ IndexState2
+ end,
+ push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1)
+ end.
+