summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-13 16:56:18 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-13 16:56:18 +0000
commitb7c1a4b15f129bf920adfc59881ca9498034499f (patch)
treef7b1d40df41278680402be1da21d7620a85b93e1 /src
parent1e4ecb8f788103cb2d52cbf551e16a3fe0d829f2 (diff)
downloadrabbitmq-server-git-b7c1a4b15f129bf920adfc59881ca9498034499f.tar.gz
Refactoring of vq - pulled out the inlined block-prefix queue code and generally tidied profusely. Also efficiency fix in remove_queue_entries by avoiding an intermediate list (which could potentially be massive).
Diffstat (limited to 'src')
-rw-r--r--src/bpqueue.erl185
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_variable_queue.erl330
3 files changed, 324 insertions, 200 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
new file mode 100644
index 0000000000..5e7471f71d
--- /dev/null
+++ b/src/bpqueue.erl
@@ -0,0 +1,185 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(bpqueue).
+
+%% Block-prefixed queue. This implements a queue of queues, but
+%% supporting the normal queue interface. Each block has a prefix and
+%% it is guaranteed that no two consecutive blocks have the same
+%% prefix. len/1 returns the flattened length of the queue and is O(1)
+
+-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2,
+ fold/3, from_list/1, to_list/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(bpqueue() :: {non_neg_integer(), queue()}).
+-type(prefix() :: any()).
+-type(value() :: any()).
+-type(result() :: {'empty', bpqueue()} |
+ {{'value', prefix(), value()}, bpqueue()}).
+
+-spec(new/0 :: () -> bpqueue()).
+-spec(is_empty/1 :: (bpqueue()) -> boolean()).
+-spec(len/1 :: (bpqueue()) -> non_neg_integer()).
+-spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
+-spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()).
+-spec(out/1 :: (bpqueue()) -> result()).
+-spec(out_r/1 :: (bpqueue()) -> result()).
+-spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()).
+-spec(fold/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
+-spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()).
+-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+new() ->
+ {0, queue:new()}.
+
+is_empty({0, _Q}) ->
+ true;
+is_empty(_BPQ) ->
+ false.
+
+len({N, _Q}) ->
+ N.
+
+in(Prefix, Value, {0, Q}) ->
+ {1, queue:in({Prefix, queue:in(Value, Q)}, Q)};
+in(Prefix, Value, {N, Q}) ->
+ {N+1,
+ case queue:out_r(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in({Prefix, queue:in(Value, InnerQ)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in({Prefix, queue:in(Value, queue:new())}, Q)
+ end}.
+
+in_r(Prefix, Value, {0, Q}) ->
+ {1, queue:in({Prefix, queue:in(Value, Q)}, Q)};
+in_r(Prefix, Value, {N, Q}) ->
+ {N+1,
+ case queue:out(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in_r({Prefix, queue:in_r(Value, InnerQ)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in_r({Prefix, queue:in(Value, queue:new())}, Q)
+ end}.
+
+out({0, _Q} = BPQ) ->
+ {empty, BPQ};
+out({N, Q}) ->
+ {{value, {Prefix, InnerQ}}, Q1} = queue:out(Q),
+ {{value, Value}, InnerQ1} = queue:out(InnerQ),
+ Q2 = case queue:is_empty(InnerQ1) of
+ true -> Q1;
+ false -> queue:in_r({Prefix, InnerQ1}, Q1)
+ end,
+ {{value, Prefix, Value}, {N-1, Q2}}.
+
+out_r({0, _Q} = BPQ) ->
+ {empty, BPQ};
+out_r({N, Q}) ->
+ {{value, {Prefix, InnerQ}}, Q1} = queue:out_r(Q),
+ {{value, Value}, InnerQ1} = queue:out_r(InnerQ),
+ Q2 = case queue:is_empty(InnerQ1) of
+ true -> Q1;
+ false -> queue:in({Prefix, InnerQ1}, Q1)
+ end,
+ {{value, Prefix, Value}, {N-1, Q2}}.
+
+join({0, _Q}, BPQ) ->
+ BPQ;
+join(BPQ, {0, _Q}) ->
+ BPQ;
+join({NHead, QHead}, {NTail, QTail}) ->
+ {{value, {Prefix, InnerQHead}}, QHead1} = queue:out_r(QHead),
+ {NHead + NTail,
+ case queue:out(QTail) of
+ {{value, {Prefix, InnerQTail}}, QTail1} ->
+ queue:join(
+ queue:in({Prefix, queue:join(InnerQHead, InnerQTail)}, QHead1),
+ QTail1);
+ {{value, {_Prefix, _InnerQTail}}, _QTail1} ->
+ queue:join(QHead, QTail)
+ end}.
+
+fold(_Fun, Init, {0, _Q}) ->
+ Init;
+fold(Fun, Init, {_N, Q}) ->
+ fold1(Fun, Init, Q).
+
+fold1(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} ->
+ Init;
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ fold1(Fun, fold1(Fun, Prefix, Init, InnerQ), Q1)
+ end.
+
+fold1(Fun, Prefix, Init, InnerQ) ->
+ case queue:out(InnerQ) of
+ {empty, _Q} ->
+ Init;
+ {{value, Value}, InnerQ1} ->
+ fold1(Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1)
+ end.
+
+from_list(List) ->
+ {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} =
+ lists:foldl(
+ fun ({_Prefix, []}, Acc) ->
+ Acc;
+ ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
+ {Prefix, queue:join(InnerQ, queue:from_list(InnerList)),
+ ListOfPQs, LenAcc + length(InnerList)};
+ ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) ->
+ {Prefix1, queue:from_list(InnerList),
+ [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)}
+ end, {undefined, queue:new(), [], 0}, List),
+ ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1],
+ [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2),
+ {Len, queue:from_list(case queue:is_empty(InnerQ1) of
+ true -> Rest;
+ false -> All
+ end)}.
+
+to_list({0, _Q}) ->
+ [];
+to_list({_N, Q}) ->
+ lists:map(fun to_list1/1, queue:to_list(Q)).
+
+to_list1({Prefix, InnerQ}) ->
+ {Prefix, queue:to_list(InnerQ)}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 23666a5f3d..2b5fe4c746 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -55,7 +55,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2, ceil/1]).
+-export([unfold/2, ceil/1, queue_fold/3]).
-import(mnesia).
-import(lists).
@@ -126,6 +126,7 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
+-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
-endif.
@@ -489,3 +490,9 @@ ceil(N) ->
true -> T;
false -> 1 + T
end.
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6c7fad1212..8205f79f66 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -114,15 +114,16 @@
-ifdef(use_specs).
+-type(bpqueue() :: any()).
-type(msg_id() :: binary()).
-type(seq_id() :: non_neg_integer()).
-type(ack() :: {'ack_index_and_store', msg_id(), seq_id()}
| 'ack_not_on_disk').
-type(vqstate() :: #vqstate {
q1 :: queue(),
- q2 :: {non_neg_integer(), queue()},
+ q2 :: bpqueue(),
delta :: delta(),
- q3 :: {non_neg_integer(), queue()},
+ q3 :: bpqueue(),
q4 :: queue(),
duration_target :: non_neg_integer(),
target_ram_msg_count :: non_neg_integer(),
@@ -196,9 +197,9 @@ init(QueueName) ->
end,
Now = now(),
State =
- #vqstate { q1 = queue:new(), q2 = {0, queue:new()},
+ #vqstate { q1 = queue:new(), q2 = bpqueue:new(),
delta = Delta,
- q3 = {0, queue:new()}, q4 = queue:new(),
+ q3 = bpqueue:new(), q4 = queue:new(),
duration_target = undefined,
target_ram_msg_count = undefined,
ram_msg_count = 0,
@@ -371,7 +372,8 @@ is_empty(State) ->
0 == len(State).
purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
- {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState),
+ {Q4Count, IndexState1} =
+ remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState),
{Len, State1} =
purge1(Q4Count, State #vqstate { index_state = IndexState1,
q4 = queue:new() }),
@@ -495,8 +497,7 @@ flush_journal(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =
rabbit_queue_index:flush_journal(IndexState) }.
-status(#vqstate { q1 = Q1, q2 = {Q2Len, _Q2},
- delta = Delta, q3 = {Q3Len, _Q3}, q4 = Q4,
+status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
@@ -505,9 +506,9 @@ status(#vqstate { q1 = Q1, q2 = {Q2Len, _Q2},
avg_ingress_rate = AvgIngressRate,
next_seq_id = NextSeqId }) ->
[ {q1, queue:len(Q1)},
- {q2, Q2Len},
+ {q2, bpqueue:len(Q2)},
{delta, Delta},
- {q3, Q3Len},
+ {q3, bpqueue:len(Q3)},
{q4, queue:len(Q4)},
{len, Len},
{outstanding_txns, length(From)},
@@ -532,44 +533,17 @@ persistent_msg_ids(Pubs) ->
Obj #basic_message.is_persistent].
betas_from_segment_entries(List, SeqIdLimit) ->
- List1 = [#msg_status { msg = undefined,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true
- }
- || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
- SeqId < SeqIdLimit ],
- {length(List1), queue:from_list([{true, queue:from_list(List1)}])}.
-
-join_betas({HeadLen, Head}, {TailLen, Tail}) ->
- {HeadLen + TailLen, join_betas1(Head, Tail)}.
-
-join_betas1(Head, Tail) ->
- case {queue:out_r(Head), queue:out(Tail)} of
- {{empty, _Head}, _} ->
- Tail;
- {_, {empty, _Tail}} ->
- Head;
- {{{value, {IndexOnDisk, InnerQHead}}, Head1},
- {{value, {IndexOnDisk, InnerQTail}}, Tail1}} ->
- queue:join(
- queue:in({IndexOnDisk,
- queue:join(InnerQHead, InnerQTail)}, Head1),
- Tail1);
- {_, _} -> queue:join(Head, Tail)
- end.
-
-grab_beta(Gen, Q) ->
- case Gen(Q) of
- {empty, _Q} ->
- empty;
- {{value, {_IndexOnDisk, InnerQ}}, _Q} ->
- {{value, MsgStatus}, _InnerQ} = Gen(InnerQ),
- MsgStatus
- end.
+ bpqueue:from_list([{true,
+ [#msg_status { msg = undefined,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ }
+ || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
+ SeqId < SeqIdLimit ]}]).
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),
@@ -596,6 +570,11 @@ combine_deltas(#delta { start_seq_id = SeqIdLow, count = CountLow},
true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION
#delta { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }.
+beta_fold_no_index_on_disk(Fun, Init, Q) ->
+ bpqueue:fold(fun (_Prefix, Value, Acc) ->
+ Fun(Value, Acc)
+ end, Init, Q).
+
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -609,50 +588,34 @@ delete1(NextSeqId, Count, DeltaSeqId, IndexState) ->
{[], IndexState1} ->
delete1(NextSeqId, Count, Delta1SeqId, IndexState1);
{List, IndexState1} ->
- {QCount, Q} = betas_from_segment_entries(List, Delta1SeqId),
- {QCount, IndexState2} = remove_queue_entries(Q, IndexState1),
+ Q = betas_from_segment_entries(List, Delta1SeqId),
+ {QCount, IndexState2} =
+ remove_queue_entries(fun beta_fold_no_index_on_disk/3,
+ Q, IndexState1),
delete1(NextSeqId, Count + QCount, Delta1SeqId, IndexState2)
end.
-purge1(Count, State = #vqstate { q3 = {Q3Len, Q3}, index_state = IndexState }) ->
- case 0 == Q3Len of
+purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
+ case bpqueue:is_empty(Q3) of
true ->
{Q1Count, IndexState1} =
- remove_queue_entries(State #vqstate.q1, IndexState),
+ remove_queue_entries(fun rabbit_misc:queue_fold/3,
+ State #vqstate.q1, IndexState),
{Count + Q1Count, State #vqstate { q1 = queue:new(),
index_state = IndexState1 }};
false ->
- {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
+ {Q3Count, IndexState1} =
+ remove_queue_entries(fun beta_fold_no_index_on_disk/3,
+ Q3, IndexState),
purge1(Count + Q3Count,
maybe_deltas_to_betas(
State #vqstate { index_state = IndexState1,
- q3 = {0, queue:new()} }))
+ q3 = bpqueue:new() }))
end.
-remove_queue_entries(Q, IndexState) ->
+remove_queue_entries(Fold, Q, IndexState) ->
{Count, MsgIds, SeqIds, IndexState1} =
- lists:foldl(
- fun (#msg_status { msg_id = MsgId, seq_id = SeqId,
- is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
- MsgIdsAcc1 = case MsgOnDisk of
- true -> [MsgId | MsgIdsAcc];
- false -> MsgIdsAcc
- end,
- SeqIdsAcc1 = case IndexOnDisk of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end,
- IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
- true -> rabbit_queue_index:write_delivered(
- SeqId, IndexStateN);
- false -> IndexStateN
- end,
- {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}
- %% we need to write the delivered records in order otherwise
- %% we upset the qi. So don't reverse.
- end, {0, [], [], IndexState}, queue:to_list(Q)),
+ Fold(fun remove_queue_entries1/2, {0, [], [], IndexState}, Q),
ok = case MsgIds of
[] -> ok;
_ -> rabbit_msg_store:remove(MsgIds)
@@ -664,28 +627,40 @@ remove_queue_entries(Q, IndexState) ->
end,
{Count, IndexState2}.
+remove_queue_entries1(
+ #msg_status { msg_id = MsgId, seq_id = SeqId,
+ is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
+ MsgIdsAcc1 = case MsgOnDisk of
+ true -> [MsgId | MsgIdsAcc];
+ false -> MsgIdsAcc
+ end,
+ SeqIdsAcc1 = case IndexOnDisk of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end,
+ IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
+ true -> rabbit_queue_index:write_delivered(
+ SeqId, IndexStateN);
+ false -> IndexStateN
+ end,
+ {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}.
+
fetch_from_q3_or_delta(State = #vqstate {
- q1 = Q1, q2 = {Q2Len, _Q2}, delta = #delta { count = DeltaCount },
- q3 = {Q3Len, Q3}, q4 = Q4, ram_msg_count = RamMsgCount,
+ q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
+ q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
msg_store_read_state = MSCState }) ->
- case queue:out(Q3) of
+ case bpqueue:out(Q3) of
{empty, _Q3} ->
0 = DeltaCount, %% ASSERTION
- 0 = Q2Len, %% ASSERTION
- 0 = Q3Len, %% ASSERTION
+ true = bpqueue:is_empty(Q2), %% ASSERTION
true = queue:is_empty(Q1), %% ASSERTION
{empty, State};
- {{value, {IndexOnDisk, InnerQ}}, Q3a} ->
- {{value, MsgStatus = #msg_status {
- msg = undefined, msg_id = MsgId,
- is_persistent = IsPersistent
- }}, InnerQ1} = queue:out(InnerQ),
- Q3LenB = Q3Len - 1,
- Q3b = {Q3LenB, case queue:is_empty(InnerQ1) of
- true -> Q3a;
- false -> queue:in_r({IndexOnDisk, InnerQ1}, Q3a)
- end},
+ {{value, IndexOnDisk, MsgStatus = #msg_status {
+ msg = undefined, msg_id = MsgId,
+ is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }}, MSCState1} =
rabbit_msg_store:read(MsgId, MSCState),
@@ -695,17 +670,17 @@ fetch_from_q3_or_delta(State = #vqstate {
false -> RamIndexCount - 1
end,
true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3b, q4 = Q4a,
+ State1 = State #vqstate { q3 = Q3a, q4 = Q4a,
ram_msg_count = RamMsgCount + 1,
ram_index_count = RamIndexCount1,
msg_store_read_state = MSCState1 },
State2 =
- case {0 == Q3LenB, 0 == DeltaCount} of
+ case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
%% still empty. So q2 must be empty, and q1
%% can now be joined onto q4
- 0 = Q2Len, %% ASSERTION
+ true = bpqueue:is_empty(Q2), %% ASSERTION
State1 #vqstate { q1 = queue:new(),
q4 = queue:join(Q4a, Q1) };
{true, false} ->
@@ -737,26 +712,26 @@ reduce_memory_use(State =
test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
- q1 = Q1, q3 = {_Q3Len, Q3} }) ->
+ q1 = Q1, q3 = Q3 }) ->
case TargetRamMsgCount of
undefined ->
msg;
0 ->
- case queue:out(Q3) of
+ case bpqueue:out(Q3) of
{empty, _Q3} ->
%% if TargetRamMsgCount == 0, we know we have no
%% alphas. If q3 is empty then delta must be empty
%% too, so create a beta, which should end up in
%% q3
index;
- {{value, {_IndexOnDisk, InnerQ}}, _Q3a} ->
- {{value, #msg_status { seq_id = OldSeqId }}, _InnerQ} =
- queue:out(InnerQ),
+ {{value, _IndexOnDisk, #msg_status { seq_id = OldSeqId }},
+ _Q3a} ->
%% Don't look at the current delta as it may be
%% empty. If the SeqId is still within the current
%% segment, it'll be a beta, else it'll go into
%% delta
- case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of
+ case SeqId >= rabbit_queue_index:next_segment_boundary(
+ OldSeqId) of
true -> neither;
false -> index
end
@@ -817,13 +792,13 @@ publish(index, MsgStatus, State =
store_beta_entry(MsgStatus2, State1);
publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
- #vqstate { index_state = IndexState, q1 = Q1, q2 = {Q2Len, _Q2},
+ #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
delta = Delta }) ->
MsgStatus1 = #msg_status { msg_on_disk = true } =
maybe_write_msg_to_disk(true, MsgStatus),
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus1, IndexState),
- true = queue:is_empty(Q1) andalso 0 == Q2Len, %% ASSERTION
+ true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
%% delta may be empty, seq_id > next_segment_boundary from q3
%% head, so we need to find where the segment boundary is before
%% or equal to seq_id
@@ -835,39 +810,28 @@ publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
delta = combine_deltas(Delta, Delta1) }.
store_alpha_entry(MsgStatus, State =
- #vqstate { q1 = Q1, q2 = {Q2Len, _Q2},
+ #vqstate { q1 = Q1, q2 = Q2,
delta = #delta { count = DeltaCount },
- q3 = {Q3Len, _Q3}, q4 = Q4 }) ->
- case 0 == Q2Len andalso 0 == DeltaCount andalso 0 == Q3Len of
+ q3 = Q3, q4 = Q4 }) ->
+ case bpqueue:is_empty(Q2) andalso 0 == DeltaCount andalso
+ bpqueue:is_empty(Q3) of
true -> true = queue:is_empty(Q1), %% ASSERTION
State #vqstate { q4 = queue:in(MsgStatus, Q4) };
false -> maybe_push_q1_to_betas(
State #vqstate { q1 = queue:in(MsgStatus, Q1) })
end.
-store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true },
- State = #vqstate { q2 = {Q2Len, Q2},
+store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate { q2 = Q2,
delta = #delta { count = DeltaCount },
- q3 = {Q3Len, Q3} }) ->
+ q3 = Q3 }) ->
MsgStatus1 = MsgStatus #msg_status { msg = undefined },
case DeltaCount == 0 of
- true -> State #vqstate { q3 = {Q3Len + 1,
- store_beta_entry1(
- fun queue:out_r/1, fun queue:in/2,
- MsgStatus1, Q3)} };
- false -> State #vqstate { q2 = {Q2Len + 1,
- store_beta_entry1(
- fun queue:out_r/1, fun queue:in/2,
- MsgStatus1, Q2)} }
- end.
-
-store_beta_entry1(Gen, Cons, MsgStatus =
- #msg_status { index_on_disk = IndexOnDisk }, Q) ->
- case Gen(Q) of
- {{value, {IndexOnDisk, InnerQ}}, QTail} ->
- Cons({IndexOnDisk, Cons(MsgStatus, InnerQ)}, QTail);
- {_EmptyOrNotIndexOnDisk, _QTail} ->
- Cons({IndexOnDisk, Cons(MsgStatus, queue:new())}, Q)
+ true ->
+ State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1, Q3) };
+ false ->
+ State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) }
end.
maybe_write_msg_to_disk(_Force, MsgStatus =
@@ -909,13 +873,12 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) ->
State;
maybe_deltas_to_betas(
- State = #vqstate { index_state = IndexState,
- q2 = Q2All, q3 = {Q3Len, _Q3} = Q3All,
+ State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
target_ram_msg_count = TargetRamMsgCount,
delta = #delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
end_seq_id = DeltaSeqIdEnd }}) ->
- case (0 < Q3Len) andalso (0 == TargetRamMsgCount) of
+ case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of
true ->
State;
false ->
@@ -927,19 +890,18 @@ maybe_deltas_to_betas(
State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. But
%% it can't be []
- Q3bAll = {Q3bLen, _Q3b} =
- betas_from_segment_entries(List, DeltaSeqIdEnd),
- Q3a = join_betas(Q3All, Q3bAll),
- case DeltaCount - Q3bLen of
+ Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd),
+ Q3b = bpqueue:join(Q3, Q3a),
+ case DeltaCount - bpqueue:len(Q3a) of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
State1 #vqstate { delta = ?BLANK_DELTA,
- q2 = {0, queue:new()},
- q3 = join_betas(Q3a, Q2All) };
+ q2 = bpqueue:new(),
+ q3 = bpqueue:join(Q3b, Q2) };
N when N > 0 ->
State1 #vqstate {
- q3 = Q3a,
+ q3 = Q3b,
delta = #delta { start_seq_id = Delta1SeqId,
count = N,
end_seq_id = DeltaSeqIdEnd } }
@@ -957,13 +919,12 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
fun queue:out_r/1,
- fun (MsgStatus, Q4a, State1 = #vqstate { q3 = {Q3Len, Q3} }) ->
+ fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
+ Q4a, State1 = #vqstate { q3 = Q3 }) ->
MsgStatus1 = MsgStatus #msg_status { msg = undefined },
%% these must go to q3
- State1 #vqstate { q3 = {Q3Len + 1,
- store_beta_entry1(
- fun queue:out/1, fun queue:in_r/2,
- MsgStatus1, Q3)}, q4 = Q4a }
+ State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus1, Q3),
+ q4 = Q4a }
end, Q4, State).
maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
@@ -1000,43 +961,36 @@ maybe_push_alphas_to_betas(
Consumer(MsgStatus2, Qa, State1))
end.
-push_betas_to_deltas(State = #vqstate { q2 = {Q2Len, Q2}, delta = Delta,
- q3 = {Q3Len, Q3},
+push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3,
ram_index_count = RamIndexCount,
index_state = IndexState }) ->
%% HighSeqId is high in the sense that it must be higher than the
%% seq_id in Delta, but it's also the lowest of the betas that we
%% transfer from q2 to delta.
- {HighSeqId, Q2Len, Q2a, RamIndexCount1, IndexState1} =
+ {HighSeqId, Len1, Q2a, RamIndexCount1, IndexState1} =
push_betas_to_deltas(
- fun queue:out/1,
- fun (IndexOnDisk, InnerQ, Q) ->
- join_betas1(queue:from_list([{IndexOnDisk, InnerQ}]), Q)
- end, undefined, Q2, RamIndexCount, IndexState),
- true = queue:is_empty(Q2a), %% ASSERTION
- EndSeqId = case queue:out_r(Q2) of
- {empty, _Q2} ->
- undefined;
- {{value, {_IndexOnDisk, InnerQ}}, _Q2} ->
- {{value, #msg_status { seq_id = EndSeqId1 }}, _InnerQ} =
- queue:out_r(InnerQ),
- EndSeqId1 + 1
- end,
+ fun bpqueue:out/1, undefined, Q2, RamIndexCount, IndexState),
+ true = bpqueue:is_empty(Q2a), %% ASSERTION
+ EndSeqId =
+ case bpqueue:out_r(Q2) of
+ {empty, _Q2} ->
+ undefined;
+ {{value, _IndexOnDisk, #msg_status { seq_id = EndSeqId1 }}, _Q2} ->
+ EndSeqId1 + 1
+ end,
Delta1 = #delta { start_seq_id = Delta1SeqId } =
combine_deltas(Delta, #delta { start_seq_id = HighSeqId,
- count = Q2Len,
+ count = Len1,
end_seq_id = EndSeqId }),
- State1 = State #vqstate { q2 = {0, Q2a}, delta = Delta1,
+ State1 = State #vqstate { q2 = bpqueue:new(), delta = Delta1,
index_state = IndexState1,
ram_index_count = RamIndexCount1 },
- case queue:out(Q3) of
+ case bpqueue:out(Q3) of
{empty, _Q3} ->
State1;
- {{value, {_IndexOnDisk1, InnerQ1}}, _Q3} ->
- {{value, #msg_status { seq_id = SeqId }}, _InnerQ1} =
- queue:out(InnerQ1),
- #msg_status { seq_id = SeqIdMax } =
- grab_beta(fun queue:out_r/1, Q3),
+ {{value, _IndexOnDisk1, #msg_status { seq_id = SeqId }}, _Q3} ->
+ {{value, _IndexOnDisk2, #msg_status { seq_id = SeqIdMax }}, _Q3a} =
+ bpqueue:out_r(Q3),
Limit = rabbit_queue_index:next_segment_boundary(SeqId),
%% ASSERTION
true = Delta1SeqId == undefined orelse Delta1SeqId > SeqIdMax,
@@ -1062,58 +1016,37 @@ push_betas_to_deltas(State = #vqstate { q2 = {Q2Len, Q2}, delta = Delta,
%% But because we use queue:out_r, SeqIdMax is
%% actually also the highest seq_id of the betas we
%% transfer from q3 to deltas.
- {SeqIdMax, Len2, Q3b, RamIndexCount2, IndexState2} =
- push_betas_to_deltas(
- fun queue:out_r/1,
- fun (IndexOnDisk, InnerQ, Q) ->
- join_betas1(Q, queue:from_list(
- [{IndexOnDisk, InnerQ}]))
- end, Limit, Q3, RamIndexCount1, IndexState1),
+ {SeqIdMax, Len2, Q3a, RamIndexCount2, IndexState2} =
+ push_betas_to_deltas(fun bpqueue:out_r/1, Limit, Q3,
+ RamIndexCount1, IndexState1),
Delta2 = combine_deltas(#delta { start_seq_id = Limit,
count = Len2,
end_seq_id = SeqIdMax+1 },
Delta1),
- State1 #vqstate { q3 = {Q3Len - Len2, Q3b}, delta = Delta2,
+ State1 #vqstate { q3 = Q3a, delta = Delta2,
index_state = IndexState2,
ram_index_count = RamIndexCount2 }
end
end.
-push_betas_to_deltas(
- Generator, Consumer, Limit, Q, RamIndexCount, IndexState) ->
+push_betas_to_deltas(Generator, Limit, Q, RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, Qa} -> {undefined, 0, Qa, RamIndexCount, IndexState};
- {{value, {IndexOnDisk, InnerQ}}, Qa} ->
- {{value, #msg_status { seq_id = SeqId }}, _Qb} = Generator(InnerQ),
+ {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} ->
{Count, Qb, RamIndexCount1, IndexState1} =
push_betas_to_deltas(
- Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, 0,
- RamIndexCount, IndexState),
+ Generator, Limit, Q, 0, RamIndexCount, IndexState),
{SeqId, Count, Qb, RamIndexCount1, IndexState1}
end.
-push_betas_to_deltas(
- Generator, Consumer, Limit, Q, Count, RamIndexCount, IndexState) ->
+push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, Qa} ->
{Count, Qa, RamIndexCount, IndexState};
- {{value, {IndexOnDisk, InnerQ}}, Qa} ->
- push_betas_to_deltas(
- Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, Count,
- RamIndexCount, IndexState)
- end.
-
-push_betas_to_deltas(Generator, Consumer, Limit, IndexOnDisk, InnerQ, Q,
- Count, RamIndexCount, IndexState) ->
- case Generator(InnerQ) of
- {empty, _InnerQ} ->
- push_betas_to_deltas(Generator, Consumer, Limit, Q, Count,
- RamIndexCount, IndexState);
- {{value, #msg_status { seq_id = SeqId }}, _InnerQ}
+ {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa}
when Limit /= undefined andalso SeqId < Limit ->
- {Count, Consumer(IndexOnDisk, InnerQ, Q), RamIndexCount,
- IndexState};
- {{value, MsgStatus}, InnerQa} ->
+ {Count, Q, RamIndexCount, IndexState};
+ {{value, IndexOnDisk, MsgStatus}, Qa} ->
{RamIndexCount1, IndexState1} =
case IndexOnDisk of
true -> {RamIndexCount, IndexState};
@@ -1124,6 +1057,5 @@ push_betas_to_deltas(Generator, Consumer, Limit, IndexOnDisk, InnerQ, Q,
{RamIndexCount - 1, IndexState2}
end,
push_betas_to_deltas(
- Generator, Consumer, Limit, IndexOnDisk, InnerQa, Q, Count + 1,
- RamIndexCount1, IndexState1)
+ Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.