summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-12 13:39:06 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-12 13:39:06 +0100
commit5d6ba0740a11762bbd916a637df0f2f787c7afc4 (patch)
tree9127048fe8bb1e96e6b6869f34dd08a02dfef0e0 /src
parent09b7163518b2b1e1ad6114ac1badee17f0171bb6 (diff)
downloadrabbitmq-server-git-5d6ba0740a11762bbd916a637df0f2f787c7afc4.tar.gz
Added to the disk queue the ability to dynamically switch between disk-only and disk+ram modes. The disk+ram mode uses disc_copies for mnesia and ets for msg_location. This results in a substantial performance improvement (minimum 5 times faster), but is RAM-limited by the number of messages. The disk-only mode uses dets and disc_only_copies for mnesia. This is much slower, but should not be limited.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl181
-rw-r--r--src/rabbit_tests.erl4
2 files changed, 127 insertions, 58 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 55840ce9d1..16208fd058 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -40,7 +40,7 @@
-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
--export([stop/0, stop_and_obliterate/0]).
+-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
@@ -49,7 +49,7 @@
-define(WRITE_OK, 255).
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
--define(MSG_LOC_DETS_NAME, rabbit_disk_queue_msg_location).
+-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
-define(FILE_EXTENSION, ".rdq").
@@ -61,7 +61,9 @@
-define(MAX_READ_FILE_HANDLES, 256).
--record(dqstate, {msg_location, %% where are messages?
+-record(dqstate, {msg_location_dets, %% where are messages?
+ msg_location_ets, %% as above, but for ets version
+ operation_mode, %% ram_disk | disk_only
file_summary, %% what's in the files?
sequences, %% next read and write for each q
current_file_num, %% current file name as number
@@ -259,6 +261,12 @@ stop() ->
stop_and_obliterate() ->
gen_server:call(?SERVER, stop_vaporise, infinity).
+to_disk_only_mode() ->
+ gen_server:call(?SERVER, to_disk_only_mode, infinity).
+
+to_ram_disk_mode() ->
+ gen_server:call(?SERVER, to_ram_disk_mode, infinity).
+
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
@@ -271,19 +279,30 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
+ Node = node(),
+ ok =
+ case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok;
+ E -> E
+ end,
ok = filelib:ensure_dir(form_filename("nothing")),
InitName = "0" ++ ?FILE_EXTENSION,
- {ok, MsgLocation} =
- dets:open_file(?MSG_LOC_DETS_NAME,
- [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++
+ {ok, MsgLocationDets} =
+ dets:open_file(?MSG_LOC_NAME,
+ [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)},
{min_no_slots, 1024*1024},
%% man says this should be <= 32M. But it works...
{max_no_slots, 1024*1024*1024},
{type, set}
]),
+ MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
+ true = ets:safe_fixtable(MsgLocationEts, true),
State =
- #dqstate { msg_location = MsgLocation,
+ #dqstate { msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ operation_mode = disk_only,
file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
[set, private]),
sequences = ets:new(?SEQUENCE_ETS_NAME,
@@ -323,8 +342,26 @@ handle_call(stop_vaporise, _From, State) ->
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok,
State1 #dqstate { current_file_handle = undefined,
- read_file_handles = {dict:new(), gb_trees:empty()}}}.
+ read_file_handles = {dict:new(), gb_trees:empty()}}};
%% gen_server now calls terminate, which then calls shutdown
+handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = disk_only }) ->
+ {reply, ok, State};
+handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies),
+ ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
+ true = ets:delete_all_objects(MsgLocationEts),
+ {reply, ok, State #dqstate { operation_mode = disk_only }};
+handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = ram_disk }) ->
+ {reply, ok, State};
+handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies),
+ true = ets:from_dets(MsgLocationEts, MsgLocationDets),
+ ok = dets:delete_all_objects(MsgLocationDets),
+ {reply, ok, State #dqstate { operation_mode = ram_disk }}.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
@@ -345,14 +382,16 @@ handle_info(_Info, State) ->
terminate(_Reason, State) ->
shutdown(State).
-shutdown(State = #dqstate { msg_location = MsgLocation,
+shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
%% deliberately ignoring return codes here
- dets:close(MsgLocation),
- file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++
+ dets:close(MsgLocationDets),
+ file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
+ true = ets:delete_all_objects(MsgLocationEts),
if FileHdl =:= undefined -> ok;
true -> file:sync(FileHdl),
file:close(FileHdl)
@@ -374,12 +413,46 @@ form_filename(Name) ->
base_directory() ->
filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
+dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+ Key) ->
+ dets:lookup(MsgLocationDets, Key);
+dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+ Key) ->
+ ets:lookup(MsgLocationEts, Key).
+
+dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+ Key) ->
+ ok = dets:delete(MsgLocationDets, Key);
+dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+ Key) ->
+ true = ets:delete(MsgLocationEts, Key),
+ ok.
+
+dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+ Obj) ->
+ ok = dets:insert(MsgLocationDets, Obj);
+dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+ Obj) ->
+ true = ets:insert(MsgLocationEts, Obj),
+ ok.
+
+dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+ Obj) ->
+ true = dets:insert_new(MsgLocationDets, Obj);
+dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+ Obj) ->
+ true = ets:insert_new(MsgLocationEts, Obj).
+
+dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+ Obj) ->
+ dets:match_object(MsgLocationDets, Obj);
+dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+ Obj) ->
+ ets:match_object(MsgLocationEts, Obj).
+
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, State =
- #dqstate { msg_location = MsgLocation,
- sequences = Sequences
- }) ->
+internal_deliver(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, empty, State};
[{Q, ReadSeqId, WriteSeqId}] ->
@@ -388,7 +461,7 @@ internal_deliver(Q, State =
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
[{MsgId, _RefCount, File, Offset, TotalSize}] =
- dets:lookup(MsgLocation, MsgId),
+ dets_ets_lookup(State, MsgId),
{FileHdl, State1} = getReadHandle(File, State),
%% read the message
{ok, {MsgBody, BodySize}} =
@@ -438,18 +511,17 @@ internal_ack(Q, MsgIds, State) ->
%% called from tx_cancel with MnesiaDelete = false
%% called from ack with MnesiaDelete = true
remove_messages(Q, MsgSeqIds, MnesiaDelete,
- State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary,
+ State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
}) ->
Files =
lists:foldl(
fun ({MsgId, SeqId}, Files2) ->
[{MsgId, RefCount, File, Offset, TotalSize}] =
- dets:lookup(MsgLocation, MsgId),
+ dets_ets_lookup(State, MsgId),
Files3 =
if 1 =:= RefCount ->
- ok = dets:delete(MsgLocation, MsgId),
+ ok = dets_ets_delete(State, MsgId),
[{File, ValidTotalSize, ContiguousTop, Left, Right}] =
ets:lookup(FileSummary, File),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
@@ -461,8 +533,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
true -> sets:add_element(File, Files2)
end;
1 < RefCount ->
- ok = dets:insert(MsgLocation, {MsgId, RefCount - 1,
- File, Offset, TotalSize}),
+ ok = dets_ets_insert(State, {MsgId, RefCount - 1,
+ File, Offset, TotalSize}),
Files2
end,
if MnesiaDelete ->
@@ -475,18 +547,17 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
{ok, State2}.
internal_tx_publish(MsgId, MsgBody,
- State = #dqstate { msg_location = MsgLocation,
- current_file_handle = CurHdl,
+ State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = CurOffset,
file_summary = FileSummary
}) ->
- case dets:lookup(MsgLocation, MsgId) of
+ case dets_ets_lookup(State, MsgId) of
[] ->
%% New message, lots to do
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
- true = dets:insert_new(MsgLocation, {MsgId, 1, CurName,
- CurOffset, TotalSize}),
+ true = dets_ets_insert_new(State, {MsgId, 1, CurName,
+ CurOffset, TotalSize}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
ets:lookup(FileSummary, CurName),
ValidTotalSize1 = ValidTotalSize + TotalSize +
@@ -503,14 +574,13 @@ internal_tx_publish(MsgId, MsgBody,
State #dqstate {current_offset = NextOffset});
[{MsgId, RefCount, File, Offset, TotalSize}] ->
%% We already know about it, just update counter
- ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File,
- Offset, TotalSize}),
+ ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
+ Offset, TotalSize}),
{ok, State}
end.
internal_tx_commit(Q, MsgIds,
- State = #dqstate { msg_location = MsgLocation,
- current_file_handle = CurHdl,
+ State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
sequences = Sequences
}) ->
@@ -525,7 +595,7 @@ internal_tx_commit(Q, MsgIds,
lists:foldl(
fun (MsgId, {Acc, NextWriteSeqId}) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
- dets:lookup(MsgLocation, MsgId),
+ dets_ets_lookup(State, MsgId),
ok = mnesia:write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
{Q, NextWriteSeqId},
@@ -687,8 +757,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
{Destination, DestinationValid, DestinationContiguousTop,
_DestinationLeft, _DestinationRight},
State1) ->
- (State = #dqstate { msg_location = MsgLocation }) =
- closeFile(Source, closeFile(Destination, State1)),
+ State = closeFile(Source, closeFile(Destination, State1)),
{ok, SourceHdl} =
file:open(form_filename(Source),
[read, write, raw, binary, delayed_write, read_ahead]),
@@ -719,10 +788,10 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% Given expected access patterns, I suspect that the list should be
%% naturally sorted as we require, however, we need to enforce it anyway
end, sortMsgLocationsByOffset(true,
- dets:match_object(MsgLocation,
- {'_', '_',
- Destination,
- '_', '_'}))),
+ dets_ets_match_object(State,
+ {'_', '_',
+ Destination,
+ '_', '_'}))),
TmpSize = DestinationValid - DestinationContiguousTop,
{TmpSize, BlockStart1, BlockEnd1} =
lists:foldl(
@@ -735,8 +804,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% Destination, at DestinationContiguousTop
%% + CurOffset
FinalOffset = DestinationContiguousTop + CurOffset,
- ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
- FinalOffset, TotalSize}),
+ ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
+ FinalOffset, TotalSize}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
%% base case, called only for the
@@ -763,7 +832,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
{ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
{ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1),
%% so now Tmp contains everything we need to salvage from
- %% Destination, and MsgLocation has been updated to
+ %% Destination, and MsgLocationDets has been updated to
%% reflect compaction of Destination so truncate
%% Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
@@ -777,9 +846,10 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
ok = file:delete(form_filename(Tmp))
end,
SourceWorkList =
- sortMsgLocationsByOffset(true, dets:match_object(MsgLocation,
- {'_', '_', Source,
- '_', '_'})),
+ sortMsgLocationsByOffset(true,
+ dets_ets_match_object(State,
+ {'_', '_', Source,
+ '_', '_'})),
{ExpectedSize, BlockStart2, BlockEnd2} =
lists:foldl(
fun ({MsgId, RefCount, _Source, Offset, TotalSize},
@@ -787,9 +857,9 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- %% update MsgLocation to reflect change of file and offset
- ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
- CurOffset, TotalSize}),
+ %% update MsgLocationDets to reflect change of file and offset
+ ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
%% base case, called only for the first list
@@ -865,8 +935,7 @@ load_from_disk(State) ->
ok = recover_crashed_compactions(Files, TmpFiles),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
- (State1 = #dqstate{ msg_location = MsgLocation }) =
- load_messages(undefined, Files, State),
+ State1 = load_messages(undefined, Files, State),
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
{atomic, true} = mnesia:transaction(
@@ -874,7 +943,7 @@ load_from_disk(State) ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
true = 1 =:=
- length(dets:lookup(MsgLocation, MsgId))
+ length(dets_ets_lookup(State1, MsgId))
end,
true, rabbit_disk_queue)
end),
@@ -910,9 +979,9 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
current_file_name = CurName }) ->
true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
State;
-load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
+load_messages(Left, [], State) ->
Num = list_to_integer(filename:rootname(Left)),
- Offset = case dets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of
+ Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of
[] -> 0;
L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
sortMsgLocationsByOffset(false, L),
@@ -921,9 +990,7 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
State #dqstate { current_file_num = Num, current_file_name = Left,
current_offset = Offset };
load_messages(Left, [File|Files],
- State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary
- }) ->
+ State = #dqstate { file_summary = FileSummary }) ->
%% [{MsgId, TotalSize, FileOffset}]
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
@@ -935,8 +1002,8 @@ load_messages(Left, [File|Files],
is_delivered = '_'})) of
0 -> {VMAcc, VTSAcc};
RefCount ->
- true = dets:insert_new(MsgLocation, {MsgId, RefCount, File,
- Offset, TotalSize}),
+ true = dets_ets_insert_new(State, {MsgId, RefCount, File,
+ Offset, TotalSize}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2f1d0c4339..9ce62f8636 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -770,6 +770,7 @@ rdq_time_insane_startup() ->
rdq_virgin(),
OneGig = 1024*1024*1024,
rabbit_disk_queue:start_link(OneGig),
+ rabbit_disk_queue:to_ram_disk_mode(),
Msg = <<>>,
List = lists:seq(1, 1024*1024),
%% 1M empty messages, at say, 100B per message, should all fit
@@ -796,4 +797,5 @@ rdq_start() ->
{ok, _} = rabbit_disk_queue:start_link(1024*1024).
rdq_stop() ->
- rabbit_disk_queue:stop().
+ rabbit_disk_queue:stop(),
+ timer:sleep(1000).