diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-22 13:01:53 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-22 13:01:53 +0100 |
| commit | 8e0df683b31d15039b4751554f9248809d10a752 (patch) | |
| tree | c8dfe88b1a215d7a9d5c78e1c6d4d8860bdd3f17 | |
| parent | 29d323a6881a508977d7382e15b379695648ed13 (diff) | |
| download | rabbitmq-server-git-8e0df683b31d15039b4751554f9248809d10a752.tar.gz | |
a bit more refactoring. Also, drop file size and file handle count in tests to stress those code paths more.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 70 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
2 files changed, 36 insertions, 38 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 26f831e343..ecb8c91e76 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -137,7 +137,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% read is only needed so that we can seek {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), {ok, Offset} = file:position(FileHdl, {bof, Offset}), - {ok, State1 # dqstate { current_file_handle = FileHdl }}. + {ok, State1 #dqstate { current_file_handle = FileHdl }}. handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, State), @@ -156,8 +156,8 @@ handle_call(clean_stop, _From, State) -> true = ets:delete(Sequences), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, - State1 # dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}}. + State1 #dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}}. %% gen_server now calls terminate, which then calls shutdown handle_cast({publish, Q, MsgId, MsgBody}, State) -> @@ -194,8 +194,8 @@ shutdown(State = #dqstate { msg_location = MsgLocation, dict:fold(fun (_File, Hdl, _Acc) -> file:close(Hdl) end, ok, ReadHdls), - State # dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}. + State #dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -232,26 +232,24 @@ internal_deliver(Q, State = {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]), - {ReadHdls2, ReadHdlsAge2} = - case dict:size(ReadHdls) < ReadFileHandlesLimit of - true -> - {ReadHdls, ReadHdlsAge}; - _False -> - {_Then, OldFile, ReadHdlsAge3} = - gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, _Then}} = - dict:find(OldFile, ReadHdls), - ok = file:close(OldHdl), - {dict:erase(OldFile, ReadHdls), - ReadHdlsAge3} - end, - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), - gb_trees:enter(Now, File, ReadHdlsAge2)}; + case dict:size(ReadHdls) < ReadFileHandlesLimit of + true -> + {Hdl, ReadHdls, ReadHdlsAge}; + _False -> + {Then, OldFile, ReadHdlsAge3} = + gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, Then}} = + dict:find(OldFile, ReadHdls), + ok = file:close(OldHdl), + {Hdl, dict:erase(OldFile, ReadHdls), + ReadHdlsAge3} + end; {ok, {Hdl, Then}} -> - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), - gb_trees:enter(Now, File, - gb_trees:delete(Then, ReadHdlsAge))} + {Hdl, ReadHdls, + gb_trees:delete(Then, ReadHdlsAge)} end, + ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1), + ReadHdlsAge2 = gb_trees:enter(Now, File, ReadHdlsAge1), %% read the message {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), @@ -261,7 +259,7 @@ internal_deliver(Q, State = end, true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, - State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }} + State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge2} }} end end. @@ -272,10 +270,10 @@ internal_ack(Q, MsgIds, State) -> %% called from tx_cancel with MnesiaDelete = false %% called from ack with MnesiaDelete = true remove_messages(Q, MsgSeqIds, MnesiaDelete, - State = # dqstate { msg_location = MsgLocation, - file_summary = FileSummary, - current_file_name = CurName - }) -> + State = #dqstate { msg_location = MsgLocation, + file_summary = FileSummary, + current_file_name = CurName + }) -> Files = lists:foldl( fun ({MsgId, SeqId}, Files2) -> @@ -334,7 +332,7 @@ internal_tx_publish(MsgId, MsgBody, ContiguousTop1, Left, undefined}), NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, maybe_roll_to_new_file(NextOffset, - State # dqstate {current_offset = NextOffset}); + State #dqstate {current_offset = NextOffset}); [{MsgId, RefCount, File, Offset, TotalSize}] -> %% We already know about it, just update counter ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File, @@ -417,11 +415,11 @@ maybe_roll_to_new_file(Offset, [write, raw, binary, delayed_write]), true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), - {ok, State # dqstate { current_file_name = NextName, - current_file_handle = NextHdl, - current_file_num = NextNum, - current_offset = 0 - }}; + {ok, State #dqstate { current_file_name = NextName, + current_file_handle = NextHdl, + current_file_num = NextNum, + current_offset = 0 + }}; maybe_roll_to_new_file(_, State) -> {ok, State}. @@ -745,8 +743,8 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) -> sortMsgLocationsByOffset(false, L), MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT end, - State # dqstate { current_file_num = Num, current_file_name = Left, - current_offset = Offset }; + State #dqstate { current_file_num = Num, current_file_name = Left, + current_offset = Offset }; load_messages(Left, [File|Files], State = #dqstate { msg_location = MsgLocation, file_summary = FileSummary diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index cce9da1a68..08b05da282 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -771,12 +771,12 @@ rdq_time_commands(Funcs) -> rdq_virgin() -> {Micros, {ok, _}} = - timer:tc(rabbit_disk_queue, start_link, [1024*1024*10, 1000]), + timer:tc(rabbit_disk_queue, start_link, [1024*1024, 5]), ok = rabbit_disk_queue:clean_stop(), Micros. rdq_start() -> - {ok, _} = rabbit_disk_queue:start_link(1024*1024*10, 1000). + {ok, _} = rabbit_disk_queue:start_link(1024*1024, 5). rdq_stop() -> rabbit_disk_queue:stop(). |
