summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-07 18:46:12 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-07 18:46:12 +0100
commit2ecdaa38b99b2878325cbf5597537162f9f8684e (patch)
treec3339735f4e84797d061ae533dcd63583ede968b
parent686c68c85696a2916f65de4127956a806ce4318f (diff)
downloadrabbitmq-server-git-2ecdaa38b99b2878325cbf5597537162f9f8684e.tar.gz
implemented out. This is getting pretty disgusting, needs some refactoring, marginally more useful variable names, and more API, in particular proper support for the prefetcher. Also, totally untested.
-rw-r--r--src/rabbit_queue_index.erl17
-rw-r--r--src/rabbit_variable_queue.erl176
2 files changed, 164 insertions, 29 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c59b12dd29..27952af161 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,8 @@
-module(rabbit_queue_index).
-export([init/1, write_published/4, write_delivered/2, write_acks/2,
- flush_journal/1, read_segment_entries/2, next_segment_boundary/1]).
+ flush_journal/1, read_segment_entries/2, next_segment_boundary/1,
+ segment_size/0]).
%%----------------------------------------------------------------------------
%% The queue disk index
@@ -139,8 +140,10 @@
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
- {[{'index_entry', seq_id(), msg_id(), boolean(), boolean(),
- 'on_disk'}], qistate()}).
+ {( [{'index', msg_id(), seq_id(), boolean(), boolean()}]
+ | 'not_found'), qistate()}).
+-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
+-spec(segment_size/0 :: () -> non_neg_integer()).
-endif.
@@ -237,8 +240,9 @@ read_segment_entries(InitSeqId, State =
{lists:foldl(fun (RelSeq, Acc) ->
{MsgId, IsDelivered, IsPersistent} =
dict:fetch(RelSeq, SDict),
- [{index_entry, reconstruct_seq_id(SegNum, RelSeq),
- MsgId, IsDelivered, IsPersistent, on_disk} | Acc]
+ [ {index, MsgId,
+ reconstruct_seq_id(SegNum, RelSeq),
+ IsPersistent, IsDelivered, true} | Acc]
end, [], RelSeqs),
State}.
@@ -246,6 +250,9 @@ next_segment_boundary(SeqId) ->
{SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
reconstruct_seq_id(SegNum + 1, 0).
+segment_size() ->
+ ?SEGMENT_ENTRIES_COUNT.
+
%%----------------------------------------------------------------------------
%% Minor Helpers
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 73c3c3395b..f041f4783e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,8 @@
-module(rabbit_variable_queue).
--export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1]).
+-export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
+ out/1]).
-record(vqstate,
{ q1,
@@ -48,7 +49,8 @@
egress_rate,
old_egress_rate,
avg_egress_rate,
- egress_rate_timestamp
+ egress_rate_timestamp,
+ prefetcher
}).
-include("rabbit.hrl").
@@ -56,7 +58,7 @@
init(QueueName) ->
{NextSeqId, IndexState} = rabbit_queue_index:init(QueueName),
#vqstate { q1 = queue:new(), q2 = queue:new(),
- gamma = 0,
+ gamma = {undefined, 0},
q3 = queue:new(), q4 = queue:new(),
target_ram_msg_count = undefined,
ram_msg_count = 0,
@@ -67,7 +69,8 @@ init(QueueName) ->
egress_rate = 0,
old_egress_rate = 0,
avg_egress_rate = 0,
- egress_rate_timestamp = now()
+ egress_rate_timestamp = now(),
+ prefetcher = undefined
}.
in(Msg, IsDelivered, State) ->
@@ -98,7 +101,7 @@ in(just_index, Msg = #basic_message { guid = MsgId,
{IndexOnDisk, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
- Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, true, IndexOnDisk},
+ Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk},
State1 = State #vqstate { next_seq_id = SeqId + 1,
index_state = IndexState1 },
true = queue:is_empty(Q1), %% ASSERTION
@@ -108,7 +111,8 @@ in(neither, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
IsDelivered, State = #vqstate { index_state = IndexState,
next_seq_id = SeqId,
- q1 = Q1, q2 = Q2, gamma = Gamma }) ->
+ q1 = Q1, q2 = Q2,
+ gamma = {GammaSeqId, GammaCount} }) ->
true = maybe_write_msg_to_disk(true, Msg),
{true, IndexState1} =
maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId,
@@ -116,7 +120,7 @@ in(neither, Msg = #basic_message { guid = MsgId,
true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION
State #vqstate { next_seq_id = SeqId + 1,
index_state = IndexState1,
- gamma = Gamma + 1 }.
+ gamma = {GammaSeqId, GammaCount + 1} }.
set_queue_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
@@ -144,6 +148,91 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
egress_rate_timestamp = Now,
out_counter = 0 }.
+out(State =
+ #vqstate { q4 = Q4,
+ out_counter = OutCount, prefetcher = Prefetcher,
+ index_state = IndexState }) ->
+ case queue:out(Q4) of
+ {empty, _Q4} when Prefetcher == undefined ->
+ out_from_q3(State);
+ {empty, _Q4} ->
+ Q4a =
+ case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
+ empty -> Q4;
+ Q4b -> Q4b
+ end,
+ out(State #vqstate { q4 = Q4a, prefetcher = undefined });
+ {{value,
+ {msg_and_index, Msg = #basic_message { guid = MsgId },
+ SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} ->
+ IndexState1 =
+ case IndexOnDisk andalso not IsDelivered of
+ true ->
+ rabbit_queue_index:write_delivered(SeqId, IndexState);
+ false ->
+ IndexState
+ end,
+ AckTag = case {IndexOnDisk, MsgOnDisk} of
+ {true, true } -> {ack_index_and_store, MsgId, SeqId};
+ {false, true } -> {ack_store, MsgId};
+ {false, false} -> not_on_disk
+ end,
+ {{Msg, IsDelivered, AckTag},
+ State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
+ index_state = IndexState1 }}
+ end.
+
+out_from_q3(State = #vqstate { q2 = Q2, index_state = IndexState,
+ gamma = {GammaSeqId, GammaCount}, q3 = Q3,
+ q4 = Q4 }) ->
+ case queue:out(Q3) of
+ {empty, _Q3} ->
+ case GammaCount of
+ 0 ->
+ undefined = GammaSeqId, %% ASSERTION
+ true = queue:is_empty(Q2), %% ASSERTION
+ {empty, State};
+ _ ->
+ {List = [_|_], IndexState1} =
+ rabbit_queue_index:read_segment_entries(GammaSeqId,
+ IndexState),
+ State1 = State #vqstate { index_state = IndexState1 },
+ Q3a = queue:from_list(List),
+ State2 =
+ case GammaCount - length(List) of
+ 0 ->
+ State1 #vqstate { gamma = {undefined, 0},
+ q2 = queue:new(),
+ q3 = queue:join(Q3a, Q2) };
+ N when N > 0 ->
+ State1 #vqstate { gamma =
+ {rabbit_queue_index:segment_size() +
+ GammaSeqId, N},
+ q3 = Q3a }
+ end,
+ out_from_q3(State2)
+ end;
+ {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}},
+ Q3a} ->
+ {ok, Msg = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId }} =
+ rabbit_msg_store:read(MsgId),
+ State1 = #vqstate { q1 = Q1, q4 = Q4a } =
+ State #vqstate { q3 = Q3a,
+ q4 = queue:in({msg_and_index, Msg, SeqId,
+ IsDelivered, true, IndexOnDisk},
+ Q4) },
+ State2 = case queue:is_empty(Q3a) andalso 0 == GammaCount of
+ true ->
+ true = queue:is_empty(Q2), %% ASSERTION
+ State1 #vqstate { q1 = queue:new(),
+ q4 = queue:join(Q4a, Q1) };
+ false ->
+ State1
+ end,
+ out(State2)
+ end.
+
maybe_start_prefetcher(State) ->
%% TODO
State.
@@ -204,18 +293,21 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
content = rabbit_binary_parser:clear_decoded_content(
rabbit_binary_generator:ensure_content_encoded(Content)) }.
-store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, gamma = Gamma,
+store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2,
+ gamma = {_GammaSeqId, GammaCount},
q3 = Q3, q4 = Q4 }) ->
case queue:is_empty(Q1) andalso queue:is_empty(Q2) andalso
- Gamma == 0 andalso queue:is_empty(Q3) of
+ GammaCount == 0 andalso queue:is_empty(Q3) of
true ->
State #vqstate { q4 = queue:in(Entry, Q4) };
false ->
maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) })
end.
-store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) ->
- case queue:is_empty(Q2) andalso Gamma == 0 of
+store_beta_entry(Entry, State =
+ #vqstate { q2 = Q2, gamma = {_GammaSeqId, GammaCount},
+ q3 = Q3 }) ->
+ case queue:is_empty(Q2) andalso GammaCount == 0 of
true -> State #vqstate { q3 = queue:in(Entry, Q3) };
false -> State #vqstate { q2 = queue:in(Entry, Q2) }
end.
@@ -238,7 +330,7 @@ maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
end,
maybe_push_q1_to_betas(
store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered,
- true, IndexOnDisk},
+ IndexOnDisk},
State #vqstate { ram_msg_count = RamMsgCount - 1,
q1 = Q1a }))
end.
@@ -260,7 +352,7 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
false -> maybe_write_msg_to_disk(true, Msg)
end,
Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered,
- true, IndexOnDisk}, Q3),
+ IndexOnDisk}, Q3),
maybe_push_q4_to_betas(
State #vqstate { ram_msg_count = RamMsgCount - 1,
q3 = Q3a, q4 = Q4a })
@@ -268,32 +360,62 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
index_state = IndexState }) ->
- {Len1, Q2a, IndexState1} =
+ %% HighSeqId is high in the sense that it must be higher than the
+ %% seqid in Gamma, but it's also the lowest of the betas that we
+ %% transfer from q2 to gamma.
+ {HighSeqId, Len1, Q2a, IndexState1} =
push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState),
- State1 = State #vqstate { q2 = Q2a, gamma = Gamma + Len1,
+ Gamma1 = {Gamma1SeqId, _} = combine_gammas(Gamma, {HighSeqId, Len1}),
+ State1 = State #vqstate { q2 = Q2a,
+ gamma = Gamma1,
index_state = IndexState1 },
case queue:out(Q3) of
{empty, _Q3} -> State1;
{{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered,
- true, _IndexOnDisk}}, _Q3a} ->
- Limit = rabbit_queue_index:next_segment_boundary(SeqId) - 1,
- {Len2, Q3b, IndexState2} =
- push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1),
- State1 #vqstate { q3 = Q3b, gamma = Gamma + Len1 + Len2,
- index_state = IndexState2 }
+ _IndexOnDisk}}, _Q3a} ->
+ Limit = rabbit_queue_index:next_segment_boundary(SeqId),
+ case Limit == Gamma1SeqId of
+ true -> %% already only holding the minimum, nothing to do
+ State1;
+ false ->
+ %% ASSERTION
+ true = Gamma1SeqId == undefined orelse
+ Gamma1SeqId == Limit + rabbit_queue_index:segment_size(),
+ %% LowSeqId is low in the sense that it must be
+ %% lower than the seqid in Gamma1, in fact either
+ %% gamma1 has undefined as its seqid or its seqid
+ %% is LowSeqId + 1. But because we use
+ %% queue:out_r, LowSeqId is actually also the
+ %% highest seqid of the betas we transfer from q3
+ %% to gammas.
+ {LowSeqId, Len2, Q3b, IndexState2} =
+ push_betas_to_gammas(fun queue:out_r/1, Limit - 1, Q3,
+ IndexState1),
+ Gamma1SeqId = LowSeqId + 1, %% ASSERTION
+ Gamma2 = combine_gammas({Limit, Len2}, Gamma1),
+ State1 #vqstate { q3 = Q3b, gamma = Gamma2,
+ index_state = IndexState2 }
+ end
end.
push_betas_to_gammas(Generator, Limit, Q, IndexState) ->
- push_betas_to_gammas(Generator, Limit, Q, 0, IndexState).
+ case Generator(Q) of
+ {empty, Qa} -> {undefined, 0, Qa, IndexState};
+ {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered,
+ _IndexOnDisk}}, _Qa} ->
+ {Count, Qb, IndexState1} =
+ push_betas_to_gammas(Generator, Limit, Q, 0, IndexState),
+ {SeqId, Count, Qb, IndexState1}
+ end.
push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
case Generator(Q) of
{empty, Qa} -> {Count, Qa, IndexState};
{{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered,
- _MsgOnDisk, _IndexOnDisk}}, _Qa} ->
+ _IndexOnDisk}}, _Qa} ->
{Count, Q, IndexState};
{{value, {index, MsgId, SeqId, IsPersistent, IsDelivered,
- true, IndexOnDisk}}, Qa} ->
+ IndexOnDisk}}, Qa} ->
IndexState1 =
case IndexOnDisk of
true -> IndexState;
@@ -307,3 +429,9 @@ push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1)
end.
+combine_gammas({_, 0}, {_, 0}) -> {undefined, 0};
+combine_gammas({_, 0}, B ) -> B;
+combine_gammas(A , {_, 0}) -> A;
+combine_gammas({SeqIdLow, CountLow}, {SeqIdHigh, CountHigh}) ->
+ SeqIdHigh = SeqIdLow + CountLow, %% ASSERTION
+ {SeqIdLow, CountLow + CountHigh}.