diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-08-20 11:28:57 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-08-20 11:28:57 +0100 |
| commit | ec42c2a48cdce9b49920bfc751feda835cdf8bee (patch) | |
| tree | ae05eed85f4e34ec61add5eeca4ac3cc57356afc | |
| parent | f2a73308e711246d6ea2c50caad7d846be0e022c (diff) | |
| download | rabbitmq-server-git-ec42c2a48cdce9b49920bfc751feda835cdf8bee.tar.gz | |
in flight, want to test against default
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 83 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
4 files changed, 73 insertions, 28 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index be0d4b8233..82c60e936d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -76,6 +76,8 @@ -record(msg_properties, {expiry}). +-record(qientry, {guid, seq_id, expiry}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b6f898cece..f1bfba84c0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -720,6 +720,7 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> + io:format("Basic Get~n"), AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d6b8bb2889..3c3aef3edf 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/4, terminate/2, delete_and_terminate/1, publish/3, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'), +%% the tuple: {('no_pub'|{IndexEntry, IsPersistent}), ('del'|'no_del'), %% ('ack'|'no_ack')} is richer than strictly necessary for most %% operations. However, for startup, and to ensure the safe and %% correct combination of journal entries with entries read from the @@ -141,14 +141,19 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). +-define(EXPIRY_BYTES, 8). +-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). +-define(NO_EXPIRY, 0). + -define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(GUID_BITS, (?GUID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). +%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -157,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _}). %% {Guid, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, Expiry, IsPersistent} -define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). @@ -245,15 +250,18 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}, + IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, Expiry)]), + maybe_flush_journal(add_to_journal(SeqId, {Guid, Expiry, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -282,9 +290,10 @@ flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> {[], State}; -read(Start, End, State = #qistate { segments = Segments, +read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. + io:format("Reading~n"), LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), {Messages, Segments1} = @@ -320,6 +329,7 @@ bounds(State = #qistate { segments = Segments }) -> {LowSeqId, NextSeqId, State}. recover(DurableQueues) -> + io:format("Recovering~n"), DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), @@ -386,6 +396,7 @@ store_clean_shutdown(Terms, Dir) -> rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). init_clean(RecoveredCounts, State) -> + io:format("Clean Init"), %% Load the journal. Since this is a clean recovery this (almost) %% gets us back to where we were on shutdown. State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), @@ -402,9 +413,11 @@ init_clean(RecoveredCounts, State) -> end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return + io:format("Clean:~p~n", [Segments1]), {undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> + io:format("Dirty Init"), %% Recover the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment @@ -427,6 +440,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Unconditionally flush since the dirty_count doesn't get updated %% by the above foldl. State2 = flush_journal(State1 #qistate { segments = Segments1 }), + io:format("Dirty:~p~n", [State2]), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -449,7 +463,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, {{Guid, _Expiry, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -512,6 +526,26 @@ queue_index_walker_reader(QueueName, Gatherer) -> ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- +%% expiry/binary manipulation +%%---------------------------------------------------------------------------- + +create_pub_record_body(Guid, Expiry) -> + io:format("Writing GUID: ~p~n", [Guid]), + [Guid, expiry_to_binary(Expiry)]. + +expiry_to_binary(undefined) -> + <<?NO_EXPIRY:?EXPIRY_BITS>>; +expiry_to_binary(Expiry) -> + <<Expiry:?EXPIRY_BITS>>. + +read_pub_record_body(Hdl) -> + {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), + <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, + <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, + io:format("Read GUID: ~p~n", [Guid]), + {Guid, Expiry}. + +%%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -588,6 +622,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), + io:format("Journal Path:~p~n", [Path]), {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -631,14 +666,10 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <<GuidNum:?GUID_BITS>>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <<Guid:?GUID_BYTES/binary>> = - <<GuidNum:?GUID_BITS>>, - Publish = {Guid, case Prefix of + io:format("found a pub record"), + case read_pub_record_body(Hdl) of + {Guid, Expiry} -> + Publish = {Guid, Expiry, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end}, @@ -739,11 +770,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, IsPersistent} -> + {Guid, Expiry, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, Guid]) + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(Guid, Expiry)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -763,10 +795,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{Guid, Expiry, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, Expiry, reconstruct_seq_id(StartSeg, RelSeq), IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -789,6 +821,7 @@ load_segment(KeepAcked, #segment { path = Path }) -> {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), ok = file_handle_cache:close(Hdl), + io:format("Load segment: ~p~n", [Res]), Res end. @@ -798,8 +831,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), - Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, Expiry} = read_pub_record_body(Hdl), + Obj = {{Guid, Expiry, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4e978fd5ad..ff19b87f3b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1057,12 +1057,14 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, seq_id = SeqId, + seq_id = SeqId, is_persistent = IsPersistent, - is_delivered = IsDelivered }, IndexState) + is_delivered = IsDelivered }, + IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent, + IndexEntry = queue_index_entry(MsgStatus), + IndexState1 = rabbit_queue_index:publish(IndexEntry, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; @@ -1079,6 +1081,12 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, {MsgStatus2, State #vqstate { index_state = IndexState1, msg_store_clients = MSCState1 }}. +queue_index_entry(#msg_status {guid = Guid, + seq_id = SeqId, + msg_properties = + #msg_properties{expiry = Expiry}}) -> + #qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}. + %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- @@ -1300,6 +1308,7 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), + io:format("Reading~n"), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries( |
