summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-09 12:13:41 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-09 12:13:41 +0100
commitfbf5a614b1e83c1977c183d8aa6f09ea2c64d617 (patch)
treecdca5d67d576efc764c2e003f0f199024e51f082 /src
parent3ce9813fd375baea72f9225d90989afd9812842a (diff)
downloadrabbitmq-server-git-fbf5a614b1e83c1977c183d8aa6f09ea2c64d617.tar.gz
adding len and is_empty. Renaming in -> publish and out -> fetch. A bit of moving around. Generally trying to work towards the mq API so that it can be dropped in
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl151
1 files changed, 86 insertions, 65 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ff3b8b7c4f..cb2e9f242b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,8 +31,10 @@
-module(rabbit_variable_queue).
--export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
- out/1]).
+-export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
+ fetch/1, len/1, is_empty/1]).
+
+%%----------------------------------------------------------------------------
-record(vqstate,
{ q1,
@@ -49,7 +51,8 @@
egress_rate,
avg_egress_rate,
egress_rate_timestamp,
- prefetcher
+ prefetcher,
+ len
}).
-record(alpha,
@@ -75,6 +78,8 @@
-include("rabbit.hrl").
+%%----------------------------------------------------------------------------
+
%% Basic premise is that msgs move from q1 -> q2 -> gamma -> q3 -> q4
%% but they can only do so in the right form. q1 and q4 only hold
%% alphas (msgs in ram), q2 and q3 only hold betas (msg on disk, index
@@ -102,6 +107,10 @@
%% contain all msgs in the queue. Also, if q4 is non empty and gamma
%% is non empty then q3 must be non empty.
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
init(QueueName) ->
{GammaSeqId, NextSeqId, GammaCount, IndexState} =
rabbit_queue_index:init(QueueName),
@@ -122,59 +131,15 @@ init(QueueName) ->
egress_rate = 0,
avg_egress_rate = 0,
egress_rate_timestamp = now(),
- prefetcher = undefined
+ prefetcher = undefined,
+ len = GammaCount
},
maybe_load_next_segment(State).
-in(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId }) ->
- in(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
- State #vqstate { next_seq_id = SeqId + 1 }).
-
-in(msg, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
- ram_msg_count = RamMsgCount }) ->
- MsgOnDisk = maybe_write_msg_to_disk(false, Msg),
- {IndexOnDisk, IndexState1} =
- maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
- IsDelivered, IndexState),
- Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
- State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
- index_state = IndexState1 },
- store_alpha_entry(Entry, State1);
-
-in(index, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
- q1 = Q1 }) ->
- true = maybe_write_msg_to_disk(true, Msg),
- {IndexOnDisk, IndexState1} =
- maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
- IsDelivered, IndexState),
- Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
- is_persistent = IsPersistent, index_on_disk = IndexOnDisk },
- State1 = State #vqstate { index_state = IndexState1 },
- true = queue:is_empty(Q1), %% ASSERTION
- store_beta_entry(Entry, State1);
-
-in(neither, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
- q1 = Q1, q2 = Q2, gamma = Gamma }) ->
- true = maybe_write_msg_to_disk(true, Msg),
- {true, IndexState1} =
- maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId,
- IsDelivered, IndexState),
- true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION
- %% gamma may be empty, seq_id > next_segment_boundary from q3
- %% head, so we need to find where the segment boundary is before
- %% or equal to seq_id
- GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
- rabbit_queue_index:segment_size(),
- Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 },
- State #vqstate { index_state = IndexState1,
- gamma = combine_gammas(Gamma, Gamma1) }.
+publish(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId,
+ len = Len }) ->
+ publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
+ State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 }).
set_queue_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
@@ -205,20 +170,20 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
egress_rate_timestamp = Now,
out_counter = 0 }.
-out(State =
- #vqstate { q4 = Q4,
- out_counter = OutCount, prefetcher = Prefetcher,
- index_state = IndexState }) ->
+fetch(State =
+ #vqstate { q4 = Q4,
+ out_counter = OutCount, prefetcher = Prefetcher,
+ index_state = IndexState, len = Len }) ->
case queue:out(Q4) of
{empty, _Q4} when Prefetcher == undefined ->
- out_from_q3(State);
+ fetch_from_q3_or_gamma(State);
{empty, _Q4} ->
Q4a =
case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
empty -> Q4;
Q4b -> Q4b
end,
- out(State #vqstate { q4 = Q4a, prefetcher = undefined });
+ fetch(State #vqstate { q4 = Q4a, prefetcher = undefined });
{{value,
#alpha { msg = Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -256,14 +221,70 @@ out(State =
true -> {ack_index_and_store, MsgId, SeqId};
false -> ack_not_on_disk
end,
- {{Msg, IsDelivered, AckTag},
+ Len1 = Len - 1,
+ {{Msg, IsDelivered, AckTag, Len1},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
- index_state = IndexState1 }}
+ index_state = IndexState1, len = Len1 }}
end.
-out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2,
- gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4 }) ->
+len(#vqstate { len = Len }) ->
+ Len.
+
+is_empty(State) ->
+ 0 == len(State).
+
+%%----------------------------------------------------------------------------
+
+publish(msg, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
+ ram_msg_count = RamMsgCount }) ->
+ MsgOnDisk = maybe_write_msg_to_disk(false, Msg),
+ {IndexOnDisk, IndexState1} =
+ maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
+ IsDelivered, IndexState),
+ Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
+ State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
+ index_state = IndexState1 },
+ store_alpha_entry(Entry, State1);
+
+publish(index, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ SeqId, IsDelivered, State = #vqstate { index_state = IndexState,
+ q1 = Q1 }) ->
+ true = maybe_write_msg_to_disk(true, Msg),
+ {IndexOnDisk, IndexState1} =
+ maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
+ IsDelivered, IndexState),
+ Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
+ is_persistent = IsPersistent, index_on_disk = IndexOnDisk },
+ State1 = State #vqstate { index_state = IndexState1 },
+ true = queue:is_empty(Q1), %% ASSERTION
+ store_beta_entry(Entry, State1);
+
+publish(neither, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ SeqId, IsDelivered,
+ State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
+ gamma = Gamma }) ->
+ true = maybe_write_msg_to_disk(true, Msg),
+ {true, IndexState1} =
+ maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId,
+ IsDelivered, IndexState),
+ true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION
+ %% gamma may be empty, seq_id > next_segment_boundary from q3
+ %% head, so we need to find where the segment boundary is before
+ %% or equal to seq_id
+ GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
+ rabbit_queue_index:segment_size(),
+ Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 },
+ State #vqstate { index_state = IndexState1,
+ gamma = combine_gammas(Gamma, Gamma1) }.
+
+fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2,
+ gamma = #gamma { count = GammaCount },
+ q3 = Q3, q4 = Q4 }) ->
case queue:out(Q3) of
{empty, _Q3} ->
0 = GammaCount, %% ASSERTION
@@ -299,7 +320,7 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2,
%% gamma and q3 are maintained
State1
end,
- out(State2)
+ fetch(State2)
end.
maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) ->