summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-16 11:26:22 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-16 11:26:22 +0100
commitd97a9e548b646a24ea134b9c08f0c0880d84c260 (patch)
tree2efdba46741f8a0575120ab5707d41219a104735 /src
parent389cfc6e763f3d109914e1d429c9f140588cbcb6 (diff)
downloadrabbitmq-server-git-d97a9e548b646a24ea134b9c08f0c0880d84c260.tar.gz
refactoring: extract segment recovery
also rename maybe_add_to_journal to recover_message, which better describes its purpose.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl57
1 files changed, 29 insertions, 28 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6a27992591..e478333859 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -239,23 +239,13 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
false ->
lists:foldl(
fun (Seg, {Segments2, CountAcc}) ->
- Segment = segment_find_or_new(Seg, Dir, Segments2),
- {SegEntries, PubCount, AckCount, Segment1} =
- load_segment(false, Segment),
- Segment2 =
- #segment { pubs = PubCount1, acks = AckCount1 } =
- array:sparse_foldl(
- fun (RelSeq, {{Guid, _IsPersistent}, Del,
- no_ack},
- Segment3) ->
- maybe_add_to_journal(
- ContainsCheckFun(Guid),
- CleanShutdown, Del, RelSeq, Segment3)
- end, Segment1 #segment { pubs = PubCount,
- acks = AckCount },
- SegEntries),
- {segment_store(Segment2, Segments2),
- CountAcc + PubCount1 - AckCount1}
+ Segment = #segment { pubs = PubCount,
+ acks = AckCount } =
+ recover_segment(
+ ContainsCheckFun, CleanShutdown,
+ segment_find_or_new(Seg, Dir, Segments2)),
+ {segment_store(Segment, Segments2),
+ CountAcc + PubCount - AckCount}
end, {Segments, 0}, all_segment_nums(State2));
true ->
%% At this stage, we will only know about files that
@@ -484,6 +474,28 @@ terminate(StoreShutdown, Terms, State =
end,
State #qistate { journal_handle = undefined, segments = undefined }.
+recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
+ {SegEntries, PubCount, AckCount, Segment1} =
+ load_segment(false, Segment),
+ array:sparse_foldl(
+ fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) ->
+ recover_message(ContainsCheckFun(Guid), CleanShutdown,
+ Del, RelSeq, Segment3)
+ end,
+ Segment1 #segment { pubs = PubCount, acks = AckCount },
+ SegEntries).
+
+recover_message( true, true, _Del, _RelSeq, Segment) ->
+ Segment;
+recover_message( true, false, del, _RelSeq, Segment) ->
+ Segment;
+recover_message( true, false, no_del, RelSeq, Segment) ->
+ add_to_journal(RelSeq, del, Segment);
+recover_message(false, _, del, RelSeq, Segment) ->
+ add_to_journal(RelSeq, ack, Segment);
+recover_message(false, _, no_del, RelSeq, Segment) ->
+ add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
+
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
Bin = term_to_binary(Name),
Size = 8*size(Bin),
@@ -534,17 +546,6 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% journal manipulation
%%----------------------------------------------------------------------------
-maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
- Segment;
-maybe_add_to_journal( true, false, del, _RelSeq, Segment) ->
- Segment;
-maybe_add_to_journal( true, false, _Del, RelSeq, Segment) ->
- add_to_journal(RelSeq, del, Segment);
-maybe_add_to_journal(false, _, del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, Segment);
-maybe_add_to_journal(false, _, _Del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
-
add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
segments = Segments,
dir = Dir }) ->