summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-08 14:46:45 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-08 14:46:45 +0100
commit144137b12fb4af9a30a56406e17fef2b7b1c17be (patch)
treeaecc0368ed271372436aec8937a588403305ee51 /src
parent845c25497ce2c31de762bb815b7218138c38f089 (diff)
downloadrabbitmq-server-git-144137b12fb4af9a30a56406e17fef2b7b1c17be.tar.gz
Tidying and refactoring of the variable queue, some documentation, and the removal of a lot of algorithmic bugs. No real new features, but code in much better state.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_variable_queue.erl435
2 files changed, 273 insertions, 174 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 27952af161..b21651a234 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -140,7 +140,7 @@
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
- {( [{'index', msg_id(), seq_id(), boolean(), boolean()}]
+ {( [{msg_id(), seq_id(), boolean(), boolean()}]
| 'not_found'), qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(segment_size/0 :: () -> non_neg_integer()).
@@ -152,7 +152,7 @@
%%----------------------------------------------------------------------------
init(Name) ->
- Dir = filename:join(rabbit_mnesia:dir(), Name),
+ Dir = filename:join(queues_dir(), Name),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
AckCounts = scatter_journal(Dir, find_ack_counts(Dir)),
{ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME),
@@ -240,9 +240,8 @@ read_segment_entries(InitSeqId, State =
{lists:foldl(fun (RelSeq, Acc) ->
{MsgId, IsDelivered, IsPersistent} =
dict:fetch(RelSeq, SDict),
- [ {index, MsgId,
- reconstruct_seq_id(SegNum, RelSeq),
- IsPersistent, IsDelivered, true} | Acc]
+ [ {MsgId, reconstruct_seq_id(SegNum, RelSeq),
+ IsPersistent, IsDelivered} | Acc]
end, [], RelSeqs),
State}.
@@ -257,6 +256,9 @@ segment_size() ->
%% Minor Helpers
%%----------------------------------------------------------------------------
+queues_dir() ->
+ filename:join(rabbit_mnesia:dir(), "queues").
+
rev_sort(List) ->
lists:sort(fun (A, B) -> B < A end, List).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f041f4783e..79a7f38bee 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -47,18 +47,64 @@
next_seq_id,
out_counter,
egress_rate,
- old_egress_rate,
avg_egress_rate,
egress_rate_timestamp,
prefetcher
}).
+-record(alpha,
+ { msg,
+ seq_id,
+ is_delivered,
+ msg_on_disk,
+ index_on_disk
+ }).
+
+-record(beta,
+ { msg_id,
+ seq_id,
+ is_persistent,
+ is_delivered,
+ index_on_disk
+ }).
+
+-record(gamma,
+ { seq_id,
+ count
+ }).
+
-include("rabbit.hrl").
+%% Basic premise is that msgs move from q1 -> q2 -> gamma -> q3 -> q4
+%% but they can only do so in the right form. q1 and q4 only hold
+%% alphas (msgs in ram), q2 and q3 only hold betas (msg on disk, index
+%% in ram), and gamma is just a count of the number of index entries
+%% on disk at that stage (msg on disk, index on disk).
+%%
+%% When a msg arrives, we decide which form it should be in. It is
+%% then added to the rightmost appropriate queue, maintaining
+%% order. Thus if the msg is to be an alpha, it will be added to q1,
+%% unless all of q1, q2, gamma and q3 are empty, in which case it will
+%% go to q4. If it is to be a beta, it will be added to q2 unless all
+%% of q2 and gamma are empty, in which case it will go to q3.
+%%
+%% The major invariant is that if the msg is to be a beta, q1 will be
+%% empty, and if it is to be a gamma then both q1 and q2 will be empty.
+%%
+%% When taking msgs out of the queue, if q4 is empty then we drain the
+%% prefetcher. If that doesn't help then we read directly from q3, or
+%% gamma, if q3 is empty. If q3 and gamma are empty then we have an
+%% invariant that q2 must be empty because q2 can only grow if gamma
+%% is non empty.
+%%
+%% A further invariant is that if the queue is non empty, either q4 or
+%% q3 contains at least one entry. I.e. we never allow gamma to
+%% contain all msgs in the queue.
+
init(QueueName) ->
{NextSeqId, IndexState} = rabbit_queue_index:init(QueueName),
#vqstate { q1 = queue:new(), q2 = queue:new(),
- gamma = {undefined, 0},
+ gamma = #gamma { seq_id = undefined, count = 0 },
q3 = queue:new(), q4 = queue:new(),
target_ram_msg_count = undefined,
ram_msg_count = 0,
@@ -67,60 +113,60 @@ init(QueueName) ->
next_seq_id = NextSeqId,
out_counter = 0,
egress_rate = 0,
- old_egress_rate = 0,
avg_egress_rate = 0,
egress_rate_timestamp = now(),
prefetcher = undefined
}.
-in(Msg, IsDelivered, State) ->
- in(test_keep_msg_in_ram(State), Msg, IsDelivered, State).
+in(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId }) ->
+ in(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
+ State #vqstate { next_seq_id = SeqId + 1 }).
-in(msg_and_index, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- IsDelivered, State = #vqstate { index_state = IndexState,
- next_seq_id = SeqId,
- ram_msg_count = RamMsgCount
- }) ->
+in(msg, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
+ ram_msg_count = RamMsgCount }) ->
MsgOnDisk = maybe_write_msg_to_disk(false, Msg),
{IndexOnDisk, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
- Entry =
- {msg_and_index, Msg, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk},
- State1 = State #vqstate { next_seq_id = SeqId + 1,
- ram_msg_count = RamMsgCount + 1,
+ Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
+ State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
index_state = IndexState1 },
store_alpha_entry(Entry, State1);
-in(just_index, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- IsDelivered, State = #vqstate { index_state = IndexState,
- next_seq_id = SeqId, q1 = Q1 }) ->
+in(index, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
+ q1 = Q1 }) ->
true = maybe_write_msg_to_disk(true, Msg),
{IndexOnDisk, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
- Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk},
- State1 = State #vqstate { next_seq_id = SeqId + 1,
- index_state = IndexState1 },
+ Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
+ is_persistent = IsPersistent, index_on_disk = IndexOnDisk },
+ State1 = State #vqstate { index_state = IndexState1 },
true = queue:is_empty(Q1), %% ASSERTION
store_beta_entry(Entry, State1);
in(neither, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- IsDelivered, State = #vqstate { index_state = IndexState,
- next_seq_id = SeqId,
- q1 = Q1, q2 = Q2,
- gamma = {GammaSeqId, GammaCount} }) ->
+ SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
+ q1 = Q1, q2 = Q2, gamma = Gamma }) ->
true = maybe_write_msg_to_disk(true, Msg),
{true, IndexState1} =
maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION
- State #vqstate { next_seq_id = SeqId + 1,
- index_state = IndexState1,
- gamma = {GammaSeqId, GammaCount + 1} }.
+ %% gamma may be empty, seq_id > next_segment_boundary from q3
+ %% head, so we need to find where the segment boundary is before
+ %% or equal to seq_id
+ GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
+ rabbit_queue_index:segment_size(),
+ Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 },
+ State #vqstate { index_state = IndexState1,
+ gamma = combine_gammas(Gamma, Gamma1) }.
set_queue_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
@@ -139,11 +185,14 @@ set_queue_ram_duration_target(
remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
egress_rate_timestamp = Timestamp,
out_counter = OutCount }) ->
+ %% We do an average over the last two values, but also hold the
+ %% current value separately so that the average always only
+ %% incorporates the last two values, and not the current value and
+ %% the last average. Averaging helps smooth out spikes.
Now = now(),
EgressRate = OutCount / timer:now_diff(Now, Timestamp),
AvgEgressRate = (EgressRate + OldEgressRate) / 2,
- State #vqstate { old_egress_rate = OldEgressRate,
- egress_rate = EgressRate,
+ State #vqstate { egress_rate = EgressRate,
avg_egress_rate = AvgEgressRate,
egress_rate_timestamp = Now,
out_counter = 0 }.
@@ -163,8 +212,9 @@ out(State =
end,
out(State #vqstate { q4 = Q4a, prefetcher = undefined });
{{value,
- {msg_and_index, Msg = #basic_message { guid = MsgId },
- SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} ->
+ #alpha { msg = Msg = #basic_message { guid = MsgId }, seq_id = SeqId,
+ is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk }}, Q4a} ->
IndexState1 =
case IndexOnDisk andalso not IsDelivered of
true ->
@@ -175,64 +225,85 @@ out(State =
AckTag = case {IndexOnDisk, MsgOnDisk} of
{true, true } -> {ack_index_and_store, MsgId, SeqId};
{false, true } -> {ack_store, MsgId};
- {false, false} -> not_on_disk
+ {false, false} -> ack_not_on_disk
end,
{{Msg, IsDelivered, AckTag},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
index_state = IndexState1 }}
end.
-out_from_q3(State = #vqstate { q2 = Q2, index_state = IndexState,
- gamma = {GammaSeqId, GammaCount}, q3 = Q3,
- q4 = Q4 }) ->
+out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState,
+ gamma = #gamma { seq_id = GammaSeqId,
+ count = GammaCount},
+ q3 = Q3, q4 = Q4 }) ->
case queue:out(Q3) of
{empty, _Q3} ->
- case GammaCount of
- 0 ->
- undefined = GammaSeqId, %% ASSERTION
- true = queue:is_empty(Q2), %% ASSERTION
- {empty, State};
- _ ->
- {List = [_|_], IndexState1} =
- rabbit_queue_index:read_segment_entries(GammaSeqId,
- IndexState),
- State1 = State #vqstate { index_state = IndexState1 },
- Q3a = queue:from_list(List),
- State2 =
- case GammaCount - length(List) of
- 0 ->
- State1 #vqstate { gamma = {undefined, 0},
- q2 = queue:new(),
- q3 = queue:join(Q3a, Q2) };
- N when N > 0 ->
- State1 #vqstate { gamma =
- {rabbit_queue_index:segment_size() +
- GammaSeqId, N},
- q3 = Q3a }
- end,
- out_from_q3(State2)
- end;
- {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}},
+ 0 = GammaCount, %% ASSERTION
+ true = queue:is_empty(Q2), %% ASSERTION
+ true = queue:is_empty(Q1), %% ASSERTION
+ {empty, State};
+ {{value,
+ #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
+ is_persistent = IsPersistent, index_on_disk = IndexOnDisk }},
Q3a} ->
{ok, Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }} =
rabbit_msg_store:read(MsgId),
- State1 = #vqstate { q1 = Q1, q4 = Q4a } =
- State #vqstate { q3 = Q3a,
- q4 = queue:in({msg_and_index, Msg, SeqId,
- IsDelivered, true, IndexOnDisk},
- Q4) },
- State2 = case queue:is_empty(Q3a) andalso 0 == GammaCount of
- true ->
- true = queue:is_empty(Q2), %% ASSERTION
- State1 #vqstate { q1 = queue:new(),
- q4 = queue:join(Q4a, Q1) };
- false ->
- State1
- end,
+ Q4a = queue:in(
+ #alpha { msg = Msg, seq_id = SeqId,
+ is_delivered = IsDelivered, msg_on_disk = true,
+ index_on_disk = IndexOnDisk }, Q4),
+ %% TODO - if it's not persistent, remove it from disk now
+ State1 = State #vqstate { q3 = Q3a, q4 = Q4a },
+ State2 =
+ case {queue:is_empty(Q3a), 0 == GammaCount} of
+ {true, true} ->
+ %% q3 is now empty, it wasn't before; gamma is
+ %% still empty. So q2 must be empty, and q1
+ %% can now be joined onto q4
+ true = queue:is_empty(Q2), %% ASSERTION
+ State1 #vqstate { q1 = queue:new(),
+ q4 = queue:join(Q4a, Q1) };
+ {true, false} ->
+ {List, IndexState1} =
+ rabbit_queue_index:read_segment_entries(GammaSeqId,
+ IndexState),
+ State3 = State1 #vqstate { index_state = IndexState1 },
+ %% length(List) may be < segment_size because
+ %% of acks. In fact, List may be []
+ Q3b = betas_from_segment_entries(List),
+ case GammaCount - length(List) of
+ 0 ->
+ %% gamma is now empty, but it wasn't
+ %% before, so can now join q2 onto q3
+ State3 #vqstate {
+ gamma = #gamma { seq_id = undefined,
+ count = 0 },
+ q2 = queue:new(), q3 = queue:join(Q3b, Q2) };
+ N when N > 0 ->
+ State3 #vqstate {
+ gamma = #gamma {
+ seq_id = GammaSeqId +
+ rabbit_queue_index:segment_size(),
+ count = N }, q3 = Q3b }
+ end;
+ {false, _} ->
+ %% q3 still isn't empty, we've not touched
+ %% gamma, so the invariants between q1, q2,
+ %% gamma and q3 are maintained
+ State1
+ end,
out(State2)
end.
+betas_from_segment_entries(List) ->
+ queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) ->
+ #beta { msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ index_on_disk = true }
+ end, List)).
+
maybe_start_prefetcher(State) ->
%% TODO
State.
@@ -243,15 +314,10 @@ reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
State;
reduce_memory_use(State =
#vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
- State1 = #vqstate { ram_msg_count = RamMsgCount } =
- maybe_push_q1_to_betas(State),
- State2 = case TargetRamMsgCount >= RamMsgCount of
- true -> State1;
- false -> maybe_push_q4_to_betas(State)
- end,
+ State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
case TargetRamMsgCount of
- 0 -> push_betas_to_gammas(State);
- _ -> State2
+ 0 -> push_betas_to_gammas(State1);
+ _ -> State1
end.
maybe_write_msg_to_disk(Bool, Msg = #basic_message {
@@ -274,17 +340,32 @@ maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered,
IndexState) ->
{false, IndexState}.
-test_keep_msg_in_ram(#vqstate { target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
- q1 = Q1 }) ->
+test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
+ ram_msg_count = RamMsgCount,
+ q1 = Q1, q3 = Q3 }) ->
case TargetRamMsgCount of
- undefined -> msg_and_index;
- 0 -> neither;
+ undefined ->
+ msg;
+ 0 ->
+ case queue:out(Q3) of
+ {empty, _Q3} ->
+ %% if TargetRamMsgCount == 0, we know we have no
+ %% alphas. If q3 is empty then gamma must be empty
+ %% too, so create a beta, which should end up in
+ %% q3
+ index;
+ {{value, #beta { seq_id = OldSeqId }}, _Q3a} ->
+ %% don't look at the current gamma as it may be empty
+ case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of
+ true -> neither;
+ false -> index
+ end
+ end;
_ when TargetRamMsgCount > RamMsgCount ->
- msg_and_index;
+ msg;
_ -> case queue:is_empty(Q1) of
- true -> just_index;
- false -> msg_and_index %% can push out elders to disk
+ true -> index;
+ false -> msg %% can push out elders to disk
end
end.
@@ -293,9 +374,10 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
content = rabbit_binary_parser:clear_decoded_content(
rabbit_binary_generator:ensure_content_encoded(Content)) }.
-store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2,
- gamma = {_GammaSeqId, GammaCount},
- q3 = Q3, q4 = Q4 }) ->
+store_alpha_entry(Entry = #alpha {}, State =
+ #vqstate { q1 = Q1, q2 = Q2,
+ gamma = #gamma { count = GammaCount },
+ q3 = Q3, q4 = Q4 }) ->
case queue:is_empty(Q1) andalso queue:is_empty(Q2) andalso
GammaCount == 0 andalso queue:is_empty(Q3) of
true ->
@@ -304,95 +386,104 @@ store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2,
maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) })
end.
-store_beta_entry(Entry, State =
- #vqstate { q2 = Q2, gamma = {_GammaSeqId, GammaCount},
+store_beta_entry(Entry = #beta {}, State =
+ #vqstate { q2 = Q2, gamma = #gamma { count = GammaCount },
q3 = Q3 }) ->
case queue:is_empty(Q2) andalso GammaCount == 0 of
true -> State #vqstate { q3 = queue:in(Entry, Q3) };
false -> State #vqstate { q2 = queue:in(Entry, Q2) }
end.
-maybe_push_q1_to_betas(State =
- #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount
- }) when TargetRamMsgCount >= RamMsgCount ->
- State;
-maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
- q1 = Q1 }) ->
- case queue:out(Q1) of
- {empty, _Q1} -> State;
- {{value, {msg_and_index, Msg = #basic_message {
- guid = MsgId, is_persistent = IsPersistent },
- SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q1a} ->
- true = case MsgOnDisk of
- true -> true;
- false -> maybe_write_msg_to_disk(true, Msg)
- end,
- maybe_push_q1_to_betas(
- store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered,
- IndexOnDisk},
- State #vqstate { ram_msg_count = RamMsgCount - 1,
- q1 = Q1a }))
- end.
+maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
+ maybe_push_alphas_to_betas(
+ fun queue:out/1,
+ fun (Beta, Q1a, State1) ->
+ %% these could legally go to q3 if gamma and q2 are empty
+ store_beta_entry(Beta, State1 #vqstate { q1 = Q1a })
+ end, Q1, State).
-maybe_push_q4_to_betas(State =
- #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount
- }) when TargetRamMsgCount >= RamMsgCount ->
+maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
+ maybe_push_alphas_to_betas(
+ fun queue:out_r/1,
+ fun (Beta, Q4a, State1 = #vqstate { q3 = Q3 }) ->
+ %% these must go to q3
+ State1 #vqstate { q3 = queue:in_r(Beta, Q3), q4 = Q4a }
+ end, Q4, State).
+
+maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
+ #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount })
+ when TargetRamMsgCount >= RamMsgCount ->
State;
-maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
- q4 = Q4, q3 = Q3 }) ->
- case queue:out_r(Q4) of
- {empty, _Q4} -> State;
- {{value, {msg_and_index, Msg = #basic_message {
- guid = MsgId, is_persistent = IsPersistent },
- SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} ->
+maybe_push_alphas_to_betas(Generator, Consumer, Q, State =
+ #vqstate { ram_msg_count = RamMsgCount }) ->
+ case Generator(Q) of
+ {empty, _Q} -> State;
+ {{value,
+ #alpha { msg = Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ seq_id = SeqId, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ Qa} ->
true = case MsgOnDisk of
true -> true;
false -> maybe_write_msg_to_disk(true, Msg)
end,
- Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered,
- IndexOnDisk}, Q3),
- maybe_push_q4_to_betas(
- State #vqstate { ram_msg_count = RamMsgCount - 1,
- q3 = Q3a, q4 = Q4a })
+ Beta = #beta { msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk },
+ State1 = State #vqstate { ram_msg_count = RamMsgCount - 1 },
+ maybe_push_alphas_to_betas(Generator, Consumer, Qa,
+ Consumer(Beta, Qa, State1))
end.
push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
index_state = IndexState }) ->
%% HighSeqId is high in the sense that it must be higher than the
- %% seqid in Gamma, but it's also the lowest of the betas that we
+ %% seq_id in Gamma, but it's also the lowest of the betas that we
%% transfer from q2 to gamma.
{HighSeqId, Len1, Q2a, IndexState1} =
push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState),
- Gamma1 = {Gamma1SeqId, _} = combine_gammas(Gamma, {HighSeqId, Len1}),
- State1 = State #vqstate { q2 = Q2a,
- gamma = Gamma1,
+ Gamma1 = #gamma { seq_id = Gamma1SeqId } =
+ combine_gammas(Gamma, #gamma { seq_id = HighSeqId, count = Len1 }),
+ State1 = State #vqstate { q2 = Q2a, gamma = Gamma1,
index_state = IndexState1 },
case queue:out(Q3) of
{empty, _Q3} -> State1;
- {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered,
- _IndexOnDisk}}, _Q3a} ->
+ {{value, #beta { seq_id = SeqId }}, _Q3a} ->
Limit = rabbit_queue_index:next_segment_boundary(SeqId),
- case Limit == Gamma1SeqId of
- true -> %% already only holding the minimum, nothing to do
+ case Gamma1SeqId of
+ Limit -> %% already only holding the minimum, nothing to do
State1;
- false ->
- %% ASSERTION
- true = Gamma1SeqId == undefined orelse
- Gamma1SeqId == Limit + rabbit_queue_index:segment_size(),
+ _ when Gamma1SeqId == undefined orelse Gamma1SeqId > Limit ->
+ %% ASSERTION (sadly large!)
+ %% This says that if Gamma1SeqId != undefined then
+ %% the gap from Limit to Gamma1SeqId is an integer
+ %% multiple of segment_size
+ SegmentCount =
+ case Gamma1SeqId of
+ undefined -> undefined;
+ _ -> (Gamma1SeqId - Limit) /
+ rabbit_queue_index:segment_size()
+ end,
+ true = (is_integer(SegmentCount) andalso SegmentCount > 0)
+ orelse Gamma1SeqId == undefined,
%% LowSeqId is low in the sense that it must be
- %% lower than the seqid in Gamma1, in fact either
- %% gamma1 has undefined as its seqid or its seqid
- %% is LowSeqId + 1. But because we use
- %% queue:out_r, LowSeqId is actually also the
- %% highest seqid of the betas we transfer from q3
- %% to gammas.
+ %% lower than the seq_id in gamma1, in fact either
+ %% gamma1 has undefined as its seq_id or there
+ %% does not exist a seq_id X s.t. X > LowSeqId and
+ %% X < gamma1's seq_id (would be +1 if it wasn't
+ %% for the possibility of gaps in the seq_ids).
+ %% But because we use queue:out_r, LowSeqId is
+ %% actually also the highest seqid of the betas we
+ %% transfer from q3 to gammas.
{LowSeqId, Len2, Q3b, IndexState2} =
- push_betas_to_gammas(fun queue:out_r/1, Limit - 1, Q3,
+ push_betas_to_gammas(fun queue:out_r/1, Limit, Q3,
IndexState1),
- Gamma1SeqId = LowSeqId + 1, %% ASSERTION
- Gamma2 = combine_gammas({Limit, Len2}, Gamma1),
+ true = Gamma1SeqId > LowSeqId, %% ASSERTION
+ Gamma2 = combine_gammas(
+ #gamma { seq_id = Limit, count = Len2}, Gamma1),
State1 #vqstate { q3 = Q3b, gamma = Gamma2,
index_state = IndexState2 }
end
@@ -401,8 +492,7 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
push_betas_to_gammas(Generator, Limit, Q, IndexState) ->
case Generator(Q) of
{empty, Qa} -> {undefined, 0, Qa, IndexState};
- {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered,
- _IndexOnDisk}}, _Qa} ->
+ {{value, #beta { seq_id = SeqId }}, _Qa} ->
{Count, Qb, IndexState1} =
push_betas_to_gammas(Generator, Limit, Q, 0, IndexState),
{SeqId, Count, Qb, IndexState1}
@@ -411,11 +501,13 @@ push_betas_to_gammas(Generator, Limit, Q, IndexState) ->
push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
case Generator(Q) of
{empty, Qa} -> {Count, Qa, IndexState};
- {{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered,
- _IndexOnDisk}}, _Qa} ->
+ {{value, #beta { seq_id = SeqId }}, _Qa}
+ when Limit /= undefined andalso SeqId < Limit ->
{Count, Q, IndexState};
- {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered,
- IndexOnDisk}}, Qa} ->
+ {{value, #beta { msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk}}, Qa} ->
IndexState1 =
case IndexOnDisk of
true -> IndexState;
@@ -428,10 +520,15 @@ push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
end,
push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1)
end.
-
-combine_gammas({_, 0}, {_, 0}) -> {undefined, 0};
-combine_gammas({_, 0}, B ) -> B;
-combine_gammas(A , {_, 0}) -> A;
-combine_gammas({SeqIdLow, CountLow}, {SeqIdHigh, CountHigh}) ->
- SeqIdHigh = SeqIdLow + CountLow, %% ASSERTION
- {SeqIdLow, CountLow + CountHigh}.
+
+%% the first arg is the older gamma
+combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) -> {undefined, 0};
+combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B;
+combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A;
+combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow },
+ #gamma { seq_id = SeqIdHigh, count = CountHigh}) ->
+ true = SeqIdLow + CountLow =< SeqIdHigh, %% ASSERTION
+ %% note the above assertion does not say ==. This is because acks
+ %% may mean that the counts are not straight multiples of
+ %% segment_size.
+ #gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}.