diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-15 17:25:14 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-15 17:25:14 +0100 |
| commit | b5cb4324a2b25a5ba79f4b2865e63ad7cebe4e6b (patch) | |
| tree | 8e3c464a5e84e1b83512259b5c6bc1c0633b91ae /src | |
| parent | 596b6087ca0176a9fd30f0f6588830f59ba11895 (diff) | |
| download | rabbitmq-server-git-b5cb4324a2b25a5ba79f4b2865e63ad7cebe4e6b.tar.gz | |
cosmetic: group functions by purpose
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 415 |
1 files changed, 209 insertions, 206 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a4e3689168..5e5ac4ce3f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -215,7 +215,7 @@ %%---------------------------------------------------------------------------- -%% Public API +%% public API %%---------------------------------------------------------------------------- init(Name, MsgStoreRecovered, ContainsCheckFun) -> @@ -294,17 +294,6 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> dirty_count = 1 }), {Count, Terms, State3}. -maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) -> - Segment; -maybe_add_to_journal( true, false, del, _RelSeq, Segment) -> - Segment; -maybe_add_to_journal( true, false, _Del, RelSeq, Segment) -> - add_to_journal(RelSeq, del, Segment); -maybe_add_to_journal(false, _, del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, Segment); -maybe_add_to_journal(false, _, _Del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). - terminate(Terms, State) -> terminate(true, Terms, State). @@ -453,7 +442,65 @@ recover(DurableQueues) -> {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. %%---------------------------------------------------------------------------- -%% Msg Store Startup Delta Function +%% startup and shutdown +%%---------------------------------------------------------------------------- + +blank_state(QueueName) -> + StrName = queue_name_to_dir_name(QueueName), + Dir = filename:join(queues_dir(), StrName), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + #qistate { dir = Dir, + segments = segments_new(), + journal_handle = undefined, + dirty_count = 0 }. + +detect_clean_shutdown(Dir) -> + case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of + ok -> true; + {error, enoent} -> false + end. + +read_shutdown_terms(Dir) -> + rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)). + +store_clean_shutdown(Terms, Dir) -> + rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). + +terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) -> + State; +terminate(StoreShutdown, Terms, State = + #qistate { journal_handle = JournalHdl, + dir = Dir, segments = Segments }) -> + ok = case JournalHdl of + undefined -> ok; + _ -> file_handle_cache:close(JournalHdl) + end, + SegTerms = segment_fold( + fun (Seg, #segment { handle = Hdl, pubs = PubCount, + acks = AckCount }, SegTermsAcc) -> + ok = case Hdl of + undefined -> ok; + _ -> file_handle_cache:close(Hdl) + end, + [{Seg, {PubCount, AckCount}} | SegTermsAcc] + end, [], Segments), + case StoreShutdown of + true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir); + false -> ok + end, + State #qistate { journal_handle = undefined, segments = undefined }. + +queue_name_to_dir_name(Name = #resource { kind = queue }) -> + Bin = term_to_binary(Name), + Size = 8*size(Bin), + <<Num:Size>> = Bin, + lists:flatten(io_lib:format("~.36B", [Num])). + +queues_dir() -> + filename:join(rabbit_mnesia:dir(), "queues"). + +%%---------------------------------------------------------------------------- +%% msg store startup delta function %%---------------------------------------------------------------------------- queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> @@ -490,39 +537,138 @@ queue_index_walker_reader(QueueName, Gatherer) -> ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- -%% Minors +%% journal manipulation %%---------------------------------------------------------------------------- +maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) -> + Segment; +maybe_add_to_journal( true, false, del, _RelSeq, Segment) -> + Segment; +maybe_add_to_journal( true, false, _Del, RelSeq, Segment) -> + add_to_journal(RelSeq, del, Segment); +maybe_add_to_journal(false, _, del, RelSeq, Segment) -> + add_to_journal(RelSeq, ack, Segment); +maybe_add_to_journal(false, _, _Del, RelSeq, Segment) -> + add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). + +add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, + segments = Segments, + dir = Dir }) -> + {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + Segment = segment_find_or_new(Seg, Dir, Segments), + Segment1 = add_to_journal(RelSeq, Action, Segment), + State #qistate { dirty_count = DCount + 1, + segments = segment_store(Segment1, Segments) }; + +add_to_journal(RelSeq, Action, + Segment = #segment { journal_entries = JEntries, + pubs = PubCount, acks = AckCount }) -> + Segment1 = Segment #segment { + journal_entries = add_to_journal(RelSeq, Action, JEntries) }, + case Action of + del -> Segment1; + ack -> Segment1 #segment { acks = AckCount + 1 }; + {_Guid, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } + end; + +%% This is a more relaxed version of deliver_or_ack_msg because we can +%% have dels or acks in the journal without the corresponding +%% pub. Also, always want to keep acks. Things must occur in the right +%% order though. +add_to_journal(RelSeq, Action, SegJArray) -> + case array:get(RelSeq, SegJArray) of + undefined -> + array:set(RelSeq, + case Action of + {_Msg, _IsPersistent} -> {Action, no_del, no_ack}; + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack} + end, SegJArray); + ({Pub, no_del, no_ack}) when Action == del -> + array:set(RelSeq, {Pub, del, no_ack}, SegJArray); + ({Pub, Del, no_ack}) when Action == ack -> + array:set(RelSeq, {Pub, Del, ack}, SegJArray) + end. + maybe_flush_journal(State = #qistate { dirty_count = DCount }) when DCount > ?MAX_JOURNAL_ENTRY_COUNT -> flush_journal(State); maybe_flush_journal(State) -> State. -all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> - lists:sort( - sets:to_list( - lists:foldl( - fun (SegName, Set) -> - sets:add_element( - list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, - SegName)), Set) - end, sets:from_list(segment_fetch_keys(Segments)), - filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). +get_journal_handle(State = #qistate { journal_handle = undefined, + dir = Dir }) -> + Path = filename:join(Dir, ?JOURNAL_FILENAME), + {ok, Hdl} = file_handle_cache:open(Path, + [binary, raw, read, write, + {read_ahead, ?SEGMENT_TOTAL_SIZE}], + [{write_buffer, infinity}]), + {Hdl, State #qistate { journal_handle = Hdl }}; +get_journal_handle(State = #qistate { journal_handle = Hdl }) -> + {Hdl, State}. -blank_state(QueueName) -> - StrName = queue_name_to_dir_name(QueueName), - Dir = filename:join(queues_dir(), StrName), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - #qistate { dir = Dir, - segments = segments_new(), - journal_handle = undefined, - dirty_count = 0 - }. +%% Loading Journal. This isn't idempotent and will mess up the counts +%% if you call it more than once on the same state. Assumes the counts +%% are 0 to start with. +load_journal(State) -> + {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + State2 = #qistate { segments = Segments } = load_journal_entries(State1), + Segments1 = + segment_map( + fun (_Seg, Segment = #segment { journal_entries = JEntries, + pubs = PubCountInJournal, + acks = AckCountInJournal }) -> + %% We want to keep acks in so that we can remove + %% them if duplicates are in the journal. The counts + %% here are purely from the segment itself. + {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} = + load_segment(true, Segment), + %% Removed counts here are the number of pubs and + %% acks that are duplicates - i.e. found in both the + %% segment and journal. + {JEntries1, PubsRemoved, AcksRemoved} = + journal_minus_segment(JEntries, SegEntries), + PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, + AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, + Segment1 #segment { journal_entries = JEntries1, + pubs = PubCount1, + acks = AckCount1 } + end, Segments), + State2 #qistate { segments = Segments1 }. -array_new() -> - array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). +load_journal_entries(State = #qistate { journal_handle = Hdl }) -> + case file_handle_cache:read(Hdl, ?SEQ_BYTES) of + {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> + case Prefix of + ?DEL_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, del, State)); + ?ACK_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, ack, State)); + _ -> + case file_handle_cache:read(Hdl, ?GUID_BYTES) of + {ok, <<GuidNum:?GUID_BITS>>} -> + %% work around for binary data + %% fragmentation. See + %% rabbit_msg_file:read_next/2 + <<Guid:?GUID_BYTES/binary>> = + <<GuidNum:?GUID_BITS>>, + Publish = {Guid, case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, + load_journal_entries( + add_to_journal(SeqId, Publish, State)); + _ErrOrEoF -> %% err, we've lost at least a publish + State + end + end; + _ErrOrEoF -> State + end. + +%%---------------------------------------------------------------------------- +%% segment manipulation +%%---------------------------------------------------------------------------- seq_id_to_seg_and_rel_seq_id(SeqId) -> { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. @@ -534,32 +680,23 @@ seg_num_to_path(Dir, Seg) -> SegName = integer_to_list(Seg), filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). +all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> + lists:sort( + sets:to_list( + lists:foldl( + fun (SegName, Set) -> + sets:add_element( + list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, + SegName)), Set) + end, sets:from_list(segment_fetch_keys(Segments)), + filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). + delete_segment(#segment { handle = undefined }) -> ok; delete_segment(#segment { handle = Hdl }) -> ok = file_handle_cache:delete(Hdl). -detect_clean_shutdown(Dir) -> - case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of - ok -> true; - {error, enoent} -> false - end. - -read_shutdown_terms(Dir) -> - rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)). - -store_clean_shutdown(Terms, Dir) -> - rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). - -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, - lists:flatten(io_lib:format("~.36B", [Num])). - -queues_dir() -> - filename:join(rabbit_mnesia:dir(), "queues"). - get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> {ok, Hdl} = file_handle_cache:open(Path, [binary, raw, read, write, @@ -626,20 +763,6 @@ segment_fetch_keys({Segments, CachedSegments}) -> segments_new() -> {dict:new(), []}. -get_journal_handle(State = - #qistate { journal_handle = undefined, dir = Dir }) -> - Path = filename:join(Dir, ?JOURNAL_FILENAME), - {ok, Hdl} = file_handle_cache:open(Path, - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], - [{write_buffer, infinity}]), - {Hdl, State #qistate { journal_handle = Hdl }}; -get_journal_handle(State = #qistate { journal_handle = Hdl }) -> - {Hdl, State}. - -bool_to_int(true ) -> 1; -bool_to_int(false) -> 0. - write_entry_to_segment(_RelSeq, {{_Guid, _IsPersistent}, del, ack}, Hdl) -> Hdl; write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> @@ -666,57 +789,26 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> end, Hdl. -terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) -> - State; -terminate(StoreShutdown, Terms, State = - #qistate { journal_handle = JournalHdl, - dir = Dir, segments = Segments }) -> - ok = case JournalHdl of - undefined -> ok; - _ -> file_handle_cache:close(JournalHdl) - end, - SegTerms = segment_fold( - fun (Seg, #segment { handle = Hdl, pubs = PubCount, - acks = AckCount }, SegTermsAcc) -> - ok = case Hdl of - undefined -> ok; - _ -> file_handle_cache:close(Hdl) - end, - [{Seg, {PubCount, AckCount}} | SegTermsAcc] - end, [], Segments), - case StoreShutdown of - true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir); - false -> ok - end, - State #qistate { journal_handle = undefined, segments = undefined }. - -%%---------------------------------------------------------------------------- -%% Majors -%%---------------------------------------------------------------------------- - %% Loading segments - +%% %% Does not do any combining with the journal at all. The PubCount %% that comes back is the number of publishes in the segment. The %% number of unacked msgs is PubCount - AckCount. If KeepAcks is %% false, then array:sparse_size(SegEntries) == PubCount - %% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries) %% == PubCount. -load_segment(KeepAcks, - Segment = #segment { path = Path, handle = SegHdl }) -> +load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) -> SegmentExists = case SegHdl of undefined -> filelib:is_file(Path); _ -> true end, case SegmentExists of - false -> - {array_new(), 0, 0, Segment}; - true -> - {Hdl, Segment1} = get_segment_handle(Segment), - {ok, 0} = file_handle_cache:position(Hdl, bof), - {SegEntries, PubCount, AckCount} = - load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0), - {SegEntries, PubCount, AckCount, Segment1} + false -> {array_new(), 0, 0, Segment}; + true -> {Hdl, Segment1} = get_segment_handle(Segment), + {ok, 0} = file_handle_cache:position(Hdl, bof), + {SegEntries, PubCount, AckCount} = + load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0), + {SegEntries, PubCount, AckCount, Segment1} end. load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> @@ -752,104 +844,15 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries) -> {AckCount + 1, array:reset(RelSeq, SegEntries)} end. -%% Loading Journal. This isn't idempotent and will mess up the counts -%% if you call it more than once on the same state. Assumes the counts -%% are 0 to start with. - -load_journal(State) -> - {JournalHdl, State1} = get_journal_handle(State), - {ok, 0} = file_handle_cache:position(JournalHdl, 0), - State2 = #qistate { segments = Segments } = load_journal_entries(State1), - Segments1 = - segment_map( - fun (_Seg, Segment = #segment { journal_entries = JEntries, - pubs = PubCountInJournal, - acks = AckCountInJournal }) -> - %% We want to keep acks in so that we can remove - %% them if duplicates are in the journal. The counts - %% here are purely from the segment itself. - {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} = - load_segment(true, Segment), - %% Removed counts here are the number of pubs and - %% acks that are duplicates - i.e. found in both the - %% segment and journal. - {JEntries1, PubsRemoved, AcksRemoved} = - journal_minus_segment(JEntries, SegEntries), - PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, - AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, - Segment1 #segment { journal_entries = JEntries1, - pubs = PubCount1, - acks = AckCount1 } - end, Segments), - State2 #qistate { segments = Segments1 }. - -load_journal_entries(State = #qistate { journal_handle = Hdl }) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> - case Prefix of - ?DEL_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, del, State)); - ?ACK_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, ack, State)); - _ -> - case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <<GuidNum:?GUID_BITS>>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <<Guid:?GUID_BYTES/binary>> = - <<GuidNum:?GUID_BITS>>, - Publish = {Guid, case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, - load_journal_entries( - add_to_journal(SeqId, Publish, State)); - _ErrOrEoF -> %% err, we've lost at least a publish - State - end - end; - _ErrOrEoF -> State - end. - -add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, - segments = Segments, - dir = Dir }) -> - {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - Segment = segment_find_or_new(Seg, Dir, Segments), - Segment1 = add_to_journal(RelSeq, Action, Segment), - State #qistate { dirty_count = DCount + 1, - segments = segment_store(Segment1, Segments) }; +array_new() -> + array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). -add_to_journal(RelSeq, Action, - Segment = #segment { journal_entries = JEntries, - pubs = PubCount, acks = AckCount }) -> - Segment1 = Segment #segment { - journal_entries = add_to_journal(RelSeq, Action, JEntries) }, - case Action of - del -> Segment1; - ack -> Segment1 #segment { acks = AckCount + 1 }; - {_Guid, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } - end; +bool_to_int(true ) -> 1; +bool_to_int(false) -> 0. -%% This is a more relaxed version of deliver_or_ack_msg because we can -%% have dels or acks in the journal without the corresponding -%% pub. Also, always want to keep acks. Things must occur in the right -%% order though. -add_to_journal(RelSeq, Action, SegJArray) -> - case array:get(RelSeq, SegJArray) of - undefined -> - array:set(RelSeq, - case Action of - {_Msg, _IsPersistent} -> {Action, no_del, no_ack}; - del -> {no_pub, del, no_ack}; - ack -> {no_pub, no_del, ack} - end, SegJArray); - ({Pub, no_del, no_ack}) when Action == del -> - array:set(RelSeq, {Pub, del, no_ack}, SegJArray); - ({Pub, Del, no_ack}) when Action == ack -> - array:set(RelSeq, {Pub, Del, ack}, SegJArray) - end. +%%---------------------------------------------------------------------------- +%% journal & segment combination +%%---------------------------------------------------------------------------- %% Combine what we have just read from a segment file with what we're %% holding for that segment in memory. There must be no |
