diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-12-04 22:12:30 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-12-04 22:12:30 +0000 |
| commit | ff7f229037c78ae167403ddf3d42cf31acd26bfa (patch) | |
| tree | 747c41506b5432309f6055dc0b43ebe88e6c6748 | |
| parent | 3be853a684363e0deacbd41fead1dc34ddafc292 (diff) | |
| download | rabbitmq-server-git-ff7f229037c78ae167403ddf3d42cf31acd26bfa.tar.gz | |
cosmetic and some refactoring
| -rw-r--r-- | src/rabbit_queue_index.erl | 163 |
1 files changed, 77 insertions, 86 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 347742d645..eda0a43a78 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -179,31 +179,28 @@ init(Name) -> array:sparse_foldl( fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, {Segment3, DCountAcc2}) -> - InMsgStore = rabbit_msg_store:contains(MsgId), - case {InMsgStore, CleanShutdown} of - {true, true} -> - {Segment3, DCountAcc}; - {true, false} when Del == del -> - {Segment3, DCountAcc}; - {true, false} -> - {add_to_journal(RelSeq, del, Segment3), - DCountAcc2 + 1}; - {false, _} when Del == del -> - {add_to_journal(RelSeq, ack, Segment3), - DCountAcc2 + 1}; - {false, _} -> - {add_to_journal( - RelSeq, ack, - add_to_journal( - RelSeq, del, Segment3)), - DCountAcc2 + 2} - end + {Segment4, DCountDelta} = + maybe_add_to_journal( + rabbit_msg_store:contains(MsgId), + CleanShutdown, Del, RelSeq, Segment3), + {Segment4, DCountAcc2 + DCountDelta} end, {Segment1, DCountAcc}, SegEntries), {segment_store(Segment2, Segments2), CountAcc + PubCount - AckCount, DCountAcc1} end, {Segments, 0, DCount}, AllSegs), {Count, State2 #qistate { segments = Segments1, dirty_count = DCount1 }}. +maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) -> + {Segment, 0}; +maybe_add_to_journal( true, false, del, _RelSeq, Segment) -> + {Segment, 0}; +maybe_add_to_journal( true, false, _Del, RelSeq, Segment) -> + {add_to_journal(RelSeq, del, Segment), 1}; +maybe_add_to_journal(false, _, del, RelSeq, Segment) -> + {add_to_journal(RelSeq, ack, Segment), 1}; +maybe_add_to_journal(false, _, _Del, RelSeq, Segment) -> + {add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)), 2}. + terminate(State) -> terminate(true, State). @@ -212,34 +209,30 @@ terminate_and_erase(State) -> ok = delete_queue_directory(State1 #qistate.dir), State1. -write_published(MsgId, SeqId, IsPersistent, State) - when is_binary(MsgId) -> +write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, - [<<(case IsPersistent of - true -> ?PUB_PERSIST_JPREFIX; - false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - MsgId]), + ok = file_handle_cache:append( + JournalHdl, [<<(case IsPersistent of + true -> ?PUB_PERSIST_JPREFIX; + false -> ?PUB_TRANS_JPREFIX + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId]), maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)). write_delivered(SeqId, State) -> {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, - <<?DEL_JPREFIX:?JPREFIX_BITS, - SeqId:?SEQ_BITS>>), + ok = file_handle_cache:append( + JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>), maybe_flush_journal(add_to_journal(SeqId, del, State1)). write_acks(SeqIds, State) -> {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, - [<<?ACK_JPREFIX:?JPREFIX_BITS, - SeqId:?SEQ_BITS>> || SeqId <- SeqIds]), - State2 = lists:foldl(fun (SeqId, StateN) -> - add_to_journal(SeqId, ack, StateN) - end, State1, SeqIds), - maybe_flush_journal(State2). + ok = file_handle_cache:append( + JournalHdl, [<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || + SeqId <- SeqIds]), + maybe_flush_journal(lists:foldl(fun (SeqId, StateN) -> + add_to_journal(SeqId, ack, StateN) + end, State1, SeqIds)). sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; @@ -255,23 +248,11 @@ flush_journal(State = #qistate { segments = Segments }) -> fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, acks = AckCount } = Segment, SegmentsN) -> case PubCount > 0 andalso PubCount == AckCount of - true -> - ok = delete_segment(Segment), - SegmentsN; - false -> - Segment1 = - case array:sparse_size(JEntries) of - 0 -> Segment; - _ -> {Hdl, Segment2} = - get_segment_handle(Segment), - array:sparse_foldl( - fun write_entry_to_segment/3, Hdl, - JEntries), - ok = file_handle_cache:sync(Hdl), - Segment2 #segment { journal_entries = - array_new() } - end, - segment_store(Segment1, SegmentsN) + true -> ok = delete_segment(Segment), + SegmentsN; + false -> segment_store( + append_journal_to_segment(Segment, JEntries), + SegmentsN) end end, segments_new(), Segments), {JournalHdl, State1} = @@ -279,6 +260,15 @@ flush_journal(State = #qistate { segments = Segments }) -> ok = file_handle_cache:clear(JournalHdl), State1 #qistate { dirty_count = 0 }. +append_journal_to_segment(Segment, JEntries) -> + case array:sparse_size(JEntries) of + 0 -> Segment; + _ -> {Hdl, Segment1} = get_segment_handle(Segment), + array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), + ok = file_handle_cache:sync(Hdl), + Segment1 #segment { journal_entries = array_new() } + end. + read_segment_entries(InitSeqId, State = #qistate { segments = Segments, dir = Dir }) -> {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), @@ -544,8 +534,8 @@ bool_to_int(false) -> 0. write_entry_to_segment(_RelSeq, {{_MsgId, _IsPersistent}, del, ack}, Hdl) -> Hdl; -write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> - ok = case Publish of +write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> + ok = case Pub of no_pub -> ok; {MsgId, IsPersistent} -> @@ -555,14 +545,16 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> RelSeq:?REL_SEQ_BITS>>, MsgId]) end, ok = case {Del, Ack} of - {no_del, no_ack} -> ok; - _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - Data = case {Del, Ack} of - {del, ack} -> [Binary, Binary]; - _ -> Binary - end, - file_handle_cache:append(Hdl, Data) + {no_del, no_ack} -> + ok; + _ -> + Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + file_handle_cache:append( + Hdl, case {Del, Ack} of + {del, ack} -> [Binary, Binary]; + _ -> Binary + end) end, Hdl. @@ -645,11 +637,11 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries) -> case array:get(RelSeq, SegEntries) of - {PubRecord, no_del, no_ack} -> - {AckCount, array:set(RelSeq, {PubRecord, del, no_ack}, SegEntries)}; - {PubRecord, del, no_ack} when KeepAcks -> - {AckCount + 1, array:set(RelSeq, {PubRecord, del, ack}, SegEntries)}; - {_PubRecord, del, no_ack} -> + {Pub, no_del, no_ack} -> + {AckCount, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; + {Pub, del, no_ack} when KeepAcks -> + {AckCount + 1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; + {_Pub, del, no_ack} -> {AckCount + 1, array:reset(RelSeq, SegEntries)} end. @@ -700,11 +692,10 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Publish = {MsgId, - case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, + Publish = {MsgId, case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, load_journal_entries( add_to_journal(SeqId, Publish, State)); _ErrOrEoF -> %% err, we've lost at least a publish @@ -747,10 +738,10 @@ add_to_journal(RelSeq, Action, SegJArray) -> del -> {no_pub, del, no_ack}; ack -> {no_pub, no_del, ack} end, SegJArray); - ({PubRecord, no_del, no_ack}) when Action == del -> - array:set(RelSeq, {PubRecord, del, no_ack}, SegJArray); - ({PubRecord, Del, no_ack}) when Action == ack -> - array:set(RelSeq, {PubRecord, Del, ack}, SegJArray) + ({Pub, no_del, no_ack}) when Action == del -> + array:set(RelSeq, {Pub, del, no_ack}, SegJArray); + ({Pub, Del, no_ack}) when Action == ack -> + array:set(RelSeq, {Pub, Del, ack}, SegJArray) end. %% Combine what we have just read from a segment file with what we're @@ -784,9 +775,9 @@ journal_plus_segment({{_MsgId, _IsPersistent}, del, ack}, array:reset(RelSeq, Out); journal_plus_segment({no_pub, del, no_ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, + {Pub = {_MsgId, _IsPersistent}, no_del, no_ack}, RelSeq, Out) -> - array:set(RelSeq, {PubRecord, del, no_ack}, Out); + array:set(RelSeq, {Pub, del, no_ack}, Out); journal_plus_segment({no_pub, del, ack}, {{_MsgId, _IsPersistent}, no_del, no_ack}, @@ -854,8 +845,8 @@ journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, not_found, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; -journal_minus_segment({PubRecord, del, no_ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, +journal_minus_segment({Pub, del, no_ack}, + {Pub = {_MsgId, _IsPersistent}, no_del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, {no_pub, del, no_ack}, Out), PubsRemoved + 1, AcksRemoved}; @@ -880,13 +871,13 @@ journal_minus_segment({{_MsgId, _IsPersistent}, del, ack}, not_found, _RelSeq, Out, PubsRemoved, AcksRemoved) -> {Out, PubsRemoved, AcksRemoved}; -journal_minus_segment({PubRecord, del, ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, +journal_minus_segment({Pub, del, ack}, + {Pub = {_MsgId, _IsPersistent}, no_del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, {no_pub, del, ack}, Out), PubsRemoved + 1, AcksRemoved}; -journal_minus_segment({PubRecord, del, ack}, - {PubRecord = {_MsgId, _IsPersistent}, del, no_ack}, +journal_minus_segment({Pub, del, ack}, + {Pub = {_MsgId, _IsPersistent}, del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, {no_pub, no_del, ack}, Out), PubsRemoved + 1, AcksRemoved}. |
