summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-15 17:25:14 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-15 17:25:14 +0100
commitb5cb4324a2b25a5ba79f4b2865e63ad7cebe4e6b (patch)
tree8e3c464a5e84e1b83512259b5c6bc1c0633b91ae /src
parent596b6087ca0176a9fd30f0f6588830f59ba11895 (diff)
downloadrabbitmq-server-git-b5cb4324a2b25a5ba79f4b2865e63ad7cebe4e6b.tar.gz
cosmetic: group functions by purpose
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl415
1 files changed, 209 insertions, 206 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a4e3689168..5e5ac4ce3f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -215,7 +215,7 @@
%%----------------------------------------------------------------------------
-%% Public API
+%% public API
%%----------------------------------------------------------------------------
init(Name, MsgStoreRecovered, ContainsCheckFun) ->
@@ -294,17 +294,6 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
dirty_count = 1 }),
{Count, Terms, State3}.
-maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
- Segment;
-maybe_add_to_journal( true, false, del, _RelSeq, Segment) ->
- Segment;
-maybe_add_to_journal( true, false, _Del, RelSeq, Segment) ->
- add_to_journal(RelSeq, del, Segment);
-maybe_add_to_journal(false, _, del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, Segment);
-maybe_add_to_journal(false, _, _Del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
-
terminate(Terms, State) ->
terminate(true, Terms, State).
@@ -453,7 +442,65 @@ recover(DurableQueues) ->
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
%%----------------------------------------------------------------------------
-%% Msg Store Startup Delta Function
+%% startup and shutdown
+%%----------------------------------------------------------------------------
+
+blank_state(QueueName) ->
+ StrName = queue_name_to_dir_name(QueueName),
+ Dir = filename:join(queues_dir(), StrName),
+ ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ #qistate { dir = Dir,
+ segments = segments_new(),
+ journal_handle = undefined,
+ dirty_count = 0 }.
+
+detect_clean_shutdown(Dir) ->
+ case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ ok -> true;
+ {error, enoent} -> false
+ end.
+
+read_shutdown_terms(Dir) ->
+ rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)).
+
+store_clean_shutdown(Terms, Dir) ->
+ rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+
+terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) ->
+ State;
+terminate(StoreShutdown, Terms, State =
+ #qistate { journal_handle = JournalHdl,
+ dir = Dir, segments = Segments }) ->
+ ok = case JournalHdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(JournalHdl)
+ end,
+ SegTerms = segment_fold(
+ fun (Seg, #segment { handle = Hdl, pubs = PubCount,
+ acks = AckCount }, SegTermsAcc) ->
+ ok = case Hdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(Hdl)
+ end,
+ [{Seg, {PubCount, AckCount}} | SegTermsAcc]
+ end, [], Segments),
+ case StoreShutdown of
+ true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir);
+ false -> ok
+ end,
+ State #qistate { journal_handle = undefined, segments = undefined }.
+
+queue_name_to_dir_name(Name = #resource { kind = queue }) ->
+ Bin = term_to_binary(Name),
+ Size = 8*size(Bin),
+ <<Num:Size>> = Bin,
+ lists:flatten(io_lib:format("~.36B", [Num])).
+
+queues_dir() ->
+ filename:join(rabbit_mnesia:dir(), "queues").
+
+%%----------------------------------------------------------------------------
+%% msg store startup delta function
%%----------------------------------------------------------------------------
queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
@@ -490,39 +537,138 @@ queue_index_walker_reader(QueueName, Gatherer) ->
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
-%% Minors
+%% journal manipulation
%%----------------------------------------------------------------------------
+maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
+ Segment;
+maybe_add_to_journal( true, false, del, _RelSeq, Segment) ->
+ Segment;
+maybe_add_to_journal( true, false, _Del, RelSeq, Segment) ->
+ add_to_journal(RelSeq, del, Segment);
+maybe_add_to_journal(false, _, del, RelSeq, Segment) ->
+ add_to_journal(RelSeq, ack, Segment);
+maybe_add_to_journal(false, _, _Del, RelSeq, Segment) ->
+ add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
+
+add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
+ segments = Segments,
+ dir = Dir }) ->
+ {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ Segment = segment_find_or_new(Seg, Dir, Segments),
+ Segment1 = add_to_journal(RelSeq, Action, Segment),
+ State #qistate { dirty_count = DCount + 1,
+ segments = segment_store(Segment1, Segments) };
+
+add_to_journal(RelSeq, Action,
+ Segment = #segment { journal_entries = JEntries,
+ pubs = PubCount, acks = AckCount }) ->
+ Segment1 = Segment #segment {
+ journal_entries = add_to_journal(RelSeq, Action, JEntries) },
+ case Action of
+ del -> Segment1;
+ ack -> Segment1 #segment { acks = AckCount + 1 };
+ {_Guid, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
+ end;
+
+%% This is a more relaxed version of deliver_or_ack_msg because we can
+%% have dels or acks in the journal without the corresponding
+%% pub. Also, always want to keep acks. Things must occur in the right
+%% order though.
+add_to_journal(RelSeq, Action, SegJArray) ->
+ case array:get(RelSeq, SegJArray) of
+ undefined ->
+ array:set(RelSeq,
+ case Action of
+ {_Msg, _IsPersistent} -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end, SegJArray);
+ ({Pub, no_del, no_ack}) when Action == del ->
+ array:set(RelSeq, {Pub, del, no_ack}, SegJArray);
+ ({Pub, Del, no_ack}) when Action == ack ->
+ array:set(RelSeq, {Pub, Del, ack}, SegJArray)
+ end.
+
maybe_flush_journal(State = #qistate { dirty_count = DCount })
when DCount > ?MAX_JOURNAL_ENTRY_COUNT ->
flush_journal(State);
maybe_flush_journal(State) ->
State.
-all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
- lists:sort(
- sets:to_list(
- lists:foldl(
- fun (SegName, Set) ->
- sets:add_element(
- list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
- SegName)), Set)
- end, sets:from_list(segment_fetch_keys(Segments)),
- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
+get_journal_handle(State = #qistate { journal_handle = undefined,
+ dir = Dir }) ->
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ {ok, Hdl} = file_handle_cache:open(Path,
+ [binary, raw, read, write,
+ {read_ahead, ?SEGMENT_TOTAL_SIZE}],
+ [{write_buffer, infinity}]),
+ {Hdl, State #qistate { journal_handle = Hdl }};
+get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
+ {Hdl, State}.
-blank_state(QueueName) ->
- StrName = queue_name_to_dir_name(QueueName),
- Dir = filename:join(queues_dir(), StrName),
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- #qistate { dir = Dir,
- segments = segments_new(),
- journal_handle = undefined,
- dirty_count = 0
- }.
+%% Loading Journal. This isn't idempotent and will mess up the counts
+%% if you call it more than once on the same state. Assumes the counts
+%% are 0 to start with.
+load_journal(State) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ {ok, 0} = file_handle_cache:position(JournalHdl, 0),
+ State2 = #qistate { segments = Segments } = load_journal_entries(State1),
+ Segments1 =
+ segment_map(
+ fun (_Seg, Segment = #segment { journal_entries = JEntries,
+ pubs = PubCountInJournal,
+ acks = AckCountInJournal }) ->
+ %% We want to keep acks in so that we can remove
+ %% them if duplicates are in the journal. The counts
+ %% here are purely from the segment itself.
+ {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} =
+ load_segment(true, Segment),
+ %% Removed counts here are the number of pubs and
+ %% acks that are duplicates - i.e. found in both the
+ %% segment and journal.
+ {JEntries1, PubsRemoved, AcksRemoved} =
+ journal_minus_segment(JEntries, SegEntries),
+ PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
+ AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
+ Segment1 #segment { journal_entries = JEntries1,
+ pubs = PubCount1,
+ acks = AckCount1 }
+ end, Segments),
+ State2 #qistate { segments = Segments1 }.
-array_new() ->
- array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
+load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
+ case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
+ {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
+ case Prefix of
+ ?DEL_JPREFIX ->
+ load_journal_entries(add_to_journal(SeqId, del, State));
+ ?ACK_JPREFIX ->
+ load_journal_entries(add_to_journal(SeqId, ack, State));
+ _ ->
+ case file_handle_cache:read(Hdl, ?GUID_BYTES) of
+ {ok, <<GuidNum:?GUID_BITS>>} ->
+ %% work around for binary data
+ %% fragmentation. See
+ %% rabbit_msg_file:read_next/2
+ <<Guid:?GUID_BYTES/binary>> =
+ <<GuidNum:?GUID_BITS>>,
+ Publish = {Guid, case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end},
+ load_journal_entries(
+ add_to_journal(SeqId, Publish, State));
+ _ErrOrEoF -> %% err, we've lost at least a publish
+ State
+ end
+ end;
+ _ErrOrEoF -> State
+ end.
+
+%%----------------------------------------------------------------------------
+%% segment manipulation
+%%----------------------------------------------------------------------------
seq_id_to_seg_and_rel_seq_id(SeqId) ->
{ SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }.
@@ -534,32 +680,23 @@ seg_num_to_path(Dir, Seg) ->
SegName = integer_to_list(Seg),
filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION).
+all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
+ lists:sort(
+ sets:to_list(
+ lists:foldl(
+ fun (SegName, Set) ->
+ sets:add_element(
+ list_to_integer(
+ lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
+ SegName)), Set)
+ end, sets:from_list(segment_fetch_keys(Segments)),
+ filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
+
delete_segment(#segment { handle = undefined }) ->
ok;
delete_segment(#segment { handle = Hdl }) ->
ok = file_handle_cache:delete(Hdl).
-detect_clean_shutdown(Dir) ->
- case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
- ok -> true;
- {error, enoent} -> false
- end.
-
-read_shutdown_terms(Dir) ->
- rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)).
-
-store_clean_shutdown(Terms, Dir) ->
- rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
-
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- Bin = term_to_binary(Name),
- Size = 8*size(Bin),
- <<Num:Size>> = Bin,
- lists:flatten(io_lib:format("~.36B", [Num])).
-
-queues_dir() ->
- filename:join(rabbit_mnesia:dir(), "queues").
-
get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
{ok, Hdl} = file_handle_cache:open(Path,
[binary, raw, read, write,
@@ -626,20 +763,6 @@ segment_fetch_keys({Segments, CachedSegments}) ->
segments_new() ->
{dict:new(), []}.
-get_journal_handle(State =
- #qistate { journal_handle = undefined, dir = Dir }) ->
- Path = filename:join(Dir, ?JOURNAL_FILENAME),
- {ok, Hdl} = file_handle_cache:open(Path,
- [binary, raw, read, write,
- {read_ahead, ?SEGMENT_TOTAL_SIZE}],
- [{write_buffer, infinity}]),
- {Hdl, State #qistate { journal_handle = Hdl }};
-get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
- {Hdl, State}.
-
-bool_to_int(true ) -> 1;
-bool_to_int(false) -> 0.
-
write_entry_to_segment(_RelSeq, {{_Guid, _IsPersistent}, del, ack}, Hdl) ->
Hdl;
write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
@@ -666,57 +789,26 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
end,
Hdl.
-terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) ->
- State;
-terminate(StoreShutdown, Terms, State =
- #qistate { journal_handle = JournalHdl,
- dir = Dir, segments = Segments }) ->
- ok = case JournalHdl of
- undefined -> ok;
- _ -> file_handle_cache:close(JournalHdl)
- end,
- SegTerms = segment_fold(
- fun (Seg, #segment { handle = Hdl, pubs = PubCount,
- acks = AckCount }, SegTermsAcc) ->
- ok = case Hdl of
- undefined -> ok;
- _ -> file_handle_cache:close(Hdl)
- end,
- [{Seg, {PubCount, AckCount}} | SegTermsAcc]
- end, [], Segments),
- case StoreShutdown of
- true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir);
- false -> ok
- end,
- State #qistate { journal_handle = undefined, segments = undefined }.
-
-%%----------------------------------------------------------------------------
-%% Majors
-%%----------------------------------------------------------------------------
-
%% Loading segments
-
+%%
%% Does not do any combining with the journal at all. The PubCount
%% that comes back is the number of publishes in the segment. The
%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
%% false, then array:sparse_size(SegEntries) == PubCount -
%% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries)
%% == PubCount.
-load_segment(KeepAcks,
- Segment = #segment { path = Path, handle = SegHdl }) ->
+load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) ->
SegmentExists = case SegHdl of
undefined -> filelib:is_file(Path);
_ -> true
end,
case SegmentExists of
- false ->
- {array_new(), 0, 0, Segment};
- true ->
- {Hdl, Segment1} = get_segment_handle(Segment),
- {ok, 0} = file_handle_cache:position(Hdl, bof),
- {SegEntries, PubCount, AckCount} =
- load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0),
- {SegEntries, PubCount, AckCount, Segment1}
+ false -> {array_new(), 0, 0, Segment};
+ true -> {Hdl, Segment1} = get_segment_handle(Segment),
+ {ok, 0} = file_handle_cache:position(Hdl, bof),
+ {SegEntries, PubCount, AckCount} =
+ load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0),
+ {SegEntries, PubCount, AckCount, Segment1}
end.
load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
@@ -752,104 +844,15 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries) ->
{AckCount + 1, array:reset(RelSeq, SegEntries)}
end.
-%% Loading Journal. This isn't idempotent and will mess up the counts
-%% if you call it more than once on the same state. Assumes the counts
-%% are 0 to start with.
-
-load_journal(State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- {ok, 0} = file_handle_cache:position(JournalHdl, 0),
- State2 = #qistate { segments = Segments } = load_journal_entries(State1),
- Segments1 =
- segment_map(
- fun (_Seg, Segment = #segment { journal_entries = JEntries,
- pubs = PubCountInJournal,
- acks = AckCountInJournal }) ->
- %% We want to keep acks in so that we can remove
- %% them if duplicates are in the journal. The counts
- %% here are purely from the segment itself.
- {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} =
- load_segment(true, Segment),
- %% Removed counts here are the number of pubs and
- %% acks that are duplicates - i.e. found in both the
- %% segment and journal.
- {JEntries1, PubsRemoved, AcksRemoved} =
- journal_minus_segment(JEntries, SegEntries),
- PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
- AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
- Segment1 #segment { journal_entries = JEntries1,
- pubs = PubCount1,
- acks = AckCount1 }
- end, Segments),
- State2 #qistate { segments = Segments1 }.
-
-load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
- case Prefix of
- ?DEL_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, del, State));
- ?ACK_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, ack, State));
- _ ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES) of
- {ok, <<GuidNum:?GUID_BITS>>} ->
- %% work around for binary data
- %% fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<Guid:?GUID_BYTES/binary>> =
- <<GuidNum:?GUID_BITS>>,
- Publish = {Guid, case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end},
- load_journal_entries(
- add_to_journal(SeqId, Publish, State));
- _ErrOrEoF -> %% err, we've lost at least a publish
- State
- end
- end;
- _ErrOrEoF -> State
- end.
-
-add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
- segments = Segments,
- dir = Dir }) ->
- {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- Segment = segment_find_or_new(Seg, Dir, Segments),
- Segment1 = add_to_journal(RelSeq, Action, Segment),
- State #qistate { dirty_count = DCount + 1,
- segments = segment_store(Segment1, Segments) };
+array_new() ->
+ array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
-add_to_journal(RelSeq, Action,
- Segment = #segment { journal_entries = JEntries,
- pubs = PubCount, acks = AckCount }) ->
- Segment1 = Segment #segment {
- journal_entries = add_to_journal(RelSeq, Action, JEntries) },
- case Action of
- del -> Segment1;
- ack -> Segment1 #segment { acks = AckCount + 1 };
- {_Guid, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
- end;
+bool_to_int(true ) -> 1;
+bool_to_int(false) -> 0.
-%% This is a more relaxed version of deliver_or_ack_msg because we can
-%% have dels or acks in the journal without the corresponding
-%% pub. Also, always want to keep acks. Things must occur in the right
-%% order though.
-add_to_journal(RelSeq, Action, SegJArray) ->
- case array:get(RelSeq, SegJArray) of
- undefined ->
- array:set(RelSeq,
- case Action of
- {_Msg, _IsPersistent} -> {Action, no_del, no_ack};
- del -> {no_pub, del, no_ack};
- ack -> {no_pub, no_del, ack}
- end, SegJArray);
- ({Pub, no_del, no_ack}) when Action == del ->
- array:set(RelSeq, {Pub, del, no_ack}, SegJArray);
- ({Pub, Del, no_ack}) when Action == ack ->
- array:set(RelSeq, {Pub, Del, ack}, SegJArray)
- end.
+%%----------------------------------------------------------------------------
+%% journal & segment combination
+%%----------------------------------------------------------------------------
%% Combine what we have just read from a segment file with what we're
%% holding for that segment in memory. There must be no