diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-09-09 11:57:42 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-09-09 11:57:42 +0100 |
| commit | f7620ca0993a5fa471b1884fd797ab191c667eb6 (patch) | |
| tree | 01d560fba45d06771cd2e330533edf5af5ab34df | |
| parent | 23e82e5eb9781414b6f64885ec1e60904d386b06 (diff) | |
| download | rabbitmq-server-git-f7620ca0993a5fa471b1884fd797ab191c667eb6.tar.gz | |
Turn q4 back into a queue for performance reasons
| -rw-r--r-- | src/rabbit_variable_queue.erl | 87 |
1 files changed, 39 insertions, 48 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 00ffef2044..56280e1523 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -308,7 +308,7 @@ q2 :: bpqueue:bpqueue(), delta :: delta(), q3 :: gb_tree(), - q4 :: gb_tree(), + q4 :: queue(), next_seq_id :: seq_id(), pending_ack :: dict(), ram_ack_index :: gb_tree(), @@ -469,13 +469,13 @@ purge(State = #vqstate { q4 = Q4, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( - fun q4_fold/3, Q4, + fun rabbit_misc:queue_fold/3, Q4, orddict:new(), IndexState, MSCState), {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2, msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = gb_trees:empty(), + State #vqstate { q4 = queue:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q1, @@ -685,7 +685,7 @@ status(#vqstate { {q2 , bpqueue:len(Q2)}, {delta , Delta}, {q3 , q3tree:len(Q3)}, - {q4 , gb_trees:size(Q4)}, + {q4 , queue:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, {target_ram_count , TargetRamCount}, @@ -718,7 +718,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, E2 = bpqueue:is_empty(Q2), ED = Delta#delta.count == 0, E3 = q3tree:is_empty(Q3), - E4 = gb_trees:is_empty(Q4), + E4 = queue:is_empty(Q4), LZ = Len == 0, true = E1 or not E3, @@ -753,26 +753,6 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; %% when requeueing, we re-add a msg_id to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). -q4_from_q1(Q1) -> - rabbit_misc:queue_fold(fun (#msg_status { seq_id = SeqId } = M, Acc) -> - gb_trees:insert(SeqId, M, Acc) - end, gb_trees:empty(), Q1). - -q4_out_r(Q4) -> - case gb_trees:is_empty(Q4) of - true -> {empty, Q4}; - false -> {_SeqId, MsgStatus, Q4a} = gb_trees:take_largest(Q4), - {{value, MsgStatus}, Q4a} - end. - -q4_fold(Fun, Init, Q4) -> - q4_fold1(Fun, Init, gb_trees:iterator(Q4)). -q4_fold1(Fun, Init, Iter) -> - case gb_trees:next(Iter) of - none -> Init; - {_Key, M, Iter1} -> q4_fold1(Fun, Fun(M, Init), Iter1) - end. - msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, MsgProps) -> #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, @@ -913,7 +893,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q2 = bpqueue:new(), delta = Delta, q3 = q3tree:new(), - q4 = gb_trees:empty(), + q4 = queue:new(), next_seq_id = NextSeqId, pending_ack = dict:new(), ram_ack_index = gb_trees:empty(), @@ -953,27 +933,26 @@ blank_rate(Timestamp, IngressLength) -> in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - case gb_trees:is_empty(Q4) of + case queue:is_empty(Q4) of true -> State #vqstate { q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; - false -> {MsgStatus1 = #msg_status{ seq_id = SeqId}, - State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - State1 #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus1, Q4a)} + false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = + read_msg(MsgStatus, State), + State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } end; -in_r(MsgStatus = #msg_status { seq_id = SeqId }, - State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, Q4) }. +in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> + State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. queue_out(State = #vqstate { q4 = Q4 }) -> - case gb_trees:is_empty(Q4) of - true -> + case queue:out(Q4) of + {empty, _Q4} -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end; - false -> {_SeqId, MsgStatus, Q4a} = gb_trees:take_smallest(Q4), - {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + {{value, MsgStatus}, Q4a} -> + {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. read_msg(MsgStatus = #msg_status { msg = undefined, @@ -1106,8 +1085,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case q3tree:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = gb_trees:insert(SeqId, - m(MsgStatus1), Q4) } + true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), @@ -1363,10 +1341,9 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId, undefined -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - State1 #vqstate { q4 = gb_trees:insert( - SeqId, MsgStatus1, Q4a)}; + State1 #vqstate { q4 = q4_merge(MsgStatus1, Q4a)}; #basic_message{} -> - State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, Q4), + State #vqstate { q4 = q4_merge(MsgStatus, Q4), ram_msg_count = RamMsgCount + 1 } end; q3 -> %% make sure index is on disk @@ -1410,6 +1387,20 @@ pick_store(SeqId, #vqstate { q3 = Q3, end end. +q4_merge(MsgStatus, Q) -> + q4_merge(MsgStatus, Q, queue:new()). + +q4_merge(#msg_status {seq_id = SeqId } = MsgStatus, Q, Front) -> + case queue:out(Q) of + {{value, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} -> + case SeqId1 > SeqId of + true -> queue:join(queue:in(MsgStatus, Front), Q); + false -> q4_merge(MsgStatus, Q1, queue:in(MsgStatusHead, Front)) + end; + {empty, _Q1} -> + queue:in(MsgStatus, Front) + end. + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- @@ -1574,12 +1565,12 @@ fetch_from_q3(State = #vqstate { %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and we %% know q4 is empty otherwise we wouldn't be - %% loading from q3. As such, we can just transform - %% q1 to q4. - true = bpqueue:is_empty(Q2), %% ASSERTION - true = gb_trees:is_empty(Q4), %% ASSERTION + %% loading from q3. As such, we can just set + %% q4 to Q1. + true = bpqueue:is_empty(Q2), %% ASSERTION + true = queue:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = queue:new(), - q4 = q4_from_q1(Q1) }; + q4 = Q1 }; {true, false} -> maybe_deltas_to_betas(State1); {false, _} -> @@ -1655,7 +1646,7 @@ maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( - fun q4_out_r/1, + fun queue:out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> State1 #vqstate { q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), |
