diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-12 16:56:49 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-12 16:56:49 +0100 |
| commit | cb87008620431c8f30269fba02f4aeffbc31e073 (patch) | |
| tree | e51f2a938064e815b50da5cc70a616dddbdb232e /src | |
| parent | 383558344a9103b4a250f729cc4380534bc07da8 (diff) | |
| download | rabbitmq-server-git-cb87008620431c8f30269fba02f4aeffbc31e073.tar.gz | |
added stop and clean_stop APIs to help with tests/benchmarks
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 122 |
1 file changed, 82 insertions, 40 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index db719ef3b3..b67896ceb2 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -40,6 +40,8 @@ -export([publish/3, deliver/2, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]). +-export([stop/0, clean_stop/0]). + -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). @@ -93,6 +95,12 @@ tx_commit(Q, MsgIds) when is_list(MsgIds) -> tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {tx_cancel, MsgIds}). +stop() -> + gen_server:call(?SERVER, stop). + +clean_stop() -> + gen_server:call(?SERVER, clean_stop). + %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> @@ -110,7 +118,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> read_file_handles = {dict:new(), gb_trees:empty()}, read_file_handles_limit = ReadFileHandlesLimit }, - {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State), + {ok, State1 = #dqstate { current_file_name = CurrentName, + current_offset = Offset } } = load_from_disk(State), Path = form_filename(CurrentName), ok = filelib:ensure_dir(Path), {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), %% read only needed so that we can seek @@ -122,7 +131,22 @@ handle_call({deliver, Q, MsgId}, _From, State) -> {reply, {MsgBody, BodySize, Delivered}, State1}; handle_call({tx_commit, Q, MsgIds}, _From, State) -> {ok, State1} = internal_tx_commit(Q, MsgIds, State), - {reply, ok, State1}. 
+ {reply, ok, State1}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; %% gen_server now calls terminate +handle_call(clean_stop, _From, State) -> + State1 = #dqstate { msg_location = MsgLocation, + file_summary = FileSummary, + file_detail = FileDetail } + = shutdown(State), %% tidy up file handles early + {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), + true = ets:delete(MsgLocation), + true = ets:delete(FileSummary), + true = ets:delete(FileDetail), + lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), + {stop, normal, ok, State # dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}}. + %% gen_server now calls terminate, which then calls shutdown handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, MsgBody, State), @@ -140,14 +164,21 @@ handle_cast({tx_cancel, MsgIds}, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #dqstate { current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge} - }) -> - file:sync(FileHdl), - file:close(FileHdl), +terminate(_Reason, State) -> + shutdown(State). + +shutdown(State = #dqstate { current_file_handle = FileHdl, + read_file_handles = {ReadHdls, _ReadHdlsAge} + }) -> + if FileHdl =:= undefined -> ok; + true -> file:sync(FileHdl), + file:close(FileHdl) + end, dict:fold(fun (_File, Hdl, _Acc) -> file:close(Hdl) - end, ok, ReadHdls). + end, ok, ReadHdls), + State # dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}. code_change(_OldVsn, State, _Extra) -> {ok, State}. 
@@ -186,7 +217,8 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, end; {ok, {Hdl, Then}} -> Now = now(), - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))} + {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), + gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))} end, % read the message {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset), @@ -206,29 +238,32 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL file_summary = FileSummary, file_detail = FileDetail }) -> - Files = lists:foldl(fun (MsgId, Files2) -> - [{MsgId, RefCount, File, Offset, TotalSize}] - = ets:lookup(MsgLocation, MsgId), - if 1 =:= RefCount -> - true = ets:delete(MsgLocation, MsgId), - [{File, FileSum = #dqfile { valid_data = ValidTotalSize, - contiguous_prefix = ContiguousTop }}] - = ets:lookup(FileSummary, File), - true = ets:delete(FileDetail, {File, Offset}), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - true = ets:insert(FileSummary, {File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)), - contiguous_prefix = ContiguousTop1}}), - if MnesiaDelete -> - ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q}); - true -> - ok - end, - sets:add_element(File, Files2); - 1 < RefCount -> - ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - Files2 - end - end, sets:new(), MsgIds), + Files + = lists:foldl(fun (MsgId, Files2) -> + [{MsgId, RefCount, File, Offset, TotalSize}] + = ets:lookup(MsgLocation, MsgId), + if 1 =:= RefCount -> + true = ets:delete(MsgLocation, MsgId), + [{File, FileSum = #dqfile { valid_data = ValidTotalSize, + contiguous_prefix = ContiguousTop }}] + = ets:lookup(FileSummary, File), + true = ets:delete(FileDetail, {File, Offset}), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + true = ets:insert(FileSummary, + {File, FileSum #dqfile { valid_data 
= (ValidTotalSize - + TotalSize - (?FILE_PACKING_ADJUSTMENT)), + contiguous_prefix = ContiguousTop1}}), + if MnesiaDelete -> + ok = mnesia:dirty_delete(rabbit_disk_queue, {MsgId, Q}); + true -> + ok + end, + sets:add_element(File, Files2); + 1 < RefCount -> + ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), + Files2 + end + end, sets:new(), MsgIds), State2 = compact(Files, State), {ok, State2}. @@ -275,7 +310,8 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, [{MsgId, _RefCount, File, _Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId), ok = mnesia:write(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}, write), + #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, + is_delivered = false}, write), Acc or (CurName =:= File) end, false, MsgIds) end), @@ -286,7 +322,8 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, internal_publish(Q, MsgId, MsgBody, State) -> {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), - ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}), + ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, + is_delivered = false}), {ok, State1}. internal_tx_cancel(MsgIds, State) -> @@ -338,7 +375,8 @@ load_from_disk(State) -> true, mnesia:dirty_all_keys(rabbit_disk_queue)), {ok, State1}. 
-load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> +load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, + current_file_name = CurName }) -> true = ets:insert_new(FileSummary, {CurName, #dqfile { valid_data = 0, contiguous_prefix = 0, left = undefined, @@ -363,7 +401,8 @@ load_messages(Left, [File|Files], {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) of + #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, + is_delivered = '_'})) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), @@ -404,7 +443,8 @@ recover_crashed_compactions1(Files, TmpFile) -> % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) + #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, + is_delivered = '_'})) end, MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), MsgIds = lists:map(GrabMsgId, UncorruptedMessages), @@ -435,8 +475,10 @@ recover_crashed_compactions1(Files, TmpFile) -> % we're in case 4 above. 
% check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, is_delivered = '_'})) + true = 0 < + length(mnesia:dirty_match_object(rabbit_disk_queue, + #dq_msg_loc { msg_id_and_queue = {MsgId, '_'}, + is_delivered = '_'})) end, MsgIds), % The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), @@ -485,7 +527,7 @@ find_contiguous_block_prefix([], 0, Acc) -> find_contiguous_block_prefix([], _N, _Acc) -> {0, []}; find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc) - when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% Can't use (?FILE_PACKING_ADJUSTMENT) + when ExpectedOffset =:= Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT) -> find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]); find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) -> find_contiguous_block_prefix(List). |
