summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_disk_queue.erl217
-rw-r--r--src/rabbit_mnesia.erl1
-rw-r--r--src/rabbit_tests.erl5
4 files changed, 153 insertions, 72 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 4f06b8335d..44e1368460 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -64,7 +64,7 @@
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
--record(dq_msg_loc, {msg_id_and_queue, is_delivered}).
+-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index f0fab00d88..9b0849c35f 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/2, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
+-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
-export([stop/0, clean_stop/0]).
@@ -51,6 +51,7 @@
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_DETS_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(FILE_EXTENSION_DETS, ".dets").
@@ -60,6 +61,7 @@
-record(dqstate, {msg_location,
file_summary,
+ sequences,
current_file_num,
current_file_name,
current_file_handle,
@@ -77,8 +79,8 @@ start_link(FileSizeLimit, ReadFileHandlesLimit) ->
publish(Q, MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
-deliver(Q, MsgId) ->
- gen_server:call(?SERVER, {deliver, Q, MsgId}, infinity).
+deliver(Q) ->
+ gen_server:call(?SERVER, {deliver, Q}, infinity).
ack(Q, MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {ack, Q, MsgIds}).
@@ -113,6 +115,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
]),
State = #dqstate { msg_location = MsgLocation,
file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]),
+ sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
current_file_num = 0,
current_file_name = InitName,
current_file_handle = undefined,
@@ -124,24 +127,25 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } = load_from_disk(State),
Path = form_filename(CurrentName),
- ok = filelib:ensure_dir(Path),
{ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), %% read only needed so that we can seek
{ok, Offset} = file:position(FileHdl, {bof, Offset}),
{ok, State1 # dqstate { current_file_handle = FileHdl }}.
-handle_call({deliver, Q, MsgId}, _From, State) ->
- {ok, {MsgBody, BodySize, Delivered}, State1} = internal_deliver(Q, MsgId, State),
- {reply, {MsgBody, BodySize, Delivered}, State1};
+handle_call({deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, State),
+ {reply, Result, State1};
handle_call({tx_commit, Q, MsgIds}, _From, State) ->
{ok, State1} = internal_tx_commit(Q, MsgIds, State),
{reply, ok, State1};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(clean_stop, _From, State) ->
- State1 = #dqstate { file_summary = FileSummary }
+ State1 = #dqstate { file_summary = FileSummary,
+ sequences = Sequences }
= shutdown(State), %% tidy up file handles early
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
true = ets:delete(FileSummary),
+ true = ets:delete(Sequences),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok, State1 # dqstate { current_file_handle = undefined,
read_file_handles = {dict:new(), gb_trees:empty()}}}.
@@ -196,53 +200,62 @@ base_directory() ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
- read_file_handles_limit = ReadFileHandlesLimit,
- read_file_handles = {ReadHdls, ReadHdlsAge}
- }) ->
- [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId),
- % so this next bit implements an LRU for file handles. But it's a bit insane, and smells
- % of premature optimisation. So I might remove it and dump it overboard
- {FileHdl, ReadHdls1, ReadHdlsAge1}
- = case dict:find(File, ReadHdls) of
- error ->
- {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]),
- Now = now(),
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)};
- _False ->
- {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, _Then}} = dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- ReadHdls2 = dict:erase(OldFile, ReadHdls),
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)}
- end;
- {ok, {Hdl, Then}} ->
- Now = now(),
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
- gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
- end,
- % read the message
- {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize),
- [Obj = #dq_msg_loc {is_delivered = Delivered}]
- = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}),
- if Delivered -> ok;
- true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
- end,
- {ok, {MsgBody, BodySize, Delivered},
- State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
+internal_deliver(Q, State = #dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
+ read_file_handles_limit = ReadFileHandlesLimit,
+ read_file_handles = {ReadHdls, ReadHdlsAge}
+ }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {ok, empty, State};
+ [{Q, ReadSeqId, WriteSeqId}] ->
+ case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
+ [] -> {ok, empty, State};
+ [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
+ [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId),
+ {FileHdl, ReadHdls1, ReadHdlsAge1}
+ = case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]),
+ Now = now(),
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)};
+ _False ->
+ {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, _Then}} = dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ ReadHdls2 = dict:erase(OldFile, ReadHdls),
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)}
+ end;
+ {ok, {Hdl, Then}} ->
+ Now = now(),
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
+ gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
+ end,
+ % read the message
+ {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize),
+ if Delivered -> ok;
+ true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
+ end,
+ true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
+ {ok, {MsgId, MsgBody, BodySize, Delivered},
+ State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}
+ end
+ end.
internal_ack(Q, MsgIds, State) ->
remove_messages(Q, MsgIds, true, State).
%% Q is only needed if MnesiaDelete = true
+%% called from tx_cancel with MnesiaDelete = false
+%% called from ack with MnesiaDelete = true
remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
file_summary = FileSummary,
current_file_name = CurName
}) ->
- Files
- = lists:foldl(fun (MsgId, Files2) ->
+ {Files, MaxSeqId}
+ = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= dets:lookup(MsgLocation, MsgId),
Files3 =
@@ -261,13 +274,28 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
Files2
end,
- if MnesiaDelete ->
- ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q});
- true ->
- ok
- end,
- Files3
- end, sets:new(), MsgIds),
+ {if MnesiaDelete ->
+ [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }]
+ = mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = {Q, '_'},
+ is_delivered = '_'
+ }),
+ ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
+ lists:max([SeqId, MaxSeqId2]);
+ true ->
+ MaxSeqId2
+ end,
+ Files3}
+ end, {sets:new(), 0}, MsgIds),
+ true = if MnesiaDelete ->
+ [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
+ if MaxSeqId > ReadSeqId ->
+ true = ets:insert(Sequences, {Q, MaxSeqId, WriteSeqId});
+ true -> true
+ end;
+ true -> true
+ end,
State2 = compact(Files, State),
{ok, State2}.
@@ -300,28 +328,45 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
current_file_handle = CurHdl,
- current_file_name = CurName
+ current_file_name = CurName,
+ sequences = Sequences
}) ->
- {atomic, Sync}
+ {ReadSeqId, InitWriteSeqId}
+ = case ets:lookup(Sequences, Q) of
+ [] -> {0,0};
+ [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+ end,
+ {atomic, {Sync, WriteSeqId}}
= mnesia:transaction(
fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun (MsgId, Acc) ->
+ lists:foldl(fun (MsgId, {Acc, NextWriteSeqId}) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
dets:lookup(MsgLocation, MsgId),
ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, Q},
- is_delivered = false}, write),
- Acc or (CurName =:= File)
- end, false, MsgIds)
+ #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId},
+ msg_id = MsgId, is_delivered = false},
+ write),
+ {Acc or (CurName =:= File), NextWriteSeqId + 1}
+ end, {false, InitWriteSeqId}, MsgIds)
end),
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}),
if Sync -> ok = file:sync(CurHdl);
true -> ok
end,
{ok, State}.
internal_publish(Q, MsgId, MsgBody, State) ->
- {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
- ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q},
+ {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State),
+ WriteSeqId = case ets:lookup(Sequences, Q) of
+ [] -> % previously unseen queue
+ true = ets:insert_new(Sequences, {Q, 0, 1}),
+ 0;
+ [{Q, ReadSeqId, WriteSeqId2}] ->
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2 +1}),
+ WriteSeqId2
+ end,
+ ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
+ msg_id = MsgId,
is_delivered = false}),
{ok, State1}.
@@ -561,9 +606,42 @@ load_from_disk(State) ->
% There should be no more tmp files now, so go ahead and load the whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
% Finally, check there is nothing in mnesia which we haven't loaded
- true = lists:foldl(fun ({MsgId, _Q}, true) -> true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end,
- true, mnesia:dirty_all_keys(rabbit_disk_queue)),
- {ok, State1}.
+ {atomic, true} = mnesia:transaction(
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
+ true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end,
+ true, rabbit_disk_queue)
+ end),
+ State2 = extract_sequence_numbers(State1),
+ {ok, State2}.
+
+extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
+ % next-seqid-to-read is the lowest seqid which has is_delivered = false
+ {atomic, true} = mnesia:transaction(
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId},
+ is_delivered = Delivered }, true) ->
+ NextRead = if Delivered -> SeqId + 1;
+ true -> SeqId
+ end,
+ NextWrite = SeqId + 1,
+ case ets:lookup(Sequences, Q) of
+ [] ->
+ true = ets:insert_new(Sequences, {Q, NextRead, NextWrite});
+ [Orig = {Q, Read, Write}] ->
+ Repl = {Q, lists:min([Read, NextRead]),
+ lists:max([Write, NextWrite])},
+ if Orig /= Repl ->
+ true = ets:insert(Sequences, Repl);
+ true -> true
+ end
+ end
+ end, true, rabbit_disk_queue)
+ end),
+ State.
load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
current_file_name = CurName }) ->
@@ -590,7 +668,8 @@ load_messages(Left, [File|Files],
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
is_delivered = '_'})) of
0 -> {VMAcc, VTSAcc};
RefCount ->
@@ -627,7 +706,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
is_delivered = '_'}))
end, MsgIdsTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
@@ -661,7 +741,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
lists:foreach(fun (MsgId) ->
true = 0 <
length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
is_delivered = '_'}))
end, MsgIds),
% The main file should be contiguous
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b3c4a9267e..3995166938 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -147,6 +147,7 @@ table_definitions() ->
{rabbit_disk_queue,
[{record_name, dq_msg_loc},
{type, set},
+ {index, [msg_id]},
{attributes, record_info(fields, dq_msg_loc)},
{disc_only_copies, [node()]}]}
].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 736ddfd473..1e66fe9a81 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -684,7 +684,6 @@ delete_log_handlers(Handlers) ->
test_disk_queue() ->
% unicode chars are supported properly from r13 onwards
- % io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup μs\t| Publish μs\t| Pub μs/msg\t| Pub μs/byte\t| Deliver μs\t| Del μs/msg\t| Del μs/byte~n", []),
io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
[begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds
MsgSize <- [512, 8192, 32768, 131072],
@@ -706,7 +705,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
]]),
{Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q, N), ok end || N <- List],
+ [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List],
rabbit_disk_queue:ack(Q, List),
ok = rabbit_disk_queue:tx_commit(Q, [])
end || Q <- Qs]
@@ -739,7 +738,7 @@ rdq_stress_gc(MsgCount) ->
end
end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
++ lists:seq(1, (StartChunk - 1)),
- [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q, N),
+ [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q),
rabbit_disk_queue:ack(q, [N]),
rabbit_disk_queue:tx_commit(q, [])
end || N <- AckList],