summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-07 18:46:12 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-07 18:46:12 +0100
commit2ecdaa38b99b2878325cbf5597537162f9f8684e (patch)
treec3339735f4e84797d061ae533dcd63583ede968b /src
parent686c68c85696a2916f65de4127956a806ce4318f (diff)
downloadrabbitmq-server-git-2ecdaa38b99b2878325cbf5597537162f9f8684e.tar.gz
implemented out. This is getting pretty disgusting, needs some refactoring, marginally more useful variable names, and more API, in particular proper support for the prefetcher. Also, totally untested.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl17
-rw-r--r--src/rabbit_variable_queue.erl176
2 files changed, 164 insertions, 29 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c59b12dd29..27952af161 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,8 @@
-module(rabbit_queue_index).
-export([init/1, write_published/4, write_delivered/2, write_acks/2,
- flush_journal/1, read_segment_entries/2, next_segment_boundary/1]).
+ flush_journal/1, read_segment_entries/2, next_segment_boundary/1,
+ segment_size/0]).
%%----------------------------------------------------------------------------
%% The queue disk index
@@ -139,8 +140,10 @@
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
- {[{'index_entry', seq_id(), msg_id(), boolean(), boolean(),
- 'on_disk'}], qistate()}).
+ {( [{'index', msg_id(), seq_id(), boolean(), boolean()}]
+ | 'not_found'), qistate()}).
+-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
+-spec(segment_size/0 :: () -> non_neg_integer()).
-endif.
@@ -237,8 +240,9 @@ read_segment_entries(InitSeqId, State =
{lists:foldl(fun (RelSeq, Acc) ->
{MsgId, IsDelivered, IsPersistent} =
dict:fetch(RelSeq, SDict),
- [{index_entry, reconstruct_seq_id(SegNum, RelSeq),
- MsgId, IsDelivered, IsPersistent, on_disk} | Acc]
+ [ {index, MsgId,
+ reconstruct_seq_id(SegNum, RelSeq),
+ IsPersistent, IsDelivered, true} | Acc]
end, [], RelSeqs),
State}.
@@ -246,6 +250,9 @@ next_segment_boundary(SeqId) ->
{SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
reconstruct_seq_id(SegNum + 1, 0).
+segment_size() ->
+ ?SEGMENT_ENTRIES_COUNT.
+
%%----------------------------------------------------------------------------
%% Minor Helpers
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 73c3c3395b..f041f4783e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,8 @@
-module(rabbit_variable_queue).
--export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1]).
+-export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
+ out/1]).
-record(vqstate,
{ q1,
@@ -48,7 +49,8 @@
egress_rate,
old_egress_rate,
avg_egress_rate,
- egress_rate_timestamp
+ egress_rate_timestamp,
+ prefetcher
}).
-include("rabbit.hrl").
@@ -56,7 +58,7 @@
init(QueueName) ->
{NextSeqId, IndexState} = rabbit_queue_index:init(QueueName),
#vqstate { q1 = queue:new(), q2 = queue:new(),
- gamma = 0,
+ gamma = {undefined, 0},
q3 = queue:new(), q4 = queue:new(),
target_ram_msg_count = undefined,
ram_msg_count = 0,
@@ -67,7 +69,8 @@ init(QueueName) ->
egress_rate = 0,
old_egress_rate = 0,
avg_egress_rate = 0,
- egress_rate_timestamp = now()
+ egress_rate_timestamp = now(),
+ prefetcher = undefined
}.
in(Msg, IsDelivered, State) ->
@@ -98,7 +101,7 @@ in(just_index, Msg = #basic_message { guid = MsgId,
{IndexOnDisk, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
IsDelivered, IndexState),
- Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, true, IndexOnDisk},
+ Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk},
State1 = State #vqstate { next_seq_id = SeqId + 1,
index_state = IndexState1 },
true = queue:is_empty(Q1), %% ASSERTION
@@ -108,7 +111,8 @@ in(neither, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
IsDelivered, State = #vqstate { index_state = IndexState,
next_seq_id = SeqId,
- q1 = Q1, q2 = Q2, gamma = Gamma }) ->
+ q1 = Q1, q2 = Q2,
+ gamma = {GammaSeqId, GammaCount} }) ->
true = maybe_write_msg_to_disk(true, Msg),
{true, IndexState1} =
maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId,
@@ -116,7 +120,7 @@ in(neither, Msg = #basic_message { guid = MsgId,
true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION
State #vqstate { next_seq_id = SeqId + 1,
index_state = IndexState1,
- gamma = Gamma + 1 }.
+ gamma = {GammaSeqId, GammaCount + 1} }.
set_queue_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
@@ -144,6 +148,91 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
egress_rate_timestamp = Now,
out_counter = 0 }.
+out(State =
+ #vqstate { q4 = Q4,
+ out_counter = OutCount, prefetcher = Prefetcher,
+ index_state = IndexState }) ->
+ case queue:out(Q4) of
+ {empty, _Q4} when Prefetcher == undefined ->
+ out_from_q3(State);
+ {empty, _Q4} ->
+ Q4a =
+ case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
+ empty -> Q4;
+ Q4b -> Q4b
+ end,
+ out(State #vqstate { q4 = Q4a, prefetcher = undefined });
+ {{value,
+ {msg_and_index, Msg = #basic_message { guid = MsgId },
+ SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} ->
+ IndexState1 =
+ case IndexOnDisk andalso not IsDelivered of
+ true ->
+ rabbit_queue_index:write_delivered(SeqId, IndexState);
+ false ->
+ IndexState
+ end,
+ AckTag = case {IndexOnDisk, MsgOnDisk} of
+ {true, true } -> {ack_index_and_store, MsgId, SeqId};
+ {false, true } -> {ack_store, MsgId};
+ {false, false} -> not_on_disk
+ end,
+ {{Msg, IsDelivered, AckTag},
+ State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
+ index_state = IndexState1 }}
+ end.
+
+out_from_q3(State = #vqstate { q2 = Q2, index_state = IndexState,
+ gamma = {GammaSeqId, GammaCount}, q3 = Q3,
+ q4 = Q4 }) ->
+ case queue:out(Q3) of
+ {empty, _Q3} ->
+ case GammaCount of
+ 0 ->
+ undefined = GammaSeqId, %% ASSERTION
+ true = queue:is_empty(Q2), %% ASSERTION
+ {empty, State};
+ _ ->
+ {List = [_|_], IndexState1} =
+ rabbit_queue_index:read_segment_entries(GammaSeqId,
+ IndexState),
+ State1 = State #vqstate { index_state = IndexState1 },
+ Q3a = queue:from_list(List),
+ State2 =
+ case GammaCount - length(List) of
+ 0 ->
+ State1 #vqstate { gamma = {undefined, 0},
+ q2 = queue:new(),
+ q3 = queue:join(Q3a, Q2) };
+ N when N > 0 ->
+ State1 #vqstate { gamma =
+ {rabbit_queue_index:segment_size() +
+ GammaSeqId, N},
+ q3 = Q3a }
+ end,
+ out_from_q3(State2)
+ end;
+ {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}},
+ Q3a} ->
+ {ok, Msg = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId }} =
+ rabbit_msg_store:read(MsgId),
+ State1 = #vqstate { q1 = Q1, q4 = Q4a } =
+ State #vqstate { q3 = Q3a,
+ q4 = queue:in({msg_and_index, Msg, SeqId,
+ IsDelivered, true, IndexOnDisk},
+ Q4) },
+ State2 = case queue:is_empty(Q3a) andalso 0 == GammaCount of
+ true ->
+ true = queue:is_empty(Q2), %% ASSERTION
+ State1 #vqstate { q1 = queue:new(),
+ q4 = queue:join(Q4a, Q1) };
+ false ->
+ State1
+ end,
+ out(State2)
+ end.
+
maybe_start_prefetcher(State) ->
%% TODO
State.
@@ -204,18 +293,21 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
content = rabbit_binary_parser:clear_decoded_content(
rabbit_binary_generator:ensure_content_encoded(Content)) }.
-store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, gamma = Gamma,
+store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2,
+ gamma = {_GammaSeqId, GammaCount},
q3 = Q3, q4 = Q4 }) ->
case queue:is_empty(Q1) andalso queue:is_empty(Q2) andalso
- Gamma == 0 andalso queue:is_empty(Q3) of
+ GammaCount == 0 andalso queue:is_empty(Q3) of
true ->
State #vqstate { q4 = queue:in(Entry, Q4) };
false ->
maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) })
end.
-store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) ->
- case queue:is_empty(Q2) andalso Gamma == 0 of
+store_beta_entry(Entry, State =
+ #vqstate { q2 = Q2, gamma = {_GammaSeqId, GammaCount},
+ q3 = Q3 }) ->
+ case queue:is_empty(Q2) andalso GammaCount == 0 of
true -> State #vqstate { q3 = queue:in(Entry, Q3) };
false -> State #vqstate { q2 = queue:in(Entry, Q2) }
end.
@@ -238,7 +330,7 @@ maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
end,
maybe_push_q1_to_betas(
store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered,
- true, IndexOnDisk},
+ IndexOnDisk},
State #vqstate { ram_msg_count = RamMsgCount - 1,
q1 = Q1a }))
end.
@@ -260,7 +352,7 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
false -> maybe_write_msg_to_disk(true, Msg)
end,
Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered,
- true, IndexOnDisk}, Q3),
+ IndexOnDisk}, Q3),
maybe_push_q4_to_betas(
State #vqstate { ram_msg_count = RamMsgCount - 1,
q3 = Q3a, q4 = Q4a })
@@ -268,32 +360,62 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount,
push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
index_state = IndexState }) ->
- {Len1, Q2a, IndexState1} =
+ %% HighSeqId is high in the sense that it must be higher than the
+ %% seqid in Gamma, but it's also the lowest of the betas that we
+ %% transfer from q2 to gamma.
+ {HighSeqId, Len1, Q2a, IndexState1} =
push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState),
- State1 = State #vqstate { q2 = Q2a, gamma = Gamma + Len1,
+ Gamma1 = {Gamma1SeqId, _} = combine_gammas(Gamma, {HighSeqId, Len1}),
+ State1 = State #vqstate { q2 = Q2a,
+ gamma = Gamma1,
index_state = IndexState1 },
case queue:out(Q3) of
{empty, _Q3} -> State1;
{{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered,
- true, _IndexOnDisk}}, _Q3a} ->
- Limit = rabbit_queue_index:next_segment_boundary(SeqId) - 1,
- {Len2, Q3b, IndexState2} =
- push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1),
- State1 #vqstate { q3 = Q3b, gamma = Gamma + Len1 + Len2,
- index_state = IndexState2 }
+ _IndexOnDisk}}, _Q3a} ->
+ Limit = rabbit_queue_index:next_segment_boundary(SeqId),
+ case Limit == Gamma1SeqId of
+ true -> %% already only holding the minimum, nothing to do
+ State1;
+ false ->
+ %% ASSERTION
+ true = Gamma1SeqId == undefined orelse
+ Gamma1SeqId == Limit + rabbit_queue_index:segment_size(),
+ %% LowSeqId is low in the sense that it must be
+ %% lower than the seqid in Gamma1, in fact either
+ %% gamma1 has undefined as its seqid or its seqid
+ %% is LowSeqId + 1. But because we use
+ %% queue:out_r, LowSeqId is actually also the
+ %% highest seqid of the betas we transfer from q3
+ %% to gammas.
+ {LowSeqId, Len2, Q3b, IndexState2} =
+ push_betas_to_gammas(fun queue:out_r/1, Limit - 1, Q3,
+ IndexState1),
+ Gamma1SeqId = LowSeqId + 1, %% ASSERTION
+ Gamma2 = combine_gammas({Limit, Len2}, Gamma1),
+ State1 #vqstate { q3 = Q3b, gamma = Gamma2,
+ index_state = IndexState2 }
+ end
end.
push_betas_to_gammas(Generator, Limit, Q, IndexState) ->
- push_betas_to_gammas(Generator, Limit, Q, 0, IndexState).
+ case Generator(Q) of
+ {empty, Qa} -> {undefined, 0, Qa, IndexState};
+ {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered,
+ _IndexOnDisk}}, _Qa} ->
+ {Count, Qb, IndexState1} =
+ push_betas_to_gammas(Generator, Limit, Q, 0, IndexState),
+ {SeqId, Count, Qb, IndexState1}
+ end.
push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
case Generator(Q) of
{empty, Qa} -> {Count, Qa, IndexState};
{{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered,
- _MsgOnDisk, _IndexOnDisk}}, _Qa} ->
+ _IndexOnDisk}}, _Qa} ->
{Count, Q, IndexState};
{{value, {index, MsgId, SeqId, IsPersistent, IsDelivered,
- true, IndexOnDisk}}, Qa} ->
+ IndexOnDisk}}, Qa} ->
IndexState1 =
case IndexOnDisk of
true -> IndexState;
@@ -307,3 +429,9 @@ push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1)
end.
+combine_gammas({_, 0}, {_, 0}) -> {undefined, 0};
+combine_gammas({_, 0}, B ) -> B;
+combine_gammas(A , {_, 0}) -> A;
+combine_gammas({SeqIdLow, CountLow}, {SeqIdHigh, CountHigh}) ->
+ SeqIdHigh = SeqIdLow + CountLow, %% ASSERTION
+ {SeqIdLow, CountLow + CountHigh}.