diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-08-20 13:25:31 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-08-20 13:25:31 +0100 |
| commit | 0ac9b46b9787ee59c95bb2930102bf7f86565a6d (patch) | |
| tree | e073acd789ed51edefc91f90a859d8123d88a750 /src | |
| parent | ec42c2a48cdce9b49920bfc751feda835cdf8bee (diff) | |
| download | rabbitmq-server-git-0ac9b46b9787ee59c95bb2930102bf7f86565a6d.tar.gz | |
reworked contract between qi and vq. tidied up in general
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 22 |
3 files changed, 32 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f1bfba84c0..b6f898cece 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -720,7 +720,6 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> - io:format("Basic Get~n"), AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3c3aef3edf..4f2168d1ad 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/3, +-export([init/4, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{IndexEntry, IsPersistent}), ('del'|'no_del'), +%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), ('del'|'no_del'), %% ('ack'|'no_ack')} is richer than strictly necessary for most %% operations. However, for startup, and to ensure the safe and %% correct combination of journal entries with entries read from the @@ -162,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _, _}). %% {Guid, Expiry, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent} -define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). @@ -203,15 +203,15 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> - qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(), + boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}], - qistate()}). + {[{rabbit_guid:guid(), seq_id(), msg_properties(), + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -250,8 +250,8 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}, - IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProperties, IsPersistent, State) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( @@ -260,8 +260,8 @@ publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}, false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, Expiry)]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, Expiry, IsPersistent}, State1)). + create_pub_record_body(Guid, MsgProperties)]), + maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -293,7 +293,6 @@ read(StartEnd, StartEnd, State) -> read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. - io:format("Reading~n"), LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), {Messages, Segments1} = @@ -329,7 +328,6 @@ bounds(State = #qistate { segments = Segments }) -> {LowSeqId, NextSeqId, State}. recover(DurableQueues) -> - io:format("Recovering~n"), DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), @@ -396,7 +394,6 @@ store_clean_shutdown(Terms, Dir) -> rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). init_clean(RecoveredCounts, State) -> - io:format("Clean Init"), %% Load the journal. Since this is a clean recovery this (almost) %% gets us back to where we were on shutdown. State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), @@ -413,11 +410,9 @@ init_clean(RecoveredCounts, State) -> end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return - io:format("Clean:~p~n", [Segments1]), {undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> - io:format("Dirty Init"), %% Recover the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment @@ -440,7 +435,6 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Unconditionally flush since the dirty_count doesn't get updated %% by the above foldl. State2 = flush_journal(State1 #qistate { segments = Segments1 }), - io:format("Dirty:~p~n", [State2]), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -463,7 +457,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _Expiry, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -529,8 +523,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(Guid, Expiry) -> - io:format("Writing GUID: ~p~n", [Guid]), +create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) -> [Guid, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> @@ -542,8 +535,7 @@ read_pub_record_body(Hdl) -> {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, - io:format("Read GUID: ~p~n", [Guid]), - {Guid, Expiry}. + {Guid, #msg_properties{expiry = Expiry}}. %%---------------------------------------------------------------------------- %% journal manipulation @@ -622,7 +614,6 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - io:format("Journal Path:~p~n", [Path]), {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -666,10 +657,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - io:format("found a pub record"), case read_pub_record_body(Hdl) of - {Guid, Expiry} -> - Publish = {Guid, Expiry, case Prefix of + {Guid, MsgProperties} -> + Publish = {Guid, MsgProperties, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end}, @@ -770,12 +760,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, Expiry, IsPersistent} -> + {Guid, MsgProperties, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(Guid, Expiry)]) + create_pub_record_body(Guid, MsgProperties)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -795,10 +785,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, Expiry, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, Expiry, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, MsgProperties, reconstruct_seq_id(StartSeg, RelSeq), IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -821,7 +811,6 @@ load_segment(KeepAcked, #segment { path = Path }) -> {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), ok = file_handle_cache:close(Hdl), - io:format("Load segment: ~p~n", [Res]), Res end. @@ -831,8 +820,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {Guid, Expiry} = read_pub_record_body(Hdl), - Obj = {{Guid, Expiry, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProperties} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff19b87f3b..e6ff18c922 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -837,7 +837,7 @@ persistent_guids(Pubs) -> betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, IsPersistent, IsDelivered}, + fun ({Guid, MsgProperties, SeqId, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -849,7 +849,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = true, - index_on_disk = true + index_on_disk = true, + msg_properties = MsgProperties }) | Filtered1], Delivers1, Acks1} @@ -1057,14 +1058,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { + guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - is_delivered = IsDelivered }, + is_delivered = IsDelivered, + msg_properties = MsgProperties}, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexEntry = queue_index_entry(MsgStatus), - IndexState1 = rabbit_queue_index:publish(IndexEntry, IsPersistent, + IndexState1 = rabbit_queue_index:publish(Guid, + SeqId, + MsgProperties, + IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; @@ -1081,12 +1086,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, {MsgStatus2, State #vqstate { index_state = IndexState1, msg_store_clients = MSCState1 }}. -queue_index_entry(#msg_status {guid = Guid, - seq_id = SeqId, - msg_properties = - #msg_properties{expiry = Expiry}}) -> - #qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}. - %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- @@ -1308,7 +1307,6 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), - io:format("Reading~n"), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries( |
