summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-21 12:56:36 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-21 12:56:36 +0100
commit7a35bdd921e3b24d2e1919b9e6f2023e1c7ea37c (patch)
tree88648da1a96335aec26065500e6c976c65ecd374 /src
parent364fa5017fa13cd348b03742341532b18deac079 (diff)
downloadrabbitmq-server-git-7a35bdd921e3b24d2e1919b9e6f2023e1c7ea37c.tar.gz
Introduced sequence IDs internally. Sadly, because of the need to lookup seqids for each msgid in acks, acks are now very slow.
Thus I'm going to alter the API so that deliver returns the seqid and then ack takes [seqid]. This should make things faster.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl217
-rw-r--r--src/rabbit_mnesia.erl1
-rw-r--r--src/rabbit_tests.erl5
3 files changed, 152 insertions, 71 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index f0fab00d88..9b0849c35f 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/2, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
+-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
-export([stop/0, clean_stop/0]).
@@ -51,6 +51,7 @@
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_DETS_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(FILE_EXTENSION_DETS, ".dets").
@@ -60,6 +61,7 @@
-record(dqstate, {msg_location,
file_summary,
+ sequences,
current_file_num,
current_file_name,
current_file_handle,
@@ -77,8 +79,8 @@ start_link(FileSizeLimit, ReadFileHandlesLimit) ->
publish(Q, MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
-deliver(Q, MsgId) ->
- gen_server:call(?SERVER, {deliver, Q, MsgId}, infinity).
+deliver(Q) ->
+ gen_server:call(?SERVER, {deliver, Q}, infinity).
ack(Q, MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {ack, Q, MsgIds}).
@@ -113,6 +115,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
]),
State = #dqstate { msg_location = MsgLocation,
file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]),
+ sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
current_file_num = 0,
current_file_name = InitName,
current_file_handle = undefined,
@@ -124,24 +127,25 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } = load_from_disk(State),
Path = form_filename(CurrentName),
- ok = filelib:ensure_dir(Path),
{ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), %% read only needed so that we can seek
{ok, Offset} = file:position(FileHdl, {bof, Offset}),
{ok, State1 # dqstate { current_file_handle = FileHdl }}.
-handle_call({deliver, Q, MsgId}, _From, State) ->
- {ok, {MsgBody, BodySize, Delivered}, State1} = internal_deliver(Q, MsgId, State),
- {reply, {MsgBody, BodySize, Delivered}, State1};
+handle_call({deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, State),
+ {reply, Result, State1};
handle_call({tx_commit, Q, MsgIds}, _From, State) ->
{ok, State1} = internal_tx_commit(Q, MsgIds, State),
{reply, ok, State1};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(clean_stop, _From, State) ->
- State1 = #dqstate { file_summary = FileSummary }
+ State1 = #dqstate { file_summary = FileSummary,
+ sequences = Sequences }
= shutdown(State), %% tidy up file handles early
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
true = ets:delete(FileSummary),
+ true = ets:delete(Sequences),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok, State1 # dqstate { current_file_handle = undefined,
read_file_handles = {dict:new(), gb_trees:empty()}}}.
@@ -196,53 +200,62 @@ base_directory() ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
- read_file_handles_limit = ReadFileHandlesLimit,
- read_file_handles = {ReadHdls, ReadHdlsAge}
- }) ->
- [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId),
- % so this next bit implements an LRU for file handles. But it's a bit insane, and smells
- % of premature optimisation. So I might remove it and dump it overboard
- {FileHdl, ReadHdls1, ReadHdlsAge1}
- = case dict:find(File, ReadHdls) of
- error ->
- {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]),
- Now = now(),
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)};
- _False ->
- {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, _Then}} = dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- ReadHdls2 = dict:erase(OldFile, ReadHdls),
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)}
- end;
- {ok, {Hdl, Then}} ->
- Now = now(),
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
- gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
- end,
- % read the message
- {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize),
- [Obj = #dq_msg_loc {is_delivered = Delivered}]
- = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}),
- if Delivered -> ok;
- true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
- end,
- {ok, {MsgBody, BodySize, Delivered},
- State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
+internal_deliver(Q, State = #dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
+ read_file_handles_limit = ReadFileHandlesLimit,
+ read_file_handles = {ReadHdls, ReadHdlsAge}
+ }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {ok, empty, State};
+ [{Q, ReadSeqId, WriteSeqId}] ->
+ case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
+ [] -> {ok, empty, State};
+ [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
+ [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId),
+ {FileHdl, ReadHdls1, ReadHdlsAge1}
+ = case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]),
+ Now = now(),
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)};
+ _False ->
+ {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, _Then}} = dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ ReadHdls2 = dict:erase(OldFile, ReadHdls),
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)}
+ end;
+ {ok, {Hdl, Then}} ->
+ Now = now(),
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
+ gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
+ end,
+ % read the message
+ {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize),
+ if Delivered -> ok;
+ true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
+ end,
+ true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
+ {ok, {MsgId, MsgBody, BodySize, Delivered},
+ State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}
+ end
+ end.
internal_ack(Q, MsgIds, State) ->
remove_messages(Q, MsgIds, true, State).
%% Q is only needed if MnesiaDelete = true
+%% called from tx_cancel with MnesiaDelete = false
+%% called from ack with MnesiaDelete = true
remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
file_summary = FileSummary,
current_file_name = CurName
}) ->
- Files
- = lists:foldl(fun (MsgId, Files2) ->
+ {Files, MaxSeqId}
+ = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= dets:lookup(MsgLocation, MsgId),
Files3 =
@@ -261,13 +274,28 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
Files2
end,
- if MnesiaDelete ->
- ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q});
- true ->
- ok
- end,
- Files3
- end, sets:new(), MsgIds),
+ {if MnesiaDelete ->
+ [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }]
+ = mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = {Q, '_'},
+ is_delivered = '_'
+ }),
+ ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
+ lists:max([SeqId, MaxSeqId2]);
+ true ->
+ MaxSeqId2
+ end,
+ Files3}
+ end, {sets:new(), 0}, MsgIds),
+ true = if MnesiaDelete ->
+ [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
+ if MaxSeqId > ReadSeqId ->
+ true = ets:insert(Sequences, {Q, MaxSeqId, WriteSeqId});
+ true -> true
+ end;
+ true -> true
+ end,
State2 = compact(Files, State),
{ok, State2}.
@@ -300,28 +328,45 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
current_file_handle = CurHdl,
- current_file_name = CurName
+ current_file_name = CurName,
+ sequences = Sequences
}) ->
- {atomic, Sync}
+ {ReadSeqId, InitWriteSeqId}
+ = case ets:lookup(Sequences, Q) of
+ [] -> {0,0};
+ [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+ end,
+ {atomic, {Sync, WriteSeqId}}
= mnesia:transaction(
fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun (MsgId, Acc) ->
+ lists:foldl(fun (MsgId, {Acc, NextWriteSeqId}) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
dets:lookup(MsgLocation, MsgId),
ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, Q},
- is_delivered = false}, write),
- Acc or (CurName =:= File)
- end, false, MsgIds)
+ #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId},
+ msg_id = MsgId, is_delivered = false},
+ write),
+ {Acc or (CurName =:= File), NextWriteSeqId + 1}
+ end, {false, InitWriteSeqId}, MsgIds)
end),
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}),
if Sync -> ok = file:sync(CurHdl);
true -> ok
end,
{ok, State}.
internal_publish(Q, MsgId, MsgBody, State) ->
- {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
- ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q},
+ {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State),
+ WriteSeqId = case ets:lookup(Sequences, Q) of
+ [] -> % previously unseen queue
+ true = ets:insert_new(Sequences, {Q, 0, 1}),
+ 0;
+ [{Q, ReadSeqId, WriteSeqId2}] ->
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2 +1}),
+ WriteSeqId2
+ end,
+ ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
+ msg_id = MsgId,
is_delivered = false}),
{ok, State1}.
@@ -561,9 +606,42 @@ load_from_disk(State) ->
% There should be no more tmp files now, so go ahead and load the whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
% Finally, check there is nothing in mnesia which we haven't loaded
- true = lists:foldl(fun ({MsgId, _Q}, true) -> true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end,
- true, mnesia:dirty_all_keys(rabbit_disk_queue)),
- {ok, State1}.
+ {atomic, true} = mnesia:transaction(
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
+ true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end,
+ true, rabbit_disk_queue)
+ end),
+ State2 = extract_sequence_numbers(State1),
+ {ok, State2}.
+
+extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
+ % next-seqid-to-read is the lowest seqid which has is_delivered = false
+ {atomic, true} = mnesia:transaction(
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId},
+ is_delivered = Delivered }, true) ->
+ NextRead = if Delivered -> SeqId + 1;
+ true -> SeqId
+ end,
+ NextWrite = SeqId + 1,
+ case ets:lookup(Sequences, Q) of
+ [] ->
+ true = ets:insert_new(Sequences, {Q, NextRead, NextWrite});
+ [Orig = {Q, Read, Write}] ->
+ Repl = {Q, lists:min([Read, NextRead]),
+ lists:max([Write, NextWrite])},
+ if Orig /= Repl ->
+ true = ets:insert(Sequences, Repl);
+ true -> true
+ end
+ end
+ end, true, rabbit_disk_queue)
+ end),
+ State.
load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
current_file_name = CurName }) ->
@@ -590,7 +668,8 @@ load_messages(Left, [File|Files],
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
is_delivered = '_'})) of
0 -> {VMAcc, VTSAcc};
RefCount ->
@@ -627,7 +706,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
is_delivered = '_'}))
end, MsgIdsTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
@@ -661,7 +741,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
lists:foreach(fun (MsgId) ->
true = 0 <
length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
is_delivered = '_'}))
end, MsgIds),
% The main file should be contiguous
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b3c4a9267e..3995166938 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -147,6 +147,7 @@ table_definitions() ->
{rabbit_disk_queue,
[{record_name, dq_msg_loc},
{type, set},
+ {index, [msg_id]},
{attributes, record_info(fields, dq_msg_loc)},
{disc_only_copies, [node()]}]}
].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 736ddfd473..1e66fe9a81 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -684,7 +684,6 @@ delete_log_handlers(Handlers) ->
test_disk_queue() ->
% unicode chars are supported properly from r13 onwards
- % io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup μs\t| Publish μs\t| Pub μs/msg\t| Pub μs/byte\t| Deliver μs\t| Del μs/msg\t| Del μs/byte~n", []),
io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
[begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds
MsgSize <- [512, 8192, 32768, 131072],
@@ -706,7 +705,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
]]),
{Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q, N), ok end || N <- List],
+ [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List],
rabbit_disk_queue:ack(Q, List),
ok = rabbit_disk_queue:tx_commit(Q, [])
end || Q <- Qs]
@@ -739,7 +738,7 @@ rdq_stress_gc(MsgCount) ->
end
end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
++ lists:seq(1, (StartChunk - 1)),
- [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q, N),
+ [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q),
rabbit_disk_queue:ack(q, [N]),
rabbit_disk_queue:tx_commit(q, [])
end || N <- AckList],