summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl26
-rw-r--r--src/rabbit_queue_index.erl110
2 files changed, 106 insertions, 30 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 2a3f1ded74..8e0849022e 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -119,7 +119,8 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
- append_write_buffer/1, copy/3, set_maximum_since_use/1]).
+ append_write_buffer/1, copy/3, set_maximum_since_use/1, delete/1,
+ discard_write_buffer/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -193,6 +194,8 @@
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
({'ok', integer()} | error())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
+-spec(delete/1 :: (ref()) -> ok_or_error()).
+-spec(discard_write_buffer/1 :: (ref()) -> ok_or_error()).
-endif.
@@ -361,6 +364,27 @@ copy(Src, Dest, Count) ->
{error, incorrect_handle_modes}
end).
+delete(Ref) ->
+ case erase({Ref, fhc_handle}) of
+ undefined -> ok;
+ Handle = #handle { path = Path } ->
+ Handle1 = Handle #handle { is_dirty = false, write_buffer = [] },
+ case close1(Ref, Handle1, hard) of
+ ok -> file:delete(Path);
+ Error -> Error
+ end
+ end.
+
+discard_write_buffer(Ref) ->
+ with_handles(
+ [Ref],
+ fun ([#handle { write_buffer = [] }]) ->
+ ok;
+ ([Handle = #handle { write_buffer_size = Size, offset = Offset }]) ->
+ {ok, [Handle #handle { write_buffer = [], write_buffer_size = 0,
+ offset = Offset - Size }]}
+ end).
+
set_maximum_since_use(MaximumAge) ->
Now = now(),
case lists:foldl(
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index acebc32d6c..bbd9508672 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -87,7 +87,9 @@
{ dir,
segments,
journal_handle,
- dirty_count
+ dirty_count,
+ last_seg_a,
+ last_seg_b
}).
-record(segment,
@@ -238,8 +240,9 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
flush_journal(State = #qistate { dirty_count = 0 }) ->
State;
-flush_journal(State = #qistate { segments = Segments }) ->
- State1 =
+flush_journal(State) ->
+ State1 = #qistate { segments = Segments } = get_all_segments(State),
+ State2 =
dict:fold(
fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
acks = AckCount } = Segment, StateN) ->
@@ -261,12 +264,13 @@ flush_journal(State = #qistate { segments = Segments }) ->
dict:new() }, StateN)
end
end
- end, State #qistate { segments = dict:new() }, Segments),
- {JournalHdl, State2} = get_journal_handle(State1),
+ end, State1 #qistate { segments = dict:new() }, Segments),
+ {JournalHdl, State3} = get_journal_handle(State2),
+ ok = file_handle_cache:discard_write_buffer(JournalHdl),
{ok, 0} = file_handle_cache:position(JournalHdl, bof),
ok = file_handle_cache:truncate(JournalHdl),
ok = file_handle_cache:sync(JournalHdl),
- State2 #qistate { dirty_count = 0 }.
+ State3 #qistate { dirty_count = 0 }.
read_segment_entries(InitSeqId, State) ->
{Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
@@ -384,7 +388,8 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount })
maybe_flush_journal(State) ->
State.
-all_segment_nums(#qistate { segments = Segments, dir = Dir }) ->
+all_segment_nums(State = #qistate { dir = Dir }) ->
+ #qistate { segments = Segments } = get_all_segments(State),
sets:to_list(
lists:foldl(
fun (SegName, Set) ->
@@ -402,7 +407,9 @@ blank_state(QueueName) ->
#qistate { dir = Dir,
segments = dict:new(),
journal_handle = undefined,
- dirty_count = 0
+ dirty_count = 0,
+ last_seg_a = undefined,
+ last_seg_b = undefined
}.
rev_sort(List) ->
@@ -420,9 +427,8 @@ seg_num_to_path(Dir, Seg) ->
delete_segment(#segment { handle = undefined }) ->
ok;
-delete_segment(#segment { handle = Hdl, path = Path }) ->
- ok = file_handle_cache:close(Hdl),
- ok = file:delete(Path),
+delete_segment(#segment { handle = Hdl }) ->
+ ok = file_handle_cache:delete(Hdl),
ok.
detect_clean_shutdown(Dir) ->
@@ -462,6 +468,10 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
get_segment_handle(Segment = #segment { handle = Hdl }) ->
{Hdl, Segment}.
+find_segment(Seg, #qistate { last_seg_a = #segment { num = Seg } = Segment }) ->
+ Segment;
+find_segment(Seg, #qistate { last_seg_b = #segment { num = Seg } = Segment }) ->
+ Segment;
find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
case dict:find(Seg, Segments) of
{ok, Segment = #segment{}} -> Segment;
@@ -474,9 +484,46 @@ find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
}
end.
-store_segment(Segment = #segment { num = Seg },
- State = #qistate { segments = Segments }) ->
- State #qistate { segments = dict:store(Seg, Segment, Segments) }.
+store_segment(Segment = #segment { num = Seg }, State =
+ #qistate { last_seg_a = #segment { num = Seg }}) ->
+ State #qistate { last_seg_a = Segment };
+store_segment(Segment = #segment { num = Seg }, State =
+ #qistate { last_seg_b = #segment { num = Seg }}) ->
+ State #qistate { last_seg_b = Segment };
+store_segment(Segment, State =
+ #qistate { last_seg_a = LastSegA, last_seg_b = LastSegB }) ->
+ case LastSegA of
+ undefined ->
+ State #qistate { last_seg_a = Segment };
+ _ ->
+ case LastSegB of
+ undefined ->
+ State #qistate { last_seg_b = Segment };
+ _ ->
+ State1 = #qistate { segments = Segments } =
+ State #qistate { last_seg_a = LastSegB,
+ last_seg_b = Segment },
+ State1 #qistate {
+ segments = return_segment_to_dict(LastSegA, Segments) }
+ end
+ end.
+
+get_all_segments(State = #qistate { last_seg_a = undefined,
+ last_seg_b = undefined }) ->
+ State;
+get_all_segments(State = #qistate { segments = Segments,
+ last_seg_a = LastSegA,
+ last_seg_b = LastSegB }) ->
+ State #qistate { last_seg_a = undefined,
+ last_seg_b = undefined,
+ segments = return_segment_to_dict(
+ LastSegB,
+ return_segment_to_dict(LastSegA, Segments)) }.
+
+return_segment_to_dict(undefined, Segments) ->
+ Segments;
+return_segment_to_dict(Segment = #segment { num = Seg }, Segments) ->
+ dict:store(Seg, Segment, Segments).
get_journal_handle(State =
#qistate { journal_handle = undefined, dir = Dir }) ->
@@ -517,8 +564,9 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
Hdl.
terminate(StoreShutdown, State =
- #qistate { segments = Segments, journal_handle = JournalHdl,
+ #qistate { journal_handle = JournalHdl,
dir = Dir }) ->
+ State1 = #qistate { segments = Segments } = get_all_segments(State),
ok = case JournalHdl of
undefined -> ok;
_ -> file_handle_cache:close(JournalHdl)
@@ -533,7 +581,7 @@ terminate(StoreShutdown, State =
true -> store_clean_shutdown(Dir);
false -> ok
end,
- State #qistate { journal_handle = undefined, segments = dict:new() }.
+ State1 #qistate { journal_handle = undefined, segments = dict:new() }.
%%----------------------------------------------------------------------------
%% Majors
@@ -608,7 +656,8 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
load_journal(State) ->
{JournalHdl, State1} = get_journal_handle(State),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- State2 = #qistate { segments = Segments } = load_journal_entries(State1),
+ State2 = #qistate { segments = Segments } =
+ get_all_segments(load_journal_entries(State1)),
dict:fold(
fun (Seg, #segment { journal_entries = JEntries,
pubs = PubCountInJournal,
@@ -681,18 +730,21 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) ->
%% pub. Also, always want to keep acks. Things must occur in the right
%% order though.
add_to_journal(RelSeq, Action, SegJDict) ->
- case dict:find(RelSeq, SegJDict) of
- {ok, {PubRecord, no_del, no_ack}} when Action == del ->
- dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict);
- {ok, {PubRecord, DelRecord, no_ack}} when Action == ack ->
- dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict);
- error when Action == del ->
- dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict);
- error when Action == ack ->
- dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict);
- error ->
- {_MsgId, _IsPersistent} = Action, %% ASSERTION
- dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict)
+ case dict:is_key(RelSeq, SegJDict) of
+ true ->
+ dict:update(RelSeq,
+ fun ({PubRecord, no_del, no_ack}) when Action == del ->
+ {PubRecord, del, no_ack};
+ ({PubRecord, Del, no_ack}) when Action == ack ->
+ {PubRecord, Del, ack}
+ end, SegJDict);
+ false ->
+ dict:store(RelSeq,
+ case Action of
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack};
+ {_Msg, _IsPersistent} -> {Action, no_del, no_ack}
+ end, SegJDict)
end.
%% Combine what we have just read from a segment file with what we're