diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-07-27 16:38:49 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-07-27 16:38:49 +0100 |
| commit | e6e9cee87b8ffe73dfdfb3b472d48ff04dd5bfb5 (patch) | |
| tree | 215a93cff5002b51b9f6845888dc7022457cc02b | |
| parent | dd5755647a68aafd26a1e73cdc47a94875e909fa (diff) | |
| download | rabbitmq-server-git-e6e9cee87b8ffe73dfdfb3b472d48ff04dd5bfb5.tar.gz | |
Turn q4 into random access data structure
| -rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
1 files changed, 45 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ea72de66e3..0944788869 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -318,7 +318,7 @@ q2 :: bpqueue:bpqueue(), delta :: delta(), q3 :: bpqueue:bpqueue(), - q4 :: queue(), + q4 :: gb_tree(), next_seq_id :: seq_id(), pending_ack :: dict(), ram_ack_index :: gb_tree(), @@ -479,13 +479,13 @@ purge(State = #vqstate { q4 = Q4, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( - fun rabbit_misc:queue_fold/3, Q4, + fun q4_fold/3, Q4, orddict:new(), IndexState, MSCState), {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2, msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = queue:new(), + State #vqstate { q4 = gb_trees:empty(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q1, @@ -712,7 +712,7 @@ status(#vqstate { {q2 , bpqueue:len(Q2)}, {delta , Delta}, {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, + {q4 , gb_trees:size(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, {target_ram_count , TargetRamCount}, @@ -745,7 +745,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, E2 = bpqueue:is_empty(Q2), ED = Delta#delta.count == 0, E3 = bpqueue:is_empty(Q3), - E4 = queue:is_empty(Q4), + E4 = gb_trees:is_empty(Q4), LZ = Len == 0, true = E1 or not E3, @@ -780,6 +780,26 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; %% when requeueing, we re-add a msg_id to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). +q4_from_q1(Q1) -> + rabbit_misc:queue_fold(fun (#msg_status { seq_id = SeqId } = M, Acc) -> + gb_trees:insert(SeqId, M, Acc) + end, gb_trees:empty(), Q1). + +q4_out_r(Q4) -> + case gb_trees:is_empty(Q4) of + true -> {empty, Q4}; + false -> {_SeqId, MsgStatus, Q4a} = gb_trees:take_largest(Q4), + {{value, MsgStatus}, Q4a} + end. + +q4_fold(Fun, Init, Q4) -> + q4_fold1(Fun, Init, gb_trees:iterator(Q4)). +q4_fold1(Fun, Init, Iter) -> + case gb_trees:next(Iter) of + none -> Init; + {_Key, M, Iter1} -> q4_fold1(Fun, Fun(M, Init), Iter1) + end. + msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, MsgProps) -> #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, @@ -920,7 +940,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q2 = bpqueue:new(), delta = Delta, q3 = bpqueue:new(), - q4 = queue:new(), + q4 = gb_trees:empty(), next_seq_id = NextSeqId, pending_ack = dict:new(), ram_ack_index = gb_trees:empty(), @@ -960,26 +980,27 @@ blank_rate(Timestamp, IngressLength) -> in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - case queue:is_empty(Q4) of + case gb_trees:is_empty(Q4) of true -> State #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; - false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), - State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } + false -> {MsgStatus1 = #msg_status{ seq_id = SeqId}, + State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), + State1 #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus1, Q4a)} end; -in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. +in_r(MsgStatus = #msg_status { seq_id = SeqId }, + State = #vqstate { q4 = Q4 }) -> + State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, Q4) }. queue_out(State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of - {empty, _Q4} -> + case gb_trees:is_empty(Q4) of + true -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end; - {{value, MsgStatus}, Q4a} -> - {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + false -> {_SeqId, MsgStatus, Q4a} = gb_trees:take_smallest(Q4), + {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. read_msg(MsgStatus = #msg_status { msg = undefined, @@ -1112,7 +1133,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } + true -> State1 #vqstate { q4 = gb_trees:insert(SeqId, + m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), @@ -1486,12 +1508,12 @@ fetch_from_q3(State = #vqstate { %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and we %% know q4 is empty otherwise we wouldn't be - %% loading from q3. As such, we can just set - %% q4 to Q1. - true = bpqueue:is_empty(Q2), %% ASSERTION - true = queue:is_empty(Q4), %% ASSERTION + %% loading from q3. As such, we can just transform + %% q1 to q4. + true = bpqueue:is_empty(Q2), %% ASSERTION + true = gb_trees:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = queue:new(), - q4 = Q1 }; + q4 = q4_from_q1(Q1) }; {true, false} -> maybe_deltas_to_betas(State1); {false, _} -> @@ -1567,7 +1589,7 @@ maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( - fun queue:out_r/1, + fun q4_out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), |
