summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-05-19 12:40:00 +0100
committerMatthew Sackman <matthew@lshift.net>2010-05-19 12:40:00 +0100
commitb366bc7f483cc54e53a7bf19a580d0b9c0868db3 (patch)
tree2c0c6193479be4f3bfcea62894e7e59c41a4b10b
parent84c899894fd06f0929220593cfcbf8458c5bd365 (diff)
downloadrabbitmq-server-git-b366bc7f483cc54e53a7bf19a580d0b9c0868db3.tar.gz
Drop PubCount and AckCount in favour of UnackedCount
-rw-r--r--src/rabbit_queue_index.erl169
1 files changed, 78 insertions, 91 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 956b379704..67bf9f5236 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -159,7 +159,7 @@
-record(qistate, { dir, segments, journal_handle, dirty_count }).
--record(segment, { pubs, acks, handle, journal_entries, path, num }).
+-record(segment, { unacked, handle, journal_entries, path, num }).
-include("rabbit.hrl").
@@ -169,8 +169,7 @@
-type(hdl() :: ('undefined' | any())).
-type(segment() :: ('undefined' |
- #segment { pubs :: non_neg_integer(),
- acks :: non_neg_integer(),
+ #segment { unacked :: non_neg_integer(),
handle :: hdl(),
journal_entries :: array(),
path :: file_path(),
@@ -233,13 +232,12 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
%% number of unacked messages.
lists:foldl(
fun (Seg, {Segments2, CountAcc}) ->
- Segment = #segment { pubs = PubCount,
- acks = AckCount } =
+ Segment = #segment { unacked = UnackedCount } =
recover_segment(
ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
{segment_store(Segment, Segments2),
- CountAcc + PubCount - AckCount}
+ CountAcc + UnackedCount}
end, {Segments, 0}, all_segment_nums(State1));
true ->
%% At this stage, we will only know about files that
@@ -253,11 +251,11 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
fun (Seg, SegmentsN) ->
case {segment_find(Seg, SegmentsN),
dict:find(Seg, SegmentDictTerms)} of
- {error, {ok, {PubCount, AckCount}}} ->
+ {error, {ok, UnackedCount}} ->
Segment = segment_new(Seg, Dir),
segment_store(
- Segment #segment { pubs = PubCount,
- acks = AckCount },
+ Segment #segment {
+ unacked = UnackedCount },
SegmentsN);
_ ->
SegmentsN
@@ -350,9 +348,9 @@ read(Start, End, State = #qistate { segments = Segments,
false -> ?SEGMENT_ENTRY_COUNT
end,
Segment = segment_find_or_new(StartSeg, Dir, Segments),
- {SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment),
+ {SegEntries, _UnackedCount, Segment1} = load_segment(false, Segment),
#segment { journal_entries = JEntries } = Segment1,
- {SegEntries1, _PubCountDelta, _AckCountDelta} =
+ {SegEntries1, _UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
{array:sparse_foldr(
fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
@@ -458,13 +456,12 @@ terminate(StoreShutdown, Terms, State =
end,
SegTerms = segment_fold(
fun (Seg, #segment { handle = Hdl,
- pubs = PubCount,
- acks = AckCount }, SegTermsAcc) ->
+ unacked = UnackedCount }, SegTermsAcc) ->
ok = case Hdl of
undefined -> ok;
_ -> file_handle_cache:close(Hdl)
end,
- [{Seg, {PubCount, AckCount}} | SegTermsAcc]
+ [{Seg, UnackedCount} | SegTermsAcc]
end, [], Segments),
case StoreShutdown of
true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir);
@@ -473,17 +470,16 @@ terminate(StoreShutdown, Terms, State =
State #qistate { journal_handle = undefined, segments = undefined }.
recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
- {SegEntries, PubCount, AckCount, Segment1} = load_segment(false, Segment),
+ {SegEntries, UnackedCount, Segment1} = load_segment(false, Segment),
#segment { journal_entries = JEntries } = Segment1,
- {SegEntries1, PubCountDelta, AckCountDelta} =
+ {SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment2) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment2)
end,
- Segment1 #segment { pubs = PubCount + PubCountDelta,
- acks = AckCount + AckCountDelta},
+ Segment1 #segment { unacked = UnackedCount + UnackedCountDelta },
SegEntries1).
recover_message( true, true, _Del, _RelSeq, Segment) ->
@@ -559,14 +555,13 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
add_to_journal(RelSeq, Action,
Segment = #segment { journal_entries = JEntries,
- pubs = PubCount,
- acks = AckCount }) ->
+ unacked = UnackedCount }) ->
Segment1 = Segment #segment {
journal_entries = add_to_journal(RelSeq, Action, JEntries) },
case Action of
del -> Segment1;
- ack -> Segment1 #segment { acks = AckCount + 1 };
- ?PUB -> Segment1 #segment { pubs = PubCount + 1 }
+ ack -> Segment1 #segment { unacked = UnackedCount - 1 };
+ ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 }
end;
add_to_journal(RelSeq, Action, JEntries) ->
@@ -596,14 +591,14 @@ flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
fun (_Seg, #segment { journal_entries = JEntries,
- pubs = PubCount,
- acks = AckCount } = Segment, SegmentsN) ->
- case PubCount =:= AckCount of
- true -> ok = delete_segment(Segment),
- SegmentsN;
- false -> segment_store(
- append_journal_to_segment(Segment, JEntries),
- SegmentsN)
+ unacked = UnackedCount } = Segment,
+ SegmentsN) ->
+ case UnackedCount of
+ 0 -> ok = delete_segment(Segment),
+ SegmentsN;
+ _ -> segment_store(
+ append_journal_to_segment(Segment, JEntries),
+ SegmentsN)
end
end, segments_new(), Segments),
{JournalHdl, State1} =
@@ -641,23 +636,21 @@ load_journal(State) ->
Segments1 =
segment_map(
fun (_Seg, Segment = #segment { journal_entries = JEntries,
- pubs = PubCountInJournal,
- acks = AckCountInJournal }) ->
+ unacked = UnackedCountInJournal }) ->
%% We want to keep ack'd entries in so that we can
%% remove them if duplicates are in the journal. The
%% counts here are purely from the segment itself.
- {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} =
+ {SegEntries, UnackedCountInSeg, Segment1} =
load_segment(true, Segment),
%% Removed counts here are the number of pubs and
%% acks that are duplicates - i.e. found in both the
%% segment and journal.
- {JEntries1, PubsRemoved, AcksRemoved} =
+ {JEntries1, UnackedCountDuplicates} =
journal_minus_segment(JEntries, SegEntries),
- PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
- AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
Segment1 #segment { journal_entries = JEntries1,
- pubs = PubCount1,
- acks = AckCount1 }
+ unacked = UnackedCountInJournal +
+ UnackedCountInSeg -
+ UnackedCountDuplicates }
end, Segments),
State2 #qistate { segments = Segments1 }.
@@ -731,8 +724,7 @@ get_segment_handle(Segment = #segment { handle = Hdl }) ->
{Hdl, Segment}.
segment_new(Seg, Dir) ->
- #segment { pubs = 0,
- acks = 0,
+ #segment { unacked = 0,
handle = undefined,
journal_entries = array_new(),
path = seg_num_to_path(Dir, Seg),
@@ -824,15 +816,15 @@ load_segment(KeepAcked, Segment = #segment { path = Path, handle = SegHdl }) ->
_ -> true
end,
case SegmentExists of
- false -> {array_new(), 0, 0, Segment};
+ false -> {array_new(), 0, Segment};
true -> {Hdl, Segment1} = get_segment_handle(Segment),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {SegEntries, PubCount, AckCount} =
- load_segment_entries(KeepAcked, Hdl, array_new(), 0, 0),
- {SegEntries, PubCount, AckCount, Segment1}
+ {SegEntries, UnackedCount} =
+ load_segment_entries(KeepAcked, Hdl, array_new(), 0),
+ {SegEntries, UnackedCount, Segment1}
end.
-load_segment_entries(KeepAcked, Hdl, SegEntries, PubCount, AckCount) ->
+load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
@@ -842,22 +834,22 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, PubCount, AckCount) ->
Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
- PubCount + 1, AckCount);
+ UnackedCount + 1);
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS>>} ->
- {AckCountDelta, SegEntries1} =
+ {UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
- {0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
+ { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
{Pub, del, no_ack} when KeepAcked ->
- {1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
+ {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
{_Pub, del, no_ack} ->
- {1, array:reset(RelSeq, SegEntries)}
+ {-1, array:reset(RelSeq, SegEntries)}
end,
load_segment_entries(KeepAcked, Hdl, SegEntries1,
- PubCount, AckCount + AckCountDelta);
+ UnackedCount + UnackedCountDelta);
_ErrOrEoF ->
- {SegEntries, PubCount, AckCount}
+ {SegEntries, UnackedCount}
end.
array_new() ->
@@ -874,104 +866,99 @@ bool_to_int(false) -> 0.
%% holding for that segment in memory. There must be no duplicates.
segment_plus_journal(SegEntries, JEntries) ->
array:sparse_foldl(
- fun (RelSeq, JObj, {SegEntriesOut, PubsAdded, AcksAdded}) ->
+ fun (RelSeq, JObj, {SegEntriesOut, AdditionalUnacked}) ->
SegEntry = array:get(RelSeq, SegEntriesOut),
- {Obj, PubsAddedDelta, AcksAddedDelta} =
+ {Obj, AdditionalUnackedDelta} =
segment_plus_journal1(SegEntry, JObj),
{case Obj of
undefined -> array:reset(RelSeq, SegEntriesOut);
_ -> array:set(RelSeq, Obj, SegEntriesOut)
end,
- PubsAdded + PubsAddedDelta,
- AcksAdded + AcksAddedDelta}
- end, {SegEntries, 0, 0}, JEntries).
+ AdditionalUnacked + AdditionalUnackedDelta}
+ end, {SegEntries, 0}, JEntries).
-%% Here, the result is a triple with the first element containing the
+%% Here, the result is a tuple with the first element containing the
%% item which we may be adding to (for items only in the journal),
%% modifying in (bits in both), or, when returning 'undefined',
%% erasing from (ack in journal, not segment) the segment array. The
-%% other two elements of the triple are the deltas for PubsAdded and
-%% AcksAdded - these get increased when a publish or ack is found in
-%% the journal.
+%% other element of the tuple is the delta for AdditionalUnacked.
segment_plus_journal1(undefined, {?PUB, no_del, no_ack} = Obj) ->
- {Obj, 1, 0};
+ {Obj, 1};
segment_plus_journal1(undefined, {?PUB, del, no_ack} = Obj) ->
- {Obj, 1, 0};
+ {Obj, 1};
segment_plus_journal1(undefined, {?PUB, del, ack}) ->
- {undefined, 1, 1};
+ {undefined, 0};
segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) ->
- {{Pub, del, no_ack}, 0, 0};
+ {{Pub, del, no_ack}, 0};
segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) ->
- {undefined, 0, 1};
+ {undefined, -1};
segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
- {undefined, 0, 1}.
+ {undefined, -1}.
%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up
%% to clean up the journal.
journal_minus_segment(JEntries, SegEntries) ->
array:sparse_foldl(
- fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) ->
+ fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) ->
SegEntry = array:get(RelSeq, SegEntries),
- {Obj, PubsRemovedDelta, AcksRemovedDelta} =
+ {Obj, UnackedRemovedDelta} =
journal_minus_segment1(JObj, SegEntry),
{case Obj of
keep -> JEntriesOut;
undefined -> array:reset(RelSeq, JEntriesOut);
_ -> array:set(RelSeq, Obj, JEntriesOut)
end,
- PubsRemoved + PubsRemovedDelta,
- AcksRemoved + AcksRemovedDelta}
- end, {JEntries, 0, 0}, JEntries).
+ UnackedRemoved + UnackedRemovedDelta}
+ end, {JEntries, 0}, JEntries).
-%% Here, the result is a triple with the first element containing the
+%% Here, the result is a tuple with the first element containing the
%% item we are adding to or modifying in the (initially fresh) journal
%% array. If the item is 'undefined' we leave the journal array
-%% alone. The other two elements of the triple are the deltas for
-%% PubsRemoved and AcksRemoved - these only get increased when a
-%% publish or ack is in both the journal and the segment.
+%% alone. The other element of the tuple is the deltas for
+%% UnackedRemoved.
%% Both the same. Must be at least the publish
journal_minus_segment1({?PUB, _Del, no_ack} = Obj, Obj) ->
- {undefined, 1, 0};
+ {undefined, 1};
journal_minus_segment1({?PUB, _Del, ack} = Obj, Obj) ->
- {undefined, 1, 1};
+ {undefined, 0};
%% Just publish in journal
journal_minus_segment1({?PUB, no_del, no_ack}, undefined) ->
- {keep, 0, 0};
+ {keep, 0};
%% Publish and deliver in journal
journal_minus_segment1({?PUB, del, no_ack}, undefined) ->
- {keep, 0, 0};
+ {keep, 0};
journal_minus_segment1({?PUB = Pub, del, no_ack}, {Pub, no_del, no_ack}) ->
- {{no_pub, del, no_ack}, 1, 0};
+ {{no_pub, del, no_ack}, 1};
%% Publish, deliver and ack in journal
journal_minus_segment1({?PUB, del, ack}, undefined) ->
- {keep, 0, 0};
+ {keep, 0};
journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, no_del, no_ack}) ->
- {{no_pub, del, ack}, 1, 0};
+ {{no_pub, del, ack}, 1};
journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, del, no_ack}) ->
- {{no_pub, no_del, ack}, 1, 0};
+ {{no_pub, no_del, ack}, 1};
%% Just deliver in journal
journal_minus_segment1({no_pub, del, no_ack}, {?PUB, no_del, no_ack}) ->
- {keep, 0, 0};
+ {keep, 0};
journal_minus_segment1({no_pub, del, no_ack}, {?PUB, del, no_ack}) ->
- {undefined, 0, 0};
+ {undefined, 0};
%% Just ack in journal
journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) ->
- {keep, 0, 0};
+ {keep, 0};
journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, ack}) ->
- {undefined, 0, 1};
+ {undefined, -1};
%% Deliver and ack in journal
journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) ->
- {keep, 0, 0};
+ {keep, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
- {{no_pub, no_del, ack}, 0, 0};
+ {{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
- {undefined, 0, 1}.
+ {undefined, -1}.