summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-04 12:10:10 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-04 12:10:10 +0000
commit41805b590d507f3c9a443d48b3661364e052e9ae (patch)
tree61e3a0db9ecf066758a5d6c35ed9022cb573a868
parentb51ba757d4e71fb86259c69f3693747d1d122e38 (diff)
downloadrabbitmq-server-git-41805b590d507f3c9a443d48b3661364e052e9ae.tar.gz
refactoring
-rw-r--r--src/rabbit_queue_index.erl73
1 files changed, 39 insertions, 34 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6345428e8d..ed469849ff 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -156,7 +156,8 @@ init(Name) ->
State1 = load_journal(State),
%% 2. Flush the journal. This makes life easier for everyone, as
%% it means there won't be any publishes in the journal alone.
- State2 = #qistate { dir = Dir } = flush_journal(State1),
+ State2 = #qistate { dir = Dir, segments = Segments,
+ dirty_count = DCount } = flush_journal(State1),
%% 3. Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
@@ -167,39 +168,41 @@ init(Name) ->
%% We know the journal is empty here, so we don't need to combine
%% with the journal, and we don't need to worry about messages
%% that have been acked.
- {State3 = #qistate { segments = Segments }, Count} =
+ {Segments1, Count, DCount1} =
lists:foldl(
- fun (Seg, {StateN = #qistate { segments = SegmentsN }, CountAcc}) ->
- Segment = segment_find_or_new(Seg, Dir, SegmentsN),
+ fun (Seg, {Segments2, CountAcc, DCountAcc}) ->
+ Segment = segment_find_or_new(Seg, Dir, Segments2),
{SegDict, _PubCount, _AckCount, Segment1} =
load_segment(false, Segment),
- SegmentsN1 = segment_store(Segment1, SegmentsN),
- StateN1 = #qistate { segments = SegmentsN2 } =
+ {Segment2 = #segment { pubs = PubCount, acks = AckCount },
+ DCountAcc1} =
dict:fold(
fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
- StateM) ->
- SeqId = reconstruct_seq_id(Seg, RelSeq),
+ {Segment3, DCountAcc2}) ->
InMsgStore = rabbit_msg_store:contains(MsgId),
case {InMsgStore, CleanShutdown} of
{true, true} ->
- StateM;
+ {Segment3, DCountAcc};
{true, false} when Del == del ->
- StateM;
+ {Segment3, DCountAcc};
{true, false} ->
- add_to_journal(SeqId, del, StateM);
+ {add_to_journal(RelSeq, del, Segment3),
+ DCountAcc2 + 1};
{false, _} when Del == del ->
- add_to_journal(SeqId, ack, StateM);
+ {add_to_journal(RelSeq, ack, Segment3),
+ DCountAcc2 + 1};
{false, _} ->
- add_to_journal(
- SeqId, ack,
- add_to_journal(SeqId, del, StateM))
+ {add_to_journal(
+ RelSeq, ack,
+ add_to_journal(
+ RelSeq, del, Segment3)),
+ DCountAcc2 + 2}
end
- end, StateN #qistate { segments=SegmentsN1 }, SegDict),
- {ok, #segment { pubs = PubCount, acks = AckCount }} =
- segment_find(Seg, SegmentsN2),
- {StateN1, CountAcc + PubCount - AckCount}
- end, {State2, 0}, AllSegs),
- {Count, State3}.
+ end, {Segment1, DCountAcc}, SegDict),
+ {segment_store(Segment2, Segments2),
+ CountAcc + PubCount - AckCount, DCountAcc1}
+ end, {Segments, 0, DCount}, AllSegs),
+ {Count, State2 #qistate { segments = Segments1, dirty_count = DCount1 }}.
terminate(State) ->
terminate(true, State).
@@ -491,7 +494,7 @@ segment_new(Seg, Dir) ->
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
- error -> segment_new(Seg, Dir);
+ error -> segment_new(Seg, Dir);
{ok, Segment} -> Segment
end.
@@ -717,19 +720,21 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
segments = Segments,
dir = Dir }) ->
{Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- Segment = #segment { journal_entries = SegJDict,
- pubs = PubCount, acks = AckCount } =
- segment_find_or_new(Seg, Dir, Segments),
- SegJDict1 = add_to_journal(RelSeq, Action, SegJDict),
- Segment1 = Segment #segment { journal_entries = SegJDict1 },
- Segment2 =
- case Action of
- del -> Segment1;
- ack -> Segment1 #segment { acks = AckCount + 1 };
- {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
- end,
+ Segment = segment_find_or_new(Seg, Dir, Segments),
+ Segment1 = add_to_journal(RelSeq, Action, Segment),
State #qistate { dirty_count = DCount + 1,
- segments = segment_store(Segment2, Segments) };
+ segments = segment_store(Segment1, Segments) };
+
+add_to_journal(RelSeq, Action, Segment =
+ #segment { journal_entries = SegJournal,
+ pubs = PubCount, acks = AckCount }) ->
+ SegJournal1 = add_to_journal(RelSeq, Action, SegJournal),
+ Segment1 = Segment #segment { journal_entries = SegJournal1 },
+ case Action of
+ del -> Segment1;
+ ack -> Segment1 #segment { acks = AckCount + 1 };
+ {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
+ end;
%% This is a more relaxed version of deliver_or_ack_msg because we can
%% have dels or acks in the journal without the corresponding