diff options
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 217 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 5 |
4 files changed, 153 insertions, 72 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 4f06b8335d..44e1368460 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,7 +64,7 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). --record(dq_msg_loc, {msg_id_and_queue, is_delivered}). +-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index f0fab00d88..9b0849c35f 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/2, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]). +-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]). -export([stop/0, clean_stop/0]). @@ -51,6 +51,7 @@ -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(MSG_LOC_DETS_NAME, rabbit_disk_queue_msg_location). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). -define(FILE_EXTENSION_DETS, ".dets"). @@ -60,6 +61,7 @@ -record(dqstate, {msg_location, file_summary, + sequences, current_file_num, current_file_name, current_file_handle, @@ -77,8 +79,8 @@ start_link(FileSizeLimit, ReadFileHandlesLimit) -> publish(Q, MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). -deliver(Q, MsgId) -> - gen_server:call(?SERVER, {deliver, Q, MsgId}, infinity). +deliver(Q) -> + gen_server:call(?SERVER, {deliver, Q}, infinity). ack(Q, MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {ack, Q, MsgIds}). @@ -113,6 +115,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> ]), State = #dqstate { msg_location = MsgLocation, file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), + sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), current_file_num = 0, current_file_name = InitName, current_file_handle = undefined, @@ -124,24 +127,25 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State), Path = form_filename(CurrentName), - ok = filelib:ensure_dir(Path), {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), %% read only needed so that we can seek {ok, Offset} = file:position(FileHdl, {bof, Offset}), {ok, State1 # dqstate { current_file_handle = FileHdl }}. -handle_call({deliver, Q, MsgId}, _From, State) -> - {ok, {MsgBody, BodySize, Delivered}, State1} = internal_deliver(Q, MsgId, State), - {reply, {MsgBody, BodySize, Delivered}, State1}; +handle_call({deliver, Q}, _From, State) -> + {ok, Result, State1} = internal_deliver(Q, State), + {reply, Result, State1}; handle_call({tx_commit, Q, MsgIds}, _From, State) -> {ok, State1} = internal_tx_commit(Q, MsgIds, State), {reply, ok, State1}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(clean_stop, _From, State) -> - State1 = #dqstate { file_summary = FileSummary } + State1 = #dqstate { file_summary = FileSummary, + sequences = Sequences } = shutdown(State), %% tidy up file handles early {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), true = ets:delete(FileSummary), + true = ets:delete(Sequences), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, State1 # dqstate { current_file_handle = undefined, read_file_handles = {dict:new(), gb_trees:empty()}}}. @@ -196,53 +200,62 @@ base_directory() -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, - read_file_handles_limit = ReadFileHandlesLimit, - read_file_handles = {ReadHdls, ReadHdlsAge} - }) -> - [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), - % so this next bit implements an LRU for file handles. But it's a bit insane, and smells - % of premature optimisation. So I might remove it and dump it overboard - {FileHdl, ReadHdls1, ReadHdlsAge1} - = case dict:find(File, ReadHdls) of - error -> - {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]), - Now = now(), - case dict:size(ReadHdls) < ReadFileHandlesLimit of - true -> - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)}; - _False -> - {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, _Then}} = dict:find(OldFile, ReadHdls), - ok = file:close(OldHdl), - ReadHdls2 = dict:erase(OldFile, ReadHdls), - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)} - end; - {ok, {Hdl, Then}} -> - Now = now(), - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), - gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))} - end, - % read the message - {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), - [Obj = #dq_msg_loc {is_delivered = Delivered}] - = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}), - if Delivered -> ok; - true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) - end, - {ok, {MsgBody, BodySize, Delivered}, - State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}. +internal_deliver(Q, State = #dqstate { msg_location = MsgLocation, + sequences = Sequences, + read_file_handles_limit = ReadFileHandlesLimit, + read_file_handles = {ReadHdls, ReadHdlsAge} + }) -> + case ets:lookup(Sequences, Q) of + [] -> {ok, empty, State}; + [{Q, ReadSeqId, WriteSeqId}] -> + case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of + [] -> {ok, empty, State}; + [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> + [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), + {FileHdl, ReadHdls1, ReadHdlsAge1} + = case dict:find(File, ReadHdls) of + error -> + {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]), + Now = now(), + case dict:size(ReadHdls) < ReadFileHandlesLimit of + true -> + {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)}; + _False -> + {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, _Then}} = dict:find(OldFile, ReadHdls), + ok = file:close(OldHdl), + ReadHdls2 = dict:erase(OldFile, ReadHdls), + {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)} + end; + {ok, {Hdl, Then}} -> + Now = now(), + {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), + gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))} + end, + % read the message + {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), + if Delivered -> ok; + true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) + end, + true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), + {ok, {MsgId, MsgBody, BodySize, Delivered}, + State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }} + end + end. internal_ack(Q, MsgIds, State) -> remove_messages(Q, MsgIds, true, State). %% Q is only needed if MnesiaDelete = true +%% called from tx_cancel with MnesiaDelete = false +%% called from ack with MnesiaDelete = true remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, + sequences = Sequences, file_summary = FileSummary, current_file_name = CurName }) -> - Files - = lists:foldl(fun (MsgId, Files2) -> + {Files, MaxSeqId} + = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) -> [{MsgId, RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), Files3 = @@ -261,13 +274,28 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), Files2 end, - if MnesiaDelete -> - ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q}); - true -> - ok - end, - Files3 - end, sets:new(), MsgIds), + {if MnesiaDelete -> + [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }] + = mnesia:dirty_match_object(rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = {Q, '_'}, + is_delivered = '_' + }), + ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}), + lists:max([SeqId, MaxSeqId2]); + true -> + MaxSeqId2 + end, + Files3} + end, {sets:new(), 0}, MsgIds), + true = if MnesiaDelete -> + [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), + if MaxSeqId > ReadSeqId -> + true = ets:insert(Sequences, {Q, MaxSeqId, WriteSeqId}); + true -> true + end; + true -> true + end, State2 = compact(Files, State), {ok, State2}. @@ -300,28 +328,45 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, current_file_handle = CurHdl, - current_file_name = CurName + current_file_name = CurName, + sequences = Sequences }) -> - {atomic, Sync} + {ReadSeqId, InitWriteSeqId} + = case ets:lookup(Sequences, Q) of + [] -> {0,0}; + [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} + end, + {atomic, {Sync, WriteSeqId}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl(fun (MsgId, Acc) -> + lists:foldl(fun (MsgId, {Acc, NextWriteSeqId}) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = dets:lookup(MsgLocation, MsgId), ok = mnesia:write(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, - is_delivered = false}, write), - Acc or (CurName =:= File) - end, false, MsgIds) + #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId}, + msg_id = MsgId, is_delivered = false}, + write), + {Acc or (CurName =:= File), NextWriteSeqId + 1} + end, {false, InitWriteSeqId}, MsgIds) end), + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}), if Sync -> ok = file:sync(CurHdl); true -> ok end, {ok, State}. internal_publish(Q, MsgId, MsgBody, State) -> - {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), - ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, + {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State), + WriteSeqId = case ets:lookup(Sequences, Q) of + [] -> % previously unseen queue + true = ets:insert_new(Sequences, {Q, 0, 1}), + 0; + [{Q, ReadSeqId, WriteSeqId2}] -> + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2 +1}), + WriteSeqId2 + end, + ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, + msg_id = MsgId, is_delivered = false}), {ok, State1}. @@ -561,9 +606,42 @@ load_from_disk(State) -> % There should be no more tmp files now, so go ahead and load the whole lot (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), % Finally, check there is nothing in mnesia which we haven't loaded - true = lists:foldl(fun ({MsgId, _Q}, true) -> true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end, - true, mnesia:dirty_all_keys(rabbit_disk_queue)), - {ok, State1}. + {atomic, true} = mnesia:transaction( + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) -> + true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end, + true, rabbit_disk_queue) + end), + State2 = extract_sequence_numbers(State1), + {ok, State2}. + +extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> + % next-seqid-to-read is the lowest seqid which has is_delivered = false + {atomic, true} = mnesia:transaction( + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl( + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId}, + is_delivered = Delivered }, true) -> + NextRead = if Delivered -> SeqId + 1; + true -> SeqId + end, + NextWrite = SeqId + 1, + case ets:lookup(Sequences, Q) of + [] -> + true = ets:insert_new(Sequences, {Q, NextRead, NextWrite}); + [Orig = {Q, Read, Write}] -> + Repl = {Q, lists:min([Read, NextRead]), + lists:max([Write, NextWrite])}, + if Orig /= Repl -> + true = ets:insert(Sequences, Repl); + true -> true + end + end + end, true, rabbit_disk_queue) + end), + State. load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> @@ -590,7 +668,8 @@ load_messages(Left, [File|Files], {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', is_delivered = '_'})) of 0 -> {VMAcc, VTSAcc}; RefCount -> @@ -627,7 +706,8 @@ recover_crashed_compactions1(Files, TmpFile) -> % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', is_delivered = '_'})) end, MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), @@ -661,7 +741,8 @@ recover_crashed_compactions1(Files, TmpFile) -> lists:foreach(fun (MsgId) -> true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', is_delivered = '_'})) end, MsgIds), % The main file should be contiguous diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b3c4a9267e..3995166938 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -147,6 +147,7 @@ table_definitions() -> {rabbit_disk_queue, [{record_name, dq_msg_loc}, {type, set}, + {index, [msg_id]}, {attributes, record_info(fields, dq_msg_loc)}, {disc_only_copies, [node()]}]} ]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 736ddfd473..1e66fe9a81 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -684,7 +684,6 @@ delete_log_handlers(Handlers) -> test_disk_queue() -> % unicode chars are supported properly from r13 onwards - % io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup μs\t| Publish μs\t| Pub μs/msg\t| Pub μs/byte\t| Deliver μs\t| Del μs/msg\t| Del μs/byte~n", []), io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []), [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds MsgSize <- [512, 8192, 32768, 131072], @@ -706,7 +705,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end ]]), {Deliver, ok} = timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [begin [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q, N), ok end || N <- List], + [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List], rabbit_disk_queue:ack(Q, List), ok = rabbit_disk_queue:tx_commit(Q, []) end || Q <- Qs] @@ -739,7 +738,7 @@ rdq_stress_gc(MsgCount) -> end end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)]))) ++ lists:seq(1, (StartChunk - 1)), - [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q, N), + [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q), rabbit_disk_queue:ack(q, [N]), rabbit_disk_queue:tx_commit(q, []) end || N <- AckList], |
