summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl110
1 files changed, 43 insertions, 67 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 02d0d8ad41..858ade2629 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -155,11 +155,13 @@
-define(PUB, {_Guid, _IsPersistent}).
+-define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
+
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count }).
--record(segment, { unacked, handle, journal_entries, path, num }).
+-record(segment, { unacked, journal_entries, path, num }).
-include("rabbit.hrl").
@@ -170,7 +172,6 @@
-type(hdl() :: ('undefined' | any())).
-type(segment() :: ('undefined' |
#segment { unacked :: non_neg_integer(),
- handle :: hdl(),
journal_entries :: array(),
path :: file_path(),
num :: non_neg_integer()
@@ -296,9 +297,9 @@ read(Start, End, State = #qistate { segments = Segments,
true -> EndRelSeq;
false -> ?SEGMENT_ENTRY_COUNT
end,
- Segment = segment_find_or_new(StartSeg, Dir, Segments),
- {SegEntries, _UnackedCount, Segment1} = load_segment(false, Segment),
- #segment { journal_entries = JEntries } = Segment1,
+ Segment = #segment { journal_entries = JEntries } =
+ segment_find_or_new(StartSeg, Dir, Segments),
+ {SegEntries, _UnackedCount} = load_segment(false, Segment),
{SegEntries1, _UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
{array:sparse_foldr(
@@ -310,7 +311,7 @@ read(Start, End, State = #qistate { segments = Segments,
Acc
end, [], SegEntries1),
Again,
- State #qistate { segments = segment_store(Segment1, Segments) }}.
+ State #qistate { segments = segment_store(Segment, Segments) }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -448,28 +449,23 @@ terminate(State = #qistate { journal_handle = JournalHdl,
end,
SegmentCounts =
segment_fold(
- fun (Seg, #segment { handle = Hdl, unacked = UnackedCount },
- SegmentCountsAcc) ->
- ok = case Hdl of
- undefined -> ok;
- _ -> file_handle_cache:close(Hdl)
- end,
+ fun (Seg, #segment { unacked = UnackedCount }, SegmentCountsAcc) ->
[{Seg, UnackedCount} | SegmentCountsAcc]
end, [], Segments),
{SegmentCounts, State #qistate { journal_handle = undefined,
segments = undefined }}.
-recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
- {SegEntries, UnackedCount, Segment1} = load_segment(false, Segment),
- #segment { journal_entries = JEntries } = Segment1,
+recover_segment(ContainsCheckFun, CleanShutdown,
+ Segment = #segment { journal_entries = JEntries }) ->
+ {SegEntries, UnackedCount} = load_segment(false, Segment),
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment2) ->
+ fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
- Del, RelSeq, Segment2)
+ Del, RelSeq, Segment1)
end,
- Segment1 #segment { unacked = UnackedCount + UnackedCountDelta },
+ Segment #segment { unacked = UnackedCount + UnackedCountDelta },
SegEntries1).
recover_message( true, true, _Del, _RelSeq, Segment) ->
@@ -578,8 +574,11 @@ maybe_flush_journal(State) ->
flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
- fun (_Seg, #segment { unacked = 0 } = Segment, SegmentsN) ->
- ok = delete_segment(Segment),
+ fun (_Seg, #segment { unacked = 0, path = Path }, SegmentsN) ->
+ case filelib:is_file(Path) of
+ true -> ok = file:delete(Path);
+ false -> ok
+ end,
SegmentsN;
(_Seg, #segment {} = Segment, SegmentsN) ->
segment_store(append_journal_to_segment(Segment), SegmentsN)
@@ -589,21 +588,21 @@ flush_journal(State = #qistate { segments = Segments }) ->
ok = file_handle_cache:clear(JournalHdl),
State1 #qistate { dirty_count = 0 }.
-append_journal_to_segment(#segment { journal_entries = JEntries } = Segment) ->
+append_journal_to_segment(#segment { journal_entries = JEntries,
+ path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {Hdl, Segment1} = get_segment_handle(Segment),
+ _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ [{write_buffer, infinity}]),
array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
- ok = file_handle_cache:sync(Hdl),
- Segment1 #segment { journal_entries = array_new() }
+ file_handle_cache:close(Hdl),
+ Segment #segment { journal_entries = array_new() }
end.
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- {ok, Hdl} = file_handle_cache:open(Path,
- [binary, raw, read, write,
- {read_ahead, ?SEGMENT_TOTAL_SIZE}],
+ {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
@@ -627,14 +626,13 @@ recover_journal(State) ->
%% We want to keep ack'd entries in so that we can
%% remove them if duplicates are in the journal. The
%% counts here are purely from the segment itself.
- {SegEntries, UnackedCountInSeg, Segment1} =
- load_segment(true, Segment),
+ {SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
{JEntries1, UnackedCountDuplicates} =
journal_minus_segment(JEntries, SegEntries),
- Segment1 #segment { journal_entries = JEntries1,
- unacked = (UnackedCountInJournal +
- UnackedCountInSeg -
- UnackedCountDuplicates) }
+ Segment #segment { journal_entries = JEntries1,
+ unacked = (UnackedCountInJournal +
+ UnackedCountInSeg -
+ UnackedCountDuplicates) }
end, Segments),
State1 #qistate { segments = Segments1 }.
@@ -693,31 +691,13 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
end, sets:from_list(segment_fetch_keys(Segments)),
filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
-delete_segment(#segment { handle = undefined }) ->
- ok;
-delete_segment(#segment { handle = Hdl }) ->
- ok = file_handle_cache:delete(Hdl).
-
-get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
- {ok, Hdl} = file_handle_cache:open(Path,
- [binary, raw, read, write,
- {read_ahead, ?SEGMENT_TOTAL_SIZE}],
- [{write_buffer, infinity}]),
- {Hdl, Segment #segment { handle = Hdl }};
-get_segment_handle(Segment = #segment { handle = Hdl }) ->
- {Hdl, Segment}.
-
-segment_new(Seg, Dir) ->
- #segment { unacked = 0,
- handle = undefined,
- journal_entries = array_new(),
- path = seg_num_to_path(Dir, Seg),
- num = Seg }.
-
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
- error -> segment_new(Seg, Dir);
- {ok, Segment} -> Segment
+ {ok, Segment} -> Segment;
+ error -> #segment { unacked = 0,
+ journal_entries = array_new(),
+ path = seg_num_to_path(Dir, Seg),
+ num = Seg }
end.
segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
@@ -793,18 +773,14 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
%% Does not do any combining with the journal at all. The PubCount
%% that comes back is the number of publishes in the segment. The
%% number of unacked msgs is PubCount - AckCount.
-load_segment(KeepAcked, Segment = #segment { path = Path, handle = SegHdl }) ->
- SegmentExists = case SegHdl of
- undefined -> filelib:is_file(Path);
- _ -> true
- end,
- case SegmentExists of
- false -> {array_new(), 0, Segment};
- true -> {Hdl, Segment1} = get_segment_handle(Segment),
+load_segment(KeepAcked, #segment { path = Path }) ->
+ case filelib:is_file(Path) of
+ false -> {array_new(), 0};
+ true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {SegEntries, UnackedCount} =
- load_segment_entries(KeepAcked, Hdl, array_new(), 0),
- {SegEntries, UnackedCount, Segment1}
+ Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
+ file_handle_cache:close(Hdl),
+ Res
end.
load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->