summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-02 22:29:05 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-02 22:29:05 +0000
commit19bb74623da7a270ba6bfbc6e8f4a0bbb4f363fd (patch)
tree5db2fd5102075c0144be4440478fd5863d7a8575 /src
parent13ff649097a4c3f0baa9549e0a32d37d1e326264 (diff)
downloadrabbitmq-server-git-19bb74623da7a270ba6bfbc6e8f4a0bbb4f363fd.tar.gz
Up to 11kHz persistent on my home machine. Still a little way off the 13kHz that I was getting before, but there were bugs in the previous QI (eg missing syncs) which could well have led to the old version being too fast. Added two functions to fhc: delete/1 which deletes without flushing any data at all. It will refuse to delete if the file isn't open; discard_write_buffer/1 which does what it says. We use the latter after scattering the journal as after we hit each segment, we sync the segment, so at that point there's no need at all to force out the data in the fhc for the journal prior to truncation.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl26
-rw-r--r--src/rabbit_queue_index.erl110
2 files changed, 106 insertions, 30 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 2a3f1ded74..8e0849022e 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -119,7 +119,8 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
- append_write_buffer/1, copy/3, set_maximum_since_use/1]).
+ append_write_buffer/1, copy/3, set_maximum_since_use/1, delete/1,
+ discard_write_buffer/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -193,6 +194,8 @@
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
({'ok', integer()} | error())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
+-spec(delete/1 :: (ref()) -> ok_or_error()).
+-spec(discard_write_buffer/1 :: (ref()) -> ok_or_error()).
-endif.
@@ -361,6 +364,27 @@ copy(Src, Dest, Count) ->
{error, incorrect_handle_modes}
end).
+delete(Ref) ->
+ case erase({Ref, fhc_handle}) of
+ undefined -> ok;
+ Handle = #handle { path = Path } ->
+ Handle1 = Handle #handle { is_dirty = false, write_buffer = [] },
+ case close1(Ref, Handle1, hard) of
+ ok -> file:delete(Path);
+ Error -> Error
+ end
+ end.
+
+discard_write_buffer(Ref) ->
+ with_handles(
+ [Ref],
+ fun ([#handle { write_buffer = [] }]) ->
+ ok;
+ ([Handle = #handle { write_buffer_size = Size, offset = Offset }]) ->
+ {ok, [Handle #handle { write_buffer = [], write_buffer_size = 0,
+ offset = Offset - Size }]}
+ end).
+
set_maximum_since_use(MaximumAge) ->
Now = now(),
case lists:foldl(
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index acebc32d6c..bbd9508672 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -87,7 +87,9 @@
{ dir,
segments,
journal_handle,
- dirty_count
+ dirty_count,
+ last_seg_a,
+ last_seg_b
}).
-record(segment,
@@ -238,8 +240,9 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
flush_journal(State = #qistate { dirty_count = 0 }) ->
State;
-flush_journal(State = #qistate { segments = Segments }) ->
- State1 =
+flush_journal(State) ->
+ State1 = #qistate { segments = Segments } = get_all_segments(State),
+ State2 =
dict:fold(
fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
acks = AckCount } = Segment, StateN) ->
@@ -261,12 +264,13 @@ flush_journal(State = #qistate { segments = Segments }) ->
dict:new() }, StateN)
end
end
- end, State #qistate { segments = dict:new() }, Segments),
- {JournalHdl, State2} = get_journal_handle(State1),
+ end, State1 #qistate { segments = dict:new() }, Segments),
+ {JournalHdl, State3} = get_journal_handle(State2),
+ ok = file_handle_cache:discard_write_buffer(JournalHdl),
{ok, 0} = file_handle_cache:position(JournalHdl, bof),
ok = file_handle_cache:truncate(JournalHdl),
ok = file_handle_cache:sync(JournalHdl),
- State2 #qistate { dirty_count = 0 }.
+ State3 #qistate { dirty_count = 0 }.
read_segment_entries(InitSeqId, State) ->
{Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
@@ -384,7 +388,8 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount })
maybe_flush_journal(State) ->
State.
-all_segment_nums(#qistate { segments = Segments, dir = Dir }) ->
+all_segment_nums(State = #qistate { dir = Dir }) ->
+ #qistate { segments = Segments } = get_all_segments(State),
sets:to_list(
lists:foldl(
fun (SegName, Set) ->
@@ -402,7 +407,9 @@ blank_state(QueueName) ->
#qistate { dir = Dir,
segments = dict:new(),
journal_handle = undefined,
- dirty_count = 0
+ dirty_count = 0,
+ last_seg_a = undefined,
+ last_seg_b = undefined
}.
rev_sort(List) ->
@@ -420,9 +427,8 @@ seg_num_to_path(Dir, Seg) ->
delete_segment(#segment { handle = undefined }) ->
ok;
-delete_segment(#segment { handle = Hdl, path = Path }) ->
- ok = file_handle_cache:close(Hdl),
- ok = file:delete(Path),
+delete_segment(#segment { handle = Hdl }) ->
+ ok = file_handle_cache:delete(Hdl),
ok.
detect_clean_shutdown(Dir) ->
@@ -462,6 +468,10 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
get_segment_handle(Segment = #segment { handle = Hdl }) ->
{Hdl, Segment}.
+find_segment(Seg, #qistate { last_seg_a = #segment { num = Seg } = Segment }) ->
+ Segment;
+find_segment(Seg, #qistate { last_seg_b = #segment { num = Seg } = Segment }) ->
+ Segment;
find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
case dict:find(Seg, Segments) of
{ok, Segment = #segment{}} -> Segment;
@@ -474,9 +484,46 @@ find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
}
end.
-store_segment(Segment = #segment { num = Seg },
- State = #qistate { segments = Segments }) ->
- State #qistate { segments = dict:store(Seg, Segment, Segments) }.
+store_segment(Segment = #segment { num = Seg }, State =
+ #qistate { last_seg_a = #segment { num = Seg }}) ->
+ State #qistate { last_seg_a = Segment };
+store_segment(Segment = #segment { num = Seg }, State =
+ #qistate { last_seg_b = #segment { num = Seg }}) ->
+ State #qistate { last_seg_b = Segment };
+store_segment(Segment, State =
+ #qistate { last_seg_a = LastSegA, last_seg_b = LastSegB }) ->
+ case LastSegA of
+ undefined ->
+ State #qistate { last_seg_a = Segment };
+ _ ->
+ case LastSegB of
+ undefined ->
+ State #qistate { last_seg_b = Segment };
+ _ ->
+ State1 = #qistate { segments = Segments } =
+ State #qistate { last_seg_a = LastSegB,
+ last_seg_b = Segment },
+ State1 #qistate {
+ segments = return_segment_to_dict(LastSegA, Segments) }
+ end
+ end.
+
+get_all_segments(State = #qistate { last_seg_a = undefined,
+ last_seg_b = undefined }) ->
+ State;
+get_all_segments(State = #qistate { segments = Segments,
+ last_seg_a = LastSegA,
+ last_seg_b = LastSegB }) ->
+ State #qistate { last_seg_a = undefined,
+ last_seg_b = undefined,
+ segments = return_segment_to_dict(
+ LastSegB,
+ return_segment_to_dict(LastSegA, Segments)) }.
+
+return_segment_to_dict(undefined, Segments) ->
+ Segments;
+return_segment_to_dict(Segment = #segment { num = Seg }, Segments) ->
+ dict:store(Seg, Segment, Segments).
get_journal_handle(State =
#qistate { journal_handle = undefined, dir = Dir }) ->
@@ -517,8 +564,9 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
Hdl.
terminate(StoreShutdown, State =
- #qistate { segments = Segments, journal_handle = JournalHdl,
+ #qistate { journal_handle = JournalHdl,
dir = Dir }) ->
+ State1 = #qistate { segments = Segments } = get_all_segments(State),
ok = case JournalHdl of
undefined -> ok;
_ -> file_handle_cache:close(JournalHdl)
@@ -533,7 +581,7 @@ terminate(StoreShutdown, State =
true -> store_clean_shutdown(Dir);
false -> ok
end,
- State #qistate { journal_handle = undefined, segments = dict:new() }.
+ State1 #qistate { journal_handle = undefined, segments = dict:new() }.
%%----------------------------------------------------------------------------
%% Majors
@@ -608,7 +656,8 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
load_journal(State) ->
{JournalHdl, State1} = get_journal_handle(State),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- State2 = #qistate { segments = Segments } = load_journal_entries(State1),
+ State2 = #qistate { segments = Segments } =
+ get_all_segments(load_journal_entries(State1)),
dict:fold(
fun (Seg, #segment { journal_entries = JEntries,
pubs = PubCountInJournal,
@@ -681,18 +730,21 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) ->
%% pub. Also, always want to keep acks. Things must occur in the right
%% order though.
add_to_journal(RelSeq, Action, SegJDict) ->
- case dict:find(RelSeq, SegJDict) of
- {ok, {PubRecord, no_del, no_ack}} when Action == del ->
- dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict);
- {ok, {PubRecord, DelRecord, no_ack}} when Action == ack ->
- dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict);
- error when Action == del ->
- dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict);
- error when Action == ack ->
- dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict);
- error ->
- {_MsgId, _IsPersistent} = Action, %% ASSERTION
- dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict)
+ case dict:is_key(RelSeq, SegJDict) of
+ true ->
+ dict:update(RelSeq,
+ fun ({PubRecord, no_del, no_ack}) when Action == del ->
+ {PubRecord, del, no_ack};
+ ({PubRecord, Del, no_ack}) when Action == ack ->
+ {PubRecord, Del, ack}
+ end, SegJDict);
+ false ->
+ dict:store(RelSeq,
+ case Action of
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack};
+ {_Msg, _IsPersistent} -> {Action, no_del, no_ack}
+ end, SegJDict)
end.
%% Combine what we have just read from a segment file with what we're