summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-09-09 11:57:42 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-09-09 11:57:42 +0100
commitf7620ca0993a5fa471b1884fd797ab191c667eb6 (patch)
tree01d560fba45d06771cd2e330533edf5af5ab34df /src
parent23e82e5eb9781414b6f64885ec1e60904d386b06 (diff)
downloadrabbitmq-server-git-f7620ca0993a5fa471b1884fd797ab191c667eb6.tar.gz
Turn q4 back into a queue for performance reasons
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl87
1 files changed, 39 insertions, 48 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 00ffef2044..56280e1523 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -308,7 +308,7 @@
q2 :: bpqueue:bpqueue(),
delta :: delta(),
q3 :: gb_tree(),
- q4 :: gb_tree(),
+ q4 :: queue(),
next_seq_id :: seq_id(),
pending_ack :: dict(),
ram_ack_index :: gb_tree(),
@@ -469,13 +469,13 @@ purge(State = #vqstate { q4 = Q4,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
{LensByStore, IndexState1} = remove_queue_entries(
- fun q4_fold/3, Q4,
+ fun rabbit_misc:queue_fold/3, Q4,
orddict:new(), IndexState, MSCState),
{LensByStore1, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = gb_trees:empty(),
+ State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
{LensByStore2, IndexState3} = remove_queue_entries(
fun rabbit_misc:queue_fold/3, Q1,
@@ -685,7 +685,7 @@ status(#vqstate {
{q2 , bpqueue:len(Q2)},
{delta , Delta},
{q3 , q3tree:len(Q3)},
- {q4 , gb_trees:size(Q4)},
+ {q4 , queue:len(Q4)},
{len , Len},
{pending_acks , dict:size(PA)},
{target_ram_count , TargetRamCount},
@@ -718,7 +718,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
E2 = bpqueue:is_empty(Q2),
ED = Delta#delta.count == 0,
E3 = q3tree:is_empty(Q3),
- E4 = gb_trees:is_empty(Q4),
+ E4 = queue:is_empty(Q4),
LZ = Len == 0,
true = E1 or not E3,
@@ -753,26 +753,6 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-q4_from_q1(Q1) ->
- rabbit_misc:queue_fold(fun (#msg_status { seq_id = SeqId } = M, Acc) ->
- gb_trees:insert(SeqId, M, Acc)
- end, gb_trees:empty(), Q1).
-
-q4_out_r(Q4) ->
- case gb_trees:is_empty(Q4) of
- true -> {empty, Q4};
- false -> {_SeqId, MsgStatus, Q4a} = gb_trees:take_largest(Q4),
- {{value, MsgStatus}, Q4a}
- end.
-
-q4_fold(Fun, Init, Q4) ->
- q4_fold1(Fun, Init, gb_trees:iterator(Q4)).
-q4_fold1(Fun, Init, Iter) ->
- case gb_trees:next(Iter) of
- none -> Init;
- {_Key, M, Iter1} -> q4_fold1(Fun, Fun(M, Init), Iter1)
- end.
-
msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
MsgProps) ->
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
@@ -913,7 +893,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q2 = bpqueue:new(),
delta = Delta,
q3 = q3tree:new(),
- q4 = gb_trees:empty(),
+ q4 = queue:new(),
next_seq_id = NextSeqId,
pending_ack = dict:new(),
ram_ack_index = gb_trees:empty(),
@@ -953,27 +933,26 @@ blank_rate(Timestamp, IngressLength) ->
in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
- case gb_trees:is_empty(Q4) of
+ case queue:is_empty(Q4) of
true -> State #vqstate {
q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),
ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
- false -> {MsgStatus1 = #msg_status{ seq_id = SeqId},
- State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State),
- State1 #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus1, Q4a)}
+ false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, State),
+ State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) }
end;
-in_r(MsgStatus = #msg_status { seq_id = SeqId },
- State = #vqstate { q4 = Q4 }) ->
- State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, Q4) }.
+in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
+ State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
queue_out(State = #vqstate { q4 = Q4 }) ->
- case gb_trees:is_empty(Q4) of
- true ->
+ case queue:out(Q4) of
+ {empty, _Q4} ->
case fetch_from_q3(State) of
{empty, _State1} = Result -> Result;
{loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
end;
- false -> {_SeqId, MsgStatus, Q4a} = gb_trees:take_smallest(Q4),
- {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
+ {{value, MsgStatus}, Q4a} ->
+ {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
read_msg(MsgStatus = #msg_status { msg = undefined,
@@ -1106,8 +1085,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case q3tree:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = gb_trees:insert(SeqId,
- m(MsgStatus1), Q4) }
+ true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
@@ -1363,10 +1341,9 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId,
undefined ->
{MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- State1 #vqstate { q4 = gb_trees:insert(
- SeqId, MsgStatus1, Q4a)};
+ State1 #vqstate { q4 = q4_merge(MsgStatus1, Q4a)};
#basic_message{} ->
- State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, Q4),
+ State #vqstate { q4 = q4_merge(MsgStatus, Q4),
ram_msg_count = RamMsgCount + 1 }
end;
q3 -> %% make sure index is on disk
@@ -1410,6 +1387,20 @@ pick_store(SeqId, #vqstate { q3 = Q3,
end
end.
+q4_merge(MsgStatus, Q) ->
+ q4_merge(MsgStatus, Q, queue:new()).
+
+q4_merge(#msg_status {seq_id = SeqId } = MsgStatus, Q, Front) ->
+ case queue:out(Q) of
+ {{value, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
+ case SeqId1 > SeqId of
+ true -> queue:join(queue:in(MsgStatus, Front), Q);
+ false -> q4_merge(MsgStatus, Q1, queue:in(MsgStatusHead, Front))
+ end;
+ {empty, _Q1} ->
+ queue:in(MsgStatus, Front)
+ end.
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1574,12 +1565,12 @@ fetch_from_q3(State = #vqstate {
%% q3 is now empty, it wasn't before; delta is
%% still empty. So q2 must be empty, and we
%% know q4 is empty otherwise we wouldn't be
- %% loading from q3. As such, we can just transform
- %% q1 to q4.
- true = bpqueue:is_empty(Q2), %% ASSERTION
- true = gb_trees:is_empty(Q4), %% ASSERTION
+ %% loading from q3. As such, we can just set
+ %% q4 to Q1.
+ true = bpqueue:is_empty(Q2), %% ASSERTION
+ true = queue:is_empty(Q4), %% ASSERTION
State1 #vqstate { q1 = queue:new(),
- q4 = q4_from_q1(Q1) };
+ q4 = Q1 };
{true, false} ->
maybe_deltas_to_betas(State1);
{false, _} ->
@@ -1655,7 +1646,7 @@ maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
- fun q4_out_r/1,
+ fun queue:out_r/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q4a, State1 = #vqstate { q3 = Q3 }) ->
State1 #vqstate { q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),