diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-18 19:27:52 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-18 19:27:52 +0100 |
| commit | 26c086256d881ac89c19e8d014b570264cf29c3e (patch) | |
| tree | 63712f7472443e5ad2d891ba6db48408c1972703 /src | |
| parent | bbe80f0ae6b869f542caa6def142d707a0ded5bc (diff) | |
| download | rabbitmq-server-git-26c086256d881ac89c19e8d014b570264cf29c3e.tar.gz | |
KeepAck -> KeepAcked
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 34 |
1 files changed, 17 insertions, 17 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f245172583..d061947ac4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -647,9 +647,9 @@ load_journal(State) -> fun (_Seg, Segment = #segment { journal_entries = JEntries, pubs = PubCountInJournal, acks = AckCountInJournal }) -> - %% We want to keep acks in so that we can remove - %% them if duplicates are in the journal. The counts - %% here are purely from the segment itself. + %% We want to keep ack'd entries in so that we can + %% remove them if duplicates are in the journal. The + %% counts here are purely from the segment itself. {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} = load_segment(true, Segment), %% Removed counts here are the number of pubs and @@ -821,11 +821,11 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> %% %% Does not do any combining with the journal at all. The PubCount %% that comes back is the number of publishes in the segment. The -%% number of unacked msgs is PubCount - AckCount. If KeepAcks is +%% number of unacked msgs is PubCount - AckCount. If KeepAcked is %% false, then array:sparse_size(SegEntries) == PubCount - -%% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries) +%% AckCount. If KeepAcked is true, then array:sparse_size(SegEntries) %% == PubCount. -load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) -> +load_segment(KeepAcked, Segment = #segment { path = Path, handle = SegHdl }) -> SegmentExists = case SegHdl of undefined -> filelib:is_file(Path); _ -> true @@ -835,18 +835,12 @@ load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) -> true -> {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), {SegEntries, PubCount, AckCount} = - load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0), + load_segment_entries(KeepAcked, Hdl, array_new(), 0, 0), {SegEntries, PubCount, AckCount, Segment1} end. -load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> +load_segment_entries(KeepAcked, Hdl, SegEntries, PubCount, AckCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> - {AckCount1, SegEntries1} = - deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries), - load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount, - AckCount1); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete @@ -856,17 +850,23 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> array:set(RelSeq, {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries), - load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1, + load_segment_entries(KeepAcked, Hdl, SegEntries1, PubCount + 1, AckCount); + {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>} -> + {AckCount1, SegEntries1} = + deliver_or_ack_msg(KeepAcked, RelSeq, AckCount, SegEntries), + load_segment_entries(KeepAcked, Hdl, SegEntries1, PubCount, + AckCount1); _ErrOrEoF -> {SegEntries, PubCount, AckCount} end. -deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries) -> +deliver_or_ack_msg(KeepAcked, RelSeq, AckCount, SegEntries) -> case array:get(RelSeq, SegEntries) of {Pub, no_del, no_ack} -> {AckCount, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcks -> + {Pub, del, no_ack} when KeepAcked -> {AckCount + 1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; {_Pub, del, no_ack} -> {AckCount + 1, array:reset(RelSeq, SegEntries)} |
