summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-08 17:50:47 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-08 17:50:47 +0100
commit4c8ef89985c59c679439477bb5feb799d0740816 (patch)
tree1a01132b759dcdc2b53b9623c27d68b4be243393 /src
parent0c9a7ab8868bfe47682c4ef05eb283b23ef852e5 (diff)
downloadrabbitmq-server-git-4c8ef89985c59c679439477bb5feb799d0740816.tar.gz
And now actually make sure that on start up we load in the first segment from gamma to q3 if possible. Hence a refactoring here.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl84
1 files changed, 45 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1491879bc9..184050f783 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -103,26 +103,28 @@
%% is non empty then q3 must be non empty.
init(QueueName) ->
- {LowSeqId, NextSeqId, Count, IndexState} =
+ {GammaSeqId, NextSeqId, GammaCount, IndexState} =
rabbit_queue_index:init(QueueName),
- Gamma = case Count of
+ Gamma = case GammaCount of
0 -> #gamma { seq_id = undefined, count = 0 };
- _ -> #gamma { seq_id = LowSeqId, count = Count }
+ _ -> #gamma { seq_id = GammaSeqId, count = GammaCount }
end,
- #vqstate { q1 = queue:new(), q2 = queue:new(),
- gamma = Gamma,
- q3 = queue:new(), q4 = queue:new(),
- target_ram_msg_count = undefined,
- ram_msg_count = 0,
- queue = QueueName,
- index_state = IndexState,
- next_seq_id = NextSeqId,
- out_counter = 0,
- egress_rate = 0,
- avg_egress_rate = 0,
- egress_rate_timestamp = now(),
- prefetcher = undefined
- }.
+ State =
+ #vqstate { q1 = queue:new(), q2 = queue:new(),
+ gamma = Gamma,
+ q3 = queue:new(), q4 = queue:new(),
+ target_ram_msg_count = undefined,
+ ram_msg_count = 0,
+ queue = QueueName,
+ index_state = IndexState,
+ next_seq_id = NextSeqId,
+ out_counter = 0,
+ egress_rate = 0,
+ avg_egress_rate = 0,
+ egress_rate_timestamp = now(),
+ prefetcher = undefined
+ },
+ maybe_load_next_segment(State).
in(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId }) ->
in(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
@@ -238,9 +240,8 @@ out(State =
index_state = IndexState1 }}
end.
-out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState,
- gamma = #gamma { seq_id = GammaSeqId,
- count = GammaCount},
+out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2,
+ gamma = #gamma { count = GammaCount },
q3 = Q3, q4 = Q4 }) ->
case queue:out(Q3) of
{empty, _Q3} ->
@@ -271,25 +272,7 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState,
State1 #vqstate { q1 = queue:new(),
q4 = queue:join(Q4a, Q1) };
{true, false} ->
- {List, IndexState1, Gamma1SeqId} =
- read_index_segment(GammaSeqId, IndexState),
- State3 = State1 #vqstate { index_state = IndexState1 },
- %% length(List) may be < segment_size because
- %% of acks. But it can't be []
- Q3b = betas_from_segment_entries(List),
- case GammaCount - length(List) of
- 0 ->
- %% gamma is now empty, but it wasn't
- %% before, so can now join q2 onto q3
- State3 #vqstate {
- gamma = #gamma { seq_id = undefined,
- count = 0 },
- q2 = queue:new(), q3 = queue:join(Q3b, Q2) };
- N when N > 0 ->
- State3 #vqstate {
- gamma = #gamma { seq_id = Gamma1SeqId,
- count = N }, q3 = Q3b }
- end;
+ maybe_load_next_segment(State1);
{false, _} ->
%% q3 still isn't empty, we've not touched
%% gamma, so the invariants between q1, q2,
@@ -299,6 +282,29 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState,
out(State2)
end.
+maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) ->
+ State;
+maybe_load_next_segment(State =
+ #vqstate { index_state = IndexState, q2 = Q2,
+ gamma = #gamma { seq_id = GammaSeqId,
+ count = GammaCount }}) ->
+ {List, IndexState1, Gamma1SeqId} =
+ read_index_segment(GammaSeqId, IndexState),
+ State1 = State #vqstate { index_state = IndexState1 },
+ %% length(List) may be < segment_size because of acks. But it
+ %% can't be []
+ Q3a = betas_from_segment_entries(List),
+ case GammaCount - length(List) of
+ 0 ->
+ %% gamma is now empty, but it wasn't before, so can now
+ %% join q2 onto q3
+ State1 #vqstate { gamma = #gamma { seq_id = undefined, count = 0 },
+ q2 = queue:new(), q3 = queue:join(Q3a, Q2) };
+ N when N > 0 ->
+ State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId,
+ count = N }, q3 = Q3a }
+ end.
+
betas_from_segment_entries(List) ->
queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) ->
#beta { msg_id = MsgId, seq_id = SeqId,