diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-08 17:50:47 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-08 17:50:47 +0100 |
| commit | 4c8ef89985c59c679439477bb5feb799d0740816 (patch) | |
| tree | 1a01132b759dcdc2b53b9623c27d68b4be243393 /src | |
| parent | 0c9a7ab8868bfe47682c4ef05eb283b23ef852e5 (diff) | |
| download | rabbitmq-server-git-4c8ef89985c59c679439477bb5feb799d0740816.tar.gz | |
And now actually make sure that on start up we load in the first segment from gamma to q3 if possible. Hence a refactoring here.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 84 |
1 files changed, 45 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1491879bc9..184050f783 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -103,26 +103,28 @@ %% is non empty then q3 must be non empty. init(QueueName) -> - {LowSeqId, NextSeqId, Count, IndexState} = + {GammaSeqId, NextSeqId, GammaCount, IndexState} = rabbit_queue_index:init(QueueName), - Gamma = case Count of + Gamma = case GammaCount of 0 -> #gamma { seq_id = undefined, count = 0 }; - _ -> #gamma { seq_id = LowSeqId, count = Count } + _ -> #gamma { seq_id = GammaSeqId, count = GammaCount } end, - #vqstate { q1 = queue:new(), q2 = queue:new(), - gamma = Gamma, - q3 = queue:new(), q4 = queue:new(), - target_ram_msg_count = undefined, - ram_msg_count = 0, - queue = QueueName, - index_state = IndexState, - next_seq_id = NextSeqId, - out_counter = 0, - egress_rate = 0, - avg_egress_rate = 0, - egress_rate_timestamp = now(), - prefetcher = undefined - }. + State = + #vqstate { q1 = queue:new(), q2 = queue:new(), + gamma = Gamma, + q3 = queue:new(), q4 = queue:new(), + target_ram_msg_count = undefined, + ram_msg_count = 0, + queue = QueueName, + index_state = IndexState, + next_seq_id = NextSeqId, + out_counter = 0, + egress_rate = 0, + avg_egress_rate = 0, + egress_rate_timestamp = now(), + prefetcher = undefined + }, + maybe_load_next_segment(State). in(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId }) -> in(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, @@ -238,9 +240,8 @@ out(State = index_state = IndexState1 }} end. -out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState, - gamma = #gamma { seq_id = GammaSeqId, - count = GammaCount}, +out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, + gamma = #gamma { count = GammaCount }, q3 = Q3, q4 = Q4 }) -> case queue:out(Q3) of {empty, _Q3} -> @@ -271,25 +272,7 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState, State1 #vqstate { q1 = queue:new(), q4 = queue:join(Q4a, Q1) }; {true, false} -> - {List, IndexState1, Gamma1SeqId} = - read_index_segment(GammaSeqId, IndexState), - State3 = State1 #vqstate { index_state = IndexState1 }, - %% length(List) may be < segment_size because - %% of acks. But it can't be [] - Q3b = betas_from_segment_entries(List), - case GammaCount - length(List) of - 0 -> - %% gamma is now empty, but it wasn't - %% before, so can now join q2 onto q3 - State3 #vqstate { - gamma = #gamma { seq_id = undefined, - count = 0 }, - q2 = queue:new(), q3 = queue:join(Q3b, Q2) }; - N when N > 0 -> - State3 #vqstate { - gamma = #gamma { seq_id = Gamma1SeqId, - count = N }, q3 = Q3b } - end; + maybe_load_next_segment(State1); {false, _} -> %% q3 still isn't empty, we've not touched %% gamma, so the invariants between q1, q2, @@ -299,6 +282,29 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState, out(State2) end. +maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) -> + State; +maybe_load_next_segment(State = + #vqstate { index_state = IndexState, q2 = Q2, + gamma = #gamma { seq_id = GammaSeqId, + count = GammaCount }}) -> + {List, IndexState1, Gamma1SeqId} = + read_index_segment(GammaSeqId, IndexState), + State1 = State #vqstate { index_state = IndexState1 }, + %% length(List) may be < segment_size because of acks. But it + %% can't be [] + Q3a = betas_from_segment_entries(List), + case GammaCount - length(List) of + 0 -> + %% gamma is now empty, but it wasn't before, so can now + %% join q2 onto q3 + State1 #vqstate { gamma = #gamma { seq_id = undefined, count = 0 }, + q2 = queue:new(), q3 = queue:join(Q3a, Q2) }; + N when N > 0 -> + State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId, + count = N }, q3 = Q3a } + end. + betas_from_segment_entries(List) -> queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) -> #beta { msg_id = MsgId, seq_id = SeqId, |
