diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-09 12:13:41 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-09 12:13:41 +0100 |
| commit | fbf5a614b1e83c1977c183d8aa6f09ea2c64d617 (patch) | |
| tree | cdca5d67d576efc764c2e003f0f199024e51f082 /src | |
| parent | 3ce9813fd375baea72f9225d90989afd9812842a (diff) | |
| download | rabbitmq-server-git-fbf5a614b1e83c1977c183d8aa6f09ea2c64d617.tar.gz | |
adding len and is_empty. Renaming in -> publish and out -> fetch. A bit of moving around. Generally trying to work towards the mq API so that it can be dropped in
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 151 |
1 files changed, 86 insertions, 65 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff3b8b7c4f..cb2e9f242b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,8 +31,10 @@ -module(rabbit_variable_queue). --export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - out/1]). +-export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, + fetch/1, len/1, is_empty/1]). + +%%---------------------------------------------------------------------------- -record(vqstate, { q1, @@ -49,7 +51,8 @@ egress_rate, avg_egress_rate, egress_rate_timestamp, - prefetcher + prefetcher, + len }). -record(alpha, @@ -75,6 +78,8 @@ -include("rabbit.hrl"). +%%---------------------------------------------------------------------------- + %% Basic premise is that msgs move from q1 -> q2 -> gamma -> q3 -> q4 %% but they can only do so in the right form. q1 and q4 only hold %% alphas (msgs in ram), q2 and q3 only hold betas (msg on disk, index @@ -102,6 +107,10 @@ %% contain all msgs in the queue. Also, if q4 is non empty and gamma %% is non empty then q3 must be non empty. +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + init(QueueName) -> {GammaSeqId, NextSeqId, GammaCount, IndexState} = rabbit_queue_index:init(QueueName), @@ -122,59 +131,15 @@ init(QueueName) -> egress_rate = 0, avg_egress_rate = 0, egress_rate_timestamp = now(), - prefetcher = undefined + prefetcher = undefined, + len = GammaCount }, maybe_load_next_segment(State). -in(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId }) -> - in(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, - State #vqstate { next_seq_id = SeqId + 1 }). - -in(msg, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - SeqId, IsDelivered, State = #vqstate { index_state = IndexState, - ram_msg_count = RamMsgCount }) -> - MsgOnDisk = maybe_write_msg_to_disk(false, Msg), - {IndexOnDisk, IndexState1} = - maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, - IsDelivered, IndexState), - Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, - index_state = IndexState1 }, - store_alpha_entry(Entry, State1); - -in(index, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - SeqId, IsDelivered, State = #vqstate { index_state = IndexState, - q1 = Q1 }) -> - true = maybe_write_msg_to_disk(true, Msg), - {IndexOnDisk, IndexState1} = - maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, - IsDelivered, IndexState), - Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, - is_persistent = IsPersistent, index_on_disk = IndexOnDisk }, - State1 = State #vqstate { index_state = IndexState1 }, - true = queue:is_empty(Q1), %% ASSERTION - store_beta_entry(Entry, State1); - -in(neither, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - SeqId, IsDelivered, State = #vqstate { index_state = IndexState, - q1 = Q1, q2 = Q2, gamma = Gamma }) -> - true = maybe_write_msg_to_disk(true, Msg), - {true, IndexState1} = - maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId, - IsDelivered, IndexState), - true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION - %% gamma may be empty, seq_id > next_segment_boundary from q3 - %% head, so we need to find where the segment boundary is before - %% or equal to seq_id - GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) - - rabbit_queue_index:segment_size(), - Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 }, - State #vqstate { index_state = IndexState1, - gamma = combine_gammas(Gamma, Gamma1) }. +publish(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId, + len = Len }) -> + publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, + State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 }). set_queue_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, @@ -205,20 +170,20 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, egress_rate_timestamp = Now, out_counter = 0 }. -out(State = - #vqstate { q4 = Q4, - out_counter = OutCount, prefetcher = Prefetcher, - index_state = IndexState }) -> +fetch(State = + #vqstate { q4 = Q4, + out_counter = OutCount, prefetcher = Prefetcher, + index_state = IndexState, len = Len }) -> case queue:out(Q4) of {empty, _Q4} when Prefetcher == undefined -> - out_from_q3(State); + fetch_from_q3_or_gamma(State); {empty, _Q4} -> Q4a = case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of empty -> Q4; Q4b -> Q4b end, - out(State #vqstate { q4 = Q4a, prefetcher = undefined }); + fetch(State #vqstate { q4 = Q4a, prefetcher = undefined }); {{value, #alpha { msg = Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -256,14 +221,70 @@ out(State = true -> {ack_index_and_store, MsgId, SeqId}; false -> ack_not_on_disk end, - {{Msg, IsDelivered, AckTag}, + Len1 = Len - 1, + {{Msg, IsDelivered, AckTag, Len1}, State #vqstate { q4 = Q4a, out_counter = OutCount + 1, - index_state = IndexState1 }} + index_state = IndexState1, len = Len1 }} end. -out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, - gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4 }) -> +len(#vqstate { len = Len }) -> + Len. + +is_empty(State) -> + 0 == len(State). + +%%---------------------------------------------------------------------------- + +publish(msg, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + SeqId, IsDelivered, State = #vqstate { index_state = IndexState, + ram_msg_count = RamMsgCount }) -> + MsgOnDisk = maybe_write_msg_to_disk(false, Msg), + {IndexOnDisk, IndexState1} = + maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, + IsDelivered, IndexState), + Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, + index_state = IndexState1 }, + store_alpha_entry(Entry, State1); + +publish(index, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + SeqId, IsDelivered, State = #vqstate { index_state = IndexState, + q1 = Q1 }) -> + true = maybe_write_msg_to_disk(true, Msg), + {IndexOnDisk, IndexState1} = + maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, + IsDelivered, IndexState), + Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, + is_persistent = IsPersistent, index_on_disk = IndexOnDisk }, + State1 = State #vqstate { index_state = IndexState1 }, + true = queue:is_empty(Q1), %% ASSERTION + store_beta_entry(Entry, State1); + +publish(neither, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + SeqId, IsDelivered, + State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, + gamma = Gamma }) -> + true = maybe_write_msg_to_disk(true, Msg), + {true, IndexState1} = + maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId, + IsDelivered, IndexState), + true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION + %% gamma may be empty, seq_id > next_segment_boundary from q3 + %% head, so we need to find where the segment boundary is before + %% or equal to seq_id + GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) - + rabbit_queue_index:segment_size(), + Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 }, + State #vqstate { index_state = IndexState1, + gamma = combine_gammas(Gamma, Gamma1) }. + +fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2, + gamma = #gamma { count = GammaCount }, + q3 = Q3, q4 = Q4 }) -> case queue:out(Q3) of {empty, _Q3} -> 0 = GammaCount, %% ASSERTION @@ -299,7 +320,7 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, %% gamma and q3 are maintained State1 end, - out(State2) + fetch(State2) end. maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) -> |
