summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-12 16:56:49 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-12 16:56:49 +0100
commitcb87008620431c8f30269fba02f4aeffbc31e073 (patch)
treee51f2a938064e815b50da5cc70a616dddbdb232e /src
parent383558344a9103b4a250f729cc4380534bc07da8 (diff)
downloadrabbitmq-server-git-cb87008620431c8f30269fba02f4aeffbc31e073.tar.gz
added a stop and clean_stop api to help with tests/benchmarks
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl122
1 files changed, 82 insertions, 40 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index db719ef3b3..b67896ceb2 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -40,6 +40,8 @@
-export([publish/3, deliver/2, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]).
+-export([stop/0, clean_stop/0]).
+
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
@@ -93,6 +95,12 @@ tx_commit(Q, MsgIds) when is_list(MsgIds) ->
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
+stop() ->
+ gen_server:call(?SERVER, stop).
+
+clean_stop() ->
+ gen_server:call(?SERVER, clean_stop).
+
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
@@ -110,7 +118,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit
},
- {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State),
+ {ok, State1 = #dqstate { current_file_name = CurrentName,
+ current_offset = Offset } } = load_from_disk(State),
Path = form_filename(CurrentName),
ok = filelib:ensure_dir(Path),
{ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), %% read only needed so that we can seek
@@ -122,7 +131,22 @@ handle_call({deliver, Q, MsgId}, _From, State) ->
{reply, {MsgBody, BodySize, Delivered}, State1};
handle_call({tx_commit, Q, MsgIds}, _From, State) ->
{ok, State1} = internal_tx_commit(Q, MsgIds, State),
- {reply, ok, State1}.
+ {reply, ok, State1};
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}; %% gen_server now calls terminate
+handle_call(clean_stop, _From, State) ->
+ State1 = #dqstate { msg_location = MsgLocation,
+ file_summary = FileSummary,
+ file_detail = FileDetail }
+ = shutdown(State), %% tidy up file handles early
+ {atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
+ true = ets:delete(MsgLocation),
+ true = ets:delete(FileSummary),
+ true = ets:delete(FileDetail),
+ lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
+ {stop, normal, ok, State # dqstate { current_file_handle = undefined,
+ read_file_handles = {dict:new(), gb_trees:empty()}}}.
+ %% gen_server now calls terminate, which then calls shutdown
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
@@ -140,14 +164,21 @@ handle_cast({tx_cancel, MsgIds}, State) ->
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, #dqstate { current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge}
- }) ->
- file:sync(FileHdl),
- file:close(FileHdl),
+terminate(_Reason, State) ->
+ shutdown(State).
+
+shutdown(State = #dqstate { current_file_handle = FileHdl,
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
+ }) ->
+ if FileHdl =:= undefined -> ok;
+ true -> file:sync(FileHdl),
+ file:close(FileHdl)
+ end,
dict:fold(fun (_File, Hdl, _Acc) ->
file:close(Hdl)
- end, ok, ReadHdls).
+ end, ok, ReadHdls),
+ State # dqstate { current_file_handle = undefined,
+ read_file_handles = {dict:new(), gb_trees:empty()}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -186,7 +217,8 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
end;
{ok, {Hdl, Then}} ->
Now = now(),
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
+ {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
+ gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
end,
% read the message
{ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
@@ -206,29 +238,32 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
file_summary = FileSummary,
file_detail = FileDetail
}) ->
- Files = lists:foldl(fun (MsgId, Files2) ->
- [{MsgId, RefCount, File, Offset, TotalSize}]
- = ets:lookup(MsgLocation, MsgId),
- if 1 =:= RefCount ->
- true = ets:delete(MsgLocation, MsgId),
- [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
- contiguous_prefix = ContiguousTop }}]
- = ets:lookup(FileSummary, File),
- true = ets:delete(FileDetail, {File, Offset}),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
- contiguous_prefix = ContiguousTop1}}),
- if MnesiaDelete ->
- ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q});
- true ->
- ok
- end,
- sets:add_element(File, Files2);
- 1 < RefCount ->
- ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
- Files2
- end
- end, sets:new(), MsgIds),
+ Files
+ = lists:foldl(fun (MsgId, Files2) ->
+ [{MsgId, RefCount, File, Offset, TotalSize}]
+ = ets:lookup(MsgLocation, MsgId),
+ if 1 =:= RefCount ->
+ true = ets:delete(MsgLocation, MsgId),
+ [{File, FileSum = #dqfile { valid_data = ValidTotalSize,
+ contiguous_prefix = ContiguousTop }}]
+ = ets:lookup(FileSummary, File),
+ true = ets:delete(FileDetail, {File, Offset}),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ true = ets:insert(FileSummary,
+ {File, FileSum #dqfile { valid_data = (ValidTotalSize -
+ TotalSize - (?FILE_PACKING_ADJUSTMENT)),
+ contiguous_prefix = ContiguousTop1}}),
+ if MnesiaDelete ->
+ ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q});
+ true ->
+ ok
+ end,
+ sets:add_element(File, Files2);
+ 1 < RefCount ->
+ ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+ Files2
+ end
+ end, sets:new(), MsgIds),
State2 = compact(Files, State),
{ok, State2}.
@@ -275,7 +310,8 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
ets:lookup(MsgLocation, MsgId),
ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}, write),
+ #dq_msg_loc { msg_id_and_queue = {MsgId, Q},
+ is_delivered = false}, write),
Acc or (CurName =:= File)
end, false, MsgIds)
end),
@@ -286,7 +322,8 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
internal_publish(Q, MsgId, MsgBody, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
- ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}),
+ ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q},
+ is_delivered = false}),
{ok, State1}.
internal_tx_cancel(MsgIds, State) ->
@@ -338,7 +375,8 @@ load_from_disk(State) ->
true, mnesia:dirty_all_keys(rabbit_disk_queue)),
{ok, State1}.
-load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) ->
+load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName }) ->
true = ets:insert_new(FileSummary, {CurName, #dqfile { valid_data = 0,
contiguous_prefix = 0,
left = undefined,
@@ -363,7 +401,8 @@ load_messages(Left, [File|Files],
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) of
+ #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ is_delivered = '_'})) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
@@ -404,7 +443,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'}))
+ #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ is_delivered = '_'}))
end, MsgIdsTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
@@ -435,8 +475,10 @@ recover_crashed_compactions1(Files, TmpFile) ->
% we're in case 4 above.
% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'}))
+ true = 0 <
+ length(mnesia:dirty_match_object(rabbit_disk_queue,
+ #dq_msg_loc { msg_id_and_queue = {MsgId, '_'},
+ is_delivered = '_'}))
end, MsgIds),
% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
@@ -485,7 +527,7 @@ find_contiguous_block_prefix([], 0, Acc) ->
find_contiguous_block_prefix([], _N, _Acc) ->
{0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc)
- when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% Can't use (?FILE_PACKING_ADJUSTMENT)
+ when ExpectedOffset =:= Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT) ->
find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
find_contiguous_block_prefix(List).